
NOISSUE - Kafka support #1579

Closed · wants to merge 4 commits

Conversation

@rodneyosodo (Member) commented Mar 15, 2022

What does this do?

Adds Kafka support. Since we are introducing a multi-broker system, we need to support Kafka as one of the message brokers for our platform. Kafka has been chosen because it is the most widely used of the established message brokers, which lets us integrate with existing systems that already rely on it.

Which issue(s) does this PR fix/relate to?

Resolves #990

List any changes that modify/break current functionality

N/A

Have you included tests for your changes?

Yes

Did you document any new/modified functionality?

absmach/supermq-docs#157

Notes

The publisher implementation is based on the segmentio/kafka-go library. Publishing messages is well supported by the library, but subscribing is not: it can only subscribe to a specific topic, not to all topics at once. This is a problem because the Mainflux platform uses one topic per channel, and the number of channels is not known in advance. The workaround is to use a Zookeeper client to fetch the list of all topics and then subscribe to each of them individually. The list of topics is obtained by connecting to the Zookeeper server and reading the children of the /brokers/topics node. The first message published to a topic can be lost if publishing follows the subscription too closely; once the subscription has settled, all messages are guaranteed to be received.
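A minimal sketch of that workaround, assuming the go-zookeeper/zk and segmentio/kafka-go libraries (addresses, group ID, and structure are illustrative, not the PR's actual code):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-zookeeper/zk"
	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Read the list of all topics from Zookeeper's /brokers/topics node.
	conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	topics, _, err := conn.Children("/brokers/topics")
	if err != nil {
		panic(err)
	}

	// Subscribe to each topic individually, since kafka-go readers are
	// bound to a single topic.
	for _, topic := range topics {
		go func(topic string) {
			r := kafka.NewReader(kafka.ReaderConfig{
				Brokers: []string{"localhost:9092"},
				GroupID: "mainflux",
				Topic:   topic,
			})
			defer r.Close()
			for {
				m, err := r.ReadMessage(context.Background())
				if err != nil {
					return
				}
				fmt.Printf("%s: %s\n", m.Topic, m.Value)
			}
		}(topic)
	}
	select {} // block forever; real code would wire this into the PubSub lifecycle
}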

@rodneyosodo rodneyosodo requested a review from a team as a code owner March 15, 2022 16:45
@rodneyosodo rodneyosodo force-pushed the NOISSUE-kafka-support branch 3 times, most recently from 2778396 to 5ef5fee Compare March 21, 2022 13:31
@codecov-commenter commented Mar 21, 2022

Codecov Report

Merging #1579 (6a1e87a) into master (28f4965) will increase coverage by 0.07%.
Report is 28 commits behind head on master.
The diff coverage is 70.91%.

❗ Current head 6a1e87a differs from pull request most recent head 8537a3c. Consider uploading reports for the commit 8537a3c to get more accurate results

@@            Coverage Diff             @@
##           master    #1579      +/-   ##
==========================================
+ Coverage   67.14%   67.21%   +0.07%     
==========================================
  Files         120      122       +2     
  Lines        9145     9341     +196     
==========================================
+ Hits         6140     6279     +139     
- Misses       2368     2413      +45     
- Partials      637      649      +12     
Files                               Coverage Δ
pkg/messaging/kafka/publisher.go    63.79% <63.79%> (ø)
pkg/messaging/kafka/pubsub.go       73.91% <73.91%> (ø)


const (
	chansPrefix = "channels"
	// SubjectAllChannels represents subject to subscribe for all the channels.
	SubjectAllChannels = "channels.>"
)
Contributor:

I would construct this one as chansPrefix + ".>"
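That is, a sketch of the suggested construction:

const (
	chansPrefix = "channels"
	// SubjectAllChannels represents subject to subscribe for all the channels.
	SubjectAllChannels = chansPrefix + ".>"
)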

Member Author (@rodneyosodo):
That is how it is currently implemented across all brokers. Should we change it?

@dborovcanin (Collaborator) commented Jun 22, 2022

@0x6f736f646f Do not forget to update the Makefile and docker-compose.yml to enable running with MF_BROKER_TYPE=kafka make run.
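A rough sketch of such a hook (the real Mainflux Makefile and compose layout differ; the target body and file paths here are assumptions):

MF_BROKER_TYPE ?= nats

run:
	docker-compose \
		-f docker/docker-compose.yml \
		-f docker/broker/$(MF_BROKER_TYPE).yml \
		up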

@dborovcanin (Collaborator) left a comment:

Please fix CI.

@dborovcanin (Collaborator) left a comment:

CI is failing.

@rodneyosodo (Member Author) commented:

The LeaderNotAvailable error is retriable according to these docs. A possible workaround would be to retry and, if that still fails, handle the failure. The library we are using retries for up to 1s and then fails. As demonstrated earlier, if you retry for at least 10 seconds, the writer will write the message. The current workaround is to create topics using the Kafka command inside the broker, as referenced here.
Our problem is mainly in CI, since when Mainflux is deployed you can publish a message from one adapter and receive it using another. The interim workaround is to create the topics before testing the code.

Another problem is that on Mainflux, when we publish a message for the first time, it throws the same error: mainflux-http | {"level":"warn","message":"Method publish to channel 242bcd02-fb24-4c8c-b56d-de97ad56de34 took 49.085996ms to complete with error: [5] Leader Not Available: the cluster is in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes.","ts":"2022-06-27T14:01:40.789712934Z"}

This is an error in Kafka itself, as it is also experienced by users of other libraries such as Sarama.

My recommendation would be to increase retrying on the publish method up to 10s and then manage the error after that.
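A minimal sketch of that recommendation with segmentio/kafka-go (the function name is hypothetical, and depending on how the library wraps errors the comparison may need adjusting):

package messaging

import (
	"context"
	"errors"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

// publishWithRetry retries WriteMessages on LeaderNotAvailable (error
// code 5, the one seen in the logs above) for up to ~10s before failing.
func publishWithRetry(ctx context.Context, w *kafka.Writer, msg kafka.Message) error {
	deadline := time.Now().Add(10 * time.Second)
	for {
		err := w.WriteMessages(ctx, msg)
		if err == nil {
			return nil
		}
		if !errors.Is(err, kafka.LeaderNotAvailable) || time.Now().After(deadline) {
			return err
		}
		time.Sleep(250 * time.Millisecond) // brief backoff between attempts
	}
}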

@dborovcanin (Collaborator) commented:

> The LeaderNotAvailable error is retriable according to these docs

Can you point out the exact section for this?

> Our problem is mainly in CI, since when Mainflux is deployed you can publish a message from one adapter and receive it using another. The interim workaround is to create the topics before testing the code.

Looks like the problem is actually that first-time writing always raises this error, not because Kafka is not ready but because we do not have a topic created. I guess Kafka creates the topic internally when we try to publish, and the next attempt passes because Kafka took care of the topic for us on the first one. Please investigate whether it works like that and how to correctly handle topic lifetimes (we do not remove topics when we unsubscribe, and I assume there are other ways to configure Kafka to handle this for us).

@rodneyosodo (Member Author) commented:

> Can you point out the exact section for this?

You can check it here.

> Please investigate if it works like that

Yes, it works like that.
I have tried two approaches:

  1. To create the topic right after initially creating the Kafka connection object, as referenced here, but only when KAFKA_AUTO_CREATE_TOPICS_ENABLE='false'.

    This doesn't work as expected:

topicConfigs := []kafka.TopicConfig{
	{
		Topic:             subject,
		NumPartitions:     1,
		ReplicationFactor: 1,
	},
}
err = pub.conn.CreateTopics(topicConfigs...)
if err != nil {
	return err
}

  2. To create the topic while publishing, as referenced here.

    This one worked fine.
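If the reference is to kafka-go's topic auto-creation support, a minimal sketch of this second approach would be (names are illustrative, not the PR's actual code):

// newWriter returns a writer whose topic is created by the broker on
// the first publish, instead of being created up front via CreateTopics.
func newWriter(subject string) *kafka.Writer {
	return &kafka.Writer{
		Addr:                   kafka.TCP("localhost:9092"),
		Topic:                  subject,
		AllowAutoTopicCreation: true,
	}
}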

@rodneyosodo (Member Author) commented Jul 1, 2022

Topic management

Kafka topics are the categories used to organize messages. Each topic has a name that is unique across the entire Kafka cluster.

Creation

  • We manually create the topics before publishing to them.
  • If we try to create an already existing topic, the operation is idempotent, since the configuration values haven't changed.

Deletion

  • Since we create the topics ourselves, we also have the privilege of deleting them ourselves. This is not an elegant solution, but it is feasible for our case.
  • Before we can delete any topic, we must make sure topic deletion is enabled in our cluster: set delete.topic.enable=true.
  • We have to make sure all consumers have stopped consuming data from the topic we want to delete; otherwise, Kafka will throw errors.
  • If a consumer is still up and running, the topic will get auto-created when the cluster-wide property auto.create.topics.enable=true. Not bad per se, but it will use the default number of partitions (1) and replication factor (1), which might not be what we want.

We currently delete topics when we close the publisher interface, sketched below.
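A minimal sketch of that cleanup with segmentio/kafka-go (pub.conn and pub.topics are hypothetical fields, and the cluster must have delete.topic.enable=true):

// Close deletes the topics this publisher created, then closes the connection.
func (pub *publisher) Close() error {
	if len(pub.topics) > 0 {
		if err := pub.conn.DeleteTopics(pub.topics...); err != nil {
			return err
		}
	}
	return pub.conn.Close()
}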

Publishing messages

  • Configured retries to be at least 1000, since we care about data delivery to clients (Ref).
  • Configured the producer to wait for acknowledgment (Ref).
  • Stuck with random partitioning when writing to topics (Ref).
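A sketch of a writer configured along those lines (illustrative values, not necessarily the PR's exact settings; kafka-go ships no random balancer, so its default round-robin stands in here):

func newPublisherWriter(brokerAddr string) *kafka.Writer {
	return &kafka.Writer{
		Addr:         kafka.TCP(brokerAddr),
		MaxAttempts:  1000,             // retry aggressively: we care about delivery
		RequiredAcks: kafka.RequireAll, // wait for broker acknowledgment
		Balancer:     &kafka.RoundRobin{},
	}
}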

drasko previously approved these changes Dec 25, 2022

@drasko (Contributor) left a comment:

LGTM

@rodneyosodo (Member Author) commented:

From this, it appears that in the segmentio library you can list all topics with a bit of a workaround, sketched below. But the problem is that when the MQTT forwarder subscribes to the broker, it uses the wildcard to subscribe to all topics, yet there are no topics in the Kafka broker.
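A sketch of that workaround with segmentio/kafka-go: ReadPartitions with no arguments returns the partitions of every topic, from which topic names can be deduplicated (the function name is hypothetical):

func listTopics(brokerAddr string) ([]string, error) {
	conn, err := kafka.Dial("tcp", brokerAddr)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	partitions, err := conn.ReadPartitions()
	if err != nil {
		return nil, err
	}

	seen := make(map[string]bool)
	var topics []string
	for _, p := range partitions {
		if !seen[p.Topic] {
			seen[p.Topic] = true // one entry per topic
			topics = append(topics, p.Topic)
		}
	}
	return topics, nil
}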

@rodneyosodo (Member Author) commented:

Regarding Kafka support for wildcards: I have tested with confluent-kafka-python, and it supports them. I think the issue is the library we are using. I have tried to pass a regex as the topic, and it throws this error:

2023/02/07 14:24:37 Failed to join group groupID2: [17] Invalid Topic: a request which attempted to access an invalid topic (e.g. one which has an illegal name), or if an attempt was made to write to an internal topic (such as the consumer offsets topic)
2023/02/07 14:24:37 Leaving group groupID2, member main@r0x6f736f646f (github.com/segmentio/kafka-go)-77b03394-27c9-4846-89c2-fe1fb51f7034

I have tried another library, https://github.com/confluentinc/confluent-kafka-go, and it works. I want to write the implementation on this Kafka PR and update it after testing on Mainflux.
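A minimal sketch of the regex subscription that works with confluent-kafka-go (broker address and group ID are illustrative, mirroring the log above):

package main

import (
	"fmt"
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "groupID2",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// Topic names beginning with "^" are treated as regular expressions,
	// which is how confluent-kafka-go supports wildcard subscriptions.
	if err := c.SubscribeTopics([]string{"^channels\\..*"}, nil); err != nil {
		log.Fatal(err)
	}

	for {
		msg, err := c.ReadMessage(-1) // block until a message arrives
		if err != nil {
			log.Println(err)
			continue
		}
		fmt.Printf("%s: %s\n", *msg.TopicPartition.Topic, msg.Value)
	}
}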

@dborovcanin (Collaborator) commented:

Let's address the subscriber problem and research our options in a separate PR, and make this PR a Publisher-only implementation. Using the Subscriber will simply panic. Don't forget to document it.

@rodneyosodo rodneyosodo force-pushed the NOISSUE-kafka-support branch 2 times, most recently from 5bf7d40 to ec7e316 Compare April 6, 2023 08:32
@rodneyosodo rodneyosodo force-pushed the NOISSUE-kafka-support branch 6 times, most recently from 2c5ba59 to 07b67c2 Compare June 24, 2024 17:40
@rodneyosodo rodneyosodo force-pushed the NOISSUE-kafka-support branch 6 times, most recently from 5f83992 to 27d8ad9 Compare June 28, 2024 10:49
@dborovcanin dborovcanin force-pushed the NOISSUE-kafka-support branch from 27d8ad9 to 8eb5e6f Compare July 2, 2024 21:24
@dborovcanin dborovcanin force-pushed the NOISSUE-kafka-support branch from 8eb5e6f to 764ee6b Compare July 15, 2024 15:38
@dborovcanin (Collaborator) commented:

This PR might be a better fit for https://github.com/absmach/mg-contrib since Kafka does not support all the features MG messaging requires, but may also be useful for integration with systems that use Kafka and do not need those features.

@rodneyosodo rodneyosodo force-pushed the NOISSUE-kafka-support branch from 764ee6b to 7e7f756 Compare July 15, 2024 19:07
@rodneyosodo rodneyosodo force-pushed the NOISSUE-kafka-support branch from 7e7f756 to ab76b19 Compare August 6, 2024 09:08
@rodneyosodo rodneyosodo force-pushed the NOISSUE-kafka-support branch from ab76b19 to cd3496e Compare August 27, 2024 06:59
@rodneyosodo rodneyosodo force-pushed the NOISSUE-kafka-support branch 2 times, most recently from c885bff to eb85775 Compare September 22, 2024 14:52
@rodneyosodo rodneyosodo force-pushed the NOISSUE-kafka-support branch from eb85775 to f5e434f Compare October 7, 2024 09:18
@rodneyosodo rodneyosodo force-pushed the NOISSUE-kafka-support branch from f5e434f to 39839ed Compare October 15, 2024 07:57
rodneyosodo and others added 4 commits October 15, 2024 12:11
Signed-off-by: Rodney Osodo <[email protected]>
…ncies.

Modified Close function in Kafka messaging package and updated pubsub struct and NewPubSub function.

Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: Rodney Osodo <[email protected]>
@dborovcanin dborovcanin force-pushed the NOISSUE-kafka-support branch from 39839ed to 0f0c9b8 Compare October 15, 2024 10:11
@dborovcanin (Collaborator) commented:

As already commented here, this may be a better fit for the mg-contrib repository. I'm closing this PR.
