Skip to content

Commit

Permalink
output a (sorted) json object as the kafka key instead
Browse files Browse the repository at this point in the history
  • Loading branch information
Ben Osheroff committed Apr 9, 2015
1 parent 6b51fcf commit 8bea745
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 12 deletions.
23 changes: 12 additions & 11 deletions src/main/java/com/zendesk/maxwell/MaxwellAbstractRowsEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,29 +247,30 @@ public List<String> toJSONStrings() {
public List<String> getPKStrings() {
ArrayList<String> list = new ArrayList<>();

String pkString = "db:" + table.getDatabase().getName() + "/"
+ "tbl:" + table.getName() + "/";

for ( Row r : filteredRows()) {
list.add(pkString + getPKStringForRow(r));
list.add(new MaxwelllPKJSONObject(table.getDatabase().getName(),
table.getName(),
getPKMapForRow(r)).toString());
}

return list;
}

private String getPKStringForRow(Row r) {
if ( table.getPKList().isEmpty() )
return "_uuid: " + java.util.UUID.randomUUID().toString();
private Map<String, Object> getPKMapForRow(Row r) {
HashMap<String, Object> map = new HashMap<>();

if ( table.getPKList().isEmpty() ) {
map.put("_uuid", java.util.UUID.randomUUID().toString());
}

ArrayList<String> pkValueList = new ArrayList<>();
for ( String pk : table.getPKList() ) {
int idx = table.findColumnIndex(pk);

Column column = r.getColumns().get(idx);
ColumnDef def = table.getColumnList().get(idx);


pkValueList.add(def.getName() + ":" + def.asJSON(column.getValue()).toString());
map.put(pk, def.asJSON(column.getValue()));
}
return StringUtils.join(pkValueList, "/");
return map;
}
}
44 changes: 44 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwelllPKJSONObject.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.zendesk.maxwell;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

import org.json.JSONObject;

public class MaxwelllPKJSONObject extends JSONObject {
private final ArrayList<String> pkKeys;

public MaxwelllPKJSONObject(String db, String table, Map<String, Object> pks) {
super();

this.put("database", db);
this.put("table", table);
this.pkKeys = new ArrayList<String>();

ArrayList<String> sortedPKs = new ArrayList<String>(pks.keySet());
Collections.sort(sortedPKs);

for ( String pk : sortedPKs ) {
String prefixed = "pk." + pk;
this.pkKeys.add(prefixed);
this.put(prefixed, pks.get(pk));
}
}

@Override
public Set<String> keySet() {
LinkedHashSet<String> set = new LinkedHashSet<>();

set.add("database");
set.add("table");
for ( String key : this.pkKeys ) {
set.add(key);
}

return set;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ public void testGetEvent() throws Exception {
public void testPrimaryKeyStrings() throws Exception {
List<MaxwellAbstractRowsEvent> list;
String input[] = {"insert into minimal set account_id =1, text_field='hello'"};
String expectedJSON = "{\"database\":\"shard_1\",\"table\":\"minimal\",\"pk.id\":1,\"pk.text_field\":\"hello\"}";
list = getRowsForSQL(null, input);
assertThat(list.size(), is(1));
assertThat(StringUtils.join(list.get(0).getPKStrings(), ""), is("db:shard_1/tbl:minimal/id:1/text_field:hello"));
assertThat(StringUtils.join(list.get(0).getPKStrings(), ""), is(expectedJSON));
}

@Test
Expand Down

0 comments on commit 8bea745

Please sign in to comment.