Monday, November 7, 2022

Flink: Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

πŸ•₯ 5 min.


Flink has been deprecating the DataSet API since version 1.12 as part of the work on FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API). This blog article illustrates the migration of a real-life batch DataSet pipeline to a batch DataStream pipeline. 
All the code presented in this article is available in the tpcds-benchmark-flink repo
The use case shown here is extracted from a broader work comparing Flink performances of different APIs by implementing TPCDS queries using these APIs. 

What is TPCDS?

TPC-DS is a decision support benchmark that models several generally applicable aspects of a decision support system. The purpose of TPCDS benchmarks is to provide relevant, objective performance data of Big Data engines to industry users.

Chosen TPCDS query

The chosen query for this article is Query3  because it contains all the more common analytics operators (filter, join, aggregation, group by, order by, limit). It represents an analytic query on store sales. Its SQL code is presented here:
SELECT dt.d_year, item.i_brand_id brand_id, item.i_brand brand,SUM(ss_ext_sales_price) sum_agg
FROM date_dim dt, store_sales, item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
AND store_sales.ss_item_sk = item.i_item_sk
AND item.i_manufact_id = 128
AND dt.d_moy=11
GROUP BY dt.d_year, item.i_brand, item.i_brand_id
ORDER BY dt.d_year, sum_agg desc, brand_id

The initial DataSet pipeline

The pipeline we are migrating is this one, it is a batch pipeline that implements the above query using the DataSet API and Row as dataset element type.

Migrating the DataSet pipeline to a DataStream pipeline

There is no point in going through all of the code which is available here. We will rather focus on some key areas of the migration. The code is based on the latest release of Flink at the time this article was written: version 1.16.0. 

DataStream is a unified API that allows to run pipelines in both batch and streaming modes. To execute a DataStream pipeline in batch mode, it is not enough to set the execution mode in the Flink execution environment, it is also needed to migrate some operations. Indeed, the DataStream API semantics are the ones of a streaming pipeline. The arriving data is thus considered infinite. So, compared to the DataSet API which operates on finite data, there are adaptations to be made on some operations.

Setting the execution environment

We start by moving from ExecutionEnvironment to StreamExecutionEnvironment. Then, as the source in this pipeline is bounded, we can use either the default streaming execution mode or the batch mode. In batch mode the tasks of the job can be separated into stages that can be executed one after another. In streaming mode all tasks need to be running all the time and records are sent to downstream tasks as soon as they are available. 

Here we keep the default streaming mode that gives good performance on this pipeline and that would allow to run the same pipeline with no change on an unbounded source.

Using the streaming sources and datasets

SourcesDataSource<T> becomes DataStreamSource<T> after the call to env.createInput().

DatasetsDataSet<T> are now DataStream<T> and subclasses.

Migrating the join operation

The DataStream join operator does not yet support aggregations in batch mode (see FLINK-22587 for details). Basically, the problem is with the trigger of the default GlobalWindow which never fires so the records are never output. We will workaround this problem by applying a custom EndOfStream window. It is a window assigner that assigns all the records to a single TimeWindow. So, like for the GlobalWindow, all the records are assigned to the same window except that this window's trigger is based on the end of the window (which is set to Long.MAX_VALUE). As we are on a bounded source, at some point the watermark will advance to +INFINITY (Long.MAX_VALUE) and will thus cross the end of the time window and consequently fire the trigger and output the records.

Now that we have a working triggering, we just need to call a standard join with the  Row::join function.

Migrating the group by and reduce (sum) operations

DataStream API has no more groupBy() method, we now use the keyBy() method. An aggregation downstream will be applied on elements with the same key exactly as a GroupReduceFunction would have done on a DataSet unless it will not materialize the collection of data. The summing operation downstream is still done through a ReduceFunction but this time the operator reduces the elements incrementally instead of receiving the rows as a Collection. So we store in the reduced row the partially aggregated sum. Due to incremental reduce, we also need to distinguish if we received an already reduced row (in that case, we read the partially aggregated sum) or a fresh row (in that case we just read the corresponding price field). 

Migrating the order by operation

The sort of the datastream is done by applying a KeyedProcessFunction.

But, as said above, the DataStream semantics are the ones of a streaming pipeline. The arriving data is thus considered infinite. As such we need to "divide" the data to have output times. For that we need to set a timer to output the resulting data. As we are in batch mode we work in processing time (and not in event time) and set a timer to fire at Long.MAX_VALUE timestamp meaning that the timer will fire at the end of the batch.

To sort the data, we store the incoming rows inside a ListState and sort them at output time, when the timer fires in the onTimer() callback. 

