Skip to content

Commit 5a1ef84

Browse files
authored
[FLINK-37154] Introduce ByteArraySchema (#26080)
1 parent b62c086 commit 5a1ef84

File tree

6 files changed

+127
-8
lines changed

6 files changed

+127
-8
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.api.common.serialization;
19+
20+
import org.apache.flink.annotation.PublicEvolving;
21+
import org.apache.flink.api.common.typeinfo.TypeInformation;
22+
23+
import java.io.IOException;
24+
25+
/** Simple serialization schema for bytes. */
26+
@PublicEvolving
27+
public class ByteArraySchema implements DeserializationSchema<byte[]>, SerializationSchema<byte[]> {
28+
29+
private static final long serialVersionUID = 1L;
30+
31+
// ------------------------------------------------------------------------
32+
// Kafka Serialization
33+
// ------------------------------------------------------------------------
34+
35+
@Override
36+
public byte[] deserialize(byte[] message) throws IOException {
37+
return message;
38+
}
39+
40+
@Override
41+
public boolean isEndOfStream(byte[] nextElement) {
42+
return false;
43+
}
44+
45+
@Override
46+
public byte[] serialize(byte[] element) {
47+
return element;
48+
}
49+
50+
@Override
51+
public TypeInformation<byte[]> getProducedType() {
52+
return TypeInformation.of(byte[].class);
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.api.common.serialization;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import java.io.IOException;
23+
24+
import static org.assertj.core.api.Assertions.assertThat;
25+
26+
public class ByteArraySchemaTest {
27+
@Test
28+
void testSimpleSerialisation() throws IOException {
29+
final byte[] testBytes = "hello world".getBytes();
30+
assertThat(new ByteArraySchema().serialize(testBytes)).isEqualTo(testBytes);
31+
assertThat(new ByteArraySchema().deserialize(testBytes)).isEqualTo(testBytes);
32+
}
33+
}

Diff for: flink-python/pyflink/common/__init__.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@
4545
:class:`~pyflink.datastream.formats.csv.CsvRowSerializationSchema`,
4646
:class:`~pyflink.datastream.formats.csv.CsvRowDeserializationSchema`,
4747
:class:`~pyflink.datastream.formats.avro.AvroRowSerializationSchema`,
48-
:class:`~pyflink.datastream.formats.avro.AvroRowDeserializationSchema` and
49-
:class:`~SimpleStringSchema` for more details.
48+
:class:`~pyflink.datastream.formats.avro.AvroRowDeserializationSchema`,
49+
:class:`~SimpleStringSchema` and
50+
:class:`~SimpleByteSchema` for more details.
5051
"""
5152
from pyflink.common.completable_future import CompletableFuture
5253
from pyflink.common.config_options import ConfigOption, ConfigOptions
@@ -59,11 +60,11 @@
5960
from pyflink.common.job_status import JobStatus
6061
from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
6162
from pyflink.common.serialization import SerializationSchema, DeserializationSchema, \
62-
SimpleStringSchema, Encoder
63+
SimpleStringSchema, ByteArraySchema, Encoder
6364
from pyflink.common.serializer import TypeSerializer
65+
from pyflink.common.time import Duration, Instant, Time
6466
from pyflink.common.typeinfo import Types, TypeInformation
6567
from pyflink.common.types import Row, RowKind
66-
from pyflink.common.time import Duration, Instant, Time
6768
from pyflink.common.watermark_strategy import WatermarkStrategy, \
6869
AssignerWithPeriodicWatermarksWrapper
6970

@@ -78,6 +79,7 @@
7879
'SerializationSchema',
7980
'DeserializationSchema',
8081
'SimpleStringSchema',
82+
'ByteArraySchema',
8183
'Encoder',
8284
'CompletableFuture',
8385
'InputDependencyConstraint',

Diff for: flink-python/pyflink/common/serialization.py

+16
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
'SerializationSchema',
2424
'DeserializationSchema',
2525
'SimpleStringSchema',
26+
'ByteArraySchema',
2627
'Encoder',
2728
'BulkWriterFactory'
2829
]
@@ -68,6 +69,21 @@ def __init__(self, charset: str = 'UTF-8'):
6869
self, j_deserialization_schema=j_simple_string_serialization_schema)
6970

7071

72+
class ByteArraySchema(SerializationSchema, DeserializationSchema):
73+
"""
74+
Simple serialization/deserialization schema for bytes.
75+
"""
76+
77+
def __init__(self):
78+
gate_way = get_gateway()
79+
j_simple_byte_serialization_schema = gate_way \
80+
.jvm.org.apache.flink.api.common.serialization.ByteArraySchema()
81+
SerializationSchema.__init__(self,
82+
j_serialization_schema=j_simple_byte_serialization_schema)
83+
DeserializationSchema.__init__(
84+
self, j_deserialization_schema=j_simple_byte_serialization_schema)
85+
86+
7187
class Encoder(object):
7288
"""
7389
Encoder is used by the file sink to perform the actual writing of the

Diff for: flink-python/pyflink/common/tests/test_serialization_schemas.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717
################################################################################
18-
from pyflink.common.serialization import SimpleStringSchema
18+
from pyflink.common.serialization import SimpleStringSchema, ByteArraySchema
1919
from pyflink.testing.test_case_utils import PyFlinkTestCase
2020

2121

@@ -29,3 +29,13 @@ def test_simple_string_schema(self):
2929

3030
self.assertEqual(expected_string, simple_string_schema._j_deserialization_schema
3131
.deserialize(expected_string.encode(encoding='utf-8')))
32+
33+
34+
class SimpleByteSchemaTests(PyFlinkTestCase):
35+
def test_simple_byte_schema(self):
36+
expected_bytes = "test bytes".encode(encoding='utf-8')
37+
simple_byte_schema = ByteArraySchema()
38+
self.assertEqual(expected_bytes,
39+
simple_byte_schema._j_serialization_schema.serialize(expected_bytes))
40+
self.assertEqual(expected_bytes, simple_byte_schema._j_deserialization_schema
41+
.deserialize(expected_bytes))

Diff for: flink-python/pyflink/datastream/connectors/tests/test_kafka.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616
# limitations under the License.
1717
################################################################################
1818
import json
19-
from typing import Dict
2019
import unittest
20+
from typing import Dict
2121

2222
import pyflink.datastream.data_stream as data_stream
2323
from pyflink.common import typeinfo
24-
2524
from pyflink.common.configuration import Configuration
26-
from pyflink.common.serialization import SimpleStringSchema, DeserializationSchema
25+
from pyflink.common.serialization import SimpleStringSchema, ByteArraySchema, DeserializationSchema
2726
from pyflink.common.typeinfo import Types
2827
from pyflink.common.types import Row
2928
from pyflink.common.watermark_strategy import WatermarkStrategy
@@ -318,6 +317,7 @@ def _check(schema: DeserializationSchema, class_name: str):
318317
class_name)
319318

320319
_check(SimpleStringSchema(), 'org.apache.flink.api.common.serialization.SimpleStringSchema')
320+
_check(ByteArraySchema(), 'org.apache.flink.api.common.serialization.ByteArraySchema')
321321
_check(
322322
JsonRowDeserializationSchema.builder().type_info(Types.ROW([Types.STRING()])).build(),
323323
'org.apache.flink.formats.json.JsonRowDeserializationSchema'
@@ -635,6 +635,10 @@ def _check_serialization_schema_implementations(check_function):
635635
SimpleStringSchema(),
636636
'org.apache.flink.api.common.serialization.SimpleStringSchema'
637637
)
638+
check_function(
639+
ByteArraySchema(),
640+
'org.apache.flink.api.common.serialization.ByteArraySchema'
641+
)
638642

639643

640644
class MockDataStream(data_stream.DataStream):

0 commit comments

Comments
 (0)