Skip to content

Commit

Permalink
ZMQ lib change. (#958)
Browse files Browse the repository at this point in the history
* zmq wait

* ut pass case

* swig_verify

* Revert "swig_verify"

* ...

* zmqwait

* const

* ...

* constant

* Without debugs
  • Loading branch information
divyagayathri-hcl authored Feb 4, 2025
1 parent 505381e commit 1593cc6
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 45 deletions.
18 changes: 15 additions & 3 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf)
initialize(endpoint, vrf);
}

ZmqClient::ZmqClient(const std::string& endpoint, uint32_t waitTimeMs)
: m_waitTimeMs(waitTimeMs)
{
initialize(endpoint);
}

ZmqClient::~ZmqClient()
{
std::lock_guard<std::mutex> lock(m_socketMutex);
Expand Down Expand Up @@ -55,7 +61,7 @@ void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf)

connect();
}

bool ZmqClient::isConnected()
{
return m_connected;
Expand Down Expand Up @@ -137,7 +143,7 @@ void ZmqClient::sendMsg(
int zmq_err = 0;
int retry_delay = 10;
int rc = 0;
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
{
{
// ZMQ socket is not thread safe: http://api.zeromq.org/2-1:zmq
Expand All @@ -146,7 +152,6 @@ void ZmqClient::sendMsg(
// Use none block mode to use all bandwidth: http://api.zeromq.org/2-1%3Azmq-send
rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK);
}

if (rc >= 0)
{
SWSS_LOG_DEBUG("zmq sended %d bytes", serializedlen);
Expand Down Expand Up @@ -197,4 +202,11 @@ void ZmqClient::sendMsg(
throw system_error(make_error_code(errc::io_error), message);
}

// TODO: To be implemented later, required for ZMQ_CLIENT & ZMQ_SERVER
// socket types in response path.
bool ZmqClient::wait(
const std::string &dbName, const std::string &tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos) {
return false;
}
}
13 changes: 11 additions & 2 deletions common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ namespace swss {
class ZmqClient
{
public:

ZmqClient(const std::string& endpoint);
ZmqClient(const std::string& endpoint, const std::string& vrf);
ZmqClient(const std::string& endpoint, uint32_t waitTimeMs);
~ZmqClient();

bool isConnected();
Expand All @@ -23,8 +25,13 @@ class ZmqClient
void sendMsg(const std::string& dbName,
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos);

bool wait(const std::string& dbName,
const std::string& tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);

private:
void initialize(const std::string& endpoint, const std::string& vrf);
void initialize(const std::string& endpoint, const std::string& vrf = "");

std::string m_endpoint;

Expand All @@ -36,8 +43,10 @@ class ZmqClient

bool m_connected;

uint32_t m_waitTimeMs;

std::mutex m_socketMutex;

std::vector<char> m_sendbuffer;
};

Expand Down
29 changes: 18 additions & 11 deletions common/zmqproducerstatetable.cpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
#include <stdlib.h>
#include <tuple>
#include <sstream>
#include <utility>
#include "zmqproducerstatetable.h"
#include "binaryserializer.h"
#include "redisapi.h"
#include "redispipeline.h"
#include "redisreply.h"
#include "table.h"
#include "zmqconsumerstatetable.h"
#include <algorithm>
#include <chrono>
#include <cmath>
#include <sstream>
#include <stdlib.h>
#include <tuple>
#include <utility>
#include <zmq.h>
#include "redisreply.h"
#include "table.h"
#include "redisapi.h"
#include "redispipeline.h"
#include "zmqproducerstatetable.h"
#include "zmqconsumerstatetable.h"
#include "binaryserializer.h"

using namespace std;

Expand Down Expand Up @@ -164,6 +164,13 @@ void ZmqProducerStateTable::send(const std::vector<KeyOpFieldsValuesTuple> &kcos
}
}

bool ZmqProducerStateTable::wait(const std::string& dbName,
const std::string& tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{
return m_zmqClient.wait(dbName, tableName, kcos);
}

size_t ZmqProducerStateTable::dbUpdaterQueueSize()
{
if (m_asyncDBUpdater == nullptr)
Expand Down
5 changes: 5 additions & 0 deletions common/zmqproducerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class ZmqProducerStateTable : public ProducerStateTable
// Batched send that can include both SET and DEL requests.
virtual void send(const std::vector<KeyOpFieldsValuesTuple> &kcos);

// To wait for the response from the peer.
virtual bool wait(const std::string& dbName,
const std::string& tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);

size_t dbUpdaterQueueSize();
private:
void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence);
Expand Down
68 changes: 39 additions & 29 deletions common/zmqserver.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <unistd.h>
#include <string>
#include <deque>
#include <limits>
Expand All @@ -20,6 +21,7 @@ ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
: m_endpoint(endpoint),
m_vrf(vrf)
{
connect();
m_buffer.resize(MQ_RESPONSE_MAX_COUNT);
m_runThread = true;
m_mqPollThread = std::make_shared<std::thread>(&ZmqServer::mqPollThread, this);
Expand All @@ -31,6 +33,33 @@ ZmqServer::~ZmqServer()
{
m_runThread = false;
m_mqPollThread->join();

zmq_close(m_socket);
zmq_ctx_destroy(m_context);
}

void ZmqServer::connect()
{
SWSS_LOG_ENTER();
m_context = zmq_ctx_new();
m_socket = zmq_socket(m_context, ZMQ_PULL);

// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(m_socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));

if (!m_vrf.empty())
{
zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
}

int rc = zmq_bind(m_socket, m_endpoint.c_str());
if (rc != 0)
{
SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d",
m_endpoint.c_str(),
zmq_errno());
}
}

void ZmqServer::registerMessageHandler(
Expand Down Expand Up @@ -90,40 +119,18 @@ void ZmqServer::mqPollThread()
SWSS_LOG_ENTER();
SWSS_LOG_NOTICE("mqPollThread begin");

// Producer/Consumer state table are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket
void* context = zmq_ctx_new();;
void* socket = zmq_socket(context, ZMQ_PULL);

// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));

if (!m_vrf.empty())
{
zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
}

int rc = zmq_bind(socket, m_endpoint.c_str());
if (rc != 0)
{
SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s",
m_endpoint.c_str(),
zmq_errno(),
strerror(zmq_errno()));
}

// zmq_poll will use less CPU
zmq_pollitem_t poll_item;
poll_item.fd = 0;
poll_item.socket = socket;
poll_item.socket = m_socket;
poll_item.events = ZMQ_POLLIN;
poll_item.revents = 0;

SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str());
while (m_runThread)
{
// receive message
rc = zmq_poll(&poll_item, 1, 1000);
auto rc = zmq_poll(&poll_item, 1, 1000);
if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN))
{
// timeout or other event
Expand All @@ -132,7 +139,7 @@ void ZmqServer::mqPollThread()
}

// receive message
rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
if (rc < 0)
{
int zmq_err = zmq_errno();
Expand Down Expand Up @@ -160,11 +167,14 @@ void ZmqServer::mqPollThread()
// deserialize and write to redis:
handleReceivedData(m_buffer.data(), rc);
}

zmq_close(socket);
zmq_ctx_destroy(context);

SWSS_LOG_NOTICE("mqPollThread end");
}

// TODO: To be implemented later, required for ZMQ_CLIENT & ZMQ_SERVER
// socket types in response path.
void ZmqServer::sendMsg(
const std::string &dbName, const std::string &tableName,
const std::vector<swss::KeyOpFieldsValuesTuple> &values) {
return;
}
}
10 changes: 10 additions & 0 deletions common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ class ZmqServer
const std::string tableName,
ZmqMessageHandler* handler);

