From b33ebe2893dbd4cfec856ad5c21d24f0fd7c6a32 Mon Sep 17 00:00:00 2001 From: aceforeverd Date: Mon, 8 Jan 2024 15:46:19 +0800 Subject: [PATCH] feat(sql router): new interface (#3630) SQLTODAG(string) -> DAG {name, sql, producers[]} --- hybridse/include/sdk/base.h | 1 - .../com/_4paradigm/openmldb/sdk/DAGNode.java | 31 +++++++ .../_4paradigm/openmldb/sdk/SqlExecutor.java | 7 ++ .../openmldb/sdk/impl/SqlClusterExecutor.java | 26 ++++++ .../openmldb/jdbc/SQLRouterSmokeTest.java | 57 +++++++++++++ src/sdk/sql_router.cc | 83 +++++++++++++++++++ src/sdk/sql_router.h | 21 +++++ src/sdk/sql_router_sdk.i | 3 + src/sdk/sql_router_test.cc | 63 ++++++++++++++ 9 files changed, 291 insertions(+), 1 deletion(-) create mode 100644 java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/DAGNode.java diff --git a/hybridse/include/sdk/base.h b/hybridse/include/sdk/base.h index e5da48094f8..8d0cd4d9e1c 100644 --- a/hybridse/include/sdk/base.h +++ b/hybridse/include/sdk/base.h @@ -20,7 +20,6 @@ #include #include -#include #include #include #include diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/DAGNode.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/DAGNode.java new file mode 100644 index 00000000000..c3334f281b1 --- /dev/null +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/DAGNode.java @@ -0,0 +1,31 @@ +/** + * Copyright (c) 2023 OpenMLDB authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com._4paradigm.openmldb.sdk;
+
+import java.util.ArrayList;
+
+public class DAGNode {
+    public DAGNode(String name, String sql, ArrayList producers) {
+        this.name = name;
+        this.sql = sql;
+        this.producers = producers;
+    }
+
+    public String name; // alias of the WITH entry this node was built from; empty for the root query
+    public String sql; // SQL text of this node, unparsed without its WITH clause
+    public ArrayList producers; // one node per WITH entry of this node's query
+}
diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlExecutor.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlExecutor.java
index b55da67a430..1d81c271b7f 100644
--- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlExecutor.java
+++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlExecutor.java
@@ -83,6 +83,13 @@ PreparedStatement getBatchRequestPreparedStmt(String db, String sql,
     NS.TableInfo getTableInfo(String db, String table);
 
     List getTableNames(String db);
+    /**
+     * Parse SQL query into DAG representation
+     *
+     * @param query SQL query string
+     * @throws SQLException exception if input query not valid for SQL parser
+     */
+    DAGNode SQLToDAG(String query) throws SQLException;
 
     void close();
 }
diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java
index 3a88fb9489e..0f1cd191911 100644
--- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java
+++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java
@@ -665,4 +665,30 @@ public boolean updateOfflineTableInfo(NS.TableInfo info) {
     public boolean refreshCatalog() {
         return sqlRouter.RefreshCatalog();
     }
+
+    @Override
+    public DAGNode SQLToDAG(String query) throws SQLException {
+        Status status = new Status();
+        final com._4paradigm.openmldb.DAGNode dag = sqlRouter.SQLToDAG(query, status);
+        // dag may be null: on a parse failure the native router returns an empty pointer and only fills status.
+        try {
+            if (status.getCode() != 0) {
+                throw new SQLException(status.ToString());
+            }
+            return convertDAG(dag);
+        } finally {
+            if (dag != null) { // an unconditional delete() would throw NPE here and mask the SQLException
+                dag.delete();
+            }
+            status.delete(); // always released, including on the failure path
+        }
+    }
+
+    private static DAGNode convertDAG(com._4paradigm.openmldb.DAGNode dag) { // deep-copies the native DAG
+        ArrayList convertedProducers = new ArrayList<>();
+        for (com._4paradigm.openmldb.DAGNode producer : dag.getProducers()) {
+            convertedProducers.add(convertDAG(producer));
+        }
+        return new DAGNode(dag.getName(), dag.getSql(), convertedProducers);
+    }
 }
diff --git a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java
index bc92f20d3f5..60a0ef744f5 100644
--- a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java
+++ b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java
@@ -21,6 +21,7 @@
 import com._4paradigm.openmldb.common.Pair;
 import com._4paradigm.openmldb.proto.NS;
 import com._4paradigm.openmldb.sdk.Column;
+import com._4paradigm.openmldb.sdk.DAGNode;
 import com._4paradigm.openmldb.sdk.Schema;
 import com._4paradigm.openmldb.sdk.SdkOption;
 import com._4paradigm.openmldb.sdk.SqlExecutor;
@@ -870,4 +871,60 @@ public void testMergeSQL() throws SQLException {
                         + "(select db.main.id as merge_id_3, db.main.c1 as merge_c1_3, sum(c2) over w1 from main window w1 as (union (select \"\" as id, * from t1) partition by c1 order by c2 rows between unbounded preceding and current row)) as out3 "
                         + "on out0.merge_id_0 = out3.merge_id_3 and out0.merge_c1_0 = out3.merge_c1_3;");
     }
