This blog post is about the subject of a talk I gave at ApacheCon 2017. While the talk focuses on building Nexmark for Apache Beam, this article focuses on how the Beam project has used Nexmark since then. If you're interested in the details of Nexmark, please refer to the talk.
What is Nexmark?
Nexmark originated as a 2004 research paper about benchmarking queries on a streaming system. In the context of Beam, what we call Nexmark is its adaptation as a suite of Beam pipelines that run in both batch and streaming modes.
Nexmark is a simulation of an auction system. The synthetic data injected into the system describes Persons opening Auctions on Items and Persons placing Bids on these Auctions.
Nexmark then interrogates the auction system for statistical data by running queries such as the ones below.
The queries are written as Beam pipelines, and these pipelines are meant to cover 100% of the Beam public API (including Beam SQL; see the sketch after the table) in order to benchmark and detect regressions on all of Beam.
| Query | Description | Use of Beam API |
|-------|-------------|-----------------|
| 3 | Who is selling in particular US states? | Join, State, Timer |
| 5 | Which auctions have seen the most bids in the last period? | Sliding Window, Combiners |
| 6 | What is the average selling price per seller for their last 10 closed auctions? | Global Window, Custom Combiner |
| 7 | What are the highest bids per period? | Fixed Windows, Side Input |
| 9 * | Winning bids | Custom Window |
| 11 * | How many bids did a user make in each session he was active? | Session Window, Triggering |
| 12 * | How many bids does a user make within a fixed processing time limit? | Global Window, working in Processing Time |
* Not in the original Nexmark research paper.
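For the Beam SQL variants mentioned above, the business logic of a query is expressed as a SqlTransform applied to the event stream. Here is a minimal, illustrative sketch; the `bids` input, the column names and the "count bids per auction" query are assumptions for illustration, not the actual Nexmark SQL:

```java
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Assumes `bids` is a schema-aware PCollection<Row> with an `auction` column.
PCollection<Row> bidsPerAuction =
    bids.apply(
        SqlTransform.query(
            "SELECT auction, COUNT(*) AS num_bids FROM PCOLLECTION GROUP BY auction"));
```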
These queries are particularly well suited for benchmarking because they are representative of what a real user could request from a real auction system. They are also quite complex and leverage all the capabilities of the Beam runners.
Throughout the execution of each pipeline, Nexmark gathers metrics (using the Beam metrics system) such as the number of output results, the event rate (input element generation rate) and, of course, the query execution time.
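These metrics rely on the Beam metrics API. Here is a minimal sketch of a counter used inside a DoFn; the class and counter names are illustrative, not Nexmark's actual monitoring code:

```java
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

/** Illustrative DoFn that counts the results a query emits while passing them through. */
public class CountingFn<T> extends DoFn<T, T> {
  private final Counter outputResults = Metrics.counter("nexmark", "outputResults");

  @ProcessElement
  public void processElement(ProcessContext c) {
    outputResults.inc();    // one more output result observed
    c.output(c.element());  // forward the element unchanged
  }
}
```

At the end of a run, such values can be queried back from the pipeline's PipelineResult, which is how collected metrics can end up in a benchmark report.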
To run a benchmark, Nexmark:
- creates a source that generates events (such as a Person creating an Auction or placing a Bid on an ongoing Auction)
- runs the queries according to the workload selected by the user (ingestion rate, number of events generated, selected queries, etc.)
- monitors the query executions and collects their execution metrics.
Nexmark can run on all the supported Beam runners.
The user can choose to benchmark in batch mode or in streaming mode: Nexmark creates either a Beam BoundedSource to trigger batch mode in the runner, or an UnboundedSource to trigger streaming mode in the runner, as in the sketch below.
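A simplified sketch of that switch; NexmarkBoundedSource, NexmarkUnboundedSource and GeneratorConfig are illustrative stand-ins for the real Nexmark generator classes, and Event is the Nexmark model class for generated events:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.values.PCollection;

/** Illustrative helper: the source type chosen here decides batch vs streaming translation. */
static PCollection<Event> createEventSource(
    Pipeline pipeline, StreamingOptions options, GeneratorConfig config) {
  if (options.isStreaming()) {
    // UnboundedSource => the runner applies its streaming translation
    return pipeline.apply(Read.from(new NexmarkUnboundedSource(config)));
  }
  // BoundedSource => the runner applies its batch translation
  return pipeline.apply(Read.from(new NexmarkBoundedSource(config)));
}
```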
Each query pipeline has the following form:
1. Get a PCollection<Event> as input.
2. Apply ParDo + Filter to extract the objects of interest: Bids, Auctions, Persons.
3. Apply the Beam transforms that implement the query business logic: Filter, Count, GroupByKey, Window, etc.
4. Apply a ParDo to output the final PCollection: a collection of AuctionPrice, AuctionCount, etc.
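As a rough illustration of this shape, here is a hand-written sketch loosely in the spirit of query 7 (highest bid per period). It is not the actual Nexmark code: the `events` input, the Event/Bid field accesses and the one-minute window are assumptions for illustration.

```java
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

// 1. input: PCollection<Event> named `events`
PCollection<Long> highestBidPerWindow =
    events
        // 2. extract the objects of interest: keep only Bid events and their price
        .apply(Filter.by((Event e) -> e.bid != null))
        .apply(MapElements.into(TypeDescriptors.longs()).via((Event e) -> e.bid.price))
        // 3. business logic: fixed windows + a Max combiner per window
        .apply(Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(Max.longsGlobally().withoutDefaults());
// 4. a final ParDo would format each result into an output object (e.g. AuctionPrice)
```

The same four-step shape holds whether the source is bounded or unbounded; only the runner's translation of the pipeline changes.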
How is Nexmark useful for Beam?
CI setup
With each commit on master, a Jenkins job runs, in a local in-JVM runner, a Nexmark suite of queries with 100,000 generated events:
- on all the queries, including the Beam SQL versions
- on all the supported runners
- in both batch and streaming modes
The output of this benchmark is stored in Google BigQuery, in one table per query × runner × mode combination.
Each record of the table contains:
- the size of the output PCollection
- the execution time of the query
The results are then rendered as graphs, which are used to build performance dashboards and output size dashboards per runner, as shown in the images below taken from the Flink dashboards:
Performance assessment
Such dashboards make it possible to keep track of the performance of the different Beam runners:
Detect performance regressions
The performance graphs make it possible to detect performance regressions. As each query exercises a particular part of the Beam API, they also narrow down the search. For example, query 6 uses Combiners, so the regression shown below, observed on 2018/10/05 on query 6 on the Spark runner, pointed to the Combine translation in the Spark runner, which was then fixed.
Measure the impact of performance changes
When performance improvement work is done on a particular runner, we can check its impact on the graphs. Here, a performance improvement on RDD caching in the Spark runner decreased the execution time of query 2 by more than a factor of two. This change was committed just after 2018/11/21:
Compare performances of the runners
Each query exercises a different part of the Beam API, so for each one we can compare the performance of the runner translations of that particular part of the API. The graph below, for example, compares the performance of the new Spark runner based on the Structured Streaming framework with that of the current Spark runner based on RDD/DStream. The comparison is on query 8, which is a join (the CoGroupByKey Beam transform).
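For reference, a CoGroupByKey join in Beam has roughly this shape. This is a generic sketch rather than the actual query 8 code; `personsById` and `auctionsBySeller` are assumed keyed inputs, and Person/Auction are the Nexmark model classes:

```java
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

// Tags identify each input collection inside the join result.
TupleTag<Person> personTag = new TupleTag<>();
TupleTag<Auction> auctionTag = new TupleTag<>();

// Join persons with the auctions they opened, keyed by person id.
PCollection<KV<Long, CoGbkResult>> joined =
    KeyedPCollectionTuple.of(personTag, personsById)    // PCollection<KV<Long, Person>>
        .and(auctionTag, auctionsBySeller)              // PCollection<KV<Long, Auction>>
        .apply(CoGroupByKey.create());
// Each CoGbkResult exposes, per key, the Person and Auction values via their tags.
```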
Functional assessment
There are also graphs that show the size of the output PCollection for each query; such graphs make it possible to check the functional behavior of the runners.
Detect functional regressions
The output size graphs are useful to detect functional regressions: such a graph should be a straight line because, whatever the performance, a given query always produces the same results unless there is a bug. If the pipeline produces wrong data, the size of the output PCollection varies, as in the example below, where the Flink runner produced wrong data for some time when executing query 12 in streaming mode.
If a temporary failure raised an exception on a given query, it would lead to a gap in the graph.
Verify compatibility
Beam provides a capability matrix that shows the compatibility of the runners with the different parts of the Beam SDK. The Nexmark output size graphs are also useful to verify and update this capability matrix. An unsupported API raises an exception (typically an UnsupportedOperationException). For example, query 3 uses the Beam state and timer API. This API is not yet supported on the new Spark Structured Streaming runner, so when the query runs, an exception is raised and the pipeline does not finish. As a consequence, no BigQuery table is created for this query, which leaves a hole in the dashboard as shown below.
Release check
All these functions of Nexmark allow us to check candidate Beam releases: we go through all the dashboards for all the runners, in both batch and streaming modes, looking for performance or functional regressions. If any is found, the release process is interrupted and a new release candidate containing the fix is created. This keeps the level of confidence in releases high because, as mentioned, Nexmark aims to cover 100% of the Beam scope.