From 0dd966444f8986840039f41d38899ca7ad85bc42 Mon Sep 17 00:00:00 2001 From: IgorFedchenko Date: Sat, 24 Aug 2019 13:39:14 +0300 Subject: [PATCH 1/6] Use KafkaSourceStage base class for all consumer stages --- src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs | 3 +- .../CommittableConsumerStage.cs | 24 ++++++++-------- .../Stages/Consumers/KafkaSourceStage.cs | 28 +++++++++++++++++++ .../PlainSourceStage.cs} | 27 ++++++++---------- .../Stages/{ => Producers}/ProducerStage.cs | 0 5 files changed, 52 insertions(+), 30 deletions(-) rename src/Akka.Streams.Kafka/Stages/{ => Consumers}/CommittableConsumerStage.cs (89%) create mode 100644 src/Akka.Streams.Kafka/Stages/Consumers/KafkaSourceStage.cs rename src/Akka.Streams.Kafka/Stages/{ConsumerStage.cs => Consumers/PlainSourceStage.cs} (86%) rename src/Akka.Streams.Kafka/Stages/{ => Producers}/ProducerStage.cs (100%) diff --git a/src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs b/src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs index f6bf63a4..fc3254f7 100644 --- a/src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs +++ b/src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs @@ -4,6 +4,7 @@ using Akka.Streams.Kafka.Stages; using Confluent.Kafka; using Akka.Streams.Kafka.Messages; +using Akka.Streams.Kafka.Stages.Consumers; namespace Akka.Streams.Kafka.Dsl { @@ -24,7 +25,7 @@ public static class KafkaConsumer /// public static Source, Task> PlainSource(ConsumerSettings settings, ISubscription subscription) { - return Source.FromGraph(new KafkaSourceStage(settings, subscription)); + return Source.FromGraph(new PlainSourceStage(settings, subscription)); } /// diff --git a/src/Akka.Streams.Kafka/Stages/CommittableConsumerStage.cs b/src/Akka.Streams.Kafka/Stages/Consumers/CommittableConsumerStage.cs similarity index 89% rename from src/Akka.Streams.Kafka/Stages/CommittableConsumerStage.cs rename to src/Akka.Streams.Kafka/Stages/Consumers/CommittableConsumerStage.cs index 60571ad2..6d6b0db7 100644 --- a/src/Akka.Streams.Kafka/Stages/CommittableConsumerStage.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/CommittableConsumerStage.cs @@ -1,35 +1,32 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Runtime.Serialization; +using System.Threading; using System.Threading.Tasks; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; using Akka.Streams.Stage; -using Confluent.Kafka; using Akka.Streams.Supervision; -using System.Runtime.Serialization; -using System.Threading; +using Confluent.Kafka; -namespace Akka.Streams.Kafka.Stages +namespace Akka.Streams.Kafka.Stages.Consumers { - internal class CommittableConsumerStage : GraphStageWithMaterializedValue>, Task> - { - public Outlet> Out { get; } = new Outlet>("kafka.commitable.consumer.out"); - public override SourceShape> Shape { get; } + internal class CommittableConsumerStage : KafkaSourceStage> + { public ConsumerSettings Settings { get; } public ISubscription Subscription { get; } public CommittableConsumerStage(ConsumerSettings settings, ISubscription subscription) + : base("CommittableSource") { Settings = settings; Subscription = subscription; - Shape = new SourceShape>(Out); } - public override ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Attributes inheritedAttributes) + protected override GraphStageLogic Logic(SourceShape> shape, TaskCompletionSource completion) { - var completion = new TaskCompletionSource(); - return new LogicAndMaterializedValue(new KafkaCommittableSourceStage(this, inheritedAttributes, completion), completion.Task); + return new KafkaCommittableSourceStage(this, InitialAttributes, completion); } } @@ -47,7 +44,8 @@ internal class KafkaCommittableSourceStage : GraphStageLogic private readonly TaskCompletionSource _completion; private readonly CancellationTokenSource _cancellationTokenSource; - public KafkaCommittableSourceStage(CommittableConsumerStage stage, Attributes attributes, TaskCompletionSource completion) : base(stage.Shape) + public KafkaCommittableSourceStage(CommittableConsumerStage stage, Attributes attributes, TaskCompletionSource completion) + : base(stage.Shape) { _settings = stage.Settings; _subscription = stage.Subscription; diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/KafkaSourceStage.cs b/src/Akka.Streams.Kafka/Stages/Consumers/KafkaSourceStage.cs new file mode 100644 index 00000000..c8a951ac --- /dev/null +++ b/src/Akka.Streams.Kafka/Stages/Consumers/KafkaSourceStage.cs @@ -0,0 +1,28 @@ +using System.Threading.Tasks; +using Akka.Streams.Stage; + +namespace Akka.Streams.Kafka.Stages.Consumers +{ + public abstract class KafkaSourceStage : GraphStageWithMaterializedValue, Task> + { + public string StageName { get; } + public Outlet Out => new Outlet("out"); + public override SourceShape Shape => new SourceShape(Out); + + protected KafkaSourceStage(string stageName) + { + StageName = stageName; + } + + protected override Attributes InitialAttributes => Attributes.CreateName(StageName); + + protected abstract GraphStageLogic Logic(SourceShape shape, TaskCompletionSource completion); + + public override ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Attributes inheritedAttributes) + { + var completion = new TaskCompletionSource(); + var result = Logic(Shape, completion); + return new LogicAndMaterializedValue(result, completion.Task); + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Stages/ConsumerStage.cs b/src/Akka.Streams.Kafka/Stages/Consumers/PlainSourceStage.cs similarity index 86% rename from src/Akka.Streams.Kafka/Stages/ConsumerStage.cs rename to src/Akka.Streams.Kafka/Stages/Consumers/PlainSourceStage.cs index 9cdbf7b6..26cfb550 100644 --- a/src/Akka.Streams.Kafka/Stages/ConsumerStage.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/PlainSourceStage.cs @@ -1,40 +1,35 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Runtime.Serialization; +using System.Threading; using System.Threading.Tasks; using Akka.Streams.Kafka.Settings; using Akka.Streams.Stage; -using Confluent.Kafka; using Akka.Streams.Supervision; -using System.Runtime.Serialization; -using System.Threading; +using Confluent.Kafka; -namespace Akka.Streams.Kafka.Stages +namespace Akka.Streams.Kafka.Stages.Consumers { - internal class KafkaSourceStage : GraphStageWithMaterializedValue>, Task> + internal class PlainSourceStage : KafkaSourceStage> { - public Outlet> Out { get; } = new Outlet>("kafka.consumer.out"); - public override SourceShape> Shape { get; } public ConsumerSettings Settings { get; } public ISubscription Subscription { get; } - public KafkaSourceStage(ConsumerSettings settings, ISubscription subscription) + public PlainSourceStage(ConsumerSettings settings, ISubscription subscription) + : base("PlainSource") { Settings = settings; Subscription = subscription; - Shape = new SourceShape>(Out); - Settings = settings; - Subscription = subscription; } - public override ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Attributes inheritedAttributes) + protected override GraphStageLogic Logic(SourceShape> shape, TaskCompletionSource completion) { - var completion = new TaskCompletionSource(); - return new LogicAndMaterializedValue(new KafkaSourceStageLogic(this, inheritedAttributes, completion), completion.Task); + return new PlainSourceStageLogic(this, InitialAttributes, completion); } } - internal class KafkaSourceStageLogic : GraphStageLogic + internal class PlainSourceStageLogic : GraphStageLogic { private readonly ConsumerSettings _settings; private readonly ISubscription _subscription; @@ -48,7 +43,7 @@ internal class KafkaSourceStageLogic : GraphStageLogic private readonly TaskCompletionSource _completion; private readonly CancellationTokenSource _cancellationTokenSource; - public KafkaSourceStageLogic(KafkaSourceStage stage, Attributes attributes, TaskCompletionSource completion) : base(stage.Shape) + public PlainSourceStageLogic(PlainSourceStage stage, Attributes attributes, TaskCompletionSource completion) : base(stage.Shape) { _settings = stage.Settings; _subscription = stage.Subscription; diff --git a/src/Akka.Streams.Kafka/Stages/ProducerStage.cs b/src/Akka.Streams.Kafka/Stages/Producers/ProducerStage.cs similarity index 100% rename from src/Akka.Streams.Kafka/Stages/ProducerStage.cs rename to src/Akka.Streams.Kafka/Stages/Producers/ProducerStage.cs From ba64badd7757c9157f78dc80d92b18a8a04a204a Mon Sep 17 00:00:00 2001 From: IgorFedchenko Date: Sat, 24 Aug 2019 15:42:06 +0300 Subject: [PATCH 2/6] Use SingleSourceStageLogic base with message builders for source stage logic --- src/Akka.Streams.Kafka.Tests/KafkaFixture.cs | 9 + src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs | 2 +- .../Messages/CommittableMessage.cs | 41 +++- .../Settings/ConsumerSettings.cs | 2 + .../{ => Abstract}/KafkaSourceStage.cs | 9 +- .../SingleSourceStageLogic.cs} | 37 +--- .../Consumers/CommittableConsumerStage.cs | 200 ------------------ .../Stages/Consumers/Committers.cs | 28 +++ .../Concrete/CommittableSourceStage.cs | 36 ++++ .../Consumers/Concrete/PlainSourceStage.cs | 26 +++ .../Stages/Consumers/MessageBuilders.cs | 61 ++++++ 11 files changed, 211 insertions(+), 240 deletions(-) rename src/Akka.Streams.Kafka/Stages/Consumers/{ => Abstract}/KafkaSourceStage.cs (70%) rename src/Akka.Streams.Kafka/Stages/Consumers/{PlainSourceStage.cs => Abstract/SingleSourceStageLogic.cs} (84%) delete mode 100644 src/Akka.Streams.Kafka/Stages/Consumers/CommittableConsumerStage.cs create mode 100644 src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs create mode 100644 src/Akka.Streams.Kafka/Stages/Consumers/Concrete/CommittableSourceStage.cs create mode 100644 src/Akka.Streams.Kafka/Stages/Consumers/Concrete/PlainSourceStage.cs create mode 100644 src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs diff --git a/src/Akka.Streams.Kafka.Tests/KafkaFixture.cs b/src/Akka.Streams.Kafka.Tests/KafkaFixture.cs index aca95e36..80eb258b 100644 --- a/src/Akka.Streams.Kafka.Tests/KafkaFixture.cs +++ b/src/Akka.Streams.Kafka.Tests/KafkaFixture.cs @@ -72,6 +72,15 @@ public async Task InitializeAsync() ["KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR"] = "1" }); + /* + // Remove old networks, if there are any + var networks = await _client.Networks.ListNetworksAsync(new NetworksListParameters()); + foreach (var existingNetwork in networks.Where(n => n.Name.StartsWith("network-"))) + { + await _client.Networks.DeleteNetworkAsync(existingNetwork.ID); + } + */ + // Setting up network for containers to communicate var network = await _client.Networks.CreateNetworkAsync(new NetworksCreateParameters(new NetworkCreate()) { diff --git a/src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs b/src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs index fc3254f7..d2224c9b 100644 --- a/src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs +++ b/src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs @@ -38,7 +38,7 @@ public static Source, Task> PlainSource(ConsumerSettin /// public static Source, Task> CommittableSource(ConsumerSettings settings, ISubscription subscription) { - return Source.FromGraph(new CommittableConsumerStage(settings, subscription)); + return Source.FromGraph(new CommittableSourceStage(settings, subscription)); } } } diff --git a/src/Akka.Streams.Kafka/Messages/CommittableMessage.cs b/src/Akka.Streams.Kafka/Messages/CommittableMessage.cs index c161443f..c6c6036e 100644 --- a/src/Akka.Streams.Kafka/Messages/CommittableMessage.cs +++ b/src/Akka.Streams.Kafka/Messages/CommittableMessage.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; using Akka.Streams.Kafka.Dsl; +using Akka.Streams.Kafka.Stages.Consumers; using Confluent.Kafka; namespace Akka.Streams.Kafka.Messages @@ -18,10 +20,19 @@ public CommittableMessage(ConsumeResult record, CommitableOffset commitabl } public ConsumeResult Record { get; } - public CommitableOffset CommitableOffset { get; } } + /// + /// Commit an offset that is included in a + /// If you need to store offsets in anything other than Kafka, this API + /// should not be used. + /// + public interface ICommittable + { + Task Commit(); + } + /// /// Included in . Makes it possible to /// commit an offset or aggregate several offsets before committing. @@ -30,21 +41,33 @@ public CommittableMessage(ConsumeResult record, CommitableOffset commitabl /// should be the next message your application will consume, /// i.e. lastProcessedMessageOffset + 1. /// - public class CommitableOffset + public interface ICommittableOffset : ICommittable { - private readonly Func> _task; + PartitionOffset Offset { get; } + } - public CommitableOffset(Func> task, PartitionOffset offset) + public interface ICommittableOffsetMetadata : ICommittableOffset + { + string Metadata { get; } + } + + + public class CommitableOffset : ICommittableOffsetMetadata + { + private readonly IInternalCommitter _committer; + public PartitionOffset Offset { get; } + public string Metadata { get; } + + public CommitableOffset(IInternalCommitter committer, PartitionOffset offset, string metadata) { - _task = task; + _committer = committer; Offset = offset; + Metadata = metadata; } - public PartitionOffset Offset { get; } - - public List Commit() + public Task Commit() { - return _task(); + return Task.FromResult(_committer.Commit()); } } diff --git a/src/Akka.Streams.Kafka/Settings/ConsumerSettings.cs b/src/Akka.Streams.Kafka/Settings/ConsumerSettings.cs index fda3e56a..f57927f7 100644 --- a/src/Akka.Streams.Kafka/Settings/ConsumerSettings.cs +++ b/src/Akka.Streams.Kafka/Settings/ConsumerSettings.cs @@ -67,6 +67,8 @@ public ConsumerSettings WithProperty(string key, string value) => public ConsumerSettings WithPollTimeout(TimeSpan pollTimeout) => Copy(pollTimeout: pollTimeout); public ConsumerSettings WithDispatcher(string dispatcherId) => Copy(dispatcherId: dispatcherId); + + public string GroupId => Properties.ContainsKey("group.id") ? Properties["group.id"] : null; private ConsumerSettings Copy( IDeserializer keyDeserializer = null, diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/KafkaSourceStage.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/KafkaSourceStage.cs similarity index 70% rename from src/Akka.Streams.Kafka/Stages/Consumers/KafkaSourceStage.cs rename to src/Akka.Streams.Kafka/Stages/Consumers/Abstract/KafkaSourceStage.cs index c8a951ac..f0b99036 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/KafkaSourceStage.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/KafkaSourceStage.cs @@ -6,22 +6,23 @@ namespace Akka.Streams.Kafka.Stages.Consumers public abstract class KafkaSourceStage : GraphStageWithMaterializedValue, Task> { public string StageName { get; } - public Outlet Out => new Outlet("out"); - public override SourceShape Shape => new SourceShape(Out); + public Outlet Out { get; } = new Outlet("out"); + public override SourceShape Shape { get; } protected KafkaSourceStage(string stageName) { StageName = stageName; + Shape = new SourceShape(Out); } protected override Attributes InitialAttributes => Attributes.CreateName(StageName); - protected abstract GraphStageLogic Logic(SourceShape shape, TaskCompletionSource completion); + protected abstract GraphStageLogic Logic(SourceShape shape, TaskCompletionSource completion, Attributes inheritedAttributes); public override ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Attributes inheritedAttributes) { var completion = new TaskCompletionSource(); - var result = Logic(Shape, completion); + var result = Logic(Shape, completion, inheritedAttributes); return new LogicAndMaterializedValue(result, completion.Task); } } diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/PlainSourceStage.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SingleSourceStageLogic.cs similarity index 84% rename from src/Akka.Streams.Kafka/Stages/Consumers/PlainSourceStage.cs rename to src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SingleSourceStageLogic.cs index 26cfb550..513f47f7 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/PlainSourceStage.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SingleSourceStageLogic.cs @@ -11,25 +11,7 @@ namespace Akka.Streams.Kafka.Stages.Consumers { - internal class PlainSourceStage : KafkaSourceStage> - { - public ConsumerSettings Settings { get; } - public ISubscription Subscription { get; } - - public PlainSourceStage(ConsumerSettings settings, ISubscription subscription) - : base("PlainSource") - { - Settings = settings; - Subscription = subscription; - } - - protected override GraphStageLogic Logic(SourceShape> shape, TaskCompletionSource completion) - { - return new PlainSourceStageLogic(this, InitialAttributes, completion); - } - } - - internal class PlainSourceStageLogic : GraphStageLogic + internal class SingleSourceStageLogic : GraphStageLogic { private readonly ConsumerSettings _settings; private readonly ISubscription _subscription; @@ -43,17 +25,20 @@ internal class PlainSourceStageLogic : GraphStageLogic private readonly TaskCompletionSource _completion; private readonly CancellationTokenSource _cancellationTokenSource; - public PlainSourceStageLogic(PlainSourceStage stage, Attributes attributes, TaskCompletionSource completion) : base(stage.Shape) + public SingleSourceStageLogic(SourceShape shape, ConsumerSettings settings, + ISubscription subscription, Attributes attributes, + TaskCompletionSource completion, IMessageBuilder messageBuilder) + : base(shape) { - _settings = stage.Settings; - _subscription = stage.Subscription; + _settings = settings; + _subscription = subscription; _completion = completion; _cancellationTokenSource = new CancellationTokenSource(); var supervisionStrategy = attributes.GetAttribute(null); _decider = supervisionStrategy != null ? supervisionStrategy.Decider : Deciders.ResumingDecider; - SetHandler(stage.Out, onPull: () => + SetHandler(shape.Outlet, onPull: () => { try { @@ -61,9 +46,9 @@ public PlainSourceStageLogic(PlainSourceStage stage, Attributes attributes if (message == null) // No message received, or consume error occured return; - if (IsAvailable(stage.Out)) + if (IsAvailable(shape.Outlet)) { - Push(stage.Out, message); + Push(shape.Outlet, messageBuilder.CreateMessage(message, _consumer)); } } catch (OperationCanceledException) @@ -192,4 +177,4 @@ private void HandleError(Error error) } } } -} +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/CommittableConsumerStage.cs b/src/Akka.Streams.Kafka/Stages/Consumers/CommittableConsumerStage.cs deleted file mode 100644 index 6d6b0db7..00000000 --- a/src/Akka.Streams.Kafka/Stages/Consumers/CommittableConsumerStage.cs +++ /dev/null @@ -1,200 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Runtime.Serialization; -using System.Threading; -using System.Threading.Tasks; -using Akka.Streams.Kafka.Messages; -using Akka.Streams.Kafka.Settings; -using Akka.Streams.Stage; -using Akka.Streams.Supervision; -using Confluent.Kafka; - -namespace Akka.Streams.Kafka.Stages.Consumers -{ - internal class CommittableConsumerStage : KafkaSourceStage> - { - public ConsumerSettings Settings { get; } - public ISubscription Subscription { get; } - - public CommittableConsumerStage(ConsumerSettings settings, ISubscription subscription) - : base("CommittableSource") - { - Settings = settings; - Subscription = subscription; - } - - protected override GraphStageLogic Logic(SourceShape> shape, TaskCompletionSource completion) - { - return new KafkaCommittableSourceStage(this, InitialAttributes, completion); - } - } - - internal class KafkaCommittableSourceStage : GraphStageLogic - { - private readonly ConsumerSettings _settings; - private readonly ISubscription _subscription; - private IConsumer _consumer; - - private Action> _partitionsAssigned; - private Action> _partitionsRevoked; - private readonly Decider _decider; - - private IEnumerable _assignedPartitions; - private readonly TaskCompletionSource _completion; - private readonly CancellationTokenSource _cancellationTokenSource; - - public KafkaCommittableSourceStage(CommittableConsumerStage stage, Attributes attributes, TaskCompletionSource completion) - : base(stage.Shape) - { - _settings = stage.Settings; - _subscription = stage.Subscription; - _completion = completion; - _cancellationTokenSource = new CancellationTokenSource(); - - var supervisionStrategy = attributes.GetAttribute(null); - _decider = supervisionStrategy != null ? supervisionStrategy.Decider : Deciders.ResumingDecider; - - SetHandler(stage.Out, onPull: () => - { - try - { - var message = _consumer.Consume(_cancellationTokenSource.Token); - if (message == null) // No message received, or consume error occured - return; - - var consumer = _consumer; - var commitableOffset = new CommitableOffset( - () => consumer.Commit(), - new PartitionOffset("groupId", message.Topic, message.Partition, message.Offset)); - - if (IsAvailable(stage.Out)) - { - var commitableMessage = new CommittableMessage(message, commitableOffset); - Push(stage.Out, commitableMessage); - } - } - catch (OperationCanceledException) - { - // Consume was canceled, looks like we are shutting down the stage - } - catch (ConsumeException ex) - { - HandleError(ex.Error); - } - }); - } - - public override void PreStart() - { - base.PreStart(); - - _consumer = _settings.CreateKafkaConsumer(HandleConsumeError, HandleOnPartitionsAssigned, HandleOnPartitionsRevoked); - Log.Debug($"Consumer started: {_consumer.Name}"); - - switch (_subscription) - { - case TopicSubscription ts: - _consumer.Subscribe(ts.Topics); - break; - case Assignment a: - _consumer.Assign(a.TopicPartitions); - break; - case AssignmentWithOffset awo: - _consumer.Assign(awo.TopicPartitions); - break; - } - - _partitionsAssigned = GetAsyncCallback>(PartitionsAssigned); - _partitionsRevoked = GetAsyncCallback>(PartitionsRevoked); - } - - public override void PostStop() - { - Log.Debug($"Consumer stopped: {_consumer.Name}"); - _consumer.Dispose(); - _completion.SetResult(NotUsed.Instance); - - base.PostStop(); - } - - // - // Consumer's events - // - - private void HandleOnPartitionsAssigned(IConsumer consumer, List list) - { - _partitionsAssigned(list); - } - - private void HandleOnPartitionsRevoked(IConsumer consumer, List list) - { - _partitionsRevoked(list); - } - - private void PartitionsAssigned(IEnumerable partitions) - { - Log.Debug($"Partitions were assigned: {_consumer.Name}"); - var partitionsList = partitions.ToList(); - _consumer.Assign(partitionsList); - _assignedPartitions = partitionsList; - } - - private void PartitionsRevoked(IEnumerable partitions) - { - Log.Debug($"Partitions were revoked: {_consumer.Name}"); - _consumer.Unassign(); - _assignedPartitions = null; - } - - private void HandleConsumeError(object sender, Error error) - { - Log.Error(error.Reason); - var exception = new KafkaException(error); - switch (_decider(exception)) - { - case Directive.Stop: - // Throw - _completion.TrySetException(exception); - _cancellationTokenSource.Cancel(); - FailStage(exception); - break; - case Directive.Resume: - // keep going - break; - case Directive.Restart: - // keep going - break; - } - } - - private void HandleError(Error error) - { - Log.Error(error.Reason); - - if (!KafkaExtensions.IsBrokerErrorRetriable(error) && !KafkaExtensions.IsLocalErrorRetriable(error)) - { - var exception = new KafkaException(error); - FailStage(exception); - } - else if (KafkaExtensions.IsLocalValueSerializationError(error)) - { - var exception = new SerializationException(error.Reason); - switch (_decider(exception)) - { - case Directive.Stop: - // Throw - _completion.TrySetException(exception); - FailStage(exception); - break; - case Directive.Resume: - // keep going - break; - case Directive.Restart: - // keep going - break; - } - } - } - } -} diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs new file mode 100644 index 00000000..d668bd74 --- /dev/null +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs @@ -0,0 +1,28 @@ +using System.Collections.Generic; +using Confluent.Kafka; + +namespace Akka.Streams.Kafka.Stages.Consumers +{ + public interface IInternalCommitter + { + /// + /// Commit all offsets (of different topics) belonging to the same stage + /// + List Commit(); + } + + /// + /// This is a simple committer using kafka consumer directly (not consumer actor, etc) + /// + public class KafkaCommitter : IInternalCommitter + { + private readonly IConsumer _consumer; + + public KafkaCommitter(IConsumer consumer) + { + _consumer = consumer; + } + + public List Commit() => _consumer.Commit(); + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/CommittableSourceStage.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/CommittableSourceStage.cs new file mode 100644 index 00000000..ea45a17b --- /dev/null +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/CommittableSourceStage.cs @@ -0,0 +1,36 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.Serialization; +using System.Threading; +using System.Threading.Tasks; +using Akka.Streams.Kafka.Messages; +using Akka.Streams.Kafka.Settings; +using Akka.Streams.Stage; +using Akka.Streams.Supervision; +using Confluent.Kafka; + +namespace Akka.Streams.Kafka.Stages.Consumers +{ + internal class CommittableSourceStage : KafkaSourceStage> + { + private readonly Func, string> _metadataFromMessage; + public ConsumerSettings Settings { get; } + public ISubscription Subscription { get; } + + public CommittableSourceStage(ConsumerSettings settings, ISubscription subscription, + Func, string> metadataFromMessage = null) + : base("CommittableSource") + { + _metadataFromMessage = metadataFromMessage ?? (msg => string.Empty); + Settings = settings; + Subscription = subscription; + } + + protected override GraphStageLogic Logic(SourceShape> shape, TaskCompletionSource completion, Attributes inheritedAttributes) + { + return new SingleSourceStageLogic>(shape, Settings, Subscription, inheritedAttributes, + completion, new CommittableSourceMessageBuilder(Settings, _metadataFromMessage)); + } + } +} diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/PlainSourceStage.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/PlainSourceStage.cs new file mode 100644 index 00000000..14d368c0 --- /dev/null +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/PlainSourceStage.cs @@ -0,0 +1,26 @@ +using System.Threading.Tasks; +using Akka.Streams.Kafka.Settings; +using Akka.Streams.Stage; +using Confluent.Kafka; + +namespace Akka.Streams.Kafka.Stages.Consumers +{ + internal class PlainSourceStage : KafkaSourceStage> + { + public ConsumerSettings Settings { get; } + public ISubscription Subscription { get; } + + public PlainSourceStage(ConsumerSettings settings, ISubscription subscription) + : base("PlainSource") + { + Settings = settings; + Subscription = subscription; + } + + protected override GraphStageLogic Logic(SourceShape> shape, TaskCompletionSource completion, Attributes inheritedAttributes) + { + return new SingleSourceStageLogic>(shape, Settings, Subscription, inheritedAttributes, + completion, new PlainMessageBuilder()); + } + } +} diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs b/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs new file mode 100644 index 00000000..401ab8d1 --- /dev/null +++ b/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs @@ -0,0 +1,61 @@ +using System; +using System.Collections.Generic; +using Akka.Streams.Kafka.Messages; +using Akka.Streams.Kafka.Settings; +using Confluent.Kafka; + +namespace Akka.Streams.Kafka.Stages.Consumers +{ + public interface IMessageBuilder + { + /// + /// Creates downstream message + /// + /// + /// We pass consumer here, because there is no way to get consumer instance from + /// some global configuration, like Alpakka does getting consumer actor ref + /// + TMessage CreateMessage(ConsumeResult record, IConsumer consumer); + } + + /// + /// Message builder used for + /// + public class PlainMessageBuilder : IMessageBuilder> + { + public ConsumeResult CreateMessage(ConsumeResult record, IConsumer consumer) => record; + } + + /// + /// This base class used for different committable source message builders + /// + public abstract class CommittableMessageBuilder : IMessageBuilder> + { + public abstract string GroupId { get; } + public abstract string MetadataFromRecord(ConsumeResult record); + + public CommittableMessage CreateMessage(ConsumeResult record, IConsumer consumer) + { + var offset = new PartitionOffset(GroupId, record.Topic, record.Partition, record.Offset); + return new CommittableMessage(record, new CommitableOffset(new KafkaCommitter(consumer), offset, MetadataFromRecord(record))); + } + } + + /// + /// Message builder used for + /// + public class CommittableSourceMessageBuilder : CommittableMessageBuilder + { + private readonly ConsumerSettings _settings; + private readonly Func, string> _metadataFromRecord; + + public CommittableSourceMessageBuilder(ConsumerSettings settings, Func, string> metadataFromRecord) + { + _settings = settings; + _metadataFromRecord = metadataFromRecord; + } + + public override string GroupId => _settings.GroupId; + public override string MetadataFromRecord(ConsumeResult record) => _metadataFromRecord(record); + } +} \ No newline at end of file From 9023a9919956ac42822a39a1f5b1a369cab6beed Mon Sep 17 00:00:00 2001 From: IgorFedchenko Date: Mon, 26 Aug 2019 21:14:34 +0300 Subject: [PATCH 3/6] Added xml docs to committers and made them internal API --- .../Stages/Consumers/Committers.cs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs index d668bd74..f2254d30 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs @@ -3,7 +3,10 @@ namespace Akka.Streams.Kafka.Stages.Consumers { - public interface IInternalCommitter + /// + /// Interface for implementing committing consumed messages + /// + internal interface IInternalCommitter { /// /// Commit all offsets (of different topics) belonging to the same stage @@ -12,9 +15,9 @@ public interface IInternalCommitter } /// - /// This is a simple committer using kafka consumer directly (not consumer actor, etc) + /// This is a simple committer using kafka consumer directly (not using consumer actor, etc) /// - public class KafkaCommitter : IInternalCommitter + internal class KafkaCommitter : IInternalCommitter { private readonly IConsumer _consumer; @@ -23,6 +26,9 @@ public KafkaCommitter(IConsumer consumer) _consumer = consumer; } + /// + /// Commit all offsets (of different topics) belonging to the same stage + /// public List Commit() => _consumer.Commit(); } } \ No newline at end of file From fb055bd750e940fd796b411c917229ffdbc88d09 Mon Sep 17 00:00:00 2001 From: IgorFedchenko Date: Mon, 26 Aug 2019 21:25:34 +0300 Subject: [PATCH 4/6] Added xml comments to public messages + only interfaces should be public --- .../Messages/CommittableMessage.cs | 56 ++++++++++++++++--- .../Messages/MessageAndMeta.cs | 15 +++++ .../Stages/Consumers/MessageBuilders.cs | 2 +- 3 files changed, 64 insertions(+), 9 deletions(-) diff --git a/src/Akka.Streams.Kafka/Messages/CommittableMessage.cs b/src/Akka.Streams.Kafka/Messages/CommittableMessage.cs index c6c6036e..465f76ec 100644 --- a/src/Akka.Streams.Kafka/Messages/CommittableMessage.cs +++ b/src/Akka.Streams.Kafka/Messages/CommittableMessage.cs @@ -13,14 +13,20 @@ namespace Akka.Streams.Kafka.Messages /// public sealed class CommittableMessage { - public CommittableMessage(ConsumeResult record, CommitableOffset commitableOffset) + public CommittableMessage(ConsumeResult record, ICommittableOffset commitableOffset) { Record = record; CommitableOffset = commitableOffset; } + /// + /// The consumed record data + /// public ConsumeResult Record { get; } - public CommitableOffset CommitableOffset { get; } + /// + /// Consumer offset that can be commited + /// + public ICommittableOffset CommitableOffset { get; } } /// @@ -30,6 +36,9 @@ public CommittableMessage(ConsumeResult record, CommitableOffset commitabl /// public interface ICommittable { + /// + /// Commits an offset that is included in a + /// Task Commit(); } @@ -43,28 +52,50 @@ public interface ICommittable /// public interface ICommittableOffset : ICommittable { + /// + /// Offset value + /// PartitionOffset Offset { get; } } + /// + /// Extends with some metadata + /// public interface ICommittableOffsetMetadata : ICommittableOffset { + /// + /// Cosumed record metadata + /// string Metadata { get; } } - - public class CommitableOffset : ICommittableOffsetMetadata + /// + /// Implementation of the offset, contained in . + /// Can be commited via method. + /// + internal class CommittableOffset : ICommittableOffsetMetadata { private readonly IInternalCommitter _committer; + + /// + /// Offset value + /// public PartitionOffset Offset { get; } + /// + /// Cosumed record metadata + /// public string Metadata { get; } - public CommitableOffset(IInternalCommitter committer, PartitionOffset offset, string metadata) + public CommittableOffset(IInternalCommitter committer, PartitionOffset offset, string metadata) { _committer = committer; Offset = offset; Metadata = metadata; } + /// + /// Commits offset to Kafka + /// public Task Commit() { return Task.FromResult(_committer.Commit()); @@ -84,12 +115,21 @@ public PartitionOffset(string groupId, string topic, int partition, Offset offse Offset = offset; } + /// + /// Consumer's group Id + /// public string GroupId { get; } - + /// + /// Topic + /// public string Topic { get; } - + /// + /// Partition + /// public int Partition { get; } - + /// + /// Kafka partition offset value + /// public Offset Offset { get; } } } diff --git a/src/Akka.Streams.Kafka/Messages/MessageAndMeta.cs b/src/Akka.Streams.Kafka/Messages/MessageAndMeta.cs index c82bec6e..1cd498bf 100644 --- a/src/Akka.Streams.Kafka/Messages/MessageAndMeta.cs +++ b/src/Akka.Streams.Kafka/Messages/MessageAndMeta.cs @@ -9,8 +9,23 @@ namespace Akka.Streams.Kafka.Messages /// Type of value public class MessageAndMeta { + /// + /// The message to send + /// public Message Message { get; set; } + /// + /// Topic to send to. + /// + /// + /// If TopicPartition property is specified, the Topic property value is ignored. + /// public string Topic { get; set; } + /// + /// Topic partition to sent to. + /// + /// + /// If TopicPartition property is specified, the Topic property value is ignored. + /// public TopicPartition TopicPartition { get; set; } } } \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs b/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs index 401ab8d1..a1e9d337 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs @@ -37,7 +37,7 @@ public abstract class CommittableMessageBuilder : IMessageBuilder CreateMessage(ConsumeResult record, IConsumer consumer) { var offset = new PartitionOffset(GroupId, record.Topic, record.Partition, record.Offset); - return new CommittableMessage(record, new CommitableOffset(new KafkaCommitter(consumer), offset, MetadataFromRecord(record))); + return new CommittableMessage(record, new CommittableOffset(new KafkaCommitter(consumer), offset, MetadataFromRecord(record))); } } From c95723201b23ac30725bdb00d9f5a02e960c0967 Mon Sep 17 00:00:00 2001 From: IgorFedchenko Date: Mon, 26 Aug 2019 21:29:26 +0300 Subject: [PATCH 5/6] Updated IInternalCommitter to return plain task --- src/Akka.Streams.Kafka/Messages/CommittableMessage.cs | 2 +- src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/Akka.Streams.Kafka/Messages/CommittableMessage.cs b/src/Akka.Streams.Kafka/Messages/CommittableMessage.cs index 465f76ec..a52e0fc4 100644 --- a/src/Akka.Streams.Kafka/Messages/CommittableMessage.cs +++ b/src/Akka.Streams.Kafka/Messages/CommittableMessage.cs @@ -98,7 +98,7 @@ public CommittableOffset(IInternalCommitter committer, PartitionOffset offset, s /// public Task Commit() { - return Task.FromResult(_committer.Commit()); + return _committer.Commit(); } } diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs index f2254d30..0f1c2042 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using System.Threading.Tasks; using Confluent.Kafka; namespace Akka.Streams.Kafka.Stages.Consumers @@ -11,7 +12,7 @@ internal interface IInternalCommitter /// /// Commit all offsets (of different topics) belonging to the same stage /// - List Commit(); + Task Commit(); } /// @@ -29,6 +30,6 @@ public KafkaCommitter(IConsumer consumer) /// /// Commit all offsets (of different topics) belonging to the same stage /// - public List Commit() => _consumer.Commit(); + public Task Commit() => Task.FromResult(_consumer.Commit()); } } \ No newline at end of file From 61d0444898b2dcd9c749e8915f9d0b59240dd037 Mon Sep 17 00:00:00 2001 From: IgorFedchenko Date: Mon, 26 Aug 2019 21:35:01 +0300 Subject: [PATCH 6/6] Added comments to consumer stages --- src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs | 1 + .../Consumers/Abstract/KafkaSourceStage.cs | 11 ++++++-- .../Concrete/CommittableSourceStage.cs | 28 +++++++++++++++---- .../Consumers/Concrete/PlainSourceStage.cs | 22 ++++++++++++++- .../Stages/Consumers/MessageBuilders.cs | 1 + 5 files changed, 54 insertions(+), 9 deletions(-) diff --git a/src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs b/src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs index d2224c9b..0a46a956 100644 --- a/src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs +++ b/src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs @@ -5,6 +5,7 @@ using Confluent.Kafka; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Stages.Consumers; +using Akka.Streams.Kafka.Stages.Consumers.Concrete; namespace Akka.Streams.Kafka.Dsl { diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/KafkaSourceStage.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/KafkaSourceStage.cs index f0b99036..23ef24d2 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/KafkaSourceStage.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/KafkaSourceStage.cs @@ -1,7 +1,7 @@ using System.Threading.Tasks; using Akka.Streams.Stage; -namespace Akka.Streams.Kafka.Stages.Consumers +namespace Akka.Streams.Kafka.Stages.Consumers.Abstract { public abstract class KafkaSourceStage : GraphStageWithMaterializedValue, Task> { @@ -16,7 +16,14 @@ protected KafkaSourceStage(string stageName) } protected override Attributes InitialAttributes => Attributes.CreateName(StageName); - + + /// + /// Provides actual stage logic + /// + /// Shape of the stage + /// Used to specify stage task completion + /// Stage attributes + /// Stage logic protected abstract GraphStageLogic Logic(SourceShape shape, TaskCompletionSource completion, Attributes inheritedAttributes); public override ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Attributes inheritedAttributes) diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/CommittableSourceStage.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/CommittableSourceStage.cs index ea45a17b..d89f2f83 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/CommittableSourceStage.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/CommittableSourceStage.cs @@ -1,21 +1,30 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Runtime.Serialization; -using System.Threading; using System.Threading.Tasks; +using Akka.Streams.Kafka.Dsl; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Stages.Consumers.Abstract; using Akka.Streams.Stage; -using Akka.Streams.Supervision; using Confluent.Kafka; -namespace Akka.Streams.Kafka.Stages.Consumers +namespace Akka.Streams.Kafka.Stages.Consumers.Concrete { + /// + /// This stage is used for + /// + /// The key type + /// The value type internal class CommittableSourceStage : KafkaSourceStage> { private readonly Func, string> _metadataFromMessage; + + /// + /// Consumer settings + /// public ConsumerSettings Settings { get; } + /// + /// Subscription + /// public ISubscription Subscription { get; } public CommittableSourceStage(ConsumerSettings settings, ISubscription subscription, @@ -27,6 +36,13 @@ public CommittableSourceStage(ConsumerSettings settings, ISubscription sub Subscription = subscription; } + /// + /// Provides actual stage logic + /// + /// Shape of the stage + /// Used to specify stage task completion + /// Stage attributes + /// Stage logic protected override GraphStageLogic Logic(SourceShape> shape, TaskCompletionSource completion, Attributes inheritedAttributes) { return new SingleSourceStageLogic>(shape, Settings, Subscription, inheritedAttributes, diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/PlainSourceStage.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/PlainSourceStage.cs index 14d368c0..bf15f5c4 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/PlainSourceStage.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/PlainSourceStage.cs @@ -1,13 +1,26 @@ using System.Threading.Tasks; +using Akka.Streams.Kafka.Dsl; using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Stages.Consumers.Abstract; using Akka.Streams.Stage; using Confluent.Kafka; -namespace Akka.Streams.Kafka.Stages.Consumers +namespace Akka.Streams.Kafka.Stages.Consumers.Concrete { + /// + /// This stage is used for + /// + /// The key type + /// The value type internal class PlainSourceStage : KafkaSourceStage> { + /// + /// Consumer settings + /// public ConsumerSettings Settings { get; } + /// + /// Subscription + /// public ISubscription Subscription { get; } public PlainSourceStage(ConsumerSettings settings, ISubscription subscription) @@ -17,6 +30,13 @@ public PlainSourceStage(ConsumerSettings settings, ISubscription subscript Subscription = subscription; } + /// + /// Provides actual stage logic + /// + /// Shape of the stage + /// Used to specify stage task completion + /// Stage attributes + /// Stage logic protected override GraphStageLogic Logic(SourceShape> shape, TaskCompletionSource completion, Attributes inheritedAttributes) { return new SingleSourceStageLogic>(shape, Settings, Subscription, inheritedAttributes, diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs b/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs index a1e9d337..c2723bdf 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Stages.Consumers.Concrete; using Confluent.Kafka; namespace Akka.Streams.Kafka.Stages.Consumers