Skip to content

Commit 483a94a

Browse files
committed
[FLINK-36311][format/csv] Remove deprecated APIs in Flink Csv format
1 parent 98a0a56 commit 483a94a

File tree

6 files changed

+191
-423
lines changed

6 files changed

+191
-423
lines changed

flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java

Lines changed: 185 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818

1919
package org.apache.flink.formats.csv;
2020

21+
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
22+
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
2123
import org.apache.flink.api.common.typeinfo.TypeInformation;
24+
import org.apache.flink.api.common.typeinfo.Types;
25+
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
2226
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2327
import org.apache.flink.core.fs.FileInputSplit;
2428
import org.apache.flink.core.fs.Path;
25-
import org.apache.flink.formats.csv.CsvRowDeserializationSchema.RuntimeConverter;
2629
import org.apache.flink.types.Row;
2730
import org.apache.flink.util.jackson.JacksonMapperFactory;
2831

@@ -32,11 +35,19 @@
3235

3336
import java.io.IOException;
3437
import java.io.Serializable;
38+
import java.lang.reflect.Array;
39+
import java.math.BigDecimal;
40+
import java.math.BigInteger;
41+
import java.sql.Date;
42+
import java.sql.Time;
43+
import java.sql.Timestamp;
44+
import java.time.LocalDateTime;
45+
import java.time.ZoneOffset;
3546
import java.util.Arrays;
3647
import java.util.NoSuchElementException;
3748

38-
import static org.apache.flink.formats.csv.CsvRowDeserializationSchema.createFieldRuntimeConverters;
39-
import static org.apache.flink.formats.csv.CsvRowDeserializationSchema.validateArity;
49+
import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT;
50+
import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
4051
import static org.apache.flink.util.Preconditions.checkArgument;
4152
import static org.apache.flink.util.Preconditions.checkNotNull;
4253

@@ -140,6 +151,177 @@ public Row nextRecord(Row record) throws IOException {
140151
return returnRecord;
141152
}
142153