+
+    @Test(dataProvider = "executor")
+    public void testSQLToDag(SqlExecutor router) throws SQLException {
+        String sql = " WITH q1 as (WITH q3 as (select * from t1 LIMIT 10), q4 as (select * from t2) select * from q3 left join q4 on q3.id = q4.id),"
+                +
+                "q2 as (select * from t3)"
+                + "select * from q1 last join q2 on q1.id = q2.id";
+
+        DAGNode dag = router.SQLToDAG(sql);
+
+        Assert.assertEquals(dag.name, "");
+        Assert.assertEquals(dag.sql, "SELECT\n" +
+                "
*\n" +
+                "FROM\n" +
+                " q1\n" +
+                " LAST JOIN\n" +
+                " q2\n" +
+                " ON q1.id = q2.id\n");
+        Assert.assertEquals(dag.producers.size(), 2);
+
+        DAGNode input1 = dag.producers.get(0);
+        Assert.assertEquals(input1.name, "q1");
+        Assert.assertEquals(input1.sql, "SELECT\n" +
+                " *\n" +
+                "FROM\n" +
+                " q3\n" +
+                " LEFT JOIN\n" +
+                " q4\n" +
+                " ON q3.id = q4.id\n");
+        Assert.assertEquals(2, input1.producers.size());
+
+        DAGNode input2 = dag.producers.get(1);
+        Assert.assertEquals(input2.name, "q2");
+        Assert.assertEquals(input2.sql, "SELECT\n" +
+                " *\n" +
+                "FROM\n" +
+                " t3\n");
+        Assert.assertEquals(input2.producers.size(), 0);
+
+        DAGNode q1In1 = input1.producers.get(0);
+        Assert.assertEquals(q1In1.producers.size(), 0);
+        Assert.assertEquals(q1In1.name, "q3");
+        Assert.assertEquals(q1In1.sql, "SELECT\n" +
+                " *\n" +
+                "FROM\n" +
+                " t1\n" +
+                "LIMIT 10\n");
+
+        DAGNode q1In2 = input1.producers.get(1);
+        Assert.assertEquals(q1In2.producers.size(), 0);
+        Assert.assertEquals(q1In2.name, "q4");
+        Assert.assertEquals(q1In2.sql, "SELECT\n" +
+                " *\n" +
+                "FROM\n" +
+                " t2\n");
+    }
 }
diff --git a/src/sdk/sql_router.cc b/src/sdk/sql_router.cc
index 72e3adeaa3d..32c555a096b 100644
--- a/src/sdk/sql_router.cc
+++ b/src/sdk/sql_router.cc
@@ -15,11 +15,17 @@
  */
 
 #include "sdk/sql_router.h"
+
 #include
+
+#include "absl/strings/substitute.h"
 #include "base/ddl_parser.h"
 #include "glog/logging.h"
 #include "schema/schema_adapter.h"
 #include "sdk/sql_cluster_router.h"
