Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…wss-common into zmq_new
  • Loading branch information
divyagayathri-hcl committed Jan 13, 2025
2 parents 1c2c16b + c872f42 commit 510132f
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 64 deletions.
4 changes: 4 additions & 0 deletions common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ namespace swss {
#define COUNTERS_RIF_TYPE_MAP "COUNTERS_RIF_TYPE_MAP"
#define COUNTERS_RIF_NAME_MAP "COUNTERS_RIF_NAME_MAP"
#define COUNTERS_TRAP_NAME_MAP "COUNTERS_TRAP_NAME_MAP"
#define COUNTERS_POLICER_NAME_MAP "COUNTERS_POLICER_NAME_MAP"
#define COUNTERS_CRM_TABLE "CRM"
#define COUNTERS_BUFFER_POOL_NAME_MAP "COUNTERS_BUFFER_POOL_NAME_MAP"
#define COUNTERS_SWITCH_NAME_MAP "COUNTERS_SWITCH_NAME_MAP"
Expand Down Expand Up @@ -279,10 +280,13 @@ namespace swss {
#define TUNNEL_ATTR_ID_LIST "TUNNEL_ATTR_ID_LIST"
#define ACL_COUNTER_ATTR_ID_LIST "ACL_COUNTER_ATTR_ID_LIST"
#define FLOW_COUNTER_ID_LIST "FLOW_COUNTER_ID_LIST"
#define POLICER_COUNTER_ID_LIST "POLICER_COUNTER_ID_LIST"
#define PLUGIN_TABLE "PLUGIN_TABLE"
#define LUA_PLUGIN_TYPE "LUA_PLUGIN_TYPE"
#define SAI_OBJECT_TYPE "SAI_OBJECT_TYPE"

#define BULK_CHUNK_SIZE_FIELD "BULK_CHUNK_SIZE"
#define BULK_CHUNK_SIZE_PER_PREFIX_FIELD "BULK_CHUNK_SIZE_PER_PREFIX"
#define POLL_INTERVAL_FIELD "POLL_INTERVAL"
#define STATS_MODE_FIELD "STATS_MODE"
#define STATS_MODE_READ "STATS_MODE_READ"
Expand Down
14 changes: 6 additions & 8 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,11 @@ void ZmqClient::sendMsg(
throw system_error(make_error_code(errc::io_error), message);
}

//TODO: To implement the wait() method later.
bool ZmqClient::wait(const std::string& dbName,
const std::string& tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{
SWSS_LOG_ENTER();

return false;
// TODO: To be implemented later, required for ZMQ_CLIENT & ZMQ_SERVER
// socket types in response path.
bool ZmqClient::wait(
const std::string &dbName, const std::string &tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos) {
return false;
}
}
22 changes: 11 additions & 11 deletions common/zmqproducerstatetable.cpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
#include <stdlib.h>
#include <tuple>
#include <sstream>
#include <utility>
#include "zmqproducerstatetable.h"
#include "binaryserializer.h"
#include "redisapi.h"
#include "redispipeline.h"
#include "redisreply.h"
#include "table.h"
#include "zmqconsumerstatetable.h"
#include <algorithm>
#include <chrono>
#include <cmath>
#include <sstream>
#include <stdlib.h>
#include <tuple>
#include <utility>
#include <zmq.h>
#include "redisreply.h"
#include "table.h"
#include "redisapi.h"
#include "redispipeline.h"
#include "zmqproducerstatetable.h"
#include "zmqconsumerstatetable.h"
#include "binaryserializer.h"

using namespace std;

Expand Down
14 changes: 6 additions & 8 deletions common/zmqserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ void ZmqServer::mqPollThread()
poll_item.revents = 0;

SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str());
SWSS_LOG_DEBUG("m_runThread: %d", m_runThread);
while (m_runThread)
{
// receive message
Expand Down Expand Up @@ -171,12 +170,11 @@ void ZmqServer::mqPollThread()
SWSS_LOG_NOTICE("mqPollThread end");
}

//TODO: To implement the sendMsg() method later.
void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName,
const std::vector<swss::KeyOpFieldsValuesTuple>& values)
{

return;
// TODO: To be implemented later, required for ZMQ_CLIENT & ZMQ_SERVER
// socket types in response path.
void ZmqServer::sendMsg(
const std::string &dbName, const std::string &tableName,
const std::vector<swss::KeyOpFieldsValuesTuple> &values) {
return;
}

}
10 changes: 4 additions & 6 deletions tests/c_api_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ TEST(c_api, SubscriberStateTable) {
TEST(c_api, ZmqConsumerProducerStateTable) {
clearDB();
SWSSStringManager sm;

SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true);

SWSSZmqServer srv = SWSSZmqServer_new("tcp://127.0.0.1:42312");
SWSSZmqClient cli = SWSSZmqClient_new("tcp://127.0.0.1:42312");
EXPECT_TRUE(SWSSZmqClient_isConnected(cli));
Expand Down Expand Up @@ -262,16 +264,12 @@ TEST(c_api, ZmqConsumerProducerStateTable) {

if (flag == 0)
for (uint64_t i = 0; i < arr.len; i++)
{
SWSSZmqProducerStateTable_set(pst, arr.data[i].key, arr.data[i].fieldValues);
}
else
{
SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr);
}
sleep(2);

ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 1500, true), SWSSSelectResult_DATA);
sleep(2);
arr = SWSSZmqConsumerStateTable_pops(cst);

