Skip to content

Commit

Permalink
[Feature][OssFile Connector] Make Oss implement source factory and si…
Browse files Browse the repository at this point in the history
…nk factory (#6062)
  • Loading branch information
EricJoy2048 authored Dec 27, 2023
1 parent b32df93 commit 1a8e9b4
Show file tree
Hide file tree
Showing 58 changed files with 3,959 additions and 231 deletions.
117 changes: 117 additions & 0 deletions docs/en/connector-v2/sink/OssFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,123 @@ sink {
}
```

### Multiple Table

For extract source metadata from upstream, you can use `${database_name}`, `${table_name}` and `${schema_name}` in the path.

```bash

env {
parallelism = 1
spark.app.name = "SeaTunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
job.mode = "BATCH"
}

source {
FakeSource {
tables_configs = [
{
schema = {
table = "fake1"
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
}
}
}
},
{
schema = {
table = "fake2"
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
}
}
}
}
]
}
}

sink {
OssFile {
bucket = "oss://whale-ops"
access_key = "xxxxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxx"
endpoint = "https://oss-accelerate.aliyuncs.com"
path = "/tmp/fake_empty/text/${table_name}"
row_delimiter = "\n"
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
file_name_expression = "${transactionId}_${now}"
file_format_type = "text"
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
compress_codec = "lzo"
}
}

```

## Changelog

### 2.2.0-beta 2022-09-26
Expand Down
171 changes: 171 additions & 0 deletions docs/en/connector-v2/source/OssFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,177 @@ sink {
}
```

### Multiple Table

No need to config schema file type, eg: `orc`.

```
env {
parallelism = 1
spark.app.name = "SeaTunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
job.mode = "BATCH"
}
source {
OssFile {
tables_configs = [
{
schema = {
table = "fake01"
}
bucket = "oss://whale-ops"
access_key = "xxxxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxx"
endpoint = "https://oss-accelerate.aliyuncs.com"
path = "/test/seatunnel/read/orc"
file_format_type = "orc"
},
{
schema = {
table = "fake02"
}
bucket = "oss://whale-ops"
access_key = "xxxxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxx"
endpoint = "https://oss-accelerate.aliyuncs.com"
path = "/test/seatunnel/read/orc"
file_format_type = "orc"
}
]
result_table_name = "fake"
}
}
sink {
Assert {
rules {
table-names = ["fake01", "fake02"]
}
}
}
```

Need config schema file type, eg: `json`

```
env {
execution.parallelism = 1
spark.app.name = "SeaTunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
job.mode = "BATCH"
}
source {
OssFile {
tables_configs = [
{
bucket = "oss://whale-ops"
access_key = "xxxxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxx"
endpoint = "https://oss-accelerate.aliyuncs.com"
path = "/test/seatunnel/read/json"
file_format_type = "json"
schema = {
table = "fake01"
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
C_MAP = "map<string, string>"
C_ARRAY = "array<int>"
C_STRING = string
C_BOOLEAN = boolean
C_TINYINT = tinyint
C_SMALLINT = smallint
C_INT = int
C_BIGINT = bigint
C_FLOAT = float
C_DOUBLE = double
C_BYTES = bytes
C_DATE = date
C_DECIMAL = "decimal(38, 18)"
C_TIMESTAMP = timestamp
}
}
}
},
{
bucket = "oss://whale-ops"
access_key = "xxxxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxx"
endpoint = "https://oss-accelerate.aliyuncs.com"
path = "/test/seatunnel/read/json"
file_format_type = "json"
schema = {
table = "fake02"
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
C_MAP = "map<string, string>"
C_ARRAY = "array<int>"
C_STRING = string
C_BOOLEAN = boolean
C_TINYINT = tinyint
C_SMALLINT = smallint
C_INT = int
C_BIGINT = bigint
C_FLOAT = float
C_DOUBLE = double
C_BYTES = bytes
C_DATE = date
C_DECIMAL = "decimal(38, 18)"
C_TIMESTAMP = timestamp
}
}
}
}
]
result_table_name = "fake"
}
}
sink {
Assert {
rules {
table-names = ["fake01", "fake02"]
}
}
}
```

## Changelog

### 2.2.0-beta 2022-09-26
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,18 @@ public abstract class BaseFileSourceConfig implements Serializable {
private final FileFormat fileFormat;
private final ReadStrategy readStrategy;
private final List<String> filePaths;
private final ReadonlyConfig baseFileSourceConfig;

public abstract HadoopConf getHadoopConfig();

public abstract String getPluginName();

public BaseFileSourceConfig(ReadonlyConfig readonlyConfig) {
this.baseFileSourceConfig = readonlyConfig;
this.fileFormat = readonlyConfig.get(BaseSourceConfigOptions.FILE_FORMAT_TYPE);
this.readStrategy = ReadStrategyFactory.of(readonlyConfig, getHadoopConfig());
this.filePaths = parseFilePaths(readonlyConfig);

this.catalogTable = parseCatalogTable(readonlyConfig);
}

Expand Down
Loading

0 comments on commit 1a8e9b4

Please sign in to comment.