# 3-1WithoutSchemas.py
# Please complete the TODO items in this code
import asyncio
from dataclasses import dataclass, field
import json
import random
from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient, NewTopic
from faker import Faker
# Shared Faker instance; its email/iso8601/uri methods are the default factories of ClickEvent's fields.
faker = Faker()
# Passed as "bootstrap.servers" to every Kafka client constructed in this file.
BROKER_URL = "PLAINTEXT://localhost:9092"
async def produce(topic_name):
    """Produces data into the Kafka Topic

    Sends one freshly generated, JSON-serialized ClickEvent to `topic_name`
    about once per second. The loop never ends on its own; it stops only when
    the task is cancelled or an exception is raised.

    Args:
        topic_name: Name of the Kafka topic to publish to.
    """
    p = Producer({"bootstrap.servers": BROKER_URL})
    try:
        while True:
            p.produce(topic_name, ClickEvent().serialize())
            # Serve delivery reports without blocking so completed messages
            # do not pile up in the client's internal queue.
            p.poll(0)
            await asyncio.sleep(1.0)
    finally:
        # Best effort on shutdown: give anything still queued a bounded
        # amount of time to reach the broker.
        p.flush(5.0)
async def consume(topic_name):
    """Consumes data from the Kafka Topic

    Subscribes to `topic_name` and, about once per second, polls for a single
    message. Each successfully received message is decoded from JSON into a
    ClickEvent and printed. The loop never ends on its own; the consumer is
    closed when the task is cancelled or an exception escapes.

    Args:
        topic_name: Name of the Kafka topic to subscribe to.
    """
    c = Consumer({"bootstrap.servers": BROKER_URL, "group.id": "0"})
    c.subscribe([topic_name])
    try:
        while True:
            # NOTE: poll() is a synchronous call, so it holds the event loop
            # for up to its timeout while waiting for a message.
            message = c.poll(1.0)
            if message is None:
                print("no message received by consumer")
            elif message.error() is not None:
                print(f"error from consumer {message.error()}")
            else:
                # The web team has told us to expect the keys "email", "uri",
                # and "timestamp". A payload that is missing one of them, is
                # not valid JSON, or is not a JSON object is reported and
                # skipped instead of terminating the consumer.
                try:
                    click_json = json.loads(message.value())
                    print(
                        ClickEvent(
                            email=click_json["email"],
                            timestamp=click_json["timestamp"],
                            uri=click_json["uri"],
                        )
                    )
                except (KeyError, TypeError, ValueError) as e:
                    print(f"Failed to unpack message {e}")
            await asyncio.sleep(1.0)
    finally:
        # Release the consumer's resources and leave the group on shutdown.
        c.close()
def main():
    """Runs the producer and consumer until interrupted with Ctrl-C

    Starts the asyncio event loop on produce_consume() for the topic
    "com.udacity.lesson3.exercise1.clicks" and prints "shutting down" when a
    KeyboardInterrupt ends the run.

    NOTE(review): no topic is created here; this presumably relies on the
    topic already existing or on broker-side auto-creation -- confirm.
    """
    try:
        asyncio.run(produce_consume("com.udacity.lesson3.exercise1.clicks"))
    except KeyboardInterrupt:
        print("shutting down")
async def produce_consume(topic_name):
    """Runs the Producer and Consumer tasks

    Schedules produce() and consume() concurrently for `topic_name` and waits
    on both of them together, so an exception raised by either task propagates
    to the caller right away rather than staying hidden behind the other
    (never-ending) task.

    Args:
        topic_name: Name of the Kafka topic shared by both tasks.
    """
    t1 = asyncio.create_task(produce(topic_name))
    t2 = asyncio.create_task(consume(topic_name))
    await asyncio.gather(t1, t2)
@dataclass
class ClickEvent:
    """A single click event with randomly generated default values.

    Every field defaults to a fresh value from the module-level `faker`
    instance, so `ClickEvent()` yields a new fake event each time.
    """

    email: str = field(default_factory=faker.email)
    timestamp: str = field(default_factory=faker.iso8601)
    uri: str = field(default_factory=faker.uri)

    # Class-wide count of serialize() calls. It carries no annotation, so the
    # dataclass machinery does not treat it as an instance field.
    num_calls = 0

    def serialize(self):
        """Returns the event as a JSON string.

        The first 10 calls (counted across all instances) store the address
        under the key "email"; every later call stores it under "user_email"
        instead.
        """
        email_key = "email" if ClickEvent.num_calls < 10 else "user_email"
        ClickEvent.num_calls += 1
        return json.dumps(
            {"uri": self.uri, "timestamp": self.timestamp, email_key: self.email}
        )

    @classmethod
    def deserialize(cls, json_data):
        """Builds a ClickEvent from a JSON document.

        Args:
            json_data: JSON text (or bytes) encoding an object with the keys
                "email", "timestamp" and "uri".

        Returns:
            A new instance populated from those three keys.

        Raises:
            KeyError: If one of the expected keys is missing.
            ValueError: If `json_data` is not valid JSON.
        """
        click_json = json.loads(json_data)
        return cls(
            email=click_json["email"],
            timestamp=click_json["timestamp"],
            uri=click_json["uri"],
        )
# Run the exercise only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()