From 4615f98b3459ae702b3ccf8641ce907ad95fde99 Mon Sep 17 00:00:00 2001 From: thamindumk Date: Tue, 21 Jan 2025 02:05:56 +0530 Subject: [PATCH 1/7] Fix kafka streaming issues --- ddl/metadb.sql | 18 +- src/frontend/JasmineGraphFrontEnd.cpp | 295 ++++++++++++++++++++----- src/partitioner/stream/Partitioner.cpp | 17 +- src/partitioner/stream/Partitioner.h | 12 +- src/util/dbinterface/DBInterface.cpp | 82 +++++++ src/util/dbinterface/DBInterface.h | 6 + src/util/kafka/StreamHandler.cpp | 17 +- src/util/kafka/StreamHandler.h | 5 +- 8 files changed, 375 insertions(+), 77 deletions(-) diff --git a/ddl/metadb.sql b/ddl/metadb.sql index 2c5016b96..71e36ffb6 100644 --- a/ddl/metadb.sql +++ b/ddl/metadb.sql @@ -7,9 +7,10 @@ create table graph upload_start_time TIME not null, upload_end_time TIME not null, graph_status_idgraph_status INTEGER not null, - vertexcount BIGINT, + idalgorithm INTEGER, + vertexcount BIGINT default 0, centralpartitioncount INTEGER, - edgecount INTEGER, + edgecount INTEGER default 0, upload_time VARCHAR(8), train_status VARCHAR(20), feature_count INTEGER(100), @@ -81,7 +82,18 @@ create table worker_has_partition worker_idworker INTEGER ); +create table partitioning_algorithm +( + idalgorithm INTEGER not null primary key, + algorithm_name VARCHAR not null +); + INSERT INTO graph_status (idgraph_status, description) VALUES (1, 'LOADING'); INSERT INTO graph_status (idgraph_status, description) VALUES (2, 'OPERATIONAL'); INSERT INTO graph_status (idgraph_status, description) VALUES (3, 'DELETED'); -INSERT INTO graph_status (idgraph_status, description) VALUES (4, 'NONOPERATIONAL'); \ No newline at end of file +INSERT INTO graph_status (idgraph_status, description) VALUES (4, 'NONOPERATIONAL'); + +INSERT INTO partitioning_algorithm (idalgorithm, algorithm_name) VALUES (1, 'HASH'); +INSERT INTO partitioning_algorithm (idalgorithm, algorithm_name) VALUES (2, 'FENNEL'); +INSERT INTO partitioning_algorithm (idalgorithm, algorithm_name) VALUES (3, 'LDG'); +INSERT INTO partitioning_algorithm (idalgorithm, algorithm_name) VALUES (4, 'METIS'); diff --git a/src/frontend/JasmineGraphFrontEnd.cpp b/src/frontend/JasmineGraphFrontEnd.cpp index fb9d1b01f..d203d9ab3 100644 --- a/src/frontend/JasmineGraphFrontEnd.cpp +++ b/src/frontend/JasmineGraphFrontEnd.cpp @@ -52,6 +52,7 @@ limitations under the License. #include "../query/processor/cypher/astbuilder/ASTBuilder.h" #include "../query/processor/cypher/astbuilder/ASTNode.h" #include "../query/processor/cypher/semanticanalyzer/SemanticAnalyzer.h" +#include "../partitioner/stream/Partitioner.h" #define MAX_PENDING_CONNECTIONS 10 @@ -139,9 +140,8 @@ void *frontendservicesesion(void *dummyPt) { std::string kafka_server_IP; cppkafka::Configuration configs; KafkaConnector *kstream; - Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH, sqlite); - vector workerClients; + bool workerClientsInitialized = false; bool loop_exit = false; @@ -957,14 +957,228 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c KafkaConnector *&kstream, thread &input_stream_handler_thread, vector &workerClients, int numberOfPartitions, SQLiteDBInterface *sqlite, bool *loop_exit_p) { - string msg_1 = "Do you want to use default KAFKA consumer(y/n) ?"; - int result_wr = write(connFd, msg_1.c_str(), msg_1.length()); + string exist = "Do you want to stream into existing graph(y/n) ? 
" ; + int result_wr = write(connFd, exist.c_str(), exist.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; return; } - result_wr = write(connFd, "\r\n", 2); + + // Get user response. + char exist_graph[FRONTEND_DATA_LENGTH + 1]; + bzero(exist_graph, FRONTEND_DATA_LENGTH + 1); + read(connFd, exist_graph, FRONTEND_DATA_LENGTH); + string exist_g(exist_graph); + exist_g = Utils::trim_copy(exist_g); + for (char &c : exist_g) { + c = tolower(c); + } + string graphId; + string partitionAlgo; + string direction; + + if (exist_g == "y") { + string exist_graph_id_msg = "Send the existing graph ID ? " ; + int result_wr = write(connFd, exist_graph_id_msg.c_str(), exist_graph_id_msg.length()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + // Get user response. + char exist_graph_id[FRONTEND_DATA_LENGTH + 1]; + bzero(exist_graph_id, FRONTEND_DATA_LENGTH + 1); + read(connFd, exist_graph_id, FRONTEND_DATA_LENGTH); + string exist_g_i(exist_graph_id); + exist_g_i = Utils::trim_copy(exist_g_i); + for (char &c : exist_g_i) { + c = tolower(c); + } + + bool isExist = sqlite->isGraphIdExist(exist_g_i); + if (!isExist) { + string Err_msg = "Error: Graph ID you entered is not in the system"; + result_wr = write(connFd, Err_msg.c_str(), Err_msg.length()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + return; + } + string exist_success_msg = "Set data streaming into graph ID: "+exist_g_i; + result_wr = write(connFd, exist_success_msg.c_str(), exist_success_msg.length()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + graphId = exist_g_i; + partitionAlgo = sqlite->getPartitionAlgoByGraphID(graphId); + + } else { + int nextID = sqlite->getNextGraphId(); + if (nextID < 0) { + return; + } + graphId = to_string(nextID); + string default_id = "Do you use Default graph ID: "+ graphId +"(y/n) ? " ; + int result_wr = write(connFd, default_id.c_str(), default_id.length()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + // Get user response. + char default_graph_id[FRONTEND_DATA_LENGTH + 1]; + bzero(default_graph_id, FRONTEND_DATA_LENGTH + 1); + read(connFd, default_graph_id, FRONTEND_DATA_LENGTH); + string default_g_i(default_graph_id); + default_g_i = Utils::trim_copy(default_g_i); + for (char &c : default_g_i) { + c = tolower(c); + } + + if (default_g_i != "y") { + string input_graph_id = "Input your graph ID: "; + result_wr = write(connFd, input_graph_id.c_str(), input_graph_id.length()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + + // Get user response. 
+ char graph_id[FRONTEND_DATA_LENGTH + 1]; + bzero(graph_id, FRONTEND_DATA_LENGTH + 1); + read(connFd, graph_id, FRONTEND_DATA_LENGTH); + string user_graph_id(graph_id); + user_graph_id = Utils::trim_copy(user_graph_id); + for (char &c : user_graph_id) { + c = tolower(c); + } + + bool isExist = sqlite->isGraphIdExist(user_graph_id); + if (isExist) { + string Err_msg = "Error: Graph ID you entered is already exist"; + result_wr = write(connFd, Err_msg.c_str(), Err_msg.length()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + return; + } + + string user_success_msg = "Set graph ID successfully"; + result_wr = write(connFd, user_success_msg.c_str(), user_success_msg.length()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + graphId = user_graph_id; + } + + std::string partition_selection = "Select the partition technique\n" + "\toption 1: Hash partitioning\n" + "\toption 2: Fennel partitioning\n" + "\toption 3: LDG partitioning\n" + "Choose an option(1,2,3): "; + result_wr = write(connFd, partition_selection.c_str(), partition_selection.length()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + // Get user response. + char partition_algo[FRONTEND_DATA_LENGTH + 1]; + bzero(partition_algo, FRONTEND_DATA_LENGTH + 1); + read(connFd, partition_algo, FRONTEND_DATA_LENGTH); + string partition_a(partition_algo); + partition_a = Utils::trim_copy(partition_a); + for (char &c : partition_a) { + c = tolower(c); + } + + if (partition_a == "1" || partition_a == "2" || partition_a == "3") { + string partition_success_msg = "Set partition technique: "+partition_a; + result_wr = write(connFd, partition_success_msg.c_str(), partition_success_msg.length()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + partitionAlgo = partition_a; + } else { + string Err_msg = "Error: invalid partition option: "+partition_a; + result_wr = write(connFd, Err_msg.c_str(), Err_msg.length()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + return; + } + + string checkDirection = "Is this graph Directed (y/n)? "; + result_wr = write(connFd, checkDirection.c_str(), checkDirection.length()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + // Get user response. 
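[Editor's note, not part of the patch: the option string captured here ("1", "2" or "3") is stored in partitionAlgo and later mapped onto the spt::Algorithms enum by spt::getPartitioner(), which this same patch adds to Partitioner.h; anything unrecognised falls back to HASH. Usage sketch:]

    spt::Algorithms algo = spt::getPartitioner(partitionAlgo);  // "1" -> HASH, "2" -> FENNEL, "3" -> LDG, else HASH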
+ char isDirected[FRONTEND_DATA_LENGTH + 1]; + bzero(isDirected, FRONTEND_DATA_LENGTH + 1); + read(connFd, isDirected, FRONTEND_DATA_LENGTH); + string is_directed(isDirected); + is_directed = Utils::trim_copy(is_directed); + for (char &c : is_directed) { + c = tolower(c); + } + if (is_directed == "y") { + direction = Conts::DIRECTED; + } else { + direction = Conts::UNDIRECTED; + } + + string checkGraphType = "Graph type received"; + result_wr = write(connFd, checkGraphType.c_str(), checkGraphType.length()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size()); + if (result_wr < 0) { + frontend_logger.error("Error writing to socket"); + *loop_exit_p = true; + return; + } + } + + string msg_1 = "Do you want to use default KAFKA consumer(y/n) ?"; + result_wr = write(connFd, msg_1.c_str(), msg_1.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; @@ -1062,64 +1276,35 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c *loop_exit_p = true; return; } - string checkDirection = "Is this graph Directed (y/n)? "; - result_wr = write(connFd, checkDirection.c_str(), checkDirection.length()); - if (result_wr < 0) { - frontend_logger.error("Error writing to socket"); - *loop_exit_p = true; - return; - } - // Get user response. - char isDirected[FRONTEND_DATA_LENGTH + 1]; - bzero(isDirected, FRONTEND_DATA_LENGTH + 1); - read(connFd, isDirected, FRONTEND_DATA_LENGTH); - string is_directed(isDirected); - is_directed = Utils::trim_copy(is_directed); - for (char &c : is_directed) { - c = tolower(c); - } - string direction; - if (is_directed == "y") { - direction = Conts::DIRECTED; - } else { - direction = Conts::UNDIRECTED; - } - - string checkGraphType = "Graph type received"; - result_wr = write(connFd, checkGraphType.c_str(), checkGraphType.length()); - if (result_wr < 0) { - frontend_logger.error("Error writing to socket"); - *loop_exit_p = true; - return; - } - result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size()); - if (result_wr < 0) { - frontend_logger.error("Error writing to socket"); - *loop_exit_p = true; - return; - } // create kafka consumer and graph partitioner kstream = new KafkaConnector(configs); - // Create the Partitioner object. - Partitioner graphPartitioner(numberOfPartitions, 0, spt::Algorithms::FENNEL, sqlite); // Create the KafkaConnector object. kstream = new KafkaConnector(configs); // Subscribe to the Kafka topic. kstream->Subscribe(topic_name_s); // Create the StreamHandler object. 
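[Editor's note, not part of the patch: the context lines just above construct the consumer twice ("kstream = new KafkaConnector(configs);" appears both before and after the removed Partitioner line), so the first instance is leaked. That predates this patch, but since the block is being touched anyway, a single construction would do; sketch:]

    kstream = new KafkaConnector(configs);  // construct the consumer once
    kstream->Subscribe(topic_name_s);       // then subscribe to the topic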
- StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients, sqlite); - - string path = "kafka:\\" + topic_name_s + ":" + group_id; - std::time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now()); - string uploadStartTime = ctime(&time); - string sqlStatement = - "INSERT INTO graph (name,upload_path,upload_start_time,upload_end_time,graph_status_idgraph_status," - "vertexcount,centralpartitioncount,edgecount,is_directed) VALUES(\"" + - topic_name_s + "\", \"" + path + "\", \"" + uploadStartTime + "\", \"\",\"" + - to_string(Conts::GRAPH_STATUS::STREAMING) + "\", \"\", \"\", \"\",\"" +direction+"\")"; - int newGraphID = sqlite->runInsert(sqlStatement); - + StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients, sqlite, + stoi(graphId), + spt::getPartitioner(partitionAlgo)); + + if (exist_g != "y") { + string path = "kafka:\\" + topic_name_s + ":" + group_id; + std::time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now()); + string uploadStartTime = ctime(&time); + string sqlStatement = + "INSERT INTO graph (idgraph,idalgorithm,name,upload_path, upload_start_time, upload_end_time," + "graph_status_idgraph_status, vertexcount, centralpartitioncount, edgecount, is_directed) VALUES("+ + graphId+","+partitionAlgo+",\"" +topic_name_s + "\", \"" + path + "\", \"" +uploadStartTime+ "\", \"\",\"" + + to_string(Conts::GRAPH_STATUS::STREAMING) + "\", \"\","+ to_string(numberOfPartitions)+ + ", \"\",\"" +direction+"\")"; + int newGraphID = sqlite->runInsert(sqlStatement); + } else { + std::string sqlStatement = "UPDATE graph SET graph_status_idgraph_status ="+ + to_string(Conts::GRAPH_STATUS::STREAMING)+ + " WHERE idgraph = " + graphId; + sqlite->runUpdate(sqlStatement); + } frontend_logger.info("Start listening to " + topic_name_s); input_stream_handler_thread = thread(&StreamHandler::listen_to_kafka_topic, stream_handler); } diff --git a/src/partitioner/stream/Partitioner.cpp b/src/partitioner/stream/Partitioner.cpp index 6e3f1342c..83b3fb0c7 100644 --- a/src/partitioner/stream/Partitioner.cpp +++ b/src/partitioner/stream/Partitioner.cpp @@ -17,8 +17,11 @@ #include #include #include +#include +#include #include "../../util/logger/Logger.h" +#include "../../util/Conts.h" Logger streaming_partitioner_logger; @@ -143,16 +146,20 @@ void Partitioner::updateMetaDB() { edgeCutsCount += partition.edgeCutsCount(); } double numberOfEdges = edgesCount + edgeCutsCount/2; - std::string sqlStatement = "UPDATE graph SET vertexcount = '" + std::to_string(vertexCount) + - "' ,centralpartitioncount = '" + std::to_string(this->numberOfPartitions) + - "' ,edgecount = '" + std::to_string(numberOfEdges) + - "' WHERE idgraph = '" + std::to_string(this->graphID) + "'"; + + std::time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now()); + string uploadEndTime = ctime(&time); + std::string sqlStatement = "UPDATE graph SET vertexcount = vertexcount+" + std::to_string(vertexCount) + + " ,upload_end_time = \"" + uploadEndTime + + "\" ,graph_status_idgraph_status = " + to_string(Conts::GRAPH_STATUS::OPERATIONAL) + + " ,edgecount = edgecount+" + std::to_string(numberOfEdges) + + " WHERE idgraph = " + std::to_string(this->graphID); this->sqlite->runUpdate(sqlStatement); streaming_partitioner_logger.info("Successfully updated metaDB"); } bool Partitioner::getIsDirected() { - std::string sqlStatement = "SELECT is_directed FROM graph WHERE idgraph = '"+std::to_string(this->graphID)+"'"; + std::string 
sqlStatement = "SELECT is_directed FROM graph WHERE idgraph = "+std::to_string(this->graphID); auto result = this->sqlite->runSelect(sqlStatement); if (result[0][0].second == "0") { return false; diff --git a/src/partitioner/stream/Partitioner.h b/src/partitioner/stream/Partitioner.h index 85be6715f..c850b0140 100644 --- a/src/partitioner/stream/Partitioner.h +++ b/src/partitioner/stream/Partitioner.h @@ -20,7 +20,16 @@ typedef std::vector> partitionedEdge; namespace spt { // spt : Streaming Partitioner enum Algorithms { HASH, FENNEL, LDG }; - +static Algorithms getPartitioner(string id) { + if (id == "1") { + return Algorithms::HASH; + } else if (id == "2") { + return Algorithms::FENNEL; + } else if (id == "3") { + return Algorithms::LDG; + } + return Algorithms::HASH; +} } class Partitioner { @@ -40,6 +49,7 @@ class Partitioner { for (size_t i = 0; i < numberOfPartitions; i++) { this->partitions.push_back(Partition(i, numberOfPartitions)); }; + }; void printStats(); long getTotalVertices(); diff --git a/src/util/dbinterface/DBInterface.cpp b/src/util/dbinterface/DBInterface.cpp index 11a296057..34f212890 100644 --- a/src/util/dbinterface/DBInterface.cpp +++ b/src/util/dbinterface/DBInterface.cpp @@ -124,3 +124,85 @@ int DBInterface::runSqlNoCallback(const char *zSql) { return rc; } + +bool DBInterface::isGraphIdExist(std::string graphId) { + std::string query = "SELECT COUNT(idgraph) FROM graph WHERE idgraph = ?"; + sqlite3_stmt* stmt; + + if (sqlite3_prepare_v2(database, query.c_str(), -1, &stmt, NULL) != SQLITE_OK) { + interface_logger.error("SQL Error: Failed to prepare statement"); + return false; + } + + if (sqlite3_bind_text(stmt, 1, graphId.c_str(), -1, SQLITE_STATIC) != SQLITE_OK) { + interface_logger.error("SQL Error: Failed to bind parameter"); + sqlite3_finalize(stmt); + return false; + } + + bool exists = false; + if (sqlite3_step(stmt) == SQLITE_ROW) { + int count = sqlite3_column_int(stmt, 0); + exists = (count > 0); + } + + sqlite3_finalize(stmt); + return exists; +} + + +int DBInterface::getNextGraphId() { + + std::string query = "SELECT MAX(idgraph) FROM graph;"; + sqlite3_stmt* stmt; + + if (sqlite3_prepare_v2(database, query.c_str(), -1, &stmt, NULL) != SQLITE_OK) { + interface_logger.error("SQL Error: Failed to prepare statement"); + return -1; + } + + int nextGraphId = 1; + if (sqlite3_step(stmt) == SQLITE_ROW) { + + if (sqlite3_column_type(stmt, 0) != SQLITE_NULL) { + int maxId = sqlite3_column_int(stmt, 0); + nextGraphId = maxId + 1; + } + } + + sqlite3_finalize(stmt); + + return nextGraphId; +} + +std::string DBInterface::getPartitionAlgoByGraphID(std::string graphID) { + std::string query = "SELECT idalgorithm FROM graph WHERE idgraph = ?;"; + sqlite3_stmt* stmt; + + if (sqlite3_prepare_v2(database, query.c_str(), -1, &stmt, NULL) != SQLITE_OK) { + interface_logger.error("SQL Error: Failed to prepare statement"); + return ""; + } + + if (sqlite3_bind_text(stmt, 1, graphID.c_str(), -1, SQLITE_STATIC) != SQLITE_OK) { + interface_logger.error("SQL Error: Failed to bind parameter"); + sqlite3_finalize(stmt); + return ""; + } + + std::string result = ""; + + if (sqlite3_step(stmt) == SQLITE_ROW) { + + if (sqlite3_column_type(stmt, 0) != SQLITE_NULL) { + result = reinterpret_cast(sqlite3_column_text(stmt, 0)); + } + } else { + interface_logger.info("No record found for graphID: " + graphID); + } + + sqlite3_finalize(stmt); + + return result; +} + diff --git a/src/util/dbinterface/DBInterface.h b/src/util/dbinterface/DBInterface.h index f434a7119..7655d4ff2 100644 --- 
a/src/util/dbinterface/DBInterface.h +++ b/src/util/dbinterface/DBInterface.h @@ -39,6 +39,12 @@ class DBInterface { void runInsertNoIDReturn(std::string); int runSqlNoCallback(const char *zSql); + + bool isGraphIdExist(std::string); + + int getNextGraphId(); + + std::string getPartitionAlgoByGraphID(std::string graphID); }; #endif // JASMINEGRAPH_SRC_UTIL_DBINTERFACE_H_ diff --git a/src/util/kafka/StreamHandler.cpp b/src/util/kafka/StreamHandler.cpp index 2b00edc40..473e9fcb1 100644 --- a/src/util/kafka/StreamHandler.cpp +++ b/src/util/kafka/StreamHandler.cpp @@ -27,10 +27,12 @@ using namespace std::chrono; Logger stream_handler_logger; StreamHandler::StreamHandler(KafkaConnector *kstream, int numberOfPartitions, - vector &workerClients, SQLiteDBInterface* sqlite) + vector &workerClients, SQLiteDBInterface* sqlite, + int graphId, spt::Algorithms algorithms) : kstream(kstream), + graphId(graphId), workerClients(workerClients), - graphPartitioner(numberOfPartitions, 0, spt::Algorithms::HASH, sqlite), + graphPartitioner(numberOfPartitions, graphId, algorithms, sqlite), stream_topic_name("stream_topic_name") { } @@ -74,18 +76,11 @@ void StreamHandler::listen_to_kafka_topic() { frontend_logger.log("Couldn't retrieve message from Kafka.", "info"); continue; } - string data(msg.get_payload()); auto edgeJson = json::parse(data); - // Check if graphID exists in properties - if (edgeJson["properties"].find("graphId") == edgeJson["properties"].end()) { - stream_handler_logger.error("Edge Rejected. Streaming edge should Include the Graph ID."); - continue; - } auto prop = edgeJson["properties"]; - auto graphID = std::string(prop["graphId"]); - graphPartitioner.setGraphID(stoi(graphID)); + prop["graphId"] = to_string(this->graphId); auto sourceJson = edgeJson["source"]; auto destinationJson = edgeJson["destination"]; string sId = std::string(sourceJson["id"]); @@ -98,7 +93,7 @@ void StreamHandler::listen_to_kafka_topic() { json obj; obj["source"] = sourceJson; obj["destination"] = destinationJson; - obj["properties"] = edgeJson["properties"]; + obj["properties"] = prop; long part_s = partitionedEdge[0].second; long part_d = partitionedEdge[1].second; int n_workers = atoi((Utils::getJasmineGraphProperty("org.jasminegraph.server.nworkers")).c_str()); diff --git a/src/util/kafka/StreamHandler.h b/src/util/kafka/StreamHandler.h index 6370fcda8..96dba8ec7 100644 --- a/src/util/kafka/StreamHandler.h +++ b/src/util/kafka/StreamHandler.h @@ -25,13 +25,14 @@ limitations under the License. 
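[Editor's note, not part of the patch: with the StreamHandler.cpp change above, producers no longer have to embed a graphId in every streamed edge; the handler now stamps its own graphId onto the properties. Judging from the fields listen_to_kafka_topic() reads, a consumed message is assumed to look roughly like this (sketch):]

    auto edgeJson = json::parse(R"({"source": {"id": "1"}, "destination": {"id": "2"}, "properties": {}})");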
class StreamHandler { public: StreamHandler(KafkaConnector *kstream, int numberOfPartitions, - std::vector &workerClients, SQLiteDBInterface* sqlite); + std::vector &workerClients, SQLiteDBInterface* sqlite, + int graphId, spt::Algorithms algo = spt::Algorithms::HASH); void listen_to_kafka_topic(); cppkafka::Message pollMessage(); bool isErrorInMessage(const cppkafka::Message &msg); bool isEndOfStream(const cppkafka::Message &msg); Partitioner graphPartitioner; - + int graphId; private: KafkaConnector *kstream; Logger frontend_logger; From 20acbb1f0a5282143c8b0a44a41aa0f60b168d27 Mon Sep 17 00:00:00 2001 From: thamindumk Date: Thu, 6 Feb 2025 17:02:04 +0530 Subject: [PATCH 2/7] Fix an issue on hashPartitioning method --- src/partitioner/stream/Partitioner.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/partitioner/stream/Partitioner.cpp b/src/partitioner/stream/Partitioner.cpp index 83b3fb0c7..ed483e24e 100644 --- a/src/partitioner/stream/Partitioner.cpp +++ b/src/partitioner/stream/Partitioner.cpp @@ -108,8 +108,8 @@ partitionedEdge Partitioner::ldgPartitioning(std::pair } partitionedEdge Partitioner::hashPartitioning(std::pair edge) { - int firstIndex = std::hash()(edge.first) % this->numberOfPartitions; // Hash partitioning - int secondIndex = std::hash()(edge.second) % this->numberOfPartitions; // Hash partitioning + int firstIndex = stoi(edge.first) % this->numberOfPartitions; // Hash partitioning + int secondIndex = stoi(edge.second) % this->numberOfPartitions; // Hash partitioning if (firstIndex == secondIndex) { this->partitions[firstIndex].addEdge(edge, this->getIsDirected()); From 0163259f42f677f25057df6187a0fa598b116a1d Mon Sep 17 00:00:00 2001 From: thamindumk Date: Thu, 6 Feb 2025 17:22:09 +0530 Subject: [PATCH 3/7] Fix lint issues --- src/frontend/JasmineGraphFrontEnd.cpp | 6 +++--- src/partitioner/stream/Partitioner.h | 3 +-- src/util/dbinterface/DBInterface.cpp | 3 --- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/frontend/JasmineGraphFrontEnd.cpp b/src/frontend/JasmineGraphFrontEnd.cpp index d203d9ab3..6921d3357 100644 --- a/src/frontend/JasmineGraphFrontEnd.cpp +++ b/src/frontend/JasmineGraphFrontEnd.cpp @@ -957,7 +957,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c KafkaConnector *&kstream, thread &input_stream_handler_thread, vector &workerClients, int numberOfPartitions, SQLiteDBInterface *sqlite, bool *loop_exit_p) { - string exist = "Do you want to stream into existing graph(y/n) ? " ; + string exist = "Do you want to stream into existing graph(y/n) ? "; int result_wr = write(connFd, exist.c_str(), exist.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); @@ -979,7 +979,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c string direction; if (exist_g == "y") { - string exist_graph_id_msg = "Send the existing graph ID ? " ; + string exist_graph_id_msg = "Send the existing graph ID ? "; int result_wr = write(connFd, exist_graph_id_msg.c_str(), exist_graph_id_msg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); @@ -1029,7 +1029,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c return; } graphId = to_string(nextID); - string default_id = "Do you use Default graph ID: "+ graphId +"(y/n) ? " ; + string default_id = "Do you use Default graph ID: "+ graphId +"(y/n) ? 
"; int result_wr = write(connFd, default_id.c_str(), default_id.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); diff --git a/src/partitioner/stream/Partitioner.h b/src/partitioner/stream/Partitioner.h index c850b0140..d4923d8ef 100644 --- a/src/partitioner/stream/Partitioner.h +++ b/src/partitioner/stream/Partitioner.h @@ -30,7 +30,7 @@ static Algorithms getPartitioner(string id) { } return Algorithms::HASH; } -} +} // namespace spt class Partitioner { std::vector partitions; @@ -49,7 +49,6 @@ class Partitioner { for (size_t i = 0; i < numberOfPartitions; i++) { this->partitions.push_back(Partition(i, numberOfPartitions)); }; - }; void printStats(); long getTotalVertices(); diff --git a/src/util/dbinterface/DBInterface.cpp b/src/util/dbinterface/DBInterface.cpp index 34f212890..fa5db73c3 100644 --- a/src/util/dbinterface/DBInterface.cpp +++ b/src/util/dbinterface/DBInterface.cpp @@ -152,7 +152,6 @@ bool DBInterface::isGraphIdExist(std::string graphId) { int DBInterface::getNextGraphId() { - std::string query = "SELECT MAX(idgraph) FROM graph;"; sqlite3_stmt* stmt; @@ -163,7 +162,6 @@ int DBInterface::getNextGraphId() { int nextGraphId = 1; if (sqlite3_step(stmt) == SQLITE_ROW) { - if (sqlite3_column_type(stmt, 0) != SQLITE_NULL) { int maxId = sqlite3_column_int(stmt, 0); nextGraphId = maxId + 1; @@ -193,7 +191,6 @@ std::string DBInterface::getPartitionAlgoByGraphID(std::string graphID) { std::string result = ""; if (sqlite3_step(stmt) == SQLITE_ROW) { - if (sqlite3_column_type(stmt, 0) != SQLITE_NULL) { result = reinterpret_cast(sqlite3_column_text(stmt, 0)); } From 09752ec58c4c2a84d85959491347f7c89f73335a Mon Sep 17 00:00:00 2001 From: thamindumk Date: Sat, 8 Feb 2025 01:05:34 +0530 Subject: [PATCH 4/7] Change metadb.sql file --- ddl/metadb.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/metadb.sql b/ddl/metadb.sql index 71e36ffb6..7a349382a 100644 --- a/ddl/metadb.sql +++ b/ddl/metadb.sql @@ -7,7 +7,7 @@ create table graph upload_start_time TIME not null, upload_end_time TIME not null, graph_status_idgraph_status INTEGER not null, - idalgorithm INTEGER, + id_algorithm INTEGER, vertexcount BIGINT default 0, centralpartitioncount INTEGER, edgecount INTEGER default 0, @@ -84,7 +84,7 @@ create table worker_has_partition create table partitioning_algorithm ( - idalgorithm INTEGER not null primary key, + id_algorithm INTEGER not null primary key, algorithm_name VARCHAR not null ); From fcb0d20d3543e48d4fbb1da33d3c82f5e92901f8 Mon Sep 17 00:00:00 2001 From: thamindumk Date: Tue, 11 Feb 2025 13:37:45 +0530 Subject: [PATCH 5/7] Do code refactoring --- src/frontend/JasmineGraphFrontEnd.cpp | 81 +++++--------------------- src/partitioner/stream/Partitioner.cpp | 2 +- src/util/Utils.cpp | 10 ++++ src/util/Utils.h | 2 + 4 files changed, 29 insertions(+), 66 deletions(-) diff --git a/src/frontend/JasmineGraphFrontEnd.cpp b/src/frontend/JasmineGraphFrontEnd.cpp index 6921d3357..ea0028e7b 100644 --- a/src/frontend/JasmineGraphFrontEnd.cpp +++ b/src/frontend/JasmineGraphFrontEnd.cpp @@ -966,14 +966,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c } // Get user response. 
- char exist_graph[FRONTEND_DATA_LENGTH + 1]; - bzero(exist_graph, FRONTEND_DATA_LENGTH + 1); - read(connFd, exist_graph, FRONTEND_DATA_LENGTH); - string exist_g(exist_graph); - exist_g = Utils::trim_copy(exist_g); - for (char &c : exist_g) { - c = tolower(c); - } + string exist_g = Utils::getFrontendInput(connFd); string graphId; string partitionAlgo; string direction; @@ -987,14 +980,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c return; } // Get user response. - char exist_graph_id[FRONTEND_DATA_LENGTH + 1]; - bzero(exist_graph_id, FRONTEND_DATA_LENGTH + 1); - read(connFd, exist_graph_id, FRONTEND_DATA_LENGTH); - string exist_g_i(exist_graph_id); - exist_g_i = Utils::trim_copy(exist_g_i); - for (char &c : exist_g_i) { - c = tolower(c); - } + string exist_g_i = Utils::getFrontendInput(connFd); bool isExist = sqlite->isGraphIdExist(exist_g_i); if (!isExist) { @@ -1007,7 +993,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c } return; } - string exist_success_msg = "Set data streaming into graph ID: "+exist_g_i; + string exist_success_msg = "Set data streaming into graph ID: " + exist_g_i; result_wr = write(connFd, exist_success_msg.c_str(), exist_success_msg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); @@ -1029,7 +1015,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c return; } graphId = to_string(nextID); - string default_id = "Do you use Default graph ID: "+ graphId +"(y/n) ? "; + string default_id = "Do you use default graph ID: "+ graphId +"(y/n) ? "; int result_wr = write(connFd, default_id.c_str(), default_id.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); @@ -1037,14 +1023,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c return; } // Get user response. - char default_graph_id[FRONTEND_DATA_LENGTH + 1]; - bzero(default_graph_id, FRONTEND_DATA_LENGTH + 1); - read(connFd, default_graph_id, FRONTEND_DATA_LENGTH); - string default_g_i(default_graph_id); - default_g_i = Utils::trim_copy(default_g_i); - for (char &c : default_g_i) { - c = tolower(c); - } + string default_g_i = Utils::getFrontendInput(connFd); if (default_g_i != "y") { string input_graph_id = "Input your graph ID: "; @@ -1056,18 +1035,11 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c } // Get user response. 
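[Editor's note, not part of the patch: the default-ID flow above relies on DBInterface::getNextGraphId(), which derives MAX(idgraph) + 1 in a plain SELECT with no transaction or uniqueness guard, so two frontend sessions reaching this point concurrently could be offered the same ID, with only the later isGraphIdExist() check standing between them. The query in question:]

    SELECT MAX(idgraph) FROM graph;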
- char graph_id[FRONTEND_DATA_LENGTH + 1]; - bzero(graph_id, FRONTEND_DATA_LENGTH + 1); - read(connFd, graph_id, FRONTEND_DATA_LENGTH); - string user_graph_id(graph_id); - user_graph_id = Utils::trim_copy(user_graph_id); - for (char &c : user_graph_id) { - c = tolower(c); - } + string user_graph_id = Utils::getFrontendInput(connFd); bool isExist = sqlite->isGraphIdExist(user_graph_id); if (isExist) { - string Err_msg = "Error: Graph ID you entered is already exist"; + string Err_msg = "Error: Graph ID you entered already exists"; result_wr = write(connFd, Err_msg.c_str(), Err_msg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); @@ -1093,7 +1065,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c graphId = user_graph_id; } - std::string partition_selection = "Select the partition technique\n" + std::string partition_selection = "Select the partitioning technique\n" "\toption 1: Hash partitioning\n" "\toption 2: Fennel partitioning\n" "\toption 3: LDG partitioning\n" @@ -1105,17 +1077,10 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c return; } // Get user response. - char partition_algo[FRONTEND_DATA_LENGTH + 1]; - bzero(partition_algo, FRONTEND_DATA_LENGTH + 1); - read(connFd, partition_algo, FRONTEND_DATA_LENGTH); - string partition_a(partition_algo); - partition_a = Utils::trim_copy(partition_a); - for (char &c : partition_a) { - c = tolower(c); - } + string partition_algo = Utils::getFrontendInput(connFd); - if (partition_a == "1" || partition_a == "2" || partition_a == "3") { - string partition_success_msg = "Set partition technique: "+partition_a; + if (partition_algo == "1" || partition_algo == "2" || partition_algo == "3") { + string partition_success_msg = "Set partition technique: " + partition_algo; result_wr = write(connFd, partition_success_msg.c_str(), partition_success_msg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); @@ -1128,9 +1093,9 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c *loop_exit_p = true; return; } - partitionAlgo = partition_a; + partitionAlgo = partition_algo; } else { - string Err_msg = "Error: invalid partition option: "+partition_a; + string Err_msg = "Error: invalid partition option: "+partition_algo; result_wr = write(connFd, Err_msg.c_str(), Err_msg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); @@ -1148,14 +1113,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c return; } // Get user response. - char isDirected[FRONTEND_DATA_LENGTH + 1]; - bzero(isDirected, FRONTEND_DATA_LENGTH + 1); - read(connFd, isDirected, FRONTEND_DATA_LENGTH); - string is_directed(isDirected); - is_directed = Utils::trim_copy(is_directed); - for (char &c : is_directed) { - c = tolower(c); - } + string is_directed = Utils::getFrontendInput(connFd); if (is_directed == "y") { direction = Conts::DIRECTED; } else { @@ -1186,17 +1144,10 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c } // Get user response. 
- char user_res[FRONTEND_DATA_LENGTH + 1]; - bzero(user_res, FRONTEND_DATA_LENGTH + 1); - read(connFd, user_res, FRONTEND_DATA_LENGTH); - string user_res_s(user_res); - user_res_s = Utils::trim_copy(user_res_s); - for (char &c : user_res_s) { - c = tolower(c); - } + string default_kafka = Utils::getFrontendInput(connFd); // use default kafka consumer details string group_id = "knnect"; // TODO(sakeerthan): MOVE TO CONSTANT LATER - if (user_res_s == "y") { + if (default_kafka == "y") { kafka_server_IP = Utils::getJasmineGraphProperty("org.jasminegraph.server.streaming.kafka.host"); configs = {{"metadata.broker.list", kafka_server_IP}, {"group.id", group_id}}; } else { diff --git a/src/partitioner/stream/Partitioner.cpp b/src/partitioner/stream/Partitioner.cpp index ed483e24e..f996c047a 100644 --- a/src/partitioner/stream/Partitioner.cpp +++ b/src/partitioner/stream/Partitioner.cpp @@ -159,7 +159,7 @@ void Partitioner::updateMetaDB() { } bool Partitioner::getIsDirected() { - std::string sqlStatement = "SELECT is_directed FROM graph WHERE idgraph = "+std::to_string(this->graphID); + std::string sqlStatement = "SELECT is_directed FROM graph WHERE idgraph = " + std::to_string(this->graphID); auto result = this->sqlite->runSelect(sqlStatement); if (result[0][0].second == "0") { return false; diff --git a/src/util/Utils.cpp b/src/util/Utils.cpp index cf7cbc3e6..7dea17c25 100644 --- a/src/util/Utils.cpp +++ b/src/util/Utils.cpp @@ -1228,3 +1228,13 @@ void Utils::assignPartitionToWorker(int graphId, int partitionIndex, string hos delete sqlite; } + +string Utils::getFrontendInput(int connFd) { + char frontendInput[FRONTEND_DATA_LENGTH + 1]; + bzero(frontendInput, FRONTEND_DATA_LENGTH + 1); + read(connFd, frontendInput, FRONTEND_DATA_LENGTH); + std::string input(frontendInput); + input = Utils::trim_copy(input); + std::transform(input.begin(), input.end(), input.begin(), ::tolower); + return input; +} diff --git a/src/util/Utils.h b/src/util/Utils.h index ace8c3d64..2130fff95 100644 --- a/src/util/Utils.h +++ b/src/util/Utils.h @@ -27,6 +27,7 @@ limitations under the License. #include "../metadb/SQLiteDBInterface.h" #include "../performancedb/PerformanceSQLiteDBInterface.h" +#include "../frontend/JasmineGraphFrontEndProtocol.h" #include "Conts.h" using std::map; @@ -189,6 +190,7 @@ class Utils { static bool transferPartition(std::string sourceWorker, int sourceWorkerPort, std::string destinationWorker, int destinationWorkerDataPort, std::string graphID, std::string partitionID, std::string workerID, SQLiteDBInterface *sqlite); + static string getFrontendInput(int connFd); }; #endif // JASMINEGRAPH_UTILS_H From 7c9e042b8dd0acdb27d5b5117a1af0cb9c61f760 Mon Sep 17 00:00:00 2001 From: thamindumk Date: Wed, 19 Feb 2025 11:26:16 +0530 Subject: [PATCH 6/7] Change variable naming --- src/frontend/JasmineGraphFrontEnd.cpp | 77 ++++++++++++++------------- 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/src/frontend/JasmineGraphFrontEnd.cpp b/src/frontend/JasmineGraphFrontEnd.cpp index ea0028e7b..5ad275ca5 100644 --- a/src/frontend/JasmineGraphFrontEnd.cpp +++ b/src/frontend/JasmineGraphFrontEnd.cpp @@ -966,26 +966,26 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c } // Get user response. - string exist_g = Utils::getFrontendInput(connFd); + string existingGraph = Utils::getFrontendInput(connFd); string graphId; string partitionAlgo; string direction; - if (exist_g == "y") { - string exist_graph_id_msg = "Send the existing graph ID ? 
"; - int result_wr = write(connFd, exist_graph_id_msg.c_str(), exist_graph_id_msg.length()); + if (existingGraph == "y") { + string existingGraphIdMsg = "Send the existing graph ID ? "; + result_wr = write(connFd, existingGraphIdMsg.c_str(), existingGraphIdMsg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; return; } // Get user response. - string exist_g_i = Utils::getFrontendInput(connFd); + string existingGraphId = Utils::getFrontendInput(connFd); - bool isExist = sqlite->isGraphIdExist(exist_g_i); + bool isExist = sqlite->isGraphIdExist(existingGraphId); if (!isExist) { - string Err_msg = "Error: Graph ID you entered is not in the system"; - result_wr = write(connFd, Err_msg.c_str(), Err_msg.length()); + string errorMsg = "Error: Graph ID you entered is not in the system"; + result_wr = write(connFd, errorMsg.c_str(), errorMsg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; @@ -993,20 +993,21 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c } return; } - string exist_success_msg = "Set data streaming into graph ID: " + exist_g_i; - result_wr = write(connFd, exist_success_msg.c_str(), exist_success_msg.length()); + string existingSuccessMsg = "Set data streaming into graph ID: " + existingGraphId; + result_wr = write(connFd, existingSuccessMsg.c_str(), existingSuccessMsg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; return; } - result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size()); + result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), + Conts::CARRIAGE_RETURN_NEW_LINE.size()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; return; } - graphId = exist_g_i; + graphId = existingGraphId; partitionAlgo = sqlite->getPartitionAlgoByGraphID(graphId); } else { @@ -1015,19 +1016,19 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c return; } graphId = to_string(nextID); - string default_id = "Do you use default graph ID: "+ graphId +"(y/n) ? "; - int result_wr = write(connFd, default_id.c_str(), default_id.length()); + string defaultIdMsg = "Do you use default graph ID: "+ graphId +"(y/n) ? "; + result_wr = write(connFd, defaultIdMsg.c_str(), defaultIdMsg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; return; } // Get user response. - string default_g_i = Utils::getFrontendInput(connFd); + string isDefaultGraphId = Utils::getFrontendInput(connFd); - if (default_g_i != "y") { - string input_graph_id = "Input your graph ID: "; - result_wr = write(connFd, input_graph_id.c_str(), input_graph_id.length()); + if (isDefaultGraphId != "y") { + string inputGraphIdMsg = "Input your graph ID: "; + result_wr = write(connFd, inputGraphIdMsg.c_str(), inputGraphIdMsg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; @@ -1035,12 +1036,12 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c } // Get user response. 
- string user_graph_id = Utils::getFrontendInput(connFd); + string userGraphId = Utils::getFrontendInput(connFd); - bool isExist = sqlite->isGraphIdExist(user_graph_id); + bool isExist = sqlite->isGraphIdExist(userGraphId); if (isExist) { - string Err_msg = "Error: Graph ID you entered already exists"; - result_wr = write(connFd, Err_msg.c_str(), Err_msg.length()); + string errorMsg = "Error: Graph ID you entered already exists"; + result_wr = write(connFd, errorMsg.c_str(), errorMsg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; @@ -1049,8 +1050,8 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c return; } - string user_success_msg = "Set graph ID successfully"; - result_wr = write(connFd, user_success_msg.c_str(), user_success_msg.length()); + string userGraphIdSuccessMsg = "Set graph ID successfully"; + result_wr = write(connFd, userGraphIdSuccessMsg.c_str(), userGraphIdSuccessMsg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; @@ -1062,25 +1063,25 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c *loop_exit_p = true; return; } - graphId = user_graph_id; + graphId = userGraphId; } - std::string partition_selection = "Select the partitioning technique\n" + std::string partitionSelectionMsg = "Select the partitioning technique\n" "\toption 1: Hash partitioning\n" "\toption 2: Fennel partitioning\n" "\toption 3: LDG partitioning\n" "Choose an option(1,2,3): "; - result_wr = write(connFd, partition_selection.c_str(), partition_selection.length()); + result_wr = write(connFd, partitionSelectionMsg.c_str(), partitionSelectionMsg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; return; } // Get user response. - string partition_algo = Utils::getFrontendInput(connFd); + string partitionAlgo = Utils::getFrontendInput(connFd); - if (partition_algo == "1" || partition_algo == "2" || partition_algo == "3") { - string partition_success_msg = "Set partition technique: " + partition_algo; + if (partitionAlgo == "1" || partitionAlgo == "2" || partitionAlgo == "3") { + string partition_success_msg = "Set partition technique: " + partitionAlgo; result_wr = write(connFd, partition_success_msg.c_str(), partition_success_msg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); @@ -1093,10 +1094,10 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c *loop_exit_p = true; return; } - partitionAlgo = partition_algo; + partitionAlgo = partitionAlgo; } else { - string Err_msg = "Error: invalid partition option: "+partition_algo; - result_wr = write(connFd, Err_msg.c_str(), Err_msg.length()); + string errorMsg = "Error: invalid partition option: "+partitionAlgo; + result_wr = write(connFd, errorMsg.c_str(), errorMsg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; @@ -1113,8 +1114,8 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c return; } // Get user response. 
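[Editor's note, not part of the patch: the partition-technique rename a few lines above introduces a bug that PATCH 7/7 fixes. "string partitionAlgo = Utils::getFrontendInput(connFd);" declares a new local that shadows the outer partitionAlgo, so the later "partitionAlgo = partitionAlgo;" is a self-assignment and the outer variable, which feeds spt::getPartitioner() and the graph INSERT, stays empty for new graphs. The final patch renames the input variable instead:]

    string partitionAlgoInput = Utils::getFrontendInput(connFd);
    // ... validate the option ...
    partitionAlgo = partitionAlgoInput;  // now reaches the outer variable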
- string is_directed = Utils::getFrontendInput(connFd); - if (is_directed == "y") { + string isDirected = Utils::getFrontendInput(connFd); + if (isDirected == "y") { direction = Conts::DIRECTED; } else { direction = Conts::UNDIRECTED; @@ -1154,7 +1155,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c // user need to start relevant kafka cluster using relevant IP address // read relevant IP address from given file path string message = "Send file path to the kafka configuration file."; - int result_wr = write(connFd, message.c_str(), message.length()); + result_wr = write(connFd, message.c_str(), message.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); *loop_exit_p = true; @@ -1239,13 +1240,13 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c stoi(graphId), spt::getPartitioner(partitionAlgo)); - if (exist_g != "y") { + if (existingGraph != "y") { string path = "kafka:\\" + topic_name_s + ":" + group_id; std::time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now()); string uploadStartTime = ctime(&time); string sqlStatement = "INSERT INTO graph (idgraph,idalgorithm,name,upload_path, upload_start_time, upload_end_time," - "graph_status_idgraph_status, vertexcount, centralpartitioncount, edgecount, is_directed) VALUES("+ + "graph_status_idgraph_status, vertexcount, centralpartitioncount, edgecount, isDirected) VALUES("+ graphId+","+partitionAlgo+",\"" +topic_name_s + "\", \"" + path + "\", \"" +uploadStartTime+ "\", \"\",\"" + to_string(Conts::GRAPH_STATUS::STREAMING) + "\", \"\","+ to_string(numberOfPartitions)+ ", \"\",\"" +direction+"\")"; From da04cefc65e27587c8d742319e3c0a8744c015f6 Mon Sep 17 00:00:00 2001 From: thamindumk Date: Tue, 21 Jan 2025 02:05:56 +0530 Subject: [PATCH 7/7] Fix the issue when changing the meta DB --- ddl/metadb.sql | 8 ++++---- src/frontend/JasmineGraphFrontEnd.cpp | 14 +++++++------- src/util/dbinterface/DBInterface.cpp | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/ddl/metadb.sql b/ddl/metadb.sql index 7a349382a..d917eed34 100644 --- a/ddl/metadb.sql +++ b/ddl/metadb.sql @@ -93,7 +93,7 @@ INSERT INTO graph_status (idgraph_status, description) VALUES (2, 'OPERATIONAL') INSERT INTO graph_status (idgraph_status, description) VALUES (3, 'DELETED'); INSERT INTO graph_status (idgraph_status, description) VALUES (4, 'NONOPERATIONAL'); -INSERT INTO partitioning_algorithm (idalgorithm, algorithm_name) VALUES (1, 'HASH'); -INSERT INTO partitioning_algorithm (idalgorithm, algorithm_name) VALUES (2, 'FENNEL'); -INSERT INTO partitioning_algorithm (idalgorithm, algorithm_name) VALUES (3, 'LDG'); -INSERT INTO partitioning_algorithm (idalgorithm, algorithm_name) VALUES (4, 'METIS'); +INSERT INTO partitioning_algorithm (id_algorithm, algorithm_name) VALUES (1, 'HASH'); +INSERT INTO partitioning_algorithm (id_algorithm, algorithm_name) VALUES (2, 'FENNEL'); +INSERT INTO partitioning_algorithm (id_algorithm, algorithm_name) VALUES (3, 'LDG'); +INSERT INTO partitioning_algorithm (id_algorithm, algorithm_name) VALUES (4, 'METIS'); diff --git a/src/frontend/JasmineGraphFrontEnd.cpp b/src/frontend/JasmineGraphFrontEnd.cpp index 5ad275ca5..1d7b56eb1 100644 --- a/src/frontend/JasmineGraphFrontEnd.cpp +++ b/src/frontend/JasmineGraphFrontEnd.cpp @@ -1078,10 +1078,10 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c return; } // Get user response. 
- string partitionAlgo = Utils::getFrontendInput(connFd); + string partitionAlgoInput = Utils::getFrontendInput(connFd); - if (partitionAlgo == "1" || partitionAlgo == "2" || partitionAlgo == "3") { - string partition_success_msg = "Set partition technique: " + partitionAlgo; + if (partitionAlgoInput == "1" || partitionAlgoInput == "2" || partitionAlgoInput == "3") { + string partition_success_msg = "Set partition technique: " + partitionAlgoInput; result_wr = write(connFd, partition_success_msg.c_str(), partition_success_msg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); @@ -1094,9 +1094,9 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c *loop_exit_p = true; return; } - partitionAlgo = partitionAlgo; + partitionAlgo = partitionAlgoInput; } else { - string errorMsg = "Error: invalid partition option: "+partitionAlgo; + string errorMsg = "Error: invalid partition option: "+partitionAlgoInput; result_wr = write(connFd, errorMsg.c_str(), errorMsg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); @@ -1245,8 +1245,8 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c std::time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now()); string uploadStartTime = ctime(&time); string sqlStatement = - "INSERT INTO graph (idgraph,idalgorithm,name,upload_path, upload_start_time, upload_end_time," - "graph_status_idgraph_status, vertexcount, centralpartitioncount, edgecount, isDirected) VALUES("+ + "INSERT INTO graph (idgraph,id_algorithm,name,upload_path, upload_start_time, upload_end_time," + "graph_status_idgraph_status, vertexcount, centralpartitioncount, edgecount, is_directed) VALUES("+ graphId+","+partitionAlgo+",\"" +topic_name_s + "\", \"" + path + "\", \"" +uploadStartTime+ "\", \"\",\"" + to_string(Conts::GRAPH_STATUS::STREAMING) + "\", \"\","+ to_string(numberOfPartitions)+ ", \"\",\"" +direction+"\")"; diff --git a/src/util/dbinterface/DBInterface.cpp b/src/util/dbinterface/DBInterface.cpp index fa5db73c3..cd81887f8 100644 --- a/src/util/dbinterface/DBInterface.cpp +++ b/src/util/dbinterface/DBInterface.cpp @@ -174,7 +174,7 @@ int DBInterface::getNextGraphId() { } std::string DBInterface::getPartitionAlgoByGraphID(std::string graphID) { - std::string query = "SELECT idalgorithm FROM graph WHERE idgraph = ?;"; + std::string query = "SELECT id_algorithm FROM graph WHERE idgraph = ?;"; sqlite3_stmt* stmt; if (sqlite3_prepare_v2(database, query.c_str(), -1, &stmt, NULL) != SQLITE_OK) {
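[Editor's note, not part of the patch series: one caveat worth carrying forward. PATCH 2/7 switched hashPartitioning() from std::hash to stoi(edge.first) % numberOfPartitions, which assumes vertex IDs are non-negative numeric strings; a non-numeric ID would make stoi() throw std::invalid_argument and, uncaught in the listener thread, terminate the process. A defensive sketch (the helper name partitionIndexFor is hypothetical), keeping the new behaviour for numeric IDs:]

    static int partitionIndexFor(const std::string &id, int numberOfPartitions) {
        try {
            return std::stoi(id) % numberOfPartitions;  // numeric IDs, as in PATCH 2/7
        } catch (const std::exception &) {              // std::invalid_argument / std::out_of_range
            return std::hash<std::string>()(id) % numberOfPartitions;  // pre-patch fallback
        }
    }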