Skip to content

Commit

Permalink
[improve] add StarRocks options (#8639)
Browse files Browse the repository at this point in the history
  • Loading branch information
fcb-xiaobo authored Feb 11, 2025
1 parent f1c313e commit da8d9cb
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ private Set<String> buildWhiteList() {
whiteList.add("IoTDBSinkOptions");
whiteList.add("EasysearchSourceOptions");
whiteList.add("RabbitmqSinkOptions");
whiteList.add("StarRocksSourceOptions");
whiteList.add("IcebergSourceOptions");
whiteList.add("HbaseSourceOptions");
whiteList.add("PaimonSourceOptions");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,22 @@
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSourceOptions;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class StarRocksCatalogFactory implements CatalogFactory {
public static final String IDENTIFIER = CommonConfig.CONNECTOR_IDENTITY;
public static final String IDENTIFIER = StarRocksSinkOptions.CONNECTOR_IDENTITY;

@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
return new StarRocksCatalog(
catalogName,
options.get(StarRocksOptions.USERNAME),
options.get(StarRocksOptions.PASSWORD),
options.get(StarRocksOptions.BASE_URL),
options.get(StarRocksSourceOptions.USERNAME),
options.get(StarRocksSourceOptions.PASSWORD),
options.get(StarRocksSinkOptions.BASE_URL),
options.get(StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE));
}

Expand All @@ -50,9 +49,9 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(StarRocksOptions.BASE_URL)
.required(StarRocksOptions.USERNAME)
.required(StarRocksOptions.PASSWORD)
.required(StarRocksSinkOptions.BASE_URL)
.required(StarRocksSourceOptions.USERNAME)
.required(StarRocksSourceOptions.PASSWORD)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ public static SinkConfig of(ReadonlyConfig config) {
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setNodeUrls(config.get(StarRocksSinkOptions.NODE_URLS));
sinkConfig.setDatabase(config.get(StarRocksSinkOptions.DATABASE));
sinkConfig.setJdbcUrl(config.get(StarRocksOptions.BASE_URL));
config.getOptional(StarRocksOptions.USERNAME).ifPresent(sinkConfig::setUsername);
config.getOptional(StarRocksOptions.PASSWORD).ifPresent(sinkConfig::setPassword);
sinkConfig.setJdbcUrl(config.get(StarRocksSinkOptions.BASE_URL));
config.getOptional(StarRocksSinkOptions.USERNAME).ifPresent(sinkConfig::setUsername);
config.getOptional(StarRocksSinkOptions.PASSWORD).ifPresent(sinkConfig::setPassword);
config.getOptional(StarRocksSinkOptions.TABLE).ifPresent(sinkConfig::setTable);
config.getOptional(StarRocksSinkOptions.LABEL_PREFIX).ifPresent(sinkConfig::setLabelPrefix);
sinkConfig.setBatchMaxSize(config.get(StarRocksSinkOptions.BATCH_MAX_SIZE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.seatunnel.connectors.seatunnel.starrocks.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import lombok.Getter;
Expand All @@ -29,22 +27,20 @@

@Setter
@Getter
public class SourceConfig extends CommonConfig {

private static final long DEFAULT_SCAN_MEM_LIMIT = 1024 * 1024 * 1024L;
public class SourceConfig extends StarRocksConfig {

public SourceConfig(ReadonlyConfig config) {
super(config);
this.maxRetries = config.get(MAX_RETRIES);
this.requestTabletSize = config.get(QUERY_TABLET_SIZE);
this.scanFilter = config.get(SCAN_FILTER);
this.connectTimeoutMs = config.get(SCAN_CONNECT_TIMEOUT);
this.batchRows = config.get(SCAN_BATCH_ROWS);
this.keepAliveMin = config.get(SCAN_KEEP_ALIVE_MIN);
this.queryTimeoutSec = config.get(SCAN_QUERY_TIMEOUT_SEC);
this.memLimit = config.get(SCAN_MEM_LIMIT);

String prefix = STARROCKS_SCAN_CONFIG_PREFIX.key();
this.maxRetries = config.get(StarRocksSourceOptions.MAX_RETRIES);
this.requestTabletSize = config.get(StarRocksSourceOptions.QUERY_TABLET_SIZE);
this.scanFilter = config.get(StarRocksSourceOptions.SCAN_FILTER);
this.connectTimeoutMs = config.get(StarRocksSourceOptions.SCAN_CONNECT_TIMEOUT);
this.batchRows = config.get(StarRocksSourceOptions.SCAN_BATCH_ROWS);
this.keepAliveMin = config.get(StarRocksSourceOptions.SCAN_KEEP_ALIVE_MIN);
this.queryTimeoutSec = config.get(StarRocksSourceOptions.SCAN_QUERY_TIMEOUT_SEC);
this.memLimit = config.get(StarRocksSourceOptions.SCAN_MEM_LIMIT);

String prefix = StarRocksSourceOptions.STARROCKS_SCAN_CONFIG_PREFIX.key();
config.toMap()
.forEach(
(key, value) -> {
Expand All @@ -55,64 +51,13 @@ public SourceConfig(ReadonlyConfig config) {
});
}

public static final Option<Integer> MAX_RETRIES =
Options.key("max_retries")
.intType()
.defaultValue(3)
.withDescription("number of retry requests sent to StarRocks");

public static final Option<Integer> QUERY_TABLET_SIZE =
Options.key("request_tablet_size")
.intType()
.defaultValue(Integer.MAX_VALUE)
.withDescription("The number of Tablets corresponding to an Partition");

public static final Option<String> SCAN_FILTER =
Options.key("scan_filter").stringType().defaultValue("").withDescription("SQL filter");

public static final Option<Integer> SCAN_CONNECT_TIMEOUT =
Options.key("scan_connect_timeout_ms")
.intType()
.defaultValue(1000)
.withDescription("scan connect timeout");

public static final Option<Integer> SCAN_BATCH_ROWS =
Options.key("scan_batch_rows")
.intType()
.defaultValue(1024)
.withDescription("scan batch rows");

public static final Option<Integer> SCAN_KEEP_ALIVE_MIN =
Options.key("scan_keep_alive_min")
.intType()
.defaultValue(10)
.withDescription("Max keep alive time min");

public static final Option<Integer> SCAN_QUERY_TIMEOUT_SEC =
Options.key("scan_query_timeout_sec")
.intType()
.defaultValue(3600)
.withDescription("Query timeout for a single query");

public static final Option<Long> SCAN_MEM_LIMIT =
Options.key("scan_mem_limit")
.longType()
.defaultValue(DEFAULT_SCAN_MEM_LIMIT)
.withDescription("Memory byte limit for a single query");

public static final Option<String> STARROCKS_SCAN_CONFIG_PREFIX =
Options.key("scan.params.")
.stringType()
.noDefaultValue()
.withDescription("The parameter of the scan data from be");

private int maxRetries = MAX_RETRIES.defaultValue();
private int requestTabletSize = QUERY_TABLET_SIZE.defaultValue();
private String scanFilter = SCAN_FILTER.defaultValue();
private long memLimit = SCAN_MEM_LIMIT.defaultValue();
private int queryTimeoutSec = SCAN_QUERY_TIMEOUT_SEC.defaultValue();
private int keepAliveMin = SCAN_KEEP_ALIVE_MIN.defaultValue();
private int batchRows = SCAN_BATCH_ROWS.defaultValue();
private int connectTimeoutMs = SCAN_CONNECT_TIMEOUT.defaultValue();
private int maxRetries = StarRocksSourceOptions.MAX_RETRIES.defaultValue();
private int requestTabletSize = StarRocksSourceOptions.QUERY_TABLET_SIZE.defaultValue();
private String scanFilter = StarRocksSourceOptions.SCAN_FILTER.defaultValue();
private long memLimit = StarRocksSourceOptions.SCAN_MEM_LIMIT.defaultValue();
private int queryTimeoutSec = StarRocksSourceOptions.SCAN_QUERY_TIMEOUT_SEC.defaultValue();
private int keepAliveMin = StarRocksSourceOptions.SCAN_KEEP_ALIVE_MIN.defaultValue();
private int batchRows = StarRocksSourceOptions.SCAN_BATCH_ROWS.defaultValue();
private int connectTimeoutMs = StarRocksSourceOptions.SCAN_CONNECT_TIMEOUT.defaultValue();
private Map<String, String> sourceOptionProps = new HashMap<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.starrocks.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;

import java.io.Serializable;
import java.util.List;

@Getter
@ToString
@AllArgsConstructor
public class CommonConfig implements Serializable {

public class StarRocksBaseOptions implements Serializable {
public static final String CONNECTOR_IDENTITY = "StarRocks";

public static final Option<List<String>> NODE_URLS =
Options.key("nodeUrls")
.listType()
Expand Down Expand Up @@ -65,18 +54,4 @@ public class CommonConfig implements Serializable {
.stringType()
.noDefaultValue()
.withDescription("StarRocks user password");

private List<String> nodeUrls;
private String username;
private String password;
private String database;
private String table;

public CommonConfig(ReadonlyConfig config) {
this.nodeUrls = config.get(NODE_URLS);
this.username = config.get(USERNAME);
this.password = config.get(PASSWORD);
this.database = config.get(DATABASE);
this.table = config.get(TABLE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.starrocks.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;

import java.io.Serializable;
import java.util.List;

@Getter
@ToString
@AllArgsConstructor
public class StarRocksConfig implements Serializable {

private List<String> nodeUrls;
private String username;
private String password;
private String database;
private String table;

public StarRocksConfig(ReadonlyConfig config) {
this.nodeUrls = config.get(StarRocksBaseOptions.NODE_URLS);
this.username = config.get(StarRocksBaseOptions.USERNAME);
this.password = config.get(StarRocksBaseOptions.PASSWORD);
this.database = config.get(StarRocksBaseOptions.DATABASE);
this.table = config.get(StarRocksBaseOptions.TABLE);
}
}

This file was deleted.

Loading

0 comments on commit da8d9cb

Please sign in to comment.