How to prevent OOMs while streaming data to GCS via Apache Beam/Dataflow?

Yunus Durmuş
Google Cloud - Community
5 min readNov 16, 2022

--

Let’s admit, streaming is a challenging task when you deal with large amounts of data. A common problem I see in streaming data to GCS is out of memory (OOM) errors. Don’t panic, you are not alone, it happens to everyone.

Don’t panic, you are not alone

Let’s see what is happening in the backend

Assume that we have a pipeline with the below Java code. We write Parquet files to GCS with dynamic destinations and a fix number of shards.

.apply( "WriteParquetFile(s)",
FileIO.<Destination,GenericRecord>writeDynamic()
.by(new DestinationBuilder())
.via(ParquetIO.sink(schema))
.to(options.getOutputBucket())
.withCompression(Compression.SNAPPY)
.withNumShards(options.getNumShards()) .withDestinationCoder(AvroCoder.of(Destination.class))
.withNaming(key-> FileIO.Write.defaultNaming(key.getPath(),".parquet") )
);

The magic happens in WriteFiles.java Beam library, and in our case in WriteShardedBundlesToTempFiles Ptransform with the below code:

Data is divided into shards and then written to GCS

So what’s happening actually? As you see below, for each window and its pane(s) (multiple, in case you have early/late firings), we apply GroupByKey. The key is a combination of destinations and shards. They are hashed but since the hash has quite large space, you may assume that the number of groups = # keys x # shards. After grouping, Beam uses GCS connector for write. By default GCS connector creates a buffer of 1MB for each of these groups and performs resumable uploads over GCS API.

FileIO with dynamic destinations and shards

Let’s have a few example to see the memory impact.

  1. Windows without early firings. Assume that each window has on average 10 dynamic destinations and we have 5 shards. Without early firings, window triggers will be fires sequentially. Parallelisation will be limited. Then, only the GCS buffers contribute: 1 window x 10 destinations x 5 shards x 1MB buffer = 50MB. On top of buffers we have the actual data as well.
  2. Windows with early firings. Since we have early firings, multiple window triggers start to fire in parallel. Assume that we have currently 10 windows open as we have unordered late data. Destinations and shards are the same as above. Then our GCS buffers consume at least 10 windows x 10 destinations x 5 shards x 1MB = 500 MB.

As you see, things can go really bad suddenly when you have many write operations in parallel. The good thing is that each of these writes are handled by a single thread and they are distributed among the VMs by Dataflow engine.

What are our options to prevent OOMs and also increase throughput?

  1. VM size defines the shared memory for all the threads on a machine. You should provide larger machines (e.g., highmem ones) or use Dataflow Vertical auto-scaling to avoid OOMs.
  2. numberOfWorkerHarnessThreads parameter defines the number of threads per VM. By default it is 300 for streaming pipelines. By decreasing this limit we decrease the parallelism of the pipeline but we also put a limit on memory consumption. There is a always tradeoff. Note that Dataflow has workload rebalancing so it moves workloads between the machines to avoid overcrowding in one machine.
  3. numShards defines the number of shards that we create per key and window. Each window/pane of a key is divided into shards to further reduce the size of messages per thread as well as parallelise the writes to GCS. Shards help you to adjust the size of files created in GCS and also parallelise the writes to GCS. If you observe that GCS throughput is low, increasing the number of shards may help.
  4. gcsUploadBufferSizeBytes defines the buffer size for the GCS resumable upload operation. Every GCS writer thread allocates this buffer at the beginning no matter the size of data. The default in Beam is set as 1MB while the original GCS connector uses 8MB. Beam overwrites this field to avoid OOMs incase too many writers are open.
  5. Pipeline branching can be used to set parameters per high-volume topics. Some topics and partitions have a much higher number of files generated. For those it is best to provide more shards to divide data. We recommend branching PCollections that are high in volume and set bigger numShards for them.

How to monitor?

Dataflow Job Metrics page allows you to see the memory consumption and many other metrics. If you enable dataflow profiling options, then you even see the heap space usage.

An overlooked but important metric is the Parallelism. It helps us see the number of keys per window at the write stage. First, figure out the correct fused stage by going to execution details tab and trying to find the PTransform of WriteShardsIntoTempFiles. As you see below, it is in stage F33.

Dataflow fuses the sequential PTransforms together into one stage. You may find out the stage of the write transform in Execution Details tab.

After discovering the stage, F33 in my case, go to Job Metrics tab and select parallelism. There you will see the number of keys. Mine is 100 (5 destinations and 20 shards). This says that on average for each window, data is divided into 100 groups.

Parallelism metric is available only for the streaming engine enabled pipelines. It shows the number of keys and hence the parallelism in every stage.

There maybe more groups if there are more windows/panes are fired at the same time. Parallelism defines the estimated keys, not the actual fired groupings. We can discover that too! Under the timers metric, we see all the waiting as well as processed panes/windows. In my case, I have normally around 200 panes fired at the same time with a spike to 2.5K at the end.

Timers show the amount of firings. Since I have early firings in my pipeline, I see more firings than the parallelism.

Even with 2.5K spike (1MB buffer per pane x 2.5K = 2.5 MB), I should not have had any memory outage. What is happening?

So I checked my output files and saw that each parquet file is a few hundred bytes. Every message has its own file!

The below code is from FileIO - WriteShardsIntoTempFilesFn. For every input message, a destination writer is selected. Destination writer is a hash map. Guess what? I have not implemented the hashing function of my Java POJO for destination class! That’s why for every single machine, a new writer is created due to unique java hash per-object. Each writer has its own 1MB buffer by default although I just have a message of size a few hundreds of bytes. The result is millions of messages, and GBs of memory usage. By adding a simple hash function to my POJO, the problem has been solved. Now, I have files of size a few KBs.

FileIO — WriteShardsIntoTempFilesFn. Every message is sent to a writer where the writers are selected based on the hash of the message.

--

--