154+
// --------------------------------------------------------------------------------------------
155+
156+
interface RuntimeConverter extends Serializable {
157+
Object convert(JsonNode node);
158+
}
159+
160+
private static RuntimeConverter createRowRuntimeConverter(
161+
RowTypeInfo rowTypeInfo, boolean ignoreParseErrors, boolean isTopLevel) {
162+
final TypeInformation<?>[] fieldTypes = rowTypeInfo.getFieldTypes();
163+
final String[] fieldNames = rowTypeInfo.getFieldNames();
164+
165+
final RuntimeConverter[] fieldConverters =
166+
createFieldRuntimeConverters(ignoreParseErrors, fieldTypes);
167+
168+
return assembleRowRuntimeConverter(
169+
ignoreParseErrors, isTopLevel, fieldNames, fieldConverters);
170+
}
171+
172+
static RuntimeConverter[] createFieldRuntimeConverters(
173+
boolean ignoreParseErrors, TypeInformation<?>[] fieldTypes) {
174+
final RuntimeConverter[] fieldConverters = new RuntimeConverter[fieldTypes.length];
175+
for (int i = 0; i < fieldTypes.length; i++) {
176+
fieldConverters[i] = createNullableRuntimeConverter(fieldTypes[i], ignoreParseErrors);
177+
}
178+
return fieldConverters;
179+
}
180+
181+
private static RuntimeConverter assembleRowRuntimeConverter(
182+
boolean ignoreParseErrors,
183+
boolean isTopLevel,
184+
String[] fieldNames,
185+
RuntimeConverter[] fieldConverters) {
186+
final int rowArity = fieldNames.length;
187+
188+
return (node) -> {
189+
final int nodeSize = node.size();
190+
191+
if (nodeSize != 0) {
192+
validateArity(rowArity, nodeSize, ignoreParseErrors);
193+
} else {
194+
return null;
195+
}
196+
197+
final Row row = new Row(rowArity);
198+
for (int i = 0; i < Math.min(rowArity, nodeSize); i++) {
199+
// Jackson only supports mapping by name in the first level
200+
if (isTopLevel) {
201+
row.setField(i, fieldConverters[i].convert(node.get(fieldNames[i])));
202+
} else {
203+
row.setField(i, fieldConverters[i].convert(node.get(i)));
204+
}
205+
}
206+
return row;
207+
};
208+
}
209+
210+
private static RuntimeConverter createNullableRuntimeConverter(
211+
TypeInformation<?> info, boolean ignoreParseErrors) {
212+
final RuntimeConverter valueConverter = createRuntimeConverter(info, ignoreParseErrors);
213+
return (node) -> {
214+
if (node.isNull()) {
215+
return null;
216+
}
217+
try {
218+
return valueConverter.convert(node);
219+
} catch (Throwable t) {
220+
if (!ignoreParseErrors) {
221+
throw t;
222+
}
223+
return null;
224+
}
225+
};
226+
}
227+
228+
private static RuntimeConverter createRuntimeConverter(
229+
TypeInformation<?> info, boolean ignoreParseErrors) {
230+
if (info.equals(Types.VOID)) {
231+
return (node) -> null;
232+
} else if (info.equals(Types.STRING)) {
233+
return JsonNode::asText;
234+
} else if (info.equals(Types.BOOLEAN)) {
235+
return (node) -> Boolean.valueOf(node.asText().trim());
236+
} else if (info.equals(Types.BYTE)) {
237+
return (node) -> Byte.valueOf(node.asText().trim());
238+
} else if (info.equals(Types.SHORT)) {
239+
return (node) -> Short.valueOf(node.asText().trim());
240+
} else if (info.equals(Types.INT)) {
241+
return (node) -> Integer.valueOf(node.asText().trim());
242+
} else if (info.equals(Types.LONG)) {
243+
return (node) -> Long.valueOf(node.asText().trim());
244+
} else if (info.equals(Types.FLOAT)) {
245+
return (node) -> Float.valueOf(node.asText().trim());
246+
} else if (info.equals(Types.DOUBLE)) {
247+
return (node) -> Double.valueOf(node.asText().trim());
248+
} else if (info.equals(Types.BIG_DEC)) {
249+
return (node) -> new BigDecimal(node.asText().trim());
250+
} else if (info.equals(Types.BIG_INT)) {
251+
return (node) -> new BigInteger(node.asText().trim());
252+
} else if (info.equals(Types.SQL_DATE)) {
253+
return (node) -> Date.valueOf(node.asText());
254+
} else if (info.equals(Types.SQL_TIME)) {
255+
return (node) -> Time.valueOf(node.asText());
256+
} else if (info.equals(Types.SQL_TIMESTAMP)) {
257+
return (node) -> Timestamp.valueOf(node.asText());
258+
} else if (info.equals(Types.LOCAL_DATE)) {
259+
return (node) -> Date.valueOf(node.asText()).toLocalDate();
260+
} else if (info.equals(Types.LOCAL_TIME)) {
261+
return (node) -> Time.valueOf(node.asText()).toLocalTime();
262+
} else if (info.equals(Types.LOCAL_DATE_TIME)) {
263+
return (node) -> LocalDateTime.parse(node.asText().trim(), SQL_TIMESTAMP_FORMAT);
264+
} else if (info.equals(Types.INSTANT)) {
265+
return (node) ->
266+
LocalDateTime.parse(node.asText(), SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT)
267+
.toInstant(ZoneOffset.UTC);
268+
} else if (info instanceof RowTypeInfo) {
269+
final RowTypeInfo rowTypeInfo = (RowTypeInfo) info;
270+
return createRowRuntimeConverter(rowTypeInfo, ignoreParseErrors, false);
271+
} else if (info instanceof BasicArrayTypeInfo) {
272+
return createObjectArrayRuntimeConverter(
273+
((BasicArrayTypeInfo<?, ?>) info).getComponentInfo(), ignoreParseErrors);
274+
} else if (info instanceof ObjectArrayTypeInfo) {
275+
return createObjectArrayRuntimeConverter(
276+
((ObjectArrayTypeInfo<?, ?>) info).getComponentInfo(), ignoreParseErrors);
277+
} else if (info instanceof PrimitiveArrayTypeInfo
278+
&& ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
279+
return createByteArrayRuntimeConverter(ignoreParseErrors);
280+
} else {
281+
throw new RuntimeException("Unsupported type information '" + info + "'.");
282+
}
283+
}
284+
285+
private static RuntimeConverter createObjectArrayRuntimeConverter(
286+
TypeInformation<?> elementType, boolean ignoreParseErrors) {
287+
final Class<?> elementClass = elementType.getTypeClass();
288+
final RuntimeConverter elementConverter =
289+
createNullableRuntimeConverter(elementType, ignoreParseErrors);
290+
291+
return (node) -> {
292+
final int nodeSize = node.size();
293+
final Object[] array = (Object[]) Array.newInstance(elementClass, nodeSize);
294+
for (int i = 0; i < nodeSize; i++) {
295+
array[i] = elementConverter.convert(node.get(i));
296+
}
297+
return array;
298+
};
299+
}
300+
301+
private static RuntimeConverter createByteArrayRuntimeConverter(boolean ignoreParseErrors) {
302+
return (node) -> {
303+
try {
304+
return node.binaryValue();
305+
} catch (IOException e) {
306+
if (!ignoreParseErrors) {
307+
throw new RuntimeException("Unable to deserialize byte array.", e);
308+
}
309+
return null;
310+
}
311+
};
312+
}
313+
314+
static void validateArity(int expected, int actual, boolean ignoreParseErrors) {
315+
if (expected != actual && !ignoreParseErrors) {
316+
throw new RuntimeException(
317+
"Row length mismatch. "
318+
+ expected
319+
+ " fields expected but was "
320+
+ actual
321+
+ ".");
322+
}
323+
}
324+
143325
/** Create a builder. */
144326
public static Builder builder(TypeInformation<Row> typeInfo, Path... filePaths) {
145327
return new Builder(typeInfo, filePaths);

flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ private static byte[] serialize(
474474
CsvRowDataSerializationSchema schema =
475475
InstantiationUtil.deserializeObject(
476476
InstantiationUtil.serializeObject(serSchemaBuilder.build()),
477-
CsvRowDeSerializationSchemaTest.class.getClassLoader());
477+
CsvRowDataSerDeSchemaTest.class.getClassLoader());
478478
open(schema);
479479
return schema.serialize(row);
480480
}
@@ -487,7 +487,7 @@ private static RowData deserialize(
487487
CsvRowDataDeserializationSchema schema =
488488
InstantiationUtil.deserializeObject(
489489
InstantiationUtil.serializeObject(deserSchemaBuilder.build()),
490-
CsvRowDeSerializationSchemaTest.class.getClassLoader());
490+
CsvRowDataSerDeSchemaTest.class.getClassLoader());
491491
open(schema);
492492
return schema.deserialize(csv != null ? csv.getBytes() : null);
493493
}

0 commit comments

Comments (0)