
Saturday, October 31, 2020

Watermark architecture proposal for Spark Structured Streaming framework

🕥 7 min.

The multiple aggregations problem with Spark Structured Streaming framework


Developing the translation layer (called a runner) from Apache Beam to Apache Spark, we faced an issue with the Spark Structured Streaming framework: it does not support more than one aggregation in a streaming pipeline. For example, you cannot do a group by followed by a reduce by in a streaming pipeline. There is an open ticket in the Spark project, an ongoing design and an ongoing PR but, as of now, they have received no updates since the summer of 2019. As a consequence, the Beam runner based on this framework is on hold, waiting for this feature from the Spark project.


The underlying problem: the global watermark in the Spark Structured Streaming framework


Before explaining this problem, let's explain what a watermark is.

What is a watermark?


In streaming systems, there is always a lag between the time at which a given element was produced (the event timestamp) and the time at which it was received by the system (the processing timestamp), because there can be network outages, slowdowns, etc. There can also be out-of-order data.

To deal with these two facts, streaming systems define the notion of watermark. It is what gives the system a notion of completeness of data in a constant flow of streaming data. It is the point in time past which the system expects no older elements to arrive. As streaming systems rely on windowing to divide this stream of data, the watermark can also be defined as the system's notion of when all the data in a certain window can be expected to have arrived in the streaming pipeline. When the watermark passes the end of the window, the system outputs data.

Take the example of a very simple static naïve watermark defined as an offset of 10 min:

    at 2:10 the system considers that no element with a timestamp older than 2:00 can arrive

A corollary notion is late data: data that arrives behind the watermark. For example, with the previous watermark, let's say that an element with a timestamp of 1:59 arrives at 2:11. At 2:11, the watermark tells the system that no element with a timestamp older than 2:01 (offset of 10 min) can arrive, so this element is behind the watermark and is considered late by the system. As a consequence, the system should drop it. Still, we can define a tolerance, called allowed lateness, that permits processing such late elements. For this element not to be dropped, we would need to set an allowed lateness of at least 2 min (the element arrives 12 min after it was produced, minus the watermark offset of 10 min).
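The arithmetic above can be sketched in a few lines. This is a toy model of the fixed-offset watermark from the example; the class and method names are made up for illustration and belong to no streaming framework API:

```java
// Toy model of a naive fixed-offset watermark. All times are in minutes
// since midnight (1:59 = 119, 2:11 = 131).
public class NaiveWatermark {

    // With a fixed offset, at processing time t the watermark is t - offset.
    static int watermark(int processingTime, int offsetMin) {
        return processingTime - offsetMin;
    }

    // An element is late when its event timestamp is behind the watermark.
    static boolean isLate(int eventTime, int processingTime, int offsetMin) {
        return eventTime < watermark(processingTime, offsetMin);
    }

    // A late element is still processed if it falls within the allowed lateness.
    static boolean isDropped(int eventTime, int processingTime,
                             int offsetMin, int allowedLatenessMin) {
        return eventTime < watermark(processingTime, offsetMin) - allowedLatenessMin;
    }

    public static void main(String[] args) {
        // Element produced at 1:59 (119), arriving at 2:11 (131):
        System.out.println(isLate(119, 131, 10));       // late, watermark is at 2:01
        System.out.println(isDropped(119, 131, 10, 2)); // but kept with 2 min allowed lateness
    }
}
```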

The problem with the global watermark in an example


Back to the problem: the Spark Structured Streaming framework does not support more than one aggregation in a streaming pipeline because its watermark scope is global for the whole pipeline; there is no watermark per operation. At the end of each microbatch, the global watermark is updated with the maximum timestamp received. Why is this a problem? Let's answer with an example:

Consider a streaming pipeline with 2 aggregations (Op1 and Op2, in blue in the diagram below). The first aggregation outputs the highest valued element out of a window of 3 seconds.


Now let's say that the source sends 3 elements with values 6, 4 and 5 and timestamps 1, 2 and 3. These elements get buffered inside Op1, and Op1 updates the global watermark to 3 (the maximum timestamp seen). As the watermark has reached the end of Op1's window (a window of 3 seconds), it is time for Op1 to output its data: the highest valued element, with value 6 but timestamp 1. Op2 relies on the global watermark, whose value is 3, so this element's timestamp of 1 is behind the global watermark and Op2 considers it late. As a consequence, the element is dropped by Op2, leading to incorrect results.
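To make this failure mode concrete, here is a toy simulation of the global watermark behaviour just described, in plain Java (not Spark code; all names are illustrative):

```java
import java.util.Collections;
import java.util.List;

// Toy model: one global watermark shared by every operation in the pipeline.
public class GlobalWatermarkProblem {

    // After a microbatch, the single global watermark jumps to the maximum
    // timestamp seen; the downstream operation checks emitted elements
    // against that same watermark.
    static boolean droppedDownstream(List<Long> batchTimestamps, long emittedTimestamp) {
        long globalWatermark = Collections.max(batchTimestamps);
        return emittedTimestamp < globalWatermark; // late for the next operation
    }

    public static void main(String[] args) {
        // Op1 buffers elements with timestamps 1, 2, 3 and emits the highest
        // valued element, which carries timestamp 1: Op2 drops it.
        System.out.println(droppedDownstream(List.of(1L, 2L, 3L), 1L));
    }
}
```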

So, to avoid incorrect results being produced by downstream operations, the Spark Structured Streaming framework deactivates support for more than one aggregation in streaming pipelines.

A possible solution to this problem


A watermark per operation


A possible solution to this problem is to replace the global watermark with a watermark per transform.
The watermark values are then propagated from the source through the different operations of the pipeline. In fact, in most streaming systems, the watermark is a special type of element that flows with the data down the pipeline. The local watermark values are updated as new watermarks and data arrive. For each operation we need to define:
  • an InputWatermark = Min {outputWatermark(previous operations)} *
  • an OutputWatermark = Min {inputWatermark, oldest(processed element)}

* In a straight pipeline, the InputWatermark is simply the OutputWatermark of the previous operation
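These two rules can be written down directly. A minimal sketch in plain Java (illustrative names, not part of any proposed Spark API):

```java
import java.util.Collection;
import java.util.Collections;

// Toy implementation of the two per-operation watermark rules above.
public class OperationWatermark {

    // InputWatermark = Min { outputWatermark(previous operations) }
    static long inputWatermark(Collection<Long> upstreamOutputWatermarks) {
        return Collections.min(upstreamOutputWatermarks);
    }

    // OutputWatermark = Min { inputWatermark, oldest(processed element) }
    static long outputWatermark(long inputWatermark, long oldestProcessedElement) {
        return Math.min(inputWatermark, oldestProcessedElement);
    }

    public static void main(String[] args) {
        // Op1 from the example below: the source watermark reaches 3 while the
        // oldest buffered element has timestamp 1, so Op1's output watermark
        // stays at 1 and downstream operations are not misled.
        long op1Input = inputWatermark(java.util.List.of(3L));
        System.out.println(outputWatermark(op1Input, 1L));
    }
}
```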


    Updated example


    Let's update the previous example of the streaming pipeline with Op1 and Op2. Only now there is no global watermark anymore; we define a watermark per operation, in green in the diagram below:


    As said above, the watermark is set by the source. Let's say that the source tells the system that no element older than timestamp 1 can arrive (source watermark = 1). This value is propagated to the input watermark and to the output watermark of Op1 according to the rules defined in the previous paragraph. The watermarks of Op2 are updated as well according to the same rules. We end up with the green values in the above diagram.

    Now let's say that we receive the very same 3 elements as before with values 6, 4 and 5 and timestamps 1, 2 and 3. 


    They get buffered in Op1. Op1 updates its oldest processed counter (in red) to the minimum timestamp seen. And, as the output watermark is the minimum between this counter and the input watermark, it remains set to 1. And, as the output watermark of Op1 is not updated, the input and output watermarks of Op2 are not either.

    Let's say now that the source updates its watermark to 3 (in red in the diagram below), meaning that it should not receive elements older than timestamp 3.


    So the input watermark of Op1 is updated to the output watermark of the previous step (the source), but the output watermark of Op1 is not updated, as it is the minimum between the oldest processed counter (which is 1) and the input watermark. As the output watermark of Op1 is not updated, the input and output watermarks of Op2 are not either.

    But something happens now in Op1: its input watermark is now 3, so it has passed the end of the window. Therefore it is time for Op1 to output its data and clear its buffers. It outputs the highest valued element, with value 6 and timestamp 1, as shown below.




    But now this element is no longer dropped by Op2: because the input watermark of Op2 is still 1, Op2 does not consider the element late. It just processes it, buffers it and updates its oldest processed counter.

    Now, one final thing you may wonder is: when and how does Op2 update its watermarks?

    Well, after outputting its data, Op1 updates its output watermark to 3 and re-initializes its oldest processed counter. After that, it sends its output watermark downstream. So Op2 receives the output watermark of Op1 after the element and, as a consequence, updates its input watermark as shown below. This allows the watermarks to be updated in a timely manner rather than waiting for the arrival of new data in the pipeline.




    There is one thing left to discuss: the late data case, which we will see in the next paragraph.

    What happens for late data in this architecture?


    A given element is late for a given operation when its timestamp is earlier than the InputWatermark of the operation. As the OutputWatermark of this operation is defined as Min {inputWatermark, oldest(processed element)}, the OutputWatermark of the operation is set to the timestamp of the element. So, for the next operation, the InputWatermark will be set to the OutputWatermark of the previous operation, and thus to the timestamp of the element. We can see that the watermarks of all downstream operations will be set to the value of this element's timestamp. To sum it up: a late element will delay the downstream operations.
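Applying the same Min rule shows the delay effect directly: the late element's timestamp becomes the operation's output watermark and then the next operation's input watermark. A toy illustration (plain Java, illustrative values):

```java
// Toy illustration: a late element pulls the output watermark back to its
// own timestamp, and that value propagates to every downstream operation.
public class LateDataDelay {

    // OutputWatermark = Min { inputWatermark, oldest(processed element) }
    static long outputWatermark(long inputWatermark, long oldestProcessedElement) {
        return Math.min(inputWatermark, oldestProcessedElement);
    }

    public static void main(String[] args) {
        long inputWatermark = 10L;
        long lateElementTs = 4L; // behind the input watermark, so it is late

        // The output watermark falls back to the late element's timestamp...
        long opOutput = outputWatermark(inputWatermark, lateElementTs);

        // ...and propagates as the input watermark of the next operation,
        // holding back everything downstream.
        long downstreamInput = opOutput;
        System.out.println(downstreamInput);
    }
}
```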

    Conclusion


    This watermark architecture was proposed to the Apache Spark project.


    Tuesday, March 17, 2020

    How to create a custom Spark Encoder in ... java

    🕥 13 min

    What is a Spark Encoder ?


    An Encoder is a wrapper class that specifies how to serialize and deserialize data with the Spark Structured Streaming framework.

    In this framework, datasets are typed and schema aware, so every Spark transformation called on a dataset needs to provide an Encoder for its output type so that the framework knows how to serialize and deserialize its content. For example, the map transform takes, in addition to the mapping function, an Encoder for its output type.


    Why this blog post ?


    Because Spark Structured Streaming Encoders are complex to write, have no Java API and are sparsely documented, I thought that what I learnt writing them could be useful to the community.

    Catalyst


    To understand Encoders, we need to focus a bit on Catalyst. Spark Structured Streaming uses the Catalyst optimizer to optimize execution plans, and Encoders are part of the Catalyst plan. Catalyst sees a pipeline as a tree made of TreeNodes. Here is the Catalyst workflow:



    The entry point of Catalyst is either a SQL abstract syntax tree (AST) returned by the SQL parser, if the user has used direct Spark SQL, or a DataFrame object, if the user has used the DataFrame/Dataset API. The result is an unresolved logical plan with unbound attributes and data types. During the analysis phase, Catalyst does catalog lookups and attribute mapping to fill these placeholders and ends up with a logical plan. This plan is then optimized by applying standard rule-based optimizations to produce the optimized logical plan. Then Catalyst uses rules to turn the optimized logical plan into one or more physical plans with physical operators (for example dataset.map) that match the Spark execution engine. It then selects a plan using a cost model that consists of applying cost evaluation rules to the physical plans.

    Where we come back to our subject is the last phase: code generation. Among the TreeNode types there are Catalyst Expressions, and custom Encoders are Catalyst Expressions called ExpressionEncoders. Catalyst Expressions contain Java code strings that get composed to form the code AST of the pipeline, which is then compiled to Java bytecode using the Janino compiler. This bytecode is what is executed when the pipeline runs. Let's see how to write ExpressionEncoders:

    Custom Encoder (ExpressionEncoder)



    To manage serialization in the Spark Structured Streaming framework, you could use the Encoders available in the Encoders utility class. They can manage serialization of primitive types or beans with either Java or Kryo serialization. But let's say you want to write a custom Encoder because, for example, you develop a framework based on Spark and you want to allow your users to provide serialization code. As an example, let's take Apache Beam, which lets users define their serialization code in the Coder class.


    Spark Encoders have no Java API, so if you want to code in Java because your code base is in Java, you'll need to use some bindings. Maybe there are better ways but, as I'm not a Scala developer, I could be unaware of them :)


    As said, a custom Encoder is an ExpressionEncoder.

    An ExpressionEncoder groups a serializer and a deserializer. You need to specify your serializer and deserializer as Expression instances. Let's see the serializer part:

    Serializer


    Things to point out in that code:
    • It is an Encoder, so it has no SQL representation (remember, Catalyst Expressions are broader than Encoders), so we implement NonSQLExpression; and it has one input and one output, so we extend UnaryExpression.
    • There are several methods to override:
      • child(): the Encoder is part of the Catalyst tree (see above), so we need to keep track of its child (the input of the UnaryExpression)
      • doGenCode(): this method is responsible for producing the Java code strings that will be compiled by Janino.
      • dataType(): the type of the result of the evaluation of the Expression. In our case binary (because we are serializing an object to binary).
      • the other overrides, productElement(), productArity(), canEqual() and consequently equals() and hashCode(), are due to the fact that there is no Java API for ExpressionEncoder, so we need to implement the Scala Product specifics.


    Code generation


    Let's focus on the interesting method doGenCode():

    This method generates the code shown in the comment at line 32, in the form of Java strings.

    It returns an ExprCode (see line 52). This Block is constructed through string interpolation; its creation is managed at line 49.

    Now that the global architecture of this method is clearer, some pieces could look weird:
    • To access an object that is not a local variable of the generated code block, we need to add a Catalyst reference to it (see line 24). In our case we reference the coder, which contains the user-provided serialization code.
    • As said above, the serializer is a UnaryExpression. This Expression has only one input Expression, which is its child (see lines 25 and 32). We need to concatenate the child code and the actual serialization code so that everything can be compiled by Janino (line 52).

    Instantiate the serializer



    Here is the code to create the EncodeUsingBeamCoder object (the serializer part of our ExpressionEncoder). To instantiate this class, we need to pass it a reference to its child in the Catalyst tree (remember, the input Expression of the UnaryExpression). To obtain a reference to the Catalyst input Expression, we do this: BoundReference(0, new ObjectType(clazz), true). There is only one input (because EncodeUsingBeamCoder is a UnaryExpression), so we get it at index 0 and we indicate the DataType of the input and its nullability.

    Spark physical plan



    The serialization part of the physical Catalyst plan looks like this:

    SerializeFromObject [encodeusingbeamcoder(input[0, org.apache.beam.sdk.util.WindowedValue, true], WindowedValue$FullWindowedValueCoder(VarIntCoder,GlobalWindow$Coder)) AS binaryStructField#11]

    WindowedValue is the type to serialize from (the clazz) and WindowedValue$FullWindowedValueCoder(VarIntCoder,GlobalWindow$Coder) is the Beam Coder provided in the pipeline.

    Deserializer


    The deserializer class DecodeUsingBeamCoder, which you can see in the full code link below, is completely symmetric to the serializer class EncodeUsingBeamCoder. The only thing worth mentioning is its instantiation:

    Instantiate the deserializer


    Here is the code to create the DecodeUsingBeamCoder object (the deserializer part of our ExpressionEncoder). Here also, to instantiate this class, we need to pass it a reference to its child in the Catalyst tree (remember, the input Expression of the UnaryExpression). To obtain a reference to the Catalyst node, we do this: new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType).

    Here again we get the first input Expression at index 0 (cf. UnaryExpression); it is of type BinaryType because we are deserializing bytes. The Cast allows Catalyst to treat the Expression as binary and thus handle it more efficiently.

    Spark physical plan


    The deserialization part of the physical Catalyst plan looks like this:
    DeserializeToObject decodeusingbeamcoder(cast(binaryStructField#4 as binary), org.apache.beam.sdk.util.WindowedValue, WindowedValue$FullWindowedValueCoder(VarIntCoder,GlobalWindow$Coder)), obj#6: org.apache.beam.sdk.util.WindowedValue
    WindowedValue is the type to deserialize to (the clazz) and WindowedValue$FullWindowedValueCoder(VarIntCoder,GlobalWindow$Coder) is the Beam Coder provided in the pipeline.


    Performance


    Even though the Janino compiler is fast, compiling Java strings to bytecode takes time. I measured a big performance gain when I reduced the size of the generated code and replaced it with as much pre-compiled code as possible. This is why, in the full code link, the encode() method in the serializer and the decode() method in the deserializer are compiled code rather than string code inside the doGenCode() method. Another benefit of this approach is that it enables debugging of this compiled part of the code.

    Full code link


    EncoderHelpers class in the Apache Beam project




    Friday, February 7, 2020

    Understand Apache Beam runners: focus on the Spark runner

    🕥 5 min.

    Previously on Apache Beam runners 😀


    In the previous article, we had a brief overview of what an Apache Beam runner is. This article will dig into more detail.

    The previous article introduced this very simple pipeline:

    We saw that the Beam SDK translates this pipeline into a DAG of Beam transform nodes. Now let's see how a Beam runner translates this DAG. Let's say that the user chooses Apache Spark as the target Big Data platform when launching the pipeline.

    The runner (at last)


    The job of the runner is to translate the pipeline DAG into native pipeline code for the targeted Big Data platform. It is this native code that will be executed by a Big Data cluster. If Spark is chosen, the runner translates the DAG below into the Spark native pipeline code below. For the sake of simplicity, the Spark pipeline is pseudo-code.


    Composite and primitive transforms: the level of translation


    In the previous article we talked about Beam transforms (primitive transforms and composite transforms). In the rest of this post, we will refer to a "composite transform" as just a "composite" and a "primitive transform" as just a "primitive".

    The DAG above is the expanded DAG: on the left-hand side are the Beam transforms of the user pipeline. But among these transforms only Read is a primitive. The others are implemented by the Beam SDK as composites of other primitives. Composites can also be made of composites themselves (like the Count transform, which is made of the Combine transform) but, in the end, they are always made of primitives (ParDo or GroupByKey). And these primitives are what the runner translates.

    But in some cases the runner can choose to translate at a composite level of the graph rather than at the primitive level, depending on the capabilities of the target Big Data technology. Indeed, if there is a direct correspondence of the Beam composite in the target API, the runner does not decompose the composite into its primitives and translates the composite directly. The green boxes in the DAG represent the level of translation. In our example, there is a direct equivalent of the Beam Combine composite: a Spark Aggregator (agg in the Spark pipeline).

    The translation itself


    The translation occurs when the pipeline is run (when pipeline.run() is executed). To translate the DAG, the runner visits the graph. All Beam runners work the same way; only the target API changes with the chosen runner.

    The first step is to detect the translation mode (batch or streaming) by searching for a Beam BoundedSource (like Elasticsearch, for example) or an UnboundedSource (like Kafka, for example). Knowing the translation mode, the runner can choose:
    • the proper Spark DataSourceV2 to instantiate either implementing ReadSupport (batch) or MicroBatchReadSupport (streaming)
    • the proper Spark action to execute on the output dataset either foreach (batch) or writeStream (streaming)
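The mode-detection step can be sketched as follows. This is a toy model in plain Java; the Source class and its fields are made up for illustration, not the actual Beam classes:

```java
import java.util.List;

// Toy model of the runner's translation-mode detection.
public class TranslationModeDetector {

    enum Mode { BATCH, STREAMING }

    // Illustrative stand-in for Beam's bounded/unbounded source distinction.
    static class Source {
        final String name;
        final boolean bounded;
        Source(String name, boolean bounded) {
            this.name = name;
            this.bounded = bounded;
        }
    }

    // A single unbounded source (like Kafka) switches the whole pipeline
    // to streaming mode; otherwise the pipeline is translated in batch mode.
    static Mode detect(List<Source> sources) {
        for (Source source : sources) {
            if (!source.bounded) {
                return Mode.STREAMING;
            }
        }
        return Mode.BATCH;
    }

    public static void main(String[] args) {
        System.out.println(detect(List.of(
            new Source("Elasticsearch", true),
            new Source("Kafka", false))));
    }
}
```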

    Then the DAG visit continues and each node is translated to the target native API: Read gets translated to a Spark DataSourceV2 that creates the input dataset, ParDo gets translated to a Spark flatMap that is applied to the input dataset, and so on until the output dataset.
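The visit step can be pictured as a lookup from Beam node type to native Spark operation. This is only a simplified sketch: the real runner builds actual Spark operations rather than strings, and the GroupByKey entry is my own illustrative guess:

```java
import java.util.Map;

// Toy translation table from Beam nodes to Spark operations, following
// the examples given in the text.
public class NodeTranslator {

    static final Map<String, String> TRANSLATIONS = Map.of(
        "Read", "DataSourceV2",     // creates the input dataset
        "ParDo", "flatMap",         // applied to the input dataset
        "Combine", "Aggregator",    // composite-level translation
        "GroupByKey", "groupByKey"  // illustrative guess
    );

    static String translate(String beamNode) {
        String sparkOp = TRANSLATIONS.get(beamNode);
        if (sparkOp == null) {
            throw new IllegalArgumentException("No translation for " + beamNode);
        }
        return sparkOp;
    }

    public static void main(String[] args) {
        System.out.println(translate("ParDo"));
    }
}
```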

    Pipeline run


    When the visit of the DAG is done, the runner applies the action chosen above to the output dataset to run the pipeline. At this point the Spark framework executes the resulting native Spark pipeline.