Robert Metzger recently shared a video presentation on the Top 5 Mistakes When Deploying Apache Flink. This article summarizes Robert’s video and explores the top 5 most common mistakes and areas of confusion that we see over and over again when people are getting started with Apache Flink. This often happens when they have figured out a use case, implemented it, and then actually started getting into the weeds of deploying it. We’ll discuss some of these common mistakes and explain the related underlying principles of Flink to help you understand these issues and how to overcome them.
It’s important to understand that data serialization is expensive, and some data structures are larger and more inefficient than others. When choosing data types in your Flink implementation, we often see people using things like Java HashMaps, Sets, or other collection data types. When serialized, these types can be very large when considering the amount of actual data they contain. This is important because they are stored in Flink’s state, so usually in RocksDB or on the heap, and are also used for network transfers.
There are multiple places where serialization can occur:
- in the data exchanges between TaskManagers as well as sources and sinks;
- when accessing state in RocksDB;
- and when sending data between non-chained tasks locally.
Beyond the volume of data involved, serialization is also CPU intensive. Let's take a look at an example that is particularly bad, but that also shows how easily it can happen.
In this example, each event contains latitude and longitude coordinates for a pair of start and end locations, and the data structure used for the locations is a Java HashMap. The challenge is that Flink does not know how to efficiently handle HashMap. It knows how to handle strings, tuples, SQL types, or Avro types extremely efficiently; but for standard Java collection types, Flink falls back to the Kryo serialization framework.
For our example, Kryo serializes a HashMap of two Location objects into a series of approximately 120 bytes, which includes a lot of unwanted metadata. All of those bytes are then transferred over the wire or written to disk for every single event. This is quite inefficient, especially if you consider that a plain old Java object (POJO) could be used instead. A simple object with four integer fields is just 16 bytes, which is a 7.5 times reduction in size.
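The size gap can be illustrated without a Flink dependency. The following is a minimal sketch, not Flink's or Kryo's actual behavior: it uses `java.io.ObjectOutputStream` as a stand-in for a general-purpose serializer, so the map-based size it reports will differ from the roughly 120 bytes quoted for Kryo. The class name, the `Location` type, and the `"start"`/`"end"` keys are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

public class SerializationSizeSketch {

    /** Hypothetical map value type: one coordinate pair. */
    public static class Location implements Serializable {
        private static final long serialVersionUID = 1L;
        public final int latitude;
        public final int longitude;

        public Location(int latitude, int longitude) {
            this.latitude = latitude;
            this.longitude = longitude;
        }
    }

    /** Builds the map-based event shape: two named locations (key names are assumptions). */
    public static HashMap<String, Location> mapEvent(int startLat, int startLon, int endLat, int endLon) {
        HashMap<String, Location> event = new HashMap<>();
        event.put("start", new Location(startLat, startLon));
        event.put("end", new Location(endLat, endLon));
        return event;
    }

    /** Size when a general-purpose serializer (java.io here, NOT Kryo) writes the map. */
    public static int genericSize(HashMap<String, Location> event) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(event);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    /** Size when the same four values are written as plain ints: 4 fields x 4 bytes. */
    public static int flatSize(int startLat, int startLon, int endLat, int endLon) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(startLat);
            out.writeInt(startLon);
            out.writeInt(endLat);
            out.writeInt(endLon);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println("generic map: " + genericSize(mapEvent(52, 13, 48, 11)) + " bytes");
        System.out.println("four ints:   " + flatSize(52, 13, 48, 11) + " bytes");
    }
}
```

The flat variant always comes out at 16 bytes, because it carries no type names or collection metadata. The generic variant has to describe the map, its keys, and the value class alongside the four numbers, which is the same kind of overhead the Kryo fallback introduces.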
If you are interested in a deeper dive on this topic, there is a very extensive article on the Flink blog about tuning the Flink serialization framework. The framework is fairly advanced and allows for the use of many different serializers, such as Avro.
Flink doesn't always need to be fully distributed. Or put another way, not every use case requires multiple machines running in a distributed fashion. Flink’s MiniCluster allows you to create a full-fledged local environment in a single JVM. It provides everything you would normally find in distributed clusters, including RocksDB, support for checkpointing, the web UI, SQL, all the connector support, etc. It's the same components, but instead of running them across multiple machines, they just start in the same JVM.
Setting up a MiniCluster takes only a few lines of code. As the cluster.start() method executes, the JVM will display information about all of the components being started. Example use cases for the MiniCluster include local debugging, performance profiling, testing, and processing small data streams efficiently.
If you’re running into an issue in production and aren’t able to isolate the problem, then assuming you’re able to access your data locally, you can run the job in a MiniCluster under a debugger and step through the Flink code, the framework code, and your own code to understand why something is not working the way you expect.
The same goes for profiling. For example, if you're unsure whether your serialization stack is causing some performance issues, you can use the MiniCluster to run Flink locally, attach a profiler, do some sampling over the most common code path, and then potentially isolate the issue.
Another very important aspect of well-written software is testing, and with the MiniCluster you can very easily bring up Flink in your integration or end-to-end tests. You can even bring up Kafka, for example, or MinIO as an S3 replacement if you want to test with checkpointing or under failure scenarios. We can also recommend the Testcontainers project, which allows your tests to start auxiliary components in Docker containers.
A third use case is for processing very small streams efficiently, where the MiniCluster can be a viable option even in production scenarios. This is because it's exactly the same components that you would use on a fully distributed cluster. In situations where it's just a few hundred messages per second, you don't necessarily need a full cluster. There are times when it's fine to just use a single JVM.
Flink is often perceived as difficult to use, but (speaking unabashedly as the Flink PMC Chair) it’s actually quite beautiful once you understand how it works. The sense of difficulty stems in large part from the fact that there are so many options and so much flexibility in how you can deploy it. Let's look at a framework for thinking about the deployment options that are available.
To begin, there are different execution modes that determine the way Flink jobs are executed: application, session, and per-job. In almost all cases, the recommendation is to use application mode. In this mode, there is only one job per job manager instance. In contrast, when using session mode, multiple jobs share a job manager. This is only recommended in specific scenarios, for example when running many short-running batch jobs, or for very small streaming jobs where the resource cost of running a dedicated job manager for each one is too high. Per-job mode was deprecated in Flink 1.15 and was only used in the YARN context.
There are two types of deployment options: passive deployment (also called standalone mode), where the Flink resources are managed externally, and active deployment, where Flink actively manages the resources.
With passive deployment, or standalone mode, you have a bunch of JVMs that are deployed wherever you want. You can use the scripts included with Flink to launch the Job Manager and Task Manager JVMs, which will connect to each other and start processing. You can do the same on Docker, so you can bring up a bunch of Docker containers and they will find each other and do the processing. Or you can use what almost everybody is using these days, and that is Kubernetes. You define two deployments, one for the Job Manager and one for the Task Manager, and they will do the processing.
The benefit of passive deployment is that it supports Flink’s reactive mode, which is basically auto-scaling. It also supports many DIY scenarios, so you could use your own homegrown deployment system. And it also allows for fairly fast deployments, because all components are starting up together at the same time. However, you have to take care of some infrastructure things yourself, for example restarting failed JVMs. So if you're deploying on bare metal, you have to make sure the processes are restarted in case of a failure. You also have to handle considerations such as lock file management yourself.
With active deployment, Flink actively manages the resources it needs. For example, when Flink is started on Kubernetes or YARN, it knows how to interact with those environments to acquire the resources it needs. It also knows how to restart resources when they fail. The benefit of this approach is that if, for example, you have batch jobs which have resource requirements that change over time, Flink will be able to dynamically allocate and deallocate resources as needed. However, the downside of this approach is that, particularly when using Kubernetes, it requires a lot of permissions to accomplish tasks, such as creating pods. This is a fairly uncommon pattern on Kubernetes, which is why active deployment is less common for Flink.
Properly sizing your cluster is very important, both in terms of under-sizing as well as over-sizing. Some people new to Flink can become disappointed in the performance they experience or with their ratio of resource investment to results. At the same time, others have amazing hardware and almost nothing to process, so their abundance of resources is not being used effectively.
To mitigate these problems, it’s important to have a good understanding of the intended workloads and the environment in which they will run. It’s possible to do some basic math to determine realistic resource requirements and set expectations. Let’s go through the process to perform a “back of the napkin” calculation to understand the baseline performance of an example Flink deployment. And I'm explicitly saying baseline because you may need to add 20-30 percent to handle demand spikes, such as those caused by failure recovery, a dramatic change in your user behavior, or upstream failures like your Kafka platform going down. These can cause situations during which you need to reprocess much more data for a short period of time.
Let's dive into an example that has a relatively simple topology with a Kafka source, a sliding window operator with a five-minute window size and a one-minute slide (so you get results written once a minute), and a Kafka sink to which the results are written. We'll also use RocksDB as the state backend for keeping the sliding window data.
As a side note, it’s a common misconception that RocksDB is a database that you need to install somewhere and that needs to be maintained, connected to, etc. In reality, it’s just a library inside the Flink JVMs that writes to local disks—so you really don't directly interact with RocksDB as a user of Flink.
In our example, we have a Flink job where our sample data has a message size, or event size, of 2 KB (which is admittedly quite large), a throughput of one million incoming messages per second, 500 million unique keys (which is important for the window operator to determine the state size), and a requirement to create a checkpoint (a full copy of the state of the data in the sliding window) once a minute. We also have five machines, each running a TaskManager.
To perform this calculation, we can look at this Flink cluster from a single task manager's perspective, and then we know that everything we determine is happening five times. Since each message is 2 KB and there are one million messages per second, that's a 2 GB/s stream, with each machine receiving 400 MB/s of data from Kafka. Each task manager is processing one parallel slice of the data stream, with its own Kafka connectors and sliding window operator.
The incoming data stream is partitioned in real time based on the primary key so that the same keys are consistently sent to the correct window operator. This results in the 400 MB/s stream being split into five partitions, with 80 MB/s being sent to the window operator for processing locally by this task manager, and the remaining 320 MB/s being sent over the network to the other four task managers. And of course, since the other four task managers are doing the same thing, this task manager is receiving 320 MB/s in four 80 MB/s streams. The purpose of this shuffling is that all messages with the same primary key will be processed by the same window operator, even though some of those messages will have been initially received by the wrong task manager. Finally, the output stream that each task manager sends to the Kafka sink has an average throughput of 67 MB/s, because the window operator emits its results (100 million keys at 40 bytes each, or 4 GB) once a minute.
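The stream figures above can be reproduced with a few lines of arithmetic. This is a sketch under stated assumptions: decimal units (1 MB = 1,000,000 bytes), keys spread evenly across task managers, and a window output of 40 bytes per key per minute, which is how the 67 MB/s figure is interpreted here. The class and method names are hypothetical.

```java
public class ShuffleThroughputSketch {

    // Decimal units, matching the round numbers used in the text.
    static final double BYTES_PER_MB = 1_000_000.0;

    /** Total Kafka input in MB/s, split evenly over the task managers. */
    public static double kafkaInPerTaskManagerMBps(long messageBytes, long messagesPerSecond, int taskManagers) {
        return messageBytes * messagesPerSecond / BYTES_PER_MB / taskManagers;
    }

    /** Part of one task manager's input that is keyed to the other task managers. */
    public static double shuffleOutMBps(double kafkaInMBps, int taskManagers) {
        double localShare = kafkaInMBps / taskManagers; // stays on this machine
        return kafkaInMBps - localShare;
    }

    /** Window results emitted once per minute, expressed as a per-second average. */
    public static double windowEmitMBps(long keysPerTaskManager, long bytesPerKey) {
        return keysPerTaskManager * bytesPerKey / BYTES_PER_MB / 60.0;
    }

    public static void main(String[] args) {
        int taskManagers = 5;
        double kafkaIn = kafkaInPerTaskManagerMBps(2_000, 1_000_000, taskManagers);
        double shuffleOut = shuffleOutMBps(kafkaIn, taskManagers);
        // With an even key distribution, shuffle-in mirrors shuffle-out.
        double emit = windowEmitMBps(500_000_000L / taskManagers, 40);

        System.out.printf("Kafka in:    %.0f MB/s%n", kafkaIn);
        System.out.printf("Shuffle out: %.0f MB/s%n", shuffleOut);
        System.out.printf("Kafka out:   %.0f MB/s%n", emit);
    }
}
```

Real key distributions are rarely perfectly even, so a skewed key space would push more than 320 MB/s toward some task managers and less toward others.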
We also need to consider the amount of data being produced by the checkpointing. In this example, each primary key is 40 bytes and we have five windows (the window operator has a 5 minute size with a 1 minute slide) with each partition responsible for 100 million keys. That results in roughly 20 gigabytes of state that will be kept in RocksDB, which is also important information for sizing your local storage. Since we checkpoint this 20 gigabytes of data every minute, we will on average be sending 333 MB/s of checkpoint data over the network.
As a side note, if we use incremental checkpointing, then we won't back up the entire 20 gigabytes every minute. It will only be roughly the number of keys that we have touched within the last minute. So if we have only touched 20 percent of the key space, then only roughly 20 percent of the state needs to be backed up.
Another interesting side question that we can quickly ask is, how much disk access do we actually produce? We receive 200,000 messages per second on each machine, with 40 bytes across five windows. That results in roughly 40 MB/s of sustained disk access being produced. So when you're sizing your storage, make sure that your disk is able to sustain the appropriate level of data transfer.
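The state, checkpoint, and disk numbers follow the same pattern. The sketch below assumes decimal units and treats an incremental checkpoint as simply a fraction of the full state, which is a simplification of how RocksDB incremental checkpoints behave. The class and method names are hypothetical.

```java
public class StateAndCheckpointSketch {

    /** State held by one task manager: keys x bytes per key x concurrent windows. */
    public static double stateGB(long keys, long bytesPerKey, int windows) {
        return keys * bytesPerKey * windows / 1e9;
    }

    /**
     * Average checkpoint traffic in MB/s.
     * touchedFraction = 1.0 models a full checkpoint; a smaller value is a rough
     * stand-in for an incremental checkpoint that only uploads changed state.
     */
    public static double checkpointMBps(double stateGB, double touchedFraction, int intervalSeconds) {
        return stateGB * 1000.0 * touchedFraction / intervalSeconds;
    }

    /** Sustained state access on local disk: each message touches every window. */
    public static double diskMBps(long messagesPerSecond, long bytesPerKey, int windows) {
        return messagesPerSecond * bytesPerKey * windows / 1e6;
    }

    public static void main(String[] args) {
        double state = stateGB(100_000_000L, 40, 5);
        System.out.printf("State per task manager:  %.0f GB%n", state);
        System.out.printf("Full checkpoint:         %.0f MB/s%n", checkpointMBps(state, 1.0, 60));
        System.out.printf("Incremental (20%% keys):  %.0f MB/s%n", checkpointMBps(state, 0.2, 60));
        System.out.printf("Disk access:             %.0f MB/s%n", diskMBps(200_000, 40, 5));
    }
}
```

Under these assumptions, touching 20 percent of the key space per minute reduces the average checkpoint traffic from about 333 MB/s to about 67 MB/s.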
So if we add checkpointing to our overview of the traffic, we see that each task manager has 720 MB/s of incoming data in total (400 MB/s from Kafka plus 320 MB/s from the shuffle) and 720 MB/s of outgoing data (320 MB/s to the shuffle, 67 MB/s to the Kafka sink, and 333 MB/s for checkpoints). Readily available cloud-based VMs with 16 CPU cores and guaranteed 10 gigabit Ethernet connections (which is 1,250 MB/s) are able to sustain this load. These are obviously quite big machines with plenty of main memory and so on, so this could work.
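A quick way to sanity-check the result is to compare the baseline traffic, plus the 20-30 percent headroom mentioned earlier, against the link capacity. The sketch below takes the rounded per-stream figures from the example as inputs; the class and method names are hypothetical, and it ignores protocol overhead and bursts.

```java
public class NetworkBudgetSketch {

    /** Converts a link speed in gigabit/s to MB/s (decimal units, 8 bits per byte). */
    public static double linkCapacityMBps(double gigabitsPerSecond) {
        return gigabitsPerSecond * 1000.0 / 8.0;
    }

    /** True if the baseline plus the requested headroom still fits on the link. */
    public static boolean fits(double baselineMBps, double headroomFraction, double capacityMBps) {
        return baselineMBps * (1.0 + headroomFraction) <= capacityMBps;
    }

    public static void main(String[] args) {
        // Rounded per-task-manager figures from the example.
        double incoming = 400 + 320;       // Kafka source + shuffle in
        double outgoing = 320 + 67 + 333;  // shuffle out + Kafka sink + checkpoints
        double capacity = linkCapacityMBps(10);

        System.out.println("incoming MB/s: " + incoming);
        System.out.println("outgoing MB/s: " + outgoing);
        System.out.println("fits with 30% headroom: "
                + (fits(incoming, 0.30, capacity) && fits(outgoing, 0.30, capacity)));
    }
}
```

With 30 percent headroom the requirement is about 936 MB/s in each direction, which stays below the 1,250 MB/s that a 10 gigabit link provides.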
In a similar way, a Kafka cluster that is able to sustain such high loads must also be pretty beefy, because the rate is determined by the disk speed, so each broker would need 400 MB/s of disk throughput—or if less, then it would require more brokers.
For the purposes of cluster sizing, what matters here is that with this example we have some numbers, and based on these we can discuss what machine types to choose. Perhaps five task managers is not actually the best solution, and 10 smaller task managers would be a better approach. This will depend on specific use cases and what resources are available.
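To compare machine counts, the whole calculation can be parameterized by the number of task managers. This sketch assumes the same workload as the example (2 GB/s input, 500 million keys, 40 bytes per key, five windows, full checkpoints every 60 seconds), an even key distribution, and decimal units. The class and method names are hypothetical.

```java
public class TaskManagerCountSketch {

    // Workload figures taken from the example.
    static final double TOTAL_INPUT_MBPS = 2_000.0;
    static final long TOTAL_KEYS = 500_000_000L;
    static final int BYTES_PER_KEY = 40;
    static final int WINDOWS = 5;
    static final int CHECKPOINT_INTERVAL_SECONDS = 60;

    /** Kafka source traffic plus shuffle traffic arriving from the other task managers. */
    public static double incomingMBps(int taskManagers) {
        double kafkaIn = TOTAL_INPUT_MBPS / taskManagers;
        double shuffleIn = kafkaIn * (taskManagers - 1) / taskManagers;
        return kafkaIn + shuffleIn;
    }

    /** Shuffle traffic leaving, window results to Kafka, and full checkpoint uploads. */
    public static double outgoingMBps(int taskManagers) {
        double kafkaIn = TOTAL_INPUT_MBPS / taskManagers;
        double shuffleOut = kafkaIn * (taskManagers - 1) / taskManagers;
        double keysPerTaskManager = (double) TOTAL_KEYS / taskManagers;
        double emit = keysPerTaskManager * BYTES_PER_KEY / 1e6 / 60.0;
        double checkpoint = keysPerTaskManager * BYTES_PER_KEY * WINDOWS / 1e6
                / CHECKPOINT_INTERVAL_SECONDS;
        return shuffleOut + emit + checkpoint;
    }

    public static void main(String[] args) {
        for (int n : new int[] {5, 10}) {
            System.out.printf("%d task managers: in %.0f MB/s, out %.0f MB/s%n",
                    n, incomingMBps(n), outgoingMBps(n));
        }
    }
}
```

Under these assumptions, going from five to ten task managers lowers the per-machine load from about 720 MB/s to about 380 MB/s in each direction. The load does not quite halve, because a larger share of each machine's input has to be shuffled to other machines.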
I also want to mention a few things that we have not looked at in this back-of-the-napkin approximation. These include protocol overhead from the network stack, internal RPC calls, the networking bursts caused by checkpointing and window operator emissions that happen once per minute, and other systems that may be using the network. In addition, the topics of CPU, memory, and disk access speed have not been fully considered. In general, for these big use cases it is strongly advised to do some benchmarking to make sure that everything is well calibrated.
Finally, we come to the last mistake that we see being made: not asking for help. Flink has been around for many, many years, and a significant portion of the problems that new users experience have already been solved. There are many different channels that you can use for getting help:
- the Apache Flink user mailing list
- https://lists.apache.org searchable archive
- Stack Overflow, where the apache-flink tag has over 6,300 questions
- the Apache Flink community on Slack
- meet-up communities around the world
- conferences such as Flink Forward (with training sessions)
- Have a question for Robert? Connect on Twitter or LinkedIn