Skip to content

Commit

Permalink
Merge pull request #1238 from zendesk/ben/javascript_partition_string
Browse files Browse the repository at this point in the history
javascript partition-string override
  • Loading branch information
osheroff authored Apr 16, 2019
2 parents 9c9aadf + 1977f61 commit e3e1568
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 15 deletions.
1 change: 1 addition & 0 deletions src/example/filter.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*
* .suppress() -> call this to remove row from output
* .kafka_topic = "topic" -> override the default kafka topic for this row
* .partition_string = "string" -> override kafka/kinesis partition-by string
* .data -> hash containing row data. May be modified.
* .old_data -> hash containing old data
* .extra_attributes -> if you set values in this hash, they will be output at the top level of the final JSON
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public String getHashString(RowMap r, PartitionBy by) {
}

public String getHashString(RowMap r) {
return getHashString(r, partitionBy);
if ( r.getPartitionString() != null )
return r.getPartitionString();
else
return getHashString(r, partitionBy);
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/zendesk/maxwell/row/RowMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

public class RowMap implements Serializable {


public enum KeyFormat { HASH, ARRAY }

static final Logger LOGGER = LoggerFactory.getLogger(RowMap.class);
Expand All @@ -33,6 +34,7 @@ public enum KeyFormat { HASH, ARRAY }
private final Position position;
private Position nextPosition;
private String kafkaTopic;
private String partitionString;
protected boolean suppressed;

private Long xid;
Expand Down Expand Up @@ -393,4 +395,13 @@ public String getKafkaTopic() {
public void setKafkaTopic(String topic) {
this.kafkaTopic = topic;
}

public String getPartitionString() {
return this.partitionString;
}

public void setPartitionString(String partitionString) {
this.partitionString = partitionString;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,6 @@ public void run() {
}
}

@Override
public void requestStop() {
super.requestStop();
thread.interrupt();
}

@Override
protected void beforeStop() {
if ( exception == null ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,8 @@ public void suppress() {
public void setKafka_topic(String topic) {
row.setKafkaTopic(topic);
}

public void setPartition_string(String partitionString) {
row.setPartitionString(partitionString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -607,10 +607,17 @@ public void testJavascriptFilters() throws Exception {
requireMinimumVersion(server.VERSION_5_6);

String dir = MaxwellTestSupport.getSQLDir();
runJSON("/json/test_javascript_filters", (c) -> {
List<RowMap> rows = runJSON("/json/test_javascript_filters", (c) -> {
c.javascriptFile = dir + "/json/filter.javascript";
c.outputConfig.includesRowQuery = true;
});

boolean foundPartitionString = false;
for ( RowMap row : rows ) {
if ( row.getPartitionString() != null )
foundPartitionString = true;
}
assertTrue(foundPartitionString);
}

@Test
Expand Down
7 changes: 4 additions & 3 deletions src/test/java/com/zendesk/maxwell/MaxwellTestJSON.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static void assertJSON(List<Map<String, Object>> jsonOutput, List<Map<Str
}
}

private static void runJSONTest(MysqlIsolatedServer server, List<String> sql, List<Map<String, Object>> expectedJSON,
private static List<RowMap> runJSONTest(MysqlIsolatedServer server, List<String> sql, List<Map<String, Object>> expectedJSON,
Consumer<MaxwellConfig> configLambda) throws Exception {
List<Map<String, Object>> eventJSON = new ArrayList<>();

Expand Down Expand Up @@ -86,6 +86,7 @@ private static void runJSONTest(MysqlIsolatedServer server, List<String> sql, Li
}
assertJSON(eventJSON, expectedJSON);

return rows;
}

public static class SQLAndJSON {
Expand Down Expand Up @@ -152,10 +153,10 @@ public static SQLAndJSON parseJSONTestFile(String fname) throws Exception {
return ret;
}

protected static void runJSONTestFile(MysqlIsolatedServer server, String fname, Consumer<MaxwellConfig> configLambda) throws Exception {
protected static List<RowMap> runJSONTestFile(MysqlIsolatedServer server, String fname, Consumer<MaxwellConfig> configLambda) throws Exception {
String dir = MaxwellTestSupport.getSQLDir();
SQLAndJSON testResources = parseJSONTestFile(new File(dir, fname).toString());

runJSONTest(server, testResources.inputSQL, testResources.jsonAsserts, configLambda);
return runJSONTest(server, testResources.inputSQL, testResources.jsonAsserts, configLambda);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ protected List<RowMap> getRowsForDDLTransaction(String[] input, Filter filter) t
});
}

protected void runJSON(String filename) throws Exception {
MaxwellTestJSON.runJSONTestFile(server, filename, null);
protected List<RowMap> runJSON(String filename) throws Exception {
return MaxwellTestJSON.runJSONTestFile(server, filename, null);
}

protected void runJSON(String filename, Consumer<MaxwellConfig> configLambda) throws Exception {
MaxwellTestJSON.runJSONTestFile(server, filename, configLambda);
protected List<RowMap> runJSON(String filename, Consumer<MaxwellConfig> configLambda) throws Exception {
return MaxwellTestJSON.runJSONTestFile(server, filename, configLambda);
}

protected MaxwellContext buildContext() throws Exception {
Expand Down
2 changes: 2 additions & 0 deletions src/test/resources/sql/json/filter.javascript
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ function process_row(rowmap) {
if ( rowmap.query && rowmap.query.match(/mangle/) ) {
rowmap.query = "mangled";
}

rowmap.partition_string = "test_part_string";
}

0 comments on commit e3e1568

Please sign in to comment.