This blog post covers part of this talk that I gave at ApacheCon 2018 about universal metrics in
Apache Beam. More
precisely, it deals with metrics export: how Beam metrics are exported to the
outside world from a running Beam pipeline.
What are Beam metrics?
The content below is also available on the Beam website, but it is recalled here to keep the article self-contained.
In the Beam model, metrics provide some insight into the current state of a
user pipeline, potentially while the pipeline is running. For example, this
allows the user to:
- Check the number of errors encountered while running a specific step in the pipeline
- Monitor the number of calls made to an external service
- Count the number of elements that have been processed
- …and so on.
Metrics are:
- Named: Each metric has a name which consists of a namespace and an actual name. The namespace can be used to differentiate between multiple metrics with the same name and also allows querying for all metrics within a specific namespace.
- Scoped: Each metric is reported against a specific step in the pipeline (i.e. a specific transform in the pipeline), indicating what code was running when the metric was declared. This allows reporting the same metric name in multiple places and identifying the value each transform reported, as well as aggregating the metric across the entire pipeline.
- Dynamically Created: Metrics may be created during runtime without pre-declaring them, in much the same way a logger could be created. This makes it easier to produce metrics in utility code and have them usefully reported.
- Degrade Gracefully: If a runner doesn’t support some part of reporting metrics, the fallback behavior is to drop the metric updates rather than failing the pipeline. If a runner doesn’t support some part of querying metrics, the runner will not return the associated data.
- Attempted/committed metrics: attempted metrics include retries whereas committed metrics do not: if a bundle (a part of a Beam PCollection) is retried, a counter's committed value will not be incremented. If you want more details on retries and metrics, please take a look at slides 18/19 of the related talk.
Note: It is runner-dependent whether metrics are accessible during pipeline execution or only after jobs have completed.
Types of metrics
Three types of metrics are supported at the moment:
Counter, Distribution and Gauge.
Counter
Counter is a metric that reports a single long value and can be incremented or
decremented.
Distribution
Distribution is a metric that reports information about the distribution of
reported values (min value, max value, mean value, sum of values and count of
values).
Gauge
Gauge is a metric that reports the latest value out of reported values.
Since metrics are collected from many workers, the value may not be the
absolute latest, but one of the latest values.
How does a pipeline author use them in Java?
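Metrics are declared through the Metrics factory class of the Java SDK and updated from inside a transform, typically a DoFn. Below is a minimal sketch showing the three metric types; the namespace "my-namespace" and the metric names are arbitrary placeholders:

```java
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

public class MetricsDoFn extends DoFn<Long, Long> {
  // Named and scoped: namespace "my-namespace", reported against the step that runs this DoFn.
  private final Counter elements = Metrics.counter("my-namespace", "elements");
  private final Distribution values = Metrics.distribution("my-namespace", "values");
  private final Gauge lastValue = Metrics.gauge("my-namespace", "lastValue");

  @ProcessElement
  public void processElement(ProcessContext context) {
    long element = context.element();
    elements.inc();         // Counter: a single long value, incremented or decremented
    values.update(element); // Distribution: records min/max/mean/sum/count of reported values
    lastValue.set(element); // Gauge: keeps (one of) the latest reported values
    context.output(element);
  }
}
```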
How to query a metric?
Users query metrics using PipelineResult, which is the Beam object used to
interact with a pipeline. PipelineResult has a metrics() method which
returns a MetricResults object that gives access to the metrics. The
main method available in MetricResults allows querying for all metrics
matching a given filter.
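For example, assuming the pipeline declared a counter named "elements" in the namespace "my-namespace" (placeholder names), a query helper could look like this sketch:

```java
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

public class QueryMetricsExample {
  /** Prints the attempted value of the "elements" counter of the given pipeline result. */
  public static void printElementsCounter(PipelineResult result) {
    MetricQueryResults metrics =
        result
            .metrics()
            .queryMetrics(
                MetricsFilter.builder()
                    .addNameFilter(MetricNameFilter.named("my-namespace", "elements"))
                    .build());

    for (MetricResult<Long> counter : metrics.getCounters()) {
      // Each result carries the metric name, the step it was reported from,
      // and both the attempted and committed values.
      System.out.println(counter.getName() + ": " + counter.getAttempted());
    }
  }
}
```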
Complete example
Below is a simple example of how to use a Counter metric in a
user pipeline.
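The sketch below increments a Counter inside a DoFn and queries its attempted value once the pipeline has finished; the namespace, metric name and input values are placeholders:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class CounterMetricExample {

  // A DoFn that counts every element it processes.
  static class CountingFn extends DoFn<Integer, Integer> {
    private final Counter elements = Metrics.counter("my-namespace", "elements");

    @ProcessElement
    public void processElement(ProcessContext context) {
      elements.inc();
      context.output(context.element());
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    pipeline
        .apply(Create.of(1, 2, 3, 4))
        .apply(ParDo.of(new CountingFn()));

    PipelineResult result = pipeline.run();
    result.waitUntilFinish();

    // Query the attempted value of the counter once the pipeline has finished.
    MetricQueryResults metrics =
        result.metrics().queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.named("my-namespace", "elements"))
                .build());
    for (MetricResult<Long> counter : metrics.getCounters()) {
      System.out.println(counter.getName() + ": " + counter.getAttempted());
    }
  }
}
```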
Metrics export: MetricsPusher
Here comes the core of the subject: the MetricsPusher feature in Beam!
Motivation
MetricsPusher came to life in the Beam project after these observations:
- Not all execution engines (Spark, Flink, etc.) ship a way to push metrics to external sinks; they usually rely on their own monitoring UI. Even though the execution engine UI is updated with the Beam metrics, some use cases require exporting the metric values: what if you need to show the metrics in your own application UI? Or store them in a metrics backend for reporting?
- Need for consistency:
  - There is no common set of supported monitoring backends among execution engines
  - Metrics are not available at the same moment: some runners make them available through PipelineResult during the pipeline execution, others only once the pipeline has finished
  - Beam needs a common metrics flow regardless of the runner so that pipelines stay portable
Architecture
Design principles
MetricsPusher was designed based on these principles:
- No client polling (e.g. JMX), because:
  - the infrastructure changes (cluster managers, ...) and should not have to be known by the users
  - such pulled metrics would be non-aggregated metrics that users would then need to aggregate themselves
  - such a pull mechanism raises timing issues: for example, if a small batch finishes before the JMX client has had time to pull the metrics, there will be no metrics to show.
- Metrics are pushed by the runner and not pulled from the SDK, because:
  - the runner needs to decide when to push; in particular, to support Beam committed metrics it must push only when a Beam bundle is committed.
  - runner system metrics are also defined in Beam (but not yet supported in MetricsPusher) and there again, only the runner knows its internal system.
- Push aggregated (across parallel workers) metrics periodically rather than pushing event-based metrics, because:
  - aggregated metrics avoid the need for the metrics backend to merge metrics.
  - runners already know how to merge metrics across their workers.
- We chose to have Beam manage the metrics sink IOs, mainly for consistency: as said above, execution engines differ in which metrics backends they support.
MetricsPusher architecture
The MetricsPusher service lives as a thread inside the runner that regularly
requests aggregated metrics and then pushes them through a MetricsSink IO
to a metrics backend.
The architecture diagram above focuses on metrics export. If you want more
details on the internal runner metrics system,
please take a look at part 4 of the related talk; for the purpose of this article, let's just say that it is provided as part
of the runner-core Beam library so that the different runners
(Spark, Flink, Dataflow, ...) share the same core metrics architecture.
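To make this concrete, here is a deliberately simplified, hypothetical sketch of such a pusher thread. It is not the actual runner-core implementation, and the AggregatedMetrics and Sink interfaces below are invented for the illustration:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: the real MetricsPusher lives in the runner-core
// library and works on Beam's own metrics and MetricsSink types.
public class SimplifiedMetricsPusher {

  /** Invented interface: whatever the runner exposes to read its already-aggregated metrics. */
  interface AggregatedMetrics {
    String snapshot();
  }

  /** Invented interface: a sink that writes one metrics snapshot to a metrics backend. */
  interface Sink {
    void write(String snapshot) throws Exception;
  }

  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  /** Periodically reads aggregated metrics from the runner and pushes them to the sink. */
  public void start(AggregatedMetrics metrics, Sink sink, long periodSeconds) {
    scheduler.scheduleAtFixedRate(
        () -> {
          try {
            sink.write(metrics.snapshot());
          } catch (Exception e) {
            // Degrade gracefully: a failed push must not fail the pipeline.
          }
        },
        0, periodSeconds, TimeUnit.SECONDS);
  }

  public void stop() {
    scheduler.shutdown();
  }
}
```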
How to use MetricsPusher?
There is nothing to code in the user pipeline to use the MetricsPusher
feature. The only thing to do is to configure a MetricsSink. If a
metrics sink is set up in the configuration, the runner pushes metrics to
it at a default period of 5 seconds.
The configuration is held in the
MetricsOptions
class. It contains the push period configuration as well as sink-specific options
such as type and URL.
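As a sketch, configuring the HTTP sink programmatically could look like the snippet below. The option and sink class names are assumptions based on the MetricsOptions interface at the time of the talk; check the javadoc of your Beam version for the authoritative names.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ConfigureMetricsSink {
  public static void main(String[] args) {
    // Assumed option and class names; they follow the MetricsOptions getters/setters.
    MetricsOptions options = PipelineOptionsFactory.fromArgs(args).as(MetricsOptions.class);
    options.setMetricsSink(org.apache.beam.runners.core.metrics.MetricsHttpSink.class);
    options.setMetricsHttpSinkUrl("http://localhost:8080/metrics"); // hypothetical endpoint
    options.setMetricsPushPeriod(10L); // push every 10 seconds instead of the default 5

    Pipeline pipeline = Pipeline.create(options);
    // ... build and run the pipeline as usual; the runner pushes metrics to the sink.
  }
}
```

The metrics sinks described in this post are: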
- MetricsHttpSink: a REST client that sends JSON-serialized metrics to a REST endpoint using an HTTP POST request. Below is an example of the body of a request sending the attempted metrics values of a pipeline that defines a metric of each type:
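As an illustration of the general shape of such a body (the metric names, steps, values and exact field layout below are placeholders and may differ between Beam versions):

```json
{
  "counters": [
    {
      "name": { "namespace": "my-namespace", "name": "elements" },
      "step": "ParDo(CountingFn)",
      "attempted": 20
    }
  ],
  "distributions": [
    {
      "name": { "namespace": "my-namespace", "name": "values" },
      "step": "ParDo(CountingFn)",
      "attempted": { "min": 5, "max": 9, "sum": 19, "count": 3, "mean": 6.33 }
    }
  ],
  "gauges": [
    {
      "name": { "namespace": "my-namespace", "name": "lastValue" },
      "step": "ParDo(CountingFn)",
      "attempted": { "value": 100, "timestamp": "2018-09-04T12:00:00.000Z" }
    }
  ]
}
```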
- MetricsGraphiteSink: a sink that pushes metrics to Graphite as Graphite messages. Each metric value is an entry with a timestamp in a Graphite message. Below is an example of the Graphite message payload sending the attempted metrics values of a pipeline that defines a counter metric and a distribution metric.
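As an illustration only, using the Graphite plaintext protocol (one "path value timestamp" line per metric value); the metric names, steps, values and the exact path scheme used by MetricsGraphiteSink are assumptions and may differ:

```text
beam.counter.my-namespace.elements.ParDo-CountingFn.attempted.value 20 1535968613
beam.distribution.my-namespace.values.ParDo-CountingFn.attempted.min 5 1535968613
beam.distribution.my-namespace.values.ParDo-CountingFn.attempted.max 9 1535968613
beam.distribution.my-namespace.values.ParDo-CountingFn.attempted.sum 19 1535968613
beam.distribution.my-namespace.values.ParDo-CountingFn.attempted.count 3 1535968613
```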