From 0f7f2c0563ad78b9d2ed293050e15f3ddfbb1bf4 Mon Sep 17 00:00:00 2001 From: ckl750 Date: Tue, 18 Feb 2025 16:04:59 +0800 Subject: [PATCH 1/3] Resolve apache/seatunnel#8691 --- .../jdbc/config/JdbcSourceTableConfig.java | 5 +++ .../jdbc/internal/dialect/JdbcDialect.java | 40 ++++++++++++++++--- .../internal/dialect/mysql/MysqlDialect.java | 9 +++-- .../dialect/oracle/OracleDialect.java | 14 ++++--- .../dialect/psql/PostgresDialect.java | 7 +++- .../seatunnel/jdbc/source/ChunkSplitter.java | 5 +++ .../jdbc/source/DynamicChunkSplitter.java | 3 ++ .../jdbc/source/JdbcSourceTable.java | 1 + .../jdbc/utils/JdbcCatalogUtils.java | 2 + 9 files changed, 69 insertions(+), 17 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java index a3522e9c146..0ea60a3f251 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java @@ -64,6 +64,9 @@ public class JdbcSourceTableConfig implements Serializable { @JsonProperty("skip_analyze") private Boolean skipAnalyze; + @JsonProperty("where_condition") + private String whereConditionClause; + @Tolerate public JdbcSourceTableConfig() {} @@ -97,6 +100,8 @@ public static List of(ReadonlyConfig connectorConfig) { tableConfig.setUseSelectCount( connectorConfig.get(JdbcSourceOptions.USE_SELECT_COUNT)); tableConfig.setSkipAnalyze(connectorConfig.get(JdbcSourceOptions.SKIP_ANALYZE)); + tableConfig.setWhereConditionClause( + connectorConfig.get(JdbcSourceOptions.WHERE_CONDITION)); }); if (tableList.size() > 1) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index 07fe42c07ba..3fab234b92a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -316,6 +316,30 @@ default Long approximateRowCntStatement(Connection connection, JdbcSourceTable t return SQLUtils.countForTable(connection, tableIdentifier(table.getTablePath())); } + /** + * Total number of entries in the lookup table with condition. + * + * @param connection The JDBC connection object used to connect to the database. + * @param table table info. + * @return row count + */ + default Long rowCntWithWhereCondition(Connection connection, JdbcSourceTable table) + throws SQLException { + String subQuerySQL; + if (StringUtils.isNotBlank(table.getQuery())) { + subQuerySQL = + String.format( + "SELECT * FROM (%s) AS T %s", + table.getQuery(), table.getWhereConditionClause()); + } else { + subQuerySQL = + String.format( + "SELECT * FROM %s %s", + tableIdentifier(table.getTablePath()), table.getWhereConditionClause()); + } + return SQLUtils.countForSubquery(connection, subQuerySQL); + } + /** * Performs a sampling operation on the specified column of a table in a JDBC-connected * database. @@ -335,17 +359,18 @@ default Object[] sampleDataFromColumn( int samplingRate, int fetchSize) throws Exception { + String whereConditionClause = StringUtils.isNotBlank(table.getWhereConditionClause()) ? table.getWhereConditionClause() : ""; String sampleQuery; if (StringUtils.isNotBlank(table.getQuery())) { sampleQuery = String.format( - "SELECT %s FROM (%s) AS T", - quoteIdentifier(columnName), table.getQuery()); + "SELECT %s FROM (%s) AS T %s", + quoteIdentifier(columnName), table.getQuery(), whereConditionClause); } else { sampleQuery = String.format( - "SELECT %s FROM %s", - quoteIdentifier(columnName), tableIdentifier(table.getTablePath())); + "SELECT %s FROM %s %s", + quoteIdentifier(columnName), tableIdentifier(table.getTablePath()), whereConditionClause); } try (PreparedStatement stmt = creatPreparedStatement(connection, sampleQuery, fetchSize)) { @@ -390,16 +415,18 @@ default Object queryNextChunkMax( Object includedLowerBound) throws SQLException { String quotedColumn = quoteIdentifier(columnName); + String whereConditionClause = StringUtils.isNotBlank(table.getWhereConditionClause()) ? table.getWhereConditionClause() + " AND" : "WHERE"; String sqlQuery; if (StringUtils.isNotBlank(table.getQuery())) { sqlQuery = String.format( "SELECT MAX(%s) FROM (" - + "SELECT %s FROM (%s) AS T1 WHERE %s >= ? ORDER BY %s ASC LIMIT %s" + + "SELECT %s FROM (%s) AS T1 %s %s >= ? ORDER BY %s ASC LIMIT %s" + ") AS T2", quotedColumn, quotedColumn, table.getQuery(), + whereConditionClause, quotedColumn, quotedColumn, chunkSize); @@ -407,11 +434,12 @@ default Object queryNextChunkMax( sqlQuery = String.format( "SELECT MAX(%s) FROM (" - + "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %s" + + "SELECT %s FROM %s %s %s >= ? ORDER BY %s ASC LIMIT %s" + ") AS T", quotedColumn, quotedColumn, tableIdentifier(table.getTablePath()), + whereConditionClause, quotedColumn, quotedColumn, chunkSize); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java index 9968b29bc34..161e1659a7a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java @@ -151,16 +151,17 @@ public Object[] sampleDataFromColumn( int fetchSize) throws Exception { String sampleQuery; + String whereConditionClause = StringUtils.isNotBlank(table.getWhereConditionClause()) ? table.getWhereConditionClause() : ""; if (StringUtils.isNotBlank(table.getQuery())) { sampleQuery = String.format( - "SELECT %s FROM (%s) AS T", - quoteIdentifier(columnName), table.getQuery()); + ""SELECT %s FROM (%s) AS T %s", + quoteIdentifier(columnName), table.getQuery(), whereConditionClause); } else { sampleQuery = String.format( - "SELECT %s FROM %s", - quoteIdentifier(columnName), tableIdentifier(table.getTablePath())); + "SELECT %s FROM %s %s", + quoteIdentifier(columnName), tableIdentifier(table.getTablePath()), whereConditionClause); } try (Statement stmt = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java index 683de83e384..6943137bb2d 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java @@ -258,16 +258,18 @@ public Object queryNextChunkMax( Object includedLowerBound) throws SQLException { String quotedColumn = quoteIdentifier(columnName); + String whereConditionClause = StringUtils.isNotBlank(table.getWhereConditionClause()) ? table.getWhereConditionClause() + " AND" : "WHERE"; String sqlQuery; if (StringUtils.isNotBlank(table.getQuery())) { sqlQuery = String.format( "SELECT MAX(%s) FROM (" - + "SELECT %s FROM (%s) WHERE %s >= ? ORDER BY %s ASC " + + "SELECT %s FROM (%s) %s %s >= ? ORDER BY %s ASC " + ") WHERE ROWNUM <= %s", quotedColumn, quotedColumn, table.getQuery(), + whereConditionClause, quotedColumn, quotedColumn, chunkSize); @@ -275,11 +277,12 @@ public Object queryNextChunkMax( sqlQuery = String.format( "SELECT MAX(%s) FROM (" - + "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC " + + "SELECT %s FROM %s %s %s >= ? ORDER BY %s ASC " + ") WHERE ROWNUM <= %s", quotedColumn, quotedColumn, tableIdentifier(table.getTablePath()), + whereConditionClause, quotedColumn, quotedColumn, chunkSize); @@ -306,16 +309,17 @@ public Object[] sampleDataFromColumn( int samplingRate, int fetchSize) throws Exception { + String whereConditionClause = StringUtils.isNotBlank(table.getWhereConditionClause()) ? table.getWhereConditionClause() : ""; String sampleQuery; if (StringUtils.isNotBlank(table.getQuery())) { sampleQuery = String.format( - "SELECT %s FROM (%s) T", quoteIdentifier(columnName), table.getQuery()); + "SELECT %s FROM (%s) T %s", quoteIdentifier(columnName), table.getQuery(), whereConditionClause); } else { sampleQuery = String.format( - "SELECT %s FROM %s", - quoteIdentifier(columnName), tableIdentifier(table.getTablePath())); + "SELECT %s FROM %s %s", + quoteIdentifier(columnName), tableIdentifier(table.getTablePath()), whereConditionClause); } try (PreparedStatement stmt = creatPreparedStatement(connection, sampleQuery, fetchSize)) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java index b56930303d7..fb15677de12 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java @@ -112,16 +112,18 @@ public Object queryNextChunkMax( String quotedColumn = quoteIdentifier(columnName); quotedColumn = convertType(quotedColumn, column.getSourceType()); + String whereConditionClause = StringUtils.isNotBlank(table.getWhereConditionClause()) ? table.getWhereConditionClause() + " AND" : "WHERE"; String sqlQuery; if (StringUtils.isNotBlank(table.getQuery())) { sqlQuery = String.format( "SELECT MAX(%s) FROM (" - + "SELECT %s FROM (%s) AS T1 WHERE %s >= ? ORDER BY %s ASC LIMIT %s" + + "SELECT %s FROM (%s) AS T1 %s %s >= ? ORDER BY %s ASC LIMIT %s" + ") AS T2", quotedColumn, quotedColumn, table.getQuery(), + whereConditionClause, quotedColumn, quotedColumn, chunkSize); @@ -129,11 +131,12 @@ public Object queryNextChunkMax( sqlQuery = String.format( "SELECT MAX(%s) FROM (" - + "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %s" + + "SELECT %s FROM %s %s %s >= ? ORDER BY %s ASC LIMIT %s" + ") AS T", quotedColumn, quotedColumn, tableIdentifier(table.getTablePath()), + whereConditionClause, quotedColumn, quotedColumn, chunkSize); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java index f4da0a8d946..f7cad6f1669 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java @@ -233,6 +233,11 @@ protected Pair queryMinMax(JdbcSourceTable table, String columnN columnName, jdbcDialect.tableIdentifier(table.getTablePath())); } + + if (StringUtils.isNotBlank(config.getWhereConditionClause())) { + sqlQuery = String.format("%s %s", sqlQuery, config.getWhereConditionClause()); + } + try (Statement stmt = getOrEstablishConnection().createStatement()) { log.info("Split table, query min max: {}", sqlQuery); try (ResultSet resultSet = stmt.executeQuery(sqlQuery)) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java index d958c405dfb..36144f99434 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java @@ -208,6 +208,9 @@ private List evenlyColumnSplitChunks( } private Long queryApproximateRowCnt(JdbcSourceTable table) throws SQLException { + if (StringUtils.isNotBlank(table.getWhereConditionClause())) { + return jdbcDialect.rowCntWithWhereCondition(getOrEstablishConnection(), table); + } return jdbcDialect.approximateRowCntStatement(getOrEstablishConnection(), table); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java index 8aad94c8b69..da1d8d85578 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java @@ -40,4 +40,5 @@ public class JdbcSourceTable implements Serializable { private final Boolean useSelectCount; private final Boolean skipAnalyze; private final CatalogTable catalogTable; + private final String whereConditionClause; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java index 73773b559bc..91bd797508a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java @@ -94,6 +94,7 @@ public static Map getTables( .partitionEnd(tableConfig.getPartitionEnd()) .useSelectCount(tableConfig.getUseSelectCount()) .skipAnalyze(tableConfig.getSkipAnalyze()) + .whereConditionClause(tableConfig.getWhereConditionClause()) .catalogTable(catalogTable) .build(); tables.put(tablePath, jdbcSourceTable); @@ -143,6 +144,7 @@ public static Map getTables( .partitionEnd(tableConfig.getPartitionEnd()) .useSelectCount(tableConfig.getUseSelectCount()) .skipAnalyze(tableConfig.getSkipAnalyze()) + .whereConditionClause(tableConfig.getWhereConditionClause()) .catalogTable(catalogTable) .build(); From 0cdaae5cd76f7af23055055fbfb1e875f8b687a3 Mon Sep 17 00:00:00 2001 From: ckl750 Date: Thu, 20 Feb 2025 18:58:33 +0800 Subject: [PATCH 2/3] fix check code style error --- .../jdbc/internal/dialect/JdbcDialect.java | 14 +++++++++++--- .../internal/dialect/mysql/MysqlDialect.java | 9 +++++++-- .../internal/dialect/oracle/OracleDialect.java | 17 +++++++++++++---- .../internal/dialect/psql/PostgresDialect.java | 5 ++++- 4 files changed, 35 insertions(+), 10 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index 3fab234b92a..c5a0bc2baf4 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -359,7 +359,10 @@ default Object[] sampleDataFromColumn( int samplingRate, int fetchSize) throws Exception { - String whereConditionClause = StringUtils.isNotBlank(table.getWhereConditionClause()) ? table.getWhereConditionClause() : ""; + String whereConditionClause = + StringUtils.isNotBlank(table.getWhereConditionClause()) + ? table.getWhereConditionClause() + : ""; String sampleQuery; if (StringUtils.isNotBlank(table.getQuery())) { sampleQuery = @@ -370,7 +373,9 @@ default Object[] sampleDataFromColumn( sampleQuery = String.format( "SELECT %s FROM %s %s", - quoteIdentifier(columnName), tableIdentifier(table.getTablePath()), whereConditionClause); + quoteIdentifier(columnName), + tableIdentifier(table.getTablePath()), + whereConditionClause); } try (PreparedStatement stmt = creatPreparedStatement(connection, sampleQuery, fetchSize)) { @@ -415,7 +420,10 @@ default Object queryNextChunkMax( Object includedLowerBound) throws SQLException { String quotedColumn = quoteIdentifier(columnName); - String whereConditionClause = StringUtils.isNotBlank(table.getWhereConditionClause()) ? table.getWhereConditionClause() + " AND" : "WHERE"; + String whereConditionClause = + StringUtils.isNotBlank(table.getWhereConditionClause()) + ? table.getWhereConditionClause() + " AND" + : "WHERE"; String sqlQuery; if (StringUtils.isNotBlank(table.getQuery())) { sqlQuery = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java index 161e1659a7a..677c9824cab 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java @@ -151,7 +151,10 @@ public Object[] sampleDataFromColumn( int fetchSize) throws Exception { String sampleQuery; - String whereConditionClause = StringUtils.isNotBlank(table.getWhereConditionClause()) ? table.getWhereConditionClause() : ""; + String whereConditionClause = + StringUtils.isNotBlank(table.getWhereConditionClause()) + ? table.getWhereConditionClause() + : ""; if (StringUtils.isNotBlank(table.getQuery())) { sampleQuery = String.format( @@ -161,7 +164,9 @@ public Object[] sampleDataFromColumn( sampleQuery = String.format( "SELECT %s FROM %s %s", - quoteIdentifier(columnName), tableIdentifier(table.getTablePath()), whereConditionClause); + quoteIdentifier(columnName), + tableIdentifier(table.getTablePath()), + whereConditionClause); } try (Statement stmt = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java index 6943137bb2d..d7a042bf59a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java @@ -258,7 +258,10 @@ public Object queryNextChunkMax( Object includedLowerBound) throws SQLException { String quotedColumn = quoteIdentifier(columnName); - String whereConditionClause = StringUtils.isNotBlank(table.getWhereConditionClause()) ? table.getWhereConditionClause() + " AND" : "WHERE"; + String whereConditionClause = + StringUtils.isNotBlank(table.getWhereConditionClause()) + ? table.getWhereConditionClause() + " AND" + : "WHERE"; String sqlQuery; if (StringUtils.isNotBlank(table.getQuery())) { sqlQuery = @@ -309,17 +312,23 @@ public Object[] sampleDataFromColumn( int samplingRate, int fetchSize) throws Exception { - String whereConditionClause = StringUtils.isNotBlank(table.getWhereConditionClause()) ? table.getWhereConditionClause() : ""; + String whereConditionClause = + StringUtils.isNotBlank(table.getWhereConditionClause()) + ? table.getWhereConditionClause() + : ""; String sampleQuery; if (StringUtils.isNotBlank(table.getQuery())) { sampleQuery = String.format( - "SELECT %s FROM (%s) T %s", quoteIdentifier(columnName), table.getQuery(), whereConditionClause); + "SELECT %s FROM (%s) T %s", + quoteIdentifier(columnName), table.getQuery(), whereConditionClause); } else { sampleQuery = String.format( "SELECT %s FROM %s %s", - quoteIdentifier(columnName), tableIdentifier(table.getTablePath()), whereConditionClause); + quoteIdentifier(columnName), + tableIdentifier(table.getTablePath()), + whereConditionClause); } try (PreparedStatement stmt = creatPreparedStatement(connection, sampleQuery, fetchSize)) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java index fb15677de12..4927145cef6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java @@ -112,7 +112,10 @@ public Object queryNextChunkMax( String quotedColumn = quoteIdentifier(columnName); quotedColumn = convertType(quotedColumn, column.getSourceType()); - String whereConditionClause = StringUtils.isNotBlank(table.getWhereConditionClause()) ? table.getWhereConditionClause() + " AND" : "WHERE"; + String whereConditionClause = + StringUtils.isNotBlank(table.getWhereConditionClause()) + ? table.getWhereConditionClause() + " AND" + : "WHERE"; String sqlQuery; if (StringUtils.isNotBlank(table.getQuery())) { sqlQuery = From 3625e4dd64932d948642b6335a98063214fd24a5 Mon Sep 17 00:00:00 2001 From: ckl750 Date: Thu, 20 Feb 2025 19:05:54 +0800 Subject: [PATCH 3/3] fix check code style error --- .../seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java index 677c9824cab..576651fd079 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java @@ -158,7 +158,7 @@ public Object[] sampleDataFromColumn( if (StringUtils.isNotBlank(table.getQuery())) { sampleQuery = String.format( - ""SELECT %s FROM (%s) AS T %s", + "SELECT %s FROM (%s) AS T %s", quoteIdentifier(columnName), table.getQuery(), whereConditionClause); } else { sampleQuery =