From ad9b30f757ef42ba5b088bdc911a04d1c42913a0 Mon Sep 17 00:00:00 2001 From: auxten Date: Fri, 3 Jan 2025 07:55:50 +0000 Subject: [PATCH] Refactor query_conn --- programs/local/LocalServer.cpp | 76 ++++++++++++++++++++-------------- 1 file changed, 46 insertions(+), 30 deletions(-) diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 835fb82c5d8..abc2f3f2cdc 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -1493,58 +1493,74 @@ void close_conn(chdb_conn ** conn) struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const char * format) { + // Add connection validity check under global lock std::lock_guard global_lock(global_connection_mutex); - if (!conn || !conn->connected || !conn->queue) - return new local_result_v2{nullptr, 0, nullptr, 0, 0, 0, nullptr}; + { + auto * result = new local_result_v2{}; + const char * error = "Invalid or closed connection"; + result->error_message = new char[strlen(error) + 1]; + std::strcpy(result->error_message, error); + return result; + } + // Release global lock before processing query auto * queue = static_cast(conn->queue); + local_result_v2 * result = nullptr; + try { - std::unique_lock lock(queue->mutex); - // Wait until any ongoing query completes - queue->query_cv.wait(lock, [queue]() { return !queue->has_query || queue->shutdown; }); - - if (queue->shutdown) { - auto * result = new local_result_v2{}; - const char * error = "Connection is shutting down"; - result->error_message = new char[strlen(error) + 1]; - std::strcpy(result->error_message, error); - return result; - } + std::unique_lock lock(queue->mutex); + // Wait until any ongoing query completes + queue->result_cv.wait(lock, [queue]() { return !queue->has_query || queue->shutdown; }); - queue->current_query = {query, format}; - queue->has_query = true; - queue->current_result = nullptr; - } - queue->query_cv.notify_one(); + if (queue->shutdown) + { + result = new local_result_v2{}; + const char * error = "Connection is shutting down"; + result->error_message = new char[strlen(error) + 1]; + std::strcpy(result->error_message, error); + return result; + } - local_result_v2 * result = nullptr; - { - std::unique_lock lock(queue->mutex); - queue->result_cv.wait(lock, [queue]() { return queue->current_result != nullptr || queue->shutdown; }); + // Set new query + queue->current_query = {query, format}; + queue->has_query = true; + queue->current_result = nullptr; + } + queue->query_cv.notify_one(); - if (!queue->shutdown && queue->current_result) { - result = queue->current_result; - if (result->len == 0) + std::unique_lock lock(queue->mutex); + queue->result_cv.wait(lock, [queue]() { return queue->current_result != nullptr || queue->shutdown; }); + + if (!queue->shutdown && queue->current_result) { - LOG_DEBUG(getLogger("CHDB"), "Empty result returned for query: {}", query); + result = queue->current_result; + queue->current_result = nullptr; + queue->has_query = false; } - queue->current_result = nullptr; } + queue->query_cv.notify_one(); } - - queue->query_cv.notify_one(); - if (result == nullptr) + catch (...) { + // Handle any exceptions during query processing result = new local_result_v2{}; const char * error = "Error occurred while processing query"; result->error_message = new char[strlen(error) + 1]; std::strcpy(result->error_message, error); } + if (!result) + { + result = new local_result_v2{}; + const char * error = "Query processing failed"; + result->error_message = new char[strlen(error) + 1]; + std::strcpy(result->error_message, error); + } + return result; }