Showing posts with label runner. Show all posts
Showing posts with label runner. Show all posts

Wednesday, July 8, 2020

Export metrics from Apache Beam pipelines

🕥 12 min. 

 This blog post is about part of this talk that I gave at the ApacheCon 2018 about universal metrics in Apache Beam. More precisely, it deals with metrics export: how Beam metrics are exported to the outside world in a running Beam pipeline.

What are Beam metrics?


The content below is also available in the Beam website but is recalled here for the completeness of the article

In the Beam model, metrics provide some insight into the current state of a user pipeline, potentially while the pipeline is running. For example, this allows the user to:
  • Check the number of errors encountered while running a specific step in the pipeline
  • Monitor the number of calls made to an external service
  • Count the number of elements that have been processed
  • …and so on.
Metrics are: 
  • Named: Each metric has a name which consists of a namespace and an actual name. The namespace can be used to differentiate between multiple metrics with the same name and also allows querying for all metrics within a specific namespace.
  • Scoped: Each metric is reported against a specific step in the pipeline (i.e. a specific transform in the pipeline), indicating what code was running when the metric was declared. This allows reporting the same metric name in multiple places and identifying the value each transform reported, as well as aggregating the metric across the entire pipeline.
  • Dynamically Created: Metrics may be created during runtime without pre-declaring them, in much the same way a logger could be created. This makes it easier to produce metrics in utility code and have them usefully reported.
  • Degrade Gracefully: If a runner doesn’t support some part of reporting metrics, the fallback behavior is to drop the metric updates rather than failing the pipeline. If a runner doesn’t support some part of querying metrics, the runner will not return the associated data.
  • Attempted/committed metrics: attempted metrics include retrials whereas committed metrics do not: if a bundle (part of Beam PCollection) is retried a counter committed value will not be incremented. If you want some details on retrial and metrics please take a look at slide 18/19 of the related talk
Note: It is runner-dependent whether metrics are accessible during pipeline execution or only after jobs have completed.
 

Types of metrics


There are three types of metrics that are supported for the moment: Counter, Distribution and Gauge.

Counter


Counter is a metric that reports a single long value and can be incremented or decremented.


Distribution


Distribution is a metric that reports information about the distribution of reported values (min value, max value, mean value, sum of values and count of values).


Gauge


Gauge is a metric that reports the latest value out of reported values. Since metrics are collected from many workers the value may not be the absolute last, but one of the latest values.


How does a pipeline author use them in Java?


How to query a metric?


Users query metrics using PipelineResult which is Beam object to interact with a pipeline. PipelineResult has a method metrics() which returns a MetricResults object that allows accessing metrics. The main method available in MetricResults allows querying for all metrics matching a given filter.


Complete example


Below, there is a simple example of how to use a Counter metric in a user pipeline.


Metrics export: MetricsPusher


Here comes the core of the subject: the MetricsPusher feature in Beam !


Motivation


Metrics Pusher came to life in the Beam project after these observations:
  • Not all execution engines (Spark, Flink etc...)  ship a way to push the metrics to external sinks, they usually rely on their own monitoring UI. Even though the execution engine UI is updated with the Beam metrics, there are some use cases that require to export the metrics values: What if you need to have the metrics in your own application UI ? Or if you need to have the metrics in a metrics backend for reporting ?
  • Need for consistency:
    • There is no common set of monitoring backend support among execution engines
    • There is a difference of availability moment when the pipeline runs: some runners make the metrics available through PipelineResult during the pipeline execution and others only at the end of the pipeline execution
    • Beam needs to have a common metrics flow no matter the runners for pipelines to be portable


Architecture


Design principles


Metrics pusher was designed based on these principles:
  • No client polling (e.g. JMX) because:
    • infrastructure changes (cluster managers, ...) and must not be known of the users
    • such pulled metrics would be non-aggregated metrics, that users would need to aggregate
    • such pull mechanism raises timing issues: for example if a small batch finishes before the JMX client has had time to pull the metrics, there will be no metric to show.
  • Metrics are pushed from the runner and not pulled from the sdk because:
    • runners needs to decide when to push in particular because to support Beam committed metrics it needs to push only when a Beam bundle is committed.
    • runner system metrics are also defined in Beam (but not yet supported in MetricsPusher) and there again, only the runner knows its internal system.
  • Push aggregated (across parallel workers) metrics periodically rather than push event based metrics because:
    • aggregated metrics avoid the need for metrics backend to merge metrics.
    • runners know how to merge metrics across their workers.
  • We chose that Beam manages the metrics sinks IOs mainly for coherence because, as said above, execution engines have different support of the metrics backend.


