generated from TogetherCrew/python-service
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserver.py
89 lines (66 loc) · 2.91 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import functools
import logging
from typing import Any
from bot.saga.extract_likes import find_saga_and_fire_extract_likes
from bot.saga.extract_profiles import find_saga_and_fire_extract_profiles
from bot.saga.extract_tweets import find_saga_and_fire_extract_tweets
from bot.saga.saga import publish_on_success
from bot.utils.rabbitmq_connection import get_rabbit_mq_credentials
from bot.utils.redis_connection import get_redis_credentials
from redis import Redis
from rq import Queue as RQ_Queue
from tc_messageBroker.message_broker import RabbitMQ
from tc_messageBroker.rabbit_mq.event import Event
from tc_messageBroker.rabbit_mq.queue import Queue
def twitter_bot():
rabbit_mq_creds = get_rabbit_mq_credentials()
redis_creds = get_redis_credentials()
rabbit_mq = RabbitMQ(
broker_url=rabbit_mq_creds["broker_url"],
port=rabbit_mq_creds["port"],
username=rabbit_mq_creds["username"],
password=rabbit_mq_creds["password"],
)
redis = Redis(
host=redis_creds["host"],
port=redis_creds["port"],
password=redis_creds["pass"],
)
# 24 hours equal to 86400 seconds
rq_queue = RQ_Queue(connection=redis, default_timeout=86400)
on_tweets_event_bind = functools.partial(on_tweets_event, redis_queue=rq_queue)
on_profiles_event_bind = functools.partial(on_profiles_event, redis_queue=rq_queue)
on_likes_event_bind = functools.partial(on_likes_event, redis_queue=rq_queue)
rabbit_mq.connect(queue_name=Queue.TWITTER_BOT)
rabbit_mq.on_event(Event.TWITTER_BOT.EXTRACT.TWEETS, on_tweets_event_bind)
rabbit_mq.on_event(Event.TWITTER_BOT.EXTRACT.PROFILES, on_profiles_event_bind)
rabbit_mq.on_event(Event.TWITTER_BOT.EXTRACT.LIKES, on_likes_event_bind)
if rabbit_mq.channel is None:
logging.info("Error: was not connected to RabbitMQ broker!")
else:
logging.info("Started Consuming!")
rabbit_mq.channel.start_consuming()
def on_tweets_event(body: dict[str, Any], redis_queue: RQ_Queue):
sagaId = body["content"]["uuid"]
logging.info(f"SAGAID:{sagaId} recompute job Adding to queue")
redis_queue.enqueue(
find_saga_and_fire_extract_tweets, sagaId=sagaId, on_success=publish_on_success
)
def on_profiles_event(body: dict[str, Any], redis_queue: RQ_Queue):
sagaId = body["content"]["uuid"]
logging.info(f"SAGAID:{sagaId} recompute job Adding to queue")
redis_queue.enqueue(
find_saga_and_fire_extract_profiles,
sagaId=sagaId,
on_success=publish_on_success,
)
def on_likes_event(body: dict[str, Any], redis_queue: RQ_Queue):
sagaId = body["content"]["uuid"]
logging.info(f"SAGAID:{sagaId} recompute job Adding to queue")
redis_queue.enqueue(
find_saga_and_fire_extract_likes, sagaId=sagaId, on_success=publish_on_success
)
if __name__ == "__main__":
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
twitter_bot()