[FLINK-36524][pipeline-connector][paimon] Bump Paimon version to 0.9.0
This closes #3644
lvyanquan authored Jan 9, 2025
1 parent fc71888 commit d3c049d
Showing 7 changed files with 38 additions and 8 deletions.
@@ -29,7 +29,7 @@ limitations under the License.
     <artifactId>flink-cdc-pipeline-connector-paimon</artifactId>
 
     <properties>
-        <paimon.version>0.8.2</paimon.version>
+        <paimon.version>0.9.0</paimon.version>
         <hadoop.version>2.8.5</hadoop.version>
         <hive.version>2.3.9</hive.version>
         <mockito.version>3.4.6</mockito.version>
@@ -69,6 +69,8 @@ public DataSink createDataSink(Context context) {
                     }
                 });
         Options options = Options.fromMap(catalogOptions);
+        // Avoid using previous table schema.
+        options.setString("cache-enabled", "false");
         try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options)) {
             Preconditions.checkNotNull(
                     catalog.listDatabases(), "catalog option of Paimon is invalid.");
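Note on the new `cache-enabled` option above: a Paimon catalog can cache table metadata, so a pipeline that has just applied a schema change could otherwise read back a stale schema. A minimal, self-contained sketch of the pattern, assuming an illustrative filesystem warehouse path (not from this commit):

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.flink.FlinkCatalogFactory;
    import org.apache.paimon.options.Options;

    import java.util.HashMap;
    import java.util.Map;

    public class CatalogCacheSketch {
        public static void main(String[] args) throws Exception {
            Map<String, String> catalogOptions = new HashMap<>();
            catalogOptions.put("warehouse", "/tmp/paimon"); // illustrative path
            Options options = Options.fromMap(catalogOptions);
            // Disable metadata caching so each lookup sees the latest table
            // schema rather than a previously cached one.
            options.setString("cache-enabled", "false");
            try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options)) {
                System.out.println(catalog.listDatabases());
            }
        }
    }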
@@ -45,8 +45,8 @@ public PaimonCommitter(Options catalogOptions, String commitUser) {
         storeMultiCommitter =
                 new StoreMultiCommitter(
                         () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions),
-                        commitUser,
-                        null);
+                        org.apache.paimon.flink.sink.Committer.createContext(
+                                commitUser, null, true, false, null));
     }
 
     @Override
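The constructor change tracks a Paimon 0.9.0 API change: `StoreMultiCommitter` no longer takes the commit user and committer state directly, but a `Committer.Context` built via `Committer.createContext(...)`. The diff does not spell out what the extra arguments mean; an annotated, hedged reading of the call site (the argument comments are assumptions inferred from the names and values used here, not confirmed by this commit):

    storeMultiCommitter =
            new StoreMultiCommitter(
                    () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions),
                    org.apache.paimon.flink.sink.Committer.createContext(
                            commitUser, // identifies commits produced by this job
                            null,       // no metrics hookup (assumption)
                            true,       // streaming/checkpointed mode (assumption)
                            false,      // not restored from prior state (assumption)
                            null));     // no restored committer state (assumption)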
@@ -142,6 +142,9 @@ public void withCompactExecutor(ExecutorService compactExecutor) {
         write.withCompactExecutor(compactExecutor);
     }
 
+    @Override
+    public void withInsertOnly(boolean b) {}
+
     @Override
     public SinkRecord write(InternalRow internalRow) throws Exception {
         return write.writeAndReturn(internalRow);
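The empty `withInsertOnly` override satisfies a method added to the write interface in Paimon 0.9.0. Judging by the name alone (its semantics are not confirmed by this diff), the flag lets a caller promise insert-only input; a CDC stream can carry updates and deletes, so a no-op is the conservative choice here:

    @Override
    public void withInsertOnly(boolean insertOnly) {
        // No-op: CDC input may contain updates and deletes, so this writer
        // never declares itself insert-only to the underlying Paimon write.
    }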
@@ -151,26 +151,26 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
                             dataChangeEvent,
                             schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
             switch (tuple4.f0) {
-                case DYNAMIC:
+                case HASH_DYNAMIC:
                     {
                         bucket =
                                 tuple4.f2.assign(
                                         tuple4.f3.partition(genericRow),
                                         tuple4.f3.trimmedPrimaryKey(genericRow).hashCode());
                         break;
                     }
-                case FIXED:
+                case HASH_FIXED:
                     {
                         tuple4.f1.setRecord(genericRow);
                         bucket = tuple4.f1.bucket();
                         break;
                     }
-                case UNAWARE:
+                case BUCKET_UNAWARE:
                     {
                         bucket = 0;
                         break;
                     }
-                case GLOBAL_DYNAMIC:
+                case CROSS_PARTITION:
                 default:
                     {
                         throw new RuntimeException("Unsupported bucket mode: " + tuple4.f0);
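The switch rewrite reflects Paimon 0.9.0's renaming of the `BucketMode` constants: `DYNAMIC` -> `HASH_DYNAMIC`, `FIXED` -> `HASH_FIXED`, `UNAWARE` -> `BUCKET_UNAWARE`, and `GLOBAL_DYNAMIC` -> `CROSS_PARTITION`. A minimal sketch of dispatching on the renamed enum (the helper and its descriptions are illustrative, not from this commit):

    import org.apache.paimon.table.BucketMode;

    final class BucketModeSketch {
        static String describe(BucketMode mode) {
            switch (mode) {
                case HASH_DYNAMIC:
                    return "buckets assigned dynamically from the primary-key hash";
                case HASH_FIXED:
                    return "a fixed number of buckets chosen by key hash";
                case BUCKET_UNAWARE:
                    return "no bucketing (append-only tables)";
                case CROSS_PARTITION:
                default:
                    return "cross-partition update mode (unsupported by this sink)";
            }
        }
    }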
@@ -87,6 +87,7 @@ private void initialize(String metastore)
         }
         catalogOptions.setString("metastore", metastore);
         catalogOptions.setString("warehouse", warehouse);
+        catalogOptions.setString("cache-enabled", "false");
         this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
         this.catalog.dropDatabase(TEST_DATABASE, true, true);
     }
@@ -206,6 +207,30 @@ public void testApplySchemaChange(String metastore)
                 catalog.getTable(Identifier.fromString("test.table_with_partition"));
         Assertions.assertEquals(tableSchema, tableWithPartition.rowType());
         Assertions.assertEquals(Arrays.asList("col1", "dt"), tableWithPartition.primaryKeys());
+        // Create table with upper case.
+        catalogOptions.setString("allow-upper-case", "true");
+        metadataApplier = new PaimonMetadataApplier(catalogOptions);
+        createTableEvent =
+                new CreateTableEvent(
+                        TableId.parse("test.table_with_upper_case"),
+                        org.apache.flink.cdc.common.schema.Schema.newBuilder()
+                                .physicalColumn(
+                                        "COL1",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING()
+                                                .notNull())
+                                .physicalColumn(
+                                        "col2", org.apache.flink.cdc.common.types.DataTypes.INT())
+                                .primaryKey("COL1")
+                                .build());
+        metadataApplier.applySchemaChange(createTableEvent);
+        tableSchema =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "COL1", DataTypes.STRING().notNull()),
+                                new DataField(1, "col2", DataTypes.INT())));
+        Assertions.assertEquals(
+                tableSchema,
+                catalog.getTable(Identifier.fromString("test.table_with_upper_case")).rowType());
     }
 
     @ParameterizedTest
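The new test block exercises `allow-upper-case`: depending on the metastore, a Paimon catalog may reject upper-case identifiers, and enabling the option lets the declared column name `COL1` survive verbatim in the resulting schema. Condensed from the test above (identifiers as in the test):

    // Enable upper-case identifiers before creating the table so "COL1"
    // is kept verbatim rather than rejected or normalized.
    catalogOptions.setString("allow-upper-case", "true");
    PaimonMetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);
    metadataApplier.applySchemaChange(createTableEvent); // table with column "COL1"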
@@ -127,7 +127,7 @@ private void initialize(String metastore)
                                 + "'metastore'='hive', "
                                 + "'hadoop-conf-dir'='%s', "
                                 + "'hive-conf-dir'='%s', "
-                                + "'cache-enabled'='false' "
+                                + "'cache-enabled'='false'"
                                 + ")",
                         warehouse, HADOOP_CONF_DIR, HIVE_CONF_DIR));
         } else {
