Skip to content

Commit 7997f51

Browse files
oli2vleonardBang
andauthored
[FLINK-35067][cdc-connector][postgres] Adding metadata 'row_kind' for Postgres CDC Connector.
This closes #3716. Co-authored-by: Leonard Xu <[email protected]>
1 parent a16abd5 commit 7997f51

File tree

4 files changed

+98
-55
lines changed

4 files changed

+98
-55
lines changed

docs/content/docs/connectors/flink-sources/postgres-cdc.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,13 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
387387
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
388388
<td>It indicates the time that the change was made in the database. <br>If the record is read from snapshot of the table instead of the change stream, the value is always 0.</td>
389389
</tr>
390+
<tr>
391+
<td>row_kind</td>
392+
<td>STRING NOT NULL</td>
393+
<td>It indicates the row kind of the changelog. Note: The downstream SQL operator may fail to compare due to this newly added column when processing the row retraction if
394+
the source operator chooses to output the 'row_kind' column for each record. It is recommended to use this metadata column only in simple synchronization jobs.
395+
<br>'+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.</td>
396+
</tr>
390397
</tbody>
391398
</table>
392399

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLReadableMetadata.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
package org.apache.flink.cdc.connectors.postgres.table;
1919

2020
import org.apache.flink.cdc.debezium.table.MetadataConverter;
21+
import org.apache.flink.cdc.debezium.table.RowDataMetadataConverter;
2122
import org.apache.flink.table.api.DataTypes;
23+
import org.apache.flink.table.data.RowData;
2224
import org.apache.flink.table.data.StringData;
2325
import org.apache.flink.table.data.TimestampData;
2426
import org.apache.flink.table.types.DataType;
@@ -95,6 +97,28 @@ public Object read(SourceRecord record) {
9597
return TimestampData.fromEpochMillis(
9698
(Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
9799
}
100+
}),
101+
102+
/**
103+
* It indicates the row kind of the changelog. '+I' means INSERT message, '-D' means DELETE
104+
* message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.
105+
*/
106+
ROW_KIND(
107+
"row_kind",
108+
DataTypes.STRING().notNull(),
109+
new RowDataMetadataConverter() {
110+
private static final long serialVersionUID = 1L;
111+
112+
@Override
113+
public Object read(RowData rowData) {
114+
return StringData.fromString(rowData.getRowKind().shortString());
115+
}
116+
117+
@Override
118+
public Object read(SourceRecord record) {
119+
throw new UnsupportedOperationException(
120+
"Please call read(RowData rowData) method instead.");
121+
}
98122
});
99123

100124
private final String key;

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java

Lines changed: 56 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -165,20 +165,24 @@ public void testConsumingAllEvents()
165165
* The final database table looks like this:
166166
*
167167
* > SELECT * FROM products;
168-
* +-----+--------------------+---------------------------------------------------------+--------+
169-
* | id | name | description | weight |
170-
* +-----+--------------------+---------------------------------------------------------+--------+
171-
* | 101 | scooter | Small 2-wheel scooter | 3.14 |
172-
* | 102 | car battery | 12V car battery | 8.1 |
173-
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 |
174-
* | 104 | hammer | 12oz carpenter's hammer | 0.75 |
175-
* | 105 | hammer | 14oz carpenter's hammer | 0.875 |
176-
* | 106 | hammer | 18oz carpenter hammer | 1 |
177-
* | 107 | rocks | box of assorted rocks | 5.1 |
178-
* | 108 | jacket | water resistent black wind breaker | 0.1 |
179-
* | 109 | spare tire | 24 inch spare tire | 22.2 |
180-
* | 110 | jacket | new water resistent white wind breaker | 0.5 |
181-
* +-----+--------------------+---------------------------------------------------------+--------+
168+
* +-----+--------------------+-------------------------------------------------
169+
* --------+--------+
170+
* | id | name | description | weight |
171+
* +-----+--------------------+-------------------------------------------------
172+
* --------+--------+
173+
* | 101 | scooter | Small 2-wheel scooter | 3.14 |
174+
* | 102 | car battery | 12V car battery | 8.1 |
175+
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from
176+
* #40 to #3 | 0.8 |
177+
* | 104 | hammer | 12oz carpenter's hammer | 0.75 |
178+
* | 105 | hammer | 14oz carpenter's hammer | 0.875 |
179+
* | 106 | hammer | 18oz carpenter hammer | 1 |
180+
* | 107 | rocks | box of assorted rocks | 5.1 |
181+
* | 108 | jacket | water resistent black wind breaker | 0.1 |
182+
* | 109 | spare tire | 24 inch spare tire | 22.2 |
183+
* | 110 | jacket | new water resistent white wind breaker | 0.5 |
184+
* +-----+--------------------+-------------------------------------------------
185+
* --------+--------+
182186
* </pre>
183187
*/
184188