void sendMsg(const std::string& dbName, const std::string& tableName,
const std::vector<swss::KeyOpFieldsValuesTuple>& values);

private:

void connect();

void handleReceivedData(const char* buffer, const size_t size);

void mqPollThread();
Expand All @@ -56,6 +62,10 @@ class ZmqServer

std::string m_vrf;

void* m_context;

void* m_socket;

std::map<std::string, std::map<std::string, ZmqMessageHandler*>> m_HandlerMap;
};

Expand Down
2 changes: 2 additions & 0 deletions tests/c_api_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ TEST(c_api, ZmqConsumerProducerStateTable) {
SWSSZmqProducerStateTable_set(pst, arr.data[i].key, arr.data[i].fieldValues);
else
SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr);
sleep(2);

SWSSSelectResult result;
SWSSZmqConsumerStateTable_readData(cst, 1500, true, &result);
Expand Down Expand Up @@ -434,6 +435,7 @@ TEST(c_api, ZmqConsumerProducerStateTable) {
SWSSZmqProducerStateTable_del(pst, arr.data[i].key);
else
SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr);
sleep(2);

SWSSZmqConsumerStateTable_readData(cst, 500, true, &result);
SWSSZmqConsumerStateTable_pops(cst, &arr);
Expand Down
Loading

0 comments on commit 1593cc6

Please sign in to comment.