From 73cf83b4fc450b1e6378a1aff477f34e4f2e3e80 Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Tue, 15 Oct 2024 16:39:56 +0800 Subject: [PATCH 1/3] [FLINK-36524][pipeline-connector][paimon] 1) bump version to 0.9.0. 2) Forcibly add cache-enabled = false in catalog option to avoid using previous table schema. --- .../pom.xml | 2 +- .../paimon/sink/PaimonDataSinkFactory.java | 2 ++ .../paimon/sink/v2/PaimonCommitter.java | 34 +++++++++++++++++-- .../paimon/sink/v2/StoreSinkWriteImpl.java | 3 ++ .../sink/v2/bucket/BucketAssignOperator.java | 8 ++--- .../sink/PaimonMetadataApplierTest.java | 31 +++++++++++++++-- .../paimon/sink/v2/PaimonSinkITCase.java | 2 ++ 7 files changed, 72 insertions(+), 10 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml index e6b510ce6a8..940bbc0acbb 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml @@ -29,7 +29,7 @@ limitations under the License. flink-cdc-pipeline-connector-paimon - 0.8.2 + 0.9.0 2.8.5 2.3.9 3.4.6 diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java index 302ba629ac9..388658fe653 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java @@ -69,6 +69,8 @@ public DataSink createDataSink(Context context) { } }); Options options = Options.fromMap(catalogOptions); + // Avoid using previous table schema. + options.setString("cache-enabled", "false"); try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options)) { Preconditions.checkNotNull( catalog.listDatabases(), "catalog option of Paimon is invalid."); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java index 07abb03bf64..5a06d0d80b5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java @@ -17,7 +17,9 @@ package org.apache.flink.cdc.connectors.paimon.sink.v2; +import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.paimon.flink.FlinkCatalogFactory; import org.apache.paimon.flink.sink.MultiTableCommittable; @@ -27,6 +29,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -45,8 +49,34 @@ public PaimonCommitter(Options catalogOptions, String commitUser) { storeMultiCommitter = new StoreMultiCommitter( () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions), - commitUser, - null); + new org.apache.paimon.flink.sink.Committer.Context() { + + @Override + public String commitUser() { + return commitUser; + } + + @Nullable + @Override + public OperatorMetricGroup metricGroup() { + return null; + } + + @Override + public boolean streamingCheckpointEnabled() { + return false; + } + + @Override + public boolean isRestored() { + return false; + } + + @Override + public OperatorStateStore stateStore() { + return null; + } + }); } @Override diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java index fb748954209..21b21d50df7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java @@ -142,6 +142,9 @@ public void withCompactExecutor(ExecutorService compactExecutor) { write.withCompactExecutor(compactExecutor); } + @Override + public void withInsertOnly(boolean b) {} + @Override public SinkRecord write(InternalRow internalRow) throws Exception { return write.writeAndReturn(internalRow); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java index 9b3b3afb96c..b528f53aacf 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java @@ -151,7 +151,7 @@ public void processElement(StreamRecord streamRecord) throws Exception { dataChangeEvent, schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters()); switch (tuple4.f0) { - case DYNAMIC: + case HASH_DYNAMIC: { bucket = tuple4.f2.assign( @@ -159,18 +159,18 @@ public void processElement(StreamRecord streamRecord) throws Exception { tuple4.f3.trimmedPrimaryKey(genericRow).hashCode()); break; } - case FIXED: + case HASH_FIXED: { tuple4.f1.setRecord(genericRow); bucket = tuple4.f1.bucket(); break; } - case UNAWARE: + case BUCKET_UNAWARE: { bucket = 0; break; } - case GLOBAL_DYNAMIC: + case CROSS_PARTITION: default: { throw new RuntimeException("Unsupported bucket mode: " + tuple4.f0); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java index 9f3cd806c47..08a88829d74 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java @@ -87,6 +87,7 @@ private void initialize(String metastore) } catalogOptions.setString("metastore", metastore); catalogOptions.setString("warehouse", warehouse); + catalogOptions.setString("cache-enabled", "false"); this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); this.catalog.dropDatabase(TEST_DATABASE, true, true); } @@ -206,6 +207,30 @@ public void testApplySchemaChange(String metastore) catalog.getTable(Identifier.fromString("test.table_with_partition")); Assertions.assertEquals(tableSchema, tableWithPartition.rowType()); Assertions.assertEquals(Arrays.asList("col1", "dt"), tableWithPartition.primaryKeys()); + // Create table with upper case. + catalogOptions.setString("allow-upper-case", "true"); + metadataApplier = new PaimonMetadataApplier(catalogOptions); + createTableEvent = + new CreateTableEvent( + TableId.parse("test.table_with_upper_case"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "COL1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", org.apache.flink.cdc.common.types.DataTypes.INT()) + .primaryKey("COL1") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "COL1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.INT()))); + Assertions.assertEquals( + tableSchema, + catalog.getTable(Identifier.fromString("test.table_with_upper_case")).rowType()); } @ParameterizedTest @@ -246,10 +271,10 @@ public void testCreateTableWithOptions(String metastore) Arrays.asList( new DataField(0, "col1", DataTypes.STRING().notNull()), new DataField(1, "col2", DataTypes.STRING()), - new DataField(2, "col3", DataTypes.STRING().notNull()), - new DataField(3, "col4", DataTypes.STRING().notNull()))); + new DataField(2, "col3", DataTypes.STRING()), + new DataField(3, "col4", DataTypes.STRING()))); Assertions.assertEquals(tableSchema, table.rowType()); - Assertions.assertEquals(Arrays.asList("col1", "col3", "col4"), table.primaryKeys()); + Assertions.assertEquals(Collections.singletonList("col1"), table.primaryKeys()); Assertions.assertEquals(Arrays.asList("col3", "col4"), table.partitionKeys()); Assertions.assertEquals("-1", table.options().get("bucket")); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index 5635dcfd886..cababd0e085 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -127,6 +127,8 @@ private void initialize(String metastore) + "'metastore'='hive', " + "'hadoop-conf-dir'='%s', " + "'hive-conf-dir'='%s', " + + "'cache-enabled'='false'" + + "'hive-conf-dir'='%s', " + "'cache-enabled'='false' " + ")", warehouse, HADOOP_CONF_DIR, HIVE_CONF_DIR)); From 038d36421bc2c76973f999f31101d9f78fa7ca66 Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Thu, 24 Oct 2024 20:14:02 +0800 Subject: [PATCH 2/3] address comment. --- .../paimon/sink/v2/PaimonCommitter.java | 32 ++----------------- 1 file changed, 2 insertions(+), 30 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java index 5a06d0d80b5..aa012f6019c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java @@ -17,9 +17,7 @@ package org.apache.flink.cdc.connectors.paimon.sink.v2; -import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.connector.sink2.Committer; -import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.paimon.flink.FlinkCatalogFactory; import org.apache.paimon.flink.sink.MultiTableCommittable; @@ -49,34 +47,8 @@ public PaimonCommitter(Options catalogOptions, String commitUser) { storeMultiCommitter = new StoreMultiCommitter( () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions), - new org.apache.paimon.flink.sink.Committer.Context() { - - @Override - public String commitUser() { - return commitUser; - } - - @Nullable - @Override - public OperatorMetricGroup metricGroup() { - return null; - } - - @Override - public boolean streamingCheckpointEnabled() { - return false; - } - - @Override - public boolean isRestored() { - return false; - } - - @Override - public OperatorStateStore stateStore() { - return null; - } - }); + org.apache.paimon.flink.sink.Committer.createContext( + commitUser, null, true, false, null)); } @Override From 5134ced362abc8c80e58a839d269dab2338b9413 Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Wed, 27 Nov 2024 14:49:07 +0800 Subject: [PATCH 3/3] [FLINK-36524][pipeline-connector][paimon] 1) bump version to 0.9.0. 2) Forcibly add cache-enabled = false in catalog option to avoid using previous table schema. --- .../cdc/connectors/paimon/sink/v2/PaimonCommitter.java | 2 -- .../connectors/paimon/sink/PaimonMetadataApplierTest.java | 6 +++--- .../cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java | 2 -- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java index aa012f6019c..03c0be6be15 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java @@ -27,8 +27,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; - import java.io.IOException; import java.util.Collection; import java.util.Collections; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java index 08a88829d74..7b362ee0d40 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java @@ -271,10 +271,10 @@ public void testCreateTableWithOptions(String metastore) Arrays.asList( new DataField(0, "col1", DataTypes.STRING().notNull()), new DataField(1, "col2", DataTypes.STRING()), - new DataField(2, "col3", DataTypes.STRING()), - new DataField(3, "col4", DataTypes.STRING()))); + new DataField(2, "col3", DataTypes.STRING().notNull()), + new DataField(3, "col4", DataTypes.STRING().notNull()))); Assertions.assertEquals(tableSchema, table.rowType()); - Assertions.assertEquals(Collections.singletonList("col1"), table.primaryKeys()); + Assertions.assertEquals(Arrays.asList("col1", "col3", "col4"), table.primaryKeys()); Assertions.assertEquals(Arrays.asList("col3", "col4"), table.partitionKeys()); Assertions.assertEquals("-1", table.options().get("bucket")); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index cababd0e085..0dd60c2f7a4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -128,8 +128,6 @@ private void initialize(String metastore) + "'hadoop-conf-dir'='%s', " + "'hive-conf-dir'='%s', " + "'cache-enabled'='false'" - + "'hive-conf-dir'='%s', " - + "'cache-enabled'='false' " + ")", warehouse, HADOOP_CONF_DIR, HIVE_CONF_DIR)); } else {