Commit 9102f25

Add documentation on how to execute PyFlink jobs on Kubernetes

1 parent 1cbb0eb commit 9102f25
4 files changed: +259 −0 lines changed
k8s/README.md

Lines changed: 138 additions & 0 deletions
# Run PyFlink jobs on Kubernetes

In this example, we'd like to show how to run PyFlink jobs on Kubernetes in application mode.
How to work with Kubernetes is documented clearly in Flink's [official documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/resource-providers/native_kubernetes/).
All of that documentation also applies to PyFlink jobs. It's strongly advised to read it carefully before going through the following example.

## Preparation

### Set up a Kubernetes cluster

If there is no Kubernetes cluster available for use, you first need to set one up. See [how to set up a Kubernetes cluster](https://kubernetes.io/docs/setup/) for more details.

You can verify the permissions by running `kubectl auth can-i <list|create|edit|delete> pods`, e.g.
```shell
kubectl auth can-i create pods
```

Then, you can run the following command:
```shell
kubectl get pods -A
```
If the output looks like the following, the Kubernetes cluster is running and kubectl is configured correctly,
and you can proceed to the next section:
```shell
kube-system   coredns-f9fd979d6-96xql                  1/1   Running   0   7m41s
kube-system   coredns-f9fd979d6-h9q5v                  1/1   Running   0   7m41s
kube-system   etcd-docker-desktop                      1/1   Running   0   6m44s
kube-system   kube-apiserver-docker-desktop            1/1   Running   0   6m47s
kube-system   kube-controller-manager-docker-desktop   1/1   Running   0   6m42s
kube-system   kube-proxy-94f22                         1/1   Running   0   7m41s
kube-system   kube-scheduler-docker-desktop            1/1   Running   0   6m39s
kube-system   storage-provisioner                      1/1   Running   0   7m6s
kube-system   vpnkit-controller                        1/1   Running   0   7m5s
```

### Build a Docker image with PyFlink installed

PyFlink must be installed on all the cluster nodes. There are currently no official Flink Docker images with PyFlink installed, so you need to build one yourself as follows:

```shell
docker build -t pyflink:1.14.4 -f docker/Dockerfile .
```

## Execute PyFlink jobs

### Create a custom image containing the PyFlink job to execute and its dependencies

In application mode, the user code must be bundled together with the Flink image. See [Application Mode](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/resource-providers/native_kubernetes/#application-mode) for more details. So you need to build a custom image with the PyFlink job code bundled in it:

```shell
docker build -t pyflink_wc -f docker/Dockerfile.job .
```

Note: If the Kubernetes cluster is not a local test cluster, make sure to publish the Docker image to a repository which is accessible from the Kubernetes cluster.

### Submit PyFlink jobs

#### Submit PyFlink jobs from the host machine

1) Download a Flink distribution, e.g. for Flink 1.14.4 it's available at https://www.apache.org/dyn/closer.lua/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.11.tgz

2) Extract it:
```shell
tar zxvf flink-1.14.4-bin-scala_2.11.tgz
```

3) Submit PyFlink jobs:
```shell
cd flink-1.14.4
./bin/flink run-application \
    --target kubernetes-application \
    --parallelism 8 \
    -Dkubernetes.cluster-id=word-count \
    -Dtaskmanager.memory.process.size=4096m \
    -Dkubernetes.taskmanager.cpu=2 \
    -Dtaskmanager.numberOfTaskSlots=4 \
    -Dkubernetes.container.image=pyflink_wc:latest \
    -Dkubernetes.rest-service.exposed.type=ClusterIP \
    -py /opt/flink/usrlib/word_count.py
```

Note:
- More Kubernetes-specific configuration options can be found [here](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/#kubernetes)
- You can override configurations set in `conf/flink-conf.yaml` via `-Dkey=value`
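For repeated submissions it can be convenient to assemble these `-D` overrides programmatically before shelling out. A minimal sketch (the `build_submit_command` helper is illustrative, not part of Flink; the option values mirror the command above):

```python
# Sketch: build the `flink run-application` command line from a dict of
# configuration overrides, each rendered as a -Dkey=value argument.
import shlex

def build_submit_command(job_path, overrides, parallelism=8):
    """Assemble CLI arguments for `flink run-application` in application mode."""
    cmd = ["./bin/flink", "run-application",
           "--target", "kubernetes-application",
           "--parallelism", str(parallelism)]
    for key, value in overrides.items():
        cmd.append(f"-D{key}={value}")
    cmd += ["-py", job_path]
    return cmd

overrides = {
    "kubernetes.cluster-id": "word-count",
    "taskmanager.memory.process.size": "4096m",
    "kubernetes.container.image": "pyflink_wc:latest",
}
cmd = build_submit_command("/opt/flink/usrlib/word_count.py", overrides)
print(shlex.join(cmd))
```

The resulting list can be passed to `subprocess.run` from the extracted Flink distribution directory.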
If you see output like the following, the job has been submitted successfully:
```shell
2022-04-24 17:08:32,603 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
2022-04-24 17:08:32,603 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
2022-04-24 17:08:33,289 WARN  org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
2022-04-24 17:08:33,302 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Create flink application cluster word-count successfully, JobManager Web Interface: http://word-count-rest.default:8081
```

You can verify the pod status as follows:
```shell
kubectl get pods -A | grep word-count
```

If everything runs normally, you should see output like the following:
```shell
NAMESPACE   NAME                          READY   STATUS    RESTARTS   AGE
default     word-count-5f5d44b598-zg5z8   1/1     Running   0          90s
default     word-count-taskmanager-1-1    0/1     Pending   0          59s
default     word-count-taskmanager-1-2    0/1     Pending   0          59s
```
The JobManager runs in the pod `word-count-5f5d44b598-zg5z8` and the TaskManagers run in the pods `word-count-taskmanager-1-1` and `word-count-taskmanager-1-2`.

If the pods are not running normally, you can check their logs, e.g. the JobManager log as follows:
```shell
kubectl logs word-count-5f5d44b598-zg5z8
```

See the [Flink documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/cli/#submitting-pyflink-jobs) for more details on how to submit PyFlink jobs.

### Accessing Flink's Web UI

Flink's Web UI and REST endpoint can be exposed in several ways via the `kubernetes.rest-service.exposed.type` configuration option.
Since it's set to `ClusterIP` in this example, Flink's Web UI can be accessed as follows:
```shell
kubectl port-forward service/word-count-rest 8081
```
Then you can access the job's Web UI at `http://127.0.0.1:8081`.
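While the port-forward is active, the same REST endpoint can also be queried programmatically. A small illustrative sketch, assuming the `{"jobs": [{"id": ..., "status": ...}]}` response shape of Flink's `/jobs` REST endpoint (the sample payload below is hand-written, not captured output):

```python
# Sketch: extract the IDs of RUNNING jobs from a /jobs REST response.
# The `sample` payload is a made-up example of the endpoint's JSON shape.
import json

def running_job_ids(payload):
    """Return the IDs of jobs whose status is RUNNING."""
    return [job["id"] for job in payload.get("jobs", [])
            if job["status"] == "RUNNING"]

sample = json.loads('{"jobs": [{"id": "a1b2", "status": "RUNNING"},'
                    ' {"id": "c3d4", "status": "FINISHED"}]}')
print(running_job_ids(sample))
```

In a real setup you would fetch the payload from `http://127.0.0.1:8081/jobs` (e.g. with `urllib.request`) instead of the hard-coded sample.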
You can refer to Flink's [official documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui) for more details.

### Cancel the jobs

You can cancel the job either through Flink's Web UI or via CLI commands as follows:

```shell
# list jobs:
./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=word-count

# cancel jobs:
./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=word-count
```

k8s/docker/Dockerfile

Lines changed: 38 additions & 0 deletions
###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################

FROM flink:1.14.4

# Install Python 3.7 from source: Debian 11 ships Python 3.9, while PyFlink
# currently only officially supports Python 3.6, 3.7 and 3.8.
RUN apt-get update -y && \
  apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev && \
  wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
  tar -xvf Python-3.7.9.tgz && \
  cd Python-3.7.9 && \
  ./configure --without-tests --enable-shared && \
  make -j6 && \
  make install && \
  ldconfig /usr/local/lib && \
  cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
  ln -s /usr/local/bin/python3 /usr/local/bin/python && \
  apt-get clean && \
  rm -rf /var/lib/apt/lists/*

# install PyFlink 1.14.4
RUN pip3 install apache-flink==1.14.4

k8s/docker/Dockerfile.job

Lines changed: 22 additions & 0 deletions
###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################

FROM pyflink:1.14.4

RUN mkdir -p $FLINK_HOME/usrlib
COPY docker/word_count.py $FLINK_HOME/usrlib/word_count.py

k8s/docker/word_count.py

Lines changed: 61 additions & 0 deletions
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
################################################################################
import logging
import sys

from pyflink.table import EnvironmentSettings, TableEnvironment, TableDescriptor, Schema, DataTypes
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udf

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # define the source
    t_env.create_temporary_table(
        'source',
        TableDescriptor.for_connector('datagen')
                       .schema(Schema.new_builder()
                                     .column('word', DataTypes.STRING())
                                     .build())
                       .option('number-of-rows', '1000000')
                       .build())
    tab = t_env.from_path('source')

    # define the sink
    t_env.create_temporary_table(
        'sink',
        TableDescriptor.for_connector('print')
                       .schema(Schema.new_builder()
                                     .column('word', DataTypes.STRING())
                                     .column('count', DataTypes.BIGINT())
                                     .build())
                       .build())

    @udf(result_type=DataTypes.STRING())
    def normalize(word, start, end):
        return word[start:end]

    # compute word count
    tab.select(normalize(col('word'), 0, 3)).alias("word") \
       .group_by(col('word')) \
       .select(col('word'), lit(1).count) \
       .execute_insert('sink')
