Skip to content

Commit 2c5b48e

Browse files
authored
[Improve] Add batch flush in doris sink (#6024)
1 parent 04234ac commit 2c5b48e

File tree

13 files changed

+198
-360
lines changed

13 files changed

+198
-360
lines changed

docs/en/connector-v2/sink/Doris.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ Version Supported
4545
| sink.max-retries | int | No | 3 | the maximum number of retries if writing records to the database fails |
4646
| sink.buffer-size | int | No | 256 * 1024 | the buffer size to cache data for stream load. |
4747
| sink.buffer-count | int | No | 3 | the buffer count to cache data for stream load. |
48+
| doris.batch.size | int | No | 1024 | the number of rows written to Doris in each HTTP request; when the number of cached rows reaches this size, or when a checkpoint is executed, the cached data is written to the server. |
4849
| doris.config | map | yes | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generating SQL, and to specify the supported formats. |
4950

5051
## Data Type Mapping

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class DorisConfig implements Serializable {
6565
private String password;
6666
private Integer queryPort;
6767
private String tableIdentifier;
68+
private int batchSize;
6869

6970
// source option
7071
private String readField;
@@ -76,7 +77,6 @@ public class DorisConfig implements Serializable {
7677
private Integer requestRetries;
7778
private boolean deserializeArrowAsync;
7879
private int deserializeQueueSize;
79-
private int batchSize;
8080
private int execMemLimit;
8181
private boolean useOldApi;
8282

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ public interface DorisOptions {
9090
.stringType()
9191
.noDefaultValue()
9292
.withDescription("the doris password.");
93+
Option<Integer> DORIS_BATCH_SIZE =
94+
Options.key("doris.batch.size")
95+
.intType()
96+
.defaultValue(DORIS_BATCH_SIZE_DEFAULT)
97+
.withDescription("the batch size of the doris read/write.");
9398

9499
// source config options
95100
Option<String> DORIS_READ_FIELD =
@@ -139,22 +144,6 @@ public interface DorisOptions {
139144
.intType()
140145
.defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
141146
.withDescription("");
142-
Option<Integer> DORIS_BATCH_SIZE =
143-
Options.key("doris.batch.size")
144-
.intType()
145-
.defaultValue(DORIS_BATCH_SIZE_DEFAULT)
146-
.withDescription("");
147-
Option<Long> DORIS_EXEC_MEM_LIMIT =
148-
Options.key("doris.exec.mem.limit")
149-
.longType()
150-
.defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
151-
.withDescription("");
152-
Option<Boolean> SOURCE_USE_OLD_API =
153-
Options.key("source.use-old-api")
154-
.booleanType()
155-
.defaultValue(false)
156-
.withDescription(
157-
"Whether to read data using the new interface defined according to the FLIP-27 specification,default false");
158147

159148
// sink config options
160149
Option<Boolean> SINK_ENABLE_2PC =
@@ -224,7 +213,9 @@ public interface DorisOptions {
224213
.withDescription("Create table statement template, used to create Doris table");
225214

226215
OptionRule.Builder SINK_RULE =
227-
OptionRule.builder().required(FENODES, USERNAME, PASSWORD, TABLE_IDENTIFIER);
216+
OptionRule.builder()
217+
.required(FENODES, USERNAME, PASSWORD, TABLE_IDENTIFIER)
218+
.optional(DORIS_BATCH_SIZE);
228219

229220
OptionRule.Builder CATALOG_RULE =
230221
OptionRule.builder().required(FENODES, QUERY_PORT, USERNAME, PASSWORD);

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkFactory.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,8 +372,7 @@ public static List<BackendV2.BackendRowV2> getBackendsV2(DorisConfig dorisConfig
372372
HttpGet httpGet = new HttpGet(beUrl);
373373
String response = send(dorisConfig, httpGet, logger);
374374
logger.info("Backend Info:{}", response);
375-
List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
376-
return backends;
375+
return parseBackendV2(response, logger);
377376
} catch (DorisConnectorException e) {
378377
logger.info(
379378
"Doris FE node {} is unavailable: {}, Request the next Doris FE node",

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java

Lines changed: 5 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -17,59 +17,40 @@
1717

1818
package org.apache.seatunnel.connectors.doris.sink;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
2220
import org.apache.seatunnel.api.common.JobContext;
23-
import org.apache.seatunnel.api.common.PrepareFailException;
24-
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2521
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2622
import org.apache.seatunnel.api.serialization.Serializer;
2723
import org.apache.seatunnel.api.sink.SaveModeHandler;
2824
import org.apache.seatunnel.api.sink.SeaTunnelSink;
29-
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
3025
import org.apache.seatunnel.api.sink.SinkCommitter;
3126
import org.apache.seatunnel.api.sink.SinkWriter;
3227
import org.apache.seatunnel.api.sink.SupportSaveMode;
3328
import org.apache.seatunnel.api.table.catalog.CatalogTable;
34-
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
3529
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
3630
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
37-
import org.apache.seatunnel.common.config.CheckConfigUtil;
38-
import org.apache.seatunnel.common.config.CheckResult;
39-
import org.apache.seatunnel.common.constants.PluginType;
4031
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
41-
import org.apache.seatunnel.connectors.doris.config.DorisOptions;
42-
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
4332
import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
4433
import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfoSerializer;
4534
import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitter;
4635
import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkState;
4736
import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkStateSerializer;
4837
import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter;
4938

50-
import com.google.auto.service.AutoService;
51-
5239
import java.io.IOException;
5340
import java.util.Collections;
5441
import java.util.List;
5542
import java.util.Optional;
5643

57-
@AutoService(SeaTunnelSink.class)
5844
public class DorisSink
5945
implements SeaTunnelSink<SeaTunnelRow, DorisSinkState, DorisCommitInfo, DorisCommitInfo>,
6046
SupportSaveMode {
6147

62-
private DorisConfig dorisConfig;
63-
private SeaTunnelRowType seaTunnelRowType;
48+
private final DorisConfig dorisConfig;
49+
private final SeaTunnelRowType seaTunnelRowType;
6450
private String jobId;
6551

66-
private CatalogTable catalogTable;
67-
68-
public DorisSink() {}
69-
7052
public DorisSink(ReadonlyConfig config, CatalogTable catalogTable) {
7153
this.dorisConfig = DorisConfig.of(config);
72-
this.catalogTable = catalogTable;
7354
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
7455
}
7556

@@ -78,63 +59,22 @@ public String getPluginName() {
7859
return "Doris";
7960
}
8061

81-
@Override
82-
public void prepare(Config pluginConfig) throws PrepareFailException {
83-
this.dorisConfig = DorisConfig.of(pluginConfig);
84-
CheckResult result =
85-
CheckConfigUtil.checkAllExists(
86-
pluginConfig,
87-
DorisOptions.FENODES.key(),
88-
DorisOptions.USERNAME.key(),
89-
DorisOptions.TABLE_IDENTIFIER.key());
90-
if (!result.isSuccess()) {
91-
throw new DorisConnectorException(
92-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
93-
String.format(
94-
"PluginName: %s, PluginType: %s, Message: %s",
95-
getPluginName(), PluginType.SINK, result.getMsg()));
96-
}
97-
if (dorisConfig.getTableIdentifier().isEmpty() && catalogTable != null) {
98-
String tableIdentifier =
99-
catalogTable.getTableId().getDatabaseName()
100-
+ "."
101-
+ catalogTable.getTableId().getTableName();
102-
dorisConfig.setTableIdentifier(tableIdentifier);
103-
}
104-
}
105-
10662
@Override
10763
public void setJobContext(JobContext jobContext) {
10864
this.jobId = jobContext.getJobId();
10965
}
11066

111-
@Override
112-
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
113-
this.seaTunnelRowType = seaTunnelRowType;
114-
}
115-
116-
@Override
117-
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
118-
return this.seaTunnelRowType;
119-
}
120-
12167
@Override
12268
public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> createWriter(
12369
SinkWriter.Context context) throws IOException {
124-
DorisSinkWriter dorisSinkWriter =
125-
new DorisSinkWriter(
126-
context, Collections.emptyList(), seaTunnelRowType, dorisConfig, jobId);
127-
dorisSinkWriter.initializeLoad(Collections.emptyList());
128-
return dorisSinkWriter;
70+
return new DorisSinkWriter(
71+
context, Collections.emptyList(), seaTunnelRowType, dorisConfig, jobId);
12972
}
13073

13174
@Override
13275
public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> restoreWriter(
13376
SinkWriter.Context context, List<DorisSinkState> states) throws IOException {
134-
DorisSinkWriter dorisWriter =
135-
new DorisSinkWriter(context, states, seaTunnelRowType, dorisConfig, jobId);
136-
dorisWriter.initializeLoad(states);
137-
return dorisWriter;
77+
return new DorisSinkWriter(context, states, seaTunnelRowType, dorisConfig, jobId);
13878
}
13979

14080
@Override
@@ -152,17 +92,6 @@ public Optional<Serializer<DorisCommitInfo>> getCommitInfoSerializer() {
15292
return Optional.of(new DorisCommitInfoSerializer());
15393
}
15494

155-
@Override
156-
public Optional<SinkAggregatedCommitter<DorisCommitInfo, DorisCommitInfo>>
157-
createAggregatedCommitter() throws IOException {
158-
return Optional.empty();
159-
}
160-
161-
@Override
162-
public Optional<Serializer<DorisCommitInfo>> getAggregatedCommitInfoSerializer() {
163-
return Optional.empty();
164-
}
165-
16695
@Override
16796
public Optional<SaveModeHandler> getSaveModeHandler() {
16897
return Optional.empty();

0 commit comments

Comments
 (0)