[Improve][Connector-V2] Remove hard code iceberg table format version (
Hisoka-X authored Aug 29, 2024
1 parent 138d2a4 commit f49b263
Showing 11 changed files with 51 additions and 146 deletions.
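
For context: the excerpted hunks below are mostly the cleanup that accompanied the change named in the title, and the removal of the hard-coded format version itself is not visible in them. A hypothetical Java sketch of the pattern the title describes — a writer pinning Iceberg's format-version at table creation instead of deferring to user-supplied table properties; class and method names are illustrative, not the actual SeaTunnel code:

```
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

import java.util.HashMap;
import java.util.Map;

class FormatVersionSketch {
    // Before: the writer force-sets "format-version", overriding the user's choice.
    static void createTablePinned(
            Catalog catalog, TableIdentifier id, Schema schema, Map<String, String> userProps) {
        Map<String, String> props = new HashMap<>(userProps);
        props.put(TableProperties.FORMAT_VERSION, "2"); // the hard-coded pin
        catalog.createTable(id, schema, PartitionSpec.unpartitioned(), props);
    }

    // After: pass the user's properties through unchanged; Iceberg applies its
    // own default when "format-version" is absent.
    static void createTable(
            Catalog catalog, TableIdentifier id, Schema schema, Map<String, String> userProps) {
        catalog.createTable(id, schema, PartitionSpec.unpartitioned(), userProps);
    }
}
```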
21 changes: 0 additions & 21 deletions docs/en/faq.md
@@ -203,23 +203,6 @@ spark {
}
```

## How do I specify a different JDK version for SeaTunnel on YARN?

For example, if you want to set the JDK version to JDK8, there are two cases:

- The YARN cluster has deployed JDK8, but the default JDK is not JDK8. Add two configurations to the SeaTunnel config file:

```
env {
...
spark.executorEnv.JAVA_HOME="/your/java_8_home/directory"
spark.yarn.appMasterEnv.JAVA_HOME="/your/java_8_home/directory"
...
}
```
- The YARN cluster does not have JDK8 deployed. In this case, submit SeaTunnel together with a bundled JDK8, as in the sketch below. For detailed operations, see:
https://www.cnblogs.com/jasondan/p/spark-specific-jdk-version.html
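
A minimal sketch of the bundled-JDK approach, assuming a JDK8 tarball at /opt/jdk-8u202.tar.gz that unpacks to a top-level jdk1.8.0_202 directory; the paths and the #jdk8 alias are illustrative:

```
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --archives /opt/jdk-8u202.tar.gz#jdk8 \
  --conf spark.yarn.appMasterEnv.JAVA_HOME=./jdk8/jdk1.8.0_202 \
  --conf spark.executorEnv.JAVA_HOME=./jdk8/jdk1.8.0_202 \
  ...
```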

## What should I do if OOM always appears when running SeaTunnel in Spark local[*] mode?

If you run in local mode, you need to modify the `start-seatunnel.sh` startup script: after `spark-submit`, add the parameter `--driver-memory 4g`. Under normal circumstances, local mode is not used in production environments, so this parameter generally does not need to be set when running on YARN. See [Application Properties](https://spark.apache.org/docs/latest/configuration.html#application-properties) for details.
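
A minimal sketch of the edit; everything after the flag is elided:

```
# in start-seatunnel.sh
${SPARK_HOME}/bin/spark-submit --driver-memory 4g \
  ...
```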
@@ -334,10 +317,6 @@ spark-submit --verbose
...
```

## How do I use SeaTunnel to synchronize data across HDFS clusters?

Just configure hdfs-site.xml properly, so that the client can resolve both clusters' nameservices; a sketch follows below. Refer to: https://www.cnblogs.com/suanec/p/7828139.html.
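
A minimal sketch of the relevant hdfs-site.xml entries, assuming the remote HA cluster is exposed as nameservice ns2 alongside the local ns1; all nameservice and host names are illustrative:

```
<configuration>
  <!-- Make both clusters' nameservices resolvable from one client config -->
  <property>
    <name>dfs.nameservices</name>
    <value>ns1,ns2</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.ns2</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.ns2.nn1</name>
    <value>remote-nn1.example.com:8020</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.ns2.nn2</name>
    <value>remote-nn2.example.com:8020</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.ns2</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
</configuration>
```

With entries like these, paths such as hdfs://ns2/... become addressable from jobs running against ns1.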

## I want to learn the source code of SeaTunnel. Where should I start?

SeaTunnel has a completely abstract and structured code implementation, and many people have chosen SeaTunnel as a way to learn Spark. You can start reading the source code from the main program entry: SeaTunnel.java
21 changes: 0 additions & 21 deletions docs/zh/faq.md
@@ -204,23 +204,6 @@ spark {
}
```

## How do I specify a different JDK version for SeaTunnel on YARN?

For example, to set the JDK version to JDK8, there are two cases:

- The YARN cluster already has JDK8 deployed, but the default JDK is not JDK8. Add two configurations to the SeaTunnel config file:

```
env {
...
spark.executorEnv.JAVA_HOME="/your/java_8_home/directory"
spark.yarn.appMasterEnv.JAVA_HOME="/your/java_8_home/directory"
...
}
```
- The YARN cluster does not have JDK8 deployed. In this case, start SeaTunnel with a bundled JDK8. For detailed operations, see:
https://www.cnblogs.com/jasondan/p/spark-specific-jdk-version.html

## What should I do if OOM always appears when running SeaTunnel in Spark local[*] mode?

If you run in local mode, you need to modify the `start-seatunnel.sh` startup script: after `spark-submit`, add the parameter `--driver-memory 4g`. Under normal circumstances, local mode is not used in production environments, so this parameter generally does not need to be set when running on YARN. For details, see: [Application Properties](https://spark.apache.org/docs/latest/configuration.html#application-properties)
@@ -335,10 +318,6 @@ spark-submit --verbose
...
```

## How do I use SeaTunnel to synchronize data across HDFS clusters?

Just configure hdfs-site.xml properly. Refer to: https://www.cnblogs.com/suanec/p/7828139.html.

## I want to learn the source code of SeaTunnel. Where should I start?

SeaTunnel has a completely abstract and structured code implementation, and many people have chosen SeaTunnel as a way to learn Spark. You can start reading the source code from the main program entry: SeaTunnel.java
@@ -50,25 +50,20 @@ public class IcebergCatalogLoader implements Serializable {
private static final long serialVersionUID = -6003040601422350869L;
private static final List<String> HADOOP_CONF_FILES =
ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
private CommonConfig config;
private final CommonConfig config;

public IcebergCatalogLoader(CommonConfig config) {
this.config = config;
}

public Catalog loadCatalog() {
// When using the seatunel engine, set the current class loader to prevent loading failures
// When using the SeaTunnel engine, set the current class loader to prevent loading failures
Thread.currentThread().setContextClassLoader(IcebergCatalogLoader.class.getClassLoader());
return CatalogUtil.buildIcebergCatalog(
config.getCatalogName(), config.getCatalogProps(), loadHadoopConfig(config));
}

/**
* Loading Hadoop configuration through reflection
*
* @param config
* @return
*/
/** Loading Hadoop configuration through reflection */
public Object loadHadoopConfig(CommonConfig config) {
Class<?> configClass =
DynClasses.builder()
@@ -58,9 +58,9 @@

@Slf4j
public class IcebergCatalog implements Catalog {
private String catalogName;
private ReadonlyConfig readonlyConfig;
private IcebergCatalogLoader icebergCatalogLoader;
private final String catalogName;
private final ReadonlyConfig readonlyConfig;
private final IcebergCatalogLoader icebergCatalogLoader;
private org.apache.iceberg.catalog.Catalog catalog;

public IcebergCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
@@ -224,22 +224,21 @@ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
public CatalogTable toCatalogTable(Table icebergTable, TablePath tablePath) {
List<Types.NestedField> columns = icebergTable.schema().columns();
TableSchema.Builder builder = TableSchema.builder();
columns.stream()
.forEach(
nestedField -> {
String name = nestedField.name();
SeaTunnelDataType<?> seaTunnelType =
SchemaUtils.toSeaTunnelType(name, nestedField.type());
PhysicalColumn physicalColumn =
PhysicalColumn.of(
name,
seaTunnelType,
(Long) null,
true,
null,
nestedField.doc());
builder.column(physicalColumn);
});
columns.forEach(
nestedField -> {
String name = nestedField.name();
SeaTunnelDataType<?> seaTunnelType =
SchemaUtils.toSeaTunnelType(name, nestedField.type());
PhysicalColumn physicalColumn =
PhysicalColumn.of(
name,
seaTunnelType,
(Long) null,
true,
null,
nestedField.doc());
builder.column(physicalColumn);
});

List<String> partitionKeys =
icebergTable.spec().fields().stream()
@@ -92,17 +92,17 @@ private NameMapping createNameMapping(Table table) {
return nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
}

public Record convert(Object row, SeaTunnelDataType rowType) {
public Record convert(Object row, SeaTunnelDataType<?> rowType) {
return convertStructValue(row, rowType, tableSchema.asStruct(), -1, null);
}

public Record convert(Object row, SeaTunnelDataType rowType, SchemaChangeWrapper wrapper) {
public Record convert(Object row, SeaTunnelDataType<?> rowType, SchemaChangeWrapper wrapper) {
return convertStructValue(row, rowType, tableSchema.asStruct(), -1, wrapper);
}

protected GenericRecord convertStructValue(
Object value,
SeaTunnelDataType fromType,
SeaTunnelDataType<?> fromType,
Types.StructType schema,
int parentFieldId,
SchemaChangeWrapper wrapper) {
@@ -120,15 +120,7 @@ protected GenericRecord convertStructValue(
}
}

/**
* Convert RowType
*
* @param row
* @param fromType
* @param schema
* @param structFieldId
* @return
*/
/** Convert RowType */
private GenericRecord convertToStruct(
SeaTunnelRow row,
SeaTunnelRowType fromType,
@@ -179,7 +171,7 @@ private GenericRecord convertToStruct(

public Object convertValue(
Object value,
SeaTunnelDataType fromType,
SeaTunnelDataType<?> fromType,
Type type,
int fieldId,
SchemaChangeWrapper wrapper) {
@@ -252,7 +244,7 @@ private Map<String, Types.NestedField> createStructNameMap(Types.StructType sche

protected List<Object> convertListValue(
Object value,
SeaTunnelDataType fromType,
SeaTunnelDataType<?> fromType,
Types.ListType type,
SchemaChangeWrapper wrapper) {
Preconditions.checkArgument(value.getClass().isArray());
@@ -269,7 +261,7 @@ protected List<Object> convertListValue(

protected Map<Object, Object> convertMapValue(
Object value,
SeaTunnelDataType fromType,
SeaTunnelDataType<?> fromType,
Types.MapType type,
SchemaChangeWrapper wrapper) {
Preconditions.checkArgument(value instanceof Map);
@@ -61,9 +61,9 @@ public class IcebergSink
SupportSaveMode,
SupportMultiTableSink {
private static String PLUGIN_NAME = "Iceberg";
private SinkConfig config;
private ReadonlyConfig readonlyConfig;
private CatalogTable catalogTable;
private final SinkConfig config;
private final ReadonlyConfig readonlyConfig;
private final CatalogTable catalogTable;

public IcebergSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
this.readonlyConfig = pluginConfig;
@@ -54,13 +54,12 @@ public class IcebergSinkWriter
implements SinkWriter<SeaTunnelRow, IcebergCommitInfo, IcebergSinkState>,
SupportMultiTableSinkWriter<Void> {
private SeaTunnelRowType rowType;
private SinkConfig config;
private IcebergTableLoader icebergTableLoader;
private final SinkConfig config;
private final IcebergTableLoader icebergTableLoader;
private RecordWriter writer;
private IcebergFilesCommitter filesCommitter;
private List<WriteResult> results = Lists.newArrayList();
private final IcebergFilesCommitter filesCommitter;
private final List<WriteResult> results = Lists.newArrayList();
private String commitUser = UUID.randomUUID().toString();
private long checkpointId;

private final DataTypeChangeEventHandler dataTypeChangeEventHandler;

@@ -77,7 +76,6 @@ public IcebergSinkWriter(
tryCreateRecordWriter();
if (Objects.nonNull(states) && !states.isEmpty()) {
this.commitUser = states.get(0).getCommitUser();
this.checkpointId = states.get(0).getCheckpointId();
preCommit(states);
}
}
@@ -54,7 +54,7 @@ public class IcebergRecordWriter implements RecordWriter {
private final List<WriteResult> writerResults;
private TaskWriter<Record> writer;
private RowConverter recordConverter;
private IcebergWriterFactory writerFactory;
private final IcebergWriterFactory writerFactory;

public IcebergRecordWriter(Table table, IcebergWriterFactory writerFactory, SinkConfig config) {
this.config = config;
@@ -122,12 +122,7 @@ private void changeColumn(
}
}

/**
* apply schema update
*
* @param updates
* @return
*/
/** apply schema update */
private void applySchemaUpdate(SchemaChangeWrapper updates) {
// complete the current file
flush();
@@ -169,7 +164,4 @@ private void flush() {
table.spec().partitionType()));
writer = null;
}

@Override
public void close() {}
}
@@ -40,9 +40,6 @@
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import lombok.extern.slf4j.Slf4j;

import java.util.List;
@@ -58,7 +55,6 @@

@Slf4j
public class IcebergWriterFactory {
private static final Logger LOG = LoggerFactory.getLogger(IcebergWriterFactory.class);
private final IcebergTableLoader tableLoader;
private final SinkConfig config;

@@ -65,12 +65,12 @@ public class IcebergSource

private static final long serialVersionUID = 4343414808223919870L;

private SourceConfig sourceConfig;
private Schema tableSchema;
private Schema projectedSchema;
private SeaTunnelRowType seaTunnelRowType;
private final SourceConfig sourceConfig;
private final Schema tableSchema;
private final Schema projectedSchema;
private final SeaTunnelRowType seaTunnelRowType;
private JobContext jobContext;
private CatalogTable catalogTable;
private final CatalogTable catalogTable;

public IcebergSource(ReadonlyConfig config, CatalogTable catalogTable) {
this.sourceConfig = SourceConfig.loadConfig(config);