Skip to content

Commit aef8c86

Browse files
mina-ashamdianfu
authored andcommitted
[FLINK-37192][python] Replace deprecated avro-python3 with avro
- avro-python3 was deprecated and replaced by avro, the first hasn't had a release since March 17, 2021 while the second has had multiple fixes and updated, latest is August 5, 2024 - Both libraries are the exact same (i.e. `avro-python3` was just renamed to `avro` and had multiple updates since), but the problem is that they overlap in package name, so using PyFlink with any updated library that relies on `avro` fails starting the pipeline even if the pipeline doesn't actually do any avro encoding/decoding - This updates the library to the latest one, and fixes a few imports, other than that the library's functionality is exactly the same
1 parent c3c7f0c commit aef8c86

File tree

3 files changed

+6
-7
lines changed

3 files changed

+6
-7
lines changed

flink-python/dev/dev-requirements.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ cython>=0.29.24
2020
py4j==0.10.9.7
2121
python-dateutil>=2.8.0,<3
2222
cloudpickle~=2.2.0
23-
avro-python3>=1.8.1,!=1.9.2
23+
avro>=1.12.0
2424
pandas>=1.3.0
2525
pyarrow>=5.0.0
2626
pytz>=2018.3

flink-python/pyflink/fn_execution/formats/avro.py

+4-5
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@
1717
################################################################################
1818
import struct
1919

20+
from avro.errors import AvroTypeException, SchemaResolutionException
2021
from avro.io import (
21-
AvroTypeException,
2222
BinaryDecoder,
2323
BinaryEncoder,
2424
DatumReader,
2525
DatumWriter,
26-
SchemaResolutionException,
27-
Validate,
26+
validate,
2827
)
2928

3029
STRUCT_FLOAT = struct.Struct('>f') # big-endian float
@@ -203,7 +202,7 @@ def write_bytes(self, datum):
203202
class FlinkAvroDatumWriter(DatumWriter):
204203

205204
def __init__(self, writer_schema=None):
206-
super().__init__(writer_schema=writer_schema)
205+
super().__init__(writers_schema=writer_schema)
207206

208207
def write_array(self, writer_schema, datum, encoder):
209208
if len(datum) > 0:
@@ -224,7 +223,7 @@ def write_union(self, writer_schema, datum, encoder):
224223
# resolve union
225224
index_of_schema = -1
226225
for i, candidate_schema in enumerate(writer_schema.schemas):
227-
if Validate(candidate_schema, datum):
226+
if validate(candidate_schema, datum):
228227
index_of_schema = i
229228
if index_of_schema < 0:
230229
raise AvroTypeException(writer_schema, datum)

flink-python/setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ def extracted_output_files(base_dir, file_path, output_directory):
318318

319319
install_requires = ['py4j==0.10.9.7', 'python-dateutil>=2.8.0,<3',
320320
'apache-beam>=2.54.0,<=2.61.0',
321-
'cloudpickle>=2.2.0', 'avro-python3>=1.8.1,!=1.9.2',
321+
'cloudpickle>=2.2.0', 'avro>=1.12.0',
322322
'pytz>=2018.3', 'fastavro>=1.1.0,!=1.8.0', 'requests>=2.26.0',
323323
'protobuf>=3.19.0',
324324
'numpy>=1.22.4',

0 commit comments

Comments
 (0)