Skip to content

Commit

Permalink
[FLINK-34883] Fix postgres uuid column as PK (#3282)
Browse files Browse the repository at this point in the history
* [FLINK-34883] Fix postgres uuid column as PK

* [FLINK-34883] Fix column comment.
  • Loading branch information
eskabetxe authored Jul 16, 2024
1 parent 11deb62 commit 94a0415
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter {
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param columnName column name.
* @param column column.
* @return maximum and minimum value.
*/
Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
throws SQLException;
Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) throws SQLException;

/**
* Query the minimum value of the column in the table, and the minimum value must greater than
Expand All @@ -60,12 +59,11 @@ Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param columnName column name.
* @param column column.
* @param excludedLowerBound the minimum value should be greater than this value.
* @return minimum value.
*/
Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
Object queryMin(JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException;

/**
Expand All @@ -75,15 +73,15 @@ Object queryMin(
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param columnName column name.
* @param column column.
* @param chunkSize chunk size.
* @param includedLowerBound the previous chunk end value.
* @return next chunk end value.
*/
Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
String columnName,
Column column,
int chunkSize,
Object includedLowerBound)
throws SQLException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,28 +110,28 @@ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
}

@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
throws SQLException {
return MySqlUtils.queryMinMax(jdbc, tableId, columnName);
return MySqlUtils.queryMinMax(jdbc, tableId, column.name());
}

@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
return MySqlUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound);
return MySqlUtils.queryMin(jdbc, tableId, column.name(), excludedLowerBound);
}

@Override
public Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
String columnName,
Column column,
int chunkSize,
Object includedLowerBound)
throws SQLException {
return MySqlUtils.queryNextChunkMax(
jdbc, tableId, columnName, chunkSize, includedLowerBound);
jdbc, tableId, column.name(), chunkSize, includedLowerBound);
}

@Override
Expand Down Expand Up @@ -161,8 +161,7 @@ public DataType fromDbzColumn(Column splitColumn) {
*/
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final String splitColumnName = splitColumn.name();
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
Expand All @@ -189,11 +188,10 @@ private List<ChunkRange> splitTableIntoChunks(
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(
jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}

Expand Down Expand Up @@ -241,7 +239,7 @@ private List<ChunkRange> splitEvenlySizedChunks(
private List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object min,
Object max,
int chunkSize)
Expand All @@ -250,15 +248,15 @@ private List<ChunkRange> splitUnevenlySizedChunks(
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDOS on MySQL server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
Expand All @@ -269,17 +267,17 @@ private Object nextChunkEnd(
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (ObjectUtils.compare(chunkEnd, max) >= 0) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,16 @@ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
}

@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
throws SQLException {
return Db2Utils.queryMinMax(jdbc, tableId, columnName);
return Db2Utils.queryMinMax(jdbc, tableId, column.name());
}

@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
return Db2Utils.queryMin(jdbc, tableId, columnName, excludedLowerBound);
return Db2Utils.queryMin(jdbc, tableId, column.name(), excludedLowerBound);
}

@Override
Expand All @@ -152,11 +152,12 @@ public DataType fromDbzColumn(Column splitColumn) {
public Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
String columnName,
Column column,
int chunkSize,
Object includedLowerBound)
throws SQLException {
return Db2Utils.queryNextChunkMax(jdbc, tableId, columnName, chunkSize, includedLowerBound);
return Db2Utils.queryNextChunkMax(
jdbc, tableId, column.name(), chunkSize, includedLowerBound);
}

@Override
Expand All @@ -177,8 +178,7 @@ public String buildSplitScanQuery(
*/
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final String splitColumnName = splitColumn.name();
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
Expand All @@ -205,11 +205,10 @@ private List<ChunkRange> splitTableIntoChunks(
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(
jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}

Expand Down Expand Up @@ -259,7 +258,7 @@ private List<ChunkRange> splitEvenlySizedChunks(
private List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object min,
Object max,
int chunkSize)
Expand All @@ -268,15 +267,15 @@ private List<ChunkRange> splitUnevenlySizedChunks(
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep awhile to avoid DDOS on Db2 server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
Expand All @@ -287,17 +286,17 @@ private Object nextChunkEnd(
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (ObjectUtils.compare(chunkEnd, max) >= 0) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,28 +114,28 @@ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
}

@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
throws SQLException {
return OracleUtils.queryMinMax(jdbc, tableId, columnName);
return OracleUtils.queryMinMax(jdbc, tableId, column.name());
}

@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
return OracleUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound);
return OracleUtils.queryMin(jdbc, tableId, column.name(), excludedLowerBound);
}

@Override
public Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
String columnName,
Column column,
int chunkSize,
Object includedLowerBound)
throws SQLException {
return OracleUtils.queryNextChunkMax(
jdbc, tableId, columnName, chunkSize, includedLowerBound);
jdbc, tableId, column.name(), chunkSize, includedLowerBound);
}

@Override
Expand Down Expand Up @@ -165,8 +165,7 @@ public DataType fromDbzColumn(Column splitColumn) {
*/
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final String splitColumnName = splitColumn.name();
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
Expand All @@ -180,7 +179,7 @@ private List<ChunkRange> splitTableIntoChunks(

// use ROWID get splitUnevenlySizedChunks by default
if (splitColumn.name().equals(ROWID.class.getSimpleName())) {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}

if (isEvenlySplitColumn(splitColumn)) {
Expand All @@ -198,11 +197,10 @@ private List<ChunkRange> splitTableIntoChunks(
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(
jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}

Expand Down Expand Up @@ -239,7 +237,7 @@ private List<ChunkRange> splitEvenlySizedChunks(
private List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object min,
Object max,
int chunkSize)
Expand All @@ -248,7 +246,7 @@ private List<ChunkRange> splitUnevenlySizedChunks(
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;

while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) {
Expand All @@ -257,7 +255,7 @@ private List<ChunkRange> splitUnevenlySizedChunks(
// may sleep a while to avoid DDOS on MySQL server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
Expand Down Expand Up @@ -294,17 +292,17 @@ private Object nextChunkEnd(
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (isChunkEndGeMax(chunkEnd, max)) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ limitations under the License.
<!-- fix CVE-2022-26520 https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-26520 -->
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.5.1</version>
<version>42.7.3</version>
</dependency>

<!-- test dependencies on Debezium -->
Expand Down
Loading

0 comments on commit 94a0415

Please sign in to comment.