diff --git a/src/example/filter.js b/src/example/filter.js index 9f5a5c662..405139906 100644 --- a/src/example/filter.js +++ b/src/example/filter.js @@ -8,6 +8,7 @@ * * .suppress() -> call this to remove row from output * .kafka_topic = "topic" -> override the default kafka topic for this row + * .partition_string = "string" -> override kafka/kinesis partition-by string * .data -> hash containing row data. May be modified. * .old_data -> hash containing old data * .extra_attributes -> if you set values in this hash, they will be output at the top level of the final JSON diff --git a/src/main/java/com/zendesk/maxwell/producer/partitioners/AbstractMaxwellPartitioner.java b/src/main/java/com/zendesk/maxwell/producer/partitioners/AbstractMaxwellPartitioner.java index 8b9105d6d..04edd41ff 100644 --- a/src/main/java/com/zendesk/maxwell/producer/partitioners/AbstractMaxwellPartitioner.java +++ b/src/main/java/com/zendesk/maxwell/producer/partitioners/AbstractMaxwellPartitioner.java @@ -71,6 +71,9 @@ public String getHashString(RowMap r, PartitionBy by) { } public String getHashString(RowMap r) { - return getHashString(r, partitionBy); + if ( r.getPartitionString() != null ) + return r.getPartitionString(); + else + return getHashString(r, partitionBy); } } diff --git a/src/main/java/com/zendesk/maxwell/row/RowMap.java b/src/main/java/com/zendesk/maxwell/row/RowMap.java index ad6a3d308..cda6f22b6 100644 --- a/src/main/java/com/zendesk/maxwell/row/RowMap.java +++ b/src/main/java/com/zendesk/maxwell/row/RowMap.java @@ -20,6 +20,7 @@ public class RowMap implements Serializable { + public enum KeyFormat { HASH, ARRAY } static final Logger LOGGER = LoggerFactory.getLogger(RowMap.class); @@ -33,6 +34,7 @@ public enum KeyFormat { HASH, ARRAY } private final Position position; private Position nextPosition; private String kafkaTopic; + private String partitionString; protected boolean suppressed; private Long xid; @@ -393,4 +395,13 @@ public String getKafkaTopic() { public void setKafkaTopic(String topic) { this.kafkaTopic = topic; } + + public String getPartitionString() { + return this.partitionString; + } + + public void setPartitionString(String partitionString) { + this.partitionString = partitionString; + } + } diff --git a/src/main/java/com/zendesk/maxwell/schema/PositionStoreThread.java b/src/main/java/com/zendesk/maxwell/schema/PositionStoreThread.java index d3704239e..284bb6903 100644 --- a/src/main/java/com/zendesk/maxwell/schema/PositionStoreThread.java +++ b/src/main/java/com/zendesk/maxwell/schema/PositionStoreThread.java @@ -46,12 +46,6 @@ public void run() { } } - @Override - public void requestStop() { - super.requestStop(); - thread.interrupt(); - } - @Override protected void beforeStop() { if ( exception == null ) { diff --git a/src/main/java/com/zendesk/maxwell/scripting/WrappedRowMap.java b/src/main/java/com/zendesk/maxwell/scripting/WrappedRowMap.java index 3357d4902..305f08ba7 100644 --- a/src/main/java/com/zendesk/maxwell/scripting/WrappedRowMap.java +++ b/src/main/java/com/zendesk/maxwell/scripting/WrappedRowMap.java @@ -82,4 +82,8 @@ public void suppress() { public void setKafka_topic(String topic) { row.setKafkaTopic(topic); } + + public void setPartition_string(String partitionString) { + row.setPartitionString(partitionString); + } } diff --git a/src/test/java/com/zendesk/maxwell/MaxwellIntegrationTest.java b/src/test/java/com/zendesk/maxwell/MaxwellIntegrationTest.java index 4bf514c27..26357cb46 100644 --- a/src/test/java/com/zendesk/maxwell/MaxwellIntegrationTest.java +++ b/src/test/java/com/zendesk/maxwell/MaxwellIntegrationTest.java @@ -607,10 +607,17 @@ public void testJavascriptFilters() throws Exception { requireMinimumVersion(server.VERSION_5_6); String dir = MaxwellTestSupport.getSQLDir(); - runJSON("/json/test_javascript_filters", (c) -> { + List rows = runJSON("/json/test_javascript_filters", (c) -> { c.javascriptFile = dir + "/json/filter.javascript"; c.outputConfig.includesRowQuery = true; }); + + boolean foundPartitionString = false; + for ( RowMap row : rows ) { + if ( row.getPartitionString() != null ) + foundPartitionString = true; + } + assertTrue(foundPartitionString); } @Test diff --git a/src/test/java/com/zendesk/maxwell/MaxwellTestJSON.java b/src/test/java/com/zendesk/maxwell/MaxwellTestJSON.java index f6a28c112..a1a806db5 100644 --- a/src/test/java/com/zendesk/maxwell/MaxwellTestJSON.java +++ b/src/test/java/com/zendesk/maxwell/MaxwellTestJSON.java @@ -55,7 +55,7 @@ public static void assertJSON(List> jsonOutput, List sql, List> expectedJSON, + private static List runJSONTest(MysqlIsolatedServer server, List sql, List> expectedJSON, Consumer configLambda) throws Exception { List> eventJSON = new ArrayList<>(); @@ -86,6 +86,7 @@ private static void runJSONTest(MysqlIsolatedServer server, List sql, Li } assertJSON(eventJSON, expectedJSON); + return rows; } public static class SQLAndJSON { @@ -152,10 +153,10 @@ public static SQLAndJSON parseJSONTestFile(String fname) throws Exception { return ret; } - protected static void runJSONTestFile(MysqlIsolatedServer server, String fname, Consumer configLambda) throws Exception { + protected static List runJSONTestFile(MysqlIsolatedServer server, String fname, Consumer configLambda) throws Exception { String dir = MaxwellTestSupport.getSQLDir(); SQLAndJSON testResources = parseJSONTestFile(new File(dir, fname).toString()); - runJSONTest(server, testResources.inputSQL, testResources.jsonAsserts, configLambda); + return runJSONTest(server, testResources.inputSQL, testResources.jsonAsserts, configLambda); } } diff --git a/src/test/java/com/zendesk/maxwell/MaxwellTestWithIsolatedServer.java b/src/test/java/com/zendesk/maxwell/MaxwellTestWithIsolatedServer.java index 0af657627..00834f54b 100644 --- a/src/test/java/com/zendesk/maxwell/MaxwellTestWithIsolatedServer.java +++ b/src/test/java/com/zendesk/maxwell/MaxwellTestWithIsolatedServer.java @@ -105,12 +105,12 @@ protected List getRowsForDDLTransaction(String[] input, Filter filter) t }); } - protected void runJSON(String filename) throws Exception { - MaxwellTestJSON.runJSONTestFile(server, filename, null); + protected List runJSON(String filename) throws Exception { + return MaxwellTestJSON.runJSONTestFile(server, filename, null); } - protected void runJSON(String filename, Consumer configLambda) throws Exception { - MaxwellTestJSON.runJSONTestFile(server, filename, configLambda); + protected List runJSON(String filename, Consumer configLambda) throws Exception { + return MaxwellTestJSON.runJSONTestFile(server, filename, configLambda); } protected MaxwellContext buildContext() throws Exception { diff --git a/src/test/resources/sql/json/filter.javascript b/src/test/resources/sql/json/filter.javascript index 4697ea110..5ce14d57a 100644 --- a/src/test/resources/sql/json/filter.javascript +++ b/src/test/resources/sql/json/filter.javascript @@ -9,4 +9,6 @@ function process_row(rowmap) { if ( rowmap.query && rowmap.query.match(/mangle/) ) { rowmap.query = "mangled"; } + + rowmap.partition_string = "test_part_string"; }