Wednesday, December 2, 2020

Tricky use cases of Apache Beam 2/3: Custom windows

🕥 9 min.

Introduction


This is the second article of a serie of blog posts about tricky use cases of Apache Beam that enlight some of the advanced possibilities of the Beam SDK.

What is Beam windowing?


In streaming systems, the flow of data is continuous. To be able to process it, we need to divide this flow of data into finite chunks. These finite chunks are windows. They are a way to temporally group the elements based on their event timestamp (the time at which the element was produced).

As Beam provides a unified API for batch mode and streaming mode, all the elements of Beam pipelines (even in batch mode) are stored in windows.  In batch mode, by default, there is a single window called the Global Window that stores all the elements of the pipeline. It is the default window that you get behind the scenes when you don't specify a window in your pipeline. But, even in batch mode, you could specify another windowing such as the ones below based on the timestamp of the elements. 

There are several types of windows provided by the Beam SDK:


Fixed windows


* The diagram show keys but, of course, these windows work for non keyed PCollections

Fixed windows are defined by the duration of the window. In the example above they allow to express the use case: "I want to have the last 30 seconds of data". 

Sliding windows


* The diagram show keys but, of course, these windows work for non keyed PCollections

Sliding windows are similar to fixed windows except they overlap. They are defined by both a duration and a sliding period. In the example above they allow to express the use case: "I want to have the last 60 seconds of data every 30 seconds". 

Sessions windows


* The diagram show keys but, of course, these windows work for non keyed PCollections

Sessions windows are very similar to web sessions. They are defined by a gap duration: if no data comes for more than the gap duration, then the next element to come is considered belonging to another session. Of course, the gap duration is measured in event time, in other words: if the next element's timestamp is more than gap duration after the previous element, then it will be put into another session window.

Custom windows


So far, the window types that Beam provides are quite similar to what other Big Data streaming systems provide except maybe the sessions windows that not many systems provide. But, what is really different with Beam is that it provides a way for the user to define his own windowing by extending the WindowFn function class or one of its sub-classes.

Health monitoring system example


To illustrate Beam custom windows, we will consider this use case: let's say we have a monitoring system that reports events about the health of a large and complex e-commerce platform. 

For the sake of simplicity, the event will be a simple class with just an enum status:
 
A big amount of events are produced, so the monitoring is made through a Big Bata pipeline. And this pipeline uses Beam. It is a streaming pipeline as the events are continuously arriving. We want fine oversight on what's going on so we set the granularity of the monitoring to 10 minutes. Thus, in the pipeline, we divide the collections of events into fixed windows of 10 minutes. But there is something to consider about the health of the e-commerce platform: when a failure occurs, it has consequences on the health of the overall platform for about 30 min (time for the clusters to failover, load to decrease etc...), the system is thus in recovered state. As a consequence, in the monitoring system, we want a failure event to be reported for 30 min after its occurrence. 

Implementation


To implement the above desired behavior, we need to assign events to windows depending on the type of these events. This is a perfect use case for Beam custom windows. We will define a MonitoringFixedWindows custom window that assigns HEALTHY events to the current fixed window and FAILURE events to the current and the next 3 fixed windows:


The custom window is similar to FixedWindows, but we do not extend the FixedWindows class because FixedWindows assigns an element to only a single window. In our case, in the case of a FAILURE event, we want it to be assigned to 4 windows (the current and the next 3 ones).

The lowest function class in WindowFn hierarchy that we can extend is NonMergingWindowFn. It is simply windows that do not merge between each other (see the code).

The interesting part of this code is the override of the assignWindows method. It is there that resides the whole essence of custom windows. It allows for a given timestamp (of an element) to give the list of windows it should be assigned to. In our case we return a list of IntervalWindows (simply windows with boundaries) because our windowing function class deals with IntervalWindows.

The other overrides are quite straightforward:
  • windowCoder : as said above, all the elements in Beam are in windows. And they will be serialized at some point, so the window information needs to be serialized as well. WindowCoder is simply a Coder for the custom window. The custom window deals with IntervalWindow so we just return this coder which is already provided by the SDK.
  • getDefaultWindowMappingFn: this method is for side inputs (data of another PCollection accessible though a view in the Beam transforms). Here, this custom window does not support side inputs so we just throw an exception
  • isCompatible: 2 custom windowing functions are compatible if they are equal (same type and same boundaries)
  • equals and hashcode: as there will be windows comparisons issued by Beam, we need to override equals and hashcode with the usual code

Now, if we apply this windowing function to an input streaming PCollection. 

inputPCollection.apply(MonitoringFixedWindows.of(Duration.standardMinutes(10)));

The output will be like this: 


Consider that we receive a HEALTHY event with timestamp 5, it will be assigned to the window starting at timestamp 0 and ending à timestamp 10. If we then receive a FAILURE event with timestamp 12, it will be assigned to the window it belongs (starting at timestamp 10 and ending at timestamp 20) and also to the next 3 10-minutes windows.

Now you know how Beam windowing can adapt to whatever use case you might have !