Introduction
This is the third article in a series of blog posts about tricky use cases of Apache Beam that highlight some of the advanced possibilities of the Beam SDK.
What is Beam Combine?
Combine is the Beam word for the reduce transformation. This transformation combines data spread across workers to perform a computation. The computation can be whatever you like. In the Beam SDK, Combine is used under the hood in various built-in transforms such as Count, Sum and Mean, but you can also create your own Combine implementations.
At a high level, a Combine works like this:
The PCollection is split across the cluster (consider here that we have a cluster of 3 nodes). On each node, Beam creates an accumulator, and each input value is added to the local accumulator. The accumulators are then merged into a single combined output accumulator, from which the result is extracted. At each of these 3 steps (add, merge, extract), you can add whatever computation code your use case needs.
How to create your own combiner?
To create your own combiner you need to extend CombineFn<InputT,AccumT,OutputT>. You'll have to override some methods:
- createAccumulator() operation is invoked to create a fresh mutable accumulator value of type AccumT, initialized to represent the combination of zero values.
- addInput(AccumT, InputT) operation is invoked on each input value to add it to the local accumulator AccumT value.
- mergeAccumulators(java.lang.Iterable<AccumT>) operation is invoked to combine a collection of accumulators AccumT values into a single combined output accumulator AccumT value, once the merging accumulators have had all the input values in their partition added to them. This operation is invoked repeatedly, until there is only one accumulator value left.
- extractOutput(AccumT) operation is invoked on the final accumulator AccumT value to get the output OutputT value.
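The four operations above can be sketched without any Beam dependency. The following is a minimal illustration, not a real CombineFn: the class name MeanCombineSketch, its Accum type and the combine() driver are hypothetical names introduced here, and the driver only imitates what a runner would do with three partitions. In an actual pipeline the same four methods would be overrides on a class extending CombineFn<Integer, Accum, Double>.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Dependency-free sketch: mirrors the shape of CombineFn<Integer, Accum, Double>
// without extending the Beam class, so it runs on a plain JDK.
class MeanCombineSketch {

  // AccumT: a running sum and count (hypothetical type for this sketch).
  static final class Accum implements java.io.Serializable {
    long sum = 0;
    long count = 0;
  }

  // Fresh accumulator representing the combination of zero values.
  static Accum createAccumulator() {
    return new Accum();
  }

  // Fold one input value into the local accumulator.
  static Accum addInput(Accum accum, int input) {
    accum.sum += input;
    accum.count++;
    return accum;
  }

  // Merge several accumulators into a single one.
  static Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum a : accums) {
      merged.sum += a.sum;
      merged.count += a.count;
    }
    return merged;
  }

  // Compute the result once, from the final accumulator.
  static double extractOutput(Accum accum) {
    return accum.count == 0 ? Double.NaN : (double) accum.sum / accum.count;
  }

  // Imitates a runner: one accumulator per partition, then a single merge.
  static double combine(List<List<Integer>> partitions) {
    List<Accum> locals = new ArrayList<>();
    for (List<Integer> partition : partitions) {
      Accum local = createAccumulator();
      for (int value : partition) {
        local = addInput(local, value);
      }
      locals.add(local);
    }
    return extractOutput(mergeAccumulators(locals));
  }

  public static void main(String[] args) {
    // Three partitions, standing in for the three nodes described above.
    List<List<Integer>> partitions = Arrays.asList(
        Arrays.asList(1, 2), Arrays.asList(3, 4, 5), Arrays.asList(6));
    System.out.println(combine(partitions)); // prints 3.5
  }
}
```

Note that the division happens only in extractOutput(): the accumulator carries the sum and the count, so merging stays a simple addition.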
Your combiner can be called in the pipeline with:
inputPCollection.apply(Combine.globally(new CustomCombineFn()));
or for keyed input PCollections:
inputPCollection.apply(Combine.perKey(new CustomCombineFn()));
Please note that combining functions should be associative and commutative. Associativity is required because input values are first broken up into partitions before being combined, and their intermediate results further combined, in an arbitrary tree structure. Commutativity is required because any order of the input values is ignored when breaking up input values into partitions.
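To see why this matters, here is a small plain-Java illustration (no Beam involved; the class and method names are made up for this sketch). It compares a naive combiner that uses the mean itself as the intermediate result with one that carries a sum and a count. Averaging partial means is not associative, so the naive result changes with the way the input happens to be partitioned.

```java
import java.util.Arrays;
import java.util.List;

class AssociativitySketch {

  // Plain arithmetic mean of a list of values.
  static double mean(List<Double> values) {
    double sum = 0;
    for (double v : values) {
      sum += v;
    }
    return sum / values.size();
  }

  // Naive: average the per-partition means. NOT associative,
  // so the result depends on how the input was split.
  static double meanOfPartitionMeans(List<List<Double>> partitions) {
    double sumOfMeans = 0;
    for (List<Double> partition : partitions) {
      sumOfMeans += mean(partition);
    }
    return sumOfMeans / partitions.size();
  }

  // Sound: carry (sum, count). Addition is associative and commutative,
  // so any partitioning gives the same result.
  static double meanFromSumAndCount(List<List<Double>> partitions) {
    double sum = 0;
    long count = 0;
    for (List<Double> partition : partitions) {
      for (double v : partition) {
        sum += v;
        count++;
      }
    }
    return sum / count;
  }

  public static void main(String[] args) {
    // The same four values, partitioned in two different ways.
    List<List<Double>> splitA = Arrays.asList(
        Arrays.asList(1.0, 2.0, 3.0), Arrays.asList(4.0));
    List<List<Double>> splitB = Arrays.asList(
        Arrays.asList(1.0, 2.0), Arrays.asList(3.0, 4.0));

    System.out.println(meanOfPartitionMeans(splitA)); // prints 3.0
    System.out.println(meanOfPartitionMeans(splitB)); // prints 2.5
    System.out.println(meanFromSumAndCount(splitA));  // prints 2.5
    System.out.println(meanFromSumAndCount(splitB));  // prints 2.5
  }
}
```

Since the partitioning is decided by the runner and not by the user, only the second form gives a well-defined result.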
A note on coders: some form of data encoding is required when a CombineFn uses custom types that have no well-known coder. Usually it is enough for your custom type to implement Serializable, and Beam will provide a coder automatically: it relies on a generic CoderProvider that can supply a coder for any Serializable type. Where Serializable is inefficient or inapplicable, there are two alternatives for encoding:
- Default CoderRegistry: for example, implement a coder class explicitly and use the @DefaultCoder annotation. See the CoderRegistry documentation for the numerous ways to bind a type to a coder.
- CombineFn-specific way: while extending CombineFn, override both getAccumulatorCoder(CoderRegistry, Coder<InputT>) and getDefaultOutputCoder(CoderRegistry, Coder<InputT>).
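To give an idea of what an explicit encoding involves, here is a plain-Java sketch that writes a small accumulator to bytes and reads it back. It deliberately does not use Beam's Coder classes: AccumEncodingSketch, Accum, encode() and decode() are hypothetical names for this illustration only. In Beam, equivalent logic would typically live in the encode and decode methods of a custom Coder, working on the streams the SDK passes in.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class AccumEncodingSketch {

  // Hypothetical accumulator: a running sum and count.
  static final class Accum {
    final long sum;
    final long count;

    Accum(long sum, long count) {
      this.sum = sum;
      this.count = count;
    }
  }

  // Write the two fields in a fixed order: 8 bytes each.
  static byte[] encode(Accum accum) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeLong(accum.sum);
      out.writeLong(accum.count);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Read the fields back in the same order they were written.
  static Accum decode(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      long sum = in.readLong();
      long count = in.readLong();
      return new Accum(sum, count);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] encoded = encode(new Accum(21, 6));
    Accum decoded = decode(encoded);
    System.out.println(encoded.length);                   // prints 16
    System.out.println(decoded.sum + " " + decoded.count); // prints 21 6
  }
}
```

A hand-written encoding like this stores only the field values, which is the kind of saving that can justify replacing the automatic Serializable-based coder for accumulators that are shuffled often.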
An example
The following example is extracted from the Nexmark auction system. Here, the use case is to calculate the average price of the last 3 closed auctions.
Here, the input PCollection is the collection of the auction-winning bids with their associated price and timestamp. Each bid contains the final price of a given auction.
The types used in the custom Combine are as follows. The input is a bid, so InputT=Bid, which implements Serializable. The accumulator class is a list of sorted bids, so AccumT=List<Bid>. The output is a mean, so OutputT=Double. The Beam SDK provides coders for all these types automatically through the mechanism mentioned above.
The computation code (sort bids by timestamp then price, and keep the last 3) is applied during both the add step (addInput() method implementation) and the merge step (mergeAccumulators() method implementation). During the add step we could have simply added each input to the accumulator without trimming it to the last 3, but this would have led to more load in the subsequent merge step. It is more efficient to keep only 3 bids as early as the add step, so that the merge step has fewer values to combine.
The average price is only calculated at the end when the output value is extracted from the final combined accumulator (extractOutput() method implementation).
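The logic described above can be sketched in plain Java as follows. This is an illustrative reconstruction, not the Nexmark source: the Bid class here is a hypothetical stand-in with only a price and a timestamp, the class does not extend CombineFn so that it runs without Beam, the combine() driver merely imitates a runner, and returning 0.0 for an empty accumulator is an arbitrary choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class LastThreeBidsMeanSketch {
  static final int MAX_BIDS = 3;

  // Hypothetical stand-in for the Nexmark Bid type (only the fields used here).
  static final class Bid implements java.io.Serializable {
    final long price;
    final long timestamp;

    Bid(long price, long timestamp) {
      this.price = price;
      this.timestamp = timestamp;
    }
  }

  // Order bids by timestamp, then by price.
  static final Comparator<Bid> BY_TIME_THEN_PRICE =
      Comparator.comparingLong((Bid b) -> b.timestamp).thenComparingLong(b -> b.price);

  static List<Bid> createAccumulator() {
    return new ArrayList<>();
  }

  // Sort, then drop the oldest bids so that at most MAX_BIDS remain.
  static void trim(List<Bid> accum) {
    accum.sort(BY_TIME_THEN_PRICE);
    if (accum.size() > MAX_BIDS) {
      accum.subList(0, accum.size() - MAX_BIDS).clear();
    }
  }

  // Add step: trim immediately so local accumulators stay small.
  static List<Bid> addInput(List<Bid> accum, Bid input) {
    accum.add(input);
    trim(accum);
    return accum;
  }

  // Merge step: concatenate, then apply the same trimming.
  static List<Bid> mergeAccumulators(Iterable<List<Bid>> accums) {
    List<Bid> merged = createAccumulator();
    for (List<Bid> accum : accums) {
      merged.addAll(accum);
    }
    trim(merged);
    return merged;
  }

  // Extract step: the only place where the average is computed.
  static double extractOutput(List<Bid> accum) {
    if (accum.isEmpty()) {
      return 0.0; // arbitrary choice for this sketch
    }
    long sum = 0;
    for (Bid bid : accum) {
      sum += bid.price;
    }
    return (double) sum / accum.size();
  }

  // Imitates a runner: one accumulator per partition, then a single merge.
  static double combine(List<List<Bid>> partitions) {
    List<List<Bid>> locals = new ArrayList<>();
    for (List<Bid> partition : partitions) {
      List<Bid> local = createAccumulator();
      for (Bid bid : partition) {
        local = addInput(local, bid);
      }
      locals.add(local);
    }
    return extractOutput(mergeAccumulators(locals));
  }

  public static void main(String[] args) {
    List<List<Bid>> partitions = Arrays.asList(
        Arrays.asList(new Bid(100, 1), new Bid(600, 4), new Bid(300, 2), new Bid(50, 0)),
        Arrays.asList(new Bid(400, 3), new Bid(500, 5)));
    // The last 3 bids by timestamp are 400, 600 and 500.
    System.out.println(combine(partitions)); // prints 500.0
  }
}
```

In this run the first partition receives four bids but its local accumulator never holds more than three, so the merge step sorts five bids instead of six. The saving grows with the size of the partitions.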