Skip to content

Commit 7928270

Browse files
feat(Dataflow): Update Kafka snippet to use Managed I/O (#13304)
* Update Kafka snippet to use Managed I/O * Remove Python 3.8 from tests (not supported)
1 parent 134cb47 commit 7928270

File tree

4 files changed

+34
-25
lines changed

4 files changed

+34
-25
lines changed

dataflow/snippets/Dockerfile

+20-12
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,32 @@
1818
# on the host machine. This Dockerfile is derived from the
1919
# dataflow/custom-containers/ubuntu sample.
2020

21-
FROM ubuntu:focal
21+
FROM python:3.12-slim
22+
23+
# Install JRE
24+
COPY --from=openjdk:8-jre-slim /usr/local/openjdk-8 /usr/local/openjdk-8
25+
ENV JAVA_HOME /usr/local/openjdk-8
26+
RUN update-alternatives --install /usr/bin/java java /usr/local/openjdk-8/bin/java 10
2227

2328
WORKDIR /pipeline
2429

25-
COPY --from=apache/beam_python3.11_sdk:2.62.0 /opt/apache/beam /opt/apache/beam
30+
# Copy files from official SDK image.
31+
COPY --from=apache/beam_python3.11_sdk:2.63.0 /opt/apache/beam /opt/apache/beam
32+
# Set the entrypoint to Apache Beam SDK launcher.
2633
ENTRYPOINT [ "/opt/apache/beam/boot" ]
2734

28-
COPY requirements.txt .
29-
RUN apt-get update \
30-
&& apt-get install -y --no-install-recommends \
31-
curl python3-distutils default-jre docker.io \
32-
&& rm -rf /var/lib/apt/lists/* \
33-
&& update-alternatives --install /usr/bin/python python /usr/bin/python3 10 \
34-
&& curl https://bootstrap.pypa.io/get-pip.py | python \
35-
# Install the requirements.
36-
&& pip install --no-cache-dir -r requirements.txt \
37-
&& pip check
35+
# Install Docker.
36+
RUN apt-get update
37+
RUN apt-get install -y --no-install-recommends docker.io
38+
39+
# Install dependencies.
40+
RUN pip3 install --no-cache-dir apache-beam[gcp]==2.63.0
41+
RUN pip install --no-cache-dir kafka-python==2.0.6
3842

43+
# Verify that the image does not have conflicting dependencies.
44+
RUN pip check
3945

46+
# Copy the snippets to test.
4047
COPY read_kafka.py ./
4148
COPY read_kafka_multi_topic.py ./
49+

dataflow/snippets/noxfile_config.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
TEST_CONFIG_OVERRIDE = {
2424
# You can opt out from the test for specific Python versions.
25-
"ignored_versions": ["2.7", "3.7", "3.9", "3.10", "3.12", "3.13"],
25+
"ignored_versions": ["2.7", "3.7", "3.8", "3.9", "3.10", "3.13"],
2626
# Old samples are opted out of enforcing Python type hints
2727
# All new samples should feature them
2828
"enforce_type_hints": True,

dataflow/snippets/read_kafka.py

+11-10
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import apache_beam as beam
2020

2121
from apache_beam import window
22-
from apache_beam.io.kafka import ReadFromKafka
2322
from apache_beam.io.textio import WriteToText
2423
from apache_beam.options.pipeline_options import PipelineOptions
2524

@@ -42,16 +41,18 @@ def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
4241
(
4342
pipeline
4443
# Read messages from an Apache Kafka topic.
45-
| ReadFromKafka(
46-
consumer_config={"bootstrap.servers": options.bootstrap_server},
47-
topics=[options.topic],
48-
with_metadata=False,
49-
max_num_records=5,
50-
start_read_time=0,
44+
| beam.managed.Read(
45+
beam.managed.KAFKA,
46+
config={
47+
"bootstrap_servers": options.bootstrap_server,
48+
"topic": options.topic,
49+
"data_format": "RAW",
50+
"auto_offset_reset_config": "earliest",
51+
# The max_read_time_seconds parameter is intended for testing.
52+
# Avoid using this parameter in production.
53+
"max_read_time_seconds": 5
54+
}
5155
)
52-
# The previous step creates a key-value collection, keyed by message ID.
53-
# The values are the message payloads.
54-
| beam.Values()
5556
# Subdivide the output into fixed 5-second windows.
5657
| beam.WindowInto(window.FixedWindows(5))
5758
| WriteToText(

dataflow/snippets/requirements.txt

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
apache-beam[gcp]==2.58.0
2-
kafka-python==2.0.2
1+
apache-beam[gcp]==2.63.0
2+
kafka-python==2.0.6

0 commit comments

Comments
 (0)