From 45f460e68051e296d31832bcde64f010cafe1726 Mon Sep 17 00:00:00 2001 From: "helloliuxg@gmail.com" Date: Wed, 1 Jan 2025 17:17:45 +0800 Subject: [PATCH] remove Catalog.ColumnAlreadyExistException when apply applyAddColumnEventWithPosition in paimon. because use may create table already before with the target column. --- .../cdc/connectors/paimon/sink/PaimonMetadataApplier.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java index 928e7b6dfcb..bf9907b9073 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java @@ -293,7 +293,11 @@ private void applyDropColumn(DropColumnEvent event) throws SchemaEvolveException } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) { - throw new SchemaEvolveException(event, e.getMessage(), e); + if (e instanceof Catalog.ColumnAlreadyExistException) { + LOG.warn("{}, skip it.", e.getMessage()); + } else { + throw new SchemaEvolveException(event, e.getMessage(), e); + } } } @@ -312,6 +316,7 @@ private void applyRenameColumn(RenameColumnEvent event) throws SchemaEvolveExcep } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); } }