Skip to content
This repository has been archived by the owner on Jul 15, 2023. It is now read-only.

rebalance finished event #51

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace Kafka.Client.Consumers
using Kafka.Client.Serialization;
using Kafka.Client.Utils;
using Kafka.Client.ZooKeeperIntegration;
using Kafka.Client.ZooKeeperIntegration.Events;
using Kafka.Client.ZooKeeperIntegration.Listeners;
using System;
using System.Collections.Concurrent;
Expand All @@ -33,6 +34,7 @@ namespace Kafka.Client.Consumers
/// </summary>
public class ZookeeperConsumerConnector : KafkaClientBase, IZookeeperConsumerConnector
{
private readonly EventHandler<ConsumerRebalanceEventArgs> consumerRebalanceFinishedHandler;
public static log4net.ILog Logger = log4net.LogManager.GetLogger(typeof(ZookeeperConsumerConnector));
public static readonly int MaxNRetries = 4;
public static bool UseSharedStaticZookeeperClient = true;
Expand Down Expand Up @@ -74,7 +76,8 @@ public ZookeeperConsumerConnector(ConsumerConfiguration config,
bool enableFetcher,
EventHandler rebalanceHandler = null,
EventHandler zkDisconnectedHandler = null,
EventHandler zkExpiredHandler = null)
EventHandler zkExpiredHandler = null,
EventHandler<ConsumerRebalanceEventArgs> rebalanceFinishedHandler = null)
{
if (string.IsNullOrEmpty(config.GroupId))
{
Expand All @@ -94,6 +97,7 @@ public ZookeeperConsumerConnector(ConsumerConfiguration config,
this.consumerRebalanceHandler = rebalanceHandler;
this.zkSessionDisconnectedHandler = zkDisconnectedHandler;
this.zkSessionExpiredHandler = zkExpiredHandler;
this.consumerRebalanceFinishedHandler = rebalanceFinishedHandler;

if (this.config.AutoCommit)
{
Expand Down Expand Up @@ -596,6 +600,10 @@ private IDictionary<string, IList<KafkaMessageStream<TData>>> Consume<TData>(IDi
{
loadBalancerListener.ConsumerRebalance += this.consumerRebalanceHandler;
}
if (this.consumerRebalanceFinishedHandler != null)
{
loadBalancerListener.ConsumerRebalanceFinished += this.consumerRebalanceFinishedHandler;
}

stopAsyncRebalancing.Add(loadBalancerListener.StopRebalance);
this.RegisterConsumerInZk(dirs, consumerIdString, topicCount);
Expand Down
1 change: 1 addition & 0 deletions src/KafkaNET.Library/KafkaNET.Library.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
<Compile Include="Consumers\IKafkaMessageStream.cs" />
<Compile Include="Consumers\IZookeeperConsumerConnector.cs" />
<Compile Include="ZooKeeperIntegration\Events\ChildChangedEventItem.cs" />
<Compile Include="ZooKeeperIntegration\Events\ConsumerRebalanceEventArgs.cs" />
<Compile Include="ZooKeeperIntegration\Events\DataChangedEventItem.cs" />
<Compile Include="ZooKeeperIntegration\Events\ZooKeeperChildChangedEventArgs.cs" />
<Compile Include="ZooKeeperIntegration\Events\ZooKeeperDataChangedEventArgs.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;

namespace Kafka.Client.ZooKeeperIntegration.Events
{
public class ConsumerRebalanceEventArgs : EventArgs
{
public IDictionary<string, ICollection<int>> TopicPartitionMap { get; private set; }

public ConsumerRebalanceEventArgs(Dictionary<string, ICollection<int>> topicPartitionMap)
{
TopicPartitionMap = topicPartitionMap;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ internal class ZKRebalancerListener<TData> : IZooKeeperChildListener
public static log4net.ILog Logger = log4net.LogManager.GetLogger("ZKRebalancerListener");

public event EventHandler ConsumerRebalance;
public event EventHandler<ConsumerRebalanceEventArgs> ConsumerRebalanceFinished;

private IDictionary<string, IList<string>> oldPartitionsPerTopicMap = new Dictionary<string, IList<string>>();
private IDictionary<string, IList<string>> oldConsumersPerTopicMap = new Dictionary<string, IList<string>>();
Expand Down Expand Up @@ -206,6 +207,18 @@ protected virtual void OnConsumerRebalance(EventArgs args)
}
}

protected virtual void OnConsumerRebalanceFinished(ConsumerRebalanceEventArgs args)
{
try
{
ConsumerRebalanceFinished?.Invoke(this, args);
}
catch (Exception ex)
{
Logger.Error("Exception occurred within event handler for ConsumerRebalanceFinished event: " + ex.Message);
}
}

private void SyncedRebalance(CancellationTokenSource cancellationTokenSource)
{
Logger.InfoFormat("Consumer {0} has entered rebalance", consumerIdString);
Expand Down Expand Up @@ -252,6 +265,8 @@ private void SyncedRebalance(CancellationTokenSource cancellationTokenSource)
isRebalanceRunning = false;
}

OnConsumerRebalanceFinished(new ConsumerRebalanceEventArgs(topicRegistry.ToDictionary(t => t.Key, t => t.Value.Keys)));

Logger.InfoFormat("Consumer {0} has exited rebalance", consumerIdString);
}

Expand Down