Skip to content

Commit 91b86b2

Browse files
[hotfix] [connector-cdc-oracle ] support read partition table (#8265)
1 parent ebe6207 commit 91b86b2

File tree

4 files changed

+206
-3
lines changed

4 files changed

+206
-3
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleConnectionUtils.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ public static List<TableId> listTables(
9292
Set<TableId> tableIdSet = new HashSet<>();
9393
String queryTablesSql =
9494
"SELECT OWNER ,TABLE_NAME,TABLESPACE_NAME FROM ALL_TABLES \n"
95-
+ "WHERE TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSAUX')";
95+
+ "WHERE PARTITIONED = 'YES' OR (TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSAUX'))";
96+
9697
try {
9798
jdbcConnection.query(
9899
queryTablesSql,

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java

+44-2
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,48 @@ public void tearDown() throws Exception {
9191
ORACLE_CONTAINER.stop();
9292
}
9393

94+
@TestTemplate
95+
public void testOracleCdcPartition(TestContainer container) throws Exception {
96+
String sourceTable = "PARTITION_SOURCE_TABLE";
97+
String sinkTable = "PARTITION_SINK_TABLE";
98+
clearTable(SCEHMA_NAME, sinkTable);
99+
clearTable(SCEHMA_NAME, sourceTable);
100+
101+
insertSourceTable(SCEHMA_NAME, sourceTable);
102+
103+
CompletableFuture.supplyAsync(
104+
() -> {
105+
try {
106+
container.executeJob("/oraclecdc_to_oracle_with_partition.conf");
107+
} catch (Exception e) {
108+
log.error("Commit task exception :" + e.getMessage());
109+
throw new RuntimeException(e);
110+
}
111+
return null;
112+
});
113+
114+
// snapshot stage
115+
await().atMost(600000, TimeUnit.MILLISECONDS)
116+
.untilAsserted(
117+
() -> {
118+
Assertions.assertIterableEquals(
119+
querySql(getSourceQuerySQL(SCEHMA_NAME, sourceTable)),
120+
querySql(getSourceQuerySQL(SCEHMA_NAME, sinkTable)));
121+
});
122+
123+
// insert update delete
124+
updateSourceTable(SCEHMA_NAME, sourceTable);
125+
126+
// stream stage
127+
await().atMost(600000, TimeUnit.MILLISECONDS)
128+
.untilAsserted(
129+
() -> {
130+
Assertions.assertIterableEquals(
131+
querySql(getSourceQuerySQL(SCEHMA_NAME, sourceTable)),
132+
querySql(getSourceQuerySQL(SCEHMA_NAME, sinkTable)));
133+
});
134+
}
135+
94136
@TestTemplate
95137
public void testOracleCdcCheckDataE2e(TestContainer container) throws Exception {
96138
checkDataForTheJob(container, "/oraclecdc_to_oracle.conf", false);
@@ -579,7 +621,7 @@ private void insertSourceTable(String database, String tableName) {
579621
+ database
580622
+ "."
581623
+ tableName
582-
+ " VALUES (1, 'vc2', 'vc2', 'nvc2', 'c', 'nc',1.1, 2.22, 3.33, 8.888, 4.4444, 5.555, 6.66, 1234.567891, 1234.567891, 77.323,1, 22, 333, 4444, 5555, 1, 99, 9999, 999999999, 999999999999999999,94, 9949, 999999994, 999999999999999949, 99999999999999999999999999999999999949,TO_DATE('2022-10-30', 'yyyy-mm-dd'),TO_TIMESTAMP('2022-10-30 12:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12:34:56.12545', 'yyyy-mm-dd HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12:34:56.12545', 'yyyy-mm-dd HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12:34:56.125456789', 'yyyy-mm-dd HH24:MI:SS.FF9'),TO_TIMESTAMP_TZ('2022-10-30 01:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5'))");
624+
+ " VALUES (1, 'vc2', 'vc2', 'nvc2', 'c', 'nc',1.1, 2.22, 3.33, 8.888, 4.4444, 5.555, 6.66, 1234.567891, 1234.567891, 77.323,1, 22, 333, 4444, 5555, 1, 99, 1001, 999999999, 999999999999999999,94, 9949, 999999994, 999999999999999949, 99999999999999999999999999999999999949,TO_DATE('2022-10-30', 'yyyy-mm-dd'),TO_TIMESTAMP('2022-10-30 12:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12:34:56.12545', 'yyyy-mm-dd HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12:34:56.12545', 'yyyy-mm-dd HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12:34:56.125456789', 'yyyy-mm-dd HH24:MI:SS.FF9'),TO_TIMESTAMP_TZ('2022-10-30 01:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5'))");
583625
}
584626

585627
private void updateSourceTable(String database, String tableName) {
@@ -588,7 +630,7 @@ private void updateSourceTable(String database, String tableName) {
588630
+ database
589631
+ "."
590632
+ tableName
591-
+ " VALUES (2, 'vc2', 'vc2', 'nvc2', 'c', 'nc',1.1, 2.22, 3.33, 8.888, 4.4444, 5.555, 6.66, 1234.567891, 1234.567891, 77.323,1, 22, 333, 4444, 5555, 1, 99, 9999, 999999999, 999999999999999999,94, 9949, 999999994, 999999999999999949, 99999999999999999999999999999999999949,TO_DATE('2022-10-30', 'yyyy-mm-dd'),TO_TIMESTAMP('2022-10-30 12:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12:34:56.12545', 'yyyy-mm-dd HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12:34:56.12545', 'yyyy-mm-dd HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12:34:56.125456789', 'yyyy-mm-dd HH24:MI:SS.FF9'),TO_TIMESTAMP_TZ('2022-10-30 01:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5'))");
633+
+ " VALUES (2, 'vc2', 'vc2', 'nvc2', 'c', 'nc',1.1, 2.22, 3.33, 8.888, 4.4444, 5.555, 6.66, 1234.567891, 1234.567891, 77.323,1, 22, 333, 4444, 5555, 1, 99, 2001, 999999999, 999999999999999999,94, 9949, 999999994, 999999999999999949, 99999999999999999999999999999999999949,TO_DATE('2022-10-30', 'yyyy-mm-dd'),TO_TIMESTAMP('2022-10-30 12:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12:34:56.12545', 'yyyy-mm-dd HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12:34:56.12545', 'yyyy-mm-dd HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12:34:56.125456789', 'yyyy-mm-dd HH24:MI:SS.FF9'),TO_TIMESTAMP_TZ('2022-10-30 01:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5'))");
592634

593635
executeSql(
594636
"INSERT INTO "

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/ddl/column_type_test.sql

+97
Original file line numberDiff line numberDiff line change
@@ -269,3 +269,100 @@ create table DEBEZIUM.SINK_FULL_TYPES2 (
269269
primary key (ID)
270270
);
271271

272+
CREATE TABLE DEBEZIUM.PARTITION_SOURCE_TABLE (
273+
ID NUMBER(9) not null,
274+
VAL_VARCHAR VARCHAR2(1000),
275+
VAL_VARCHAR2 VARCHAR2(1000),
276+
VAL_NVARCHAR2 NVARCHAR2(1000),
277+
VAL_CHAR CHAR(3),
278+
VAL_NCHAR NCHAR(3),
279+
VAL_BF BINARY_FLOAT,
280+
VAL_BD BINARY_DOUBLE,
281+
VAL_F FLOAT,
282+
VAL_F_10 FLOAT(10),
283+
VAL_NUM NUMBER(10, 6),
284+
VAL_DP FLOAT,
285+
VAL_R FLOAT(63),
286+
VAL_DECIMAL NUMBER(10, 6),
287+
VAL_NUMERIC NUMBER(10, 6),
288+
VAL_NUM_VS NUMBER,
289+
VAL_INT NUMBER,
290+
VAL_INTEGER NUMBER,
291+
VAL_SMALLINT NUMBER,
292+
VAL_NUMBER_38_NO_SCALE NUMBER(38),
293+
VAL_NUMBER_38_SCALE_0 NUMBER(38),
294+
VAL_NUMBER_1 NUMBER(1),
295+
VAL_NUMBER_2 NUMBER(2),
296+
VAL_NUMBER_4 NUMBER(4),
297+
VAL_NUMBER_9 NUMBER(9),
298+
VAL_NUMBER_18 NUMBER(18),
299+
VAL_NUMBER_2_NEGATIVE_SCALE NUMBER(1, -1),
300+
VAL_NUMBER_4_NEGATIVE_SCALE NUMBER(2, -2),
301+
VAL_NUMBER_9_NEGATIVE_SCALE NUMBER(8, -1),
302+
VAL_NUMBER_18_NEGATIVE_SCALE NUMBER(16, -2),
303+
VAL_NUMBER_36_NEGATIVE_SCALE NUMBER(36, -2),
304+
VAL_DATE DATE,
305+
VAL_TS TIMESTAMP(6),
306+
VAL_TS_PRECISION2 TIMESTAMP(2),
307+
VAL_TS_PRECISION4 TIMESTAMP(4),
308+
VAL_TS_PRECISION9 TIMESTAMP(6),
309+
VAL_TSLTZ TIMESTAMP(6) WITH LOCAL TIME ZONE,
310+
PRIMARY KEY (ID)
311+
)
312+
PARTITION BY RANGE (VAL_NUMBER_4)
313+
(
314+
PARTITION p1 VALUES LESS THAN (1000),
315+
PARTITION p2 VALUES LESS THAN (2000),
316+
PARTITION p3 VALUES LESS THAN (3000),
317+
PARTITION p4 VALUES LESS THAN (MAXVALUE)
318+
);
319+
320+
ALTER TABLE DEBEZIUM.PARTITION_SOURCE_TABLE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
321+
322+
CREATE TABLE DEBEZIUM.PARTITION_SINK_TABLE (
323+
ID NUMBER(9) not null,
324+
VAL_VARCHAR VARCHAR2(1000),
325+
VAL_VARCHAR2 VARCHAR2(1000),
326+
VAL_NVARCHAR2 NVARCHAR2(1000),
327+
VAL_CHAR CHAR(3),
328+
VAL_NCHAR NCHAR(3),
329+
VAL_BF BINARY_FLOAT,
330+
VAL_BD BINARY_DOUBLE,
331+
VAL_F FLOAT,
332+
VAL_F_10 FLOAT(10),
333+
VAL_NUM NUMBER(10, 6),
334+
VAL_DP FLOAT,
335+
VAL_R FLOAT(63),
336+
VAL_DECIMAL NUMBER(10, 6),
337+
VAL_NUMERIC NUMBER(10, 6),
338+
VAL_NUM_VS NUMBER,
339+
VAL_INT NUMBER,
340+
VAL_INTEGER NUMBER,
341+
VAL_SMALLINT NUMBER,
342+
VAL_NUMBER_38_NO_SCALE NUMBER(38),
343+
VAL_NUMBER_38_SCALE_0 NUMBER(38),
344+
VAL_NUMBER_1 NUMBER(1),
345+
VAL_NUMBER_2 NUMBER(2),
346+
VAL_NUMBER_4 NUMBER(4),
347+
VAL_NUMBER_9 NUMBER(9),
348+
VAL_NUMBER_18 NUMBER(18),
349+
VAL_NUMBER_2_NEGATIVE_SCALE NUMBER(1, -1),
350+
VAL_NUMBER_4_NEGATIVE_SCALE NUMBER(2, -2),
351+
VAL_NUMBER_9_NEGATIVE_SCALE NUMBER(8, -1),
352+
VAL_NUMBER_18_NEGATIVE_SCALE NUMBER(16, -2),
353+
VAL_NUMBER_36_NEGATIVE_SCALE NUMBER(36, -2),
354+
VAL_DATE DATE,
355+
VAL_TS TIMESTAMP(6),
356+
VAL_TS_PRECISION2 TIMESTAMP(2),
357+
VAL_TS_PRECISION4 TIMESTAMP(4),
358+
VAL_TS_PRECISION9 TIMESTAMP(6),
359+
VAL_TSLTZ TIMESTAMP(6) WITH LOCAL TIME ZONE,
360+
PRIMARY KEY (ID)
361+
)
362+
PARTITION BY RANGE (VAL_NUMBER_4)
363+
(
364+
PARTITION p1 VALUES LESS THAN (1000),
365+
PARTITION p2 VALUES LESS THAN (2000),
366+
PARTITION p3 VALUES LESS THAN (3000),
367+
PARTITION p4 VALUES LESS THAN (MAXVALUE)
368+
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
######
18+
###### This config file is a demonstration of streaming processing in seatunnel config
19+
######
20+
21+
env {
22+
# You can set engine configuration here
23+
parallelism = 1
24+
job.mode = "STREAMING"
25+
checkpoint.interval = 5000
26+
}
27+
28+
source {
29+
# This is a example source plugin **only for test and demonstrate the feature source plugin**
30+
Oracle-CDC {
31+
plugin_output = "customers"
32+
username = "system"
33+
password = "top_secret"
34+
database-names = ["ORCLCDB"]
35+
schema-names = ["DEBEZIUM"]
36+
table-names = ["ORCLCDB.DEBEZIUM.PARTITION_SOURCE_TABLE"]
37+
base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
38+
source.reader.close.timeout = 120000
39+
connection.pool.size = 1
40+
debezium {
41+
database.oracle.jdbc.timezoneAsRegion = "false"
42+
}
43+
}
44+
}
45+
46+
transform {
47+
}
48+
49+
sink {
50+
Jdbc {
51+
plugin_input = "customers"
52+
driver = "oracle.jdbc.driver.OracleDriver"
53+
url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
54+
user = "system"
55+
password = "top_secret"
56+
generate_sink_sql = true
57+
database = "ORCLCDB"
58+
table = "DEBEZIUM.PARTITION_SINK_TABLE"
59+
batch_size = 1
60+
primary_keys = ["ID"]
61+
connection.pool.size = 1
62+
}
63+
}

0 commit comments

Comments
 (0)