-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.py
executable file
·126 lines (92 loc) · 3.65 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python3
import os
import pika
import random
from retry import retry
import logging
import base64
import json
import aes
from SwobThirdPartyPlatforms import ImportPlatform
from SwobThirdPartyPlatforms.exceptions import PlatformDoesNotExist
logging.basicConfig(level=logging.INFO)
shared_key = os.environ["PUBLISHER_DECRYPTION_KEY"]
def publishing_payload(ch, method, properties, body: bytes) -> None:
"""
"""
logging.info("Publishing payload: %s", body)
try:
body = base64.b64decode(body)
iv = body[:16]
body = body[16:]
body = aes.AESCipher.decrypt(data=body, shared_key=shared_key, iv=iv)
body = json.loads(body)
except Exception as error:
logging.exception(error)
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
else:
body_content = body['data']
body_content = ':'.join(body_content.split(':')[1:])
platform_name = body['platform_name']
try:
platform = ImportPlatform(platform_name=platform_name)
platform.execute(body=body_content, user_details=body)
except PlatformDoesNotExist as error:
logging.exception(error)
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
except Exception as error:
logging.exception(error)
# ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
else:
ch.basic_ack(delivery_tag=method.delivery_tag)
logging.info("publishing complete...")
@retry((pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPHeartbeatTimeout),
delay=5, jitter=(1, 3))
def consumer():
"""
"""
user = os.environ["RMQ_USER"]
password = os.environ["RMQ_PASSWORD"]
host = os.environ["RMQ_HOST"]
GMAIL_CREDENTIALS=os.environ["GMAIL_CREDENTIALS"]
queue_name = "default-smswithoutborders-queue" \
if not os.environ.get("RMQ_QUEUE_NAME") \
else os.environ.get("RMQ_QUEUE_NAME")
routing_key = "default-smswithoutborders-routing-key" \
if not os.environ.get("RMQ_ROUTING_KEY") \
else os.environ.get("RMQ_ROUTING_KEY")
exchange_name = "default-smswithoutborders-exchange" \
if not os.environ.get("RMQ_EXCHANGE") \
else os.environ.get("RMQ_EXCHANGE")
connection_name = "default-smswithoutborders-consumer" \
if not os.environ.get("RMQ_CONNECTION_NAME") \
else os.environ.get("RMQ_CONNECTION_NAME")
credentials=pika.PlainCredentials(user, password)
client_properties = {'connection_name' : connection_name}
connection = pika.BlockingConnection(pika.ConnectionParameters(
heartbeat=30,
blocked_connection_timeout=300,
host=host,
client_properties=client_properties,
credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(queue=queue_name, durable=True)
channel.exchange_declare(
exchange=exchange_name,
exchange_type="topic",
durable=True)
channel.queue_bind(exchange=exchange_name,
queue=result.method.queue,
routing_key=routing_key)
channel.basic_consume(
queue=result.method.queue,
on_message_callback=publishing_payload)
try:
channel.start_consuming()
except pika.exceptions.ConnectionClosedByBroker:
logging.warning("Clean stoping broker...")
except Exception as error:
logging.exception(error)
if __name__ == "__main__":
consumer()