Wednesday, July 8, 2020

Export metrics from Apache Beam pipelines

🕥 12 min. 

This blog post covers part of the talk I gave at ApacheCon 2018 about universal metrics in Apache Beam. More precisely, it deals with metrics export: how Beam metrics are exported to the outside world from a running Beam pipeline.

What are Beam metrics?


The content below is also available on the Beam website; it is repeated here for completeness.

In the Beam model, metrics provide some insight into the current state of a user pipeline, potentially while the pipeline is running. For example, this allows the user to:
  • Check the number of errors encountered while running a specific step in the pipeline
  • Monitor the number of calls made to an external service
  • Count the number of elements that have been processed
  • …and so on.
Metrics are: 
  • Named: Each metric has a name which consists of a namespace and an actual name. The namespace can be used to differentiate between multiple metrics with the same name and also allows querying for all metrics within a specific namespace.
  • Scoped: Each metric is reported against a specific step in the pipeline (i.e. a specific transform in the pipeline), indicating what code was running when the metric was declared. This allows reporting the same metric name in multiple places and identifying the value each transform reported, as well as aggregating the metric across the entire pipeline.
  • Dynamically Created: Metrics may be created during runtime without pre-declaring them, in much the same way a logger could be created. This makes it easier to produce metrics in utility code and have them usefully reported.
  • Degrade Gracefully: If a runner doesn’t support some part of reporting metrics, the fallback behavior is to drop the metric updates rather than failing the pipeline. If a runner doesn’t support some part of querying metrics, the runner will not return the associated data.
  • Attempted/committed: Attempted metrics include retries whereas committed metrics do not. If a bundle (a part of a Beam PCollection) is retried, the committed value of a counter is not incremented again. For more details on retries and metrics, see slides 18 and 19 of the related talk.
Note: It is runner-dependent whether metrics are accessible during pipeline execution or only after jobs have completed.
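
To make the attempted/committed distinction concrete, here is a minimal sketch in plain Java. It is a toy model, not Beam code: the class AttemptedCommittedSketch and its processBundle method are hypothetical names invented for this illustration, and a real runner tracks these values very differently.

```java
import java.util.List;

/** Toy model of one counter tracked with attempted and committed semantics (not Beam code). */
final class AttemptedCommittedSketch {
    private long attempted; // sees every execution, retries included
    private long committed; // sees only bundles that end up committed

    /** Runs one bundle attempt; 'succeeds' says whether the bundle gets committed. */
    void processBundle(List<String> bundle, boolean succeeds) {
        long increments = bundle.size(); // one increment per processed element
        attempted += increments;         // attempted always records the update
        if (succeeds) {
            committed += increments;     // committed records it only on commit
        }
    }

    long attempted() { return attempted; }
    long committed() { return committed; }

    public static void main(String[] args) {
        AttemptedCommittedSketch counter = new AttemptedCommittedSketch();
        List<String> bundle = List.of("a", "b", "c");
        counter.processBundle(bundle, false); // first attempt fails
        counter.processBundle(bundle, true);  // the retry succeeds
        // prints "attempted=6 committed=3"
        System.out.println("attempted=" + counter.attempted() + " committed=" + counter.committed());
    }
}
```

The retried bundle is counted twice in the attempted value and once in the committed value, which is the behavior described in the last bullet above.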
 

Types of metrics


Three types of metrics are currently supported: Counter, Distribution and Gauge.

Counter


Counter is a metric that reports a single long value and can be incremented or decremented.


Distribution


Distribution is a metric that reports information about the distribution of reported values (min value, max value, mean value, sum of values and count of values).
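
As an illustration of what these five values mean, here is a small plain-Java sketch. DistributionSketch is a hypothetical class written for this article, not Beam's implementation; it only assumes that a distribution can be summarized by min, max, sum and count, from which the mean is derived, and that partial results from several workers can be merged.

```java
/** Toy model of the values a distribution reports (not Beam's implementation). */
final class DistributionSketch {
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;
    private long sum;
    private long count;

    /** Records one reported value. */
    void update(long value) {
        min = Math.min(min, value);
        max = Math.max(max, value);
        sum += value;
        count++;
    }

    /** Folds the partial distribution of another worker into this one. */
    void merge(DistributionSketch other) {
        min = Math.min(min, other.min);
        max = Math.max(max, other.max);
        sum += other.sum;
        count += other.count;
    }

    long min() { return min; }
    long max() { return max; }
    long sum() { return sum; }
    long count() { return count; }
    double mean() { return count == 0 ? 0.0 : (double) sum / count; }

    public static void main(String[] args) {
        DistributionSketch worker1 = new DistributionSketch();
        worker1.update(2);
        worker1.update(8);
        DistributionSketch worker2 = new DistributionSketch();
        worker2.update(5);
        worker1.merge(worker2);
        // prints "min=2 max=8 sum=15 count=3 mean=5.0"
        System.out.println("min=" + worker1.min() + " max=" + worker1.max()
            + " sum=" + worker1.sum() + " count=" + worker1.count() + " mean=" + worker1.mean());
    }
}
```

Keeping sum and count rather than the mean itself is what makes the merge across workers exact.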


Gauge


Gauge is a metric that reports the latest value out of reported values. Since metrics are collected from many workers the value may not be the absolute last, but one of the latest values.
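
One way to picture this is the plain-Java sketch below. It assumes that each reported value carries a timestamp and that the most recent one wins when values from several workers are combined; that merge rule and the GaugeSketch class are assumptions made for the illustration, not a description of Beam internals.

```java
/** Toy gauge that keeps the value with the most recent timestamp (illustrative assumption). */
final class GaugeSketch {
    private long value;
    private long timestampMillis = Long.MIN_VALUE;

    /** Records a value reported at the given time; older reports are ignored. */
    void set(long newValue, long atMillis) {
        if (atMillis >= timestampMillis) {
            value = newValue;
            timestampMillis = atMillis;
        }
    }

    long value() { return value; }

    public static void main(String[] args) {
        GaugeSketch gauge = new GaugeSketch();
        gauge.set(10, 100); // reported by worker A at t=100
        gauge.set(7, 300);  // reported by worker B at t=300
        gauge.set(99, 200); // late arrival from worker A, older than t=300
        System.out.println(gauge.value()); // prints 7
    }
}
```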


How does a pipeline author use them in Java?


How to query a metric?


Users query metrics using PipelineResult, the Beam object used to interact with a pipeline. PipelineResult has a metrics() method that returns a MetricResults object, which gives access to the metrics. The main method available in MetricResults queries for all metrics matching a given filter.
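
To show the idea of filtering scoped, namespaced metrics without depending on the Beam library, here is a toy model in plain Java. MetricQuerySketch, Entry, query and total are hypothetical names for this illustration only; they are not the Beam API and do not mirror the real filter classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of scoped, namespaced metric values and a filter over them (not the Beam API). */
final class MetricQuerySketch {

    /** One reported value, keyed by step, namespace and name. */
    static final class Entry {
        final String step;
        final String namespace;
        final String name;
        final long value;

        Entry(String step, String namespace, String name, long value) {
            this.step = step;
            this.namespace = namespace;
            this.name = name;
            this.value = value;
        }
    }

    /** Keeps entries in the given namespace; a null step matches every step. */
    static List<Entry> query(List<Entry> all, String namespace, String step) {
        List<Entry> matches = new ArrayList<>();
        for (Entry e : all) {
            boolean stepMatches = step == null || step.equals(e.step);
            if (namespace.equals(e.namespace) && stepMatches) {
                matches.add(e);
            }
        }
        return matches;
    }

    /** Aggregates the matching values, for example across the whole pipeline. */
    static long total(List<Entry> entries) {
        long sum = 0;
        for (Entry e : entries) {
            sum += e.value;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Entry> all = List.of(
            new Entry("Parse", "app", "errors", 2),
            new Entry("Write", "app", "errors", 3),
            new Entry("Write", "io", "calls", 10));
        System.out.println(total(query(all, "app", null)));    // prints 5
        System.out.println(total(query(all, "app", "Write"))); // prints 3
    }
}
```

The same metric name reported by two steps can be read per step or aggregated across the pipeline, which is what the "Named" and "Scoped" properties above allow.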


Complete example


Below, there is a simple example of how to use a Counter metric in a user pipeline.


Metrics export: MetricsPusher


Here comes the core of the subject: the MetricsPusher feature in Beam!


Motivation


MetricsPusher came to life in the Beam project after these observations:
  • Not all execution engines (Spark, Flink, etc.) ship a way to push metrics to external sinks; they usually rely on their own monitoring UI. Even though the execution engine UI is updated with the Beam metrics, some use cases require exporting the metric values. What if you need the metrics in your own application UI? Or in a metrics backend for reporting?
  • Need for consistency:
    • There is no common set of supported monitoring backends among execution engines.
    • Metrics become available at different moments depending on the runner: some runners make them available through PipelineResult during pipeline execution, others only at the end of the execution.
    • Beam needs a common metrics flow, whatever the runner, for pipelines to be portable.


Architecture


Design principles


MetricsPusher was designed based on these principles:
  • No client polling (e.g. JMX) because:
    • the infrastructure changes (cluster managers, ...) and users should not need to know about it.
    • such pulled metrics would be non-aggregated metrics that users would need to aggregate themselves.
    • such a pull mechanism raises timing issues: for example, if a small batch finishes before the JMX client has had time to pull the metrics, there will be no metric to show.
  • Metrics are pushed from the runner and not pulled from the SDK because:
    • the runner needs to decide when to push; in particular, to support Beam committed metrics, it must push only when a Beam bundle is committed.
    • runner system metrics are also defined in Beam (but not yet supported in MetricsPusher) and, there again, only the runner knows its internal system.
  • Aggregated metrics (across parallel workers) are pushed periodically, rather than event-based metrics, because:
    • aggregated metrics spare the metrics backend from having to merge metrics.
    • runners know how to merge metrics across their workers.
  • Beam manages the metrics sink IOs itself, mainly for consistency because, as said above, execution engines support different metrics backends.


MetricsPusher architecture


The MetricsPusher service lives as a thread inside the runner. It regularly requests aggregated metrics and then pushes them through a MetricsSink IO to a metrics backend.
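
The loop just described can be sketched with the JDK alone. The code below is an illustration under stated assumptions, not the actual MetricsPusher: PusherSketch, pushOnce and start are hypothetical names, the aggregated metrics are reduced to a Map of names to long values, and the sink is a plain Consumer.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Toy periodic pusher: pull aggregated metrics, hand them to a sink (not Beam's MetricsPusher). */
final class PusherSketch implements AutoCloseable {
    private final Supplier<Map<String, Long>> aggregatedMetrics; // stands in for the runner
    private final Consumer<Map<String, Long>> sink;              // stands in for a metrics sink
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "metrics-pusher-sketch");
            thread.setDaemon(true); // never keep the JVM alive just for metrics
            return thread;
        });

    PusherSketch(Supplier<Map<String, Long>> aggregatedMetrics, Consumer<Map<String, Long>> sink) {
        this.aggregatedMetrics = aggregatedMetrics;
        this.sink = sink;
    }

    /** One push cycle: request the aggregated metrics, then send them to the sink. */
    void pushOnce() {
        try {
            sink.accept(aggregatedMetrics.get());
        } catch (RuntimeException e) {
            // Drop the update instead of failing: an uncaught exception would also
            // cancel every later run of the scheduled task.
            System.err.println("push failed: " + e.getMessage());
        }
    }

    /** Schedules pushOnce() on the pusher thread at a fixed period. */
    void start(long periodMillis) {
        scheduler.scheduleAtFixedRate(this::pushOnce, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        try (PusherSketch pusher = new PusherSketch(
                () -> Map.of("app.errors", 5L),
                metrics -> System.out.println("pushed " + metrics))) {
            pusher.start(100);
            Thread.sleep(350); // let a few pushes happen before closing
        }
    }
}
```

Catching the exception inside pushOnce() keeps the schedule alive after a failed push, in the spirit of metrics degrading gracefully rather than failing the pipeline.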


The architecture diagram above focuses on metrics export. If you want more details on the internal runner metric system, please take a look at part 4 of the related talk. For the purpose of this article, let's just say that it is provided as part of the runner-core Beam library so that different runners (Spark, Flink, Dataflow, ...) share the same core metrics architecture.


How to use MetricsPusher?


There is nothing to code in the user pipeline to use the MetricsPusher feature. The only thing to do is to configure a MetricsSink. If a metrics sink is set up in the configuration, the runner pushes metrics to it every 5 seconds by default.
The configuration is held in the MetricsOptions class. It contains the push period configuration and also sink-specific options such as type and URL.

For now, only the Flink and Spark runners are supported, and the available metrics sinks are:
  • MetricsHttpSink: a REST client that sends JSON-serialized metrics to a REST endpoint using an HTTP POST request. Below is an example of the body of the request sending attempted metrics values of a pipeline that defines a metric of each type:

  • MetricsGraphiteSink: a sink that pushes metrics to Graphite as Graphite messages. Each metric value is an entry with a timestamp in a Graphite message. Below is an example of the Graphite message payload sending attempted metrics values of a pipeline that defines a counter metric and a distribution metric.
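
For readers unfamiliar with Graphite, its plaintext protocol takes one entry per line in the form "path value timestamp", with the timestamp in seconds since the epoch. The sketch below formats such a line in plain Java. GraphiteLineSketch and its methods are hypothetical, and the path layout used in the example (beam, metric type, namespace, name, attempted) is an assumption chosen for illustration, not necessarily what MetricsGraphiteSink emits.

```java
/** Formats entries for Graphite's plaintext protocol: "path value timestamp\n". */
final class GraphiteLineSketch {

    /** Joins path parts with dots, replacing whitespace and dots inside a part with "_". */
    static String path(String... parts) {
        StringBuilder joined = new StringBuilder();
        for (String part : parts) {
            if (joined.length() > 0) {
                joined.append('.');
            }
            joined.append(part.replaceAll("[\\s.]+", "_"));
        }
        return joined.toString();
    }

    /** Builds one plaintext entry; the timestamp is in seconds since the epoch. */
    static String line(String path, long value, long epochSeconds) {
        return path + " " + value + " " + epochSeconds + "\n";
    }

    public static void main(String[] args) {
        // Hypothetical path layout, for illustration only.
        String path = path("beam", "counter", "my namespace", "n1", "attempted");
        // prints "beam.counter.my_namespace.n1.attempted 10 1594166400"
        System.out.print(line(path, 10, 1594166400L));
    }
}
```

Sanitizing each part matters because Graphite treats every dot in the path as a level in its metric hierarchy.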

Now you know about Beam metrics and their export!