From d3c049d8a7a21fa0d89747f5596cf3f44b3559e1 Mon Sep 17 00:00:00 2001 From: Kunni Date: Thu, 9 Jan 2025 12:01:52 +0800 Subject: [PATCH] [FLINK-36524][pipeline-connector][paimon] Bump Paimon version to 0.9.0 This closes #3644 --- .../pom.xml | 2 +- .../paimon/sink/PaimonDataSinkFactory.java | 2 ++ .../paimon/sink/v2/PaimonCommitter.java | 4 +-- .../paimon/sink/v2/StoreSinkWriteImpl.java | 3 +++ .../sink/v2/bucket/BucketAssignOperator.java | 8 +++--- .../sink/PaimonMetadataApplierTest.java | 25 +++++++++++++++++++ .../paimon/sink/v2/PaimonSinkITCase.java | 2 +- 7 files changed, 38 insertions(+), 8 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml index e6b510ce6a8..940bbc0acbb 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml @@ -29,7 +29,7 @@ limitations under the License. flink-cdc-pipeline-connector-paimon - 0.8.2 + 0.9.0 2.8.5 2.3.9 3.4.6 diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java index 302ba629ac9..388658fe653 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java @@ -69,6 +69,8 @@ public DataSink createDataSink(Context context) { } }); Options options = Options.fromMap(catalogOptions); + // Avoid using previous table schema. 
+ options.setString("cache-enabled", "false"); try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options)) { Preconditions.checkNotNull( catalog.listDatabases(), "catalog option of Paimon is invalid."); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java index 07abb03bf64..03c0be6be15 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java @@ -45,8 +45,8 @@ public PaimonCommitter(Options catalogOptions, String commitUser) { storeMultiCommitter = new StoreMultiCommitter( () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions), - commitUser, - null); + org.apache.paimon.flink.sink.Committer.createContext( + commitUser, null, true, false, null)); } @Override diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java index fb748954209..21b21d50df7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java @@ -142,6 +142,9 @@ public void withCompactExecutor(ExecutorService compactExecutor) { write.withCompactExecutor(compactExecutor); } + @Override + public void withInsertOnly(boolean b) {} + @Override public SinkRecord write(InternalRow internalRow) throws Exception { return write.writeAndReturn(internalRow); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java index 9b3b3afb96c..b528f53aacf 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java @@ -151,7 +151,7 @@ public void processElement(StreamRecord streamRecord) throws Exception { dataChangeEvent, schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters()); switch (tuple4.f0) { - case DYNAMIC: + case HASH_DYNAMIC: { bucket = tuple4.f2.assign( @@ -159,18 +159,18 @@ public void processElement(StreamRecord streamRecord) throws Exception { tuple4.f3.trimmedPrimaryKey(genericRow).hashCode()); break; } - case FIXED: + case HASH_FIXED: { 
tuple4.f1.setRecord(genericRow); bucket = tuple4.f1.bucket(); break; } - case UNAWARE: + case BUCKET_UNAWARE: { bucket = 0; break; } - case GLOBAL_DYNAMIC: + case CROSS_PARTITION: default: { throw new RuntimeException("Unsupported bucket mode: " + tuple4.f0); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java index 9f3cd806c47..7b362ee0d40 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java @@ -87,6 +87,7 @@ private void initialize(String metastore) } catalogOptions.setString("metastore", metastore); catalogOptions.setString("warehouse", warehouse); + catalogOptions.setString("cache-enabled", "false"); this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); this.catalog.dropDatabase(TEST_DATABASE, true, true); } @@ -206,6 +207,30 @@ public void testApplySchemaChange(String metastore) catalog.getTable(Identifier.fromString("test.table_with_partition")); Assertions.assertEquals(tableSchema, tableWithPartition.rowType()); Assertions.assertEquals(Arrays.asList("col1", "dt"), tableWithPartition.primaryKeys()); + // Create table with upper case. + catalogOptions.setString("allow-upper-case", "true"); + metadataApplier = new PaimonMetadataApplier(catalogOptions); + createTableEvent = + new CreateTableEvent( + TableId.parse("test.table_with_upper_case"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "COL1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", org.apache.flink.cdc.common.types.DataTypes.INT()) + .primaryKey("COL1") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "COL1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.INT()))); + Assertions.assertEquals( + tableSchema, + catalog.getTable(Identifier.fromString("test.table_with_upper_case")).rowType()); } @ParameterizedTest diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index 3a554ef2f87..8c54837e8af 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -127,7 +127,7 @@ private void initialize(String metastore) + "'metastore'='hive', " + "'hadoop-conf-dir'='%s', " + "'hive-conf-dir'='%s', " - + "'cache-enabled'='false' " + + "'cache-enabled'='false'" + ")", warehouse, HADOOP_CONF_DIR, HIVE_CONF_DIR)); } else {
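
Note (not part of the patch itself): the changes above adapt the connector to Paimon 0.9.0 API changes that are visible in the diff — the BucketMode constants were renamed (DYNAMIC -> HASH_DYNAMIC, FIXED -> HASH_FIXED, UNAWARE -> BUCKET_UNAWARE, GLOBAL_DYNAMIC -> CROSS_PARTITION), StoreSinkWrite gained a withInsertOnly(boolean) hook, StoreMultiCommitter now takes a Committer.Context instead of a bare commit user, and catalog caching is disabled via 'cache-enabled'='false' so a previously cached table schema is not reused. The sketch below only illustrates how the new committer context is wired, mirroring PaimonCommitter in the diff above: the class name CommitterContextSketch is invented, the import paths are assumed from the fully qualified names used in the patch, and the boolean/null arguments simply repeat the values from the patch without documenting their semantics.

import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.StoreMultiCommitter;
import org.apache.paimon.options.Options;

public class CommitterContextSketch {

    // Paimon 0.9.0 bundles the commit user (and related settings) into a
    // Committer.Context instead of passing them as separate constructor arguments.
    static StoreMultiCommitter createCommitter(Options catalogOptions, String commitUser) {
        return new StoreMultiCommitter(
                () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions),
                Committer.createContext(
                        commitUser,
                        null,  // metric group: none in this sketch
                        true,  // these two flags and the trailing null mirror the
                        false, // values used in PaimonCommitter in the diff above
                        null));
    }
}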