-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path4-6REST_AvroData.py
76 lines (59 loc) · 2.04 KB
/
4-6REST_AvroData.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# Please complete TODO items in the code
from dataclasses import asdict, dataclass, field
import json
import time
import random
import requests
from confluent_kafka import avro, Consumer, Producer
from confluent_kafka.avro import AvroConsumer, AvroProducer, CachedSchemaRegistryClient
from faker import Faker
faker = Faker()
REST_PROXY_URL = "http://localhost:8082"
AVRO_SCHEMA = """{
"type": "record",
"name": "click_event",
"fields": [
{"name": "email", "type": "string"},
{"name": "timestamp", "type": "string"},
{"name": "uri", "type": "string"},
{"name": "number", "type": "int"}
]
}"""
def produce():
"""Produces data using REST Proxy"""
# TODO: Set the appropriate headers
# See: https://docs.confluent.io/current/kafka-rest/api.html#content-types
headers = {"Content-Type": "application/vnd.kafka.avro.v2+json"}
# TODO: Update the below payload to include the Avro Schema string
# See: https://docs.confluent.io/current/kafka-rest/api.html#post--topics-(string-topic_name)
# TODO
data = {
"value_schema": AVRO_SCHEMA,
"records": [{"value": asdict(ClickEvent())}],
}
resp = requests.post(
f"{REST_PROXY_URL}/topics/lesson4.exercise6.click_events", # TODO
data=json.dumps(data),
headers=headers,
)
try:
resp.raise_for_status()
except:
print(f"Failed to send data to REST Proxy {json.dumps(resp.json(), indent=2)}")
print(f"Sent data to REST Proxy {json.dumps(resp.json(), indent=2)}")
@dataclass
class ClickEvent:
email: str = field(default_factory=faker.email)
timestamp: str = field(default_factory=faker.iso8601)
uri: str = field(default_factory=faker.uri)
number: int = field(default_factory=lambda: random.randint(0, 999))
def main():
"""Runs the simulation against REST Proxy"""
try:
while True:
produce()
time.sleep(0.5)
except KeyboardInterrupt as e:
print("shutting down")
if __name__ == "__main__":
main()