-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka_consumer.py
53 lines (43 loc) · 1.54 KB
/
kafka_consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from confluent_kafka import Consumer, KafkaError
import os
from dotenv import load_dotenv
# load the environment variables from .env file
load_dotenv()
# Define the configuration
conf = {
'bootstrap.servers': os.getenv('BOOTSTRAP.SERVERS'),
'security.protocol': os.getenv('SECURITY.PROTOCOL'),
'sasl.mechanisms': os.getenv('SASL.MECHANISMS'),
'ssl.ca.location': None,
'sasl.username': os.getenv('SASL.USERNAME'),
'sasl.password': os.getenv('SASL.PASSWORD'),
'session.timeout.ms': int(os.getenv('SESSION.TIMEOUT.MS')),
'group.id': os.getenv('GROUP.ID'),
'enable.auto.commit': bool(os.getenv('ENABLE.AUTO.COMMIT'))
}
# Create the consumer
consumer = Consumer(conf)
try:
# Subscribe to the topic
consumer.subscribe(['demo'])
# Continuously poll for new messages
while True:
msg = consumer.poll(1.0)
if msg is None:
print('No message received')
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
print("Reached end of topic {}, partition {}".format(msg.topic(), msg.partition()))
else:
# Error
print("Error: {}".format(msg.error()))
else:
# Message received. Only message
print("Received message: {}".format(msg.value().decode('utf-8')))
# Acknowledge the message
consumer.commit(message=msg)
except Exception as e:
print("Error: {}".format(e))
consumer.close()