diff --git a/config.properties.example b/config.properties.example index 1a60b9b27..15e1942ba 100644 --- a/config.properties.example +++ b/config.properties.example @@ -174,7 +174,7 @@ kafka.acks=1 # *** partitioning *** # What part of the data do we partition by? -#producer_partition_by=database # [database, table, primary_key, transaction_id, column] +#producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column] # specify what fields to partition by when using producer_partition_by=column # column separated list. diff --git a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java index 6061b892e..62797b489 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java @@ -636,11 +636,11 @@ private void validatePartitionBy() { this.producerPartitionFallback = this.kafkaPartitionFallback; } - String[] validPartitionBy = {"database", "table", "primary_key", "transaction_id", "column", "random"}; + String[] validPartitionBy = {"database", "table", "primary_key", "transaction_id", "thread_id", "column", "random"}; if ( this.producerPartitionKey == null ) { this.producerPartitionKey = "database"; } else if ( !ArrayUtils.contains(validPartitionBy, this.producerPartitionKey) ) { - usageForOptions("please specify --producer_partition_by=database|table|primary_key|transaction_id|column|random", "producer_partition_by"); + usageForOptions("please specify --producer_partition_by=database|table|primary_key|transaction_id|thread_id|column|random", "producer_partition_by"); } else if ( this.producerPartitionKey.equals("column") && StringUtils.isEmpty(this.producerPartitionColumns) ) { usageForOptions("please specify --producer_partition_columns=column1 when using producer_partition_by=column", "producer_partition_columns"); } else if ( this.producerPartitionKey.equals("column") && StringUtils.isEmpty(this.producerPartitionFallback) ) { diff --git a/src/main/java/com/zendesk/maxwell/producer/partitioners/AbstractMaxwellPartitioner.java b/src/main/java/com/zendesk/maxwell/producer/partitioners/AbstractMaxwellPartitioner.java index 5a88698d4..6da79111d 100644 --- a/src/main/java/com/zendesk/maxwell/producer/partitioners/AbstractMaxwellPartitioner.java +++ b/src/main/java/com/zendesk/maxwell/producer/partitioners/AbstractMaxwellPartitioner.java @@ -26,6 +26,8 @@ private PartitionBy partitionByForString(String key) { return PartitionBy.PRIMARY_KEY; case "transaction_id": return PartitionBy.TRANSACTION_ID; + case "thread_id": + return PartitionBy.THREAD_ID; case "column": return PartitionBy.COLUMN; case "random": @@ -65,6 +67,8 @@ public String getHashString(RowMap r, PartitionBy by) { return r.getRowIdentity().toConcatString(); case TRANSACTION_ID: return String.valueOf(r.getXid()); + case THREAD_ID: + return String.valueOf(r.getThreadId()); case COLUMN: String s = r.buildPartitionKey(partitionColumns); if ( s.length() > 0 ) diff --git a/src/main/java/com/zendesk/maxwell/producer/partitioners/PartitionBy.java b/src/main/java/com/zendesk/maxwell/producer/partitioners/PartitionBy.java index 6a785a03f..9de1cdd7f 100644 --- a/src/main/java/com/zendesk/maxwell/producer/partitioners/PartitionBy.java +++ b/src/main/java/com/zendesk/maxwell/producer/partitioners/PartitionBy.java @@ -5,6 +5,7 @@ public enum PartitionBy { TABLE, PRIMARY_KEY, TRANSACTION_ID, + THREAD_ID, COLUMN, RANDOM }