+#include "zetasql/parser/parser.h"
+#include "zetasql/public/error_helpers.h"
+#include "zetasql/public/error_location.pb.h"
 
 namespace openmldb::sdk {
 
@@ -274,4 +280,81 @@ std::vector> GetDependentTables(
     return tables;
 }
 
+// Recursively builds the DAG node for `query`: each WITH entry becomes a producer named by its
+// alias, and the node's own SQL is the query unparsed without that WITH clause.
+std::shared_ptr QueryToDAG(const zetasql::ASTQuery* query, absl::string_view name) {
+    std::vector> producers;
+    if (query->with_clause() != nullptr) {
+        for (auto with_entry : query->with_clause()->with()) {
+            producers.push_back(QueryToDAG(with_entry->query(), with_entry->alias()->GetAsStringView()));
+        }
+    }
+
+    // SQL without WITH clause
+    std::string sql = zetasql::Unparse(query->query_expr());
+    if (query->order_by() != nullptr) {
+        absl::StrAppend(&sql, zetasql::Unparse(query->order_by()));
+    }
+    if (query->limit_offset() != nullptr) {
+        absl::StrAppend(&sql, zetasql::Unparse(query->limit_offset()));
+    }
+
+    return std::make_shared(name, sql, producers);
+}
+
+std::shared_ptr SQLRouter::SQLToDAG(const std::string& query, hybridse::sdk::Status* status) {
+    std::unique_ptr parser_output;
+    zetasql::ParserOptions parser_opts;
+    zetasql::LanguageOptions language_opts;
+    language_opts.EnableLanguageFeature(zetasql::FEATURE_V_1_3_COLUMN_DEFAULT_VALUE);
+    parser_opts.set_language_options(&language_opts);
+    auto zetasql_status = zetasql::ParseStatement(query, parser_opts, &parser_output);
+    if (!zetasql_status.ok()) {
+        // FormatError() works from the status alone, so no ErrorLocation has to be extracted here.
+        status->msg = zetasql::FormatError(zetasql_status);
+        status->code = hybridse::common::kSyntaxError;
+        return {};  // every failure path returns an empty pointer; callers must check status first
+    }
+
+    auto stmt = parser_output->statement();
+    if (stmt == nullptr) {
+        status->msg = "not a statement";
+        status->code = hybridse::common::kSyntaxError;
+        return {};
+    }
+
+    if (stmt->node_kind() != zetasql::AST_QUERY_STATEMENT) {
+        status->msg = "not a query";
+        status->code = hybridse::common::kSyntaxError;
+        return {};
+    }
+
+    auto const query_stmt = stmt->GetAsOrNull();
+    if (query_stmt == nullptr) {
+        status->msg = "not a query";
+        status->code = hybridse::common::kSyntaxError;
+        return {};
+    }
+
+    status->code = hybridse::common::kOk;
+    return QueryToDAG(query_stmt->query(), "");  // the root node carries an empty name
+}
+
+bool DAGNode::operator==(const DAGNode& rhs) const noexcept {
+    return name == rhs.name && sql == rhs.sql &&
+           absl::c_equal(producers, rhs.producers,
+                         [](const std::shared_ptr& left, const std::shared_ptr& right) {
+                             return left != nullptr && right != nullptr && *left == *right;  // null never matches
+                         });
+}
+
+std::ostream& operator<<(std::ostream& os, const DAGNode& obj) { return os << obj.DebugString(); }
+
+std::string DAGNode::DebugString() const {  // renders "{name, sql, [producers...]}" recursively
+    return absl::Substitute("{$0, $1, [$2]}", name, sql,
+                            absl::StrJoin(producers, ",", [](std::string* out, const std::shared_ptr& e) {
+                                absl::StrAppend(out, (e == nullptr ? "" : e->DebugString()));  // null prints as empty
+                            }));
+}
+
 }  // namespace openmldb::sdk
diff --git a/src/sdk/sql_router.h b/src/sdk/sql_router.h
index 68186a83b00..4317d435f8c 100644
--- a/src/sdk/sql_router.h
+++ b/src/sdk/sql_router.h
@@ -80,6 +80,22 @@ class ExplainInfo {
     virtual const std::string& GetRequestDbName() = 0;
 };
 
