Tuesday, November 10, 2020

Tricky use cases of Apache Beam 1/3: incremental join

🕥 7 min.

Introduction


This is the first article of a serie of blog posts about tricky use cases of Apache Beam that enlight some of the advanced possibilities of the Beam SDK.

Incremental join example


Let's say we have a streaming system that records events of an auction platform. Such events can be Persons creating Auctions and placing bids on auctions.

Consider, you have 2 PCollections as input:
  • One of the Persons details that are collected at their connection into the auction system
  • One of the Auctions details that are are collected at the auction creation
Now, you want regular updates on who is selling in particular US states.
So, this problem is a typical incremental join of these 2 PCollections. Such a join in Beam is done through the CoGroupByKey transform of the SDK. This transform works as illustrated below:




This is similar to a SQL Join. The 2 PCollections are keyed by the same key: personId is the unique key of the Person object and sellerId is the unique key of the Seller object. Seller and Person are just two different visions of the same entity depending on either they were created duding a connection event or during an auction creation event. So sellerId and personId are the same, we join by this key.

First step is to group the 2 PCollections in a KeyedPCollectionTuple before joining. For that we need to tag the elements of the PCollections with a TupleTag (basically a String id) to differentiate them in the output result. In the diagram above, the Person tag is in green and the Auction tag is in blue.
Then the actual CoGroupByKey transform is applied to end up with an output PCollection that contains for each key a list of elements (either Person or Auction) that have this key in common.


Stateful processing


Hey, wait ! I said that this was a streaming system, that means that there will be out of order data. How to deal with that during the join ? Well, the answer is with stateful processing: the idea here is to store the elements in a persistent state waiting for the corresponding element in the other PCollection. Such a stateful processing API is available in the Beam SDK with the ParDo transform but it is out of the scope of this article to present it in details. There are good blog articles and talks about that already, so I will just point to the official Beam stateful processing documentation.

So the architecture of the stateful join in our example will be:
  • The Person element will be stored in a persistent state in order to match future auctions by that person. We also set a timer to clear the person state after a TTL (see Timer API in the SDK)
  • The Auction elements will be stored in a persistent state until we have seen the corresponding Person element. Then, the stored auctions can be output and the state can be cleared 


The code of the incremental join


int numEventsInPane = 30;
PCollection<Event> eventsWindowed =
events.apply(
Window.<Event>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(numEventsInPane)))
.discardingFiredPanes()
.withAllowedLateness(Duration.ZERO));
PCollection<KV<Long, Auction>> auctionsBySellerId =
eventsWindowed
// Only want the auction events.
.apply(NexmarkQueryUtil.JUST_AUCTIONS)
// Key auctions by their seller id.
.apply("AuctionBySeller", NexmarkQueryUtil.AUCTION_BY_SELLER);
PCollection<KV<Long, Person>> personsById =
eventsWindowed
// Only want the people events.
.apply(NexmarkQueryUtil.JUST_PERSONS)
// Key people by their id.
.apply("PersonById", NexmarkQueryUtil.PERSON_BY_ID);
// Join auctions and people.
// create the tuple of PCollections
KeyedPCollectionTuple.of(NexmarkQueryUtil.AUCTION_TAG, auctionsBySellerId)
.and(NexmarkQueryUtil.PERSON_TAG, personsById)
// group auctions and persons by personId
.apply(CoGroupByKey.create())
.apply(name + ".Join", ParDo.of(joinDoFn))
// Export what we want.
.apply(
name + ".Export",
ParDo.of(
new DoFn<KV<Auction, Person>, NameCityStateId>() {
@ProcessElement
public void processElement(ProcessContext c) {
Auction auction = c.element().getKey();
Person person = c.element().getValue();
c.output(
new NameCityStateId(person.name, person.city, person.state, auction.id));
}
}));
I think it deserves some explanations: 

We receive as an input the PCollection of events containing all events of the auction system. Beam provides several windowing capabilities but here we chose to work in processing time, we do not window the elements into time windows. We rather apply the Global window to the elements and set a trigger to output the results each time n elements were processed.

We then extract the auctions out of the stream of events and key the auctions by there sellerId . We then do the same for the Persons: extract them out of the events and key them by their personId (which is the same as the sellerId remember).

Now comes the actual join: we first group the 2 PCollections in a KeyedPCollectionTuple as said above. We then apply the actual CoGroupByKey transform to the KeyedPCollectionTuple.

Now comes the stateful processing: as explained above, we need to store elements in a persistent state to deal with out of order data. This is done by applying a Pardo to the output joint PCollection. The topic of this article is not stateful processing so, if you want some details, the complete code of JoinDoFn is on Beam github. I will just say here that JoinDoFn does the following:

  • Defines 2 ValueState to store the Persons and Auctions
  • Defines 1 Timer that fires after a configured TTL to clear the Person ValueState
  • For each key, iterates over the elements in the output of the join and matches persons to auctions by writing and reading from/to the ValueStates. 

The final step is to export the result by applying a ParDo that creates the output POJOs that gather persons information and auctions information.

Conclusion


I believe that it is interesting to show the join in a complete use case example. Thus, this code has a lot of boilerplate that is not needed for the actual incremental join and that comes from the use case of the auction system itself:
  • The extraction of auctions out of events
  • The extraction of persons out of events
  • The keying of the auctions PCollection
  • The keying of the persons PCollection
  • The apply of the stateful JoinDoFn
  • The export to the POJO output 
The actual join resides only in the creation of the KeyedPCollectionTuple and the apply of the CoGroupByKey. The incremental part resides in the apply of windowing: in the use case we chose to trigger the output with each new element but we could have chose a simpler trigger by simply applying FixedWindows to the input PCollection.

That's it, this is how you can create an incremental join pipeline with Apache Beam