Skip to content

Commit

Permalink
Merge pull request #1533 from jl-gogovapps/partition-thread-id
Browse files Browse the repository at this point in the history
Allow Publishers to produce messages by thread ID
  • Loading branch information
osheroff authored Jun 30, 2020
2 parents e2ec527 + 093b745 commit 25e463d
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 3 deletions.
2 changes: 1 addition & 1 deletion config.properties.example
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ kafka.acks=1
# *** partitioning ***

# What part of the data do we partition by?
#producer_partition_by=database # [database, table, primary_key, transaction_id, column]
#producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column]

# specify what fields to partition by when using producer_partition_by=column
# column separated list.
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/zendesk/maxwell/MaxwellConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -636,11 +636,11 @@ private void validatePartitionBy() {
this.producerPartitionFallback = this.kafkaPartitionFallback;
}

String[] validPartitionBy = {"database", "table", "primary_key", "transaction_id", "column", "random"};
String[] validPartitionBy = {"database", "table", "primary_key", "transaction_id", "thread_id", "column", "random"};
if ( this.producerPartitionKey == null ) {
this.producerPartitionKey = "database";
} else if ( !ArrayUtils.contains(validPartitionBy, this.producerPartitionKey) ) {
usageForOptions("please specify --producer_partition_by=database|table|primary_key|transaction_id|column|random", "producer_partition_by");
usageForOptions("please specify --producer_partition_by=database|table|primary_key|transaction_id|thread_id|column|random", "producer_partition_by");
} else if ( this.producerPartitionKey.equals("column") && StringUtils.isEmpty(this.producerPartitionColumns) ) {
usageForOptions("please specify --producer_partition_columns=column1 when using producer_partition_by=column", "producer_partition_columns");
} else if ( this.producerPartitionKey.equals("column") && StringUtils.isEmpty(this.producerPartitionFallback) ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ private PartitionBy partitionByForString(String key) {
return PartitionBy.PRIMARY_KEY;
case "transaction_id":
return PartitionBy.TRANSACTION_ID;
case "thread_id":
return PartitionBy.THREAD_ID;
case "column":
return PartitionBy.COLUMN;
case "random":
Expand Down Expand Up @@ -65,6 +67,8 @@ public String getHashString(RowMap r, PartitionBy by) {
return r.getRowIdentity().toConcatString();
case TRANSACTION_ID:
return String.valueOf(r.getXid());
case THREAD_ID:
return String.valueOf(r.getThreadId());
case COLUMN:
String s = r.buildPartitionKey(partitionColumns);
if ( s.length() > 0 )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ public enum PartitionBy {
TABLE,
PRIMARY_KEY,
TRANSACTION_ID,
THREAD_ID,
COLUMN,
RANDOM
}

0 comments on commit 25e463d

Please sign in to comment.