MetricsPusher architecture


The MetricsPusher service lives as a thread inside the runner that regularly requests for aggregated metrics and then pushes them through a MetricsSink IO to a metrics backend.  


The architecture diagram above focuses on metrics export, if you want more details on the internal runner metric system part, please take a look at part 4 of the related talk; for the purpose of this article let's just say that it is provided as part of the runner-core Beam library so that different runners (Spark, Flink, Dataflow, ...) share the same core metrics architecture.


How to use Metrics pusher ?


There is nothing to code in the user pipeline to use the MetricsPusher feature. The only thing to do is to configure a MetricsSink. If a metrics sink is set up in the configuration, the runner will push metrics to it at a default 5s period.
The configuration is held in the MetricsOptions class. It contains push period configuration and also sink specific options such as type and URL.

As for now, there is only Flink and Spark runners that are supported and the metrics sinks available are:
  • MetricsHttpSink: a REST client that sends json serialized metrics to a REST endpoint using an HTTP POST request. Below is an example of the body of the request sending attempted metrics values of a pipeline that defines a metric of each type: 

  • MetricsGraphiteSink: a sink that pushes metrics to Graphite as Graphite messages. Each metric value is an entry with a timestamp in a Graphite message. Below is an example of the graphite message payload sending attempted metrics values of a pipeline that defines a counter metric and a distribution metric.

Now you know about Beam metrics and their export !

Friday, June 12, 2020

Nexmark: benchmark and CI tool for Apache Beam

🕥 10 min. 

This blog post is about the subject of this talk I gave at the ApacheCon 2017. While the talk focuses on building Nexmark for Apache Beam, this article will focus on the use the Beam project has done of Nexmark since then. If you're interested in details about Nexmark, please refer to the talk.

What is Nexmark ?


Nexmark is originally a research paper of 2004 about benchmarking queries on a streaming system. In the context of Beam, what we call Nexmark is its adaptation in the form of Beam pipelines that run in both batch and in streaming modes. 

Nexmark is a simulation of an auction system. The synthetic data injected in the system is about Persons placing Auctions on Items and Persons placing Bids on these Auctions

Nexmark then interrogates the auction system for statistical data by running queries such as below.

The queries are written as Beam pipelines and these pipelines are meant to cover 100% of the Beam public API (including Beam SQL) to benchmark and detect regressions on all of Beam.

Query

Description

Use of Beam API

3

Who is selling in particular US states?

Join, State, Timer

5

Which auctions have seen the most bids in the last period?

Sliding Window, Combiners

6

What is the average selling price per seller for their last 10 closed auctions?

Global Window, Custom Combiner

7

What are the highest bids per period? 

Fixed Windows, Side Input

9 *

Winning bids

Custom Window

11 *

How many bids did a user make in each session he was active?

Session Window, Triggering

12 *

How many bids does a user make within a fixed processing time limit?

Global Window, working in Processing Time


* Not in the original Nexmark research paper.

These queries are particularly suited for benchmark because they are representative of what a real user could request on a real auction system. They are also quite complex and leverage all the capabilities of Beam runners.

Throughout the execution of each pipeline, Nexmark gathers metrics (using Beam metrics system) such as the number of output results, event rate (input element generation rate) and obviously the query execution time.


What is the architecture of Nexmark?




Nexmark is a standalone executable which 
  • creates a source that generates events (such as a Person creating an Auction or placing a Bid on an ongoing Auction)
  • runs the queries depending on the workload selected by the user (ingestion rate, number of events generated, selected queries, etc...)
  • monitors queries execution and collects their execution metrics.
Nexmark can run on all the supported Beam runners.

The user can chose to benchmark in batch mode or in streaming mode: Nexmark creates either a Beam BoundedSource to trigger batch mode in the runner or an UnboundedSource to trigger streaming mode in the runner.

Each query pipeline has this form:

1.Get PCollection<Event> as input
2.Apply ParDo + Filter to extract the object of interest: Bids, Auctions, Person
3.Apply Beam transforms that implement the query business logic: Filter, Count, GroupByKey, Window, etc...
4.Apply ParDo to output the final PCollection: collection of AuctionPrice, AuctionCount ...


How is Nexmark useful for Beam ?


    CI setup


With each commit on master, a jenkins script runs in a local in-JVM runner a Nexmark suite of queries with 100 000 generated events:
  • on all the queries including Beam SQL versions
  • on all the supported runners
  • in both batch and streaming modes
The output of this benchmark is stored in Google BigQuery tables per query x runner x mode.