Another thing: to be able to use Flink state, we need to key the datastream beforeward even if there is no group by key because Flink state is designed per-key. Thus, we key by a fake static key so that there is a single state.

Migrating the limit operation

Like for the order by migration, here also we need to adapt to the streaming semantics in which we don't have the whole data at hand. Once again, we use the state API. We apply a mapper that stores the incoming rows into a ValueState to count them and stop at the set limit. Here also we need to key by a static key to be able to use the state API.
The code resides in the LimitMapper class. 

Migrating the sink operation

As with sources, there were big changes in sinks with recent versions of Flink. We now use the Sink interface that requires an Encoder. But the resulting code is very similar to the one using the DataSet API. It's only that Encoder#encode() method writes bytes when TextOutputFormat.TextFormatter#format() wrote Strings.


As you saw for the migration of the join operation, the new unified DataStream API has some limitations left in batch mode. In addition, the order by and limit resulting code is quite manual and requires the help of the Flink state API for the migration. For all these reasons, the Flink community recommends to use Flink SQL for batch pipelines. It results in much simpler code, good performance and out-of-the-box analytics capabilities. You could find the equivalent Query3 code that uses the Flink SQL/Table API in the Query3ViaFlinkSQLCSV class.

Tuesday, January 5, 2021

Tricky use cases of Apache Beam 3/3: Custom Combine

πŸ•₯ 5 min.


This is the third article of a serie of blog posts about tricky use cases of Apache Beam that enlight some of the advanced possibilities of the Beam SDK.

What is Beam Combine?

Combine is the Beam word for the reduce transformation. This transformation combines data spread across workers to make a computation. The computation can be whatever you like. In the Beam SDK, Combine is used under the wood in various built-in transforms: Count, Sum, Mean ... but you can also create your own Combine implementations.

The overview architecture of the Combine is like this:

The PCollection is split across the cluster (consider here that we have a cluster of 3 nodes). On each of the nodes, Beam creates an Accumulator.  Each input value is added to the local accumulator. Then the accumulators are merged into a single combined output accumulator from which the result is extracted. At each of these 3 steps, the user can add whatever computation code is necessary for his use case.

How to create your own combiner?

To create your own combiner you need to extend CombineFn<InputT,AccumT,OutputT>. You'll have to override some methods:

  • createAccumulator() operation is invoked to create a fresh mutable accumulator value of type AccumT, initialized to represent the combination of zero values.
  • addInput(AccumT, InputT) operation is invoked on each input value to add it to the local accumulator AccumT value. 
  • mergeAccumulators(java.lang.Iterable<AccumT>) operation is invoked to combine a collection of accumulators AccumT values into a single combined output accumulator AccumT value, once the merging accumulators have had all the input values in their partition added to them. This operation is invoked repeatedly, until there is only one accumulator value left.
  • extractOutput(AccumT) operation is invoked on the final accumulator AccumT value to get the output OutputT value.
Your combiner can be called in the pipeline with:

inputPCollection.apply(Combine.globally(new CustomCombineFn()));

or for keyed input PCollections:

inputPCollection.apply(Combine.perKey(new CustomCombineFn()));

Please note that combining functions should be associative and commutative. Associativity is required because input values are first broken up into partitions before being combined, and their intermediate results further combined, in an arbitrary tree structure. Commutativity is required because any order of the input values is ignored when breaking up input values into partitions.

Also, please note about coders: some form of data encoding is required when using custom types in a CombineFn which do not have well-known coders. If so, usually your custom type will just implement Serializable and Beam will provide a coder automatically. Beam relies on the generic CoderProvider, which is able to provide a coder for any Serializable if applicable. In cases where Serializable is not efficient, or inapplicable, there are two alternatives for encoding:

  • Default CoderRegistry: for example, implement a coder class explicitly and use the @DefaultCoder tag. See the CoderRegistry for the numerous ways in which to bind a type to a coder.
  • CombineFn specific way: while extending CombineFn, overwrite both getAccumulatorCoder(CoderRegistry, Coder<InputT>) and getDefaultOutputCoder(CoderRegistry, Coder<InputT>).

An example

The following example is extracted from Nexmark auction system. Here, the use case is to calculate the average price for the last 3 closed auctions:

Here, the input PCollection is the collection of the auction-winning bids with their associated price and timestamp. Each bid contains the final price of a given auction. 

The types used in the custom Combine are: InputT=Bid which implements Serializable. The accumulator class (AccumT) is a list of sorted bids so AccumT=List and the output is a mean so OutputT=Double. Beam SDK provides coders for all these types automatically through the mechanism mentioned above.

