Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to make consumer stages to use base classes #45

Merged
9 changes: 9 additions & 0 deletions src/Akka.Streams.Kafka.Tests/KafkaFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ public async Task InitializeAsync()
["KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR"] = "1"
});

/*
// Remove old networks, if there are any
var networks = await _client.Networks.ListNetworksAsync(new NetworksListParameters());
foreach (var existingNetwork in networks.Where(n => n.Name.StartsWith("network-")))
{
await _client.Networks.DeleteNetworkAsync(existingNetwork.ID);
}
*/
IgorFedchenko marked this conversation as resolved.
Show resolved Hide resolved

// Setting up network for containers to communicate
var network = await _client.Networks.CreateNetworkAsync(new NetworksCreateParameters(new NetworkCreate())
{
Expand Down
6 changes: 4 additions & 2 deletions src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
using Akka.Streams.Kafka.Stages;
using Confluent.Kafka;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Stages.Consumers;
using Akka.Streams.Kafka.Stages.Consumers.Concrete;

namespace Akka.Streams.Kafka.Dsl
{
Expand All @@ -24,7 +26,7 @@ public static class KafkaConsumer
/// </summary>
public static Source<ConsumeResult<K, V>, Task> PlainSource<K, V>(ConsumerSettings<K, V> settings, ISubscription subscription)
{
return Source.FromGraph(new KafkaSourceStage<K, V>(settings, subscription));
return Source.FromGraph(new PlainSourceStage<K, V>(settings, subscription));
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
}

/// <summary>
Expand All @@ -37,7 +39,7 @@ public static Source<ConsumeResult<K, V>, Task> PlainSource<K, V>(ConsumerSettin
/// </summary>
public static Source<CommittableMessage<K, V>, Task> CommittableSource<K, V>(ConsumerSettings<K, V> settings, ISubscription subscription)
{
return Source.FromGraph(new CommittableConsumerStage<K, V>(settings, subscription));
return Source.FromGraph(new CommittableSourceStage<K, V>(settings, subscription));
}
}
}
89 changes: 76 additions & 13 deletions src/Akka.Streams.Kafka/Messages/CommittableMessage.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Stages.Consumers;
using Confluent.Kafka;

namespace Akka.Streams.Kafka.Messages
Expand All @@ -11,15 +13,33 @@ namespace Akka.Streams.Kafka.Messages
/// </summary>
public sealed class CommittableMessage<K, V>
{
public CommittableMessage(ConsumeResult<K, V> record, CommitableOffset commitableOffset)
public CommittableMessage(ConsumeResult<K, V> record, ICommittableOffset commitableOffset)
{
Record = record;
CommitableOffset = commitableOffset;
}

/// <summary>
/// The consumed record data
/// </summary>
public ConsumeResult<K, V> Record { get; }
/// <summary>
/// Consumer offset that can be commited
/// </summary>
public ICommittableOffset CommitableOffset { get; }
}

public CommitableOffset CommitableOffset { get; }
/// <summary>
/// Commit an offset that is included in a <see cref="CommittableMessage{K,V}"/>
/// If you need to store offsets in anything other than Kafka, this API
/// should not be used.
/// </summary>
public interface ICommittable
{
/// <summary>
/// Commits an offset that is included in a <see cref="CommittableMessage{K,V}"/>
/// </summary>
Task Commit();
}

/// <summary>
Expand All @@ -30,21 +50,55 @@ public CommittableMessage(ConsumeResult<K, V> record, CommitableOffset commitabl
/// should be the next message your application will consume,
/// i.e. lastProcessedMessageOffset + 1.
/// </summary>
public class CommitableOffset
public interface ICommittableOffset : ICommittable
{
/// <summary>
/// Offset value
/// </summary>
PartitionOffset Offset { get; }
}

/// <summary>
/// Extends <see cref="ICommittableOffset"/> with some metadata
/// </summary>
public interface ICommittableOffsetMetadata : ICommittableOffset
{
/// <summary>
/// Cosumed record metadata
/// </summary>
string Metadata { get; }
}

/// <summary>
/// Implementation of the offset, contained in <see cref="CommittableMessage{K,V}"/>.
/// Can be commited via <see cref="Commit"/> method.
/// </summary>
internal class CommittableOffset : ICommittableOffsetMetadata
{
private readonly Func<List<TopicPartitionOffset>> _task;
private readonly IInternalCommitter _committer;

/// <summary>
/// Offset value
/// </summary>
public PartitionOffset Offset { get; }
/// <summary>
/// Cosumed record metadata
/// </summary>
public string Metadata { get; }

public CommitableOffset(Func<List<TopicPartitionOffset>> task, PartitionOffset offset)
public CommittableOffset(IInternalCommitter committer, PartitionOffset offset, string metadata)
{
_task = task;
_committer = committer;
Offset = offset;
Metadata = metadata;
}

public PartitionOffset Offset { get; }

public List<TopicPartitionOffset> Commit()
/// <summary>
/// Commits offset to Kafka
/// </summary>
public Task Commit()
{
return _task();
return _committer.Commit();
}
}

Expand All @@ -61,12 +115,21 @@ public PartitionOffset(string groupId, string topic, int partition, Offset offse
Offset = offset;
}

/// <summary>
/// Consumer's group Id
/// </summary>
public string GroupId { get; }

/// <summary>
/// Topic
/// </summary>
public string Topic { get; }

/// <summary>
/// Partition
/// </summary>
public int Partition { get; }

/// <summary>
/// Kafka partition offset value
/// </summary>
public Offset Offset { get; }
}
}
15 changes: 15 additions & 0 deletions src/Akka.Streams.Kafka/Messages/MessageAndMeta.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,23 @@ namespace Akka.Streams.Kafka.Messages
/// <typeparam name="V">Type of value</typeparam>
public class MessageAndMeta<K, V>
{
/// <summary>
/// The message to send
/// </summary>
public Message<K, V> Message { get; set; }
/// <summary>
/// Topic to send to.
/// </summary>
/// <remarks>
/// If TopicPartition property is specified, the Topic property value is ignored.
/// </remarks>
public string Topic { get; set; }
/// <summary>
/// Topic partition to sent to.
/// </summary>
/// <remarks>
/// If TopicPartition property is specified, the Topic property value is ignored.
/// </remarks>
public TopicPartition TopicPartition { get; set; }
}
}
2 changes: 2 additions & 0 deletions src/Akka.Streams.Kafka/Settings/ConsumerSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public ConsumerSettings<TKey, TValue> WithProperty(string key, string value) =>
public ConsumerSettings<TKey, TValue> WithPollTimeout(TimeSpan pollTimeout) => Copy(pollTimeout: pollTimeout);

public ConsumerSettings<TKey, TValue> WithDispatcher(string dispatcherId) => Copy(dispatcherId: dispatcherId);

public string GroupId => Properties.ContainsKey("group.id") ? Properties["group.id"] : null;

private ConsumerSettings<TKey, TValue> Copy(
IDeserializer<TKey> keyDeserializer = null,
Expand Down
Loading