Each record of the table contains:
  • the size of the output PCollection
  • the execution time of the query
The results are then printed into graphs allowing to build performance dashboards and output size dashboards per runner as shown in the images below showing part of the flink dashboards:




    Performance assessment


Such dashboards allow to keep track of the performances of the different Beam runners:

        Detect performance regressions


The performance graphs allow to detect performance regressions. As query exercise a particular part of Beam API, it allows to narrow the field of search. Here query 6 uses Combiners so the regression showed below observed on 2018/10/05 on query 6 on the spark runner allowed to point to Combine translation in the spark runner and then fix it.



        Measure the impact of performance changes


When performance improvement tasks are run on a particular runner, we can check the impact on the graphs. Here there was a performance improvement on the spark runner on RDD caching that decreased the execution time of query 2 by more than 2. This change was committed just after 2018/11/21:



        Compare performances of the runners


Each query exercise a different part of the Beam API, so for each one we can compare the performances of the runner translations of that particular part of the API. The graph below, for example, compares the performances of the new Spark runner based on the Structured Streaming framework and the current Spark runner based on RDD/Dstream. The comparison is on query 8 which is a join (CoGroupByKey Beam transform).




    Functional assessment


There are also graphs that print the size of the output PCollection for each query, such graphs allow to check the functional behavior of the runners.

        Detect functional regressions


The output size graphs are useful to detect functional regressions: indeed, such a graph should be a straight line as, whatever the performance, a given query produces always the same results unless there is a bug: if the pipeline produces wrong data, then the size of the output PCollection will vary as in the example below. In this example, the flink runner produced wrong data for some time when executing query 12 in streaming mode. 



If a temporary failure raised an exception on a given query, it would lead to a gap in the graph.

        Verify compatibility


Beam provides the Capability matrix that shows the compatibility of the runners with the different part of the Beam SDK. The Nexmark output size graphs are also useful to verify and update the capability matrix. An unsupported API would raise an UnsupportedException. For example query 3 uses Beam state and timer API. This API is not yet supported on the new spark structured streaming runner. Thus, when the query runs, an exception is raised and the pipeline does not finish. As a consequence, no BigQuery table is created for this query and it leads to a hole in the dashboard as shown below.



    Release check


All these functions of Nexmark allow us to check candidate releases of Beam: we go through all the dashboards for all the runners in both batch and streaming modes seeking for performance or functional regressions. If any is found, the release process is interrupted and a new candidate release containing the fix is created. This way, the level of confidence in releases is pretty high because, as said, Nexmark aims to cover 100% of the Beam scope. 

Friday, February 7, 2020

Understand Apache Beam runners: focus on the Spark runner

🕥 5 min.

Previously on Apache Beam runners 😀


In the previous article, we had a brief overview of what an Apache Beam runner is. This article will dig into more details.

The previous article introduced this very simple pipeline:

We saw that the Beam SDK translates this pipeline into a DAG representing the pipeline in the form of Beam transform nodes. Now let's see how a Beam runner translates this DAG. Let's say that the user choses Apache Spark as the target Big Data platform when he launches his pipeline.

The runner (at last)


The job of the runner is to translate the pipeline DAG into a native pipeline code for the targeted Big Data platform. It is this native code that will be executed by a Big Data cluster. If Spark is chosen, then the runner translates this DAG below into the Spark native pipeline code below. For the sake of simplicity the Spark pipeline is pseudo-code.


Composite and primitive transforms: the level of translation


In the previous article we talked about Beam transforms (primitive transforms and composite transforms). In the continuation of the blog, we will refer to "composite transform" as just "composite" and "primitive transform" as just "primitive".

The DAG above is the expanded DAG: on the left hand-side are the Beam transforms of the user pipeline. But among these transforms only Read is a primitive. The others are implemented by Beam SDK as composites of other primitives. Composites can also be made of composites themselves (like Count transform is made of Combine transform) but in the end they are always made of primitives (Pardo or GroupByKey). And these primitives are what the runner translates.

But in some cases, the runner can chose to translate at a composite level of the graph, not at a primitive level depending on the target Big Data technology capabilities. Indeed, if there is a direct correspondance of the Beam composite in the target API, the runner does not decompose the composite into its primitives and translates directly the composite. The green boxes in the DAG represent the level of translation. In our example, there is a direct equivalent of Beam Combine composite to a Spark Aggregator (agg in the Spark pipeline).

The translation itself


The translation occurs when the pipeline is run (pipeline.run() is executed). To translate the DAG, the runner visits the graph. All Beam runners work the same, only the target API changes with the chosen runner.

