Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to make consumer stages to use base classes #45

Merged

Conversation

IgorFedchenko
Copy link
Contributor

This work is a part of issue #36 .

Alpakka project uses base classes to implement common logic between stages. There are lots of abstractions, and even more would be required to be implemented in c# because scala has mixin feature and allows developers to do kind of inline inheritance (like specifying base class and overrides in same place as a part of inheritance syntax) - and in c# we need to implement separate child class each time and use it instead.

This is a first PR in the series of PRs in issue #36 , and the goal of this one is to:

  1. Use KafkaSourceStage base class for all consumer stages definition
  2. Use SingleSourceStageLogic base class of some of consumer stage logic classes. So far we have PlainSourceStage and CommittableSourceStage, and both of them inherit from this class.

While common logic of message consuming and handling is moved to SingleSourceStageLogic class, the difference is in downstream messages - PlainSourceStage pushes plain consumed records, while CommittableSourceStage pushes CommittableMessage's, which allows committing offsets when desired. To support this difference, there is a IMessageBuilder interface and different implementations, so far only PlainMessageBuilder and CommittableSourceMessageBuilder are implemented.

To simplify the review, I will make review and put some comments on changes right now.

{
return _task();
return Task.FromResult(_committer.Commit());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alpakka's Commit method returns Future[Done], and committed partition offsets can be accessed from CommittableMessage directly, so no need to return them from Commit method. Also, in different stages there are different implementations of IInternalCommitter commit method, so more general Task response is better here.

Also, the Func<List<TopicPartitionOffset>> task field is replaced with IInternalCommitter committer field to allow using additional info for commitment as alpakka does (and we will use this flexibility in KafkaInternalCommitter class).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this change - easier to support future versions of Kafka client where they might return a real Task upon calling commit. Make sure you add some XML-DOC to these classes though.

{
Push(stage.Out, message);
Push(shape.Outlet, messageBuilder.CreateMessage(message, _consumer));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using generic IMessageBuilder to build downstream message.
The tricky part here is passing _consumer instance to builder. Alpakka does not do that. Instead, they use GraphStage's materializer property to get consumer actor reference and use it in message builder like this:

  override protected def logic(shape: SourceShape[CommittableMessage[K, V]]): GraphStageLogic with Control =
    new SingleSourceLogic[K, V, CommittableMessage[K, V]](shape, settings, subscription)
    with CommittableMessageBuilder[K, V] {
      override def metadataFromRecord(record: ConsumerRecord[K, V]): String = _metadataFromRecord(record)
      override def groupId: String = settings.properties(ConsumerConfig.GROUP_ID_CONFIG)
      lazy val committer: KafkaAsyncConsumerCommitterRef = {
        val ec = materializer.executionContext
        new KafkaAsyncConsumerCommitterRef(consumerActor, settings.commitTimeout)(ec)
      }
    }

We do not use any consumer actor right now, neither Akka.Streams gives materializer property - or maybe I did not find it - so to allow builder to execute commit action, I am passing consumer instance directly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@IgorFedchenko the materializer might be passed in as a Scala implicit - which makes the code somewhat hard to read. I'll take a look and see how they're doing this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe you might have some other ideas

Copy link
Contributor Author

@IgorFedchenko IgorFedchenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All right, I left some notes on file changes, now it is ready for review.

@IgorFedchenko IgorFedchenko marked this pull request as ready for review August 24, 2019 13:41
Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have some minor changes that need to be implemented

src/Akka.Streams.Kafka.Tests/KafkaFixture.cs Show resolved Hide resolved
{
return _task();
return Task.FromResult(_committer.Commit());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this change - easier to support future versions of Kafka client where they might return a real Task upon calling commit. Make sure you add some XML-DOC to these classes though.

/// <summary>
/// Commit all offsets (of different topics) belonging to the same stage
/// </summary>
List<TopicPartitionOffset> Commit();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this list need to be mutable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think so - it is kafka's driver returning mutable collection, but indeed, there is no point in such mutations. Will update this


namespace Akka.Streams.Kafka.Stages.Consumers
{
public interface IInternalCommitter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need an XML-DOC that describes this as an internal API.

Also, should this interface be internal or does it need to be exposed to outside of the library?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, you are right, this is internal API - will fix this


namespace Akka.Streams.Kafka.Stages.Consumers
{
internal class CommittableSourceStage<K, V> : KafkaSourceStage<K, V, CommittableMessage<K, V>>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs some XML-DOC

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


namespace Akka.Streams.Kafka.Stages.Consumers
{
internal class PlainSourceStage<K, V> : KafkaSourceStage<K, V, ConsumeResult<K, V>>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

XML-DOC

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor Author

@IgorFedchenko IgorFedchenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added xml comments, and changed public to internal in few places, as per your changes request.

src/Akka.Streams.Kafka.Tests/KafkaFixture.cs Show resolved Hide resolved

namespace Akka.Streams.Kafka.Stages.Consumers
{
public interface IInternalCommitter
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, you are right, this is internal API - will fix this

/// <summary>
/// Commit all offsets (of different topics) belonging to the same stage
/// </summary>
List<TopicPartitionOffset> Commit();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think so - it is kafka's driver returning mutable collection, but indeed, there is no point in such mutations. Will update this


namespace Akka.Streams.Kafka.Stages.Consumers
{
internal class CommittableSourceStage<K, V> : KafkaSourceStage<K, V, CommittableMessage<K, V>>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


namespace Akka.Streams.Kafka.Stages.Consumers
{
internal class PlainSourceStage<K, V> : KafkaSourceStage<K, V, ConsumeResult<K, V>>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/// <summary>
/// Commit all offsets (of different topics) belonging to the same stage
/// </summary>
Task Commit();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aaronontheweb Alpakka returnes Task here instead of topic partitions list as I did before - and this makes sense, because the public ICommittable interface returns general Task without typed value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's fine

@IgorFedchenko
Copy link
Contributor Author

Sorry, I see some conflicts with dev branch - will resolve now

@IgorFedchenko
Copy link
Contributor Author

Ok, now ready for review.

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Aaronontheweb Aaronontheweb merged commit 45a95ed into akkadotnet:dev Aug 27, 2019
@IgorFedchenko IgorFedchenko deleted the refactor_to_use_base_classes branch September 4, 2019 10:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants