feat: write iceberg rowData for iceberg writer
czy006 committed Jan 30, 2025
1 parent 99d9650 commit 6750f5e
Showing 7 changed files with 129 additions and 38 deletions.
@@ -8,7 +8,7 @@
import org.apache.flink.cdc.common.sink.EventSinkProvider;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergEventSink;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergEventSinkV2;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordSerializer;

import java.io.Serializable;
@@ -53,8 +53,8 @@ public IcebergDataSink(

@Override
public EventSinkProvider getEventSinkProvider() {
IcebergEventSink icebergEventSink =
new IcebergEventSink(
IcebergEventSinkV2 icebergEventSink =
new IcebergEventSinkV2(
tableOptions, commitUser, serializer, schemaOperatorUid, zoneId);
return FlinkSinkProvider.of(icebergEventSink);
}
@@ -1,27 +1,28 @@
package org.apache.flink.cdc.connectors.iceberg.sink.v2;

import org.apache.flink.table.data.GenericRowData;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;

public class IcebergEvent {

// Identifier for the iceberg table to be written.
TableIdentifier tableId;

// The actual record to be written to iceberg table.
GenericRecord genericRow;
GenericRowData genericRow;

// if true, means that table schema has changed right before this genericRow.
boolean shouldRefreshSchema;

public IcebergEvent(
TableIdentifier tableId, GenericRecord genericRow, boolean shouldRefreshSchema) {
TableIdentifier tableId, GenericRowData genericRow, boolean shouldRefreshSchema) {
this.tableId = tableId;
this.genericRow = genericRow;
this.shouldRefreshSchema = shouldRefreshSchema;
}

public IcebergEvent(TableIdentifier tableId, GenericRecord genericRow) {
public IcebergEvent(TableIdentifier tableId, GenericRowData genericRow) {
this.tableId = tableId;
this.genericRow = genericRow;
this.shouldRefreshSchema = false;
@@ -35,11 +36,11 @@ public void setTableId(TableIdentifier tableId) {
this.tableId = tableId;
}

public GenericRecord getGenericRow() {
public GenericRowData getGenericRow() {
return genericRow;
}

public void setGenericRow(GenericRecord genericRow) {
public void setGenericRow(GenericRowData genericRow) {
this.genericRow = genericRow;
}

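For context, the event now wraps Flink's GenericRowData instead of Iceberg's GenericRecord. A minimal construction sketch (the table name, arity, and field values are illustrative, not taken from this commit):

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.catalog.TableIdentifier;

// Illustrative only: build a Flink row and wrap it in an IcebergEvent.
TableIdentifier tableId = TableIdentifier.of("db", "orders"); // hypothetical table
GenericRowData row = new GenericRowData(RowKind.INSERT, 2);   // arity of 2 is assumed
row.setField(0, 42L);
row.setField(1, StringData.fromString("pending"));
IcebergEvent event = new IcebergEvent(tableId, row); // two-argument constructor: shouldRefreshSchema defaults to false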
@@ -3,18 +3,17 @@
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.iceberg.flink.sink.FlinkSink;

import java.time.ZoneId;
import java.util.Map;

public class IcebergEventSink extends IcebergSink implements SupportsPreWriteTopology<Event> {
public class IcebergEventSinkV2 extends IcebergSinkV2 implements SupportsPreWriteTopology<Event> {

public final String schemaOperatorUid;

public final ZoneId zoneId;

public IcebergEventSink(
public IcebergEventSinkV2(
Map<String, String> catalogOptions,
String commitUser,
IcebergRecordSerializer<Event> serializer,
@@ -27,7 +26,6 @@ public IcebergEventSink(

@Override
public DataStream<Event> addPreWriteTopology(DataStream<Event> dataStream) {

return dataStream;
}
}
@@ -7,9 +7,9 @@
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.table.data.GenericRowData;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;

import java.io.IOException;
import java.time.ZoneId;
@@ -50,8 +50,8 @@ public IcebergEvent serialize(Event event) throws IOException {
return new IcebergEvent(tableId, null, true);
} else if (event instanceof DataChangeEvent) {
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
GenericRecord genericRecord =
IcebergWriterHelper.convertEventToGenericRow(
GenericRowData genericRecord =
IcebergWriterHelper.convertEventToRow(
dataChangeEvent,
schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
return new IcebergEvent(tableId, genericRecord, false);
@@ -16,7 +16,7 @@
import java.io.IOException;
import java.util.Map;

public class IcebergSink
public class IcebergSinkV2
implements Sink<Event>,
SupportsPreWriteTopology<Event>,
SupportsCommitter<ImmutableTableCommit> {
@@ -30,14 +30,14 @@ public class IcebergSink

private final IcebergRecordSerializer<Event> serializer;

public IcebergSink(
public IcebergSinkV2(
Map<String, String> catalogOptions, IcebergRecordSerializer<Event> serializer) {
this.catalogOptions = catalogOptions;
this.serializer = serializer;
commitUser = DEFAULT_COMMIT_USER;
}

public IcebergSink(
public IcebergSinkV2(
Map<String, String> catalogOptions,
String commitUser,
IcebergRecordSerializer<Event> serializer) {
@@ -2,30 +2,126 @@

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import org.apache.iceberg.catalog.ImmutableTableCommit;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IcebergWriter<InputT> implements CommittingSinkWriter<InputT, ImmutableTableCommit> {
public class IcebergWriter<InputT> implements CommittingSinkWriter<InputT, WriteResult> {

private static final Logger LOG = LoggerFactory.getLogger(IcebergWriter.class);

private final Table table;
private final String commitUser;
private final MetricGroup metricGroup;
private final Map<String, TaskWriter<RowData>> writes;
private final IcebergRecordSerializer<InputT> serializer;

private long lastCheckpointId;

public IcebergWriter(
Table table,
MetricGroup metricGroup,
String commitUser,
IcebergRecordSerializer<InputT> serializer,
long lastCheckpointId) {
this.table = table;
this.commitUser = commitUser;
this.metricGroup = metricGroup;
this.serializer = serializer;
this.writes = new HashMap<>();
this.lastCheckpointId = lastCheckpointId;
}

@Override
public Collection<ImmutableTableCommit> prepareCommit()
throws IOException, InterruptedException {
return Collections.emptyList();
public Collection<WriteResult> prepareCommit() throws IOException, InterruptedException {
List<WriteResult> committables = new ArrayList<>();
for (Map.Entry<String, TaskWriter<RowData>> entry : writes.entrySet()) {
String key = entry.getKey();
TaskWriter<RowData> writer = entry.getValue();
WriteResult result = writer.complete();
committables.add(result);
writes.put(key, getTaskWriter());
LOG.info(
"Iceberg writer flushed {} data files and {} delete files",
result.dataFiles().length,
result.deleteFiles().length);
}
return committables;
}

@Override
public void write(InputT inputT, Context context) throws IOException, InterruptedException {}
public void write(InputT inputT, Context context) throws IOException, InterruptedException {
IcebergEvent icebergEvent = serializer.serialize(inputT);
String tableId = icebergEvent.getTableId().name();

// Handle schema changes (if any)
if (icebergEvent.isShouldRefreshSchema()) {
// In case of schema changes, refresh the table
try {
table.refresh();
} catch (Exception e) {
throw new IOException("Failed to refresh Iceberg table schema", e);
}
}

// Write the data to Iceberg
if (icebergEvent.getGenericRow() != null) {
TaskWriter<RowData> writer = writes.computeIfAbsent(tableId, id -> getTaskWriter());

try {
writer.write(icebergEvent.getGenericRow());
} catch (Exception e) {
throw new IOException("Failed to write event to Iceberg", e);
}
}
}

private TaskWriter<RowData> getTaskWriter() {
String formatString =
PropertyUtil.propertyAsString(
table.properties(),
TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
FileFormat format = FileFormat.fromString(formatString);
RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
RowDataTaskWriterFactory taskWriterFactory =
new RowDataTaskWriterFactory(
table, flinkSchema, Long.MAX_VALUE, format, table.properties(), null, true);
return taskWriterFactory.create();
}

@Override
public void flush(boolean b) throws IOException, InterruptedException {}
public void flush(boolean b) throws IOException, InterruptedException {
// flush is used to handle flush/endOfInput, so no action is taken here.
}

@Override
public void writeWatermark(Watermark watermark) throws IOException, InterruptedException {}

@Override
public void close() throws Exception {}
public void close() throws Exception {
for (TaskWriter<RowData> writer : writes.values()) {
if (writer != null) {
writer.close();
}
}
}
}
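The commit path itself is not part of this change. Downstream, a committer consuming these WriteResult committables could fold them into the table roughly as follows (a sketch against Iceberg's RowDelta API, assuming access to the same Table instance; not code from this commit):

import java.util.Arrays;

import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.WriteResult;

// Illustrative only: apply each flushed WriteResult as a row-delta snapshot.
static void commitWriteResults(Table table, Iterable<WriteResult> committables) {
    for (WriteResult result : committables) {
        RowDelta rowDelta = table.newRowDelta();
        Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);      // new data files
        Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); // position/equality delete files
        rowDelta.commit();
    }
}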
@@ -24,12 +24,10 @@
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeChecks;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.types.RowKind;

import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.flink.data.StructRowData;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.DecimalUtil;

@@ -145,32 +143,30 @@ private static RecordData.FieldGetter createFieldGetter(
};
}

public static GenericRecord convertEventToGenericRow(
public static GenericRowData convertEventToRow(
DataChangeEvent dataChangeEvent, List<RecordData.FieldGetter> fieldGetters) {
StructRowData structRowData;
GenericRecord genericRow = null;
GenericRowData genericRow;
RecordData recordData;
switch (dataChangeEvent.op()) {
case INSERT:
case UPDATE:
case REPLACE:
{
recordData = dataChangeEvent.after();
structRowData = new StructRowData(Types.StructType.of(), RowKind.INSERT);
genericRow = new GenericRowData(RowKind.INSERT, recordData.getArity());
break;
}
case DELETE:
{
recordData = dataChangeEvent.before();
structRowData = new StructRowData(Types.StructType.of(), RowKind.DELETE);
genericRow = new GenericRowData(RowKind.DELETE, recordData.getArity());
break;
}
default:
throw new IllegalArgumentException("don't support type of " + dataChangeEvent.op());
}
for (int i = 0; i < recordData.getArity(); i++) {
// todo : how to set this row to
genericRow.setField(null, fieldGetters.get(i).getFieldOrNull(recordData));
genericRow.setField(i, fieldGetters.get(i).getFieldOrNull(recordData));
}
return genericRow;
}
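In short, the helper now returns a Flink GenericRowData whose RowKind carries the change type, so the writer above can hand it directly to its TaskWriter<RowData>. A brief usage sketch (dataChangeEvent, fieldGetters, and taskWriter are assumed to come from the surrounding code):

// Illustrative only: INSERT/UPDATE/REPLACE events carry the after-image with RowKind.INSERT,
// DELETE events carry the before-image with RowKind.DELETE.
GenericRowData row = IcebergWriterHelper.convertEventToRow(dataChangeEvent, fieldGetters);
taskWriter.write(row); // GenericRowData implements RowData, so a TaskWriter<RowData> accepts it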