The first step is to detect the translation mode (batch or streaming) by searching for Beam BoundedSource (like Elasticsearch for example) or UnBoundedSource (like Kafka for example). Knowing the translation mode, the runner can chose
  • the proper Spark DataSourceV2 to instantiate either implementing ReadSupport (batch) or MicroBatchReadSupport (streaming)
  • the proper Spark action to execute on the output dataset either foreach (batch) or writeStream (streaming)

Then the DAG visit continues and each node is translated to the target native API : Read gets translated to a Spark DataSourceV2 that creates the input dataset, Pardo gets translated to a Spark flatmap that is applied to the input dataset and so on until the output dataset.

Pipeline run


When the visit of the DAG is done, the runner applies the action chosen above to the output dataset to run the pipeline. At this point the spark framework executes the resulting native Spark pipeline.

Wednesday, January 29, 2020

Introduction to Apache Beam and the runners

🕥 7 min.

This is a first blog to introduce Apache Beam before going into more details about Beam runners in the next blogs.


What is Apache Beam ?


To define Apache Beam let's start with a quote
«Batch / streaming? Never heard of either.»
(Batch is nearly always part of higher-level streaming) 

Beam is a programming model that allows to create big data pipelines. Its particularity is that its API is unified between batch and streaming because the only difference between a batch pipeline and a streaming pipeline is that batch pipelines deal with finite data whereas streaming pipelines deal with infinite data. So, batch can be seen as a sub-part of the streaming problem. And Beam provides windowing features that divide infinite data into finite chunks.

Let's take a look at the Beam stack:

The pipeline user code is written using one of the several language SDKs (java, python or go) that Beam provides to the user. The SDK is a set of libraries of transforms and IOs that allow him to input data into his pipeline, do some computation on this data and then output to a given destination. Once the pipeline is written, the user choses a Big Data execution engine such as Apache FlinkApache Spark or others to run his pipeline. When the pipeline is run, the runner first translates it to native code depending on the chosen Big Data engine. It is this native code that is executed by the Big Data cluster.

Primitive transforms


The Beam SDK contains a set of transforms that are the building blocks of the pipelines the user writes. There are only 3 primitives :


Pardo: It is the good old flatmap that allows to process a collection element per element in parallel and apply them a function called DoFn


GroupByKey: This one groups the elements that have a common key. The groups can then be processed in parallel by next transforms downstream in the pipeline.


Read: The read transform is the way to input data into the pipeline by reading an external source which can be a batch source (like a database) or a streaming (continuously growing) source (such as a kafka topic).

Composite transforms



Composite transforms: All the other transforms available in the SDK are actually implemented as composites of the 3 previous ones.

As an example, the Reduce of Beam which is called Combine and which allows to do a computation on data spread across the cluster is implemented like this:


Other examples of composite transforms provided by the SDK are FlatMapElements, MapElements or Count that we will see in next chapter. But the user can also create his own composites and composites of composites.

A simple Beam pipeline


Let's take a look at a first simple Beam pipeline:


This is the usual Hello World type big data pipeline that counts the occurrences of the different words in a text file. It reads a text file from google storage and the result of the count is output to google storage. This is a very simple straight pipeline. Not all the pipelines have to be straight that way, it is there to serve as a baseline example for the continuation of the blog and to illustrate some key concepts of Beam :
  • Pipeline: the user interaction object.
  • PCollection: Beam abstraction of the collection of elements spread across a cluster.
  • TextIO: this IO is used to read and write from/to the text file. Reading part of the IO is in reality a Read transform and writing part of the IO is in reality a Write transform
  • Count: Combine transform that counts occurrences
  • FlatMapElements and MapElements: They are composite transforms of ParDo that are there for convenience.

The resulting DAG


When Beam executes the above pipeline code, the SDK first creates the adjacent graph to represent the pipeline. It is known as the DAG (Direct Acyclic Graph). For each transform of the pipeline code, a node of the DAG is created.  
  •  Read node corresponds to the Read transform of TextIO (the input of the piepeline)
  • Write node corresponds to the Write transform of TextIO (the output of the pipeline)
  • All the other nodes are a direct representation of the transforms of the pipeline.
Please note that only the Read transform is a primitive transform as described in the above paragraph, all the others are composite transforms.

But what about the runner ?


This is when the runner enters the scene. The job of the runner is simply to translate the pipeline DAG into a native pipeline code for the targeted Big Data platform. It is this code that will be executed by a Big Data cluster such as Apache Spark or Apache Flink.

You want to know more about the runner ? The next article describes what the runner is and uses the Spark runner as an example