As you can see, we added some computation code (sort bids by timestamp then price and keep last 3) during both the add step (addInput() method implementation) and the merge step (mergeAccumulators() method implementation). During the add step we could have simply added each input to the accumulator without keeping the last 3. but this would have lead to some load in the next merging step. It is more efficient to keep only 3 as early as the adding step, this way, the merging step will have fewer values to combine.

The average price is only calculated at the end when the output value is extracted from the final combined accumulator (extractOutput() method implementation).

Now you know how to create a custom Beam combiner to adapt to whatever use case you might have !

Wednesday, December 2, 2020

Tricky use cases of Apache Beam 2/3: Custom windows

πŸ•₯ 9 min.


This is the second article of a serie of blog posts about tricky use cases of Apache Beam that enlight some of the advanced possibilities of the Beam SDK.

What is Beam windowing?

In streaming systems, the flow of data is continuous. To be able to process it, we need to divide this flow of data into finite chunks. These finite chunks are windows. They are a way to temporally group the elements based on their event timestamp (the time at which the element was produced).

As Beam provides a unified API for batch mode and streaming mode, all the elements of Beam pipelines (even in batch mode) are stored in windows.  In batch mode, by default, there is a single window called the Global Window that stores all the elements of the pipeline. It is the default window that you get behind the scenes when you don't specify a window in your pipeline. But, even in batch mode, you could specify another windowing such as the ones below based on the timestamp of the elements. 

There are several types of windows provided by the Beam SDK:

Fixed windows

* The diagram show keys but, of course, these windows work for non keyed PCollections

Fixed windows are defined by the duration of the window. In the example above they allow to express the use case: "I want to have the last 30 seconds of data". 

Sliding windows

* The diagram show keys but, of course, these windows work for non keyed PCollections

Sliding windows are similar to fixed windows except they overlap. They are defined by both a duration and a sliding period. In the example above they allow to express the use case: "I want to have the last 60 seconds of data every 30 seconds". 

Sessions windows

* The diagram show keys but, of course, these windows work for non keyed PCollections

Sessions windows are very similar to web sessions. They are defined by a gap duration: if no data comes for more than the gap duration, then the next element to come is considered belonging to another session. Of course, the gap duration is measured in event time, in other words: if the next element's timestamp is more than gap duration after the previous element, then it will be put into another session window.

Custom windows

So far, the window types that Beam provides are quite similar to what other Big Data streaming systems provide except maybe the sessions windows that not many systems provide. But, what is really different with Beam is that it provides a way for the user to define his own windowing by extending the WindowFn function class or one of its sub-classes.

Health monitoring system example

To illustrate Beam custom windows, we will consider this use case: let's say we have a monitoring system that reports events about the health of a large and complex e-commerce platform. 

For the sake of simplicity, the event will be a simple class with just an enum status:
A big amount of events are produced, so the monitoring is made through a Big Bata pipeline. And this pipeline uses Beam. It is a streaming pipeline as the events are continuously arriving. We want fine oversight on what's going on so we set the granularity of the monitoring to 10 minutes. Thus, in the pipeline, we divide the collections of events into fixed windows of 10 minutes. But there is something to consider about the health of the e-commerce platform: when a failure occurs, it has consequences on the health of the overall platform for about 30 min (time for the clusters to failover, load to decrease etc...), the system is thus in recovered state. As a consequence, in the monitoring system, we want a failure event to be reported for 30 min after its occurrence. 


To implement the above desired behavior, we need to assign events to windows depending on the type of these events. This is a perfect use case for Beam custom windows. We will define a MonitoringFixedWindows custom window that assigns HEALTHY events to the current fixed window and FAILURE events to the current and the next 3 fixed windows:

The custom window is similar to FixedWindows, but we do not extend the FixedWindows class because FixedWindows assigns an element to only a single window. In our case, in the case of a FAILURE event, we want it to be assigned to 4 windows (the current and the next 3 ones).

The lowest function class in WindowFn hierarchy that we can extend is NonMergingWindowFn. It is simply windows that do not merge between each other (see the code).

The interesting part of this code is the override of the assignWindows method. It is there that resides the whole essence of custom windows. It allows for a given timestamp (of an element) to give the list of windows it should be assigned to. In our case we return a list of IntervalWindows (simply windows with boundaries) because our windowing function class deals with IntervalWindows.

The other overrides are quite straightforward:
  • windowCoder : as said above, all the elements in Beam are in windows. And they will be serialized at some point, so the window information needs to be serialized as well. WindowCoder is simply a Coder for the custom window. The custom window deals with IntervalWindow so we just return this coder which is already provided by the SDK.
  • getDefaultWindowMappingFn: this method is for side inputs (data of another PCollection accessible though a view in the Beam transforms). Here, this custom window does not support side inputs so we just throw an exception
  • isCompatible: 2 custom windowing functions are compatible if they are equal (same type and same boundaries)
  • equals and hashcode: as there will be windows comparisons issued by Beam, we need to override equals and hashcode with the usual code

