-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstart_sink.py
45 lines (34 loc) · 1.21 KB
/
start_sink.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import pulsar, time
from pulsar.schema import *
import json
import requests
from schema import TableSchema
import auth
#DSE Auth
r = requests.post(auth.auth_url, data=auth.body, headers=auth.auth_header)
response_dict = json.loads(r.text)
dse_token = response_dict.get("authToken")
#DSE insert
insert_headers = {'X-Cassandra-Token': dse_token,'Content-type': 'application/json'}
msjdata = String()
#Create Pulsar client
client = pulsar.Client(auth.service_url,
authentication=pulsar.AuthenticationToken(auth.pulsar_token),
tls_trust_certs_file_path=auth.trust_certs)
#Create subscriber
consumer = client.subscribe('persistent://tres-amigos-hack-gcp/biometrics/dseconsumer', 'dseconsumer-sub', schema=AvroSchema(TableSchema))
#Check for new messages and wait.
waitingForMsg = True
while waitingForMsg:
try:
msg = consumer.receive(2000)
msjdata = str(msg.value()).replace("'", '"')
print(msjdata)
consumer.acknowledge(msg)
#Post to DSE
r = requests.post(auth.insert_row_url, data=msjdata, headers=insert_headers)
print(str(r))
except:
print("Still waiting for a message...");
time.sleep(1)
client.close()