From 94a0415475e7040cc6a61e9bd8aca9c3ccea80ad Mon Sep 17 00:00:00 2001 From: Joao Boto Date: Tue, 16 Jul 2024 17:22:10 +0200 Subject: [PATCH] [FLINK-34883] Fix postgres uuid column as PK (#3282) * [FLINK-34883] Fix postgres uuid column as PK * [FLINK-34883] Fix column comment. --- .../splitter/JdbcSourceChunkSplitter.java | 14 ++-- .../base/experimental/MySqlChunkSplitter.java | 32 ++++---- .../db2/source/dialect/Db2ChunkSplitter.java | 33 ++++---- .../splitter/OracleChunkSplitter.java | 34 ++++---- .../flink-connector-postgres-cdc/pom.xml | 2 +- .../source/PostgresChunkSplitter.java | 32 ++++---- .../source/fetch/PostgresScanFetchTask.java | 10 ++- .../source/utils/PostgresQueryUtils.java | 81 ++++++++++++++----- .../source/utils/PostgresTypeUtils.java | 2 + .../dialect/SqlServerChunkSplitter.java | 32 ++++---- .../flink-cdc-source-e2e-tests/pom.xml | 2 +- 11 files changed, 159 insertions(+), 115 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java index cae2a9d6fde..4a24aed0061 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java @@ -47,11 +47,10 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter { * * @param jdbc JDBC connection. * @param tableId table identity. - * @param columnName column name. + * @param column column. * @return maximum and minimum value. */ - Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) - throws SQLException; + Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) throws SQLException; /** * Query the minimum value of the column in the table, and the minimum value must greater than @@ -60,12 +59,11 @@ Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) * * @param jdbc JDBC connection. * @param tableId table identity. - * @param columnName column name. + * @param column column. * @param excludedLowerBound the minimum value should be greater than this value. * @return minimum value. */ - Object queryMin( - JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) + Object queryMin(JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound) throws SQLException; /** @@ -75,7 +73,7 @@ Object queryMin( * * @param jdbc JDBC connection. * @param tableId table identity. - * @param columnName column name. + * @param column column. * @param chunkSize chunk size. * @param includedLowerBound the previous chunk end value. * @return next chunk end value. @@ -83,7 +81,7 @@ Object queryMin( Object queryNextChunkMax( JdbcConnection jdbc, TableId tableId, - String columnName, + Column column, int chunkSize, Object includedLowerBound) throws SQLException; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/MySqlChunkSplitter.java index 8b2107ca122..29db714eb9c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/MySqlChunkSplitter.java @@ -110,28 +110,28 @@ public Collection generateSplits(TableId tableId) { } @Override - public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) + public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) throws SQLException { - return MySqlUtils.queryMinMax(jdbc, tableId, columnName); + return MySqlUtils.queryMinMax(jdbc, tableId, column.name()); } @Override public Object queryMin( - JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) + JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound) throws SQLException { - return MySqlUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound); + return MySqlUtils.queryMin(jdbc, tableId, column.name(), excludedLowerBound); } @Override public Object queryNextChunkMax( JdbcConnection jdbc, TableId tableId, - String columnName, + Column column, int chunkSize, Object includedLowerBound) throws SQLException { return MySqlUtils.queryNextChunkMax( - jdbc, tableId, columnName, chunkSize, includedLowerBound); + jdbc, tableId, column.name(), chunkSize, includedLowerBound); } @Override @@ -161,8 +161,7 @@ public DataType fromDbzColumn(Column splitColumn) { */ private List splitTableIntoChunks( JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { - final String splitColumnName = splitColumn.name(); - final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName); + final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn); final Object min = minMax[0]; final Object max = minMax[1]; if (min == null || max == null || min.equals(max)) { @@ -189,11 +188,10 @@ private List splitTableIntoChunks( return splitEvenlySizedChunks( tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); } else { - return splitUnevenlySizedChunks( - jdbc, tableId, splitColumnName, min, max, chunkSize); + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize); + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } } @@ -241,7 +239,7 @@ private List splitEvenlySizedChunks( private List splitUnevenlySizedChunks( JdbcConnection jdbc, TableId tableId, - String splitColumnName, + Column splitColumn, Object min, Object max, int chunkSize) @@ -250,7 +248,7 @@ private List splitUnevenlySizedChunks( "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize); final List splits = new ArrayList<>(); Object chunkStart = null; - Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize); + Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); int count = 0; while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) { // we start from [null, min + chunk_size) and avoid [null, min) @@ -258,7 +256,7 @@ private List splitUnevenlySizedChunks( // may sleep a while to avoid DDOS on MySQL server maySleep(count++, tableId); chunkStart = chunkEnd; - chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize); + chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize); } // add the ending split splits.add(ChunkRange.of(chunkStart, null)); @@ -269,17 +267,17 @@ private Object nextChunkEnd( JdbcConnection jdbc, Object previousChunkEnd, TableId tableId, - String splitColumnName, + Column splitColumn, Object max, int chunkSize) throws SQLException { // chunk end might be null when max values are removed Object chunkEnd = - queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd); + queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd); if (Objects.equals(previousChunkEnd, chunkEnd)) { // we don't allow equal chunk start and end, // should query the next one larger than chunkEnd - chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd); + chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); } if (ObjectUtils.compare(chunkEnd, max) >= 0) { return null; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2ChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2ChunkSplitter.java index 7cc6ddbf902..6381cd74ac4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2ChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2ChunkSplitter.java @@ -127,16 +127,16 @@ public Collection generateSplits(TableId tableId) { } @Override - public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) + public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) throws SQLException { - return Db2Utils.queryMinMax(jdbc, tableId, columnName); + return Db2Utils.queryMinMax(jdbc, tableId, column.name()); } @Override public Object queryMin( - JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) + JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound) throws SQLException { - return Db2Utils.queryMin(jdbc, tableId, columnName, excludedLowerBound); + return Db2Utils.queryMin(jdbc, tableId, column.name(), excludedLowerBound); } @Override @@ -152,11 +152,12 @@ public DataType fromDbzColumn(Column splitColumn) { public Object queryNextChunkMax( JdbcConnection jdbc, TableId tableId, - String columnName, + Column column, int chunkSize, Object includedLowerBound) throws SQLException { - return Db2Utils.queryNextChunkMax(jdbc, tableId, columnName, chunkSize, includedLowerBound); + return Db2Utils.queryNextChunkMax( + jdbc, tableId, column.name(), chunkSize, includedLowerBound); } @Override @@ -177,8 +178,7 @@ public String buildSplitScanQuery( */ private List splitTableIntoChunks( JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { - final String splitColumnName = splitColumn.name(); - final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName); + final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn); final Object min = minMax[0]; final Object max = minMax[1]; if (min == null || max == null || min.equals(max)) { @@ -205,11 +205,10 @@ private List splitTableIntoChunks( return splitEvenlySizedChunks( tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); } else { - return splitUnevenlySizedChunks( - jdbc, tableId, splitColumnName, min, max, chunkSize); + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize); + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } } @@ -259,7 +258,7 @@ private List splitEvenlySizedChunks( private List splitUnevenlySizedChunks( JdbcConnection jdbc, TableId tableId, - String splitColumnName, + Column splitColumn, Object min, Object max, int chunkSize) @@ -268,7 +267,7 @@ private List splitUnevenlySizedChunks( "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize); final List splits = new ArrayList<>(); Object chunkStart = null; - Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize); + Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); int count = 0; while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) { // we start from [null, min + chunk_size) and avoid [null, min) @@ -276,7 +275,7 @@ private List splitUnevenlySizedChunks( // may sleep awhile to avoid DDOS on Db2 server maySleep(count++, tableId); chunkStart = chunkEnd; - chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize); + chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize); } // add the ending split splits.add(ChunkRange.of(chunkStart, null)); @@ -287,17 +286,17 @@ private Object nextChunkEnd( JdbcConnection jdbc, Object previousChunkEnd, TableId tableId, - String splitColumnName, + Column splitColumn, Object max, int chunkSize) throws SQLException { // chunk end might be null when max values are removed Object chunkEnd = - queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd); + queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd); if (Objects.equals(previousChunkEnd, chunkEnd)) { // we don't allow equal chunk start and end, // should query the next one larger than chunkEnd - chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd); + chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); } if (ObjectUtils.compare(chunkEnd, max) >= 0) { return null; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java index a714205ff38..6dfb16e0be9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java @@ -114,28 +114,28 @@ public Collection generateSplits(TableId tableId) { } @Override - public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) + public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) throws SQLException { - return OracleUtils.queryMinMax(jdbc, tableId, columnName); + return OracleUtils.queryMinMax(jdbc, tableId, column.name()); } @Override public Object queryMin( - JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) + JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound) throws SQLException { - return OracleUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound); + return OracleUtils.queryMin(jdbc, tableId, column.name(), excludedLowerBound); } @Override public Object queryNextChunkMax( JdbcConnection jdbc, TableId tableId, - String columnName, + Column column, int chunkSize, Object includedLowerBound) throws SQLException { return OracleUtils.queryNextChunkMax( - jdbc, tableId, columnName, chunkSize, includedLowerBound); + jdbc, tableId, column.name(), chunkSize, includedLowerBound); } @Override @@ -165,8 +165,7 @@ public DataType fromDbzColumn(Column splitColumn) { */ private List splitTableIntoChunks( JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { - final String splitColumnName = splitColumn.name(); - final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName); + final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn); final Object min = minMax[0]; final Object max = minMax[1]; if (min == null || max == null || min.equals(max)) { @@ -180,7 +179,7 @@ private List splitTableIntoChunks( // use ROWID get splitUnevenlySizedChunks by default if (splitColumn.name().equals(ROWID.class.getSimpleName())) { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize); + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } if (isEvenlySplitColumn(splitColumn)) { @@ -198,11 +197,10 @@ private List splitTableIntoChunks( return splitEvenlySizedChunks( tableId, min, max, approximateRowCnt, dynamicChunkSize); } else { - return splitUnevenlySizedChunks( - jdbc, tableId, splitColumnName, min, max, chunkSize); + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize); + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } } @@ -239,7 +237,7 @@ private List splitEvenlySizedChunks( private List splitUnevenlySizedChunks( JdbcConnection jdbc, TableId tableId, - String splitColumnName, + Column splitColumn, Object min, Object max, int chunkSize) @@ -248,7 +246,7 @@ private List splitUnevenlySizedChunks( "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize); final List splits = new ArrayList<>(); Object chunkStart = null; - Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize); + Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); int count = 0; while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) { @@ -257,7 +255,7 @@ private List splitUnevenlySizedChunks( // may sleep a while to avoid DDOS on MySQL server maySleep(count++, tableId); chunkStart = chunkEnd; - chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize); + chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize); } // add the ending split splits.add(ChunkRange.of(chunkStart, null)); @@ -294,17 +292,17 @@ private Object nextChunkEnd( JdbcConnection jdbc, Object previousChunkEnd, TableId tableId, - String splitColumnName, + Column splitColumn, Object max, int chunkSize) throws SQLException { // chunk end might be null when max values are removed Object chunkEnd = - queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd); + queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd); if (Objects.equals(previousChunkEnd, chunkEnd)) { // we don't allow equal chunk start and end, // should query the next one larger than chunkEnd - chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd); + chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); } if (isChunkEndGeMax(chunkEnd, max)) { return null; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml index 4fc75d0ca55..0b12465a2d0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml @@ -73,7 +73,7 @@ limitations under the License. org.postgresql postgresql - 42.5.1 + 42.7.3 diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkSplitter.java index 871b0cb5b55..96a4cc18d33 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkSplitter.java @@ -127,28 +127,28 @@ public Collection generateSplits(TableId tableId) { } @Override - public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) + public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { - return PostgresQueryUtils.queryMinMax(jdbc, tableId, columnName); + return PostgresQueryUtils.queryMinMax(jdbc, tableId, splitColumn); } @Override public Object queryMin( - JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) + JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound) throws SQLException { - return PostgresQueryUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound); + return PostgresQueryUtils.queryMin(jdbc, tableId, column, excludedLowerBound); } @Override public Object queryNextChunkMax( JdbcConnection jdbc, TableId tableId, - String columnName, + Column column, int chunkSize, Object includedLowerBound) throws SQLException { return PostgresQueryUtils.queryNextChunkMax( - jdbc, tableId, columnName, chunkSize, includedLowerBound); + jdbc, tableId, column, chunkSize, includedLowerBound); } // -------------------------------------------------------------------------------------------- @@ -179,8 +179,7 @@ public DataType fromDbzColumn(Column splitColumn) { */ private List splitTableIntoChunks( JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { - final String splitColumnName = splitColumn.name(); - final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName); + final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn); final Object min = minMax[0]; final Object max = minMax[1]; if (min == null || max == null || min.equals(max)) { @@ -207,11 +206,10 @@ private List splitTableIntoChunks( return splitEvenlySizedChunks( tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); } else { - return splitUnevenlySizedChunks( - jdbc, tableId, splitColumnName, min, max, chunkSize); + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize); + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } } @@ -261,7 +259,7 @@ private List splitEvenlySizedChunks( private List splitUnevenlySizedChunks( JdbcConnection jdbc, TableId tableId, - String splitColumnName, + Column splitColumn, Object min, Object max, int chunkSize) @@ -270,7 +268,7 @@ private List splitUnevenlySizedChunks( "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize); final List splits = new ArrayList<>(); Object chunkStart = null; - Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize); + Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); int count = 0; while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) { // we start from [null, min + chunk_size) and avoid [null, min) @@ -278,7 +276,7 @@ private List splitUnevenlySizedChunks( // may sleep a while to avoid DDOS on PostgreSQL server maySleep(count++, tableId); chunkStart = chunkEnd; - chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize); + chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize); } // add the ending split splits.add(ChunkRange.of(chunkStart, null)); @@ -289,17 +287,17 @@ private Object nextChunkEnd( JdbcConnection jdbc, Object previousChunkEnd, TableId tableId, - String splitColumnName, + Column splitColumn, Object max, int chunkSize) throws SQLException { // chunk end might be null when max values are removed Object chunkEnd = - queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd); + queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd); if (Objects.equals(previousChunkEnd, chunkEnd)) { // we don't allow equal chunk start and end, // should query the next one larger than chunkEnd - chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd); + chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); } if (ObjectUtils.compare(chunkEnd, max) >= 0) { return null; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java index 8feb6d6dde1..7487f9edab4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java @@ -53,7 +53,9 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; import static io.debezium.connector.postgresql.PostgresObjectUtils.waitForReplicationSlotReady; import static io.debezium.connector.postgresql.Utils.refreshSchema; @@ -280,12 +282,18 @@ private void createDataEventsForTable( snapshotSplit.splitId(), table.id()); + List uuidFields = + snapshotSplit.getSplitKeyType().getFieldNames().stream() + .filter(field -> table.columnWithName(field).typeName().equals("uuid")) + .collect(Collectors.toList()); + final String selectSql = PostgresQueryUtils.buildSplitScanQuery( snapshotSplit.getTableId(), snapshotSplit.getSplitKeyType(), snapshotSplit.getSplitStart() == null, - snapshotSplit.getSplitEnd() == null); + snapshotSplit.getSplitEnd() == null, + uuidFields); LOG.debug( "For split '{}' of table {} using select statement: '{}'", snapshotSplit.splitId(), diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java index 925532bcb60..3e8a1feb669 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java @@ -20,6 +20,7 @@ import org.apache.flink.table.types.logical.RowType; import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Column; import io.debezium.relational.TableId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,7 +28,9 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -40,12 +43,12 @@ public class PostgresQueryUtils { private PostgresQueryUtils() {} - public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) + public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) throws SQLException { final String minMaxQuery = String.format( "SELECT MIN(%s), MAX(%s) FROM %s", - quote(columnName), quote(columnName), quote(tableId)); + quoteForMinMax(column), quoteForMinMax(column), quote(tableId)); return jdbc.queryAndMap( minMaxQuery, rs -> { @@ -85,12 +88,15 @@ public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) } public static Object queryMin( - JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) + JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound) throws SQLException { final String query = String.format( - "SELECT MIN(%s) FROM %s WHERE %s > ?", - quote(columnName), quote(tableId), quote(columnName)); + "SELECT MIN(%s) FROM %s WHERE %s > %s", + quoteForMinMax(column), + quote(tableId), + quote(column.name()), + castParam(column)); return jdbc.prepareQueryAndMap( query, ps -> ps.setObject(1, excludedLowerBound), @@ -109,20 +115,21 @@ public static Object queryMin( public static Object queryNextChunkMax( JdbcConnection jdbc, TableId tableId, - String splitColumnName, + Column splitColumn, int chunkSize, Object includedLowerBound) throws SQLException { - String quotedColumn = quote(splitColumnName); + String quotedColumn = quote(splitColumn.name()); String query = String.format( "SELECT MAX(%s) FROM (" - + "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %s" + + "SELECT %s FROM %s WHERE %s >= %s ORDER BY %s ASC LIMIT %s" + ") AS T", - quotedColumn, + quoteForMinMax(splitColumn), quotedColumn, quote(tableId), quotedColumn, + castParam(splitColumn), quotedColumn, chunkSize); return jdbc.prepareQueryAndMap( @@ -141,7 +148,17 @@ public static Object queryNextChunkMax( public static String buildSplitScanQuery( TableId tableId, RowType pkRowType, boolean isFirstSplit, boolean isLastSplit) { - return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, -1, true); + return buildSplitScanQuery( + tableId, pkRowType, isFirstSplit, isLastSplit, new ArrayList<>()); + } + + public static String buildSplitScanQuery( + TableId tableId, + RowType pkRowType, + boolean isFirstSplit, + boolean isLastSplit, + List uuidFields) { + return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, uuidFields, -1, true); } private static String buildSplitQuery( @@ -149,6 +166,7 @@ private static String buildSplitQuery( RowType pkRowType, boolean isFirstSplit, boolean isLastSplit, + List uuidFields, int limitSize, boolean isScanningData) { final String condition; @@ -157,27 +175,27 @@ private static String buildSplitQuery( condition = null; } else if (isFirstSplit) { final StringBuilder sql = new StringBuilder(); - addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ?"); + addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ", uuidFields); if (isScanningData) { sql.append(" AND NOT ("); - addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ?"); + addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", uuidFields); sql.append(")"); } condition = sql.toString(); } else if (isLastSplit) { final StringBuilder sql = new StringBuilder(); - addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ?"); + addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ", uuidFields); condition = sql.toString(); } else { final StringBuilder sql = new StringBuilder(); - addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ?"); + addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ", uuidFields); if (isScanningData) { sql.append(" AND NOT ("); - addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ?"); + addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", uuidFields); sql.append(")"); } sql.append(" AND "); - addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ?"); + addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ", uuidFields); condition = sql.toString(); } @@ -237,6 +255,31 @@ public static String quote(String dbOrTableName) { return "\"" + dbOrTableName + "\""; } + private static String quoteForMinMax(Column column) { + String quoteColumn = quote(column.name()); + return isUUID(column) ? castToText(quoteColumn) : quoteColumn; + } + + private static String castParam(Column column) { + return castParam(isUUID(column)); + } + + private static String castParam(boolean isUUID) { + return isUUID ? castToUuid("?") : "?"; + } + + private static String castToUuid(String value) { + return String.format("(%s)::uuid", value); + } + + private static String castToText(String value) { + return String.format("(%s)::text", value); + } + + private static boolean isUUID(Column column) { + return column.typeName().equals("uuid"); + } + public static String quote(TableId tableId) { return tableId.toQuotedString('"'); } @@ -251,10 +294,12 @@ private static PreparedStatement initStatement(JdbcConnection jdbc, String sql, } private static void addPrimaryKeyColumnsToCondition( - RowType pkRowType, StringBuilder sql, String predicate) { + RowType pkRowType, StringBuilder sql, String predicate, List uuidFields) { for (Iterator fieldNamesIt = pkRowType.getFieldNames().iterator(); fieldNamesIt.hasNext(); ) { - sql.append(fieldNamesIt.next()).append(predicate); + String fieldName = fieldNamesIt.next(); + boolean isUUID = uuidFields.contains(fieldName); + sql.append(fieldName).append(predicate).append(castParam(isUUID)); if (fieldNamesIt.hasNext()) { sql.append(" AND "); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresTypeUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresTypeUtils.java index 74d8f1c7998..f8276806f2e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresTypeUtils.java @@ -60,6 +60,7 @@ public class PostgresTypeUtils { private static final String PG_CHARACTER_ARRAY = "_character"; private static final String PG_CHARACTER_VARYING = "varchar"; private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar"; + private static final String PG_UUID = "uuid"; /** Returns a corresponding Flink data type from a debezium {@link Column}. */ public static DataType fromDbzColumn(Column column) { @@ -136,6 +137,7 @@ private static DataType convertFromColumn(Column column) { case PG_CHARACTER_VARYING_ARRAY: return DataTypes.ARRAY(DataTypes.VARCHAR(precision)); case PG_TEXT: + case PG_UUID: return DataTypes.STRING(); case PG_TEXT_ARRAY: return DataTypes.ARRAY(DataTypes.STRING()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java index a3d7c008070..39546717572 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java @@ -129,16 +129,16 @@ public Collection generateSplits(TableId tableId) { } @Override - public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) + public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) throws SQLException { - return SqlServerUtils.queryMinMax(jdbc, tableId, columnName); + return SqlServerUtils.queryMinMax(jdbc, tableId, column.name()); } @Override public Object queryMin( - JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) + JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound) throws SQLException { - return SqlServerUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound); + return SqlServerUtils.queryMin(jdbc, tableId, column.name(), excludedLowerBound); } @Override @@ -154,12 +154,12 @@ public DataType fromDbzColumn(Column splitColumn) { public Object queryNextChunkMax( JdbcConnection jdbc, TableId tableId, - String columnName, + Column column, int chunkSize, Object includedLowerBound) throws SQLException { return SqlServerUtils.queryNextChunkMax( - jdbc, tableId, columnName, chunkSize, includedLowerBound); + jdbc, tableId, column.name(), chunkSize, includedLowerBound); } @Override @@ -180,8 +180,7 @@ public String buildSplitScanQuery( */ private List splitTableIntoChunks( JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { - final String splitColumnName = splitColumn.name(); - final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName); + final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn); final Object min = minMax[0]; final Object max = minMax[1]; if (min == null || max == null || min.equals(max)) { @@ -208,11 +207,10 @@ private List splitTableIntoChunks( return splitEvenlySizedChunks( tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); } else { - return splitUnevenlySizedChunks( - jdbc, tableId, splitColumnName, min, max, chunkSize); + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize); + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } } @@ -262,7 +260,7 @@ private List splitEvenlySizedChunks( private List splitUnevenlySizedChunks( JdbcConnection jdbc, TableId tableId, - String splitColumnName, + Column splitColumn, Object min, Object max, int chunkSize) @@ -271,7 +269,7 @@ private List splitUnevenlySizedChunks( "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize); final List splits = new ArrayList<>(); Object chunkStart = null; - Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize); + Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); int count = 0; while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) { // we start from [null, min + chunk_size) and avoid [null, min) @@ -279,7 +277,7 @@ private List splitUnevenlySizedChunks( // may sleep awhile to avoid DDOS on SqlServer server maySleep(count++, tableId); chunkStart = chunkEnd; - chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize); + chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize); } // add the ending split splits.add(ChunkRange.of(chunkStart, null)); @@ -290,17 +288,17 @@ private Object nextChunkEnd( JdbcConnection jdbc, Object previousChunkEnd, TableId tableId, - String splitColumnName, + Column splitColumn, Object max, int chunkSize) throws SQLException { // chunk end might be null when max values are removed Object chunkEnd = - queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd); + queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd); if (Objects.equals(previousChunkEnd, chunkEnd)) { // we don't allow equal chunk start and end, // should query the next one larger than chunkEnd - chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd); + chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); } if (ObjectUtils.compare(chunkEnd, max) >= 0) { return null; diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml index 6f604f34611..a7f35cf042e 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml @@ -36,7 +36,7 @@ limitations under the License. 3.1.2-1.18 3.1.2-1.18 8.0.27 - 42.5.1 + 42.7.3