Now, if we apply this windowing function to an input streaming PCollection. 


The output will be like this: 

Consider that we receive a HEALTHY event with timestamp 5, it will be assigned to the window starting at timestamp 0 and ending Γ  timestamp 10. If we then receive a FAILURE event with timestamp 12, it will be assigned to the window it belongs (starting at timestamp 10 and ending at timestamp 20) and also to the next 3 10-minutes windows.

Now you know how Beam windowing can adapt to whatever use case you might have !

Tuesday, November 10, 2020

Tricky use cases of Apache Beam 1/3: incremental join

πŸ•₯ 7 min.


This is the first article of a serie of blog posts about tricky use cases of Apache Beam that enlight some of the advanced possibilities of the Beam SDK.

Incremental join example

Let's say we have a streaming system that records events of an auction platform. Such events can be Persons creating Auctions and placing bids on auctions.

Consider, you have 2 PCollections as input:
  • One of the Persons details that are collected at their connection into the auction system
  • One of the Auctions details that are are collected at the auction creation
Now, you want regular updates on who is selling in particular US states.
So, this problem is a typical incremental join of these 2 PCollections. Such a join in Beam is done through the CoGroupByKey transform of the SDK. This transform works as illustrated below:

This is similar to a SQL Join. The 2 PCollections are keyed by the same key: personId is the unique key of the Person object and sellerId is the unique key of the Seller object. Seller and Person are just two different visions of the same entity depending on either they were created duding a connection event or during an auction creation event. So sellerId and personId are the same, we join by this key.

First step is to group the 2 PCollections in a KeyedPCollectionTuple before joining. For that we need to tag the elements of the PCollections with a TupleTag (basically a String id) to differentiate them in the output result. In the diagram above, the Person tag is in green and the Auction tag is in blue.
Then the actual CoGroupByKey transform is applied to end up with an output PCollection that contains for each key a list of elements (either Person or Auction) that have this key in common.

Stateful processing

Hey, wait ! I said that this was a streaming system, that means that there will be out of order data. How to deal with that during the join ? Well, the answer is with stateful processing: the idea here is to store the elements in a persistent state waiting for the corresponding element in the other PCollection. Such a stateful processing API is available in the Beam SDK with the ParDo transform but it is out of the scope of this article to present it in details. There are good blog articles and talks about that already, so I will just point to the official Beam stateful processing documentation.

So the architecture of the stateful join in our example will be:
  • The Person element will be stored in a persistent state in order to match future auctions by that person. We also set a timer to clear the person state after a TTL (see Timer API in the SDK)
  • The Auction elements will be stored in a persistent state until we have seen the corresponding Person element. Then, the stored auctions can be output and the state can be cleared 

The code of the incremental join

I think it deserves some explanations: 

We receive as an input the PCollection of events containing all events of the auction system. Beam provides several windowing capabilities but here we chose to work in processing time, we do not window the elements into time windows. We rather apply the Global window to the elements and set a trigger to output the results each time n elements were processed.

We then extract the auctions out of the stream of events and key the auctions by there sellerId . We then do the same for the Persons: extract them out of the events and key them by their personId (which is the same as the sellerId remember).

Now comes the actual join: we first group the 2 PCollections in a KeyedPCollectionTuple as said above. We then apply the actual CoGroupByKey transform to the KeyedPCollectionTuple.

Now comes the stateful processing: as explained above, we need to store elements in a persistent state to deal with out of order data. This is done by applying a Pardo to the output joint PCollection. The topic of this article is not stateful processing so, if you want some details, the complete code of JoinDoFn is on Beam github. I will just say here that JoinDoFn does the following:

  • Defines 2 ValueState to store the Persons and Auctions
  • Defines 1 Timer that fires after a configured TTL to clear the Person ValueState
  • For each key, iterates over the elements in the output of the join and matches persons to auctions by writing and reading from/to the ValueStates. 

The final step is to export the result by applying a ParDo that creates the output POJOs that gather persons information and auctions information.


I believe that it is interesting to show the join in a complete use case example. Thus, this code has a lot of boilerplate that is not needed for the actual incremental join and that comes from the use case of the auction system itself:
  • The extraction of auctions out of events
  • The extraction of persons out of events
  • The keying of the auctions PCollection
  • The keying of the persons PCollection
  • The apply of the stateful JoinDoFn
  • The export to the POJO output 
