Saturday, October 31, 2020

Watermark architecture proposal for Spark Structured Streaming framework

🕥 7 min.

The multiple aggregations problem with Spark Structured Streaming framework


While developing the translation layer (called a runner) from Apache Beam to Apache Spark, we faced an issue with the Spark Structured Streaming framework: it does not support more than one aggregation in a streaming pipeline. For example, you cannot do a group by followed by a reduce by in a streaming pipeline. There is an open ticket in the Spark project, an ongoing design and an ongoing PR but, as of now, they have received no update since the summer of 2019. As a consequence, the Beam runner based on this framework is on hold, waiting for this feature from the Spark project.
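
To make the limitation concrete, here is a minimal sketch of a pipeline that Spark rejects. It assumes Spark 3.x and the built-in rate test source; the exact exception message varies across versions, but a query chaining two streaming aggregations fails as soon as it is started:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder.appName("two-aggregations").master("local[*]").getOrCreate()
    import spark.implicits._

    // Built-in test source producing (timestamp, value) rows.
    val events = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

    // First aggregation: highest value per 3-second window.
    val firstAgg = events
      .withWatermark("timestamp", "10 minutes")
      .groupBy(window($"timestamp", "3 seconds"))
      .agg(max("value").as("maxValue"))

    // Second aggregation over the streaming result: Spark refuses this and
    // throws an AnalysisException ("Multiple streaming aggregations are not
    // supported...") when the query is started.
    val secondAgg = firstAgg.groupBy().agg(sum("maxValue"))

    secondAgg.writeStream.outputMode("complete").format("console").start()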


The underlying problem: the global watermark in the Spark Structured Streaming framework


Before digging into this problem, let's first define what a watermark is.

What is a watermark?


In streaming systems, there is always a lag between the time at which a given element was produced (the event timestamp) and the time at which it is received by the system (the processing timestamp), because there can be network outages, slowdowns, and so on. Data can also arrive out of order.

To deal with these two facts, streaming systems define the notion of a watermark. It is what gives the system a notion of completeness over a constant flow of streaming data: the point in time past which the system should no longer receive older elements. As streaming systems rely on windowing to divide this stream of data, the watermark can also be defined as the system's notion of when all the data in a given window can be expected to have arrived in the streaming pipeline. When the watermark passes the end of a window, the system outputs that window's data.

Let's take the example of a very simple, naïve, static watermark defined as an offset of 10 minutes:

    at 2:10 the system considers that no element with a timestamp older than 2:00 can arrive

A corollary notion is late data: data that arrives behind the watermark. For example, with the previous watermark, let's say that an element with a timestamp of 1:59 arrives at 2:11. At 2:11 the watermark tells the system that no element with a timestamp older than 2:01 (offset of 10 min) can arrive, so this element is behind the watermark and will be considered late by the system. As a consequence, the system should drop it. Still, we can define a tolerance, called allowed lateness, that permits the system to process such a late element. For this element not to be dropped, we would need to set an allowed lateness of at least 2 min (the element arrives 12 min after it was produced, minus the 10 min watermark offset).
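
This arithmetic is easy to sketch in a few lines of Scala (a toy model of the naïve watermark above, with times expressed in minutes since midnight; the names are ours, not part of any streaming API):

    val offsetMin = 10

    // Naïve static watermark: current processing time minus a fixed offset.
    def watermark(nowMin: Int): Int = nowMin - offsetMin

    // An element is late when its event time falls behind the watermark,
    // beyond the configured allowed lateness.
    def isLate(eventTimeMin: Int, nowMin: Int, allowedLatenessMin: Int = 0): Boolean =
      eventTimeMin < watermark(nowMin) - allowedLatenessMin

    isLate(eventTimeMin = 119, nowMin = 131)                          // 1:59 at 2:11 -> true, dropped
    isLate(eventTimeMin = 119, nowMin = 131, allowedLatenessMin = 2)  // -> false, kept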

The problem with the global watermark in an example


Back to the problem: the Spark Structured Streaming framework does not support more than one aggregation in a streaming pipeline because the watermark scope in this framework is global to the whole pipeline; there is no watermark per operation. At the end of each micro-batch, the global watermark is updated to the newest timestamp received. Why is this a problem? Let's answer with an example:

Consider a streaming pipeline with 2 aggregations (Op1 and Op2, in blue in the diagram below). The first aggregation outputs the highest-valued element out of a window of 3 seconds.


Now let's say that the source sends 3 elements with values 6, 4 and 5 and timestamps 1, 2 and 3. These elements get buffered inside Op1, and Op1 updates the global watermark to 3 (the newest timestamp seen). As the watermark has reached the end of Op1's window (a window of 3 seconds), it is time for Op1 to output its data. The highest-valued element has value 6 but timestamp 1. Op2 will therefore consider this element late, because Op2 relies on the global watermark, whose value is 3, so the element's timestamp of 1 is behind the global watermark. As a consequence, Op2 will drop this element, leading to incorrect results.
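
The incorrect drop can be replayed with a few lines of Scala that mimic this simplified model (a single global watermark updated to the newest timestamp seen; an illustration of the model, not actual Spark code):

    case class Element(value: Int, timestamp: Int)

    // The micro-batch buffered by Op1.
    val batch = Seq(Element(6, 1), Element(4, 2), Element(5, 3))

    // End of the micro-batch: the single global watermark jumps to the
    // newest timestamp seen anywhere in the pipeline.
    val globalWatermark = batch.map(_.timestamp).max  // 3

    // The watermark passed the end of Op1's 3-second window, so Op1 emits
    // its highest-valued element.
    val emitted = batch.maxBy(_.value)                // Element(6, 1)

    // Op2 checks the emitted element against the same global watermark:
    val lateForOp2 = emitted.timestamp < globalWatermark  // true -> dropped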

So, to avoid downstream operations producing incorrect results, the Spark Structured Streaming framework disables support for more than one aggregation in streaming pipelines.

A possible solution to this problem


A watermark per operation


A possible solution to this problem is to replace the global watermark with a watermark per transform.
The watermark values are propagated from the source through the different operations of the pipeline, and the local watermark values are updated as new data arrives. For each operation, we need to define two watermarks (modeled in the code sketch below):
  • an InputWatermark = Min {outputWatermark(previous operations)} *
  • an OutputWatermark = Min {inputWatermark, oldest(processed element)}

* In a straight pipeline, the InputWatermark is simply the OutputWatermark of the previous operation.
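
These two rules can be modeled in a few lines of Scala (a hypothetical model we use for illustration in the rest of the post; none of these names exist in Spark). Timestamps are plain integers, as in the examples:

    object WatermarkModel {
      // The watermark announced by the source of the pipeline.
      var sourceWatermark: Int = 0

      class Operation(val upstream: Seq[Operation]) {
        // Oldest timestamp among the elements currently buffered by this
        // operation, if any.
        var oldestProcessed: Option[Int] = None

        // InputWatermark = Min {OutputWatermark(previous operations)},
        // or the source watermark for the first operation of the pipeline.
        def inputWatermark: Int =
          if (upstream.isEmpty) sourceWatermark
          else upstream.map(_.outputWatermark).min

        // OutputWatermark = Min {inputWatermark, oldest(processed element)}.
        def outputWatermark: Int =
          oldestProcessed.fold(inputWatermark)(math.min(inputWatermark, _))
      }
    }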


Updated example


Let's update the previous example of the streaming pipeline with Op1 and Op2. Only now, there is no longer a global watermark; instead, we define a watermark per operation, in green in the diagram below:


As said above, the watermark is set by the source. Let's say that the source tells the system that no element older than timestamp 1 can arrive (source watermark = 1). This value is propagated to the input watermark and to the output watermark of Op1 according to the rules defined in the previous paragraph. The watermarks of Op2 are updated as well, according to the same rules. We end up with the green values in the above diagram.
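
Expressed with the toy model sketched above (the values mirror the green ones in the diagram):

    import WatermarkModel._

    val op1 = new Operation(Seq.empty)  // first aggregation, fed by the source
    val op2 = new Operation(Seq(op1))   // second aggregation, downstream of Op1

    sourceWatermark = 1                 // no element older than timestamp 1

    (op1.inputWatermark, op1.outputWatermark)  // (1, 1)
    (op2.inputWatermark, op2.outputWatermark)  // (1, 1)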

Now let's say that we receive the very same 3 elements as before, with values 6, 4 and 5 and timestamps 1, 2 and 3.


They get buffered in Op1. Op1 updates its oldest processed counter (in red) to the minimum timestamp seen. As the output watermark is the minimum between this counter and the input watermark, it remains set to 1. And since the output watermark of Op1 is not updated, the input and output watermarks of Op2 are not either.

Let's say now that the source updates its watermark to 3 (in red in the diagram below), meaning that it should not receive elements older than timestamp 3.


So the input watermark of Op1 is updated to the output watermark of the previous step (the source), but the output watermark of Op1 is not updated, as it is the minimum between the oldest processed counter (which is 1) and the input watermark. Since the output watermark of Op1 is unchanged, the input and output watermarks of Op2 are unchanged as well.
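
Continuing the snippet above, the buffering of the three elements and the source update play out as follows:

    op1.oldestProcessed = Some(1)  // elements (6,1), (4,2), (5,3) buffered in Op1
    sourceWatermark = 3            // the source advances its watermark

    (op1.inputWatermark, op1.outputWatermark)  // (3, 1): held back by the buffer
    (op2.inputWatermark, op2.outputWatermark)  // (1, 1): unchanged downstream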

But something happens now in Op1: its input watermark is 3, so it has passed the end of the window, and it is time for Op1 to output its data and clear its buffers. It outputs the highest element, with value 6 and timestamp 1, as shown below.


But now, this element is no longer dropped by Op2: the input watermark of Op2 is still 1, so Op2 no longer considers the element late. It simply processes it, buffers it and updates its oldest processed counter.

Now, one final thing you may wonder: when and how does Op2 update its watermarks?

Well, the answer is: with the arrival of new data. Let's say we receive a new element from the source with value 7 and timestamp 4, as shown below:


The processing of this element by Op1 updates the oldest processed counter to the element's timestamp (4), and the output watermark of Op1 is updated, because the minimum between the input watermark and the oldest processed counter is now 3. As a consequence of this update of Op1's output watermark, the watermarks of the downstream Op2 are updated as shown above. This is how watermark values are updated with the arrival of elements.
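
In the toy model, this last step reads:

    op2.oldestProcessed = Some(1)  // Op2 buffered the emitted element (6, ts 1)
    op1.oldestProcessed = Some(4)  // Op1 fired its window, then processed (7, ts 4)

    (op1.inputWatermark, op1.outputWatermark)  // (3, 3): min(3, 4) = 3
    (op2.inputWatermark, op2.outputWatermark)  // (3, 1): Op2's input watermark advances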

There is one thing left to discuss: the late data case, which we cover in the next paragraph.

What happens to late data in this architecture?


A given element is late for a given operation when its timestamp is lower than the InputWatermark of the operation. As the OutputWatermark of this operation is defined as Min {inputWatermark, oldest(processed element)}, the OutputWatermark of the operation is set to the timestamp of the late element. The InputWatermark of the next operation will then be set to the OutputWatermark of the previous one, and thus to the timestamp of the element. So the watermarks of all downstream operations end up set to the value of this element's timestamp. To sum it up: a late element will delay downstream operations.
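
With the same toy model, a late element's effect is easy to see (the values here are hypothetical):

    import WatermarkModel._

    val opA = new Operation(Seq.empty)
    val opB = new Operation(Seq(opA))

    sourceWatermark = 10
    opA.oldestProcessed = Some(4)  // a late element with timestamp 4 arrives at opA

    opA.outputWatermark            // 4 = min(10, 4): pulled back by the late element
    opB.inputWatermark             // 4: the downstream operation is delayed too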

Conclusion


This watermark architecture was proposed to the Apache Spark project.