Showing posts with label beam.

Tuesday, January 5, 2021

Tricky use cases of Apache Beam 3/3: Custom Combine

🕥 5 min.

Introduction


This is the third article in a series of blog posts about tricky use cases of Apache Beam that highlight some of the advanced possibilities of the Beam SDK.

What is Beam Combine?


Combine is the Beam word for the reduce transformation. This transformation combines data spread across workers to make a computation. The computation can be whatever you like. In the Beam SDK, Combine is used under the hood in various built-in transforms (Count, Sum, Mean, ...), but you can also create your own Combine implementations.

The overview architecture of the Combine is like this:


The PCollection is split across the cluster (consider here that we have a cluster of 3 nodes). On each node, Beam creates an accumulator. Each input value is added to the local accumulator. Then the accumulators are merged into a single combined output accumulator from which the result is extracted. At each of these 3 steps, users can add whatever computation code their use case requires.

How to create your own combiner?


To create your own combiner you need to extend CombineFn<InputT,AccumT,OutputT>. You'll have to override some methods:

  • createAccumulator() operation is invoked to create a fresh mutable accumulator value of type AccumT, initialized to represent the combination of zero values.
  • addInput(AccumT, InputT) operation is invoked on each input value to add it to the local accumulator AccumT value. 
  • mergeAccumulators(java.lang.Iterable<AccumT>) operation is invoked to combine a collection of accumulators AccumT values into a single combined output accumulator AccumT value, once the merging accumulators have had all the input values in their partition added to them. This operation is invoked repeatedly, until there is only one accumulator value left.
  • extractOutput(AccumT) operation is invoked on the final accumulator AccumT value to get the output OutputT value.
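To make the four steps concrete, here is a minimal sketch in plain Java that does not use the Beam SDK. The class MeanCombineSketch, its Accum type and the combine() helper are hypothetical names chosen for illustration; combine() only imitates what a runner would do (one accumulator per partition, then a merge).

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of the four combine steps; it does not use the Beam SDK. */
class MeanCombineSketch {

  /** Hypothetical accumulator: a running sum and a count. */
  static final class Accum {
    long sum;
    long count;
  }

  // Step 1: a fresh accumulator that represents the combination of zero values.
  static Accum createAccumulator() {
    return new Accum();
  }

  // Step 2: add one input value to the local accumulator.
  static Accum addInput(Accum accum, int input) {
    accum.sum += input;
    accum.count++;
    return accum;
  }

  // Step 3: merge several accumulators into a single one.
  static Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum a : accums) {
      merged.sum += a.sum;
      merged.count += a.count;
    }
    return merged;
  }

  // Step 4: extract the final output from the merged accumulator.
  static double extractOutput(Accum accum) {
    return accum.count == 0 ? 0.0 : (double) accum.sum / accum.count;
  }

  /** Imitates a runner: one accumulator per partition, then a merge and an extraction. */
  static double combine(List<List<Integer>> partitions) {
    List<Accum> perPartition = new ArrayList<>();
    for (List<Integer> partition : partitions) {
      Accum accum = createAccumulator();
      for (int value : partition) {
        accum = addInput(accum, value);
      }
      perPartition.add(accum);
    }
    return extractOutput(mergeAccumulators(perPartition));
  }
}
```

Note that the accumulator keeps a sum and a count rather than a partial mean: this is what keeps the merge step independent of how the values were partitioned.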
Your combiner can be called in the pipeline with:

inputPCollection.apply(Combine.globally(new CustomCombineFn()));

or for keyed input PCollections:

inputPCollection.apply(Combine.perKey(new CustomCombineFn()));

Please note that combining functions should be associative and commutative. Associativity is required because input values are first broken up into partitions before being combined, and their intermediate results further combined, in an arbitrary tree structure. Commutativity is required because any order of the input values is ignored when breaking up input values into partitions.

A note about coders: some form of data encoding is required when a CombineFn uses custom types that do not have well-known coders. Usually, your custom type will just implement Serializable and Beam will provide a coder automatically: it relies on a generic CoderProvider that can supply a coder for any Serializable type. In cases where Serializable is inefficient or inapplicable, there are two alternatives for encoding:

  • Default CoderRegistry: for example, implement a coder class explicitly and use the @DefaultCoder annotation. See the CoderRegistry for the numerous ways in which to bind a type to a coder.
  • CombineFn-specific way: while extending CombineFn, override both getAccumulatorCoder(CoderRegistry, Coder<InputT>) and getDefaultOutputCoder(CoderRegistry, Coder<InputT>).

An example


The following example is extracted from the Nexmark auction system. Here, the use case is to calculate the average price of the last 3 closed auctions:


Here, the input PCollection is the collection of the auction-winning bids with their associated price and timestamp. Each bid contains the final price of a given auction. 

The types used in the custom Combine are: InputT=Bid, which implements Serializable; AccumT=List<Bid>, because the accumulator is a sorted list of bids; and OutputT=Double, because the output is a mean. The Beam SDK provides coders for all these types automatically through the mechanism mentioned above.

As you can see, we added some computation code (sort bids by timestamp then price, and keep the last 3) during both the add step (addInput() method implementation) and the merge step (mergeAccumulators() method implementation). During the add step, we could have simply added each input to the accumulator without trimming it to the last 3, but this would have led to more load in the following merge step. It is more efficient to keep only 3 bids as early as the add step, so that the merge step has fewer values to combine.

The average price is only calculated at the end when the output value is extracted from the final combined accumulator (extractOutput() method implementation).
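The logic described above can be sketched in plain Java as follows. This is not the Nexmark code: the Bid class here is a hypothetical stand-in with only a timestamp and a price, and the methods mirror the CombineFn steps without depending on the Beam SDK.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java sketch of "mean price of the last 3 bids"; not the actual Nexmark code. */
class LastThreeMeanSketch {
  static final int MAX_BIDS = 3;

  /** Hypothetical stand-in for the Nexmark Bid, reduced to the fields used here. */
  static final class Bid {
    final long timestamp;
    final long price;

    Bid(long timestamp, long price) {
      this.timestamp = timestamp;
      this.price = price;
    }
  }

  static final Comparator<Bid> BY_TIME_THEN_PRICE =
      Comparator.<Bid>comparingLong(b -> b.timestamp).thenComparingLong(b -> b.price);

  /** Sorts the bids by timestamp then price and keeps only the last MAX_BIDS of them. */
  static List<Bid> keepLast(List<Bid> bids) {
    List<Bid> sorted = new ArrayList<>(bids);
    sorted.sort(BY_TIME_THEN_PRICE);
    int from = Math.max(0, sorted.size() - MAX_BIDS);
    return new ArrayList<>(sorted.subList(from, sorted.size()));
  }

  // Add step: trim early so that the merge step has fewer values to handle.
  static List<Bid> addInput(List<Bid> accum, Bid bid) {
    List<Bid> next = new ArrayList<>(accum);
    next.add(bid);
    return keepLast(next);
  }

  // Merge step: concatenate the partial lists, then trim again.
  static List<Bid> mergeAccumulators(Iterable<List<Bid>> accums) {
    List<Bid> all = new ArrayList<>();
    for (List<Bid> accum : accums) {
      all.addAll(accum);
    }
    return keepLast(all);
  }

  // Extract step: the mean is only computed here, on the final accumulator.
  static double extractOutput(List<Bid> accum) {
    if (accum.isEmpty()) {
      return 0.0;
    }
    long sum = 0;
    for (Bid bid : accum) {
      sum += bid.price;
    }
    return (double) sum / accum.size();
  }
}
```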


Now you know how to create a custom Beam combiner to adapt to whatever use case you might have!

Wednesday, December 2, 2020

Tricky use cases of Apache Beam 2/3: Custom windows

🕥 9 min.

Introduction


This is the second article in a series of blog posts about tricky use cases of Apache Beam that highlight some of the advanced possibilities of the Beam SDK.

What is Beam windowing?


In streaming systems, the flow of data is continuous. To be able to process it, we need to divide this flow of data into finite chunks. These finite chunks are windows. They are a way to temporally group the elements based on their event timestamp (the time at which the element was produced).

As Beam provides a unified API for batch mode and streaming mode, all the elements of Beam pipelines (even in batch mode) are stored in windows. In batch mode, by default, there is a single window called the Global Window that stores all the elements of the pipeline. It is the default window that you get behind the scenes when you don't specify a window in your pipeline. But, even in batch mode, you could specify another windowing strategy, such as the ones below, based on the timestamp of the elements.

There are several types of windows provided by the Beam SDK:


Fixed windows


* The diagram shows keys but, of course, these windows also work for non-keyed PCollections.

Fixed windows are defined by the duration of the window. In the example above, they express the use case: "I want to have the last 30 seconds of data".
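The assignment arithmetic behind fixed windows is simple enough to sketch in plain Java. FixedWindowSketch is a hypothetical helper, not a Beam class; it assumes timestamps in milliseconds and windows aligned on the epoch.

```java
/** Plain-Java sketch of fixed window assignment; not a Beam class. */
class FixedWindowSketch {

  /** Returns {start, end} of the window that contains the timestamp; end is exclusive. */
  static long[] assign(long timestampMillis, long sizeMillis) {
    // floorMod keeps the result correct for negative timestamps as well.
    long start = timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    return new long[] {start, start + sizeMillis};
  }
}
```

With 30-second windows, an element stamped at 45 s lands in the window that spans 30 s to 60 s.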

Sliding windows


* The diagram shows keys but, of course, these windows also work for non-keyed PCollections.

Sliding windows are similar to fixed windows except that they overlap. They are defined by both a duration and a sliding period. In the example above, they express the use case: "I want to have the last 60 seconds of data every 30 seconds".
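Because sliding windows overlap, one element belongs to several of them. The following plain-Java sketch lists the windows that contain a given timestamp; SlidingWindowSketch is a hypothetical helper, not a Beam class, and it assumes windows aligned on the epoch.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of sliding window assignment; not a Beam class. */
class SlidingWindowSketch {

  /** Returns every {start, end} window (end exclusive) that contains the timestamp. */
  static List<long[]> assign(long timestamp, long size, long period) {
    List<long[]> windows = new ArrayList<>();
    // Latest window start that is not after the timestamp.
    long lastStart = timestamp - Math.floorMod(timestamp, period);
    // Walk back one period at a time while the window still covers the timestamp.
    for (long start = lastStart; start > timestamp - size; start -= period) {
      windows.add(new long[] {start, start + size});
    }
    return windows;
  }
}
```

With a 60-second duration and a 30-second period, an element stamped at 45 s belongs to two windows: the one spanning 30 s to 90 s and the one spanning 0 s to 60 s.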

Session windows


* The diagram shows keys but, of course, these windows also work for non-keyed PCollections.

Session windows are very similar to web sessions. They are defined by a gap duration: if no data arrives for more than the gap duration, then the next element to arrive is considered to belong to another session. Of course, the gap duration is measured in event time. In other words, if the next element's timestamp is more than the gap duration after the previous element's, then it is put into another session window.
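The gap rule can be sketched in plain Java as below. SessionWindowSketch is a hypothetical helper, not a Beam class. It follows the wording above (a new session starts when the difference is strictly greater than the gap); how a difference exactly equal to the gap is treated is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of session grouping by event-time gap; not a Beam class. */
class SessionWindowSketch {

  /** Groups event timestamps into sessions separated by more than the gap duration. */
  static List<List<Long>> sessions(long[] timestamps, long gap) {
    long[] sorted = timestamps.clone();
    Arrays.sort(sorted); // event time order, whatever the arrival order was
    List<List<Long>> result = new ArrayList<>();
    List<Long> current = null;
    long previous = 0;
    for (long timestamp : sorted) {
      if (current == null || timestamp - previous > gap) {
        current = new ArrayList<>(); // the gap was exceeded: open a new session
        result.add(current);
      }
      current.add(timestamp);
      previous = timestamp;
    }
    return result;
  }
}
```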

Custom windows


So far, the window types that Beam provides are quite similar to what other Big Data streaming systems provide, except maybe session windows, which few systems offer. But what is really different with Beam is that it lets users define their own windowing by extending the WindowFn function class or one of its subclasses.

Health monitoring system example


To illustrate Beam custom windows, we will consider this use case: let's say we have a monitoring system that reports events about the health of a large and complex e-commerce platform. 

For the sake of simplicity, the event will be a simple class with just an enum status:
 
A large number of events is produced, so the monitoring is done through a Big Data pipeline, and this pipeline uses Beam. It is a streaming pipeline because the events arrive continuously. We want fine-grained oversight of what is going on, so we set the granularity of the monitoring to 10 minutes. Thus, in the pipeline, we divide the collection of events into fixed windows of 10 minutes. But there is something to consider about the health of the e-commerce platform: when a failure occurs, it affects the health of the overall platform for about 30 minutes (the time for the clusters to fail over, the load to decrease, etc.), during which the system is in a recovering state. As a consequence, in the monitoring system, we want a failure event to be reported for 30 minutes after its occurrence.

Implementation


To implement the above desired behavior, we need to assign events to windows depending on the type of these events. This is a perfect use case for Beam custom windows. We will define a MonitoringFixedWindows custom window that assigns HEALTHY events to the current fixed window and FAILURE events to the current and the next 3 fixed windows:


The custom window is similar to FixedWindows, but we do not extend the FixedWindows class because FixedWindows assigns an element to a single window only. In our case, we want a FAILURE event to be assigned to 4 windows (the current one and the next 3).

The lowest class in the WindowFn hierarchy that we can extend is NonMergingWindowFn. It simply describes windows that do not merge with each other (see the code).

The interesting part of this code is the override of the assignWindows method: the whole essence of custom windows resides there. Given the timestamp of an element, it returns the list of windows the element should be assigned to. In our case, we return a list of IntervalWindows (simply windows with boundaries) because our windowing function class deals with IntervalWindows.

The other overrides are quite straightforward:
  • windowCoder: as said above, all the elements in Beam are in windows, and they will be serialized at some point, so the window information needs to be serialized as well. The window coder is simply a Coder for the custom window. The custom window deals with IntervalWindow, so we just return its coder, which is already provided by the SDK.
  • getDefaultWindowMappingFn: this method is for side inputs (data of another PCollection accessible through a view in the Beam transforms). Here, this custom window does not support side inputs, so we just throw an exception.
  • isCompatible: 2 custom windowing functions are compatible if they are equal (same type and same boundaries).
  • equals and hashCode: as Beam will compare windows, we need to override equals and hashCode with the usual code.

Now, let's apply this windowing function to an input streaming PCollection:

inputPCollection.apply(Window.into(MonitoringFixedWindows.of(Duration.standardMinutes(10))));

The output will be like this: 


Consider that we receive a HEALTHY event with timestamp 5: it will be assigned to the window starting at timestamp 0 and ending at timestamp 10. If we then receive a FAILURE event with timestamp 12, it will be assigned to the window it belongs to (starting at timestamp 10 and ending at timestamp 20) and also to the next 3 10-minute windows.
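The assignment rule can be reproduced with a few lines of plain Java. This is only a sketch of the logic, not the MonitoringFixedWindows class itself: MonitoringWindowsSketch and its Status enum are hypothetical names, windows are represented as {start, end} pairs instead of IntervalWindows, and timestamps are expressed in minutes as in the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of the window assignment rule; not the Beam WindowFn itself. */
class MonitoringWindowsSketch {

  /** Hypothetical event status, mirroring the HEALTHY and FAILURE values of the article. */
  enum Status { HEALTHY, FAILURE }

  /** Returns the {start, end} windows (end exclusive) an event should be assigned to. */
  static List<long[]> assignWindows(Status status, long timestamp, long size) {
    long currentStart = timestamp - Math.floorMod(timestamp, size);
    // A FAILURE is reported in its own window and in the next 3 ones.
    int windowCount = status == Status.FAILURE ? 4 : 1;
    List<long[]> windows = new ArrayList<>();
    for (int i = 0; i < windowCount; i++) {
      long start = currentStart + i * size;
      windows.add(new long[] {start, start + size});
    }
    return windows;
  }
}
```

Calling assignWindows(Status.FAILURE, 12, 10) yields the four windows starting at 10, 20, 30 and 40, which matches the example above.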

Now you know how Beam windowing can adapt to whatever use case you might have!




Tuesday, November 10, 2020

Tricky use cases of Apache Beam 1/3: incremental join

🕥 7 min.

Introduction


This is the first article in a series of blog posts about tricky use cases of Apache Beam that highlight some of the advanced possibilities of the Beam SDK.

Incremental join example


Let's say we have a streaming system that records events of an auction platform. Such events can be Persons creating Auctions and placing bids on auctions.

Consider that you have 2 PCollections as input:
  • One with the Person details, collected when the persons connect to the auction system
  • One with the Auction details, collected when the auctions are created
Now, you want regular updates on who is selling in particular US states.
This problem is a typical incremental join of these 2 PCollections. Such a join in Beam is done through the CoGroupByKey transform of the SDK. This transform works as illustrated below:




This is similar to a SQL join. The 2 PCollections are keyed by the same key: personId is the unique key of the Person object and sellerId is the unique key of the Seller object. Seller and Person are just two different views of the same entity, depending on whether they were created during a connection event or during an auction creation event. So sellerId and personId are the same, and we join by this key.

First step is to group the 2 PCollections in a KeyedPCollectionTuple before joining. For that we need to tag the elements of the PCollections with a TupleTag (basically a String id) to differentiate them in the output result. In the diagram above, the Person tag is in green and the Auction tag is in blue.
Then the actual CoGroupByKey transform is applied to end up with an output PCollection that contains for each key a list of elements (either Person or Auction) that have this key in common.
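The grouping semantics can be illustrated without the Beam SDK. In the plain-Java sketch below, CoGroupSketch and Joined are hypothetical names; the two fields of Joined play the role of the two tags, and the values are plain strings instead of Person and Auction objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of what a co-group by key produces; it does not use the Beam SDK. */
class CoGroupSketch {

  /** Hypothetical per-key result: one list per input, as the tags distinguish them. */
  static final class Joined {
    final List<String> persons = new ArrayList<>();
    final List<String> auctions = new ArrayList<>();
  }

  /** Groups the values of both keyed inputs under their common key. */
  static Map<Long, Joined> coGroupByKey(
      List<Map.Entry<Long, String>> persons, List<Map.Entry<Long, String>> auctions) {
    Map<Long, Joined> result = new TreeMap<>();
    for (Map.Entry<Long, String> person : persons) {
      result.computeIfAbsent(person.getKey(), k -> new Joined()).persons.add(person.getValue());
    }
    for (Map.Entry<Long, String> auction : auctions) {
      result.computeIfAbsent(auction.getKey(), k -> new Joined()).auctions.add(auction.getValue());
    }
    return result;
  }
}
```

A key that appears in only one input still shows up in the result, with an empty list on the other side. This is why the matching of persons and auctions has to be done in a later step.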


Stateful processing


Hey, wait! I said that this was a streaming system, which means that there will be out-of-order data. How do we deal with that during the join? Well, the answer is stateful processing: the idea is to store the elements in a persistent state while waiting for the corresponding element in the other PCollection. Such a stateful processing API is available in the Beam SDK with the ParDo transform, but presenting it in detail is out of the scope of this article. There are good blog articles and talks about that already, so I will just point to the official Beam stateful processing documentation.

So the architecture of the stateful join in our example will be:
  • The Person element will be stored in a persistent state in order to match future auctions by that person. We also set a timer to clear the person state after a TTL (see Timer API in the SDK)
  • The Auction elements will be stored in a persistent state until we have seen the corresponding Person element. Then, the stored auctions can be output and the state can be cleared 
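The two rules above can be simulated in plain Java. This sketch does not use the Beam state and timer APIs: StatefulJoinSketch is a hypothetical class in which two in-memory maps stand in for the persistent state, and onPersonTtlExpired() stands in for the timer firing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of the stateful join; maps stand in for Beam persistent state. */
class StatefulJoinSketch {
  private final Map<Long, String> personState = new HashMap<>();
  private final Map<Long, List<String>> pendingAuctions = new HashMap<>();

  /** A person arrives: remember it and release the auctions that were waiting for it. */
  List<String> onPerson(long personId, String name) {
    personState.put(personId, name);
    List<String> output = new ArrayList<>();
    List<String> pending = pendingAuctions.remove(personId); // clears the auction state
    if (pending != null) {
      for (String auction : pending) {
        output.add(name + " sells " + auction);
      }
    }
    return output;
  }

  /** An auction arrives: output it if the seller is known, otherwise store it. */
  List<String> onAuction(long sellerId, String auction) {
    List<String> output = new ArrayList<>();
    String name = personState.get(sellerId);
    if (name != null) {
      output.add(name + " sells " + auction);
    } else {
      pendingAuctions.computeIfAbsent(sellerId, k -> new ArrayList<>()).add(auction);
    }
    return output;
  }

  /** Stands in for the TTL timer firing: the person state is cleared. */
  void onPersonTtlExpired(long personId) {
    personState.remove(personId);
  }
}
```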


The code of the incremental join


I think it deserves some explanations: 

We receive as input the PCollection containing all the events of the auction system. Beam provides several windowing capabilities, but here we chose to work in processing time, so we do not window the elements into time windows. Instead, we apply the Global window to the elements and set a trigger that outputs the results each time n elements have been processed.

We then extract the auctions out of the stream of events and key the auctions by their sellerId. We then do the same for the Persons: extract them out of the events and key them by their personId (which is the same as the sellerId, remember).

Now comes the actual join: we first group the 2 PCollections in a KeyedPCollectionTuple as said above. We then apply the actual CoGroupByKey transform to the KeyedPCollectionTuple.

Now comes the stateful processing: as explained above, we need to store elements in a persistent state to deal with out-of-order data. This is done by applying a ParDo to the joined output PCollection. The topic of this article is not stateful processing so, if you want some details, the complete code of JoinDoFn is on the Beam GitHub repository. I will just say here that JoinDoFn does the following:

  • Defines 2 ValueState to store the Persons and Auctions
  • Defines 1 Timer that fires after a configured TTL to clear the Person ValueState
  • For each key, iterates over the elements in the output of the join and matches persons to auctions by writing to and reading from the ValueStates.

The final step is to export the result by applying a ParDo that creates the output POJOs that gather persons information and auctions information.

Conclusion


I believe that it is interesting to show the join in a complete use case example. Thus, this code has a lot of boilerplate that is not needed for the actual incremental join and that comes from the use case of the auction system itself:
  • The extraction of auctions out of events
  • The extraction of persons out of events
  • The keying of the auctions PCollection
  • The keying of the persons PCollection
  • The application of the stateful JoinDoFn
  • The export to the POJO output
The actual join resides only in the creation of the KeyedPCollectionTuple and the application of the CoGroupByKey. The incremental part resides in the windowing: in this use case we chose to trigger the output with each new element, but we could have chosen a simpler approach by simply applying FixedWindows to the input PCollection.

That's it, this is how you can create an incremental join pipeline with Apache Beam.

Wednesday, July 8, 2020

Export metrics from Apache Beam pipelines

🕥 12 min. 

This blog post covers part of this talk that I gave at ApacheCon 2018 about universal metrics in Apache Beam. More precisely, it deals with metrics export: how Beam metrics are exported to the outside world from a running Beam pipeline.

What are Beam metrics?


The content below is also available on the Beam website but is recalled here for the completeness of the article.

In the Beam model, metrics provide some insight into the current state of a user pipeline, potentially while the pipeline is running. For example, this allows the user to:
  • Check the number of errors encountered while running a specific step in the pipeline
  • Monitor the number of calls made to an external service
  • Count the number of elements that have been processed
  • …and so on.
Metrics are: 
  • Named: Each metric has a name which consists of a namespace and an actual name. The namespace can be used to differentiate between multiple metrics with the same name and also allows querying for all metrics within a specific namespace.
  • Scoped: Each metric is reported against a specific step in the pipeline (i.e. a specific transform in the pipeline), indicating what code was running when the metric was declared. This allows reporting the same metric name in multiple places and identifying the value each transform reported, as well as aggregating the metric across the entire pipeline.
  • Dynamically Created: Metrics may be created during runtime without pre-declaring them, in much the same way a logger could be created. This makes it easier to produce metrics in utility code and have them usefully reported.
  • Degrade Gracefully: If a runner doesn’t support some part of reporting metrics, the fallback behavior is to drop the metric updates rather than failing the pipeline. If a runner doesn’t support some part of querying metrics, the runner will not return the associated data.
  • Attempted/committed metrics: attempted metrics include retries whereas committed metrics do not. If a bundle (a part of a Beam PCollection) is retried, the committed value of a counter is not incremented by the failed attempt. If you want some details on retries and metrics, please take a look at slides 18 and 19 of the related talk.
Note: It is runner-dependent whether metrics are accessible during pipeline execution or only after jobs have completed.
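The difference between attempted and committed values can be pictured with a deliberately simplified plain-Java model. AttemptedCommittedSketch is a hypothetical class, not part of Beam; it assumes that a failed bundle attempt is retried in full and that only the successful attempt is committed.

```java
/** Simplified plain-Java model of attempted versus committed counter values; not Beam code. */
class AttemptedCommittedSketch {
  long attempted;
  long committed;

  /** Records one attempt at processing a bundle of the given number of elements. */
  void processBundle(int elements, boolean attemptFails) {
    attempted += elements; // every attempt is counted, including the ones that are retried
    if (!attemptFails) {
      committed += elements; // only the attempt that is committed is counted
    }
  }
}
```

If a bundle of 10 elements fails once and then succeeds, the attempted value is 20 while the committed value is 10.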
 

Types of metrics


There are three types of metrics that are supported for the moment: Counter, Distribution and Gauge.

Counter


Counter is a metric that reports a single long value and can be incremented or decremented.


Distribution


Distribution is a metric that reports information about the distribution of reported values (min value, max value, mean value, sum of values and count of values).
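To picture what a distribution tracks and why it is easy to aggregate across workers, here is a plain-Java sketch. DistributionSketch is a hypothetical class, not the Beam Distribution interface; it only keeps the five statistics listed above.

```java
/** Plain-Java sketch of the statistics kept by a distribution metric; not the Beam interface. */
class DistributionSketch {
  long min = Long.MAX_VALUE;
  long max = Long.MIN_VALUE;
  long sum;
  long count;

  /** Reports one value. */
  void update(long value) {
    min = Math.min(min, value);
    max = Math.max(max, value);
    sum += value;
    count++;
  }

  /** Aggregates the distributions reported by two workers. */
  static DistributionSketch merge(DistributionSketch a, DistributionSketch b) {
    DistributionSketch merged = new DistributionSketch();
    merged.min = Math.min(a.min, b.min);
    merged.max = Math.max(a.max, b.max);
    merged.sum = a.sum + b.sum;
    merged.count = a.count + b.count;
    return merged;
  }

  /** The mean is derived from the sum and the count, so it never needs to be merged itself. */
  double mean() {
    return count == 0 ? 0.0 : (double) sum / count;
  }
}
```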


Gauge


Gauge is a metric that reports the latest value out of reported values. Since metrics are collected from many workers the value may not be the absolute last, but one of the latest values.


How does a pipeline author use them in Java?


How to query a metric?


Users query metrics using PipelineResult, which is the Beam object used to interact with a pipeline. PipelineResult has a metrics() method that returns a MetricResults object giving access to the metrics. The main method available in MetricResults allows querying for all metrics matching a given filter.


Complete example


Below, there is a simple example of how to use a Counter metric in a user pipeline.


Metrics export: MetricsPusher


Here comes the core of the subject: the MetricsPusher feature in Beam!


Motivation


Metrics Pusher came to life in the Beam project after these observations:
  • Not all execution engines (Spark, Flink, etc.) ship a way to push metrics to external sinks; they usually rely on their own monitoring UI. Even though the execution engine UI is updated with the Beam metrics, some use cases require exporting the metric values: what if you need the metrics in your own application UI? Or in a metrics backend for reporting?
  • Need for consistency:
    • There is no common set of monitoring backend support among execution engines
    • Metrics do not become available at the same moment on all runners: some runners make the metrics available through PipelineResult during the pipeline execution and others only at the end of the pipeline execution
    • Beam needs to have a common metrics flow no matter the runners for pipelines to be portable


Architecture


Design principles


Metrics pusher was designed based on these principles:
  • No client polling (e.g. JMX) because:
    • infrastructure changes (cluster managers, ...) and users should not need to know about it
    • such pulled metrics would be non-aggregated metrics, that users would need to aggregate
    • such pull mechanism raises timing issues: for example if a small batch finishes before the JMX client has had time to pull the metrics, there will be no metric to show.
  • Metrics are pushed from the runner and not pulled from the sdk because:
    • the runner needs to decide when to push, in particular because, to support Beam committed metrics, it must push only when a Beam bundle is committed.
    • runner system metrics are also defined in Beam (but not yet supported in MetricsPusher) and there again, only the runner knows its internal system.
  • Push aggregated (across parallel workers) metrics periodically rather than push event based metrics because:
    • aggregated metrics avoid the need for metrics backend to merge metrics.
    • runners know how to merge metrics across their workers.
  • We chose that Beam manages the metrics sinks IOs mainly for coherence because, as said above, execution engines have different support of the metrics backend.


MetricsPusher architecture


The MetricsPusher service lives as a thread inside the runner. It regularly requests the aggregated metrics and then pushes them through a MetricsSink IO to a metrics backend.
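The principle of a periodic push can be sketched in plain Java. This is not the Beam MetricsPusher implementation: MetricsPusherSketch is a hypothetical class in which a Supplier stands in for the runner's aggregated metrics and a Consumer stands in for the sink.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Plain-Java sketch of a periodic metrics push; not the Beam MetricsPusher implementation. */
class MetricsPusherSketch {
  private final Supplier<Map<String, Long>> aggregatedMetrics; // stands in for the runner
  private final Consumer<Map<String, Long>> sink; // stands in for the metrics sink

  MetricsPusherSketch(
      Supplier<Map<String, Long>> aggregatedMetrics, Consumer<Map<String, Long>> sink) {
    this.aggregatedMetrics = aggregatedMetrics;
    this.sink = sink;
  }

  /** One push: request the aggregated metrics and hand them to the sink. */
  void pushOnce() {
    sink.accept(aggregatedMetrics.get());
  }

  /** Starts a daemon thread that pushes at a fixed period; the caller shuts it down. */
  ScheduledExecutorService start(long periodSeconds) {
    ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(
            runnable -> {
              Thread thread = new Thread(runnable, "metrics-pusher-sketch");
              thread.setDaemon(true); // must not keep the JVM alive after the pipeline ends
              return thread;
            });
    scheduler.scheduleAtFixedRate(this::pushOnce, 0, periodSeconds, TimeUnit.SECONDS);
    return scheduler;
  }
}
```

Because the push is initiated on the runner side, the user code has nothing to poll: it only needs to say where the metrics should go.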


The architecture diagram above focuses on metrics export. If you want more details on the internal runner metric system, please take a look at part 4 of the related talk. For the purpose of this article, let's just say that it is provided as part of the runner-core Beam library so that different runners (Spark, Flink, Dataflow, ...) share the same core metrics architecture.


How to use MetricsPusher?


There is nothing to code in the user pipeline to use the MetricsPusher feature. The only thing to do is to configure a MetricsSink. If a metrics sink is set up in the configuration, the runner pushes metrics to it every 5 seconds by default.
The configuration is held in the MetricsOptions class. It contains the push period configuration and also sink-specific options such as type and URL.

For now, only the Flink and Spark runners are supported, and the available metrics sinks are:
  • MetricsHttpSink: a REST client that sends JSON-serialized metrics to a REST endpoint using an HTTP POST request. Below is an example of the body of the request sending attempted metrics values of a pipeline that defines a metric of each type:

  • MetricsGraphiteSink: a sink that pushes metrics to Graphite as Graphite messages. Each metric value is an entry with a timestamp in a Graphite message. Below is an example of the graphite message payload sending attempted metrics values of a pipeline that defines a counter metric and a distribution metric.

Now you know about Beam metrics and their export!

Friday, June 12, 2020

Nexmark: benchmark and CI tool for Apache Beam

🕥 10 min. 

This blog post is about the subject of this talk I gave at ApacheCon 2017. While the talk focuses on building Nexmark for Apache Beam, this article focuses on how the Beam project has used Nexmark since then. If you're interested in details about Nexmark, please refer to the talk.

What is Nexmark?


Nexmark is originally a research paper of 2004 about benchmarking queries on a streaming system. In the context of Beam, what we call Nexmark is its adaptation in the form of Beam pipelines that run in both batch and in streaming modes. 

Nexmark is a simulation of an auction system. The synthetic data injected into the system is about Persons placing Auctions on Items and Persons placing Bids on these Auctions.

Nexmark then interrogates the auction system for statistical data by running queries such as below.

The queries are written as Beam pipelines and these pipelines are meant to cover 100% of the Beam public API (including Beam SQL) to benchmark and detect regressions on all of Beam.

Query | Description | Use of Beam API
3 | Who is selling in particular US states? | Join, State, Timer
5 | Which auctions have seen the most bids in the last period? | Sliding Window, Combiners
6 | What is the average selling price per seller for their last 10 closed auctions? | Global Window, Custom Combiner
7 | What are the highest bids per period? | Fixed Windows, Side Input
9 * | Winning bids | Custom Window
11 * | How many bids did a user make in each session he was active? | Session Window, Triggering
12 * | How many bids does a user make within a fixed processing time limit? | Global Window, working in Processing Time

* Not in the original Nexmark research paper.

These queries are particularly well suited to benchmarking because they are representative of what a real user could request on a real auction system. They are also quite complex and leverage all the capabilities of Beam runners.

Throughout the execution of each pipeline, Nexmark gathers metrics (using the Beam metrics system) such as the number of output results, the event rate (input element generation rate) and, obviously, the query execution time.


What is the architecture of Nexmark?




Nexmark is a standalone executable which 
  • creates a source that generates events (such as a Person creating an Auction or placing a Bid on an ongoing Auction)
  • runs the queries depending on the workload selected by the user (ingestion rate, number of events generated, selected queries, etc.)
  • monitors queries execution and collects their execution metrics.
Nexmark can run on all the supported Beam runners.

The user can choose to benchmark in batch mode or in streaming mode: Nexmark creates either a Beam BoundedSource to trigger batch mode in the runner or an UnboundedSource to trigger streaming mode in the runner.

Each query pipeline has this form:

1. Get PCollection<Event> as input
2. Apply ParDo + Filter to extract the objects of interest: Bids, Auctions, Persons
3. Apply Beam transforms that implement the query business logic: Filter, Count, GroupByKey, Window, etc.
4. Apply ParDo to output the final PCollection: collection of AuctionPrice, AuctionCount, ...
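The four steps above can be mimicked on an in-memory list with plain Java. This is not a Nexmark query: QueryShapeSketch, its Event class and the "bids per auction" logic are hypothetical and only illustrate the extract, compute and output shape.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of the shape of a query; it does not use the Beam SDK or Nexmark. */
class QueryShapeSketch {

  /** Hypothetical event kinds of the auction system. */
  enum Kind { PERSON, AUCTION, BID }

  /** Hypothetical event, reduced to its kind and the auction it refers to. */
  static final class Event {
    final Kind kind;
    final long auctionId;

    Event(Kind kind, long auctionId) {
      this.kind = kind;
      this.auctionId = auctionId;
    }
  }

  /** Step 1: events as input. Step 4: the counts per auction as output. */
  static Map<Long, Long> bidsPerAuction(List<Event> events) {
    Map<Long, Long> counts = new TreeMap<>();
    for (Event event : events) {
      if (event.kind != Kind.BID) {
        continue; // step 2: keep only the objects of interest
      }
      counts.merge(event.auctionId, 1L, Long::sum); // step 3: the business logic
    }
    return counts;
  }
}
```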


How is Nexmark useful for Beam?


    CI setup


With each commit on master, a Jenkins script runs a Nexmark suite of queries with 100 000 generated events in a local in-JVM runner:
  • on all the queries including Beam SQL versions
  • on all the supported runners
  • in both batch and streaming modes
The output of this benchmark is stored in Google BigQuery tables per query x runner x mode.

Each record of the table contains:
  • the size of the output PCollection
  • the execution time of the query
The results are then plotted as graphs, which makes it possible to build performance dashboards and output size dashboards per runner, as shown in the images below showing part of the Flink dashboards:




    Performance assessment


Such dashboards make it possible to keep track of the performance of the different Beam runners:

        Detect performance regressions


The performance graphs make it possible to detect performance regressions. As each query exercises a particular part of the Beam API, it narrows the field of search. Here, query 6 uses Combiners, so the regression shown below, observed on 2018/10/05 on query 6 on the Spark runner, pointed to the Combine translation in the Spark runner, which was then fixed.



        Measure the impact of performance changes


When performance improvement tasks are run on a particular runner, we can check the impact on the graphs. Here, a performance improvement on RDD caching in the Spark runner decreased the execution time of query 2 by a factor of more than 2. This change was committed just after 2018/11/21:



        Compare performances of the runners


Each query exercises a different part of the Beam API, so for each one we can compare the performance of the runner translations of that particular part of the API. The graph below, for example, compares the performance of the new Spark runner based on the Structured Streaming framework and the current Spark runner based on RDD/DStream. The comparison is on query 8, which is a join (CoGroupByKey Beam transform).




    Functional assessment


There are also graphs that plot the size of the output PCollection for each query. Such graphs make it possible to check the functional behavior of the runners.

        Detect functional regressions


The output size graphs are useful to detect functional regressions. Indeed, such a graph should be a straight line because, whatever the performance, a given query always produces the same results unless there is a bug: if the pipeline produces wrong data, then the size of the output PCollection varies, as in the example below. In this example, the Flink runner produced wrong data for some time when executing query 12 in streaming mode.



If a temporary failure raised an exception on a given query, it would lead to a gap in the graph.
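The "straight line" check can be expressed in a few lines of plain Java. OutputSizeCheckSketch is a hypothetical helper, not part of Nexmark; it assumes that the first run of the series is the reference and it flags every run whose output size differs from it.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of a functional regression check on output sizes; not part of Nexmark. */
class OutputSizeCheckSketch {

  /** Returns the indices of the runs whose output size differs from the first run. */
  static List<Integer> deviatingRuns(long[] outputSizes) {
    List<Integer> deviating = new ArrayList<>();
    for (int i = 1; i < outputSizes.length; i++) {
      if (outputSizes[i] != outputSizes[0]) {
        deviating.add(i); // the line is not straight here: possibly wrong data
      }
    }
    return deviating;
  }
}
```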

        Verify compatibility


Beam provides the capability matrix, which shows the compatibility of the runners with the different parts of the Beam SDK. The Nexmark output size graphs are also useful to verify and update the capability matrix. An unsupported API would raise an UnsupportedException. For example, query 3 uses the Beam state and timer APIs, which are not yet supported on the new Spark Structured Streaming runner. Thus, when the query runs, an exception is raised and the pipeline does not finish. As a consequence, no BigQuery table is created for this query, which leads to a hole in the dashboard, as shown below.



    Release check


All these functions of Nexmark allow us to check candidate releases of Beam: we go through all the dashboards for all the runners in both batch and streaming modes, looking for performance or functional regressions. If any is found, the release process is interrupted and a new candidate release containing the fix is created. This way, the level of confidence in releases is pretty high because, as said, Nexmark aims to cover 100% of the Beam scope.

Friday, February 7, 2020

Understand Apache Beam runners: focus on the Spark runner

🕥 5 min.

Previously on Apache Beam runners 😀


In the previous article, we had a brief overview of what an Apache Beam runner is. This article will dig into more details.

The previous article introduced this very simple pipeline:

We saw that the Beam SDK translates this pipeline into a DAG representing the pipeline in the form of Beam transform nodes. Now let's see how a Beam runner translates this DAG. Let's say that the user chooses Apache Spark as the target Big Data platform when launching the pipeline.

The runner (at last)


The job of the runner is to translate the pipeline DAG into native pipeline code for the targeted Big Data platform. It is this native code that will be executed by a Big Data cluster. If Spark is chosen, then the runner translates the DAG below into the Spark native pipeline code below. For the sake of simplicity, the Spark pipeline is written in pseudo-code.


Composite and primitive transforms: the level of translation


In the previous article we talked about Beam transforms (primitive transforms and composite transforms). In the continuation of the blog, we will refer to "composite transform" as just "composite" and "primitive transform" as just "primitive".

The DAG above is the expanded DAG: on the left-hand side are the Beam transforms of the user pipeline. But among these transforms, only Read is a primitive. The others are implemented by the Beam SDK as composites of other primitives. Composites can also be made of composites themselves (for example, the Count transform is made of the Combine transform) but in the end they are always made of primitives (ParDo or GroupByKey). And these primitives are what the runner translates.

But in some cases, depending on the capabilities of the target Big Data technology, the runner can choose to translate at the composite level of the graph rather than at the primitive level. Indeed, if there is a direct equivalent of the Beam composite in the target API, the runner does not decompose the composite into its primitives and translates the composite directly. The green boxes in the DAG represent the level of translation. In our example, there is a direct equivalent of the Beam Combine composite: a Spark Aggregator (agg in the Spark pipeline).

The translation itself


The translation occurs when the pipeline is run (pipeline.run() is executed). To translate the DAG, the runner visits the graph. All Beam runners work the same, only the target API changes with the chosen runner.

The first step is to detect the translation mode (batch or streaming) by searching for a Beam BoundedSource (like Elasticsearch for example) or UnboundedSource (like Kafka for example). Knowing the translation mode, the runner can choose
  • the proper Spark DataSourceV2 to instantiate, implementing either ReadSupport (batch) or MicroBatchReadSupport (streaming)
  • the proper Spark action to execute on the output dataset, either foreach (batch) or writeStream (streaming)

Then the DAG visit continues and each node is translated to the target native API: Read gets translated to a Spark DataSourceV2 that creates the input dataset, ParDo gets translated to a Spark flatMap that is applied to the input dataset, and so on until the output dataset.
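The visit and the choice of the translation level can be sketched in plain Java. This is not the code of the Spark runner: TranslationSketch, its Node class and the NATIVE table are hypothetical, the target operations are pseudo names, and the mapping of GroupByKey is an assumption made for the sake of the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of a DAG visit that translates transforms; not the Spark runner code. */
class TranslationSketch {

  /** Hypothetical DAG node: a transform name and the transforms it is made of. */
  static final class Node {
    final String name;
    final List<Node> children;

    Node(String name, Node... children) {
      this.name = name;
      this.children = Arrays.asList(children);
    }
  }

  /** Hypothetical table of transforms that have a direct equivalent in the target API. */
  static final Map<String, String> NATIVE = new HashMap<>();

  static {
    NATIVE.put("Read", "dataSource");
    NATIVE.put("ParDo", "flatMap");
    NATIVE.put("GroupByKey", "groupByKey"); // assumption of this sketch
    NATIVE.put("Combine", "agg");
  }

  /** Visits the node and appends the target operations to the output pipeline. */
  static void translate(Node node, List<String> targetPipeline) {
    String operation = NATIVE.get(node.name);
    if (operation != null) {
      targetPipeline.add(operation); // direct equivalent: translate at this level
      return;
    }
    if (node.children.isEmpty()) {
      throw new UnsupportedOperationException("No translation for " + node.name);
    }
    for (Node child : node.children) {
      translate(child, targetPipeline); // no equivalent: descend into the composite
    }
  }

  /** Convenience entry point that returns the translated pipeline. */
  static List<String> translate(Node root) {
    List<String> targetPipeline = new ArrayList<>();
    translate(root, targetPipeline);
    return targetPipeline;
  }
}
```

In this sketch, a Count composite made of a Combine composite is translated to a single agg operation: the visit stops at Combine because it has a direct equivalent, and never reaches the primitives underneath.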

Pipeline run


When the visit of the DAG is done, the runner applies the action chosen above to the output dataset to run the pipeline. At this point, the Spark framework executes the resulting native Spark pipeline.