diff --git a/ddl/metadb.sql b/ddl/metadb.sql
index 2c5016b96..d917eed34 100644
--- a/ddl/metadb.sql
+++ b/ddl/metadb.sql
@@ -7,9 +7,10 @@ create table graph
     upload_start_time TIME not null,
     upload_end_time TIME not null,
     graph_status_idgraph_status INTEGER not null,
-    vertexcount BIGINT,
+    id_algorithm INTEGER,
+    vertexcount BIGINT default 0,
     centralpartitioncount INTEGER,
-    edgecount INTEGER,
+    edgecount INTEGER default 0,
     upload_time VARCHAR(8),
     train_status VARCHAR(20),
     feature_count INTEGER(100),
@@ -81,7 +82,18 @@ create table worker_has_partition
     worker_idworker INTEGER
 );
 
+create table partitioning_algorithm
+(
+    id_algorithm INTEGER not null primary key,
+    algorithm_name VARCHAR not null
+);
+
 INSERT INTO graph_status (idgraph_status, description) VALUES (1, 'LOADING');
 INSERT INTO graph_status (idgraph_status, description) VALUES (2, 'OPERATIONAL');
 INSERT INTO graph_status (idgraph_status, description) VALUES (3, 'DELETED');
-INSERT INTO graph_status (idgraph_status, description) VALUES (4, 'NONOPERATIONAL');
\ No newline at end of file
+INSERT INTO graph_status (idgraph_status, description) VALUES (4, 'NONOPERATIONAL');
+
+INSERT INTO partitioning_algorithm (id_algorithm, algorithm_name) VALUES (1, 'HASH');
+INSERT INTO partitioning_algorithm (id_algorithm, algorithm_name) VALUES (2, 'FENNEL');
+INSERT INTO partitioning_algorithm (id_algorithm, algorithm_name) VALUES (3, 'LDG');
+INSERT INTO partitioning_algorithm (id_algorithm, algorithm_name) VALUES (4, 'METIS');
diff --git a/src/frontend/JasmineGraphFrontEnd.cpp b/src/frontend/JasmineGraphFrontEnd.cpp
index fb9d1b01f..1d7b56eb1 100644
--- a/src/frontend/JasmineGraphFrontEnd.cpp
+++ b/src/frontend/JasmineGraphFrontEnd.cpp
@@ -52,6 +52,7 @@ limitations under the License.
 #include "../query/processor/cypher/astbuilder/ASTBuilder.h"
 #include "../query/processor/cypher/astbuilder/ASTNode.h"
 #include "../query/processor/cypher/semanticanalyzer/SemanticAnalyzer.h"
+#include "../partitioner/stream/Partitioner.h"
 
 #define MAX_PENDING_CONNECTIONS 10
 
@@ -139,9 +140,8 @@ void *frontendservicesesion(void *dummyPt) {
     std::string kafka_server_IP;
     cppkafka::Configuration configs;
     KafkaConnector *kstream;
-    Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH, sqlite);
-
     vector<DataPublisher *> workerClients;
+    bool workerClientsInitialized = false;
 
     bool loop_exit = false;
 
@@ -957,14 +957,187 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
                                      KafkaConnector *&kstream, thread &input_stream_handler_thread,
                                      vector<DataPublisher *> &workerClients, int numberOfPartitions,
                                      SQLiteDBInterface *sqlite, bool *loop_exit_p) {
-    string msg_1 = "Do you want to use default KAFKA consumer(y/n) ?";
-    int result_wr = write(connFd, msg_1.c_str(), msg_1.length());
+    string exist = "Do you want to stream into an existing graph (y/n)? ";
+    int result_wr = write(connFd, exist.c_str(), exist.length());
     if (result_wr < 0) {
         frontend_logger.error("Error writing to socket");
         *loop_exit_p = true;
         return;
     }
-    result_wr = write(connFd, "\r\n", 2);
+
+    // Get user response.
+    string existingGraph = Utils::getFrontendInput(connFd);
+    string graphId;
+    string partitionAlgo;
+    string direction;
+
+    if (existingGraph == "y") {
+        string existingGraphIdMsg = "Send the existing graph ID: ";
+        result_wr = write(connFd, existingGraphIdMsg.c_str(), existingGraphIdMsg.length());
+        if (result_wr < 0) {
+            frontend_logger.error("Error writing to socket");
+            *loop_exit_p = true;
+            return;
+        }
+        // Get user response.
+        string existingGraphId = Utils::getFrontendInput(connFd);
+
+        bool isExist = sqlite->isGraphIdExist(existingGraphId);
+        if (!isExist) {
+            string errorMsg = "Error: Graph ID you entered is not in the system";
+            result_wr = write(connFd, errorMsg.c_str(), errorMsg.length());
+            if (result_wr < 0) {
+                frontend_logger.error("Error writing to socket");
+                *loop_exit_p = true;
+                return;
+            }
+            return;
+        }
+        string existingSuccessMsg = "Set data streaming into graph ID: " + existingGraphId;
+        result_wr = write(connFd, existingSuccessMsg.c_str(), existingSuccessMsg.length());
+        if (result_wr < 0) {
+            frontend_logger.error("Error writing to socket");
+            *loop_exit_p = true;
+            return;
+        }
+        result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(),
+                          Conts::CARRIAGE_RETURN_NEW_LINE.size());
+        if (result_wr < 0) {
+            frontend_logger.error("Error writing to socket");
+            *loop_exit_p = true;
+            return;
+        }
+        graphId = existingGraphId;
+        partitionAlgo = sqlite->getPartitionAlgoByGraphID(graphId);
+
+    } else {
+        int nextID = sqlite->getNextGraphId();
+        if (nextID < 0) {
+            return;
+        }
+        graphId = to_string(nextID);
+        string defaultIdMsg = "Use the default graph ID " + graphId + " (y/n)? ";
+        result_wr = write(connFd, defaultIdMsg.c_str(), defaultIdMsg.length());
+        if (result_wr < 0) {
+            frontend_logger.error("Error writing to socket");
+            *loop_exit_p = true;
+            return;
+        }
+        // Get user response.
+        string isDefaultGraphId = Utils::getFrontendInput(connFd);
+
+        if (isDefaultGraphId != "y") {
+            string inputGraphIdMsg = "Enter your graph ID: ";
+            result_wr = write(connFd, inputGraphIdMsg.c_str(), inputGraphIdMsg.length());
+            if (result_wr < 0) {
+                frontend_logger.error("Error writing to socket");
+                *loop_exit_p = true;
+                return;
+            }
+
+            // Get user response.
+            string userGraphId = Utils::getFrontendInput(connFd);
+
+            bool isExist = sqlite->isGraphIdExist(userGraphId);
+            if (isExist) {
+                string errorMsg = "Error: Graph ID you entered already exists";
+                result_wr = write(connFd, errorMsg.c_str(), errorMsg.length());
+                if (result_wr < 0) {
+                    frontend_logger.error("Error writing to socket");
+                    *loop_exit_p = true;
+                    return;
+                }
+                return;
+            }
+
+            string userGraphIdSuccessMsg = "Set graph ID successfully";
+            result_wr = write(connFd, userGraphIdSuccessMsg.c_str(), userGraphIdSuccessMsg.length());
+            if (result_wr < 0) {
+                frontend_logger.error("Error writing to socket");
+                *loop_exit_p = true;
+                return;
+            }
+            result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
+            if (result_wr < 0) {
+                frontend_logger.error("Error writing to socket");
+                *loop_exit_p = true;
+                return;
+            }
+            graphId = userGraphId;
+        }
+
+        std::string partitionSelectionMsg = "Select the partitioning technique\n"
+                                            "\toption 1: Hash partitioning\n"
+                                            "\toption 2: Fennel partitioning\n"
+                                            "\toption 3: LDG partitioning\n"
+                                            "Choose an option (1,2,3): ";
+        result_wr = write(connFd, partitionSelectionMsg.c_str(), partitionSelectionMsg.length());
+        if (result_wr < 0) {
+            frontend_logger.error("Error writing to socket");
+            *loop_exit_p = true;
+            return;
+        }
+        // Get user response.
+        string partitionAlgoInput = Utils::getFrontendInput(connFd);
+
+        if (partitionAlgoInput == "1" || partitionAlgoInput == "2" || partitionAlgoInput == "3") {
+            string partition_success_msg = "Set partitioning technique: " + partitionAlgoInput;
+            result_wr = write(connFd, partition_success_msg.c_str(), partition_success_msg.length());
+            if (result_wr < 0) {
+                frontend_logger.error("Error writing to socket");
+                *loop_exit_p = true;
+                return;
+            }
+            result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
+            if (result_wr < 0) {
+                frontend_logger.error("Error writing to socket");
+                *loop_exit_p = true;
+                return;
+            }
+            partitionAlgo = partitionAlgoInput;
+        } else {
+            string errorMsg = "Error: invalid partitioning option: " + partitionAlgoInput;
+            result_wr = write(connFd, errorMsg.c_str(), errorMsg.length());
+            if (result_wr < 0) {
+                frontend_logger.error("Error writing to socket");
+                *loop_exit_p = true;
+                return;
+            }
+            return;
+        }
+
+        string checkDirection = "Is this graph Directed (y/n)? ";
+        result_wr = write(connFd, checkDirection.c_str(), checkDirection.length());
+        if (result_wr < 0) {
+            frontend_logger.error("Error writing to socket");
+            *loop_exit_p = true;
+            return;
+        }
+        // Get user response.
+        string isDirected = Utils::getFrontendInput(connFd);
+        if (isDirected == "y") {
+            direction = Conts::DIRECTED;
+        } else {
+            direction = Conts::UNDIRECTED;
+        }
+
+        string checkGraphType = "Graph type received";
+        result_wr = write(connFd, checkGraphType.c_str(), checkGraphType.length());
+        if (result_wr < 0) {
+            frontend_logger.error("Error writing to socket");
+            *loop_exit_p = true;
+            return;
+        }
+        result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
+        if (result_wr < 0) {
+            frontend_logger.error("Error writing to socket");
+            *loop_exit_p = true;
+            return;
+        }
+    }
+
+    string msg_1 = "Do you want to use default KAFKA consumer(y/n) ?";
+    result_wr = write(connFd, msg_1.c_str(), msg_1.length());
     if (result_wr < 0) {
         frontend_logger.error("Error writing to socket");
         *loop_exit_p = true;
@@ -972,24 +1145,17 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
     }
 
     // Get user response.
-    char user_res[FRONTEND_DATA_LENGTH + 1];
-    bzero(user_res, FRONTEND_DATA_LENGTH + 1);
-    read(connFd, user_res, FRONTEND_DATA_LENGTH);
-    string user_res_s(user_res);
-    user_res_s = Utils::trim_copy(user_res_s);
-    for (char &c : user_res_s) {
-        c = tolower(c);
-    }
+    string default_kafka = Utils::getFrontendInput(connFd);
 
     // use default kafka consumer details
     string group_id = "knnect";  // TODO(sakeerthan): MOVE TO CONSTANT LATER
-    if (user_res_s == "y") {
+    if (default_kafka == "y") {
         kafka_server_IP = Utils::getJasmineGraphProperty("org.jasminegraph.server.streaming.kafka.host");
         configs = {{"metadata.broker.list", kafka_server_IP}, {"group.id", group_id}};
     } else {
         // user need to start relevant kafka cluster using relevant IP address
         // read relevant IP address from given file path
         string message = "Send file path to the kafka configuration file.";
-        int result_wr = write(connFd, message.c_str(), message.length());
+        result_wr = write(connFd, message.c_str(), message.length());
         if (result_wr < 0) {
             frontend_logger.error("Error writing to socket");
             *loop_exit_p = true;
@@ -1062,64 +1228,35 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
         *loop_exit_p = true;
         return;
     }
-    string checkDirection = "Is this graph Directed (y/n)? ";
-    result_wr = write(connFd, checkDirection.c_str(), checkDirection.length());
-    if (result_wr < 0) {
-        frontend_logger.error("Error writing to socket");
-        *loop_exit_p = true;
-        return;
-    }
-    // Get user response.
-    char isDirected[FRONTEND_DATA_LENGTH + 1];
-    bzero(isDirected, FRONTEND_DATA_LENGTH + 1);
-    read(connFd, isDirected, FRONTEND_DATA_LENGTH);
-    string is_directed(isDirected);
-    is_directed = Utils::trim_copy(is_directed);
-    for (char &c : is_directed) {
-        c = tolower(c);
-    }
-    string direction;
-    if (is_directed == "y") {
-        direction = Conts::DIRECTED;
-    } else {
-        direction = Conts::UNDIRECTED;
-    }
-
-    string checkGraphType = "Graph type received";
-    result_wr = write(connFd, checkGraphType.c_str(), checkGraphType.length());
-    if (result_wr < 0) {
-        frontend_logger.error("Error writing to socket");
-        *loop_exit_p = true;
-        return;
-    }
-    result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
-    if (result_wr < 0) {
-        frontend_logger.error("Error writing to socket");
-        *loop_exit_p = true;
-        return;
-    }
 
     // create kafka consumer and graph partitioner
     kstream = new KafkaConnector(configs);
-    // Create the Partitioner object.
-    Partitioner graphPartitioner(numberOfPartitions, 0, spt::Algorithms::FENNEL, sqlite);
     // Create the KafkaConnector object.
     kstream = new KafkaConnector(configs);
     // Subscribe to the Kafka topic.
     kstream->Subscribe(topic_name_s);
     // Create the StreamHandler object.
-    StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients, sqlite);
-
-    string path = "kafka:\\" + topic_name_s + ":" + group_id;
-    std::time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now());
-    string uploadStartTime = ctime(&time);
-    string sqlStatement =
-        "INSERT INTO graph (name,upload_path,upload_start_time,upload_end_time,graph_status_idgraph_status,"
-        "vertexcount,centralpartitioncount,edgecount,is_directed) VALUES(\"" +
-        topic_name_s + "\", \"" + path + "\", \"" + uploadStartTime + "\", \"\",\"" +
-        to_string(Conts::GRAPH_STATUS::STREAMING) + "\", \"\", \"\", \"\",\"" +direction+"\")";
-    int newGraphID = sqlite->runInsert(sqlStatement);
-
+    StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients, sqlite,
+                                                      stoi(graphId),
+                                                      spt::getPartitioner(partitionAlgo));
+
+    if (existingGraph != "y") {
+        string path = "kafka:\\" + topic_name_s + ":" + group_id;
+        std::time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now());
+        string uploadStartTime = ctime(&time);
+        string sqlStatement =
+            "INSERT INTO graph (idgraph, id_algorithm, name, upload_path, upload_start_time, upload_end_time,"
+            " graph_status_idgraph_status, vertexcount, centralpartitioncount, edgecount, is_directed) VALUES(" +
+            graphId + ", " + partitionAlgo + ", \"" + topic_name_s + "\", \"" + path + "\", \"" + uploadStartTime +
+            "\", \"\", \"" + to_string(Conts::GRAPH_STATUS::STREAMING) + "\", \"\", " + to_string(numberOfPartitions) +
+            ", \"\", \"" + direction + "\")";
+        int newGraphID = sqlite->runInsert(sqlStatement);
+    } else {
+        std::string sqlStatement = "UPDATE graph SET graph_status_idgraph_status = " +
+                                   to_string(Conts::GRAPH_STATUS::STREAMING) +
+                                   " WHERE idgraph = " + graphId;
+        sqlite->runUpdate(sqlStatement);
+    }
     frontend_logger.info("Start listening to " + topic_name_s);
     input_stream_handler_thread = thread(&StreamHandler::listen_to_kafka_topic, stream_handler);
 }
diff --git a/src/partitioner/stream/Partitioner.cpp b/src/partitioner/stream/Partitioner.cpp
index 6e3f1342c..f996c047a 100644
--- a/src/partitioner/stream/Partitioner.cpp
+++ b/src/partitioner/stream/Partitioner.cpp
@@ -17,8 +17,11 @@
 #include <algorithm>
 #include <iostream>
 #include <vector>
+#include <chrono>
+#include <ctime>
 
 #include "../../util/logger/Logger.h"
+#include "../../util/Conts.h"
 
 Logger streaming_partitioner_logger;
@@ -105,8 +108,8 @@ partitionedEdge Partitioner::ldgPartitioning(std::pair<std::string, std::string
 }
 
 partitionedEdge Partitioner::hashPartitioning(std::pair<std::string, std::string> edge) {
-    int firstIndex = std::hash<std::string>()(edge.first) % this->numberOfPartitions;   // Hash partitioning
-    int secondIndex = std::hash<std::string>()(edge.second) % this->numberOfPartitions;  // Hash partitioning
+    int firstIndex = stoi(edge.first) % this->numberOfPartitions;    // Hash partitioning
+    int secondIndex = stoi(edge.second) % this->numberOfPartitions;  // Hash partitioning
 
     if (firstIndex == secondIndex) {
         this->partitions[firstIndex].addEdge(edge, this->getIsDirected());
@@ -143,16 +146,20 @@ void Partitioner::updateMetaDB() {
         edgeCutsCount += partition.edgeCutsCount();
     }
     double numberOfEdges = edgesCount + edgeCutsCount/2;
-    std::string sqlStatement = "UPDATE graph SET vertexcount = '" + std::to_string(vertexCount) +
-                               "' ,centralpartitioncount = '" + std::to_string(this->numberOfPartitions) +
-                               "' ,edgecount = '" + std::to_string(numberOfEdges) +
-                               "' WHERE idgraph = '" + std::to_string(this->graphID) + "'";
+
+    std::time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now());
+    string uploadEndTime = ctime(&time);
+    std::string sqlStatement = "UPDATE graph SET vertexcount = vertexcount + " + std::to_string(vertexCount) +
+                               ", upload_end_time = \"" + uploadEndTime +
+                               "\", graph_status_idgraph_status = " + to_string(Conts::GRAPH_STATUS::OPERATIONAL) +
+                               ", edgecount = edgecount + " + std::to_string(numberOfEdges) +
+                               " WHERE idgraph = " + std::to_string(this->graphID);
     this->sqlite->runUpdate(sqlStatement);
     streaming_partitioner_logger.info("Successfully updated metaDB");
 }
 
 bool Partitioner::getIsDirected() {
-    std::string sqlStatement = "SELECT is_directed FROM graph WHERE idgraph = '"+std::to_string(this->graphID)+"'";
+    std::string sqlStatement = "SELECT is_directed FROM graph WHERE idgraph = " + std::to_string(this->graphID);
     auto result = this->sqlite->runSelect(sqlStatement);
     if (result[0][0].second == "0") {
         return false;
diff --git a/src/partitioner/stream/Partitioner.h b/src/partitioner/stream/Partitioner.h
index 85be6715f..d4923d8ef 100644
--- a/src/partitioner/stream/Partitioner.h
+++ b/src/partitioner/stream/Partitioner.h
@@ -20,8 +20,17 @@
 typedef std::vector<std::pair<std::string, long>> partitionedEdge;
 
 namespace spt {  // spt : Streaming Partitioner
 enum Algorithms { HASH, FENNEL, LDG };
-
+static Algorithms getPartitioner(string id) {
+    if (id == "1") {
+        return Algorithms::HASH;
+    } else if (id == "2") {
+        return Algorithms::FENNEL;
+    } else if (id == "3") {
+        return Algorithms::LDG;
+    }
+    return Algorithms::HASH;
 }
+}  // namespace spt
 
 class Partitioner {
     std::vector<Partition> partitions;
diff --git a/src/util/Utils.cpp b/src/util/Utils.cpp
index cf7cbc3e6..7dea17c25 100644
--- a/src/util/Utils.cpp
+++ b/src/util/Utils.cpp
@@ -1228,3 +1228,13 @@ void Utils::assignPartitionToWorker(int graphId, int partitionIndex, string hos
 
     delete sqlite;
 }
+
+string Utils::getFrontendInput(int connFd) {
+    char frontendInput[FRONTEND_DATA_LENGTH + 1];
+    bzero(frontendInput, FRONTEND_DATA_LENGTH + 1);
+    read(connFd, frontendInput, FRONTEND_DATA_LENGTH);
+    std::string input(frontendInput);
+    input = Utils::trim_copy(input);
+    std::transform(input.begin(), input.end(), input.begin(), ::tolower);
+    return input;
+}
diff --git a/src/util/Utils.h b/src/util/Utils.h
index ace8c3d64..2130fff95 100644
--- a/src/util/Utils.h
+++ b/src/util/Utils.h
@@ -27,6 +27,7 @@ limitations under the License.
 #include "../metadb/SQLiteDBInterface.h"
 #include "../performancedb/PerformanceSQLiteDBInterface.h"
+#include "../frontend/JasmineGraphFrontEndProtocol.h"
 #include "Conts.h"
 
 using std::map;
@@ -189,6 +190,7 @@ class Utils {
     static bool transferPartition(std::string sourceWorker, int sourceWorkerPort, std::string destinationWorker,
                                   int destinationWorkerDataPort, std::string graphID, std::string partitionID,
                                   std::string workerID, SQLiteDBInterface *sqlite);
+    static string getFrontendInput(int connFd);
 };
 
 #endif  // JASMINEGRAPH_UTILS_H
diff --git a/src/util/dbinterface/DBInterface.cpp b/src/util/dbinterface/DBInterface.cpp
index 11a296057..cd81887f8 100644
--- a/src/util/dbinterface/DBInterface.cpp
+++ b/src/util/dbinterface/DBInterface.cpp
@@ -124,3 +124,82 @@ int DBInterface::runSqlNoCallback(const char *zSql) {
 
     return rc;
 }
+
+bool DBInterface::isGraphIdExist(std::string graphId) {
+    std::string query = "SELECT COUNT(idgraph) FROM graph WHERE idgraph = ?";
+    sqlite3_stmt* stmt;
+
+    if (sqlite3_prepare_v2(database, query.c_str(), -1, &stmt, NULL) != SQLITE_OK) {
+        interface_logger.error("SQL Error: Failed to prepare statement");
+        return false;
+    }
+
+    if (sqlite3_bind_text(stmt, 1, graphId.c_str(), -1, SQLITE_STATIC) != SQLITE_OK) {
+        interface_logger.error("SQL Error: Failed to bind parameter");
+        sqlite3_finalize(stmt);
+        return false;
+    }
+
+    bool exists = false;
+    if (sqlite3_step(stmt) == SQLITE_ROW) {
+        int count = sqlite3_column_int(stmt, 0);
+        exists = (count > 0);
+    }
+
+    sqlite3_finalize(stmt);
+    return exists;
+}
+
+
+int DBInterface::getNextGraphId() {
+    std::string query = "SELECT MAX(idgraph) FROM graph;";
+    sqlite3_stmt* stmt;
+
+    if (sqlite3_prepare_v2(database, query.c_str(), -1, &stmt, NULL) != SQLITE_OK) {
+        interface_logger.error("SQL Error: Failed to prepare statement");
+        return -1;
+    }
+
+    int nextGraphId = 1;
+    if (sqlite3_step(stmt) == SQLITE_ROW) {
+        if (sqlite3_column_type(stmt, 0) != SQLITE_NULL) {
+            int maxId = sqlite3_column_int(stmt, 0);
+            nextGraphId = maxId + 1;
+        }
+    }
+
+    sqlite3_finalize(stmt);
+
+    return nextGraphId;
+}
+
+std::string DBInterface::getPartitionAlgoByGraphID(std::string graphID) {
+    std::string query = "SELECT id_algorithm FROM graph WHERE idgraph = ?;";
+    sqlite3_stmt* stmt;
+
+    if (sqlite3_prepare_v2(database, query.c_str(), -1, &stmt, NULL) != SQLITE_OK) {
+        interface_logger.error("SQL Error: Failed to prepare statement");
+        return "";
+    }
+
+    if (sqlite3_bind_text(stmt, 1, graphID.c_str(), -1, SQLITE_STATIC) != SQLITE_OK) {
+        interface_logger.error("SQL Error: Failed to bind parameter");
+        sqlite3_finalize(stmt);
+        return "";
+    }
+
+    std::string result = "";
+
+    if (sqlite3_step(stmt) == SQLITE_ROW) {
+        if (sqlite3_column_type(stmt, 0) != SQLITE_NULL) {
+            result = reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0));
+        }
+    } else {
+        interface_logger.info("No record found for graphID: " + graphID);
+    }
+
+    sqlite3_finalize(stmt);
+
+    return result;
+}
+
diff --git a/src/util/dbinterface/DBInterface.h b/src/util/dbinterface/DBInterface.h
index f434a7119..7655d4ff2 100644
--- a/src/util/dbinterface/DBInterface.h
+++ b/src/util/dbinterface/DBInterface.h
@@ -39,6 +39,12 @@ class DBInterface {
     void runInsertNoIDReturn(std::string);
 
     int runSqlNoCallback(const char *zSql);
+
+    bool isGraphIdExist(std::string);
+
+    int getNextGraphId();
+
+    std::string getPartitionAlgoByGraphID(std::string graphID);
 };
 
 #endif  // JASMINEGRAPH_SRC_UTIL_DBINTERFACE_H_
diff --git a/src/util/kafka/StreamHandler.cpp b/src/util/kafka/StreamHandler.cpp
index 2b00edc40..473e9fcb1 100644
--- a/src/util/kafka/StreamHandler.cpp
+++ b/src/util/kafka/StreamHandler.cpp
@@ -27,10 +27,12 @@ using namespace std::chrono;
 Logger stream_handler_logger;
 
 StreamHandler::StreamHandler(KafkaConnector *kstream, int numberOfPartitions,
-                             vector<DataPublisher *> &workerClients, SQLiteDBInterface* sqlite)
+                             vector<DataPublisher *> &workerClients, SQLiteDBInterface* sqlite,
+                             int graphId, spt::Algorithms algorithms)
     : kstream(kstream),
+      graphId(graphId),
       workerClients(workerClients),
-      graphPartitioner(numberOfPartitions, 0, spt::Algorithms::HASH, sqlite),
+      graphPartitioner(numberOfPartitions, graphId, algorithms, sqlite),
       stream_topic_name("stream_topic_name") {
 }
@@ -74,18 +76,11 @@ void StreamHandler::listen_to_kafka_topic() {
             frontend_logger.log("Couldn't retrieve message from Kafka.", "info");
             continue;
         }
-
         string data(msg.get_payload());
 
         auto edgeJson = json::parse(data);
-        // Check if graphID exists in properties
-        if (edgeJson["properties"].find("graphId") == edgeJson["properties"].end()) {
-            stream_handler_logger.error("Edge Rejected. Streaming edge should Include the Graph ID.");
-            continue;
-        }
         auto prop = edgeJson["properties"];
-        auto graphID = std::string(prop["graphId"]);
-        graphPartitioner.setGraphID(stoi(graphID));
+        prop["graphId"] = to_string(this->graphId);
         auto sourceJson = edgeJson["source"];
         auto destinationJson = edgeJson["destination"];
         string sId = std::string(sourceJson["id"]);
@@ -98,7 +93,7 @@ void StreamHandler::listen_to_kafka_topic() {
         json obj;
         obj["source"] = sourceJson;
         obj["destination"] = destinationJson;
-        obj["properties"] = edgeJson["properties"];
+        obj["properties"] = prop;
         long part_s = partitionedEdge[0].second;
         long part_d = partitionedEdge[1].second;
         int n_workers = atoi((Utils::getJasmineGraphProperty("org.jasminegraph.server.nworkers")).c_str());
diff --git a/src/util/kafka/StreamHandler.h b/src/util/kafka/StreamHandler.h
index 6370fcda8..96dba8ec7 100644
--- a/src/util/kafka/StreamHandler.h
+++ b/src/util/kafka/StreamHandler.h
@@ -25,13 +25,14 @@ limitations under the License.
 class StreamHandler {
  public:
     StreamHandler(KafkaConnector *kstream, int numberOfPartitions,
-                  std::vector<DataPublisher *> &workerClients, SQLiteDBInterface* sqlite);
+                  std::vector<DataPublisher *> &workerClients, SQLiteDBInterface* sqlite,
+                  int graphId, spt::Algorithms algo = spt::Algorithms::HASH);
     void listen_to_kafka_topic();
     cppkafka::Message pollMessage();
    bool isErrorInMessage(const cppkafka::Message &msg);
     bool isEndOfStream(const cppkafka::Message &msg);
     Partitioner graphPartitioner;
-
+    int graphId;
  private:
     KafkaConnector *kstream;
     Logger frontend_logger;
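
Note on the hashPartitioning change above: replacing std::hash<std::string> with stoi(...) narrows the accepted vertex-id space to numeric strings, and std::stoi throws std::invalid_argument for anything else, which would propagate out of Partitioner::hashPartitioning into StreamHandler::listen_to_kafka_topic. Below is a minimal, self-contained sketch of the behavioural difference; it is illustrative only, and oldIndex/newIndex are hypothetical helper names, not part of this patch.

#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>

// Old behaviour: any string id hashes to a valid partition bucket.
static int oldIndex(const std::string &id, int partitions) {
    return static_cast<int>(std::hash<std::string>()(id) % partitions);
}

// New behaviour: the id is parsed as an integer, so non-numeric ids throw.
static int newIndex(const std::string &id, int partitions) {
    return std::stoi(id) % partitions;
}

int main() {
    const int partitions = 4;
    std::cout << "id \"123\": old=" << oldIndex("123", partitions)
              << " new=" << newIndex("123", partitions) << '\n';
    try {
        newIndex("user-42", partitions);  // non-numeric vertex id
    } catch (const std::invalid_argument &) {
        std::cout << "stoi rejects non-numeric vertex ids\n";
    }
    return 0;
}

If non-numeric ids can appear on the stream, guarding the stoi call (or falling back to the std::hash path) would keep a single malformed edge from taking down the listener thread.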