Introduction
This is the first article in a series of blog posts about tricky use cases of Apache Beam that highlight some of the advanced possibilities of the Beam SDK.
Incremental join example
Let's say we have a streaming system that records events of an auction platform. Such events can be Persons creating Auctions and placing bids on auctions.
Suppose you have 2 PCollections as input:
- One with the Person details, collected when persons connect to the auction system
- One with the Auction details, collected when auctions are created
Now, you want regular updates on who is selling in particular US states.
So, this problem is a typical incremental join of these 2 PCollections. Such a join in Beam is done through the CoGroupByKey transform of the SDK. This transform works as illustrated below:
This is similar to a SQL join. The 2 PCollections are keyed by the same key: personId is the unique key of the Person object and sellerId is the unique key of the Seller object. Seller and Person are just two different views of the same entity, depending on whether they were created during a connection event or during an auction creation event. So sellerId and personId are the same, and we join on this key.
The first step is to group the 2 PCollections into a KeyedPCollectionTuple before joining. For that, we need to tag the elements of the PCollections with a TupleTag (basically a String id) to differentiate them in the output result. In the diagram above, the Person tag is in green and the Auction tag is in blue.
Then the actual CoGroupByKey transform is applied to end up with an output PCollection that contains for each key a list of elements (either Person or Auction) that have this key in common.
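To make the shape of that output concrete, here is a minimal plain-Java sketch of what a co-group produces. It is only a model and not the Beam API: the class CoGroupModel, the Grouped holder (a stand-in for the tagged result) and the String payloads are all hypothetical names I chose for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of the result shape of a co-group; NOT the Beam API. */
final class CoGroupModel {

  /** Hypothetical stand-in for the tagged result: one list per input tag. */
  static final class Grouped {
    final List<String> persons = new ArrayList<>();
    final List<String> auctions = new ArrayList<>();
  }

  /** Groups both keyed inputs by key; each input is a list of (key, value) entries. */
  static Map<Long, Grouped> coGroup(
      List<Map.Entry<Long, String>> personsById,
      List<Map.Entry<Long, String>> auctionsBySellerId) {
    Map<Long, Grouped> result = new TreeMap<>();
    for (Map.Entry<Long, String> p : personsById) {
      result.computeIfAbsent(p.getKey(), k -> new Grouped()).persons.add(p.getValue());
    }
    for (Map.Entry<Long, String> a : auctionsBySellerId) {
      result.computeIfAbsent(a.getKey(), k -> new Grouped()).auctions.add(a.getValue());
    }
    return result;
  }

  public static void main(String[] args) {
    Map<Long, Grouped> joined =
        coGroup(
            List.of(Map.entry(1L, "alice"), Map.entry(2L, "bob")),
            List.of(
                Map.entry(1L, "auction-10"),
                Map.entry(1L, "auction-11"),
                Map.entry(3L, "auction-12")));
    // Every key seen in either input gets an entry, possibly with an empty list.
    joined.forEach(
        (key, g) ->
            System.out.println(key + " -> persons=" + g.persons + ", auctions=" + g.auctions));
  }
}
```

Note that keys present in only one input (2 and 3 here) still show up, with an empty list on the other side. That is why a later step has to decide what to do with persons without auctions and auctions without a known person.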
Stateful processing
Hey, wait! I said that this was a streaming system, which means that there will be out-of-order data. How do we deal with that during the join? Well, the answer is stateful processing: the idea is to store the elements in a persistent state while waiting for the corresponding element in the other PCollection. Such a stateful processing API is available in the Beam SDK with the ParDo transform, but presenting it in detail is out of the scope of this article. There are good blog articles and talks about it already, so I will just point to the official Beam stateful processing documentation.
So the architecture of the stateful join in our example will be:
- The Person element will be stored in a persistent state in order to match future auctions by that person. We also set a timer to clear the person state after a TTL (see Timer API in the SDK)
- The Auction elements will be stored in a persistent state until we have seen the corresponding Person element. Then, the stored auctions can be output and the state can be cleared
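The two rules above can be sketched in plain Java. This is a simplified model of the per-key logic, not the Beam state and timer API: the in-memory maps stand in for the persistent state, onPersonExpiry stands in for the TTL timer firing, and all names (StatefulJoinModel, onPerson, onAuction, onPersonExpiry) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the per-key state kept by a stateful join; NOT the Beam state API. */
final class StatefulJoinModel {
  // Hypothetical in-memory stand-ins for the per-key persistent state.
  private final Map<Long, String> personState = new HashMap<>();
  private final Map<Long, List<String>> pendingAuctions = new HashMap<>();

  /** A person arrives: store it and flush the auctions that were waiting for it. */
  List<String> onPerson(long personId, String person) {
    personState.put(personId, person);
    List<String> out = new ArrayList<>();
    // Removing the entry models clearing the auction state once it has been output.
    List<String> waiting = pendingAuctions.remove(personId);
    if (waiting != null) {
      for (String auction : waiting) {
        out.add(auction + " by " + person);
      }
    }
    return out;
  }

  /** An auction arrives: join right away if the seller is known, otherwise buffer it. */
  List<String> onAuction(long sellerId, String auction) {
    String person = personState.get(sellerId);
    if (person == null) {
      pendingAuctions.computeIfAbsent(sellerId, k -> new ArrayList<>()).add(auction);
      return List.of();
    }
    return List.of(auction + " by " + person);
  }

  /** Models the TTL timer firing for a key: the stored person is dropped. */
  void onPersonExpiry(long personId) {
    personState.remove(personId);
  }

  public static void main(String[] args) {
    StatefulJoinModel join = new StatefulJoinModel();
    System.out.println(join.onAuction(1L, "auction-10")); // [] (seller not seen yet)
    System.out.println(join.onPerson(1L, "alice")); // [auction-10 by alice]
    System.out.println(join.onAuction(1L, "auction-11")); // [auction-11 by alice]
    join.onPersonExpiry(1L);
    System.out.println(join.onAuction(1L, "auction-12")); // [] (person state expired)
  }
}
```

The first call shows the out-of-order case: the auction arrives before its seller, so it is buffered and only output once the matching person shows up.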
The code of the incremental join
int numEventsInPane = 30;
PCollection<Event> eventsWindowed =
    events.apply(
        Window.<Event>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(numEventsInPane)))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO));
PCollection<KV<Long, Auction>> auctionsBySellerId =
    eventsWindowed
        // Only want the auction events.
        .apply(NexmarkQueryUtil.JUST_AUCTIONS)
        // Key auctions by their seller id.
        .apply("AuctionBySeller", NexmarkQueryUtil.AUCTION_BY_SELLER);
PCollection<KV<Long, Person>> personsById =
    eventsWindowed
        // Only want the people events.
        .apply(NexmarkQueryUtil.JUST_PERSONS)
        // Key people by their id.
        .apply("PersonById", NexmarkQueryUtil.PERSON_BY_ID);
// Join auctions and people.
// create the tuple of PCollections
KeyedPCollectionTuple.of(NexmarkQueryUtil.AUCTION_TAG, auctionsBySellerId)
    .and(NexmarkQueryUtil.PERSON_TAG, personsById)
    // group auctions and persons by personId
    .apply(CoGroupByKey.create())
    .apply(name + ".Join", ParDo.of(joinDoFn))
    // Export what we want.
    .apply(
        name + ".Export",
        ParDo.of(
            new DoFn<KV<Auction, Person>, NameCityStateId>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                Auction auction = c.element().getKey();
                Person person = c.element().getValue();
                c.output(
                    new NameCityStateId(person.name, person.city, person.state, auction.id));
              }
            }));
We receive as input the PCollection of events containing all the events of the auction system. Beam provides several windowing capabilities, but here we chose to work in processing time: we do not window the elements into time windows. Instead, we apply the global window to the elements and set a trigger to output the results each time n elements have been processed (n is numEventsInPane, 30 in this code).
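To illustrate what "output every n elements and discard fired panes" means, here is a small plain-Java model. It is only a sketch under simplifying assumptions and not the Beam trigger API: the class CountTriggerModel and the method firePanes are hypothetical, and the model fires exactly at the threshold, whereas the real trigger only guarantees at least n elements per pane.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a count-based trigger with discarded panes; NOT the Beam trigger API. */
final class CountTriggerModel {

  /** Returns the panes emitted while consuming the elements in arrival order. */
  static List<List<String>> firePanes(List<String> elements, int threshold) {
    List<List<String>> panes = new ArrayList<>();
    List<String> buffer = new ArrayList<>();
    for (String element : elements) {
      buffer.add(element);
      if (buffer.size() >= threshold) {
        panes.add(buffer);
        // Discarding mode: the next pane starts empty instead of accumulating.
        buffer = new ArrayList<>();
      }
    }
    // Elements left in the buffer are not emitted: they wait for the next firing.
    return panes;
  }

  public static void main(String[] args) {
    List<String> events = List.of("e1", "e2", "e3", "e4", "e5", "e6", "e7");
    System.out.println(firePanes(events, 3)); // [[e1, e2, e3], [e4, e5, e6]]
  }
}
```

The seventh element stays buffered, which mirrors the fact that the global window never closes on its own: results only come out when the trigger fires.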
The joinDoFn applied after the CoGroupByKey is a stateful DoFn that:
- Defines 2 ValueStates to store the Persons and Auctions
- Defines 1 Timer that fires after a configured TTL to clear the Person ValueState
- For each key, iterates over the elements in the output of the join and matches persons to auctions by writing to and reading from the ValueStates.
Conclusion
I believe that it is interesting to show the join in a complete use case example. As a result, this code has a lot of boilerplate that is not needed for the actual incremental join and that comes from the use case of the auction system itself:
- The extraction of auctions out of events
- The extraction of persons out of events
- The keying of the auctions PCollection
- The keying of the persons PCollection
- The application of the stateful JoinDoFn
- The export to the POJO output
The actual join resides only in the creation of the KeyedPCollectionTuple and the application of the CoGroupByKey. The incremental part resides in the windowing: in this use case we chose to trigger the output every n elements, but we could have chosen a simpler setup by simply applying FixedWindows to the input PCollection.