vector<KeyOpFieldsValuesTuple> kfvs = takeKeyOpFieldValuesArray(arr);
Expand All @@ -295,7 +293,6 @@ TEST(c_api, ZmqConsumerProducerStateTable) {
EXPECT_EQ(fieldValues1[0].first, "myfield3");
EXPECT_EQ(fieldValues1[0].second, "myvalue3");

sleep(2);
arr = SWSSZmqConsumerStateTable_pops(cst);
ASSERT_EQ(arr.len, 0);
freeKeyOpFieldValuesArray(arr);
Expand All @@ -309,6 +306,7 @@ TEST(c_api, ZmqConsumerProducerStateTable) {
SWSSZmqProducerStateTable_del(pst, arr.data[i].key);
else
SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr);
sleep(2);

ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500, true), SWSSSelectResult_DATA);
arr = SWSSZmqConsumerStateTable_pops(cst);
Expand Down
59 changes: 28 additions & 31 deletions tests/zmq_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ static void consumerWorker(string tableName, string endpoint, bool dbPersistence
}

// Wait for some time to write into the DB.

sleep(3);

allDataReceived = true;
Expand Down Expand Up @@ -480,36 +479,35 @@ TEST(ZmqProducerStateTableDeleteAfterSend, test)

static bool zmq_done = false;

static void zmqConsumerWorker(string tableName, string endpoint, bool dbPersistence)
{
std::string pushEndpoint = "tcp://localhost:1234";
std::string pullEndpoint = "tcp://*:1234";

cout << "Consumer thread started: " << tableName << endl;
DBConnector db(TEST_DB, 0, true);
ZmqServer server(endpoint, "");
ZmqConsumerStateTable c(&db, tableName, server, 128, 0, dbPersistence);
//validate received data
std::vector<swss::KeyOpFieldsValuesTuple> values;
values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}});

while (!zmq_done)
{
static void zmqConsumerWorker(string tableName, string endpoint,
bool dbPersistence) {
cout << "Consumer thread started: " << tableName << endl;
DBConnector db(TEST_DB, 0, true);
ZmqServer server(endpoint, "");
ZmqConsumerStateTable c(&db, tableName, server, 128, 0, dbPersistence);
// validate received data
std::vector<swss::KeyOpFieldsValuesTuple> values;
values.push_back(KeyOpFieldsValuesTuple{
"k", SET_COMMAND,
std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}});

while (!zmq_done) {
sleep(10);
std::string rec_dbName, rec_tableName;
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> rec_kcos_ptrs;
std::vector<KeyOpFieldsValuesTuple> deserialized_kcos;
std::string recDbName, recTableName;
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> recKcos;
std::vector<KeyOpFieldsValuesTuple> deserializedKcos;

BinarySerializer::deserializeBuffer(server.m_buffer.data(), server.m_buffer.size(), rec_dbName, rec_tableName, rec_kcos_ptrs);
BinarySerializer::deserializeBuffer(server.m_buffer.data(),
server.m_buffer.size(), recDbName,
recTableName, recKcos);

for (auto kco_ptr : rec_kcos_ptrs)
for (auto kcoPtr : recKcos)
{
deserialized_kcos.push_back(*kco_ptr);
deserializedKcos.push_back(*kcoPtr);
}
EXPECT_EQ(rec_dbName, TEST_DB);
EXPECT_EQ(rec_tableName, tableName);
EXPECT_EQ(deserialized_kcos, values);

EXPECT_EQ(recDbName, TEST_DB);
EXPECT_EQ(recTableName, tableName);
EXPECT_EQ(deserializedKcos, values);
}

allDataReceived = true;
Expand Down Expand Up @@ -541,9 +539,8 @@ static void ZmqWithResponse(bool producerPersistence)
ZmqProducerStateTable p(&db, testTableName, client, true);
std::vector<KeyOpFieldsValuesTuple> kcos;
kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}});
for (int i = 0; i < 5; ++i)
{
p.send(kcos);
for (int i = 0; i < 3; ++i) {
p.send(kcos);
}

zmq_done = true;
Expand All @@ -567,9 +564,9 @@ TEST(ZmqWithResponseClientError, test)
ZmqProducerStateTable p(&db, testTableName, client, true);
std::vector<KeyOpFieldsValuesTuple> kcos;
kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{}});
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> kcos_p;
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> kcosPtr;
std::string dbName, tableName;
p.send(kcos);
// Wait will timeout without server reply.
EXPECT_FALSE(p.wait(dbName, tableName, kcos_p));
EXPECT_FALSE(p.wait(dbName, tableName, kcosPtr));
}

0 comments on commit 510132f

Please sign in to comment.