-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkombu_topics.py
70 lines (57 loc) · 2.59 KB
/
kombu_topics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from kombu import Exchange, Queue
PRI_TOPIC_NAME = 'primary_topic'
priTopicXchg = Exchange(name=PRI_TOPIC_NAME,
type='topic',
durable=True,
deilvery_mode="persistent")
priTopicQinfo = [
{'name': 'L_TRACE', 'routing_key': 'log.trace'},
{'name': 'L_DEBUG', 'routing_key': 'log.debug'},
{'name': 'L_WARNING', 'routing_key': 'log.warning'},
{'name': 'L_INFO', 'routing_key': 'log.info'},
{'name': 'E_TRACE', 'routing_key': 'event.trace'},
{'name': 'E_ACTIVITY', 'routing_key': 'event.activity'},
{'name': 'E_ALERT', 'routing_key': 'event.alert'},
{'name': 'E_INFO', 'routing_key': 'event.info'}
]
q_expires = {'x-expires': 1*60*1000} # 60 secs
priTopicExplicitQs = list(map(lambda x: Queue(x['name'],
priTopicXchg,
routing_key=x['routing_key'],
queue_arguments=q_expires),
priTopicQinfo))
#priTopicPartialImplicitQs = [
# Queue('ALL_LOGS', priTopicXchg, routing_key='log.*'),
# Queue('ALL_TRACE', priTopicXchg, routing_key='*.trace'),
# Queue('ALL', priTopicXchg, routing_key='#')
#]
allQs = priTopicExplicitQs # + priTopicPartialImplicitQs
msgs = [
{'topic': PRI_TOPIC_NAME, 'routing': 'log.trace', 'msg': "trace log message"},
{'topic': PRI_TOPIC_NAME, 'routing': 'log.debug', 'msg': "debug log message"},
{'topic': PRI_TOPIC_NAME, 'routing': 'log.warning', 'msg': "warning log message"},
{'topic': PRI_TOPIC_NAME, 'routing': 'log.info', 'msg': "info log message"},
{'topic': PRI_TOPIC_NAME, 'routing': 'event.trace', 'msg': "trace event message"},
{'topic': PRI_TOPIC_NAME, 'routing': 'event.activity', 'msg': "activity event message"},
{'topic': PRI_TOPIC_NAME, 'routing': 'event.alert', 'msg': "alert event message"},
{'topic': PRI_TOPIC_NAME, 'routing': 'event.info', 'msg': "info event message"}
]
# localhost single host
_amqp_hosts1 = [
'amqp://guest:guest@localhost:5672//'
]
# cluster mode
_amqp_hosts2 = [
'amqp://guest:[email protected]:5672//',
'amqp://guest:[email protected]:5672//',
'amqp://guest:[email protected]:5672//'
]
# cluster mode on the same machien
_amqp_hosts3 = [
'amqp://guest:guest@localhost:56722//',
'amqp://guest:guest@localhost:56723//'
]
amqp_hosts = _amqp_hosts3
def on_ens_conn_err_cb(exception, interval):
print("got connection error cb, error:[%s], interval:[%s]" %
(repr(exception), interval))