+struct DAGNode {  // one query in the DAG produced by SQLRouter::SQLToDAG
+    DAGNode(absl::string_view name, absl::string_view sql) : name(name), sql(sql) {}  // node without producers
+    DAGNode(absl::string_view name, absl::string_view sql, const std::vector>& producers)
+        : name(name), sql(sql), producers(producers) {}
+
+    std::string name;  // alias of the WITH entry this node was built from; empty for the root query
+    std::string sql;  // SQL text of this node, unparsed without its WITH clause
+    std::vector> producers;  // one node per WITH entry of this node's query
+
+    bool operator==(const DAGNode& op) const noexcept;  // deep comparison: name, sql and producers pairwise
+
+    std::string DebugString() const;
+
+    friend std::ostream& operator<<(std::ostream& os, const DAGNode& obj);  // streams DebugString()
+};
+
 class QueryFuture {
  public:
     QueryFuture() {}
@@ -234,6 +250,11 @@ class SQLRouter {
     virtual bool IsOnlineMode() = 0;
 
     virtual std::string GetDatabase() = 0;
+
+    // Parse a SQL query into its DAG representation: a root node plus one producer per WITH entry.
+    // On failure an empty pointer is returned and status carries the error code and message.
+    // Optional CONFIG clause from SQL query statement is skipped in output DAG
+    std::shared_ptr SQLToDAG(const std::string& query, hybridse::sdk::Status* status);
 };
 
 std::shared_ptr NewClusterSQLRouter(const SQLRouterOptions& options);
diff --git a/src/sdk/sql_router_sdk.i b/src/sdk/sql_router_sdk.i
index 22ee63b3e6d..07bb3d5741b 100644
--- a/src/sdk/sql_router_sdk.i
+++ b/src/sdk/sql_router_sdk.i
@@ -69,6 +69,7 @@
 %template(VectorUint32) std::vector;
 %template(VectorString) std::vector;
 
+%shared_ptr(openmldb::sdk::DAGNode);
 %{
 #include "sdk/sql_router.h"
 #include "sdk/result_set.h"
@@ -117,3 +118,5 @@ using openmldb::sdk::DefaultValueContainer;
 
 %template(DBTable) std::pair;
 %template(DBTableVector) std::vector>;
+
+%template(DAGNodeList) std::vector>;
diff --git a/src/sdk/sql_router_test.cc b/src/sdk/sql_router_test.cc
index cc91375c6d7..daa2a1ed059 100644
--- a/src/sdk/sql_router_test.cc
+++ b/src/sdk/sql_router_test.cc
@@ -1226,6 +1226,69 @@ TEST_F(SQLRouterTest, DDLParseMethodsCombineIndex) {
         ddl_list.at(0));
 }
 
+TEST_F(SQLRouterTest, SQLToDAG) {  // nested WITH query must yield root <- {q1 <- {q3, q4}, q2}
+    auto sql = R"(WITH q1 as (
+ WITH q3 as (select * from t1 ORDER BY ts),
+ q4 as (select * from t2 LIMIT 10)
+
+ select * from q3 left join q4 on q3.key = q4.key
+ ),
+ q2 as (select * from t3)
+
+ select * from q1 last join q2 on q1.id = q2.id)";
+
+
+    hybridse::sdk::Status status;
+    auto dag = router_->SQLToDAG(sql, &status);
+    ASSERT_TRUE(status.IsOK());
+
+    std::string_view q3 = R"(SELECT
+ *
+FROM
+ t1
+ORDER BY ts
+)";
+    std::string_view q4 = R"(SELECT
+ *
+FROM
+ t2
+LIMIT 10
+)";
+    std::string_view q2 = R"(SELECT
+ *
+FROM
+ t3
+)";
+    std::string_view q1 = R"(SELECT
+ *
+FROM
+ q3
+ LEFT JOIN
+ q4
+ ON q3.key = q4.key
+)";
+    std::string_view q = R"(SELECT
+ *
+FROM
+ q1
+ LAST JOIN
+ q2
+ ON q1.id = q2.id
+)";
+
+    std::shared_ptr dag_q3 = std::make_shared("q3", q3);  // leaves first, then their consumers
+    std::shared_ptr dag_q4 = std::make_shared("q4", q4);
+
+    std::shared_ptr dag_q1 =
+        std::make_shared("q1", q1, std::vector>({dag_q3, dag_q4}));
+    std::shared_ptr dag_q2 = std::make_shared("q2", q2);
+
+    std::shared_ptr expect =
+        std::make_shared("", q, std::vector>({dag_q1, dag_q2}));
+
+    EXPECT_EQ(*dag, *expect);  // relies on DAGNode::operator== comparing the whole tree
+}
+
 }  // namespace openmldb::sdk
 
 int main(int argc, char** argv) {