-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathZMQCommunicator.cpp
116 lines (89 loc) · 2.64 KB
/
ZMQCommunicator.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#include "ZMQCommunicator.h"
void ZMQCommunicator::setHandler(std::function<void(evtmsg)> fun){
this->evtCallback = fun;
}
ZMQCommunicator::ZMQCommunicator(connectiontype conntype, int timeout){
this->conntype = conntype;
mContext = new zmq::context_t();
this->evtCallback = nullptr;
switch (conntype){
case REP:
mSocket = new zmq::socket_t(*(mContext), ZMQ_REP);
break;
case REQ:
mSocket = new zmq::socket_t(*(mContext), ZMQ_REQ);
break;
case PAIR:
mSocket = new zmq::socket_t(*(mContext), ZMQ_PAIR);
break;
case SUB:
mSocket = new zmq::socket_t(*(mContext), ZMQ_SUB);
break;
case PUB:
mSocket = new zmq::socket_t(*(mContext), ZMQ_PUB);
break;
//there's no default, it'll be one of the connectiontypes
}
//set the default timeout to timeout ms
zmq_setsockopt((*mSocket), ZMQ_RCVTIMEO, &timeout, sizeof(timeout)); // in milliseconds
}
ZMQCommunicator::~ZMQCommunicator(){
if (this->running) this->Stop();
mSocket->close();
mContext->close();
delete mSocket;
//delete mContext; //for some unknowable reason, this crashes. Doesn't really matter as this is the end.
}
bool ZMQCommunicator::Connect(std::string ip_address, std::string port){
//std::cout << "Starting up zmq testing server..." << std::endl;
switch (conntype){
case REP:
mSocket->bind("tcp://" + ip_address + ":" + port);
break;
case REQ:
mSocket->connect("tcp://" + ip_address + ":" + port);
break;
case PAIR:
mSocket->connect("tcp://" + ip_address + ":" + port);
break;
case SUB:
mSocket->connect("tcp://" + ip_address + ":" + port);
break;
case PUB:
mSocket->bind("tcp://" + ip_address + ":" + port);
break;
}
return true;
}
void ZMQCommunicator::ProcessLoop(){
auto zerotime = std::chrono::steady_clock::now();
while (running) {
try{
zmq::message_t request;
// Wait for next request from client, this is a blocking operation (run in it's own thread)
// zmq_setsockopt((*mSocket), ZMQ_RCVTIMEO, &timeout, sizeof(timeout)); // allows to set a timeout in milliseconds to avoid infinite wait (done in constructor)
//mSocket->recv(&request);
std::string msg = s_recv((*mSocket));
// Parse could also be made to be a callback?
evtmsg evt = Parse(msg);
//call the callback
evtCallback(evt);
}
catch (const zmq::error_t& ex)
{
// recv() throws ETERM when the zmq context is destroyed,
// as when AsyncZmqListener::Stop() is called
if (ex.num() != ETERM)
throw;
}
}
std::cout << "ZMQC stopped" << std::endl;
}
void ZMQCommunicator::Run(){
this->running = true;
mThread = new std::thread(&ZMQCommunicator::ProcessLoop, this);
}
void ZMQCommunicator::Stop(){
running = false;
mThread->join();
}