Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to make consumer stages to use base classes #45

Merged
9 changes: 9 additions & 0 deletions src/Akka.Streams.Kafka.Tests/KafkaFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ public async Task InitializeAsync()
["KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR"] = "1"
});

/*
// Remove old networks, if there are any
var networks = await _client.Networks.ListNetworksAsync(new NetworksListParameters());
foreach (var existingNetwork in networks.Where(n => n.Name.StartsWith("network-")))
{
await _client.Networks.DeleteNetworkAsync(existingNetwork.ID);
}
*/
IgorFedchenko marked this conversation as resolved.
Show resolved Hide resolved

// Setting up network for containers to communicate
var network = await _client.Networks.CreateNetworkAsync(new NetworksCreateParameters(new NetworkCreate())
{
Expand Down
5 changes: 3 additions & 2 deletions src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Akka.Streams.Kafka.Stages;
using Confluent.Kafka;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Stages.Consumers;

namespace Akka.Streams.Kafka.Dsl
{
Expand All @@ -24,7 +25,7 @@ public static class KafkaConsumer
/// </summary>
public static Source<ConsumeResult<K, V>, Task> PlainSource<K, V>(ConsumerSettings<K, V> settings, ISubscription subscription)
{
return Source.FromGraph(new KafkaSourceStage<K, V>(settings, subscription));
return Source.FromGraph(new PlainSourceStage<K, V>(settings, subscription));
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
}

/// <summary>
Expand All @@ -37,7 +38,7 @@ public static Source<ConsumeResult<K, V>, Task> PlainSource<K, V>(ConsumerSettin
/// </summary>
public static Source<CommittableMessage<K, V>, Task> CommittableSource<K, V>(ConsumerSettings<K, V> settings, ISubscription subscription)
{
return Source.FromGraph(new CommittableConsumerStage<K, V>(settings, subscription));
return Source.FromGraph(new CommittableSourceStage<K, V>(settings, subscription));
}
}
}
41 changes: 32 additions & 9 deletions src/Akka.Streams.Kafka/Messages/CommittableMessage.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Stages.Consumers;
using Confluent.Kafka;

namespace Akka.Streams.Kafka.Messages
Expand All @@ -18,10 +20,19 @@ public CommittableMessage(ConsumeResult<K, V> record, CommitableOffset commitabl
}

public ConsumeResult<K, V> Record { get; }

public CommitableOffset CommitableOffset { get; }
}

/// <summary>
/// Commit an offset that is included in a <see cref="CommittableMessage{K,V}"/>
/// If you need to store offsets in anything other than Kafka, this API
/// should not be used.
/// </summary>
public interface ICommittable
{
Task Commit();
}

/// <summary>
/// Included in <see cref="CommittableMessage{K,V}"/>. Makes it possible to
/// commit an offset or aggregate several offsets before committing.
Expand All @@ -30,21 +41,33 @@ public CommittableMessage(ConsumeResult<K, V> record, CommitableOffset commitabl
/// should be the next message your application will consume,
/// i.e. lastProcessedMessageOffset + 1.
/// </summary>
public class CommitableOffset
public interface ICommittableOffset : ICommittable
{
private readonly Func<List<TopicPartitionOffset>> _task;
PartitionOffset Offset { get; }
}

public CommitableOffset(Func<List<TopicPartitionOffset>> task, PartitionOffset offset)
public interface ICommittableOffsetMetadata : ICommittableOffset
{
string Metadata { get; }
}


public class CommitableOffset : ICommittableOffsetMetadata
{
private readonly IInternalCommitter _committer;
public PartitionOffset Offset { get; }
public string Metadata { get; }

public CommitableOffset(IInternalCommitter committer, PartitionOffset offset, string metadata)
{
_task = task;
_committer = committer;
Offset = offset;
Metadata = metadata;
}

public PartitionOffset Offset { get; }

public List<TopicPartitionOffset> Commit()
public Task Commit()
{
return _task();
return Task.FromResult(_committer.Commit());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alpakka's Commit method returns Future[Done], and committed partition offsets can be accessed from CommittableMessage directly, so no need to return them from Commit method. Also, in different stages there are different implementations of IInternalCommitter commit method, so more general Task response is better here.

Also, the Func<List<TopicPartitionOffset>> task field is replaced with IInternalCommitter committer field to allow using additional info for commitment as alpakka does (and we will use this flexibility in KafkaInternalCommitter class).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this change - easier to support future versions of Kafka client where they might return a real Task upon calling commit. Make sure you add some XML-DOC to these classes though.

}
}

Expand Down
2 changes: 2 additions & 0 deletions src/Akka.Streams.Kafka/Settings/ConsumerSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public ConsumerSettings<TKey, TValue> WithProperty(string key, string value) =>
public ConsumerSettings<TKey, TValue> WithPollTimeout(TimeSpan pollTimeout) => Copy(pollTimeout: pollTimeout);

public ConsumerSettings<TKey, TValue> WithDispatcher(string dispatcherId) => Copy(dispatcherId: dispatcherId);

public string GroupId => Properties.ContainsKey("group.id") ? Properties["group.id"] : null;

private ConsumerSettings<TKey, TValue> Copy(
IDeserializer<TKey> keyDeserializer = null,
Expand Down
202 changes: 0 additions & 202 deletions src/Akka.Streams.Kafka/Stages/CommittableConsumerStage.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System.Threading.Tasks;
using Akka.Streams.Stage;

namespace Akka.Streams.Kafka.Stages.Consumers
{
public abstract class KafkaSourceStage<K, V, TMessage> : GraphStageWithMaterializedValue<SourceShape<TMessage>, Task>
IgorFedchenko marked this conversation as resolved.
Show resolved Hide resolved
{
public string StageName { get; }
public Outlet<TMessage> Out { get; } = new Outlet<TMessage>("out");
public override SourceShape<TMessage> Shape { get; }

protected KafkaSourceStage(string stageName)
{
StageName = stageName;
Shape = new SourceShape<TMessage>(Out);
}

protected override Attributes InitialAttributes => Attributes.CreateName(StageName);

protected abstract GraphStageLogic Logic(SourceShape<TMessage> shape, TaskCompletionSource<NotUsed> completion, Attributes inheritedAttributes);

public override ILogicAndMaterializedValue<Task> CreateLogicAndMaterializedValue(Attributes inheritedAttributes)
{
var completion = new TaskCompletionSource<NotUsed>();
var result = Logic(Shape, completion, inheritedAttributes);
return new LogicAndMaterializedValue<Task>(result, completion.Task);
}
}
}
Loading