From 9216ce1a8a08255e8a3ee3cc8bc5dd9ab17e6393 Mon Sep 17 00:00:00 2001 From: lizhilin Date: Fri, 21 Feb 2025 19:00:42 +0800 Subject: [PATCH] 1 --- .../config/ClickhouseSourceOptions.java | 18 ++++++++++++++++++ .../clickhouse/source/ClickhouseSource.java | 18 +++++++++--------- .../source/ClickhouseSourceFactory.java | 3 ++- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceOptions.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceOptions.java index 6354cf22a39..44c76e40373 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceOptions.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceOptions.java @@ -17,9 +17,14 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.config; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; + import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import java.util.List; +import java.util.Map; + public class ClickhouseSourceOptions { public static final Option SQL = @@ -27,4 +32,17 @@ public class ClickhouseSourceOptions { .stringType() .noDefaultValue() .withDescription("Clickhouse sql used to query data"); + + /** clickhouse multi table */ + public static final Option>> TABLE_LIST = + Options.key("table_list") + .type(new TypeReference>>() {}) + .noDefaultValue() + .withDescription("table list config"); + + public static final Option TABLE_PATH = + Options.key("table_path") + .stringType() + .noDefaultValue() + .withDescription("table full path"); } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java index 42b8aba4dc3..8208ecc8c9f 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java @@ -56,15 +56,15 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SQL; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE_LIST; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE_PATH; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseBaseOptions.CLICKHOUSE_CONFIG; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseBaseOptions.DATABASE; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseBaseOptions.HOST; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseBaseOptions.PASSWORD; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseBaseOptions.SERVER_TIME_ZONE; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseBaseOptions.USERNAME; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSourceOptions.SQL; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSourceOptions.TABLE_LIST; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSourceOptions.TABLE_PATH; public class ClickhouseSource implements SeaTunnelSource, diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java index ffa7e3a1717..c0411f711c9 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java @@ -37,6 +37,7 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseBaseOptions.SERVER_TIME_ZONE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseBaseOptions.USERNAME; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSourceOptions.SQL; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSourceOptions.TABLE_LIST; @AutoService(Factory.class) public class ClickhouseSourceFactory implements TableSourceFactory { @@ -57,7 +58,7 @@ public OptionRule optionRule() { return OptionRule.builder() .required(HOST, DATABASE, SQL, USERNAME, PASSWORD) .exclusive(SQL, TABLE_LIST) - .optional(CLICKHOUSE_CONFIG) + .optional(CLICKHOUSE_CONFIG, SERVER_TIME_ZONE) .build(); }