Skip to content

Commit ac358c4

Browse files
committed
[FLINK-34944] Use Incremental Source Framework in OceanBase Source Connector
1 parent 1ec3cf1 commit ac358c4

31 files changed

+3739
-208
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.oceanbase.source;
19+
20+
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
21+
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
22+
import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionFactory;
23+
import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
24+
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
25+
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
26+
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
27+
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
28+
import org.apache.flink.cdc.connectors.oceanbase.source.config.OceanBaseSourceConfig;
29+
import org.apache.flink.cdc.connectors.oceanbase.source.connection.OceanBaseConnection;
30+
import org.apache.flink.cdc.connectors.oceanbase.source.connection.OceanBaseConnectionPoolFactory;
31+
import org.apache.flink.cdc.connectors.oceanbase.source.offset.LogMessageOffset;
32+
import org.apache.flink.cdc.connectors.oceanbase.source.reader.fetch.OceanBaseScanFetchTask;
33+
import org.apache.flink.cdc.connectors.oceanbase.source.reader.fetch.OceanBaseSourceFetchTaskContext;
34+
import org.apache.flink.cdc.connectors.oceanbase.source.reader.fetch.OceanBaseStreamFetchTask;
35+
import org.apache.flink.cdc.connectors.oceanbase.source.schema.OceanBaseSchema;
36+
import org.apache.flink.cdc.connectors.oceanbase.source.splitter.OceanBaseChunkSplitter;
37+
import org.apache.flink.util.FlinkRuntimeException;
38+
39+
import io.debezium.jdbc.JdbcConnection;
40+
import io.debezium.relational.TableId;
41+
import io.debezium.relational.history.TableChanges;
42+
43+
import java.sql.SQLException;
44+
import java.util.HashMap;
45+
import java.util.List;
46+
import java.util.Map;
47+
48+
/** The {@link JdbcDataSourceDialect} implementation for OceanBase datasource. */
49+
public class OceanBaseDialect implements JdbcDataSourceDialect {
50+
51+
private static final long serialVersionUID = 1L;
52+
53+
private transient OceanBaseSchema obSchema;
54+
55+
@Override
56+
public String getName() {
57+
return "OceanBase";
58+
}
59+
60+
@Override
61+
public OceanBaseConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
62+
String compatibleMode = ((OceanBaseSourceConfig) sourceConfig).getCompatibleMode();
63+
String quote = "mysql".equalsIgnoreCase(compatibleMode) ? "`" : "\"";
64+
OceanBaseConnection jdbcConnection =
65+
new OceanBaseConnection(
66+
sourceConfig.getDbzConnectorConfig().getJdbcConfig(),
67+
new JdbcConnectionFactory(sourceConfig, getPooledDataSourceFactory()),
68+
compatibleMode,
69+
quote,
70+
quote);
71+
try {
72+
jdbcConnection.connect();
73+
} catch (Exception e) {
74+
throw new FlinkRuntimeException(e);
75+
}
76+
return jdbcConnection;
77+
}
78+
79+
@Override
80+
public Offset displayCurrentOffset(JdbcSourceConfig sourceConfig) {
81+
try (OceanBaseConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
82+
return new LogMessageOffset(jdbcConnection.getCurrentTimestampS());
83+
} catch (SQLException e) {
84+
throw new FlinkRuntimeException("Failed to query current timestamp", e);
85+
}
86+
}
87+
88+
@Override
89+
public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) {
90+
try (OceanBaseConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
91+
return !"0".equals(jdbcConnection.readSystemVariable("lower_case_table_names"));
92+
} catch (SQLException e) {
93+
throw new FlinkRuntimeException("Failed to determine case sensitivity", e);
94+
}
95+
}
96+
97+
@Override
98+
public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
99+
return new OceanBaseChunkSplitter(sourceConfig, this);
100+
}
101+
102+
@Override
103+
public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
104+
try (OceanBaseConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
105+
return jdbcConnection.listTables(sourceConfig.getTableFilters());
106+
} catch (SQLException e) {
107+
throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e);
108+
}
109+
}
110+
111+
@Override
112+
public Map<TableId, TableChanges.TableChange> discoverDataCollectionSchemas(
113+
JdbcSourceConfig sourceConfig) {
114+
final List<TableId> capturedTableIds = discoverDataCollections(sourceConfig);
115+
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
116+
Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
117+
for (TableId tableId : capturedTableIds) {
118+
TableChanges.TableChange tableSchema = queryTableSchema(jdbc, tableId);
119+
tableSchemas.put(tableId, tableSchema);
120+
}
121+
return tableSchemas;
122+
} catch (Exception e) {
123+
throw new FlinkRuntimeException("Failed to discover table schemas", e);
124+
}
125+
}
126+
127+
@Override
128+
public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
129+
return new OceanBaseConnectionPoolFactory();
130+
}
131+
132+
@Override
133+
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
134+
if (obSchema == null) {
135+
obSchema = new OceanBaseSchema();
136+
}
137+
return obSchema.getTableSchema(jdbc, tableId);
138+
}
139+
140+
@Override
141+
public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase) {
142+
if (sourceSplitBase.isSnapshotSplit()) {
143+
return new OceanBaseScanFetchTask(sourceSplitBase.asSnapshotSplit());
144+
} else {
145+
return new OceanBaseStreamFetchTask(sourceSplitBase.asStreamSplit());
146+
}
147+
}
148+
149+
@Override
150+
public FetchTask.Context createFetchTaskContext(JdbcSourceConfig sourceConfig) {
151+
return new OceanBaseSourceFetchTaskContext(
152+
sourceConfig, this, openJdbcConnection(sourceConfig));
153+
}
154+
155+
@Override
156+
public boolean isIncludeDataCollection(JdbcSourceConfig sourceConfig, TableId tableId) {
157+
return sourceConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId);
158+
}
159+
}

Diff for: flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,9 @@ public void open(final Configuration config) throws Exception {
169169
super.open(config);
170170
this.outputCollector = new OutputCollector<>();
171171
this.connectorConfig =
172-
new OceanBaseConnectorConfig(compatibleMode, serverTimeZone, debeziumProperties);
173-
this.sourceInfo = new OceanBaseSourceInfo(connectorConfig, tenantName);
172+
new OceanBaseConnectorConfig(
173+
compatibleMode, serverTimeZone, tenantName, debeziumProperties);
174+
this.sourceInfo = new OceanBaseSourceInfo(connectorConfig);
174175
}
175176

176177
@Override

0 commit comments

Comments
 (0)