The actual join resides only in the creation of the KeyedPCollectionTuple and the apply of the CoGroupByKey. The incremental part resides in the apply of windowing: in the use case we chose to trigger the output with each new element but we could have chose a simpler trigger by simply applying FixedWindows to the input PCollection.

That's it, this is how you can create an incremental join pipeline with Apache Beam

Saturday, October 31, 2020

Watermark architecture proposal for Spark Structured Streaming framework

πŸ•₯ 7 min.

The multiple aggregations problem with Spark Structured Streaming framework

Developing the translation layer (called runner) from Apache Beam to Apache Spark we faced an issue with the Spark Structured Streaming framework: the problem is that this framework does not support more than one aggregation in a streaming pipeline. For example, you cannot do a group by then a reduce by in a streaming pipeline. There is an open ticket in the Spark project, an ongoing design and an ongoing PR, but, as for now, they received no update since the summer 2019. As a consequence, the Beam runner based on this framework is on hold waiting for this feature from the Spark project.

The underlying problem: the global watermark in the Spark Structured Streaming framework

Before explaining this problem, let's explain what a watermark is

What is a watermark?

In streaming systems, there is always lags between the time at which a given element was produced (the event timestamp) and the time at which it was received by the system (the processing timestamp) because there can be network outages or slow downs etc... There also can be out of order data. 

To deal with these two facts, streaming systems define the notion of watermark. It is what gives the system the notion of completeness of data in a constant flow of streaming data. It is the point in time when the system should not receive older elements. As streaming systems rely on windowing to divide this stream of data, the watermark can also be defined as the system notion of when all the data in a certain window can be expected to have arrived in the streaming pipeline. When the watermark passes the end of the window, the system outputs data.

If we take an example of a very simple static naΓ―ve watermark defined as an offset of 10 min

    at 2:10 the system considers that no element with a timestamp older than 2:00 can arrive

A corollary notion is late data. It is data that arrives behind the watermark. For example, with the previous watermark example, let's say that an element with timestamp of 1:59 arrives at 2:11. At 2:11 the watermark tells the system that no element with timestamp older that 2:01 (offset of 10 min) can arrive, so this element is behind the watermark, so it will be considered late by the system. As the consequence, the system should drop it. Still, we can define a tolerance that we call allowed lateness that could permit to process this late element. For this element not to be dropped, we would need to set an allowed lateness value of at least 2 min (the element arrives 12 min after it was produced - watermark of 10 min offset).

The problem with the global watermark in an example

Back to the problem: Spark Structured Streaming framework does not support more than one aggregation in a streaming pipeline because the watermark scope in Spark Structured Streaming framework is global for the whole pipeline, there is no watermark per operation. At the end of each microbatch the global watermark is updated with the newest timestamp received. How is it a problem? Let's answer in an example:

Consider we have a streaming pipeline with 2 aggregations (Op1 and Op2 in blue in the diagram below). The first aggregation outputs the highest valued element out of a window of 3 seconds.

Now let's say that the source sends 3 elements with value 6, 4 and 5 and timestamps 1, 2 and 3. These elements get buffered inside Op1 and Op1 updates the global watermark to value 3 (the newest timestamp seen). As the watermark reached the end of Op1's window (window of 3 seconds), then it is time for Op1 to output its data. The highest valued element is value 6 but timestamp 1. As a consequence this element will be considered by Op2 as late because Op2 relies on the global watermark which value is 3, so the element's timestamp of 1 is behind the global watermark. As a consequence, this element will be dropped by Op2 leading to incorrect results.

So, to avoid that incorrect results could be produced by downstream operations, Spark Structured Streaming framework deactivates the support for more than one aggregation in streaming pipelines.

A possible solution to this problem

A watermark per operation

A possible solution to this problem is to replace the global watermark by a watermark per transform.
And the watermarks values will be propagated from the source through the different operations of the pipeline. The local watermark values will be updated as new data arrives. For each operation we need to define:
  • an InputWatermark = Min {outputWatermark(previous operations)} *
  • an OutputWatermark = Min {inputWatermark, oldest(processed element)}

