Friday, February 7, 2020

Understand Apache Beam runners: focus on the Spark runner

🕥 5 min.

Previously on Apache Beam runners 😀


In the previous article, we had a brief overview of what an Apache Beam runner is. This article will dig into more detail.

The previous article introduced this very simple pipeline:
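
As a reminder, a pipeline of that shape could look like this with the Beam Java SDK. This is a hedged sketch: the transforms and file paths are illustrative, not necessarily the exact pipeline of the previous article.

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.io.TextIO;
  import org.apache.beam.sdk.options.PipelineOptions;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.Count;
  import org.apache.beam.sdk.transforms.MapElements;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.TypeDescriptors;

  public class SimplePipeline {
    public static void main(String[] args) {
      PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
      Pipeline pipeline = Pipeline.create(options);

      pipeline
          // Read: a primitive transform
          .apply("Read", TextIO.read().from("/tmp/input.txt"))
          // Count: a composite transform (built on Combine)
          .apply("CountElements", Count.perElement())
          // Format each (element, count) pair as a line of text
          .apply("Format", MapElements
              .into(TypeDescriptors.strings())
              .via((KV<String, Long> kv) -> kv.getKey() + ": " + kv.getValue()))
          // Write: a composite transform
          .apply("Write", TextIO.write().to("/tmp/output"));

      pipeline.run().waitUntilFinish();
    }
  }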

We saw that the Beam SDK translates this pipeline into a DAG representing the pipeline as Beam transform nodes. Now let's see how a Beam runner translates this DAG. Let's say that the user chooses Apache Spark as the target Big Data platform when launching the pipeline.

The runner (at last)


The job of the runner is to translate the pipeline DAG into native pipeline code for the targeted Big Data platform. It is this native code that will be executed by the Big Data cluster. If Spark is chosen, the runner translates the DAG below into the native Spark pipeline code below. For the sake of simplicity, the Spark pipeline is shown as pseudo-code.
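
In pseudo-code, the generated native Spark (Dataset API) pipeline could look roughly like this. Helper names such as doFnAsFlatMap, extractKey or combineFnAsAggregator are illustrative placeholders, not actual runner classes.

  // pseudo-code of the generated native Spark pipeline
  Dataset input  = spark.read().format("beamSource").load();     // Beam Read    -> DataSourceV2
  Dataset mapped = input.flatMap(doFnAsFlatMap, elementEncoder);  // Beam ParDo   -> flatMap
  Dataset output = mapped
      .groupByKey(extractKey, keyEncoder)                         // Beam GroupByKey
      .agg(combineFnAsAggregator.toColumn());                     // Beam Combine -> Aggregator (agg)
  output.foreach(writeToSink);                                    // terminal action (batch mode)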


Composite and primitive transforms: the level of translation


In the previous article we talked about Beam transforms (primitive transforms and composite transforms). In the rest of this post, we will refer to a "composite transform" simply as a "composite" and to a "primitive transform" simply as a "primitive".

The DAG above is the expanded DAG: on the left-hand side are the Beam transforms of the user pipeline. Among these transforms, only Read is a primitive. The others are implemented by the Beam SDK as composites of other transforms. Composites can themselves be made of composites (the Count transform, for example, is built on the Combine transform), but in the end they always boil down to primitives (ParDo or GroupByKey). And these primitives are what the runner translates.
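
To make this concrete, here is a hedged sketch of what a Count-like composite could look like: its expand() method only composes other transforms, and the nested Combine.perKey is itself a composite that eventually expands into GroupByKey and ParDo primitives. This mirrors the idea, not the exact Beam source code.

  import org.apache.beam.sdk.transforms.Combine;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.Sum;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  // A composite transform: it is defined only in terms of other transforms.
  public class CountPerElement<T>
      extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> {

    @Override
    public PCollection<KV<T, Long>> expand(PCollection<T> input) {
      return input
          // ParDo primitive: pair each element with 1
          .apply("PairWithOne", ParDo.of(new DoFn<T, KV<T, Long>>() {
            @ProcessElement
            public void processElement(@Element T element, OutputReceiver<KV<T, Long>> out) {
              out.output(KV.of(element, 1L));
            }
          }))
          // Combine.perKey is itself a composite (GroupByKey + ParDo in the end)
          .apply("SumPerKey", Combine.perKey(Sum.ofLongs()));
    }
  }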

But in some cases, the runner can choose to translate at the composite level of the graph rather than at the primitive level, depending on the capabilities of the target Big Data technology. Indeed, if the Beam composite has a direct correspondence in the target API, the runner does not decompose the composite into its primitives but translates the composite directly. The green boxes in the DAG represent the level of translation. In our example, the Beam Combine composite has a direct equivalent in Spark: an Aggregator (agg in the Spark pipeline).
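
As an illustration, here is a hedged sketch of how a Beam CombineFn could be wrapped into a Spark Aggregator so that a Combine composite translates directly to an agg() call. It is illustrative only; the actual Spark runner implementation differs.

  import java.util.Arrays;
  import org.apache.beam.sdk.transforms.Combine;
  import org.apache.spark.sql.Encoder;
  import org.apache.spark.sql.expressions.Aggregator;

  // Wraps a Beam CombineFn into a Spark Aggregator: each Aggregator callback
  // delegates to the corresponding CombineFn method.
  class CombineFnAggregator<InputT, AccumT, OutputT>
      extends Aggregator<InputT, AccumT, OutputT> {

    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
    private final Encoder<AccumT> accumEncoder;
    private final Encoder<OutputT> outEncoder;

    CombineFnAggregator(Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
        Encoder<AccumT> accumEncoder, Encoder<OutputT> outEncoder) {
      this.combineFn = combineFn;
      this.accumEncoder = accumEncoder;
      this.outEncoder = outEncoder;
    }

    @Override public AccumT zero() { return combineFn.createAccumulator(); }
    @Override public AccumT reduce(AccumT accum, InputT input) { return combineFn.addInput(accum, input); }
    @Override public AccumT merge(AccumT a, AccumT b) { return combineFn.mergeAccumulators(Arrays.asList(a, b)); }
    @Override public OutputT finish(AccumT accum) { return combineFn.extractOutput(accum); }
    @Override public Encoder<AccumT> bufferEncoder() { return accumEncoder; }
    @Override public Encoder<OutputT> outputEncoder() { return outEncoder; }
  }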

The translation itself


The translation occurs when the pipeline is run (when pipeline.run() is executed). To translate the DAG, the runner visits the graph. All Beam runners work the same way; only the target API changes with the chosen runner.

The first step is to detect the translation mode (batch or streaming) by searching the pipeline for a Beam BoundedSource (Elasticsearch, for example) or UnboundedSource (Kafka, for example); a small detection sketch follows the list below. Knowing the translation mode, the runner can choose:
  • the proper Spark DataSourceV2 to instantiate: one implementing ReadSupport (batch) or MicroBatchReadSupport (streaming)
  • the proper Spark action to execute on the output dataset: foreach (batch) or writeStream (streaming)
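
Here is a hedged sketch of such a detection step, using Beam's pipeline visitor API. The class name and details are illustrative.

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.runners.TransformHierarchy;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.PValue;

  // Flips to streaming mode as soon as an unbounded PCollection is seen
  // (i.e. a PCollection produced by an UnboundedSource such as Kafka).
  class TranslationModeDetector extends Pipeline.PipelineVisitor.Defaults {
    private boolean streaming = false;

    @Override
    public void visitValue(PValue value, TransformHierarchy.Node producer) {
      if (value instanceof PCollection
          && ((PCollection<?>) value).isBounded() == PCollection.IsBounded.UNBOUNDED) {
        streaming = true;
      }
    }

    boolean isStreaming() { return streaming; }
  }

The runner would run pipeline.traverseTopologically(detector) before translating, then pick the batch or streaming path according to detector.isStreaming().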

Then the DAG visit continues and each node is translated to the target native API: Read gets translated to a Spark DataSourceV2 that creates the input dataset, ParDo gets translated to a Spark flatMap that is applied to the input dataset, and so on until the output dataset.
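
To illustrate the visit itself, here is a hedged sketch of the two visitor hooks a runner typically relies on. hasTranslator() and translate() are hypothetical helpers; the real Spark runner code differs.

  // If a composite node has a direct translation (e.g. Combine -> Spark agg),
  // translate it and do not descend into it; otherwise keep descending until
  // the primitives are reached and translate those.
  @Override
  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
    if (hasTranslator(node)) {
      translate(node);
      return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
    }
    return CompositeBehavior.ENTER_TRANSFORM;
  }

  // Primitives always have a translator: Read -> DataSourceV2 dataset,
  // ParDo -> flatMap, GroupByKey -> groupByKey, ...
  @Override
  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
    translate(node);
  }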

Pipeline run


When the visit of the DAG is done, the runner applies the action chosen above to the output dataset to run the pipeline. At this point the Spark framework executes the resulting native Spark pipeline.
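
In a hedged sketch (outputDataset and sinkWriter being illustrative placeholders), this last step looks like:

  if (detector.isStreaming()) {
    // streaming mode: start the query with writeStream
    outputDataset.writeStream()
        .foreach(sinkWriter)        // a ForeachWriter pushing elements to the sink
        .start()
        .awaitTermination();
  } else {
    // batch mode: a no-op terminal foreach is enough, it forces Spark to
    // evaluate the whole lazy pipeline (the writes happen in the translated
    // transforms upstream)
    outputDataset.foreach((ForeachFunction<Row>) element -> { });
  }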