From 4d9034a00c9968c2577ab0e94b7ad3275f99e2d4 Mon Sep 17 00:00:00 2001 From: gogov Date: Mon, 29 Jun 2020 16:18:01 -0400 Subject: [PATCH 1/3] Allow Publishers to produce messages by thread ID --- CHANGELOG.md | 6 ++++++ Dockerfile | 2 +- README.md | 2 +- config.properties.example | 2 +- docs/docs/quickstart.md | 4 ++-- pom.xml | 2 +- .../producer/partitioners/AbstractMaxwellPartitioner.java | 4 ++++ .../zendesk/maxwell/producer/partitioners/PartitionBy.java | 1 + 8 files changed, 17 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e7f1a0e67..d7bd5a7d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Maxwell changelog +### [v1.26.5](https://github.com/zendesk/maxwell/releases/tag/v1.26.5): "Producer partitioning by thread id" + + + - Allows Kinesis Producer to partition Kinesis Records by thread id + + ### [v1.26.4](https://github.com/zendesk/maxwell/releases/tag/v1.26.4): "Now function precision support" diff --git a/Dockerfile b/Dockerfile index 461edf15e..27073eb75 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ FROM maven:3.6-jdk-11 -ENV MAXWELL_VERSION=1.26.4 KAFKA_VERSION=1.0.0 +ENV MAXWELL_VERSION=1.26.5 KAFKA_VERSION=1.0.0 RUN apt-get update \ && apt-get -y upgrade \ diff --git a/README.md b/README.md index 83e300963..ae24c71fe 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ search indexing and inter-service communication. Maxwell gives you some of the benefits of event sourcing without having to re-architect your entire platform. Download:
-[https://github.com/zendesk/maxwell/releases/download/v1.26.4/maxwell-1.26.4.tar.gz](https://github.com/zendesk/maxwell/releases/download/v1.26.4/maxwell-1.26.4.tar.gz) +[https://github.com/zendesk/maxwell/releases/download/v1.26.5/maxwell-1.26.5.tar.gz](https://github.com/zendesk/maxwell/releases/download/v1.26.5/maxwell-1.26.5.tar.gz)
Source:
[https://github.com/zendesk/maxwell](https://github.com/zendesk/maxwell) diff --git a/config.properties.example b/config.properties.example index 1a60b9b27..15e1942ba 100644 --- a/config.properties.example +++ b/config.properties.example @@ -174,7 +174,7 @@ kafka.acks=1 # *** partitioning *** # What part of the data do we partition by? -#producer_partition_by=database # [database, table, primary_key, transaction_id, column] +#producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column] # specify what fields to partition by when using producer_partition_by=column # column separated list. diff --git a/docs/docs/quickstart.md b/docs/docs/quickstart.md index f169bb28d..e71452392 100644 --- a/docs/docs/quickstart.md +++ b/docs/docs/quickstart.md @@ -1,13 +1,13 @@ ### Download *** -- Download binary distro: [https://github.com/zendesk/maxwell/releases/download/v1.26.4/maxwell-1.26.4.tar.gz](https://github.com/zendesk/maxwell/releases/download/v1.26.4/maxwell-1.26.4.tar.gz) +- Download binary distro: [https://github.com/zendesk/maxwell/releases/download/v1.26.5/maxwell-1.26.5.tar.gz](https://github.com/zendesk/maxwell/releases/download/v1.26.5/maxwell-1.26.5.tar.gz) - Sources and bug tracking is available on github: [https://github.com/zendesk/maxwell](https://github.com/zendesk/maxwell) - Obligatory copy/paste to terminal: ``` curl -sLo - https://github.com/zendesk/maxwell/releases/download/v1.26.4/maxwell-1.26.4.tar.gz \ | tar zxvf - -cd maxwell-1.26.4 +cd maxwell-1.26.5 ``` or get the docker image: diff --git a/pom.xml b/pom.xml index 355fc1870..9bb61966c 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.zendesk maxwell - 1.26.4 + 1.26.5 jar maxwell diff --git a/src/main/java/com/zendesk/maxwell/producer/partitioners/AbstractMaxwellPartitioner.java b/src/main/java/com/zendesk/maxwell/producer/partitioners/AbstractMaxwellPartitioner.java index 5a88698d4..6da79111d 100644 --- a/src/main/java/com/zendesk/maxwell/producer/partitioners/AbstractMaxwellPartitioner.java +++ b/src/main/java/com/zendesk/maxwell/producer/partitioners/AbstractMaxwellPartitioner.java @@ -26,6 +26,8 @@ private PartitionBy partitionByForString(String key) { return PartitionBy.PRIMARY_KEY; case "transaction_id": return PartitionBy.TRANSACTION_ID; + case "thread_id": + return PartitionBy.THREAD_ID; case "column": return PartitionBy.COLUMN; case "random": @@ -65,6 +67,8 @@ public String getHashString(RowMap r, PartitionBy by) { return r.getRowIdentity().toConcatString(); case TRANSACTION_ID: return String.valueOf(r.getXid()); + case THREAD_ID: + return String.valueOf(r.getThreadId()); case COLUMN: String s = r.buildPartitionKey(partitionColumns); if ( s.length() > 0 ) diff --git a/src/main/java/com/zendesk/maxwell/producer/partitioners/PartitionBy.java b/src/main/java/com/zendesk/maxwell/producer/partitioners/PartitionBy.java index 6a785a03f..9de1cdd7f 100644 --- a/src/main/java/com/zendesk/maxwell/producer/partitioners/PartitionBy.java +++ b/src/main/java/com/zendesk/maxwell/producer/partitioners/PartitionBy.java @@ -5,6 +5,7 @@ public enum PartitionBy { TABLE, PRIMARY_KEY, TRANSACTION_ID, + THREAD_ID, COLUMN, RANDOM } From ef408a7405e5a224055d71a30420b15b93bb9908 Mon Sep 17 00:00:00 2001 From: gogov Date: Mon, 29 Jun 2020 16:50:25 -0400 Subject: [PATCH 2/3] Update configuration --- src/main/java/com/zendesk/maxwell/MaxwellConfig.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java index 6061b892e..62797b489 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java @@ -636,11 +636,11 @@ private void validatePartitionBy() { this.producerPartitionFallback = this.kafkaPartitionFallback; } - String[] validPartitionBy = {"database", "table", "primary_key", "transaction_id", "column", "random"}; + String[] validPartitionBy = {"database", "table", "primary_key", "transaction_id", "thread_id", "column", "random"}; if ( this.producerPartitionKey == null ) { this.producerPartitionKey = "database"; } else if ( !ArrayUtils.contains(validPartitionBy, this.producerPartitionKey) ) { - usageForOptions("please specify --producer_partition_by=database|table|primary_key|transaction_id|column|random", "producer_partition_by"); + usageForOptions("please specify --producer_partition_by=database|table|primary_key|transaction_id|thread_id|column|random", "producer_partition_by"); } else if ( this.producerPartitionKey.equals("column") && StringUtils.isEmpty(this.producerPartitionColumns) ) { usageForOptions("please specify --producer_partition_columns=column1 when using producer_partition_by=column", "producer_partition_columns"); } else if ( this.producerPartitionKey.equals("column") && StringUtils.isEmpty(this.producerPartitionFallback) ) { From 093b7453d77365d81f08a9b2b665f9c4daa91fb2 Mon Sep 17 00:00:00 2001 From: gogov Date: Tue, 30 Jun 2020 08:24:44 -0400 Subject: [PATCH 3/3] Revert version bump --- CHANGELOG.md | 6 ------ Dockerfile | 2 +- README.md | 2 +- docs/docs/quickstart.md | 4 ++-- pom.xml | 2 +- 5 files changed, 5 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d7bd5a7d7..e7f1a0e67 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,11 +1,5 @@ # Maxwell changelog -### [v1.26.5](https://github.com/zendesk/maxwell/releases/tag/v1.26.5): "Producer partitioning by thread id" - - - - Allows Kinesis Producer to partition Kinesis Records by thread id - - ### [v1.26.4](https://github.com/zendesk/maxwell/releases/tag/v1.26.4): "Now function precision support" diff --git a/Dockerfile b/Dockerfile index 27073eb75..461edf15e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ FROM maven:3.6-jdk-11 -ENV MAXWELL_VERSION=1.26.5 KAFKA_VERSION=1.0.0 +ENV MAXWELL_VERSION=1.26.4 KAFKA_VERSION=1.0.0 RUN apt-get update \ && apt-get -y upgrade \ diff --git a/README.md b/README.md index ae24c71fe..83e300963 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ search indexing and inter-service communication. Maxwell gives you some of the benefits of event sourcing without having to re-architect your entire platform. Download:
-[https://github.com/zendesk/maxwell/releases/download/v1.26.5/maxwell-1.26.5.tar.gz](https://github.com/zendesk/maxwell/releases/download/v1.26.5/maxwell-1.26.5.tar.gz) +[https://github.com/zendesk/maxwell/releases/download/v1.26.4/maxwell-1.26.4.tar.gz](https://github.com/zendesk/maxwell/releases/download/v1.26.4/maxwell-1.26.4.tar.gz)
Source:
[https://github.com/zendesk/maxwell](https://github.com/zendesk/maxwell) diff --git a/docs/docs/quickstart.md b/docs/docs/quickstart.md index e71452392..f169bb28d 100644 --- a/docs/docs/quickstart.md +++ b/docs/docs/quickstart.md @@ -1,13 +1,13 @@ ### Download *** -- Download binary distro: [https://github.com/zendesk/maxwell/releases/download/v1.26.5/maxwell-1.26.5.tar.gz](https://github.com/zendesk/maxwell/releases/download/v1.26.5/maxwell-1.26.5.tar.gz) +- Download binary distro: [https://github.com/zendesk/maxwell/releases/download/v1.26.4/maxwell-1.26.4.tar.gz](https://github.com/zendesk/maxwell/releases/download/v1.26.4/maxwell-1.26.4.tar.gz) - Sources and bug tracking is available on github: [https://github.com/zendesk/maxwell](https://github.com/zendesk/maxwell) - Obligatory copy/paste to terminal: ``` curl -sLo - https://github.com/zendesk/maxwell/releases/download/v1.26.4/maxwell-1.26.4.tar.gz \ | tar zxvf - -cd maxwell-1.26.5 +cd maxwell-1.26.4 ``` or get the docker image: diff --git a/pom.xml b/pom.xml index 9bb61966c..355fc1870 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.zendesk maxwell - 1.26.5 + 1.26.4 jar maxwell