From 77a8133acd218fe79f284d2335c2da4a650ac126 Mon Sep 17 00:00:00 2001 From: Ducas Francis Date: Wed, 13 Apr 2016 15:14:43 +1000 Subject: [PATCH 1/2] raise a ConsumerRebalanceFinished event from ZKRebalancerListener after rebalance with owned partitions --- src/KafkaNET.Library/KafkaNET.Library.csproj | 1 + .../Events/ConsumerRebalanceEventArgs.cs | 15 +++++++++++++++ .../Listeners/ZKRebalancerListener.cs | 15 +++++++++++++++ 3 files changed, 31 insertions(+) create mode 100644 src/KafkaNET.Library/ZooKeeperIntegration/Events/ConsumerRebalanceEventArgs.cs diff --git a/src/KafkaNET.Library/KafkaNET.Library.csproj b/src/KafkaNET.Library/KafkaNET.Library.csproj index 222a4f1..4e7bdec 100644 --- a/src/KafkaNET.Library/KafkaNET.Library.csproj +++ b/src/KafkaNET.Library/KafkaNET.Library.csproj @@ -174,6 +174,7 @@ + diff --git a/src/KafkaNET.Library/ZooKeeperIntegration/Events/ConsumerRebalanceEventArgs.cs b/src/KafkaNET.Library/ZooKeeperIntegration/Events/ConsumerRebalanceEventArgs.cs new file mode 100644 index 0000000..782ff89 --- /dev/null +++ b/src/KafkaNET.Library/ZooKeeperIntegration/Events/ConsumerRebalanceEventArgs.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; + +namespace Kafka.Client.ZooKeeperIntegration.Events +{ + public class ConsumerRebalanceEventArgs : EventArgs + { + public IDictionary> TopicPartitionMap { get; private set; } + + public ConsumerRebalanceEventArgs(Dictionary> topicPartitionMap) + { + TopicPartitionMap = topicPartitionMap; + } + } +} \ No newline at end of file diff --git a/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs b/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs index f1407bf..a747c80 100644 --- a/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs +++ b/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs @@ -42,6 +42,7 @@ internal class ZKRebalancerListener : IZooKeeperChildListener public static log4net.ILog Logger = log4net.LogManager.GetLogger("ZKRebalancerListener"); public event EventHandler ConsumerRebalance; + public event EventHandler ConsumerRebalanceFinished; private IDictionary> oldPartitionsPerTopicMap = new Dictionary>(); private IDictionary> oldConsumersPerTopicMap = new Dictionary>(); @@ -206,6 +207,18 @@ protected virtual void OnConsumerRebalance(EventArgs args) } } + protected virtual void OnConsumerRebalanceFinished(ConsumerRebalanceEventArgs args) + { + try + { + ConsumerRebalanceFinished?.Invoke(this, args); + } + catch (Exception ex) + { + Logger.Error("Exception occurred within event handler for ConsumerRebalanceFinished event: " + ex.Message); + } + } + private void SyncedRebalance(CancellationTokenSource cancellationTokenSource) { Logger.InfoFormat("Consumer {0} has entered rebalance", consumerIdString); @@ -252,6 +265,8 @@ private void SyncedRebalance(CancellationTokenSource cancellationTokenSource) isRebalanceRunning = false; } + OnConsumerRebalanceFinished(new ConsumerRebalanceEventArgs(topicRegistry.ToDictionary(t => t.Key, t => t.Value.Keys))); + Logger.InfoFormat("Consumer {0} has exited rebalance", consumerIdString); } From 8dba0a6356ee28331f260c4c794c40e7fdd8ae8f Mon Sep 17 00:00:00 2001 From: Ducas Francis Date: Wed, 13 Apr 2016 15:15:22 +1000 Subject: [PATCH 2/2] add rebalance finished event handler ZookeeperConsumerConnector --- .../Consumers/ZookeeperConsumerConnector.cs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs b/src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs index 4d7f651..c3efdae 100644 --- a/src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs +++ b/src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs @@ -21,6 +21,7 @@ namespace Kafka.Client.Consumers using Kafka.Client.Serialization; using Kafka.Client.Utils; using Kafka.Client.ZooKeeperIntegration; + using Kafka.Client.ZooKeeperIntegration.Events; using Kafka.Client.ZooKeeperIntegration.Listeners; using System; using System.Collections.Concurrent; @@ -33,6 +34,7 @@ namespace Kafka.Client.Consumers /// public class ZookeeperConsumerConnector : KafkaClientBase, IZookeeperConsumerConnector { + private readonly EventHandler consumerRebalanceFinishedHandler; public static log4net.ILog Logger = log4net.LogManager.GetLogger(typeof(ZookeeperConsumerConnector)); public static readonly int MaxNRetries = 4; public static bool UseSharedStaticZookeeperClient = true; @@ -74,7 +76,8 @@ public ZookeeperConsumerConnector(ConsumerConfiguration config, bool enableFetcher, EventHandler rebalanceHandler = null, EventHandler zkDisconnectedHandler = null, - EventHandler zkExpiredHandler = null) + EventHandler zkExpiredHandler = null, + EventHandler rebalanceFinishedHandler = null) { if (string.IsNullOrEmpty(config.GroupId)) { @@ -94,6 +97,7 @@ public ZookeeperConsumerConnector(ConsumerConfiguration config, this.consumerRebalanceHandler = rebalanceHandler; this.zkSessionDisconnectedHandler = zkDisconnectedHandler; this.zkSessionExpiredHandler = zkExpiredHandler; + this.consumerRebalanceFinishedHandler = rebalanceFinishedHandler; if (this.config.AutoCommit) { @@ -596,6 +600,10 @@ private IDictionary>> Consume(IDi { loadBalancerListener.ConsumerRebalance += this.consumerRebalanceHandler; } + if (this.consumerRebalanceFinishedHandler != null) + { + loadBalancerListener.ConsumerRebalanceFinished += this.consumerRebalanceFinishedHandler; + } stopAsyncRebalancing.Add(loadBalancerListener.StopRebalance); this.RegisterConsumerInZk(dirs, consumerIdString, topicCount);