
Commit baba9c6

[FLINK-37017][cdc-common] Supports map and array types for binary record data
This closes #3434.
1 parent 6a345aa commit baba9c6

25 files changed: +2957 −26 lines

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryArrayData.java

Lines changed: 623 additions & 0 deletions
Large diffs are not rendered by default.
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryMapData.java

Lines changed: 118 additions & 0 deletions

@@ -0,0 +1,118 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.cdc.common.data.binary;

import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.data.MapData;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument;

/**
 * [4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray].
 *
 * <p>{@code BinaryMapData} is influenced by Apache Spark UnsafeMapData.
 */
@Internal
public class BinaryMapData extends BinarySection implements MapData {
    private final BinaryArrayData keys;
    private final BinaryArrayData values;

    public BinaryMapData() {
        keys = new BinaryArrayData();
        values = new BinaryArrayData();
    }

    public int size() {
        return keys.size();
    }

    @Override
    public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
        // Read the numBytes of key array from the first 4 bytes.
        final int keyArrayBytes = BinarySegmentUtils.getInt(segments, offset);
        assert keyArrayBytes >= 0 : "keyArraySize (" + keyArrayBytes + ") should >= 0";
        final int valueArrayBytes = sizeInBytes - keyArrayBytes - 4;
        assert valueArrayBytes >= 0 : "valueArraySize (" + valueArrayBytes + ") should >= 0";

        keys.pointTo(segments, offset + 4, keyArrayBytes);
        values.pointTo(segments, offset + 4 + keyArrayBytes, valueArrayBytes);

        assert keys.size() == values.size();

        this.segments = segments;
        this.offset = offset;
        this.sizeInBytes = sizeInBytes;
    }

    public BinaryArrayData keyArray() {
        return keys;
    }

    public BinaryArrayData valueArray() {
        return values;
    }

    public Map<?, ?> toJavaMap(DataType keyType, DataType valueType) {
        Object[] keyArray = keys.toObjectArray(keyType);
        Object[] valueArray = values.toObjectArray(valueType);

        Map<Object, Object> map = new HashMap<>();
        for (int i = 0; i < keyArray.length; i++) {
            map.put(keyArray[i], valueArray[i]);
        }
        return map;
    }

    public BinaryMapData copy() {
        return copy(new BinaryMapData());
    }

    public BinaryMapData copy(BinaryMapData reuse) {
        byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
        reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
        return reuse;
    }

    @Override
    public int hashCode() {
        return BinarySegmentUtils.hashByWords(segments, offset, sizeInBytes);
    }

    // ------------------------------------------------------------------------------------------
    // Construction Utilities
    // ------------------------------------------------------------------------------------------

    public static BinaryMapData valueOf(BinaryArrayData key, BinaryArrayData value) {
        checkArgument(key.segments.length == 1 && value.getSegments().length == 1);
        byte[] bytes = new byte[4 + key.sizeInBytes + value.sizeInBytes];
        MemorySegment segment = MemorySegmentFactory.wrap(bytes);
        segment.putInt(0, key.sizeInBytes);
        key.getSegments()[0].copyTo(key.getOffset(), segment, 4, key.sizeInBytes);
        value.getSegments()[0].copyTo(
                value.getOffset(), segment, 4 + key.sizeInBytes, value.sizeInBytes);
        BinaryMapData map = new BinaryMapData();
        map.pointTo(segment, 0, bytes.length);
        return map;
    }
}
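Aside (editor's illustration, not part of the commit): the class Javadoc describes the layout as a 4-byte key-array size, followed by the key BinaryArray bytes and the value BinaryArray bytes. Below is a minimal runnable sketch of those region boundaries, mirroring what valueOf allocates and pointTo derives; the sizes are placeholder values, not a real serialized map.

import java.nio.ByteBuffer;

public class BinaryMapLayoutDemo {
    public static void main(String[] args) {
        int keyArrayBytes = 24;   // placeholder size of the serialized key array
        int valueArrayBytes = 40; // placeholder size of the serialized value array

        // Total layout, as allocated by BinaryMapData.valueOf():
        byte[] bytes = new byte[4 + keyArrayBytes + valueArrayBytes];
        ByteBuffer.wrap(bytes).putInt(0, keyArrayBytes); // 4-byte key-size prefix

        // Region boundaries, as derived by BinaryMapData.pointTo():
        int keysOffset = 4;                   // keys start right after the prefix
        int valuesOffset = 4 + keyArrayBytes; // values follow the key array
        int valueBytes = bytes.length - keyArrayBytes - 4;

        System.out.println(keysOffset + " " + valuesOffset + " " + valueBytes); // 4 28 40
    }
}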

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java

Lines changed: 4 additions & 2 deletions
@@ -200,12 +200,14 @@ public byte[] getBinary(int pos) {
 
     @Override
     public ArrayData getArray(int pos) {
-        throw new UnsupportedOperationException("Not support ArrayData");
+        assertIndexIsValid(pos);
+        return BinarySegmentUtils.readArrayData(segments, offset, getLong(pos));
     }
 
     @Override
     public MapData getMap(int pos) {
-        throw new UnsupportedOperationException("Not support MapData.");
+        assertIndexIsValid(pos);
+        return BinarySegmentUtils.readMapData(segments, offset, getLong(pos));
     }
 
     @Override
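Editor's note for context: the long read by getLong(pos) is not the field value itself but a packed pointer, with the region's offset (relative to the record start) in the high 32 bits and its byte size in the low 32 bits; readArrayData/readMapData in BinarySegmentUtils below undo exactly this packing. A self-contained sketch of the arithmetic (the constants are arbitrary):

public class OffsetAndSizeDemo {
    public static void main(String[] args) {
        // Pack: high 32 bits = offset within the record, low 32 bits = size in bytes.
        int offset = 32;
        int size = 48;
        long offsetAndSize = ((long) offset << 32) | (long) size;

        // Unpack, mirroring BinarySegmentUtils.readArrayData/readMapData.
        int decodedOffset = (int) (offsetAndSize >> 32);
        int decodedSize = (int) offsetAndSize;

        System.out.println(decodedOffset + " " + decodedSize); // prints: 32 48
    }
}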

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinarySegmentUtils.java

Lines changed: 23 additions & 1 deletion
@@ -18,8 +18,10 @@
 package org.apache.flink.cdc.common.data.binary;
 
 import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.data.ArrayData;
 import org.apache.flink.cdc.common.data.DecimalData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.MapData;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.StringData;
 import org.apache.flink.cdc.common.data.TimestampData;
@@ -318,7 +320,7 @@ public static boolean equals(
         }
     }
 
-    static boolean equalsMultiSegments(
+    public static boolean equalsMultiSegments(
            MemorySegment[] segments1,
            int offset1,
            MemorySegment[] segments2,
@@ -1154,4 +1156,24 @@ private static int findInMultiSegments(
         }
         return -1;
     }
+
+    /** Gets an instance of {@link MapData} from underlying {@link MemorySegment}. */
+    public static MapData readMapData(
+            MemorySegment[] segments, int baseOffset, long offsetAndSize) {
+        final int size = ((int) offsetAndSize);
+        int offset = (int) (offsetAndSize >> 32);
+        BinaryMapData map = new BinaryMapData();
+        map.pointTo(segments, offset + baseOffset, size);
+        return map;
+    }
+
+    /** Gets an instance of {@link ArrayData} from underlying {@link MemorySegment}. */
+    public static ArrayData readArrayData(
+            MemorySegment[] segments, int baseOffset, long offsetAndSize) {
+        final int size = ((int) offsetAndSize);
+        int offset = (int) (offsetAndSize >> 32);
+        BinaryArrayData array = new BinaryArrayData();
+        array.pointTo(segments, offset + baseOffset, size);
+        return array;
+    }
 }
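Worth noting (editor's gloss): both readers avoid copying. They allocate an empty BinaryMapData/BinaryArrayData and re-point it at a sub-range of the record's existing segments. A small runnable sketch of the underlying MemorySegment API that pointTo builds on (illustration only; the offset 8 and value 42 are made up):

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

public class SegmentSliceDemo {
    public static void main(String[] args) {
        // Pretend a map's bytes start at offset 8 inside this record region.
        byte[] recordBytes = new byte[16];
        MemorySegment segment = MemorySegmentFactory.wrap(recordBytes);
        segment.putInt(8, 42); // e.g. the map's 4-byte key-size prefix

        // readMapData would decode offset 8 from the packed pointer and call
        // map.pointTo(segments, baseOffset + 8, size) -- re-pointing, not copying.
        System.out.println(segment.getInt(8)); // prints: 42
    }
}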

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java

Lines changed: 133 additions & 1 deletion
@@ -17,20 +17,31 @@
 
 package org.apache.flink.cdc.connectors.paimon.sink.v2;
 
+import org.apache.flink.cdc.common.data.ArrayData;
 import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.MapData;
 import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
+import org.apache.flink.cdc.common.data.binary.BinaryMapData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypeChecks;
+import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.core.memory.MemorySegment;
 
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.memory.MemorySegmentUtils;
 import org.apache.paimon.types.RowKind;
 
+import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
@@ -118,7 +129,11 @@ private static RecordData.FieldGetter createFieldGetter(
                 break;
             case ROW:
                 final int rowFieldCount = getFieldCount(fieldType);
-                fieldGetter = row -> row.getRow(fieldPos, rowFieldCount);
+                fieldGetter = new BinaryFieldDataGetter(fieldPos, DataTypeRoot.ROW, rowFieldCount);
+                break;
+            case ARRAY:
+            case MAP:
+                fieldGetter = new BinaryFieldDataGetter(fieldPos, fieldType.getTypeRoot());
                 break;
             default:
                 throw new IllegalArgumentException(
@@ -163,4 +178,121 @@ public static GenericRow convertEventToGenericRow(
         }
         return genericRow;
     }
+
+    /** A helper class for {@link PaimonWriter} to create FieldGetter and GenericRow. */
+    public static class BinaryFieldDataGetter implements RecordData.FieldGetter {
+        private final int fieldPos;
+        private final DataTypeRoot dataTypeRoot;
+        private final int rowFieldCount;
+
+        BinaryFieldDataGetter(int fieldPos, DataTypeRoot dataTypeRoot) {
+            this(fieldPos, dataTypeRoot, -1);
+        }
+
+        BinaryFieldDataGetter(int fieldPos, DataTypeRoot dataTypeRoot, int rowFieldCount) {
+            this.fieldPos = fieldPos;
+            this.dataTypeRoot = dataTypeRoot;
+            this.rowFieldCount = rowFieldCount;
+        }
+
+        @Override
+        public Object getFieldOrNull(RecordData row) {
+            switch (dataTypeRoot) {
+                case ARRAY:
+                    return getArrayField(row);
+                case MAP:
+                    return getMapField(row);
+                case ROW:
+                    return getRecordField(row);
+                default:
+                    throw new IllegalArgumentException("Unsupported field type: " + dataTypeRoot);
+            }
+        }
+
+        private Object getArrayField(RecordData row) {
+            ArrayData arrayData = row.getArray(fieldPos);
+            if (!(arrayData instanceof BinaryArrayData)) {
+                throw new IllegalArgumentException(
+                        "Expected BinaryArrayData but was " + arrayData.getClass().getSimpleName());
+            }
+            BinaryArrayData binaryArrayData = (BinaryArrayData) arrayData;
+            return convertSegments(
+                    binaryArrayData.getSegments(),
+                    binaryArrayData.getOffset(),
+                    binaryArrayData.getSizeInBytes(),
+                    MemorySegmentUtils::readArrayData);
+        }
+
+        private Object getMapField(RecordData row) {
+            MapData mapData = row.getMap(fieldPos);
+            if (!(mapData instanceof BinaryMapData)) {
+                throw new IllegalArgumentException(
+                        "Expected BinaryMapData but was " + mapData.getClass().getSimpleName());
+            }
+            BinaryMapData binaryMapData = (BinaryMapData) mapData;
+            return convertSegments(
+                    binaryMapData.getSegments(),
+                    binaryMapData.getOffset(),
+                    binaryMapData.getSizeInBytes(),
+                    MemorySegmentUtils::readMapData);
+        }
+
+        private Object getRecordField(RecordData row) {
+            RecordData recordData = row.getRow(fieldPos, rowFieldCount);
+            if (!(recordData instanceof BinaryRecordData)) {
+                throw new IllegalArgumentException(
+                        "Expected BinaryRecordData but was "
+                                + recordData.getClass().getSimpleName());
+            }
+            BinaryRecordData binaryRecordData = (BinaryRecordData) recordData;
+            return convertSegments(
+                    binaryRecordData.getSegments(),
+                    binaryRecordData.getOffset(),
+                    binaryRecordData.getSizeInBytes(),
+                    (segments, offset, sizeInBytes) ->
+                            MemorySegmentUtils.readRowData(
+                                    segments, rowFieldCount, offset, sizeInBytes));
+        }
+
+        private <T> T convertSegments(
+                MemorySegment[] segments,
+                int offset,
+                int sizeInBytes,
+                SegmentConverter<T> converter) {
+            org.apache.paimon.memory.MemorySegment[] paimonMemorySegments =
+                    new org.apache.paimon.memory.MemorySegment[segments.length];
+            for (int i = 0; i < segments.length; i++) {
+                MemorySegment currMemorySegment = segments[i];
+                ByteBuffer byteBuffer = currMemorySegment.wrap(0, currMemorySegment.size());
+
+                // Allocate a new byte array and copy the data from the ByteBuffer.
+                byte[] bytes = new byte[currMemorySegment.size()];
+                byteBuffer.get(bytes);
+
+                paimonMemorySegments[i] = org.apache.paimon.memory.MemorySegment.wrap(bytes);
+            }
+            return converter.convert(paimonMemorySegments, offset, sizeInBytes);
+        }
+
+        private interface SegmentConverter<T> {
+            T convert(
+                    org.apache.paimon.memory.MemorySegment[] segments, int offset, int sizeInBytes);
+        }
+
+        /**
+         * Gets an instance of {@link InternalRow} from underlying {@link
+         * org.apache.paimon.memory.MemorySegment}.
+         */
+        public InternalRow readRowData(
+                org.apache.paimon.memory.MemorySegment[] segments,
+                int numFields,
+                int baseOffset,
+                long offsetAndSize) {
+            final int size = ((int) offsetAndSize);
+            int offset = (int) (offsetAndSize >> 32);
+            BinaryRow row = new BinaryRow(numFields);
+            row.pointTo(segments, offset + baseOffset, size);
+            return row;
+        }
+    }
 }
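One design note (editor's gloss): Flink CDC and Paimon each define their own MemorySegment type, so convertSegments must copy the bytes out of the CDC segments and re-wrap them before Paimon's MemorySegmentUtils readers can walk them. A minimal runnable sketch of that bridge, assuming both flink-core and paimon-common are on the classpath (the byte values are placeholders):

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

import java.nio.ByteBuffer;

public class SegmentBridgeDemo {
    public static void main(String[] args) {
        // A Flink CDC segment holding some placeholder bytes.
        MemorySegment flinkSegment = MemorySegmentFactory.wrap(new byte[] {1, 2, 3, 4});

        // Copy out and re-wrap for Paimon, as convertSegments() does per segment.
        ByteBuffer byteBuffer = flinkSegment.wrap(0, flinkSegment.size());
        byte[] bytes = new byte[flinkSegment.size()];
        byteBuffer.get(bytes);
        org.apache.paimon.memory.MemorySegment paimonSegment =
                org.apache.paimon.memory.MemorySegment.wrap(bytes);

        System.out.println(paimonSegment.size()); // prints: 4
    }
}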
