From 6dc6699e2c758cd84e308f9a900f7fd1ab16beae Mon Sep 17 00:00:00 2001 From: Paultagoras Date: Wed, 27 Nov 2024 11:56:45 -0500 Subject: [PATCH] Field cleanup should be skippable --- CHANGELOG.md | 5 +++- VERSION | 2 +- .../connect/sink/ClickHouseSinkConfig.java | 24 +++++++++++++++++++ .../connect/sink/db/ClickHouseWriter.java | 4 ++++ 4 files changed, 33 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c1b3bdcc..e7877fdd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,12 @@ +# 1.2.6 +* Detect if table schema has changed and refresh the schema +* Allow bypassing field cleanup + # 1.2.5 * Remove redis state provide since we are using KeeperMap for state storage * Remove unused avro property from `build.gradle.kts` * Trim schemaless data to only pass the fields that are in the table * Allow bypassing the schema validation -* Detect if table schema has changed and refresh the schema # 1.2.4 * Adjusting underlying client version to 0.7.0 diff --git a/VERSION b/VERSION index 212c3bf8..a6c5252c 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v1.2.5 +v1.2.6 diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java index 552be1c0..063840d6 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java @@ -49,6 +49,7 @@ public class ClickHouseSinkConfig { public static final String DATE_TIME_FORMAT = "dateTimeFormats"; public static final String TOLERATE_STATE_MISMATCH = "tolerateStateMismatch"; public static final String BYPASS_SCHEMA_VALIDATION = "bypassSchemaValidation"; + public static final String BYPASS_FIELD_CLEANUP = "bypassFieldCleanup"; public static final int MILLI_IN_A_SEC = 1000; private static final String databaseDefault = "default"; @@ -90,6 +91,7 @@ public class ClickHouseSinkConfig { private final String clientVersion; private final boolean tolerateStateMismatch; private final boolean bypassSchemaValidation; + private final boolean bypassFieldCleanup; public enum InsertFormats { NONE, @@ -263,6 +265,7 @@ public ClickHouseSinkConfig(Map props) { this.clientVersion = props.getOrDefault(CLIENT_VERSION, "V1"); this.tolerateStateMismatch = Boolean.parseBoolean(props.getOrDefault(TOLERATE_STATE_MISMATCH, "false")); this.bypassSchemaValidation = Boolean.parseBoolean(props.getOrDefault(BYPASS_SCHEMA_VALIDATION, "false")); + this.bypassFieldCleanup = Boolean.parseBoolean(props.getOrDefault(BYPASS_FIELD_CLEANUP, "false")); LOGGER.debug("ClickHouseSinkConfig: hostname: {}, port: {}, database: {}, username: {}, sslEnabled: {}, timeout: {}, retry: {}, exactlyOnce: {}", hostname, port, database, username, sslEnabled, timeout, retry, exactlyOnce); @@ -285,6 +288,7 @@ public void addClickHouseSetting(String key, String value, boolean override) { private static ConfigDef createConfigDef() { ConfigDef configDef = new ConfigDef(); + //TODO: At some point we should group these more clearly String group = "Connection"; int orderInGroup = 0; configDef.define(HOSTNAME, @@ -568,6 +572,26 @@ private static ConfigDef createConfigDef() { ConfigDef.Width.SHORT, "Tolerate state mismatch." ); + configDef.define(BYPASS_SCHEMA_VALIDATION, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.LOW, + "Bypass schema validation. default: false", + group, + ++orderInGroup, + ConfigDef.Width.SHORT, + "Bypass schema validation." + ); + configDef.define(BYPASS_FIELD_CLEANUP, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.LOW, + "Bypass field cleanup. default: false", + group, + ++orderInGroup, + ConfigDef.Width.SHORT, + "Bypass field cleanup." + ); return configDef; } } diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java index 441a690d..cfedf3d5 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java @@ -967,6 +967,10 @@ protected void doInsertJsonV2(List records, Table table, QueryIdentifier } protected Map cleanupExtraFields(Map m, Table t) { + if (csc.isBypassFieldCleanup()) { + return m; + } + Map cleaned = new HashMap<>(); for (Column c : t.getRootColumnsList()) { if (m.containsKey(c.getName())) {