From a7ca51b58593fc34758e0d713a85b131f99fd5fe Mon Sep 17 00:00:00 2001 From: cloud456 <66239792+cloud456@users.noreply.github.com> Date: Tue, 27 Aug 2024 22:31:08 +0800 Subject: [PATCH] [Feature][Connector-V2][Tablestore] Support Source connector for Tablestore #7448 (#7467) --- docs/en/connector-v2/source/Tablestore.md | 102 ++++++++++ plugin-mapping.properties | 1 + .../tablestore/config/TablestoreOptions.java | 19 ++ .../DefaultSeaTunnelRowDeserializer.java | 38 ++++ .../serialize/SeaTunnelRowDeserializer.java | 26 +++ .../tablestore/source/TableStoreDBSource.java | 102 ++++++++++ .../source/TableStoreDBSourceReader.java | 175 ++++++++++++++++++ .../source/TableStoreDBSourceSplit.java | 38 ++++ .../TableStoreDBSourceSplitEnumerator.java | 166 +++++++++++++++++ .../source/TableStoreDBSourceState.java | 34 ++++ .../source/TableStoreDbSourceFactory.java | 64 +++++++ .../source/TableStoreProcessor.java | 95 ++++++++++ 12 files changed, 860 insertions(+) create mode 100644 docs/en/connector-v2/source/Tablestore.md create mode 100644 seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowDeserializer.java create mode 100644 seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowDeserializer.java create mode 100644 seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSource.java create mode 100644 seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.java create mode 100644 seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplit.java create mode 100644 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.java create mode 100644 seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceState.java create mode 100644 seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDbSourceFactory.java create mode 100644 seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreProcessor.java diff --git a/docs/en/connector-v2/source/Tablestore.md b/docs/en/connector-v2/source/Tablestore.md new file mode 100644 index 00000000000..8e0d1aeebc7 --- /dev/null +++ b/docs/en/connector-v2/source/Tablestore.md @@ -0,0 +1,102 @@ +# Tablestore + +> Tablestore source connector + +## Description + +Read data from Alicloud Tablestore, with support for full and CDC reads. + + +## Key features + +- [ ] [batch](../../concept/connector-v2-features.md) +- [X] [stream](../../concept/connector-v2-features.md) +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [column projection](../../concept/connector-v2-features.md) +- [ ] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|-----------------------|--------|----------|---------------| +| end_point | string | yes | - | +| instance_name | string | yes | - | +| access_key_id | string | yes | - | +| access_key_secret | string | yes | - | +| table | string | yes | - | +| primary_keys | array | yes | - | +| schema | config | yes | - | + + +### end_point [string] + +The endpoint of Tablestore. + +### instance_name [string] + +The instance name of Tablestore. + +### access_key_id [string] + +The access key ID of Tablestore.
+ +### access_key_secret [string] + +The access key secret of Tablestore. + +### table [string] + +The table name of Tablestore. + +### primary_keys [array] + +The primary key of the table; just add one unique primary key. + +### schema [Config] + +The schema of the data to read, declared as `fields` with `name = type` entries (see the example below). + +## Example + +```bash +env { + parallelism = 1 + job.mode = "STREAMING" +} + +source { + # This is an example source plugin **only for testing and demonstrating the source plugin feature** + Tablestore { + end_point = "https://****.cn-zhangjiakou.tablestore.aliyuncs.com" + instance_name = "****" + access_key_id="***************2Ag5" + access_key_secret="***********2Dok" + table="test" + primary_keys=["id"] + schema={ + fields { + id = string + name = string + } + } + } +} + + +sink { + MongoDB{ + uri = "mongodb://localhost:27017" + database = "test" + collection = "test" + primary-key = ["id"] + schema = { + fields { + id = string + name = string + } + } + } +} +``` + diff --git a/plugin-mapping.properties b/plugin-mapping.properties index d77b70e5e84..ece3bd0c77c 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -85,6 +85,7 @@ seatunnel.sink.InfluxDB = connector-influxdb seatunnel.source.GoogleSheets = connector-google-sheets seatunnel.sink.GoogleFirestore = connector-google-firestore seatunnel.sink.Tablestore = connector-tablestore +seatunnel.source.Tablestore = connector-tablestore seatunnel.source.Lemlist = connector-http-lemlist seatunnel.source.Klaviyo = connector-http-klaviyo seatunnel.sink.Slack = connector-slack diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java index 7b2aa6bae67..be121818932 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java +++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java @@ -19,11 +19,14 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + import lombok.AllArgsConstructor; import lombok.Data; import java.io.Serializable; import java.util.List; +import java.util.Map; import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE; @@ -45,6 +48,8 @@ public class TablestoreOptions implements Serializable { public int batchSize = Integer.parseInt(BATCH_SIZE.defaultValue()); + public TablestoreOptions() {} + public TablestoreOptions(Config config) { this.endpoint = config.getString(TablestoreConfig.END_POINT.key()); this.instanceName = config.getString(TablestoreConfig.INSTANCE_NAME.key()); @@ -57,4 +62,18 @@ public TablestoreOptions(Config config) { this.batchSize = config.getInt(BATCH_SIZE.key()); } } + + public static TablestoreOptions of(ReadonlyConfig config) { + Map map = config.getSourceMap(); + TablestoreOptions tablestoreOptions = new TablestoreOptions(); + tablestoreOptions.setEndpoint(config.get(TablestoreConfig.END_POINT)); + tablestoreOptions.setInstanceName(config.get(TablestoreConfig.INSTANCE_NAME)); + tablestoreOptions.setAccessKeyId(config.get(TablestoreConfig.ACCESS_KEY_ID)); + tablestoreOptions.setAccessKeySecret(config.get(TablestoreConfig.ACCESS_KEY_SECRET)); + tablestoreOptions.setTable(config.get(TablestoreConfig.TABLE)); + List keys = (List) map.get(TablestoreConfig.PRIMARY_KEYS.key()); + + tablestoreOptions.setPrimaryKeys(keys); + return tablestoreOptions; + } } diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowDeserializer.java 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowDeserializer.java new file mode 100644 index 00000000000..9bdb060a49d --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowDeserializer.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seatunnel.connectors.seatunnel.tablestore.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import com.alicloud.openservices.tablestore.model.StreamRecord; + +import java.util.ArrayList; +import java.util.List; + +public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer { + + @Override + public SeaTunnelRow deserialize(StreamRecord r) { + List fields = new ArrayList<>(); + r.getColumns() + .forEach( + k -> { + fields.add(k.getColumn().getValue()); + }); + return new SeaTunnelRow(fields.toArray()); + } +} diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowDeserializer.java new file mode 100644 index 00000000000..44a2560693f --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowDeserializer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seatunnel.connectors.seatunnel.tablestore.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import com.alicloud.openservices.tablestore.model.StreamRecord; + +public interface SeaTunnelRowDeserializer { + + SeaTunnelRow deserialize(StreamRecord streamRecord); +} diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSource.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSource.java new file mode 100644 index 00000000000..85c0062ed32 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSource.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seatunnel.connectors.seatunnel.tablestore.source; + +import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceReader.Context; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.source.SupportColumnProjection; +import org.apache.seatunnel.api.source.SupportParallelism; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.constants.JobMode; +import org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions; + +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@Slf4j +public class TableStoreDBSource + implements SeaTunnelSource, + SupportParallelism, + SupportColumnProjection { + + private TablestoreOptions tablestoreOptions; + private SeaTunnelRowType typeInfo; + private JobContext jobContext; + + @Override + public String getPluginName() { + return "Tablestore"; + } + + @Override + public List getProducedCatalogTables() { + return SeaTunnelSource.super.getProducedCatalogTables(); + } + + public TableStoreDBSource(ReadonlyConfig config) { + this.tablestoreOptions = TablestoreOptions.of(config); + CatalogTableUtil.buildWithConfig(config); + this.typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType(); + } + + @Override + public Boundedness getBoundedness() { + return JobMode.BATCH.equals(jobContext.getJobMode()) + ? 
Boundedness.BOUNDED + : Boundedness.UNBOUNDED; + } + + @Override + public SourceReader createReader(Context readerContext) + throws Exception { + return new TableStoreDBSourceReader(readerContext, tablestoreOptions, typeInfo); + } + + @Override + public SourceSplitEnumerator createEnumerator( + org.apache.seatunnel.api.source.SourceSplitEnumerator.Context + enumeratorContext) + throws Exception { + return new TableStoreDBSourceSplitEnumerator(enumeratorContext, tablestoreOptions); + } + + @Override + public SourceSplitEnumerator + restoreEnumerator( + org.apache.seatunnel.api.source.SourceSplitEnumerator.Context< + TableStoreDBSourceSplit> + enumeratorContext, + TableStoreDBSourceState checkpointState) + throws Exception { + return new TableStoreDBSourceSplitEnumerator( + enumeratorContext, tablestoreOptions, checkpointState); + } + + @Override + public void setJobContext(JobContext jobContext) { + this.jobContext = jobContext; + } +} diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.java new file mode 100644 index 00000000000..eefd4aae031 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.java @@ -0,0 +1,175 @@ +package org.apache.seatunnel.connectors.seatunnel.tablestore.source; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions; + +import com.alicloud.openservices.tablestore.SyncClient; +import com.alicloud.openservices.tablestore.TunnelClient; +import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest; +import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse; +import com.alicloud.openservices.tablestore.model.tunnel.DeleteTunnelRequest; +import com.alicloud.openservices.tablestore.model.tunnel.DeleteTunnelResponse; +import com.alicloud.openservices.tablestore.model.tunnel.DescribeTunnelRequest; +import com.alicloud.openservices.tablestore.model.tunnel.DescribeTunnelResponse; +import com.alicloud.openservices.tablestore.model.tunnel.TunnelType; +import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker; +import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedDeque; + +@Slf4j +public class TableStoreDBSourceReader + implements SourceReader { + + protected SourceReader.Context context; + protected TablestoreOptions tablestoreOptions; + protected SeaTunnelRowType seaTunnelRowType; 
+ Queue pendingSplits = new ConcurrentLinkedDeque<>(); + private SyncClient client; + private volatile boolean noMoreSplit; + private TunnelClient tunnelClient; + + public TableStoreDBSourceReader( + SourceReader.Context context, + TablestoreOptions options, + SeaTunnelRowType seaTunnelRowType) { + + this.context = context; + this.tablestoreOptions = options; + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public void open() throws Exception { + client = + new SyncClient( + tablestoreOptions.getEndpoint(), + tablestoreOptions.getAccessKeyId(), + tablestoreOptions.getAccessKeySecret(), + tablestoreOptions.getInstanceName()); + tunnelClient = + new TunnelClient( + tablestoreOptions.getEndpoint(), + tablestoreOptions.getAccessKeyId(), + tablestoreOptions.getAccessKeySecret(), + tablestoreOptions.getInstanceName()); + } + + @Override + public void close() throws IOException { + tunnelClient.shutdown(); + client.shutdown(); + } + + @Override + public void pollNext(Collector output) throws Exception { + synchronized (output.getCheckpointLock()) { + TableStoreDBSourceSplit split = pendingSplits.poll(); + if (Objects.nonNull(split)) { + read(split, output); + } + /*if (split == null) { + log.info( + "TableStore Source Reader [{}] waiting for splits", + context.getIndexOfSubtask()); + }*/ + if (noMoreSplit) { + // signal to the source that we have reached the end of the data. 
+ log.info("Closed the bounded tablestore source"); + context.signalNoMoreElement(); + Thread.sleep(2000L); + } else { + Thread.sleep(1000L); + } + } + } + + private void read(TableStoreDBSourceSplit split, Collector output) { + String tunnelId = getTunel(split); + TableStoreProcessor processor = + new TableStoreProcessor(split.getTableName(), split.getPrimaryKey(), output); + TunnelWorkerConfig workerConfig = new TunnelWorkerConfig(processor); + TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, workerConfig); + try { + worker.connectAndWorking(); + } catch (Exception e) { + log.error("Start OTS tunnel failed.", e); + worker.shutdown(); + } + } + + public String getTunel(TableStoreDBSourceSplit split) { + deleteTunel(split); + String tunnelId = null; + String tunnelName = split.getTableName() + "_migration2aws_tunnel4" + split.getSplitId(); + + try { + DescribeTunnelRequest drequest = new DescribeTunnelRequest("test", tunnelName); + DescribeTunnelResponse dresp = tunnelClient.describeTunnel(drequest); + tunnelId = dresp.getTunnelInfo().getTunnelId(); + } catch (Exception be) { + CreateTunnelRequest crequest = + new CreateTunnelRequest( + split.getTableName(), tunnelName, TunnelType.valueOf("BaseAndStream")); + CreateTunnelResponse cresp = tunnelClient.createTunnel(crequest); + tunnelId = cresp.getTunnelId(); + } + log.info("Tunnel found, Id: " + tunnelId); + return tunnelId; + } + + public void deleteTunel(TableStoreDBSourceSplit split) { + String tunnelName = split.getTableName() + "_migration2aws_tunnel4" + split.getSplitId(); + try { + DeleteTunnelRequest drequest = + new DeleteTunnelRequest(split.getTableName(), tunnelName); + DeleteTunnelResponse dresp = tunnelClient.deleteTunnel(drequest); + log.info("Tunnel has been deleted: " + dresp.toString()); + } catch (Exception be) { + log.warn("Tunnel deletion failed due to not found: " + tunnelName); + } + } + + @Override + public List snapshotState(long checkpointId) throws Exception { + return new 
ArrayList<>(pendingSplits); + } + + @Override + public void addSplits(List splits) { + this.pendingSplits.addAll(splits); + } + + @Override + public void handleNoMoreSplits() { + log.info("Reader [{}] received noMoreSplit event.", context.getIndexOfSubtask()); + noMoreSplit = true; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception {} +} diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplit.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplit.java new file mode 100644 index 00000000000..24328b0a6f9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplit.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seatunnel.connectors.seatunnel.tablestore.source; + +import org.apache.seatunnel.api.source.SourceSplit; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +@AllArgsConstructor +@Getter +@Setter +public class TableStoreDBSourceSplit implements SourceSplit { + + private Integer splitId; + private String tableName; + private String primaryKey; + + @Override + public String splitId() { + return splitId.toString(); + } +} diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.java new file mode 100644 index 00000000000..3dd58b7e69b --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seatunnel.connectors.seatunnel.tablestore.source; + +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Slf4j +public class TableStoreDBSourceSplitEnumerator + implements SourceSplitEnumerator { + + private final SourceSplitEnumerator.Context enumeratorContext; + private final Map> pendingSplits; + private final TablestoreOptions tablestoreOptions; + + private final Object stateLock = new Object(); + private volatile boolean shouldEnumerate; + + /** + * @param enumeratorContext + * @param tablestoreOptions + */ + public TableStoreDBSourceSplitEnumerator( + Context enumeratorContext, + TablestoreOptions tablestoreOptions) { + this(enumeratorContext, tablestoreOptions, null); + } + + public TableStoreDBSourceSplitEnumerator( + Context enumeratorContext, + TablestoreOptions tablestoreOptions, + TableStoreDBSourceState sourceState) { + this.enumeratorContext = enumeratorContext; + this.tablestoreOptions = tablestoreOptions; + this.pendingSplits = new HashMap<>(); + this.shouldEnumerate = sourceState == null; + if (sourceState != null) { + this.shouldEnumerate = sourceState.isShouldEnumerate(); + this.pendingSplits.putAll(sourceState.getPendingSplits()); + } + } + + @Override + public void open() {} + + @Override + public void run() throws Exception { + Set readers = enumeratorContext.registeredReaders(); + if (shouldEnumerate) { + Set newSplits = getTableStoreDBSourceSplit(); + synchronized (stateLock) { + addPendingSplit(newSplits); + shouldEnumerate = false; + } + assignSplit(readers); + } + } + + private void assignSplit(Set readers) { + for (int reader : readers) 
{ + List assignmentForReader = pendingSplits.remove(reader); + if (assignmentForReader != null && !assignmentForReader.isEmpty()) { + log.info("Assign splits {} to reader {}", assignmentForReader, reader); + try { + enumeratorContext.assignSplit(reader, assignmentForReader); + } catch (Exception e) { + log.error( + "Failed to assign splits {} to reader {}", + assignmentForReader, + reader, + e); + pendingSplits.put(reader, assignmentForReader); + } + } + } + } + + private Set getTableStoreDBSourceSplit() { + + Set allSplit = new HashSet<>(); + String tables = tablestoreOptions.getTable(); + String[] tableArr = tables.split(","); + for (int i = 0; i < tableArr.length; i++) { + allSplit.add( + new TableStoreDBSourceSplit( + i, tableArr[i], tablestoreOptions.getPrimaryKeys().get(i))); + } + return allSplit; + } + + private void addPendingSplit(Collection splits) { + int readerCount = enumeratorContext.currentParallelism(); + for (TableStoreDBSourceSplit split : splits) { + int ownerReader = split.getSplitId() % readerCount; + pendingSplits.computeIfAbsent(ownerReader, k -> new ArrayList<>()).add(split); + } + } + + @Override + public void close() throws IOException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'close'"); + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + log.debug("Add back splits {} to tablestore.", splits); + if (!splits.isEmpty()) { + addPendingSplit(splits); + assignSplit(Collections.singleton(subtaskId)); + enumeratorContext.signalNoMoreSplits(subtaskId); + } + } + + @Override + public int currentUnassignedSplitSize() { + return pendingSplits.size(); + } + + @Override + public void handleSplitRequest(int subtaskId) {} + + @Override + public void registerReader(int subtaskId) { + log.debug("Register reader {} to TablestoreSplitEnumerator.", subtaskId); + if (!pendingSplits.isEmpty()) { + assignSplit(Collections.singleton(subtaskId)); + } + } + + @Override + public 
TableStoreDBSourceState snapshotState(long checkpointId) throws Exception { + synchronized (stateLock) { + return new TableStoreDBSourceState(shouldEnumerate, pendingSplits); + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception {} +} diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceState.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceState.java new file mode 100644 index 00000000000..05a73a63101 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceState.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seatunnel.connectors.seatunnel.tablestore.source; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +@Getter +@Setter +@AllArgsConstructor +public class TableStoreDBSourceState implements Serializable { + + private boolean shouldEnumerate; + private Map> pendingSplits; +} diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDbSourceFactory.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDbSourceFactory.java new file mode 100644 index 00000000000..f93ae4bfe32 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDbSourceFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seatunnel.connectors.seatunnel.tablestore.source; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.connector.TableSource; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig; + +import com.google.auto.service.AutoService; + +import java.io.Serializable; + +@AutoService(Factory.class) +public class TableStoreDbSourceFactory implements TableSourceFactory { + + @Override + public String factoryIdentifier() { + return "Tablestore"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required( + TablestoreConfig.END_POINT, + TablestoreConfig.INSTANCE_NAME, + TablestoreConfig.ACCESS_KEY_ID, + TablestoreConfig.ACCESS_KEY_SECRET, + TablestoreConfig.TABLE, + TablestoreConfig.PRIMARY_KEYS) + .build(); + } + + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> + (SeaTunnelSource) new TableStoreDBSource(context.getOptions()); + } + + @Override + public Class getSourceClass() { + return TableStoreDBSource.class; + } +} diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreProcessor.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreProcessor.java new file mode 100644 index 00000000000..ba5334a85eb --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreProcessor.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.connectors.seatunnel.tablestore.source; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.SeaTunnelRowDeserializer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alicloud.openservices.tablestore.model.StreamRecord; +import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor; +import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class TableStoreProcessor implements IChannelProcessor { + private String tableName = null; + private String primaryKey = null; + private Collector output = null; + protected SeaTunnelRowDeserializer seaTunnelRowDeserializer; + private static final Logger log = LoggerFactory.getLogger(TableStoreProcessor.class); + + public TableStoreProcessor( + String tableName, String primaryKey, Collector output) { + this.tableName = tableName; + this.primaryKey = primaryKey; + this.output = output; + } + + @Override + public 
void process(ProcessRecordsInput input) { + log.info("Default record processor, would print records count"); + + log.info( + String.format( + "Process %d records, NextToken: %s", + input.getRecords().size(), input.getNextToken())); + + for (StreamRecord r : input.getRecords()) { + try { + List fields = new ArrayList<>(); + Arrays.stream(r.getPrimaryKey().getPrimaryKeyColumns()) + .forEach( + k -> { + fields.add(k.getValue().toString()); + }); + r.getColumns() + .forEach( + k -> { + fields.add(k.getColumn().getValue().toString()); + }); + SeaTunnelRow row = new SeaTunnelRow(fields.toArray()); + row.setTableId(tableName); + switch ((r.getRecordType())) { + case PUT: + row.setRowKind(RowKind.INSERT); + break; + case UPDATE: + row.setRowKind(RowKind.UPDATE_AFTER); + break; + case DELETE: + row.setRowKind(RowKind.DELETE); + break; + } + output.collect(row); + } catch (Exception e) { + log.error("send to target failed with record: " + r.toString(), e); + } + } + } + + @Override + public void shutdown() { + log.info("process shutdown du to finished for table: " + tableName); + } +}