Skip to content

Commit 2d1eb0a

Browse files
authored
[FLINK-34990][cdc-connector][oracle] Oracle cdc support newly add table (#3203)
* [cdc-connector][oracle] Oracle cdc support newly add table * [cdc-connector][oracle] Fix code style * [cdc-connector][oracle] Address comment
1 parent 137dc1b commit 2d1eb0a

File tree

10 files changed

+1078
-67
lines changed

10 files changed

+1078
-67
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,12 @@ public OracleSourceBuilder<T> skipSnapshotBackfill(boolean skipSnapshotBackfill)
243243
return this;
244244
}
245245

246+
/** Whether the {@link OracleIncrementalSource} should scan the newly added tables or not. */
247+
public OracleSourceBuilder<T> scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) {
248+
this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
249+
return this;
250+
}
251+
246252
/**
247253
* Build the {@link OracleIncrementalSource}.
248254
*

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ public OracleSourceConfig(
6363
int connectMaxRetries,
6464
int connectionPoolSize,
6565
String chunkKeyColumn,
66-
boolean skipSnapshotBackfill) {
66+
boolean skipSnapshotBackfill,
67+
boolean scanNewlyAddedTableEnabled) {
6768
super(
6869
startupOptions,
6970
databaseList,
@@ -89,7 +90,7 @@ public OracleSourceConfig(
8990
connectionPoolSize,
9091
chunkKeyColumn,
9192
skipSnapshotBackfill,
92-
false);
93+
scanNewlyAddedTableEnabled);
9394
this.url = url;
9495
}
9596

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public OracleSourceConfig create(int subtaskId) {
133133
connectMaxRetries,
134134
connectionPoolSize,
135135
chunkKeyColumn,
136-
skipSnapshotBackfill);
136+
skipSnapshotBackfill,
137+
scanNewlyAddedTableEnabled);
137138
}
138139
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
7979
private final String chunkKeyColumn;
8080
private final boolean closeIdleReaders;
8181
private final boolean skipSnapshotBackfill;
82+
private final boolean scanNewlyAddedTableEnabled;
8283

8384
// --------------------------------------------------------------------------------------------
8485
// Mutable attributes
@@ -113,7 +114,8 @@ public OracleTableSource(
113114
double distributionFactorLower,
114115
@Nullable String chunkKeyColumn,
115116
boolean closeIdleReaders,
116-
boolean skipSnapshotBackfill) {
117+
boolean skipSnapshotBackfill,
118+
boolean scanNewlyAddedTableEnabled) {
117119
this.physicalSchema = physicalSchema;
118120
this.url = url;
119121
this.port = port;
@@ -139,6 +141,7 @@ public OracleTableSource(
139141
this.chunkKeyColumn = chunkKeyColumn;
140142
this.closeIdleReaders = closeIdleReaders;
141143
this.skipSnapshotBackfill = skipSnapshotBackfill;
144+
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
142145
}
143146

144147
@Override
@@ -187,6 +190,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
187190
.closeIdleReaders(closeIdleReaders)
188191
.skipSnapshotBackfill(skipSnapshotBackfill)
189192
.chunkKeyColumn(chunkKeyColumn)
193+
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
190194
.build();
191195

192196
return SourceProvider.of(oracleChangeEventSource);
@@ -252,7 +256,8 @@ public DynamicTableSource copy() {
252256
distributionFactorLower,
253257
chunkKeyColumn,
254258
closeIdleReaders,
255-
skipSnapshotBackfill);
259+
skipSnapshotBackfill,
260+
scanNewlyAddedTableEnabled);
256261
source.metadataKeys = metadataKeys;
257262
source.producedDataType = producedDataType;
258263
return source;
@@ -291,7 +296,8 @@ public boolean equals(Object o) {
291296
&& Objects.equals(distributionFactorLower, that.distributionFactorLower)
292297
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
293298
&& Objects.equals(closeIdleReaders, that.closeIdleReaders)
294-
&& Objects.equals(skipSnapshotBackfill, that.skipSnapshotBackfill);
299+
&& Objects.equals(skipSnapshotBackfill, that.skipSnapshotBackfill)
300+
&& Objects.equals(scanNewlyAddedTableEnabled, that.scanNewlyAddedTableEnabled);
295301
}
296302

297303
@Override
@@ -321,7 +327,8 @@ public int hashCode() {
321327
distributionFactorLower,
322328
chunkKeyColumn,
323329
closeIdleReaders,
324-
skipSnapshotBackfill);
330+
skipSnapshotBackfill,
331+
scanNewlyAddedTableEnabled);
325332
}
326333

327334
@Override

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
4848
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
4949
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
50+
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
5051
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
5152
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE;
5253
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
@@ -106,6 +107,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
106107

107108
boolean closeIdlerReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
108109
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
110+
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
109111

110112
if (enableParallelRead) {
111113
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
@@ -142,7 +144,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
142144
distributionFactorLower,
143145
chunkKeyColumn,
144146
closeIdlerReaders,
145-
skipSnapshotBackfill);
147+
skipSnapshotBackfill,
148+
scanNewlyAddedTableEnabled);
146149
}
147150

148151
@Override
@@ -180,6 +183,7 @@ public Set<ConfigOption<?>> optionalOptions() {
180183
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
181184
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
182185
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
186+
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
183187
return options;
184188
}
185189

0 commit comments

Comments
 (0)