Tuesday, March 17, 2020

How to create a custom Spark Encoder in ... Java

🕥 13 min

What is a Spark Encoder?


An Encoder is a wrapper class that specifies how to serialize and deserialize data with the Spark Structured Streaming framework.

In this framework, datasets are typed and schema-aware, so every Spark transformation called on a dataset needs to provide an Encoder for its output type so that the framework knows how to serialize and deserialize its content. For example, the signature of the map transformation (simplified from the Spark Dataset Java API) looks like this:
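
  // simplified signature from the Spark Dataset<T> Java API
  <U> Dataset<U> map(MapFunction<T, U> func, Encoder<U> encoder);

The Encoder parameter is what tells the framework how to serialize and deserialize the U instances produced by the map.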


Why this blog post?


Because Spark Structured Streaming Encoders are complex to write, have no Java API and are not well documented, I thought that what I learned writing them could be useful to the community.

Catalyst


To understand Encoders we need to focus a bit on Catalyst. Spark Structured Streaming uses the Catalyst optimizer to optimize execution plans, and Encoders are part of the Catalyst plan. Catalyst sees a pipeline as a tree made of TreeNodes. Here is the Catalyst workflow:



The entry point of Catalyst is either a SQL abstract syntax tree (AST) returned by the SQL parser, if the user has used direct Spark SQL, or a Dataframe object, if the user has used the Dataframe/Dataset API. The result is an unresolved logical plan with unbound attributes and data types. During the analysis phase, Catalyst does catalog lookups and attribute mapping to fill these placeholders and ends up with a logical plan. This plan is then optimized by applying standard rule-based optimizations, producing the optimized logical plan. Catalyst then uses rules to turn the optimized logical plan into one or more physical plans with physical operators (for example dataset.map) that match the Spark execution engine. Finally, it selects one plan using a cost model, which consists of applying cost evaluation rules to the physical plans.

Where we come back to our subject is the last phase: code generation. Among the TreeNode types there are Catalyst Expressions, and custom Encoders are ExpressionEncoders, which are built from Catalyst Expressions. Catalyst Expressions contain Java code strings that get composed to form the code AST of the pipeline, which is then compiled to Java bytecode by the Janino compiler. This bytecode is what is executed when the pipeline runs. Let's see how to write ExpressionEncoders:

Custom Encoder (ExpressionEncoder)



To manage serialization in the Spark Structured Streaming framework, you can use the Encoders available in the Encoders utility class. They can manage serialization of primitive types or beans with either Java or Kryo serialization. But let's say you want to write a custom Encoder because, for example, you develop a framework based on Spark and you want to allow your users to provide the serialization code. As an example, let's take Apache Beam, which lets users define their serialization code in a Coder class.
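
For reference, using the built-in Encoders looks like this (a minimal sketch: spark is assumed to be an existing SparkSession and MyBean a hypothetical Java bean with a getId() accessor):

  // built-in Encoders from org.apache.spark.sql.Encoders
  Encoder<Integer> intEncoder = Encoders.INT();                          // boxed primitive types
  Encoder<MyBean> beanEncoder = Encoders.bean(MyBean.class);             // reflection-based bean encoder
  Encoder<MyBean> javaEncoder = Encoders.javaSerialization(MyBean.class); // binary, Java serialization
  Encoder<MyBean> kryoEncoder = Encoders.kryo(MyBean.class);             // binary, Kryo serialization

  Dataset<MyBean> beans = spark.createDataset(Arrays.asList(new MyBean()), beanEncoder);
  Dataset<Integer> ids = beans.map((MapFunction<MyBean, Integer>) MyBean::getId, intEncoder);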


Spark Encoders have no Java API, so if you want to code in Java because your code base is in Java, you'll need to use some bindings. Maybe there are better ways, but as I'm not a Scala developer, I could be unaware of them :)


As said, a custom Encoder is an ExpressionEncoder.

An ExpressionEncoder groups a serializer and a deserializer. You need to specify your serializer and deserializer as Expression instances. Let's see the serializer part:

Serializer
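
Here is a simplified skeleton of the serializer class (trimmed from the full EncoderHelpers code linked at the end of the post; generics, error handling and some Scala-interop details are omitted):

  // Uses org.apache.spark.sql.catalyst.expressions.* and ...expressions.codegen.* from Spark,
  // and org.apache.beam.sdk.coders.Coder from Beam.
  public class EncodeUsingBeamCoder<T> extends UnaryExpression
      implements NonSQLExpression, Serializable {

    private final Expression child; // input of the UnaryExpression in the Catalyst tree
    private final Coder<T> coder;   // user-provided Beam serialization code

    public EncodeUsingBeamCoder(Expression child, Coder<T> coder) {
      this.child = child;
      this.coder = coder;
    }

    @Override
    public Expression child() {
      return child;
    }

    @Override
    public DataType dataType() {
      // the Expression evaluates to the serialized bytes
      return DataTypes.BinaryType;
    }

    @Override
    public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
      // placeholder: the real implementation is shown in the Code generation section below
      throw new UnsupportedOperationException("see the Code generation section");
    }

    // Scala Product plumbing: needed because there is no Java API for ExpressionEncoder
    @Override
    public Object productElement(int n) {
      switch (n) {
        case 0: return child;
        case 1: return coder;
        default: throw new IndexOutOfBoundsException(String.valueOf(n));
      }
    }

    @Override
    public int productArity() {
      return 2;
    }

    @Override
    public boolean canEqual(Object other) {
      return other instanceof EncodeUsingBeamCoder;
    }

    // equals() and hashCode() based on child and coder are also required (omitted here)
  }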


Things to point out in that code:
  • It is an Encoder so it has no SQL representation (remember, Catalyst Expressions are broader than Encoders), so we implement NonSQLExpression; and it has one input and one output, so we extend UnaryExpression.
  • There are several methods to override:
    • child(): the Encoder is part of the Catalyst tree (see above) so we need to keep track of its child (the input of the UnaryExpression)
    • doGenCode(): this method is responsible for producing the Java code strings that will be compiled by Janino.
    • dataType(): the type of the result of the evaluation of the Expression. In our case binary, because we are serializing an object to binary.
    • the other overrides productElement(), productArity(), canEqual() and consequently equals() and hashCode() are due to the fact that there is no Java API for ExpressionEncoder, so we need to implement the Scala Product specifics.


Code generation


Let's focus on the interesting method doGenCode():
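
The sketch below closely follows the EncoderHelpers code linked at the end of the post; package names are shortened and some Scala-interop details can vary between Spark versions:

  @Override
  public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
    // register the coder as a Catalyst reference so that the generated code can access it
    String accessCode = ctx.addReferenceObj("coder", coder, coder.getClass().getName());
    // generate the code of the child, i.e. the input of this UnaryExpression
    ExprCode input = child.genCode(ctx);

    /*
      CODE GENERATED:
      final byte[] ${ev.value} = EncodeUsingBeamCoder.encode(${input.isNull}, ${input.value}, ${coder});
    */
    List<String> parts = new ArrayList<>();
    parts.add("final byte[] ");
    parts.add(" = EncodeUsingBeamCoder.encode("); // fully qualified class name in the real code
    parts.add(", ");
    parts.add(", ");
    parts.add(");");
    // scala.collection.JavaConversions bridges the java.util lists to the Scala Seqs expected by Spark
    StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());

    List<Object> args = new ArrayList<>();
    args.add(ev.value());
    args.add(input.isNull());
    args.add(input.value());
    args.add(accessCode);
    // build the code Block through Spark's code string interpolation helper
    Block block = new Block.BlockHelper(sc).code(JavaConversions.collectionAsScalaIterable(args).toSeq());

    // concatenate the child's code with our serialization code and return the resulting ExprCode
    return ev.copy(input.code().$plus(block), input.isNull(), ev.value());
  }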

This method generates, as Java strings, the code shown in the CODE GENERATED comment.

It returns an ExprCode whose code Block is constructed through string interpolation: that is the StringContext and Block.BlockHelper construction near the end of the method.

Now that the global architecture of this method is clearer, there are a few pieces that could look weird:
  • To access an object that is not a local variable of the generated code block, we need to add a Catalyst reference to it: that is the ctx.addReferenceObj() call. In our case we reference the coder, which contains the user-provided serialization code.
  • As said above, the serializer is a UnaryExpression. This Expression has only one input Expression, which is its child (hence the child.genCode(ctx) call). We need to concatenate the child's code and the actual serialization code so that everything can be compiled together by Janino: that is what the final input.code().$plus(...) does.

Instantiate the serializer



Here is the code to create the EncodeUsingBeamCoder object (the serializer part of our ExpressionEncoder). To instantiate this class we need to pass it a reference to its child in the Catalyst tree (remember, the input Expression of the UnaryExpression). To obtain a reference to the Catalyst input Expression, we do it like this: new BoundReference(0, new ObjectType(clazz), true). There is only one input (because EncodeUsingBeamCoder is a UnaryExpression) so we get it at index 0, and we indicate the DataType of the input and its nullability.
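
A minimal sketch of that instantiation (clazz is the class of the objects to serialize and coder the user-provided Beam Coder, both assumed to be in scope; names follow the linked EncoderHelpers code):

  // reference to the input of the serializer: the object itself, bound at ordinal 0
  Expression input = new BoundReference(0, new ObjectType(clazz), true);
  // the serializer Expression wrapping the user-provided Beam Coder
  Expression serializer = new EncodeUsingBeamCoder<>(input, coder);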

Spark physical plan



The serialization part of the physical Catalyst plan looks like this:

SerializeFromObject [encodeusingbeamcoder(input[0, org.apache.beam.sdk.util.WindowedValue, true], WindowedValue$FullWindowedValueCoder(VarIntCoder,GlobalWindow$Coder)) AS binaryStructField#11]

WindowedValue is the type to serialize from (the clazz) and WindowedValue$FullWindowedValueCoder(VarIntCoder,GlobalWindow$Coder) is the Beam Coder provided in the pipeline.

Deserializer


The deserializer class DecodeUsingBeamCoder, which you can see in the full code linked below, is completely symmetric to the serializer class EncodeUsingBeamCoder. The only thing worth mentioning is its instantiation:

Instantiate the deserializer


Here is the code to create the DecodeUsingBeamCoder object (the deserializer part of our ExpressionEncoder). Here also, to instantiate this class, we need to pass it a reference to its child in the Catalyst tree (remember, the input Expression of the UnaryExpression). To obtain a reference to the Catalyst node, we do it like this: new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType).

Here again we get the single input Expression at index 0 (cf. UnaryExpression); it is of type BinaryType because we are deserializing bytes. The Cast allows Catalyst to treat the Expression as binary, which lets Spark process it more efficiently.
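
A minimal sketch of that instantiation (same assumptions as before; classTag, assumed in scope, is the scala.reflect.ClassTag of clazz, and like in the linked code the deserializer also receives it so that it knows its output type):

  // reference to the input of the deserializer: the single binary column at ordinal 0, cast to binary
  // (DataTypes.BinaryType is the Java way to reference Spark's BinaryType; depending on the Spark
  // version, Cast may take an extra Option<String> timeZoneId parameter)
  Expression input = new Cast(new GetColumnByOrdinal(0, DataTypes.BinaryType), DataTypes.BinaryType);
  // the deserializer Expression wrapping the same user-provided Beam Coder
  Expression deserializer = new DecodeUsingBeamCoder<>(input, classTag, coder);

In the linked code, the serializer and deserializer Expressions are then passed to the ExpressionEncoder constructor, together with a schema containing a single binary field and the ClassTag of the target class, to form the complete Encoder.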

Spark physical plan


The deserialization part of the physical Catalyst plan looks like this:

DeserializeToObject decodeusingbeamcoder(cast(binaryStructField#4 as binary), org.apache.beam.sdk.util.WindowedValue, WindowedValue$FullWindowedValueCoder(VarIntCoder,GlobalWindow$Coder)), obj#6: org.apache.beam.sdk.util.WindowedValue
WindowedValue is the type to deserialize to (the clazz) and WindowedValue$FullWindowedValueCoder(VarIntCoder,GlobalWindow$Coder) is the Beam Coder provided in the pipeline.


Performance


Even though the Janino compiler is fast, compiling Java strings to bytecode takes time. I measured a big performance gain when I reduced the size of the generated code and replaced it with as much pre-compiled code as possible. This is why, in the code linked below, the encode() method in the serializer and the decode() method in the deserializer are compiled code and not code strings inside the doGenCode() method. Another benefit of this approach is that it makes this compiled part of the code debuggable.
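
For illustration, here is roughly what such a pre-compiled helper can look like (a sketch consistent with the generated call shown earlier; the real encode()/decode() methods live in the linked EncoderHelpers class):

  // called from the tiny generated code snippet instead of inlining all the serialization logic as strings
  public static <T> byte[] encode(boolean isNull, T value, Coder<T> coder) {
    if (isNull || value == null) {
      return null;
    }
    try {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      coder.encode(value, baos); // user-provided Beam serialization code
      return baos.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException("Error encoding value " + value, e);
    }
  }

Because this method is ordinary compiled Java, you can also put a breakpoint in it, which is the debugging benefit mentioned above.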

Full code link


EncoderHelpers class in the Apache Beam project