@@ -246,7 +250,8 @@ public void testStartupFromLatestOffset() throws Exception {
246250

247251
// async submit job
248252
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
249-
// wait for the source startup, we don't have a better way to wait it, use sleep for now
253+
// wait for the source startup; we don't have a better way to wait for it, so use sleep
254+
// for now
250255
Thread.sleep(10000L);
251256

252257
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
@@ -469,6 +474,7 @@ public void testMetadataColumns() throws Throwable {
469474
+ " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
470475
+ " schema_name STRING METADATA VIRTUAL,"
471476
+ " table_name STRING METADATA VIRTUAL,"
477+
+ " row_kind STRING METADATA FROM 'row_kind' VIRTUAL,"
472478
+ " id INT NOT NULL,"
473479
+ " name STRING,"
474480
+ " description STRING,"
@@ -501,6 +507,7 @@ public void testMetadataColumns() throws Throwable {
501507
+ " database_name STRING,"
502508
+ " schema_name STRING,"
503509
+ " table_name STRING,"
510+
+ " row_kind STRING,"
504511
+ " id INT,"
505512
+ " name STRING,"
506513
+ " description STRING,"
@@ -546,52 +553,52 @@ public void testMetadataColumns() throws Throwable {
546553
Arrays.asList(
547554
"+I("
548555
+ databaseName
549-
+ ",inventory,products,101,scooter,Small 2-wheel scooter,3.140)",
556+
+ ",inventory,products,+I,101,scooter,Small 2-wheel scooter,3.140)",
550557
"+I("
551558
+ databaseName
552-
+ ",inventory,products,102,car battery,12V car battery,8.100)",
559+
+ ",inventory,products,+I,102,car battery,12V car battery,8.100)",
553560
"+I("
554561
+ databaseName
555-
+ ",inventory,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
562+
+ ",inventory,products,+I,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
556563
"+I("
557564
+ databaseName
558-
+ ",inventory,products,104,hammer,12oz carpenter's hammer,0.750)",
565+
+ ",inventory,products,+I,104,hammer,12oz carpenter's hammer,0.750)",
559566
"+I("
560567
+ databaseName
561-
+ ",inventory,products,105,hammer,14oz carpenter's hammer,0.875)",
568+
+ ",inventory,products,+I,105,hammer,14oz carpenter's hammer,0.875)",
562569
"+I("
563570
+ databaseName
564-
+ ",inventory,products,106,hammer,16oz carpenter's hammer,1.000)",
571+
+ ",inventory,products,+I,106,hammer,16oz carpenter's hammer,1.000)",
565572
"+I("
566573
+ databaseName
567-
+ ",inventory,products,107,rocks,box of assorted rocks,5.300)",
574+
+ ",inventory,products,+I,107,rocks,box of assorted rocks,5.300)",
568575
"+I("
569576
+ databaseName
570-
+ ",inventory,products,108,jacket,water resistent black wind breaker,0.100)",
577+
+ ",inventory,products,+I,108,jacket,water resistent black wind breaker,0.100)",
571578
"+I("
572579
+ databaseName
573-
+ ",inventory,products,109,spare tire,24 inch spare tire,22.200)",
580+
+ ",inventory,products,+I,109,spare tire,24 inch spare tire,22.200)",
574581
"+I("
575582
+ databaseName
576-
+ ",inventory,products,110,jacket,water resistent white wind breaker,0.200)",
583+
+ ",inventory,products,+I,110,jacket,water resistent white wind breaker,0.200)",
577584
"+I("
578585
+ databaseName
579-
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.180)",
586+
+ ",inventory,products,+I,111,scooter,Big 2-wheel scooter ,5.180)",
580587
"+U("
581588
+ databaseName
582-
+ ",inventory,products,106,hammer,18oz carpenter hammer,1.000)",
589+
+ ",inventory,products,+U,106,hammer,18oz carpenter hammer,1.000)",
583590
"+U("
584591
+ databaseName
585-
+ ",inventory,products,107,rocks,box of assorted rocks,5.100)",
592+
+ ",inventory,products,+U,107,rocks,box of assorted rocks,5.100)",
586593
"+U("
587594
+ databaseName
588-
+ ",inventory,products,110,jacket,new water resistent white wind breaker,0.500)",
595+
+ ",inventory,products,+U,110,jacket,new water resistent white wind breaker,0.500)",
589596
"+U("
590597
+ databaseName
591-
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)",
598+
+ ",inventory,products,+U,111,scooter,Big 2-wheel scooter ,5.170)",
592599
"-D("
593600
+ databaseName
594-
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)");
601+
+ ",inventory,products,-D,111,scooter,Big 2-wheel scooter ,5.170)");
595602
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
596603
Collections.sort(actual);
597604
Collections.sort(expected);
@@ -679,20 +686,24 @@ public void testUpsertMode() throws Exception {
679686
* The final database table looks like this:
680687
*
681688
* > SELECT * FROM products;
682-
* +-----+--------------------+---------------------------------------------------------+--------+
683-
* | id | name | description | weight |
684-
* +-----+--------------------+---------------------------------------------------------+--------+
685-
* | 101 | scooter | Small 2-wheel scooter | 3.14 |
686-
* | 102 | car battery | 12V car battery | 8.1 |
687-
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 |
688-
* | 104 | hammer | 12oz carpenter's hammer | 0.75 |
689-
* | 105 | hammer | 14oz carpenter's hammer | 0.875 |
690-
* | 106 | hammer | 18oz carpenter hammer | 1 |
691-
* | 107 | rocks | box of assorted rocks | 5.1 |
692-
* | 108 | jacket | water resistent black wind breaker | 0.1 |
693-
* | 109 | spare tire | 24 inch spare tire | 22.2 |
694-
* | 110 | jacket | new water resistent white wind breaker | 0.5 |
695-
* +-----+--------------------+---------------------------------------------------------+--------+
689+
* +-----+--------------------+-------------------------------------------------
690+
* --------+--------+
691+
* | id | name | description | weight |
692+
* +-----+--------------------+-------------------------------------------------
693+
* --------+--------+
694+
* | 101 | scooter | Small 2-wheel scooter | 3.14 |
695+
* | 102 | car battery | 12V car battery | 8.1 |
696+
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from
697+
* #40 to #3 | 0.8 |
698+
* | 104 | hammer | 12oz carpenter's hammer | 0.75 |
699+
* | 105 | hammer | 14oz carpenter's hammer | 0.875 |
700+
* | 106 | hammer | 18oz carpenter hammer | 1 |
701+
* | 107 | rocks | box of assorted rocks | 5.1 |
702+
* | 108 | jacket | water resistent black wind breaker | 0.1 |
703+
* | 109 | spare tire | 24 inch spare tire | 22.2 |
704+
* | 110 | jacket | new water resistent white wind breaker | 0.5 |
705+
* +-----+--------------------+-------------------------------------------------
706+
* --------+--------+
696707
* </pre>
697708
*/
698709

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,19 +50,19 @@
5050
import java.util.Map;
5151
import java.util.Properties;
5252

53+
import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
54+
import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
55+
import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
56+
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
5357
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
5458
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
59+
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
5560
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
56-
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHUNK_META_GROUP_SIZE;
57-
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECTION_POOL_SIZE;
58-
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_MAX_RETRIES;
59-
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_TIMEOUT;
61+
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
62+
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
63+
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
6064
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
61-
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
6265
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
63-
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
64-
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
65-
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
6666
import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction;
6767
import static org.junit.Assert.assertEquals;
6868
import static org.junit.Assert.assertTrue;
@@ -100,6 +100,7 @@ public class PostgreSQLTableFactoryTest {
100100
Column.physical("name", DataTypes.STRING()),
101101
Column.physical("count", DataTypes.DECIMAL(38, 18)),
102102
Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true),
103+
Column.metadata("row_kind", DataTypes.STRING(), "row_kind", true),
103104
Column.metadata(
104105
"database_name", DataTypes.STRING(), "database_name", true),
105106
Column.metadata("schema_name", DataTypes.STRING(), "schema_name", true),
@@ -211,7 +212,7 @@ public void testMetadataColumns() {
211212
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
212213
PostgreSQLTableSource postgreSQLTableSource = (PostgreSQLTableSource) actualSource;
213214
postgreSQLTableSource.applyReadableMetadata(
214-
Arrays.asList("op_ts", "database_name", "schema_name", "table_name"),
215+
Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name"),
215216
SCHEMA_WITH_METADATA.toSourceRowDataType());
216217
actualSource = postgreSQLTableSource.copy();
217218
PostgreSQLTableSource expectedSource =
@@ -246,7 +247,7 @@ public void testMetadataColumns() {
246247
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
247248
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
248249
expectedSource.metadataKeys =
249-
Arrays.asList("op_ts", "database_name", "schema_name", "table_name");
250+
Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name");
250251

251252
assertEquals(expectedSource, actualSource);
252253

0 commit comments

Comments
 (0)