* In a straight pipeline, the InputWatermark is simply the OutputWatermark of the previous operation

    Updated example

    Let's update the previous example of the streaming pipeline with Op1 and Op2. Only now, there is no more global watermark, we define a watermark per operation in green in the diagram below:

    As said above, the watermark is set by the source. Let's say that the source tells the system that no element older than timestamp 1 can arrive (source watermark = 1). This value is propagated to the input watermark and to the ouput watermark of Op1 according to the rules defined in the previous paragraph. The watermarks of Op2 are updated as well according to the same rules. We end up having the green values in the above diagram.

    Now let's say that we receive the very same 3 elements as before with values 6, 4 and 5 and timestamps 1, 2 and 3. 

    They get buffered in Op1. Op1 updates its oldest processed counter (in red) to the minimum timestamp seen. And, as the output watermark is the minimum between this counter and the input watermark, it remains set to 1. And, as the ouput watermark of Op1 is not updated, the input and output watermarks of Op2 are not either.

    Let's say now that the source updates its watermark to 3 (in red in the diagram below), saying that it should not receive older elements than timestamp 3.

    So the input watermark of Op1 is updated to the output watermark of the previous step (the source) but the output watermark of Op1 is not updated as it is the minimum between the oldest processed counter (which is 1) and the input watermark. As the output watermark of Op1 is not updated, the input and output watermarks of Op2 are not either.

    But something happens now in Op1: its input watermark is value 3, it has passed the end of the window, so it is time for Op1 to output its data and clear its buffers. So it outputs the highest element of value 6 and timestamp 1 as shown below.

    But now, this element is no more dropped by Op2 because the input watermark of Op2 is still 1, Op2 no more considers the element late. It just processes it, buffers it and updates its oldest processed counter.

    Now, one final thing you may wonder, is when and how does Op2 updates its watermarks?

    Well, the answer is with the arrival of new data: let's say, we receive a new element from the source with value 7 and timestamp 4 as show below:

    The processing of this element by Op1 will update the oldest processed counter to the timestamp of the element (4) and the output watermark of Op1 will be updated as the minimum between the input watermak and the oldest processed counter is now 3. As the consequence of the update of the output watermark of Op1, the watermarks of downstream Op2 will be updated as shown above. This is how watermarks values are updated with the arrival of elements.

    There is something left to be discussed: the late case that we will see in the next paragraph.

    What happens for late data in this architecture?

    A given element is late for a given operation when its timestamp is inferior to the InputWatermark of the operation. As the OutputWatermark of this operation is defined as Min {inputWatermark, oldest(processed element)}, then the OutputWatermark of the operation is set the timestamp of the element. So, for the next operation, the InputWatermark will be set to the OuputWatermark of the previous operation, and thus will be set to the timestamp of the element. So we can see that the watermarks of all downstream operations will be set to the value of this element's timestamp. So, to sum it up: a late element will delay downstream operations.


    This watermark architecture was proposed to the Apache Spark project.

    Wednesday, July 8, 2020

    Export metrics from Apache Beam pipelines

    πŸ•₯ 12 min. 

     This blog post is about part of this talk that I gave at the ApacheCon 2018 about universal metrics in Apache Beam. More precisely, it deals with metrics export: how Beam metrics are exported to the outside world in a running Beam pipeline.

    What are Beam metrics?

    The content below is also available in the Beam website but is recalled here for the completeness of the article

    In the Beam model, metrics provide some insight into the current state of a user pipeline, potentially while the pipeline is running. For example, this allows the user to:
    • Check the number of errors encountered while running a specific step in the pipeline
    • Monitor the number of calls made to an external service
    • Count the number of elements that have been processed
    • …and so on.
    Metrics are: 
    • Named: Each metric has a name which consists of a namespace and an actual name. The namespace can be used to differentiate between multiple metrics with the same name and also allows querying for all metrics within a specific namespace.
    • Scoped: Each metric is reported against a specific step in the pipeline (i.e. a specific transform in the pipeline), indicating what code was running when the metric was declared. This allows reporting the same metric name in multiple places and identifying the value each transform reported, as well as aggregating the metric across the entire pipeline.
    • Dynamically Created: Metrics may be created during runtime without pre-declaring them, in much the same way a logger could be created. This makes it easier to produce metrics in utility code and have them usefully reported.
    • Degrade Gracefully: If a runner doesn’t support some part of reporting metrics, the fallback behavior is to drop the metric updates rather than failing the pipeline. If a runner doesn’t support some part of querying metrics, the runner will not return the associated data.
    • Attempted/committed metrics: attempted metrics include retrials whereas committed metrics do not: if a bundle (part of Beam PCollection) is retried a counter committed value will not be incremented. If you want some details on retrial and metrics please take a look at slide 18/19 of the related talk
    Note: It is runner-dependent whether metrics are accessible during pipeline execution or only after jobs have completed.

    Types of metrics

    There are three types of metrics that are supported for the moment: Counter, Distribution and Gauge.


    Counter is a metric that reports a single long value and can be incremented or decremented.


    Distribution is a metric that reports information about the distribution of reported values (min value, max value, mean value, sum of values and count of values).


    Gauge is a metric that reports the latest value out of reported values. Since metrics are collected from many workers the value may not be the absolute last, but one of the latest values.

    How does a pipeline author use them in Java?

    How to query a metric?

    Users query metrics using PipelineResult which is Beam object to interact with a pipeline. PipelineResult has a method metrics() which returns a MetricResults object that allows accessing metrics. The main method available in MetricResults allows querying for all metrics matching a given filter.

    Complete example

    Below, there is a simple example of how to use a Counter metric in a user pipeline.

    Metrics export: MetricsPusher

    Here comes the core of the subject: the MetricsPusher feature in Beam !


    Metrics Pusher came to life in the Beam project after these observations:
    • Not all execution engines (Spark, Flink etc...)  ship a way to push the metrics to external sinks, they usually rely on their own monitoring UI. Even though the execution engine UI is updated with the Beam metrics, there are some use cases that require to export the metrics values: What if you need to have the metrics in your own application UI ? Or if you need to have the metrics in a metrics backend for reporting ?
    • Need for consistency:
      • There is no common set of monitoring backend support among execution engines
      • There is a difference of availability moment when the pipeline runs: some runners make the metrics available through PipelineResult during the pipeline execution and others only at the end of the pipeline execution
      • Beam needs to have a common metrics flow no matter the runners for pipelines to be portable


    Design principles

    Metrics pusher was designed based on these principles:
    • No client polling (e.g. JMX) because:
      • infrastructure changes (cluster managers, ...) and must not be known of the users
      • such pulled metrics would be non-aggregated metrics, that users would need to aggregate
      • such pull mechanism raises timing issues: for example if a small batch finishes before the JMX client has had time to pull the metrics, there will be no metric to show.
    • Metrics are pushed from the runner and not pulled from the sdk because:
      • runners needs to decide when to push in particular because to support Beam committed metrics it needs to push only when a Beam bundle is committed.
      • runner system metrics are also defined in Beam (but not yet supported in MetricsPusher) and there again, only the runner knows its internal system.
    • Push aggregated (across parallel workers) metrics periodically rather than push event based metrics because:
      • aggregated metrics avoid the need for metrics backend to merge metrics.
      • runners know how to merge metrics across their workers.
    • We chose that Beam manages the metrics sinks IOs mainly for coherence because, as said above, execution engines have different support of the metrics backend.

    MetricsPusher architecture

    The MetricsPusher service lives as a thread inside the runner that regularly requests for aggregated metrics and then pushes them through a MetricsSink IO to a metrics backend.  

    The architecture diagram above focuses on metrics export, if you want more details on the internal runner metric system part, please take a look at part 4 of the related talk; for the purpose of this article let's just say that it is provided as part of the runner-core Beam library so that different runners (Spark, Flink, Dataflow, ...) share the same core metrics architecture.

    How to use Metrics pusher ?

    There is nothing to code in the user pipeline to use the MetricsPusher feature. The only thing to do is to configure a MetricsSink. If a metrics sink is set up in the configuration, the runner will push metrics to it at a default 5s period.
    The configuration is held in the MetricsOptions class. It contains push period configuration and also sink specific options such as type and URL.

    As for now, there is only Flink and Spark runners that are supported and the metrics sinks available are:
    • MetricsHttpSink: a REST client that sends json serialized metrics to a REST endpoint using an HTTP POST request. Below is an example of the body of the request sending attempted metrics values of a pipeline that defines a metric of each type: 

    • MetricsGraphiteSink: a sink that pushes metrics to Graphite as Graphite messages. Each metric value is an entry with a timestamp in a Graphite message. Below is an example of the graphite message payload sending attempted metrics values of a pipeline that defines a counter metric and a distribution metric.

    Now you know about Beam metrics and their export !

    Friday, June 12, 2020

    Nexmark: benchmark and CI tool for Apache Beam

    πŸ•₯ 10 min. 

    This blog post is about the subject of this talk I gave at the ApacheCon 2017. While the talk focuses on building Nexmark for Apache Beam, this article will focus on the use the Beam project has done of Nexmark since then. If you're interested in details about Nexmark, please refer to the talk.

    What is Nexmark ?

    Nexmark is originally a research paper of 2004 about benchmarking queries on a streaming system. In the context of Beam, what we call Nexmark is its adaptation in the form of Beam pipelines that run in both batch and in streaming modes. 

    Nexmark is a simulation of an auction system. The synthetic data injected in the system is about Persons placing Auctions on Items and Persons placing Bids on these Auctions

    Nexmark then interrogates the auction system for statistical data by running queries such as below.

    The queries are written as Beam pipelines and these pipelines are meant to cover 100% of the Beam public API (including Beam SQL) to benchmark and detect regressions on all of Beam.



    Use of Beam API


    Who is selling in particular US states?

    Join, State, Timer


    Which auctions have seen the most bids in the last period?

    Sliding Window, Combiners


    What is the average selling price per seller for their last 10 closed auctions?

    Global Window, Custom Combiner


    What are the highest bids per period? 

    Fixed Windows, Side Input

    9 *

    Winning bids

    Custom Window

    11 *

    How many bids did a user make in each session he was active?

    Session Window, Triggering

    12 *

    How many bids does a user make within a fixed processing time limit?

    Global Window, working in Processing Time

    * Not in the original Nexmark research paper.

    These queries are particularly suited for benchmark because they are representative of what a real user could request on a real auction system. They are also quite complex and leverage all the capabilities of Beam runners.

    Throughout the execution of each pipeline, Nexmark gathers metrics (using Beam metrics system) such as the number of output results, event rate (input element generation rate) and obviously the query execution time.

    What is the architecture of Nexmark?

    Nexmark is a standalone executable which 
    • creates a source that generates events (such as a Person creating an Auction or placing a Bid on an ongoing Auction)
    • runs the queries depending on the workload selected by the user (ingestion rate, number of events generated, selected queries, etc...)
    • monitors queries execution and collects their execution metrics.
    Nexmark can run on all the supported Beam runners.

    The user can chose to benchmark in batch mode or in streaming mode: Nexmark creates either a Beam BoundedSource to trigger batch mode in the runner or an UnboundedSource to trigger streaming mode in the runner.

    Each query pipeline has this form:

    1.Get PCollection<Event> as input
    2.Apply ParDo + Filter to extract the object of interest: Bids, Auctions, Person
    3.Apply Beam transforms that implement the query business logic: Filter, Count, GroupByKey, Window, etc...
    4.Apply ParDo to output the final PCollection: collection of AuctionPrice, AuctionCount ...

    How is Nexmark useful for Beam ?

        CI setup

    With each commit on master, a jenkins script runs in a local in-JVM runner a Nexmark suite of queries with 100 000 generated events:
    • on all the queries including Beam SQL versions
    • on all the supported runners
    • in both batch and streaming modes
    The output of this benchmark is stored in Google BigQuery tables per query x runner x mode.

    Each record of the table contains:
    • the size of the output PCollection
    • the execution time of the query
    The results are then printed into graphs allowing to build performance dashboards and output size dashboards per runner as shown in the images below showing part of the flink dashboards:

        Performance assessment

    Such dashboards allow to keep track of the performances of the different Beam runners:

            Detect performance regressions

    The performance graphs allow to detect performance regressions. As query exercise a particular part of Beam API, it allows to narrow the field of search. Here query 6 uses Combiners so the regression showed below observed on 2018/10/05 on query 6 on the spark runner allowed to point to Combine translation in the spark runner and then fix it.

            Measure the impact of performance changes

    When performance improvement tasks are run on a particular runner, we can check the impact on the graphs. Here there was a performance improvement on the spark runner on RDD caching that decreased the execution time of query 2 by more than 2. This change was committed just after 2018/11/21:

            Compare performances of the runners

    Each query exercise a different part of the Beam API, so for each one we can compare the performances of the runner translations of that particular part of the API. The graph below, for example, compares the performances of the new Spark runner based on the Structured Streaming framework and the current Spark runner based on RDD/Dstream. The comparison is on query 8 which is a join (CoGroupByKey Beam transform).

        Functional assessment

    There are also graphs that print the size of the output PCollection for each query, such graphs allow to check the functional behavior of the runners.

            Detect functional regressions

    The output size graphs are useful to detect functional regressions: indeed, such a graph should be a straight line as, whatever the performance, a given query produces always the same results unless there is a bug: if the pipeline produces wrong data, then the size of the output PCollection will vary as in the example below. In this example, the flink runner produced wrong data for some time when executing query 12 in streaming mode. 

    If a temporary failure raised an exception on a given query, it would lead to a gap in the graph.

            Verify compatibility

    Beam provides the Capability matrix that shows the compatibility of the runners with the different part of the Beam SDK. The Nexmark output size graphs are also useful to verify and update the capability matrix. An unsupported API would raise an UnsupportedException. For example query 3 uses Beam state and timer API. This API is not yet supported on the new spark structured streaming runner. Thus, when the query runs, an exception is raised and the pipeline does not finish. As a consequence, no BigQuery table is created for this query and it leads to a hole in the dashboard as shown below.

        Release check

    All these functions of Nexmark allow us to check candidate releases of Beam: we go through all the dashboards for all the runners in both batch and streaming modes seeking for performance or functional regressions. If any is found, the release process is interrupted and a new candidate release containing the fix is created. This way, the level of confidence in releases is pretty high because, as said, Nexmark aims to cover 100% of the Beam scope.