Merge pull request #277 from thamindumk/fix/partition_issue
Fix kafka stream flow and hash partitioning issue
miyurud authored Feb 28, 2025
2 parents 2b91cde + da04cef commit 7611e83
Showing 10 changed files with 347 additions and 89 deletions.
18 changes: 15 additions & 3 deletions ddl/metadb.sql
@@ -7,9 +7,10 @@ create table graph
upload_start_time TIME not null,
upload_end_time TIME not null,
graph_status_idgraph_status INTEGER not null,
vertexcount BIGINT,
id_algorithm INTEGER,
vertexcount BIGINT default 0,
centralpartitioncount INTEGER,
edgecount INTEGER,
edgecount INTEGER default 0,
upload_time VARCHAR(8),
train_status VARCHAR(20),
feature_count INTEGER(100),
@@ -81,7 +82,18 @@ create table worker_has_partition
worker_idworker INTEGER
);

create table partitioning_algorithm
(
id_algorithm INTEGER not null primary key,
algorithm_name VARCHAR not null
);

INSERT INTO graph_status (idgraph_status, description) VALUES (1, 'LOADING');
INSERT INTO graph_status (idgraph_status, description) VALUES (2, 'OPERATIONAL');
INSERT INTO graph_status (idgraph_status, description) VALUES (3, 'DELETED');
INSERT INTO graph_status (idgraph_status, description) VALUES (4, 'NONOPERATIONAL');

INSERT INTO partitioning_algorithm (id_algorithm, algorithm_name) VALUES (1, 'HASH');
INSERT INTO partitioning_algorithm (id_algorithm, algorithm_name) VALUES (2, 'FENNEL');
INSERT INTO partitioning_algorithm (id_algorithm, algorithm_name) VALUES (3, 'LDG');
INSERT INTO partitioning_algorithm (id_algorithm, algorithm_name) VALUES (4, 'METIS');
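The new partitioning_algorithm lookup table, together with the id_algorithm column added to graph, records which streaming partitioner each graph was built with. As a rough illustration (not part of this commit), the lookup performed by the frontend's sqlite->getPartitionAlgoByGraphID call further down could be expressed as a single join; the sketch below assumes only the tables and columns shown in this diff.

#include <string>

// Hypothetical sketch: builds the SQL that resolves a graph's partitioning
// algorithm name through the new lookup table. The real helper's
// implementation is not part of this diff.
std::string buildAlgorithmLookupQuery(const std::string &graphId) {
    return "SELECT p.algorithm_name "
           "FROM graph g "
           "JOIN partitioning_algorithm p ON p.id_algorithm = g.id_algorithm "
           "WHERE g.idgraph = " + graphId + ";";
}

Run against the seed rows above, a graph stored with id_algorithm = 1 resolves to HASH and one stored with 2 resolves to FENNEL.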
267 changes: 202 additions & 65 deletions src/frontend/JasmineGraphFrontEnd.cpp
@@ -52,6 +52,7 @@ limitations under the License.
#include "../query/processor/cypher/astbuilder/ASTBuilder.h"
#include "../query/processor/cypher/astbuilder/ASTNode.h"
#include "../query/processor/cypher/semanticanalyzer/SemanticAnalyzer.h"
#include "../partitioner/stream/Partitioner.h"


#define MAX_PENDING_CONNECTIONS 10
@@ -139,9 +140,8 @@ void *frontendservicesesion(void *dummyPt) {
std::string kafka_server_IP;
cppkafka::Configuration configs;
KafkaConnector *kstream;
Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH, sqlite);

vector<DataPublisher *> workerClients;

bool workerClientsInitialized = false;

bool loop_exit = false;
@@ -957,39 +957,205 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
KafkaConnector *&kstream, thread &input_stream_handler_thread,
vector<DataPublisher *> &workerClients, int numberOfPartitions,
SQLiteDBInterface *sqlite, bool *loop_exit_p) {
string msg_1 = "Do you want to use default KAFKA consumer(y/n) ?";
int result_wr = write(connFd, msg_1.c_str(), msg_1.length());
string exist = "Do you want to stream into existing graph(y/n) ? ";
int result_wr = write(connFd, exist.c_str(), exist.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
result_wr = write(connFd, "\r\n", 2);

// Get user response.
string existingGraph = Utils::getFrontendInput(connFd);
string graphId;
string partitionAlgo;
string direction;

if (existingGraph == "y") {
string existingGraphIdMsg = "Enter the existing graph ID: ";
result_wr = write(connFd, existingGraphIdMsg.c_str(), existingGraphIdMsg.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
// Get user response.
string existingGraphId = Utils::getFrontendInput(connFd);

bool isExist = sqlite->isGraphIdExist(existingGraphId);
if (!isExist) {
string errorMsg = "Error: The graph ID you entered is not in the system";
result_wr = write(connFd, errorMsg.c_str(), errorMsg.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
return;
}
string existingSuccessMsg = "Set data streaming into graph ID: " + existingGraphId;
result_wr = write(connFd, existingSuccessMsg.c_str(), existingSuccessMsg.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(),
Conts::CARRIAGE_RETURN_NEW_LINE.size());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
graphId = existingGraphId;
partitionAlgo = sqlite->getPartitionAlgoByGraphID(graphId);

} else {
int nextID = sqlite->getNextGraphId();
if (nextID < 0) {
return;
}
graphId = to_string(nextID);
string defaultIdMsg = "Do you want to use the default graph ID " + graphId + " (y/n)? ";
result_wr = write(connFd, defaultIdMsg.c_str(), defaultIdMsg.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
// Get user response.
string isDefaultGraphId = Utils::getFrontendInput(connFd);

if (isDefaultGraphId != "y") {
string inputGraphIdMsg = "Input your graph ID: ";
result_wr = write(connFd, inputGraphIdMsg.c_str(), inputGraphIdMsg.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}

// Get user response.
string userGraphId = Utils::getFrontendInput(connFd);

bool isExist = sqlite->isGraphIdExist(userGraphId);
if (isExist) {
string errorMsg = "Error: The graph ID you entered already exists";
result_wr = write(connFd, errorMsg.c_str(), errorMsg.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
return;
}

string userGraphIdSuccessMsg = "Set graph ID successfully";
result_wr = write(connFd, userGraphIdSuccessMsg.c_str(), userGraphIdSuccessMsg.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
graphId = userGraphId;
}

std::string partitionSelectionMsg = "Select the partitioning technique\n"
"\toption 1: Hash partitioning\n"
"\toption 2: Fennel partitioning\n"
"\toption 3: LDG partitioning\n"
"Choose an option(1,2,3): ";
result_wr = write(connFd, partitionSelectionMsg.c_str(), partitionSelectionMsg.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
// Get user response.
string partitionAlgoInput = Utils::getFrontendInput(connFd);

if (partitionAlgoInput == "1" || partitionAlgoInput == "2" || partitionAlgoInput == "3") {
string partition_success_msg = "Set partition technique: " + partitionAlgoInput;
result_wr = write(connFd, partition_success_msg.c_str(), partition_success_msg.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
partitionAlgo = partitionAlgoInput;
} else {
string errorMsg = "Error: invalid partition option: " + partitionAlgoInput;
result_wr = write(connFd, errorMsg.c_str(), errorMsg.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
return;
}

string checkDirection = "Is this graph directed (y/n)? ";
result_wr = write(connFd, checkDirection.c_str(), checkDirection.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
// Get user response.
string isDirected = Utils::getFrontendInput(connFd);
if (isDirected == "y") {
direction = Conts::DIRECTED;
} else {
direction = Conts::UNDIRECTED;
}

string checkGraphType = "Graph type received";
result_wr = write(connFd, checkGraphType.c_str(), checkGraphType.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
}

string msg_1 = "Do you want to use the default Kafka consumer (y/n)? ";
result_wr = write(connFd, msg_1.c_str(), msg_1.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}

// Get user response.
char user_res[FRONTEND_DATA_LENGTH + 1];
bzero(user_res, FRONTEND_DATA_LENGTH + 1);
read(connFd, user_res, FRONTEND_DATA_LENGTH);
string user_res_s(user_res);
user_res_s = Utils::trim_copy(user_res_s);
for (char &c : user_res_s) {
c = tolower(c);
}
string default_kafka = Utils::getFrontendInput(connFd);
// use default kafka consumer details
string group_id = "knnect"; // TODO(sakeerthan): MOVE TO CONSTANT LATER
if (user_res_s == "y") {
if (default_kafka == "y") {
kafka_server_IP = Utils::getJasmineGraphProperty("org.jasminegraph.server.streaming.kafka.host");
configs = {{"metadata.broker.list", kafka_server_IP}, {"group.id", group_id}};
} else {
// The user must start the relevant Kafka cluster themselves;
// its broker IP address is read from the given configuration file path.
string message = "Send the file path to the Kafka configuration file.";
int result_wr = write(connFd, message.c_str(), message.length());
result_wr = write(connFd, message.c_str(), message.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
@@ -1062,64 +1228,35 @@
*loop_exit_p = true;
return;
}
string checkDirection = "Is this graph Directed (y/n)? ";
result_wr = write(connFd, checkDirection.c_str(), checkDirection.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
// Get user response.
char isDirected[FRONTEND_DATA_LENGTH + 1];
bzero(isDirected, FRONTEND_DATA_LENGTH + 1);
read(connFd, isDirected, FRONTEND_DATA_LENGTH);
string is_directed(isDirected);
is_directed = Utils::trim_copy(is_directed);
for (char &c : is_directed) {
c = tolower(c);
}
string direction;
if (is_directed == "y") {
direction = Conts::DIRECTED;
} else {
direction = Conts::UNDIRECTED;
}

string checkGraphType = "Graph type received";
result_wr = write(connFd, checkGraphType.c_str(), checkGraphType.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}

result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
// create kafka consumer and graph partitioner
kstream = new KafkaConnector(configs);
// Create the Partitioner object.
Partitioner graphPartitioner(numberOfPartitions, 0, spt::Algorithms::FENNEL, sqlite);
// Create the KafkaConnector object.
kstream = new KafkaConnector(configs);
// Subscribe to the Kafka topic.
kstream->Subscribe(topic_name_s);
// Create the StreamHandler object.
StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients, sqlite);

string path = "kafka:\\" + topic_name_s + ":" + group_id;
std::time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now());
string uploadStartTime = ctime(&time);
string sqlStatement =
"INSERT INTO graph (name,upload_path,upload_start_time,upload_end_time,graph_status_idgraph_status,"
"vertexcount,centralpartitioncount,edgecount,is_directed) VALUES(\"" +
topic_name_s + "\", \"" + path + "\", \"" + uploadStartTime + "\", \"\",\"" +
to_string(Conts::GRAPH_STATUS::STREAMING) + "\", \"\", \"\", \"\",\"" +direction+"\")";
int newGraphID = sqlite->runInsert(sqlStatement);

StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients, sqlite,
stoi(graphId),
spt::getPartitioner(partitionAlgo));

if (existingGraph != "y") {
string path = "kafka:\\" + topic_name_s + ":" + group_id;
std::time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now());
string uploadStartTime = ctime(&time);
string sqlStatement =
"INSERT INTO graph (idgraph,id_algorithm,name,upload_path, upload_start_time, upload_end_time,"
"graph_status_idgraph_status, vertexcount, centralpartitioncount, edgecount, is_directed) VALUES("+
graphId+","+partitionAlgo+",\"" +topic_name_s + "\", \"" + path + "\", \"" +uploadStartTime+ "\", \"\",\"" +
to_string(Conts::GRAPH_STATUS::STREAMING) + "\", \"\","+ to_string(numberOfPartitions)+
", \"\",\"" +direction+"\")";
int newGraphID = sqlite->runInsert(sqlStatement);
} else {
std::string sqlStatement = "UPDATE graph SET graph_status_idgraph_status ="+
to_string(Conts::GRAPH_STATUS::STREAMING)+
" WHERE idgraph = " + graphId;
sqlite->runUpdate(sqlStatement);
}
frontend_logger.info("Start listening to " + topic_name_s);
input_stream_handler_thread = thread(&StreamHandler::listen_to_kafka_topic, stream_handler);
}
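Note on the fix itself: the removed code always constructed the stream partitioner as Partitioner graphPartitioner(numberOfPartitions, 0, spt::Algorithms::FENNEL, sqlite) no matter which option the user picked, whereas the new flow hands spt::getPartitioner(partitionAlgo) to the StreamHandler. Below is a minimal sketch of what that mapping plausibly does, assuming the enum values HASH and FENNEL seen in this diff plus LDG for menu option 3; the real implementation lives in src/partitioner/stream/Partitioner.h and may differ.

#include <string>

namespace spt {
// HASH and FENNEL appear elsewhere in this diff; LDG is assumed here to
// match menu option 3. Values mirror the id_algorithm seed rows in metadb.sql.
enum class Algorithms { HASH = 1, FENNEL = 2, LDG = 3 };

// Hypothetical sketch: maps the frontend's menu option ("1", "2", "3")
// to a partitioning algorithm, defaulting to HASH for option 1.
inline Algorithms getPartitioner(const std::string &algoOption) {
    if (algoOption == "2") return Algorithms::FENNEL;
    if (algoOption == "3") return Algorithms::LDG;
    return Algorithms::HASH;
}
}  // namespace spt

Under hash partitioning, a vertex is typically routed as hash(vertexId) % numberOfPartitions, so the same vertex always lands in the same partition regardless of which Kafka message delivers it.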