Skip to content

Commit

Permalink
Hotfix for MongoDB.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Dec 20, 2024
1 parent a00b2ef commit 4128a64
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions lib/cdc/mongo/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,20 @@ func (Debezium) Labels() []string {
}

func (Debezium) GetPrimaryKey(key []byte, tc kafkalib.TopicConfig) (map[string]any, error) {
fmt.Println("GetPrimaryKey", "key", string(key))

kvMap, err := debezium.ParsePartitionKey(key, tc.CDCKeyFormat)
if err != nil {
return nil, err
}

fmt.Println("before kvmap", kvMap)
if payload, ok := kvMap["payload"]; ok {
kvMap = payload.(map[string]any)
}

fmt.Println("after kvMap", kvMap)

// This code is needed because the partition key bytes returns nested objects as a string
// Such that, the value looks like this: {"id":"{\"$oid\": \"640127e4beeb1ccfc821c25b\"}"}
for k, v := range kvMap {
Expand Down

0 comments on commit 4128a64

Please sign in to comment.