-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path3-3AvroComplexRecord.py
102 lines (78 loc) · 2.92 KB
/
3-3AvroComplexRecord.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# Please complete the TODO items in the code
import asyncio
from dataclasses import asdict, dataclass, field
from io import BytesIO
import json
import random
from confluent_kafka import Producer
from faker import Faker
from fastavro import parse_schema, writer
# Shared Faker instance; the dataclass default factories in this file call its
# providers (email, iso8601, uri, uri_page, bs) to generate random field values.
faker = Faker()
# Address handed to the Kafka producer as its "bootstrap.servers" setting.
BROKER_URL = "PLAINTEXT://localhost:9092"
@dataclass
class ClickAttribute:
    """Randomly generated attribute describing one clicked page element."""

    # Tag name, chosen at random from a fixed set for every new instance.
    element: str = field(default_factory=lambda: random.choice(["div", "a", "button"]))
    # Text produced by the shared Faker instance's ``bs`` provider.
    content: str = field(default_factory=faker.bs)

    @classmethod
    def attributes(cls):
        """Build a random dict of attributes keyed by ``faker.uri_page()``.

        Between one and five entries are generated. Keys that repeat
        overwrite each other, so the result may contain fewer entries than
        were generated.

        Returns:
            dict mapping each generated key to a fresh instance of ``cls``.
        """
        return {faker.uri_page(): cls() for _ in range(random.randint(1, 5))}
@dataclass
class ClickEvent:
    """Randomly generated click event that can be serialized to Avro."""

    email: str = field(default_factory=faker.email)
    timestamp: str = field(default_factory=faker.iso8601)
    uri: str = field(default_factory=faker.uri)
    number: int = field(default_factory=lambda: random.randint(0, 999))
    # Dict of ClickAttribute instances produced by ClickAttribute.attributes().
    attributes: dict = field(default_factory=ClickAttribute.attributes)

    # Avro schema for this record. ``attributes`` is a dict of ClickAttribute,
    # which ``asdict`` turns into a dict of ``{"element": ..., "content": ...}``
    # dicts, so it is declared as an Avro map whose values are records with
    # those two string fields.
    # See: https://avro.apache.org/docs/1.8.2/spec.html#Maps
    schema = parse_schema(
        {
            "type": "record",
            "name": "click_event",
            "namespace": "com.udacity.lesson3.exercise2",
            "fields": [
                {"name": "email", "type": "string"},
                {"name": "timestamp", "type": "string"},
                {"name": "uri", "type": "string"},
                {"name": "number", "type": "int"},
                {
                    "name": "attributes",
                    "type": {
                        "type": "map",
                        "values": {
                            "type": "record",
                            "name": "attribute",
                            "fields": [
                                {"name": "element", "type": "string"},
                                {"name": "content", "type": "string"},
                            ],
                        },
                    },
                },
            ],
        }
    )

    def serialize(self):
        """Serialize this ClickEvent for sending to Kafka.

        The instance is converted to a plain dict with ``asdict`` (nested
        ClickAttribute values included) and written with fastavro's
        ``writer`` using ``ClickEvent.schema``.

        Returns:
            bytes: everything ``writer`` wrote to the in-memory buffer.
        """
        out = BytesIO()
        writer(out, ClickEvent.schema, [asdict(self)])
        return out.getvalue()
async def produce(topic_name):
    """Produce one serialized ClickEvent per second into a Kafka topic.

    Args:
        topic_name: Name of the Kafka topic to publish to.

    The loop never ends on its own; it stops only when the coroutine is
    cancelled or an exception propagates out of it.
    """
    p = Producer({"bootstrap.servers": BROKER_URL})
    try:
        while True:
            p.produce(topic_name, ClickEvent().serialize())
            # Serve delivery reports without blocking; confluent_kafka expects
            # poll() to be called regularly on a producer.
            p.poll(0)
            await asyncio.sleep(1.0)
    finally:
        # On shutdown, give still-buffered messages a bounded amount of time
        # to be delivered instead of silently dropping them.
        p.flush(5)
def main():
    """Run the click-event producer until interrupted from the keyboard.

    Starts ``produce_consume`` on the exercise topic inside a new asyncio
    event loop and prints a short message when a KeyboardInterrupt (Ctrl+C)
    stops it.
    """
    try:
        asyncio.run(produce_consume("com.udacity.lesson3.exercise3.clicks"))
    except KeyboardInterrupt:
        print("shutting down")
async def produce_consume(topic_name):
    """Schedule the producer for ``topic_name`` as a task and wait on it.

    Args:
        topic_name: Name of the Kafka topic handed to ``produce``.
    """
    producer_task = asyncio.create_task(produce(topic_name))
    await producer_task
# Start the producer only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()