
7 things to watch out for after productionalizing Flink jobs

Getting a job to production is just the first step

Adrian Bednarz
8 min read · Sep 23, 2022


Although many companies still struggle to make use of their real-time data, more and more are actually running production streaming workloads. While working with clients on running their workloads, I realized that certain problems could be avoided with the right preparation.

I advocate for streaming whenever I can. It gets easier and easier to get started with streaming on Flink: you can use either FlinkSQL or a SaaS platform like Ververica. Even though it is straightforward to get started, things get complicated down the road.

When event time processing confronts reality

Event time processing is a fascinating concept: embedding time within the event enables new ways of looking at the data. It doesn’t matter when an event is loaded into the system, you can always associate it with the right time window. A machine goes down for an hour? No problem! Just reboot it and push the events, and the results will eventually be correct.

OK, that’s the theory. From a technical perspective, it isn’t that simple. You should be well aware of watermarks and their impact on the job: the watermark decides when a window is considered complete, so you can’t really wait forever for stragglers. In theory you can still resort to late data handling, but this is not as easy as it sounds.

For instance, the default time window trigger will fire the window for each late event. Just imagine having a pipeline that processes millions of events per second where one of the producers was stuck for an hour. Your system will be flooded with late data, which is usually pretty expensive to process.

You can deal with this problem e.g. by implementing a custom window trigger, but this requires writing some custom code, so you are no longer writing SQL-only jobs.
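As a rough illustration of why a custom trigger helps, here is a toy model in plain Python. It is not Flink code: `count_firings` and `late_batch` are made-up names, and the "batched" variant only stands in for whatever policy a real custom trigger would implement. It counts how many times a single window emits a result when a burst of late events arrives.

```python
def count_firings(late_events: int, late_batch: int = 1) -> int:
    """Toy model (not Flink code): how often one window emits a result.

    late_batch=1 mimics the default behaviour described above, where every
    late event re-fires the window. A larger late_batch mimics a hypothetical
    custom trigger that re-fires only once per `late_batch` late events.
    """
    if late_batch < 1:
        raise ValueError("late_batch must be >= 1")
    firings = 1        # the regular firing when the watermark passes the window end
    pending_late = 0   # late events seen since the last firing
    for _ in range(late_events):
        pending_late += 1
        if pending_late >= late_batch:
            firings += 1
            pending_late = 0
    return firings


if __name__ == "__main__":
    print(count_firings(late_events=1_000))                  # default-like trigger
    print(count_firings(late_events=1_000, late_batch=100))  # batched trigger
```

With 1,000 late events the default-like model fires 1,001 times, the batched one 11 times. A real trigger would most likely also need a timer to flush a partially filled batch, which this sketch leaves out.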

Another late data problem that is easy to miss is the lack of TTLs on window state. If you allow lateness of 3 days, state for all time windows created within that period will be kept around. This is because late events are, by default, merged into the existing window contents, and that only works if the window state is still there.
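A back-of-the-envelope estimate shows how quickly this adds up. The helper below is a sketch under simple assumptions: tumbling windows, one window being filled at a time, and window state dropped only once the watermark passes the window end plus the allowed lateness. The function name is made up for illustration.

```python
import math


def retained_tumbling_windows(window_size_s: int, allowed_lateness_s: int) -> int:
    """Rough estimate of windows per key whose state is alive at any moment.

    Finished windows stay around for `allowed_lateness_s` after their end,
    so about lateness / size of them are retained; the +1 is the window
    that is currently being filled.
    """
    if window_size_s <= 0 or allowed_lateness_s < 0:
        raise ValueError("window size must be positive, lateness non-negative")
    return math.ceil(allowed_lateness_s / window_size_s) + 1


if __name__ == "__main__":
    three_days = 3 * 24 * 3600
    print(retained_tumbling_windows(60, three_days))    # 1-minute windows
    print(retained_tumbling_windows(3600, three_days))  # 1-hour windows
```

Under these assumptions, 1-minute windows with 3 days of allowed lateness keep roughly 4,321 windows per key alive, compared to 73 for 1-hour windows. Multiply that by the number of keys to get a feeling for the state size.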

When working with time, you should also be aware of idle partitions and the consequences of out-of-orderness intervals, things we usually take for granted. Idle partitions are closely related to late data handling. There is just one sensible strategy for an idle partition: bump the watermark without it. This, again, might lead to late data if the events should be there but simply got stuck somewhere on the upstream producer side.
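The trade-off can be sketched with a few lines of plain Python. This is a simplified model, not Flink's implementation: the operator watermark is taken as the minimum over all partitions that are not marked idle, and an event counts as late when its timestamp is at or below the current watermark. The partition names and numbers are made up.

```python
from typing import Dict, FrozenSet, Optional


def combined_watermark(
    partition_watermarks: Dict[str, int],
    idle: FrozenSet[str] = frozenset(),
) -> Optional[int]:
    """Minimum watermark over non-idle partitions; None if all are idle."""
    active = [wm for name, wm in partition_watermarks.items() if name not in idle]
    return min(active) if active else None


def is_late(event_ts: int, watermark: int) -> bool:
    """An event is late if the watermark has already passed its timestamp."""
    return event_ts <= watermark


if __name__ == "__main__":
    wms = {"p0": 10_000, "p1": 10_500, "p2": 4_000}  # p2 stopped producing
    stalled = combined_watermark(wms)                 # held back by p2
    bumped = combined_watermark(wms, idle=frozenset({"p2"}))
    print(stalled, bumped)
    # An event from p2 that was merely stuck upstream is now late:
    print(is_late(4_500, stalled), is_late(4_500, bumped))
```

Marking p2 as idle lets the watermark jump from 4,000 to 10,000, which unblocks the windows but turns the stuck event with timestamp 4,500 into late data.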

Serialization

With any distributed framework you deal with serialization. Subtasks have to exchange the data, process it, and eventually push it to downstream systems.

Firstly, Flink’s default fallback when POJO / Avro serialization is not applicable is Kryo. You should be aware that this can be an order of magnitude less efficient. Your safest bet is to disable generic types (pipeline.generic-types: false). Flink will then fail with an exception instead of falling back to Kryo. This is not ideal, but in many cases preferable to silently experiencing 10x worse performance.

Secondly, because of serialization overhead, a change that should boost performance may actually hurt it. Consider reshuffling your data right before a sink to reduce the number of connections to an external system. As a concrete example, if you use bucketing for data dumps to an external file system, each StreamingFileSink subtask maintains a list of buckets along with the files it writes events to. With randomly shuffled data, this can lead to a situation where every subtask writes to every partition. To avoid this, you commonly reshuffle data on the Flink side.

Now imagine that the data was already shuffled. Introducing a keyBy shouldn’t change much, right? Wrong: Flink will serialize and deserialize each event, even if the exchange happens within the same JVM!

Get your JVM profiling tools ready

Debugging jobs under time pressure is exhausting. To increase your chances of success, prepare yourself: get async-profiler, VisualVM, jemalloc, jeprof and any other of your favourite tools. Make sure you have runbooks in place so that you are ready to connect to a problematic task manager and take the relevant thread / heap dumps.

Flink comes with a great UI and metrics, but you should still leverage your JVM experience!

  • get a heap dump and analyze it with MAT to spot memory leaks quickly,
  • go ahead and use async-profiler to plot flame graphs to analyze thread dumps faster (Flink ships its own flame graphs, but they require special configuration flags and are more difficult to share once the JM is down).

On a similar note, you might want to set up a robust logging infrastructure to collect the logs (like EFK or Loki / Promtail / Grafana). Getting logging right is a science when it comes to streaming jobs: they usually don’t get restarted that often, and they can’t really log much per event because of the data volumes going through the system.

Be mindful of state

If you are not careful, state can grow unwieldy. Remember that by default Flink keeps state indefinitely. Although FlinkSQL hides a lot of stateful complexity from you, these jobs are just as susceptible to state problems as regular streaming jobs. I already mentioned the problems that may arise with late data and state. Beyond that, any key associated with e.g. a session will be kept forever if idle state retention time is not set.

The DataStream API offers more flexibility but also more traps when it comes to state. You have to control it yourself, and nothing restrains you from using these abstractions however you like. In the past I used to write a lot of timer-based code that would clear the state for me in such jobs. In case you are not aware, Flink supports state TTL. I found this feature very useful, and it cleans up the code immensely.

One such freedom is the choice between ListState and ReducingState. Inexperienced Flink practitioners often overuse ListState. For instance, the simplest mental model of time windows is to think of them as abstractions that gather all events into a list that can later be used to derive the final result. ReducingState, which maintains just one record per time window (an accumulated result), seems like a clever choice. And it usually is. Unless you are using RocksDB and deal with small lists. Adding elements to a RocksDB ListState is really cheap, since it doesn’t have to deserialize the existing records in state to do so. With ReducingState the situation is different: the value is stored in serialized form, and whenever you want to update it, a deserialization, a computation and a serialization must happen. This can be a significant performance bottleneck.
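To make the difference concrete, here is a toy model in plain Python that counts serializer calls. It is only a sketch of the access pattern described above, not how Flink or RocksDB are implemented: `pickle` stands in for Flink's serializers and all class names are made up.

```python
import pickle
from typing import Any, Callable, List, Optional


class CountingSerializer:
    """Wraps pickle and counts how often values cross the byte boundary."""

    def __init__(self) -> None:
        self.serialize_calls = 0
        self.deserialize_calls = 0

    def dumps(self, value: Any) -> bytes:
        self.serialize_calls += 1
        return pickle.dumps(value)

    def loads(self, data: bytes) -> Any:
        self.deserialize_calls += 1
        return pickle.loads(data)


class ToyListState:
    """Append-only: a new element is serialized and appended, nothing is read."""

    def __init__(self, serializer: CountingSerializer) -> None:
        self._ser = serializer
        self._blobs: List[bytes] = []

    def add(self, value: Any) -> None:
        self._blobs.append(self._ser.dumps(value))

    def get(self) -> List[Any]:
        return [self._ser.loads(blob) for blob in self._blobs]


class ToyReducingState:
    """Read-modify-write: every add deserializes and re-serializes the accumulator."""

    def __init__(self, serializer: CountingSerializer,
                 reduce_fn: Callable[[Any, Any], Any]) -> None:
        self._ser = serializer
        self._reduce = reduce_fn
        self._blob: Optional[bytes] = None

    def add(self, value: Any) -> None:
        if self._blob is not None:
            value = self._reduce(self._ser.loads(self._blob), value)
        self._blob = self._ser.dumps(value)

    def get(self) -> Any:
        return None if self._blob is None else self._ser.loads(self._blob)


if __name__ == "__main__":
    list_ser, reduce_ser = CountingSerializer(), CountingSerializer()
    as_list = ToyListState(list_ser)
    as_sum = ToyReducingState(reduce_ser, lambda a, b: a + b)
    for v in range(1, 6):
        as_list.add(v)
        as_sum.add(v)
    print(list_ser.deserialize_calls, reduce_ser.deserialize_calls)
```

In this model the list variant never deserializes anything on the write path, while the reducing variant pays one deserialization per update after the first. Whether that matters in practice depends on how large the accumulator is and how often it is updated, so measure before switching.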

Working with large state

Large state can also become a problem because it increases the checkpoint creation overhead. Also, state in Flink is periodically persisted thanks to checkpoints, but it is not replicated across TMs. Whenever a TM crashes, you need to recover state from external storage. And with terabytes of state, this can take a significant amount of time.

Sometimes a better option is to offload whatever you can to external databases. Aerospike, Redis and the like usually worked well in the projects I worked on. Of course, if latency is critical, you are forced to put as much as you can into the TM.

A useful tip for large state with RocksDB is to use local SSDs. They are orders of magnitude faster than the network HDD drives that cloud providers mount by default. Although RocksDB keeps some data in memory, if you profile the job closely you’ll see how important disk access is.

Decouple warehouse from streaming jobs

It might sound tempting to push data from a streaming job directly to a warehouse, but in reality I found it more robust to use intermediate storage such as S3 to decouple both systems. With a direct connection, if there are any problems with your warehouse (say Snowflake), the streaming job will inevitably fail. Setting up Snowpipe from S3 to Snowflake via SQS notifications is simple enough.

The same rule of thumb can be applied to any integration with an external system. If you see that something can fail or may not be available for prolonged periods of time, try to find a way of decoupling it. My viewpoint here might be a bit skewed due to the fact that I worked with large-scale, state-heavy streaming jobs that were just painful to restart.

Watch out for data skews!

Flink provides a clear visualization of the number of events handled by each subtask via its UI. You might often realize that some subtasks are more overburdened than others. There are at least two reasons for this:

  • Flink is optimized for a large range of keys that are assigned to a small number of workers (controlled by task parallelism),
  • the default subtask assignment algorithm has an uneven distribution.

I will not get into too much detail, but you can follow the code yourself. First of all, Flink uses the key’s hash code and a Murmur hash function to encode the keys. It also leverages the values of both parallelism and maxParallelism (which, if not set, is derived from parallelism: roughly 1.5 * parallelism rounded up to a power of two, and at least 128) to compute the operator index. I think the only relevant thing in reality is that you should specify both of these values explicitly and make sure that maxParallelism is a multiple of parallelism to get an even distribution.
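The effect of the "multiple of parallelism" advice can be checked with a few lines of arithmetic. The sketch below assumes that keys are first hashed into one of maxParallelism key groups and that a key group is mapped to a subtask as `key_group * parallelism // max_parallelism`, which is my reading of Flink's `KeyGroupRangeAssignment`. Verify it against the Flink version you run. The function names and the parallelism values are picked for illustration only.

```python
from collections import Counter
from typing import List


def operator_index(key_group: int, parallelism: int, max_parallelism: int) -> int:
    """Subtask that owns a key group (assumed to mirror Flink's arithmetic)."""
    return key_group * parallelism // max_parallelism


def key_groups_per_subtask(parallelism: int, max_parallelism: int) -> List[int]:
    """How many key groups each subtask ends up owning."""
    counts = Counter(
        operator_index(kg, parallelism, max_parallelism)
        for kg in range(max_parallelism)
    )
    return [counts[i] for i in range(parallelism)]


if __name__ == "__main__":
    print(key_groups_per_subtask(12, 18))   # maxParallelism = 1.5 * parallelism
    print(key_groups_per_subtask(12, 128))  # not a multiple of parallelism
    print(key_groups_per_subtask(16, 128))  # a multiple of parallelism
```

With 12 subtasks and 18 key groups, every other subtask owns two key groups instead of one, so it handles about twice the keys. With 128 key groups the imbalance shrinks to 11 versus 10, and with 16 subtasks each one owns exactly 8.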

Secondly, with a small number of keys (especially if they are consecutive integers), the distribution of the Murmur hash function modulo parallelism will be skewed too. I usually hit this case when I want to repartition data so that each sink subtask corresponds to a single partition in the downstream system (be it a Kafka topic or some distributed database). In such cases, it is convenient to repartition data by partition index, in the hope that the load will be evenly distributed in Flink. This is not the case, and the only workaround I know is to be aware of Flink’s hashing scheme and apply a small bit of reverse engineering: instead of passing the expected subtask index as a key (like 0, 1, 2, 3...), I remap each subtask index i to an integer idx so that murmur(idx) % number of subtasks == i. If I provide Flink with such keys, I am sure that each event will end up on the right subtask (use at your own risk 😃).
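A brute-force version of this remapping could look like the sketch below. Treat it as an illustration under assumptions: `murmur_hash` is a port written from memory of Flink's `MathUtils.murmurHash`, the subtask formula goes through key groups (a slightly more detailed variant of the simplified formula above), and for integer keys the hash code is assumed to be the integer itself. Check all of this against the Flink version you run before relying on it.

```python
from typing import Dict, List

MASK32 = 0xFFFFFFFF


def _rotl32(x: int, r: int) -> int:
    """32-bit left rotation."""
    x &= MASK32
    return ((x << r) | (x >> (32 - r))) & MASK32


def murmur_hash(code: int) -> int:
    """Assumed port of Flink's MathUtils.murmurHash(int); non-negative result."""
    h = (code * 0xCC9E2D51) & MASK32
    h = _rotl32(h, 15)
    h = (h * 0x1B873593) & MASK32
    h = _rotl32(h, 13)
    h = (h * 5 + 0xE6546B64) & MASK32
    h ^= 4
    h ^= h >> 16                       # final bit mixing
    h = (h * 0x85EBCA6B) & MASK32
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & MASK32
    h ^= h >> 16
    signed = h - 0x100000000 if h >= 0x80000000 else h
    if signed >= 0:
        return signed
    return -signed if signed != -0x80000000 else 0


def subtask_for_key(key_hash: int, parallelism: int, max_parallelism: int) -> int:
    """Subtask index for a key hash: hash -> key group -> subtask."""
    key_group = murmur_hash(key_hash) % max_parallelism
    return key_group * parallelism // max_parallelism


def keys_for_subtasks(parallelism: int, max_parallelism: int,
                      search_limit: int = 1_000_000) -> List[int]:
    """For each subtask i, find the smallest integer key that lands on i."""
    found: Dict[int, int] = {}
    for candidate in range(search_limit):
        target = subtask_for_key(candidate, parallelism, max_parallelism)
        found.setdefault(target, candidate)
        if len(found) == parallelism:
            return [found[i] for i in range(parallelism)]
    raise ValueError("no key found for some subtask; raise search_limit")


if __name__ == "__main__":
    keys = keys_for_subtasks(parallelism=4, max_parallelism=128)
    for subtask, key in enumerate(keys):
        print(f"partition {subtask} -> key {key}")
```

The lookup table is only valid for the exact parallelism and maxParallelism it was computed for, so it has to be recomputed whenever either value changes.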

Conclusion

Deploying a job is one thing; making sure it scales well, is correct and is robust to failures is yet another story. Don’t get me wrong, Flink is a great product that I really enjoy working with. It taught me a lot about the JVM world and made me a better engineer overall. It is also my framework of choice for whatever greenfield real-time project I do.

I hope that you found some of these tips helpful and that they will eventually make the system you work with just a tiny bit better.
