Skip to content

Commit 0b8f9de

Browse files
authored
[Improve][Connector-V2] MaxComputeSink support create partition in savemode (#8474)
1 parent d61cba2 commit 0b8f9de

File tree

4 files changed

+103
-6
lines changed

4 files changed

+103
-6
lines changed

docs/en/connector-v2/sink/Maxcompute.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ You can use the following placeholders
9191

9292
Before the synchronous task is turned on, different treatment schemes are selected for the existing surface structure of the target side.
9393
Option introduction:
94-
`RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved
95-
`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved
94+
`RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved. If the `partition_spec` is set, the partition will be deleted and rebuilt.
95+
`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved. If the `partition_spec` is set, the partition will be created.
9696
`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist
9797
`IGNORE` :Ignore the treatment of the table
9898

seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java

+30-1
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,27 @@ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
271271
}
272272
}
273273

274+
public void createPartition(TablePath tablePath, PartitionSpec partitionSpec) {
275+
try {
276+
Odps odps = getOdps(tablePath.getDatabaseName());
277+
Table odpsTable = odps.tables().get(tablePath.getTableName());
278+
odpsTable.createPartition(partitionSpec, true);
279+
} catch (Exception e) {
280+
throw new CatalogException("create partition error", e);
281+
}
282+
}
283+
284+
public void truncatePartition(TablePath tablePath, PartitionSpec partitionSpec) {
285+
try {
286+
Odps odps = getOdps(tablePath.getDatabaseName());
287+
Table odpsTable = odps.tables().get(tablePath.getTableName());
288+
odpsTable.deletePartition(partitionSpec, true);
289+
odpsTable.createPartition(partitionSpec, true);
290+
} catch (Exception e) {
291+
throw new CatalogException("create partition error", e);
292+
}
293+
}
294+
274295
@Override
275296
public boolean isExistsData(TablePath tablePath) {
276297
throw new UnsupportedOperationException();
@@ -280,7 +301,15 @@ public boolean isExistsData(TablePath tablePath) {
280301
public void executeSql(TablePath tablePath, String sql) {
281302
try {
282303
Odps odps = getOdps(tablePath.getDatabaseName());
283-
SQLTask.run(odps, sql).waitForSuccess();
304+
String[] sqls = sql.split(";");
305+
for (String s : sqls) {
306+
if (!s.trim().isEmpty()) {
307+
if (!s.trim().endsWith(";")) {
308+
s = s.trim() + ";";
309+
}
310+
SQLTask.run(odps, s).waitForSuccess();
311+
}
312+
}
284313
} catch (OdpsException e) {
285314
throw new CatalogException("execute sql error", e);
286315
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
19+
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.api.sink.DataSaveMode;
22+
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
23+
import org.apache.seatunnel.api.sink.SchemaSaveMode;
24+
import org.apache.seatunnel.api.table.catalog.Catalog;
25+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
26+
import org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeCatalog;
27+
28+
import org.apache.commons.lang3.StringUtils;
29+
30+
import com.aliyun.odps.PartitionSpec;
31+
32+
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
33+
34+
public class MaxComputeSaveModeHandler extends DefaultSaveModeHandler {
35+
36+
private final ReadonlyConfig readonlyConfig;
37+
38+
public MaxComputeSaveModeHandler(
39+
SchemaSaveMode schemaSaveMode,
40+
DataSaveMode dataSaveMode,
41+
Catalog catalog,
42+
CatalogTable catalogTable,
43+
String customSql,
44+
ReadonlyConfig readonlyConfig) {
45+
super(schemaSaveMode, dataSaveMode, catalog, catalogTable, customSql);
46+
this.readonlyConfig = readonlyConfig;
47+
}
48+
49+
@Override
50+
protected void createSchemaWhenNotExist() {
51+
super.createSchemaWhenNotExist();
52+
if (StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
53+
((MaxComputeCatalog) catalog)
54+
.createPartition(
55+
tablePath, new PartitionSpec(readonlyConfig.get(PARTITION_SPEC)));
56+
}
57+
}
58+
59+
@Override
60+
protected void recreateSchema() {
61+
super.recreateSchema();
62+
if (StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
63+
((MaxComputeCatalog) catalog)
64+
.createPartition(
65+
tablePath, new PartitionSpec(readonlyConfig.get(PARTITION_SPEC)));
66+
}
67+
}
68+
}

seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2121
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2222
import org.apache.seatunnel.api.sink.DataSaveMode;
23-
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
2423
import org.apache.seatunnel.api.sink.SaveModeHandler;
2524
import org.apache.seatunnel.api.sink.SinkWriter;
2625
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
@@ -94,12 +93,13 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
9493
}
9594

9695
return Optional.of(
97-
new DefaultSaveModeHandler(
96+
new MaxComputeSaveModeHandler(
9897
readonlyConfig.get(MaxcomputeConfig.SCHEMA_SAVE_MODE),
9998
dataSaveMode,
10099
catalog,
101100
catalogTable,
102-
readonlyConfig.get(MaxcomputeConfig.CUSTOM_SQL)));
101+
readonlyConfig.get(MaxcomputeConfig.CUSTOM_SQL),
102+
readonlyConfig));
103103
}
104104

105105
@Override

0 commit comments

Comments
 (0)