The other day, I wanted to get my feet wet with PyFlink. While there is a fair amount of related information out there, I couldn’t find really up-to-date documentation on using current versions of PyFlink with Flink on Kubernetes.
Kubernetes is the common go-to platform for running Flink at scale in production these days, allowing you to deploy and operate Flink jobs efficiently and securely, providing high availability, observability, features such as auto-scaling, and much more. All of which are good reasons for running PyFlink jobs on Kubernetes, too, and so I thought I’d provide a quick run-through of the required steps for getting started with PyFlink on Kubernetes as of Apache Flink 1.18.
In the remainder of this blog post, I’m going to explore how to
- install Flink and its Kubernetes operator on a local Kubernetes cluster,
- install Kafka on the same cluster, using the Strimzi operator,
- create a PyFlink job which creates some random data using Flink’s DataGen connector and writes that data to a Kafka topic using Flink SQL, and
- deploy that job to Kubernetes and run it there.
The overall solution is going to look like this:
What Is PyFlink and Why Should You Care?
PyFlink is a Python API for Apache Flink which, per the Flink documentation, allows you to “build scalable batch and streaming workloads, such as real-time data processing pipelines, large-scale exploratory data analysis, Machine Learning (ML) pipelines and ETL processes”.
It is particularly useful for development teams who are more familiar with Python than with Java and who would still like to benefit from Flink’s powerful stream processing capabilities. Another factor is Python’s rich third-party ecosystem: it provides libraries not only for data engineering (e.g. Pandas) and scientific computing (e.g. NumPy), but also for ML and AI (e.g. PyTorch and TensorFlow), which makes PyFlink a natural link between these fields and real-time stream processing. So if, for instance, you wanted to train your ML models on real-time event data sourced from your production RDBMS, doing some data cleaning, filtering, and aggregation along the way, PyFlink would be a great option.
Similar to Flink’s Java APIs, PyFlink comes with a DataStream API and a Table API. The former lets you implement operations such as filtering and mapping, joining, grouping and aggregation, and much more on data streams, providing you with a large degree of freedom and control over aspects such as state management, late event handling, or output control. In contrast, the Table API offers a more rigid but also easier-to-use and less verbose relational programming interface, including support for defining stream processing pipelines using SQL, benefitting from automatic optimizations by Flink’s query planner.
Implementation-wise, PyFlink acts as a wrapper around Flink’s Java APIs. Upon start-up, the job graph is retrieved from the Python job using Py4J, a bridge between the Python VM and the JVM. It then gets transformed into an equivalent job running on the Flink cluster. At runtime, Flink will call back to the Python VM for executing any Python-based user-defined functions (UDFs) with the help of the Apache Beam portability framework. As of Flink 1.15, there’s also support for a new execution mode called “thread mode”, where Python code is executed inside the JVM process itself (via JNI), avoiding the overhead of cross-process communication.
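As a minimal sketch of how the execution mode could be selected: the snippet below sets the <span class="inline-code">python.execution-mode</span> option on a table environment. The helper names <span class="inline-code">enable_thread_mode</span> and <span class="inline-code">create_table_env</span> are made up for illustration, and the PyFlink import is done lazily so that the helper itself has no hard dependency on PyFlink.

```python
def enable_thread_mode(table_env):
    """Switch Python UDF execution from the default process mode to thread mode."""
    # "python.execution-mode" accepts "process" (the default) or "thread".
    table_env.get_config().set("python.execution-mode", "thread")
    return table_env


def create_table_env():
    """Create a streaming table environment with thread mode enabled."""
    # Imported here so that enable_thread_mode() can be used without PyFlink installed.
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    return enable_thread_mode(t_env)
```

The option only affects how Python UDFs are executed; a job which consists purely of SQL statements, like the one built later in this post, should behave the same in either mode.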
So let’s set up a Kubernetes cluster and install the aforementioned operators onto it. To follow along, make sure to have the following things installed on your machine:
- Docker, for creating and running containers
- kind, for setting up a local Kubernetes cluster (of course you could also use alternatives such as minikube, or a managed cluster on EKS, GKE, etc.)
- helm, for installing software into the cluster
- kubectl, for interacting with Kubernetes
Begin by creating a new cluster with one control plane and one worker node via kind:
After a minute or so, the Kubernetes cluster should be ready, and you can take a look at its nodes with kubectl:
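As a rough sketch, the layout with one control plane node and one worker node can be written down as a kind cluster configuration. The Python snippet below builds that configuration and can write it out as JSON. The file name <span class="inline-code">kind-cluster.json</span> and the function names are made-up examples, and the snippet assumes that kind accepts JSON for its configuration file, JSON being a subset of YAML.

```python
import json


def kind_cluster_config():
    """Describe a kind cluster with one control-plane node and one worker node."""
    return {
        "kind": "Cluster",
        "apiVersion": "kind.x-k8s.io/v1alpha4",
        "nodes": [
            {"role": "control-plane"},
            {"role": "worker"},
        ],
    }


def write_config(path="kind-cluster.json"):
    """Write the cluster configuration to a file and return its path."""
    # JSON is valid YAML, so the file should be usable with kind's --config flag.
    with open(path, "w") as f:
        json.dump(kind_cluster_config(), f, indent=2)
    return path
```

After calling <span class="inline-code">write_config()</span>, running <span class="inline-code">kind create cluster --config kind-cluster.json</span> should create the cluster, and <span class="inline-code">kubectl get nodes</span> lists its two nodes.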
Installing the Flink Kubernetes Operator
Next, install the Flink Kubernetes Operator. An official part of the Flink project, it deploys and runs Flink jobs on Kubernetes, based on custom resource definitions. It is installed via a Helm chart, with the following steps (refer to the upstream documentation for more details):
- Deploy the certificate manager (required later on when the operator’s webhook is invoked during the creation of custom resources): <span class="inline-code">kubectl create -f https://github.com/cert-manager/cert-manager/releases/download/v1.8.2/cert-manager.yaml</span>
- Add the Helm repository for the operator: <span class="inline-code">helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.7.0/</span>
- Install the operator using the provided Helm chart: <span class="inline-code">helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator</span>
After a short time, a Kubernetes pod with the operator should be running on the cluster:
Installing Strimzi and Apache Kafka
With the Flink Kubernetes Operator up and running, it’s time to install Strimzi. It is another Kubernetes operator, in this case in charge of deploying and running Kafka clusters. Strimzi is a very powerful tool, supporting all kinds of Kafka deployments, Kafka Connect, MirrorMaker 2, and much more. We are going to use it for installing a simple single-node Kafka cluster, following the steps from Strimzi’s quickstart guide:
- Create a Kubernetes namespace for Strimzi and Kafka:
<span class="inline-code">kubectl create namespace kafka</span>
- Install Strimzi into that namespace:
<span class="inline-code">kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka</span>
- Create a Kafka cluster:
<span class="inline-code">kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka</span>
Again, this will take some time to complete. You can use the <span class="inline-code">wait</span> command to block until the Kafka cluster is ready:
Once the Kafka broker is up, the command will return and you’re ready to go.
A Simple PyFlink Job
With the operators for Flink and Kafka as well as a single-node Kafka cluster in place, let’s create a simple stream processing job using PyFlink. Inspired by the Python example job coming with the Flink Kubernetes operator, it uses the Flink DataGen SQL connector for creating random purchase orders. But instead of just printing them to standard output, we’re going to emit the stream of orders to a Kafka topic, using Flink’s Apache Kafka SQL Connector.
<div class="side-note">To get started with setting up a PyFlink development environment on your local machine, check out this post by Robin Moffatt. In particular, make sure not to use a Python version newer than 3.10 with PyFlink 1.18. At the time of writing, Python 3.11 is not supported yet; it is on the roadmap for PyFlink 1.19 (FLINK-33030).</div>
Here is the complete job:
<div style="text-align: center">Listing 1: pyflink_hello_world.py (source)</div>
The logic is fairly straightforward: A <span class="inline-code">StreamTableEnvironment</span> is created and the Flink SQL Kafka connector JAR is registered with it. Then two tables are created: a source table <span class="inline-code">orders</span>, based on the datagen connector, and a sink table <span class="inline-code">orders_sink</span>, based on the Kafka sink connector. Finally, an <span class="inline-code">INSERT</span> query is issued, which propagates any changed rows from the source to the sink table.
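The steps just described can be sketched roughly as follows. This is an approximation written for illustration, not the original listing: the column definitions, the location of the connector JAR, the topic name <span class="inline-code">orders</span>, and the bootstrap server address (which assumes the default cluster name <span class="inline-code">my-cluster</span> from the Strimzi quickstart) are all assumptions.

```python
# Assumed location of the Kafka SQL connector JAR inside the container image.
KAFKA_CONNECTOR_JAR = "file:///opt/flink/usrlib/flink-sql-connector-kafka.jar"
# Assumed bootstrap service of a Strimzi cluster named "my-cluster" in namespace "kafka".
BOOTSTRAP_SERVERS = "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"

# Both tables share the same (illustrative) schema for purchase orders.
COLUMNS = """
  order_number BIGINT,
  price        DECIMAL(32, 2),
  buyer        ROW<first_name STRING, last_name STRING>,
  order_time   TIMESTAMP(3)
"""

ORDERS_DDL = f"""
CREATE TABLE orders ({COLUMNS}) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '4'
)"""

ORDERS_SINK_DDL = f"""
CREATE TABLE orders_sink ({COLUMNS}) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
  'format' = 'json'
)"""

INSERT_DML = "INSERT INTO orders_sink SELECT * FROM orders"


def pyflink_hello_world():
    """Create source and sink tables and continuously copy rows between them."""
    # Imported lazily so the SQL statements above can be inspected without PyFlink.
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    # Register the Kafka SQL connector JAR with the table environment.
    t_env.get_config().set("pipeline.jars", KAFKA_CONNECTOR_JAR)

    t_env.execute_sql(ORDERS_DDL)
    t_env.execute_sql(ORDERS_SINK_DDL)
    t_env.execute_sql(INSERT_DML)
```

When saved as <span class="inline-code">pyflink_hello_world.py</span>, the script would end with a call to <span class="inline-code">pyflink_hello_world()</span>, so that the statements are submitted as soon as the file is executed.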
Building a Container Image With Your PyFlink Job
To deploy the PyFlink job to Kubernetes, you’ll need to create a container image containing the Python job itself and all its dependencies: PyFlink, any required Python packages, as well as any Flink connectors used by the job.
Unfortunately, the Dockerfile of the aforementioned Python example coming with the Flink Kubernetes operator didn’t quite work for me. When trying to build an image from it, I’d get the following error:
The problem is that the upstream Flink container image used as the base image here is itself derived from a JRE image, i.e. it contains only a subset of the modules provided by the Java platform. PemJa, one of PyFlink’s dependencies, however, requires some header files which are only provided by a complete JDK.
I have therefore created a new base image for PyFlink jobs, which removes the JRE and adds back the complete JDK, inspired by the equivalent step in the Dockerfile for creating the JDK 11 image provided by the Eclipse Temurin project. This is not quite ideal in terms of overall image size, and the cleaner approach would be to create a new image derived from the JDK one, but it does the trick for now to get going:
<div style="text-align: center">Listing 2: Dockerfile.pyflink-base (source)</div>
Create an image from that Dockerfile and store it in your local image store:
As for the actual image with our job, all that remains to be done is to extend that base image and add the Python job with all its dependencies (in this case, just the Kafka connector):
<div style="text-align: center">Listing 3: Dockerfile (source)</div>
Let’s also build an image for that:
In order to use that image from within Kubernetes, you’ll finally need to load it into the cluster, which can be done with <span class="inline-code">kind</span> like this:
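The build and load steps can be summarized in a small helper, sketched here under a few assumptions: the image tags <span class="inline-code">pyflink-base:latest</span> and <span class="inline-code">pyflink-hello-world:latest</span> are hypothetical, both Dockerfiles are expected in the current directory, and the job’s Dockerfile is expected to reference the base image tag in its <span class="inline-code">FROM</span> line. By default the helper only prints the commands instead of running them.

```python
import shlex
import subprocess

BASE_IMAGE = "pyflink-base:latest"        # hypothetical tag for the base image
JOB_IMAGE = "pyflink-hello-world:latest"  # hypothetical tag for the job image


def image_commands(cluster_name="kind"):
    """Return the commands for building both images and loading the job image into kind."""
    return [
        ["docker", "build", "-f", "Dockerfile.pyflink-base", "-t", BASE_IMAGE, "."],
        ["docker", "build", "-f", "Dockerfile", "-t", JOB_IMAGE, "."],
        # "kind" is the default cluster name used by kind when none is given.
        ["kind", "load", "docker-image", JOB_IMAGE, "--name", cluster_name],
    ]


def run_all(commands, dry_run=True):
    """Print each command and, unless dry_run is set, execute it."""
    for cmd in commands:
        print(shlex.join(cmd))
        if not dry_run:
            # check=True stops at the first failing command.
            subprocess.run(cmd, check=True)
```

Calling <span class="inline-code">run_all(image_commands(), dry_run=False)</span> would execute the three steps in order; only the job image needs to be loaded into the cluster, since it already contains the layers of the base image.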
Deploying a PyFlink Job On Kubernetes
At this point, you have everything in place for deploying your PyFlink job to Kubernetes. The operator project comes with an example resource of type <span class="inline-code">FlinkDeployment</span>, which works pretty much as-is for our purposes. Only the Flink version and the image name need to be adjusted, with the latter pointing to the image you’ve just created:
<div style="text-align: center">Listing 4: pyflink-hello-world.yaml (source)</div>
Note how the <span class="inline-code">PythonDriver</span> class is used as the entry point for running a PyFlink job and the job to run is passed in via the <span class="inline-code">-py</span> argument. Just like any other Kubernetes resource, this Flink job can be deployed using <span class="inline-code">kubectl</span>:
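To make the shape of such a resource concrete, here is a sketch which builds a <span class="inline-code">FlinkDeployment</span> as a Python dictionary and prints it as JSON, a format <span class="inline-code">kubectl</span> accepts alongside YAML. The resource name, image tag, memory settings, service account, and file paths (in particular the exact name of the <span class="inline-code">flink-python</span> JAR and the Python interpreter location) are assumptions which depend on your image.

```python
import json


def flink_deployment(image, script="/opt/flink/usrlib/pyflink_hello_world.py"):
    """Build a FlinkDeployment resource which runs a PyFlink script via PythonDriver."""
    return {
        "apiVersion": "flink.apache.org/v1beta1",
        "kind": "FlinkDeployment",
        "metadata": {"name": "pyflink-hello-world"},
        "spec": {
            "image": image,
            # Images loaded via kind exist only on the nodes, so never try to pull them.
            "imagePullPolicy": "Never",
            "flinkVersion": "v1_18",
            "flinkConfiguration": {"taskmanager.numberOfTaskSlots": "1"},
            "serviceAccount": "flink",
            "jobManager": {"resource": {"memory": "2048m", "cpu": 1}},
            "taskManager": {"resource": {"memory": "2048m", "cpu": 1}},
            "job": {
                # Assumed JAR name; it must match the Flink version in the image.
                "jarURI": "local:///opt/flink/opt/flink-python-1.18.1.jar",
                "entryClass": "org.apache.flink.client.python.PythonDriver",
                # -py points PythonDriver at the Python job to run.
                "args": ["-pyclientexec", "/usr/bin/python3", "-py", script],
                "parallelism": 1,
                "upgradeMode": "stateless",
            },
        },
    }


print(json.dumps(flink_deployment("pyflink-hello-world:latest"), indent=2))
```

Redirecting the output to a file, say <span class="inline-code">pyflink-hello-world.json</span>, gives you something you can pass to <span class="inline-code">kubectl apply -f</span> and later to <span class="inline-code">kubectl delete -f</span>.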
The operator will pick up that resource definition and spin up the corresponding pods with the Flink job manager and task manager for running this job. As before, you can await the creation of the job:
As the last step, you can check out the Kafka topic, confirming that the job propagates all the orders from the datagen source to the Kafka sink as expected:
You can also take a look at the deployed job in the Flink web UI by forwarding the REST port from the job manager:
This makes the Flink Dashboard accessible at http://localhost:8081, allowing you to take a look at the job’s health status, metrics, etc.:
Finally, to stop your job, simply delete its resource definition:
And there you have it: your first PyFlink job running on Kubernetes 🎉. To try everything out yourself, you can find the complete source code in our examples repository on GitHub. Happy PyFlink-ing!
Many thanks to Robert Metzger and Robin Moffatt for their feedback while writing this post.