Monday, April 3, 2023

Flink: Howto test a batch source with the new Source framework

🕥 6 min.


The Flink community recently designed a new Source framework based on FLIP-27. This article continues the article "Flink: Howto create a batch source with the new Source framework": now it is time to test the created source! Like the previous article, this one was built while implementing the Flink batch source for Cassandra.

The goal here is to give field feedback on how to implement tests of a batch source. For details you should read the documentation, the javadocs or the Cassandra connector code. The links are above.

Unit testing the source

Testing the serializers

In the previous article, we created serializers for the Split and the SplitEnumeratorState. We should now test them in unit tests. To test serde, we create an object, serialize it with the serializer, deserialize the result with the same serializer and finally assert that the two objects are equal. Thus, hashCode() and equals() need to be implemented for the serialized objects.
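A minimal sketch of such a round-trip test, assuming a hypothetical split made of a [start, end) token range. To stay runnable without Flink on the classpath, the hypothetical TokenRangeSplitSerializer only mirrors the getVersion()/serialize()/deserialize() shape of SimpleVersionedSerializer instead of implementing the real interface; in the connector, the check would live in a JUnit test.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Objects;

// Hypothetical split: a token range [start, end). equals() and hashCode() are
// required so that the round-trip test can compare the two instances.
final class TokenRangeSplit {
    final BigInteger start;
    final BigInteger end;

    TokenRangeSplit(BigInteger start, BigInteger end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TokenRangeSplit)) {
            return false;
        }
        TokenRangeSplit other = (TokenRangeSplit) o;
        return start.equals(other.start) && end.equals(other.end);
    }

    @Override
    public int hashCode() {
        return Objects.hash(start, end);
    }
}

// Mirrors the shape of Flink's SimpleVersionedSerializer (not the real interface).
final class TokenRangeSplitSerializer {
    static final int CURRENT_VERSION = 0;

    int getVersion() {
        return CURRENT_VERSION;
    }

    byte[] serialize(TokenRangeSplit split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeBigInteger(out, split.start);
            writeBigInteger(out, split.end);
        }
        return bytes.toByteArray();
    }

    TokenRangeSplit deserialize(int version, byte[] serialized) throws IOException {
        if (version != CURRENT_VERSION) {
            throw new IOException("Unknown serializer version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            BigInteger start = readBigInteger(in);
            BigInteger end = readBigInteger(in);
            return new TokenRangeSplit(start, end);
        }
    }

    // No native support for BigInteger: write its byte length, then its bytes.
    private static void writeBigInteger(DataOutputStream out, BigInteger value) throws IOException {
        byte[] raw = value.toByteArray();
        out.writeInt(raw.length);
        out.write(raw);
    }

    private static BigInteger readBigInteger(DataInputStream in) throws IOException {
        byte[] raw = new byte[in.readInt()];
        in.readFully(raw);
        return new BigInteger(raw);
    }
}

final class SplitSerializerTest {
    // The test itself: serialize, deserialize with the same serializer, compare.
    static boolean roundTrips(TokenRangeSplit split) throws IOException {
        TokenRangeSplitSerializer serializer = new TokenRangeSplitSerializer();
        byte[] bytes = serializer.serialize(split);
        TokenRangeSplit restored = serializer.deserialize(serializer.getVersion(), bytes);
        return split.equals(restored) && split.hashCode() == restored.hashCode();
    }
}
```

The same pattern applies to the SplitEnumeratorState serializer: without a field-by-field equals(), the final assertion would only compare references and the test would always fail.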

Other unit tests

Of course, we also need to unit test low-level processing, such as query building, or any processing that does not require a running backend.
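For instance, a helper that builds the query reading one token-range split can be checked with plain assertions, with no backend involved. The SplitQueryBuilder class, the half-open range convention and the exact query shape below are assumptions for illustration, not the connector's actual code.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper under test: builds the CQL query that reads one token-range split.
final class SplitQueryBuilder {
    static String build(String keyspace, String table, List<String> partitionKeyColumns) {
        if (partitionKeyColumns.isEmpty()) {
            throw new IllegalArgumentException("A partition key is required");
        }
        // token(col1, col2, ...) over all the partition key columns
        StringJoiner token = new StringJoiner(", ", "token(", ")");
        partitionKeyColumns.forEach(token::add);
        return "SELECT * FROM " + keyspace + "." + table
                + " WHERE " + token + " >= ? AND " + token + " < ?;";
    }
}
```

Such tests run in milliseconds, which keeps the slower container-based integration tests for what really needs a backend.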

Integration testing the source

For tests that require a running backend, Flink provides a JUnit5 source test framework. It is composed of different parts gathered in a test suite:

For the test to be integrated into the Flink CI, the test class must be named *ITCase; it can be named differently if the test lives elsewhere. The class extends SourceTestSuiteBase. This test suite already provides all the necessary tests (single split, multiple splits, idle reader, etc.). It targets both batch and streaming sources, so for our batch source the tests below need to be disabled, as they are targeted at streaming sources. They can be disabled by overriding them in the ITCase and annotating them with @Disabled:
  • testSourceMetrics
  • testSavepoint
  • testScaleUp
  • testScaleDown
  • testTaskManagerFailure 

Of course, we can add our own integration test cases, for example tests on limits, tests on low-level splitting or any test that requires a running backend. But in most cases, we only need to provide the Flink test environment classes that configure the ITCase:

Flink environment

We add this annotated field to our ITCase and we're done:
@TestEnv
MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment();

Backend environment

To test the connector, we need a backend to run it against. This TestEnvironment provides everything related to the backend: the container, its configuration, the session to connect to it, and all the elements bound to the whole test suite (tablespace, initialization requests, etc.).

We add this annotated field to our ITCase:
@TestExternalSystem
MyBackendTestEnvironment backendTestEnvironment = new MyBackendTestEnvironment();
To integrate with JUnit 5, MyBackendTestEnvironment implements TestResource. This environment is scoped to the test suite, so it is where we set up the backend and the shared resources (session, tablespace, etc.) by implementing the startUp() and tearDown() methods. For that, we advise using Testcontainers, which relies on Docker images to provide a real backend instance (not a mock) that is representative for integration tests. Testcontainers supports several backends out of the box. We need to configure the test containers as follows:
  • Redirect container output (error and standard output) to Flink logs
  • Set the different timeouts to cope with CI server load
  • Set retry mechanisms for connections, initialization requests, etc., for the same reason
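For the last point, a small retry helper around the connection and initialization requests issued by the test environment is one possible approach. The Retry class below is a hypothetical sketch; the number of attempts and the delay are placeholders to tune for the CI.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

// Hypothetical helper: retries a backend call (connection, initialization request...)
// with a fixed delay, to absorb transient failures when the CI server is loaded.
final class Retry {
    static <T> T withRetries(Callable<T> action, int maxAttempts, Duration delay) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay.toMillis());
                }
            }
        }
        // Every attempt failed: surface the last failure to the test.
        throw last;
    }
}
```

In startUp(), the session creation could then be wrapped, for example as Retry.withRetries(() -> openSession(), 5, Duration.ofSeconds(2)), where openSession() stands for whatever backend-specific call creates the session.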

Checkpointing semantics

In big data execution engines, there are two levels of guarantee regarding sources and sinks:
  • At least once: upon failure and recovery, some records may be reflected multiple times but none will be lost
  • Exactly once: upon failure and recovery, every record will be reflected exactly once

With the following annotated field, we verify that the source supports exactly-once semantics:
@TestSemantics
CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};

That being said, we could encounter a problem while running the tests: the default assertions in the Flink source test framework assume that the data is read in the same order it was written. This is untrue for most big data backends, where ordering is usually not deterministic. To support unordered checks and still use all the tests provided by the framework, we need to override SourceTestSuiteBase#checkResultWithSemantic in our ITCase:

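@Override
protected void checkResultWithSemantic(
        CloseableIterator<Pojo> resultIterator,
        List<List<Pojo>> testData,
        CheckpointingMode semantic,
        Integer limit) {
    if (limit != null) {
        Runnable runnable =
                () ->
                        CollectIteratorAssertions.assertUnordered(resultIterator)
                                .withNumRecordsLimit(limit)
                                .matchesRecordsFromSource(testData, semantic);
        // same timeout constant as in the parent method; runAsync() is CompletableFuture.runAsync()
        assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
    } else {
        CollectIteratorAssertions.assertUnordered(resultIterator)
                .matchesRecordsFromSource(testData, semantic);
    }
}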

This is a copy-paste of the parent method where CollectIteratorAssertions.assertOrdered() is replaced by CollectIteratorAssertions.assertUnordered().
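To see what an order-insensitive check means, here is a simplified, hypothetical illustration (it is not how CollectIteratorAssertions is implemented, and it ignores the exactly-once/at-least-once nuances): the records are compared as multisets, so duplicates still count but their position does not. It relies on equals() and hashCode() of the record type, one more reason to implement them properly.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration: two record lists match "unordered" when they contain
// the same records with the same number of occurrences.
final class UnorderedCheck {
    static <T> boolean sameRecordsIgnoringOrder(List<T> expected, List<T> actual) {
        return countOccurrences(expected).equals(countOccurrences(actual));
    }

    private static <T> Map<T, Integer> countOccurrences(List<T> records) {
        Map<T, Integer> counts = new HashMap<>();
        for (T record : records) {
            counts.merge(record, 1, Integer::sum);
        }
        return counts;
    }
}
```

An ordered check such as List#equals() would reject [3, 2, 1, 2] against [1, 2, 2, 3], whereas this check accepts it and still rejects [1, 2, 3], which lost a record.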

Test context

The test context provides Flink with means to interact with the backend, like inserting test data, creating tables or constructing the source. It is scoped to the test case (and not to the test suite). It is linked to the ITCase through a factory of TestContext as shown below. 

@TestContext
TestContextFactory contextFactory = new TestContextFactory(backendTestEnvironment);

TestContext implements DataStreamSourceExternalContext:
  • We don't connect to the backend at each test case, so the shared resources, such as the session, are created by the backend test environment (test suite scoped). They are then passed to the test context through its constructor. It is also in the constructor that we initialize the test case backend resources, such as the test case table.
  • close(): drops the created test case resources
  • getProducedType(): specifies the test output type of the source, such as a test Pojo
  • getConnectorJarPaths(): provides a list of attached jars. Most of the time, we can return an empty list, as Maven already adds the jars to the test classpath
  • createSource(): here we create the source as a user would have done. It will be provided to the test cases by the Flink test framework
  • createSourceSplitDataWriter(): here we create an ExternalSystemSplitDataWriter responsible for writing test data, which comes as a list of objects of the type defined in getProducedType()
  • generateTestData(): produces the list of test data that will be given to the ExternalSystemSplitDataWriter. We must make sure that equals() returns false when two records of this list belong to different splits. The easiest way is to include the split id in the produced type and make sure equals() and hashCode() are properly implemented to include this field.
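A sketch of the last point, with hypothetical TestPojo and TestDataGenerator classes. Flink's real generateTestData() receives the test settings and a seed; the splitId and numRecords parameters here are assumptions made to keep the example standalone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Random;

// Hypothetical test record: the split id is part of equals()/hashCode(), so two
// records written to different splits can never be equal.
final class TestPojo {
    final int splitId;
    final int index;
    final String payload;

    TestPojo(int splitId, int index, String payload) {
        this.splitId = splitId;
        this.index = index;
        this.payload = payload;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TestPojo)) {
            return false;
        }
        TestPojo other = (TestPojo) o;
        return splitId == other.splitId && index == other.index && payload.equals(other.payload);
    }

    @Override
    public int hashCode() {
        return Objects.hash(splitId, index, payload);
    }
}

final class TestDataGenerator {
    // Plays the role of generateTestData() for one split: the same seed gives the
    // same payloads, but records of different splits still differ by split id.
    static List<TestPojo> generateTestData(int splitId, long seed, int numRecords) {
        Random random = new Random(seed);
        List<TestPojo> records = new ArrayList<>(numRecords);
        for (int i = 0; i < numRecords; i++) {
            records.add(new TestPojo(splitId, i, Integer.toHexString(random.nextInt())));
        }
        return records;
    }
}
```

Without the splitId field, two splits generated with the same seed would contain pairwise-equal records, and the framework could not tell which split a record came from.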

Contributing the source to Flink

Recently, the Flink community externalized all the connectors to separate repositories under the Apache Flink project. This is mainly to decouple the release of Flink from the release of the connectors. To contribute the created source, we need to follow this official wiki page.


This concludes the series of articles about creating a batch source with the new Flink framework. This was needed as, apart from the javadocs, the documentation about testing is missing for now. I hope you enjoyed reading and I hope the Flink community will receive a source PR from you soon :) 

Thursday, March 30, 2023

Flink: Howto create a batch source with the new Source framework

🕥 10 min.


The Flink community recently designed a new Source framework based on FLIP-27. Some connectors have migrated to this new framework. This article is a how-to for creating a batch source using this new framework. It was built while implementing the Flink batch source for Cassandra. If you are interested in contributing or migrating connectors, this blog post is for you!

Implementing the source components

The source architecture is depicted in the diagrams below:


The source interface only does the "glue" between all the other components. Its role is to instantiate all of them and to define the source Boundedness. We also do the source configuration here along with user configuration validation.


As shown in the graphic above, the instances of the SourceReader (which we will simply call readers in the rest of this article) run in parallel in task managers to read the actual data, which is divided into Splits. Readers request splits from the SplitEnumerator, and the resulting splits are assigned to them in return.

Flink provides the SourceReaderBase implementation that takes care of all the threading. Flink also provides a useful extension to this class for most cases: SingleThreadMultiplexSourceReaderBase. This class has the threading model already configured: each SplitReader instance reads splits using one thread (but there are several SplitReader instances that live among task managers).
What we have left to do in the SourceReader class is:
  • Provide a SplitReader supplier
  • Create a RecordEmitter
  • Create the shared resources for the SplitReaders (sessions, etc...). As the SplitReader supplier is created in the SourceReader constructor in a super() call, using a SourceReader factory to create the shared resources and pass them to the supplier is a good idea.
  • Implement start(): here we should ask the enumerator for our first split
  • Override close() from the SourceReaderBase parent class to free up any created resources (the shared resources, for example)
  • Implement initializedState() to create a mutable SplitState from a Split
  • Implement toSplitType() to create a Split from the mutable SplitState
  • Implement onSplitFinished(): here, as it is a batch source (finite data), we should ask the enumerator for the next split

Split and SplitState

The SourceSplit represents a partition of the source data. What defines a split depends on the backend we are reading from. It could be a (partition start, partition end) tuple or an (offset, split size) tuple for example. 

In any case, the Split object should be seen as immutable: any update to it should be done on the associated SplitState. The split state is the one that will be stored inside the Flink checkpoints. A checkpoint may happen between two fetches of the same split. So, if we're reading a split, we must store in the split state the current state of the reading process. This current state needs to be serializable (because it will be part of a checkpoint) and something the backend source can resume from. That way, in case of failover, reading can be resumed from where it left off, and we ensure there will be no duplicates or lost data. For example, if the record reading order is deterministic in the backend, then the split state can store the number n of already-read records to restart at record n+1 after failover.
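A minimal sketch of this pattern, assuming a backend with a deterministic read order and splits expressed as an (offset, size) pair counted in records. OffsetSplit and OffsetSplitState are hypothetical names; in a real source, SourceReaderBase#initializedState() and #toSplitType() would convert between the two.

```java
// Hypothetical immutable split: `size` records starting at record `offset`.
final class OffsetSplit {
    final String splitId;
    final long offset;
    final long size;

    OffsetSplit(String splitId, long offset, long size) {
        this.splitId = splitId;
        this.offset = offset;
        this.size = size;
    }
}

// Hypothetical mutable state of a split being read. Only the number of records
// already read is tracked, which relies on the deterministic read order.
final class OffsetSplitState {
    private final OffsetSplit split;
    private long recordsRead;

    // Role of initializedState(): Split -> mutable SplitState.
    OffsetSplitState(OffsetSplit split) {
        this.split = split;
    }

    void recordEmitted() {
        recordsRead++;
    }

    // n records already read: resume at record n + 1 of the split.
    long nextOffset() {
        return split.offset + recordsRead;
    }

    boolean isFinished() {
        return recordsRead >= split.size;
    }

    // Role of toSplitType(): SplitState -> immutable Split, as stored in a checkpoint.
    // The returned split only covers the records that were not read yet.
    OffsetSplit toSplit() {
        return new OffsetSplit(split.splitId, nextOffset(), split.size - recordsRead);
    }
}
```

After a failover, the reader receives the checkpointed split, builds a fresh state from it and starts reading at nextOffset(), so the records already emitted before the checkpoint are not read again.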

SplitEnumerator and SplitEnumeratorState

The SplitEnumerator is responsible for creating the splits and serving them to the readers. Whenever possible, it is preferable to generate the splits lazily, meaning that each time a reader asks the enumerator for a split, the enumerator generates one on demand and assigns it to the reader. For that, we implement SplitEnumerator#handleSplitRequest(). Lazy split generation is preferable to split discovery, in which we pre-generate all the splits and store them while waiting to assign them to the readers. Indeed, in some situations, the number of splits can be enormous and consume a lot of memory, which could be problematic in case of straggling readers. The framework offers the ability to act upon reader registration by implementing addReader() but, as we do lazy split generation, we have nothing to do there. In some cases, generating a split is too costly, so we can pre-generate a batch (not all) of splits to amortize this cost. The number and size of batched splits need to be taken into account to avoid consuming too much memory.
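The lazy approach can be sketched as a generator that only keeps the position of the next split, assuming data addressed by record offsets and a fixed split size. GeneratedSplit and LazySplitGenerator are hypothetical; in a real enumerator, nextSplit() would be called from handleSplitRequest() and the result assigned through the enumerator context.

```java
import java.util.Optional;

// Hypothetical split produced on demand: `size` records starting at `offset`.
final class GeneratedSplit {
    final int id;
    final long offset;
    final long size;

    GeneratedSplit(int id, long offset, long size) {
        this.id = id;
        this.offset = offset;
        this.size = size;
    }
}

// Hypothetical lazy generation logic: one split is created per reader request,
// so memory usage does not depend on the total number of splits.
final class LazySplitGenerator {
    private final long totalRecords;
    private final long splitSize;
    private long nextOffset;
    private int nextId;

    LazySplitGenerator(long totalRecords, long splitSize) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        this.totalRecords = totalRecords;
        this.splitSize = splitSize;
    }

    // Empty once all the data has been handed out; the enumerator would then
    // tell the requesting reader that there are no more splits.
    Optional<GeneratedSplit> nextSplit() {
        if (nextOffset >= totalRecords) {
            return Optional.empty();
        }
        long size = Math.min(splitSize, totalRecords - nextOffset);
        GeneratedSplit split = new GeneratedSplit(nextId++, nextOffset, size);
        nextOffset += size;
        return Optional.of(split);
    }
}
```

Only nextOffset and nextId change over time, which also keeps the enumerator state small.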

Long story short, the tricky part of the source implementation is splitting the source data. The right balance is to have neither too many splits (which could lead to too much memory consumption) nor too few (which could lead to sub-optimal parallelism). One good way to reach this balance is to evaluate the size of the source data upfront and allow the user to specify the maximum memory a split will take. That way, they can configure this parameter according to the memory available on the task managers. This parameter is optional, so the source needs to provide a default value. Also, the source needs to check that the user-provided max-split-size is not too small, which would lead to too many splits. The general rule of thumb is to give users some freedom but protect them from unwanted behavior. For these safety measures, rigid thresholds don't work well, as the source may start to fail when the thresholds are suddenly exceeded. For example, if we enforce that the number of splits stays below twice the parallelism and the job regularly runs on a growing table, the number of max-split-size splits will keep growing until the threshold is exceeded. Of course, the size of the source data needs to be evaluated without reading the actual data. For the Cassandra connector, I did it as described in the article "Cassandra: evaluate table size without reading the data".
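A sketch of these sizing rules, assuming the source size has already been estimated in bytes. The SplitSizing class and both constants are placeholders chosen for illustration, not the Cassandra connector's actual values: the user setting is optional, a default applies when it is absent, and too small a value is rejected instead of capping the number of splits.

```java
// Hypothetical split-sizing rules; constants are illustrative placeholders.
final class SplitSizing {
    static final long DEFAULT_MAX_SPLIT_SIZE = 64L * 1024 * 1024; // used when the user sets nothing
    static final long MIN_MAX_SPLIT_SIZE = 10L * 1024 * 1024;     // protects against tiny splits

    // Validates the optional user setting (null means "not set").
    static long effectiveMaxSplitSize(Long userMaxSplitSize) {
        if (userMaxSplitSize == null) {
            return DEFAULT_MAX_SPLIT_SIZE;
        }
        if (userMaxSplitSize < MIN_MAX_SPLIT_SIZE) {
            throw new IllegalArgumentException(
                    "maxSplitSize must be at least " + MIN_MAX_SPLIT_SIZE + " bytes");
        }
        return userMaxSplitSize;
    }

    // Fewest splits such that none exceeds maxSplitSize (ceiling division),
    // computed from the estimated size, not from the data itself.
    static long numSplits(long estimatedSourceSize, long maxSplitSize) {
        if (estimatedSourceSize <= 0) {
            return 1;
        }
        long splits = estimatedSourceSize / maxSplitSize;
        return estimatedSourceSize % maxSplitSize == 0 ? splits : splits + 1;
    }
}
```

As the table grows, the number of splits simply grows with it; nothing fails suddenly, unlike with a rigid cap on the split count.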

Another important topic is state. If the job manager fails, the split enumerator needs to recover. For that, as for the split, we need to provide a state for the enumerator that will be part of a checkpoint. Upon recovery, the enumerator is reconstructed and receives an enumerator state to recover its previous state. Upon checkpointing, the enumerator returns its state when SplitEnumerator#snapshotState() is called. The state must contain everything needed to resume where the enumerator left off after a failover. In the lazy split generation scenario, the state will contain everything needed to generate the next split whenever asked to: for example, the start offset of the next split, the split size, the number of splits still to generate, etc. But the SplitEnumeratorState must also contain a list of splits: not the list of discovered splits, but a list of splits to reassign. Indeed, whenever a reader fails, if it was assigned splits after the last checkpoint, the checkpoint will not contain those splits. Consequently, upon restoration, the reader won't have those splits assigned anymore. There is a callback to deal with that case: addSplitsBack(). There, the splits that were assigned to the failing reader can be put back into the enumerator state for later reassignment to readers. There is no memory size risk here, as the number of splits to reassign is pretty low.

The above topics are the most important ones regarding splitting. Two methods are left to implement: the usual start()/close() methods for resource creation/disposal. For start(), the Flink connector framework provides the enumeratorContext#callAsync() utility to run long processing asynchronously, such as split preparation or split discovery (if lazy split generation is impossible). Indeed, the start() method runs in the source coordinator thread, and we don't want to block it for a long time.


The SplitReader is responsible for reading the actual splits that it receives when the framework calls handleSplitsChanges(). The main part of the split reader is the fetch() implementation, where we read all the splits received and return the read records as a RecordsBySplits object. This object contains a map from split ids to their records, and also the ids of the finished splits. Important points need to be considered:

  • The fetch call must be non-blocking. If any call in its code is synchronous and potentially long, an escape from fetch() must be provided. When the framework calls wakeUp(), we should interrupt the fetch, for example by setting an AtomicBoolean.
  • The fetch call needs to be re-entrant: an already read split must not be re-read. We should remove it from the list of splits to read and add its id to the finished splits (along with empty splits) in the RecordsBySplits that we return.

It is totally fine for the implementer to exit the fetch() method early. A failure could also interrupt the fetch. In both cases, the framework will call fetch() again later on, and the fetch method must resume reading from where it left off, using the split state already discussed. If resuming the read of a split is impossible because of backend constraints, then the only solution is to read splits atomically (either do not read the split at all, or read it entirely). That way, in case of an interrupted fetch, nothing will be output and the split can be read again from the beginning at the next fetch call, leading to no duplicates. But if the split is read entirely, there are points to consider:
  • We should ensure that the total split content (records from the source) fits in memory, for example by specifying a max split size in bytes (see SplitEnumerator)
  • The split state becomes useless; only a Split class is needed
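The rules above can be sketched as a simplified, hypothetical model with no Flink dependency: an in-memory map stands in for the backend, FetchResult for RecordsBySplits, and addSplits()/wakeUp() for handleSplitsChanges()/wakeUp(). Each fetch reads a bounded number of records, exits early when woken up, resumes from the per-split position, and reports finished (including empty) splits so they are never re-read.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in for RecordsBySplits: records per split id + finished split ids.
final class FetchResult {
    final Map<String, List<String>> recordsBySplit = new HashMap<>();
    final Set<String> finishedSplits = new HashSet<>();
}

// Hypothetical split reader over an in-memory "backend" (split id -> records).
final class InMemorySplitReader {
    private final Map<String, List<String>> backend;
    private final Map<String, Integer> positions = new HashMap<>(); // per-split read position
    private final Deque<String> splitsToRead = new ArrayDeque<>();
    private final AtomicBoolean wakeUpRequested = new AtomicBoolean(false);
    private final int maxRecordsPerFetch;

    InMemorySplitReader(Map<String, List<String>> backend, int maxRecordsPerFetch) {
        this.backend = backend;
        this.maxRecordsPerFetch = maxRecordsPerFetch;
    }

    // Role of handleSplitsChanges(): splits newly assigned by the enumerator.
    void addSplits(List<String> splitIds) {
        for (String splitId : splitIds) {
            splitsToRead.add(splitId);
            positions.putIfAbsent(splitId, 0);
        }
    }

    // Role of wakeUp(): may be called from another thread while fetch() runs.
    void wakeUp() {
        wakeUpRequested.set(true);
    }

    FetchResult fetch() {
        FetchResult result = new FetchResult();
        int budget = maxRecordsPerFetch;
        while (!splitsToRead.isEmpty() && budget > 0) {
            if (wakeUpRequested.compareAndSet(true, false)) {
                return result; // exit early: the framework calls fetch() again later
            }
            String splitId = splitsToRead.peek();
            List<String> records = backend.getOrDefault(splitId, List.of());
            int position = positions.get(splitId);
            while (position < records.size() && budget > 0) {
                result.recordsBySplit.computeIfAbsent(splitId, id -> new ArrayList<>())
                        .add(records.get(position));
                position++;
                budget--;
            }
            positions.put(splitId, position); // resume point for the next fetch()
            if (position >= records.size()) {
                // Finished (or empty) split: reported once, never read again.
                splitsToRead.poll();
                result.finishedSplits.add(splitId);
            }
        }
        return result;
    }
}
```

In a real reader, the wake-up flag would also be checked inside the long backend call itself, not only between splits.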


The SplitReader reads records in the form of an intermediary record format that the implementer provides for each record. It can be the raw format returned by the backend or any format from which the actual record can be extracted afterwards. This format is not the final output format expected by the source; it contains anything needed to convert to the output record format. We need to implement RecordEmitter#emitRecord() to do this conversion. A good pattern here is to initialize the RecordEmitter with a mapping Function. The implementation must be idempotent. Indeed, the method may be interrupted in the middle; in that case, the same set of records will be passed to the record emitter again later.
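A sketch of the mapping-Function pattern with a hypothetical MappingRecordEmitter class. Flink's RecordEmitter#emitRecord() also receives a SourceOutput and the split state; here a plain Consumer stands in for the output so that the example runs without Flink.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical emitter: converts the intermediate record type E (e.g. a raw backend
// row) to the output type T with a mapping function given at construction time.
final class MappingRecordEmitter<E, T> {
    private final Function<E, T> mapper;

    MappingRecordEmitter(Function<E, T> mapper) {
        this.mapper = mapper;
    }

    // Role of emitRecord(); a Consumer stands in for SourceOutput. The conversion keeps
    // no state of its own, so converting the same element again after an interruption
    // yields the same output record.
    void emitRecord(E element, Consumer<T> output) {
        output.accept(mapper.apply(element));
    }
}
```

With this design, the source can expose the mapper as a user-facing parameter (for example a row-to-POJO conversion), and the emitter itself stays generic.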


We need to provide singleton serializers for:
  • Split: splits are serialized when sending them from enumerator to reader, and when checkpointing the reader's current state
  • SplitEnumeratorState: the serializer is used for the result of SplitEnumerator#snapshotState()

For both, we need to implement SimpleVersionedSerializer. Care needs to be taken at some important points:

  • Using Java serialization is forbidden in Flink, mainly for migration concerns. We should rather manually write the fields of the objects using ObjectOutputStream. When a class is not supported by ObjectOutputStream (not String, Integer, Long...), we should write the size of the object in bytes as an Integer and then write the object converted to byte[]. A similar method is used to serialize collections: first write the number of elements of the collection, then serialize all the contained objects. Of course, for deserialization we read the exact same fields in the same order.
  • There can be a lot of splits, so we should cache the output stream used in the SplitSerializer. We can do so with a ThreadLocal:
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
        ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
The initial stream size depends on the size of a split.
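The collection rule can be sketched for a hypothetical enumerator state holding the next split offset and the list of splits to reassign, each split reduced to an id and an opaque byte[] payload standing for a field type the stream cannot write natively. The sketch uses java.io's DataOutputStream rather than Flink's DataOutputSerializer so that it runs standalone; PendingSplit and EnumeratorStateSerde are illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical split to reassign: an id plus an opaque serialized payload.
final class PendingSplit {
    final String id;
    final byte[] payload;

    PendingSplit(String id, byte[] payload) {
        this.id = id;
        this.payload = payload;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof PendingSplit
                && id.equals(((PendingSplit) o).id)
                && Arrays.equals(payload, ((PendingSplit) o).payload);
    }

    @Override
    public int hashCode() {
        return 31 * id.hashCode() + Arrays.hashCode(payload);
    }
}

final class EnumeratorStateSerde {
    static byte[] serialize(long nextOffset, List<PendingSplit> splitsToReassign) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(nextOffset);
            out.writeInt(splitsToReassign.size());  // collection: element count first
            for (PendingSplit split : splitsToReassign) {
                out.writeUTF(split.id);
                out.writeInt(split.payload.length); // unsupported type: byte size...
                out.write(split.payload);           // ...then the bytes themselves
            }
        }
        return bytes.toByteArray();
    }

    // Reads everything back in exactly the same order: fills the given list and
    // returns the next offset.
    static long deserialize(byte[] serialized, List<PendingSplit> splitsToReassign) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            long nextOffset = in.readLong();
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String id = in.readUTF();
                byte[] payload = new byte[in.readInt()];
                in.readFully(payload);
                splitsToReassign.add(new PendingSplit(id, payload));
            }
            return nextOffset;
        }
    }
}
```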

Testing the source

For the sake of concision, testing the source will be the subject of the next article. Stay tuned!


This article, gathering field feedback from the implementation, was needed as the javadocs cannot cover all the implementation details for high-performance and maintainable sources. I hope you enjoyed reading and that it gave you the desire to contribute a new connector to the Flink project!

Tuesday, March 21, 2023

Cassandra: evaluate table size without reading the data

🕥 3 min.


While developing the Cassandra source connector for Flink, I needed a way to ensure that the data I was reading fit in memory. For that, I had to evaluate how big the source table was in order to know how to divide it. But, of course, it had to be done without reading the data itself. Here is how it was achieved.

Cassandra size estimates statistics

Cassandra partitioning is based on tokens arranged into a ring. A Cassandra cluster provides statistical information about table sizes in a system table called system.size_estimates. It provides per-table information on what we will call token ranges in this article: the number of partitions taken by the table, the mean partition size, and the start and end tokens. These elements can be used to get a rough estimation of the table size.

To get this information, we need to issue this request:

SELECT range_start, range_end, partitions_count, mean_partition_size FROM system.size_estimates WHERE keyspace_name = ? AND table_name = ?

We will receive the token ranges that the table occupies. To get the size of the table on this node, we sum over them like this:

table_size_on_this_node = sum (mean_partition_size * partitions_count)

You can see in the formula above that the calculated size only covers one Cassandra node, as the system table is local to the node. To avoid querying all the nodes of the cluster, we extrapolate to the whole cluster. For that, we calculate the fraction of the ring that this node represents. The ring fraction is a ratio between 0 and 1 obtained like this:

ring_fraction = sum (token_ranges_size) / ring_size

ring_size is a constant depending on the configured Cassandra cluster partitioner.

token_ranges_size = sum(distance(range_start, range_end))

A token range can wrap around the end of the ring (range_end lower than range_start), so the distance method needs to be a little more complex:

private BigInteger distance(BigInteger token1, BigInteger token2) {
    // token2 > token1: the range does not wrap around the ring
    if (token2.compareTo(token1) > 0) {
        return token2.subtract(token1);
    } else {
        // the range wraps around the end of the ring: add the ring size
        return token2.subtract(token1).add(partitioner.ringSize);
    }
}

So, now that we have our ring fraction, we can extrapolate the node table size to get the total table size:

table_size = table_size_on_this_node / ring_fraction

And here we are!
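The whole computation can be sketched as follows, assuming the Murmur3Partitioner (Cassandra's default partitioner, whose tokens span a ring of 2^64 values) and assuming the rows of the query above were already fetched into the hypothetical SizeEstimate class. Dividing by the ring fraction is written as a multiplication by ring_size followed by a division by token_ranges_size, to stay in exact integer arithmetic. For example, a node owning a quarter of the ring (2^62 tokens out of 2^64) and storing 100 partitions of 1,000 bytes on average gives table_size = 100,000 / 0.25 = 400,000 bytes.

```java
import java.math.BigInteger;
import java.util.List;

// One row of system.size_estimates (hypothetical holder class).
final class SizeEstimate {
    final BigInteger rangeStart;
    final BigInteger rangeEnd;
    final long partitionsCount;
    final long meanPartitionSize;

    SizeEstimate(BigInteger rangeStart, BigInteger rangeEnd, long partitionsCount, long meanPartitionSize) {
        this.rangeStart = rangeStart;
        this.rangeEnd = rangeEnd;
        this.partitionsCount = partitionsCount;
        this.meanPartitionSize = meanPartitionSize;
    }
}

final class TableSizeEstimator {
    // Assumption: Murmur3Partitioner, tokens in [-2^63, 2^63 - 1], i.e. 2^64 tokens.
    static final BigInteger MURMUR3_RING_SIZE = BigInteger.ONE.shiftLeft(64);

    static BigInteger distance(BigInteger token1, BigInteger token2, BigInteger ringSize) {
        if (token2.compareTo(token1) > 0) {
            return token2.subtract(token1);
        }
        // The range wraps around the end of the ring.
        return token2.subtract(token1).add(ringSize);
    }

    static long estimateTableSize(List<SizeEstimate> rows, BigInteger ringSize) {
        BigInteger sizeOnThisNode = BigInteger.ZERO;  // sum(mean_partition_size * partitions_count)
        BigInteger tokenRangesSize = BigInteger.ZERO; // sum(distance(range_start, range_end))
        for (SizeEstimate row : rows) {
            sizeOnThisNode = sizeOnThisNode.add(
                    BigInteger.valueOf(row.meanPartitionSize).multiply(BigInteger.valueOf(row.partitionsCount)));
            tokenRangesSize = tokenRangesSize.add(distance(row.rangeStart, row.rangeEnd, ringSize));
        }
        if (tokenRangesSize.signum() == 0) {
            return 0L; // no rows: nothing to extrapolate from
        }
        // table_size = table_size_on_this_node / (token_ranges_size / ring_size)
        return sizeOnThisNode.multiply(ringSize).divide(tokenRangesSize).longValueExact();
    }
}
```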

Updating the statistics

These size estimate statistics are updated when the Cassandra cluster flushes its Memtables (tables in memory) into SSTables (tables on disk). In particular, the partition information is updated. The flush of a table can be triggered with the nodetool command:

nodetool flush keyspace table

In integration tests, we often want to read the test data just after writing it. In that case, the cluster has not flushed yet, so the size estimates are not updated. It is worth mentioning that the official Cassandra Docker image contains the nodetool binary and that the flush can be done from within the container using Testcontainers with the code below:

cassandraContainer.execInContainer("nodetool", "flush", keyspace, table);
In that case, a local JMX call is issued and local JMX is enabled by default on the Cassandra cluster.


I guess this article is mostly useful for Cassandra connector authors or DevOps people. I hope you enjoyed reading.

Monday, November 7, 2022

Flink: Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

🕥 5 min.


Flink has been deprecating the DataSet API since version 1.12 as part of the work on FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API). This blog article illustrates the migration of a real-life batch DataSet pipeline to a batch DataStream pipeline. 
All the code presented in this article is available in the tpcds-benchmark-flink repo.
The use case shown here is extracted from a broader work comparing the performance of different Flink APIs by implementing TPC-DS queries with these APIs.

What is TPCDS?

TPC-DS is a decision support benchmark that models several generally applicable aspects of a decision support system. The purpose of TPCDS benchmarks is to provide relevant, objective performance data of Big Data engines to industry users.

Chosen TPCDS query

The chosen query for this article is Query3 because it contains all the most common analytics operators (filter, join, aggregation, group by, order by, limit). It represents an analytic query on store sales. Its SQL code is presented here:
SELECT dt.d_year, item.i_brand_id brand_id, item.i_brand brand,SUM(ss_ext_sales_price) sum_agg
FROM date_dim dt, store_sales, item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
AND store_sales.ss_item_sk = item.i_item_sk
AND item.i_manufact_id = 128
AND dt.d_moy=11
GROUP BY dt.d_year, item.i_brand, item.i_brand_id
ORDER BY dt.d_year, sum_agg desc, brand_id
LIMIT 100

The initial DataSet pipeline

The pipeline we are migrating is this batch pipeline that implements the above query using the DataSet API and Row as dataset element type.

Migrating the DataSet pipeline to a DataStream pipeline

Instead of going through all of the code, which is available here, we will focus on some key areas of the migration. The code is based on the latest release of Flink at the time this article was written: version 1.16.0.

DataStream is a unified API that allows running pipelines in both batch and streaming modes. To execute a DataStream pipeline in batch mode, it is not enough to set the execution mode in the Flink execution environment; some operations also need to be migrated. Indeed, the DataStream API semantics are those of a streaming pipeline: the arriving data is considered infinite. So, compared to the DataSet API, which operates on finite data, some operations need to be adapted.

Setting the execution environment

We start by moving from ExecutionEnvironment to StreamExecutionEnvironment. Then, as the source in this pipeline is bounded, we can use either the default streaming execution mode or the batch mode. In batch mode the tasks of the job can be separated into stages that can be executed one after another. In streaming mode all tasks need to be running all the time and records are sent to downstream tasks as soon as they are available. 

Here we keep the default streaming mode, which gives good performance on this pipeline and would allow running the same pipeline with no change on an unbounded source.

Using the streaming sources and datasets

Sources: DataSource<T> becomes DataStreamSource<T> after the call to env.createInput().

Datasets: DataSet<T> becomes DataStream<T> (and its subclasses).

Migrating the join operation

The DataStream join operator does not yet support aggregations in batch mode (see FLINK-22587 for details). Basically, the problem is with the trigger of the default GlobalWindow, which never fires, so the records are never output. We work around this problem by applying a custom EndOfStream window. It is a window assigner that assigns all the records to a single TimeWindow. So, like for the GlobalWindow, all the records are assigned to the same window, except that this window's trigger is based on the end of the window (which is set to Long.MAX_VALUE). As we are on a bounded source, at some point the watermark will advance to +INFINITY (Long.MAX_VALUE) and will thus cross the end of the time window, consequently firing the trigger and outputting the records.

Now that triggering works, we can call a standard join with the Row::join function.

Migrating the group by and reduce (sum) operations

The DataStream API no longer has a groupBy() method; we now use the keyBy() method. A downstream aggregation will be applied to elements with the same key exactly as a GroupReduceFunction would have done on a DataSet, except that it will not need to materialize the collection of data. Indeed, the following operator is a reducer: the summing operation downstream is still done through a ReduceFunction, but this time the operator reduces the elements incrementally instead of receiving the rows as a Collection. To compute the sum, we store the partially aggregated sum in the reduced row. Due to incremental reduce, we also need to distinguish whether we received an already reduced row (in that case, we read the partially aggregated sum) or a fresh row (in that case, we just read the corresponding price field).
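The pattern can be sketched in plain Java with a hypothetical SalesRow class standing in for the Row used in the pipeline; with Flink, the same logic would sit in a ReduceFunction applied after keyBy(). The reduced row stores the partially aggregated sum, and the reducer reads either that sum or the fresh price.

```java
// Hypothetical stand-in for the Row of the pipeline: a fresh row carries the
// ss_ext_sales_price of one sale, a reduced row carries a partially aggregated sum.
final class SalesRow {
    final int year;
    final int brandId;
    final String brand;
    final double price;       // only meaningful for a fresh row
    final Double partialSum;  // null for a fresh row

    SalesRow(int year, int brandId, String brand, double price, Double partialSum) {
        this.year = year;
        this.brandId = brandId;
        this.brand = brand;
        this.price = price;
        this.partialSum = partialSum;
    }

    static SalesRow fresh(int year, int brandId, String brand, double price) {
        return new SalesRow(year, brandId, brand, price, null);
    }

    // Reads the partial sum of an already reduced row, or the price of a fresh one.
    double amount() {
        return partialSum != null ? partialSum : price;
    }
}

// Mirrors ReduceFunction#reduce(): called incrementally on rows sharing the same key,
// so each argument may be either a fresh row or the result of a previous reduce.
final class SumReducer {
    static SalesRow reduce(SalesRow a, SalesRow b) {
        return new SalesRow(a.year, a.brandId, a.brand, 0.0, a.amount() + b.amount());
    }
}
```

Because every argument is read through amount(), the result does not depend on which rows were already combined, which is what incremental reduction requires.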

Migrating the order by operation

The sort of the datastream is done by applying a KeyedProcessFunction.

But, as said above, the DataStream semantics are the ones of a streaming pipeline. The arriving data is thus considered infinite. As such we need to "divide" the data to have output times. For that we need to set a timer to output the resulting data. We set a timer to fire at the end of the EndOfStream window meaning that the timer will fire at the end of the batch.

To sort the data, we store the incoming rows inside a ListState and sort them at output time, when the timer fires in the onTimer() callback. 

Another thing: to be able to use Flink state, we need to key the datastream beforehand even if there is no group by key because Flink state is designed per-key. Thus, we key by a fake static key so that there is a single state.
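The sort itself is a plain comparator implementing ORDER BY dt.d_year, sum_agg desc, brand_id. In the sketch below, Query3Result is a hypothetical stand-in for the result Row, and sortBuffered() shows what the onTimer() callback could do with the rows buffered in the ListState (whose get() returns an Iterable).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical Query3 result row: d_year, brand_id, brand, sum_agg.
final class Query3Result {
    final int year;
    final int brandId;
    final String brand;
    final double sumAgg;

    Query3Result(int year, int brandId, String brand, double sumAgg) {
        this.year = year;
        this.brandId = brandId;
        this.brand = brand;
        this.sumAgg = sumAgg;
    }
}

final class Query3Sort {
    // ORDER BY dt.d_year, sum_agg DESC, brand_id
    static final Comparator<Query3Result> ORDER =
            Comparator.comparingInt((Query3Result r) -> r.year)
                    .thenComparing(Comparator.comparingDouble((Query3Result r) -> r.sumAgg).reversed())
                    .thenComparingInt(r -> r.brandId);

    // Copy the buffered rows, sort them, and return them in output order.
    static List<Query3Result> sortBuffered(Iterable<Query3Result> bufferedRows) {
        List<Query3Result> rows = new ArrayList<>();
        bufferedRows.forEach(rows::add);
        rows.sort(ORDER);
        return rows;
    }
}
```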

Migrating the limit operation

As all the elements of the DataStream were keyed by the same "0" key, they are kept in the same "group". So we can implement the SQL LIMIT with a ProcessFunction with a counter that will output only the first 100 elements.
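The counter logic is small enough to be shown standalone. LimitFunction is a hypothetical plain-Java stand-in for the ProcessFunction: a Consumer plays the role of the Collector, and in this single-instance sketch a plain field holds the counter.

```java
import java.util.function.Consumer;

// Hypothetical LIMIT operator: forwards only the first `limit` elements it sees.
final class LimitFunction<T> {
    private final long limit;
    private long emitted;

    LimitFunction(long limit) {
        this.limit = limit;
    }

    void processElement(T element, Consumer<T> out) {
        if (emitted < limit) {
            emitted++;
            out.accept(element);
        }
    }
}
```

For Query3, the operator would be created with a limit of 100 and placed right after the sort, so that the first elements it receives are the top-ranked rows.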

Migrating the sink operation

As with sources, sinks changed a lot in recent versions of Flink. We now use the Sink interface, which requires an Encoder. But the resulting code is very similar to the one using the DataSet API. The only difference is that the Encoder#encode() method writes bytes, whereas TextOutputFormat.TextFormatter#format() returned Strings.


As you saw with the migration of the join operation, the new unified DataStream API still has some limitations in batch mode. In addition, the resulting order by and limit code is quite manual and requires the Flink state API. For all these reasons, the Flink community recommends using Flink SQL for batch pipelines. It results in much simpler code, good performance and out-of-the-box analytics capabilities. You can find the equivalent Query3 code that uses the Flink SQL/Table API in the Query3ViaFlinkSQLCSV class.

Tuesday, January 5, 2021

Tricky use cases of Apache Beam 3/3: Custom Combine

🕥 5 min.


This is the third article of a series of blog posts about tricky use cases of Apache Beam that highlight some of the advanced possibilities of the Beam SDK.

What is Beam Combine?

Combine is the Beam word for the reduce transformation. This transformation combines data spread across workers to make a computation. The computation can be whatever you like. In the Beam SDK, Combine is used under the hood in various built-in transforms (Count, Sum, Mean...), but you can also create your own Combine implementations.

The overview architecture of the Combine is like this:

The PCollection is split across the cluster (consider here that we have a cluster of 3 nodes). On each of the nodes, Beam creates an accumulator. Each input value is added to the local accumulator. Then the accumulators are merged into a single combined output accumulator from which the result is extracted. At each of these 3 steps, the user can add whatever computation code their use case needs.

How to create your own combiner?

To create your own combiner you need to extend CombineFn<InputT,AccumT,OutputT>. You'll have to override some methods:

  • createAccumulator() operation is invoked to create a fresh mutable accumulator value of type AccumT, initialized to represent the combination of zero values.
  • addInput(AccumT, InputT) operation is invoked on each input value to add it to the local accumulator AccumT value. 
  • mergeAccumulators(java.lang.Iterable<AccumT>) operation is invoked to combine a collection of accumulators AccumT values into a single combined output accumulator AccumT value, once the merging accumulators have had all the input values in their partition added to them. This operation is invoked repeatedly, until there is only one accumulator value left.
  • extractOutput(AccumT) operation is invoked on the final accumulator AccumT value to get the output OutputT value.
Your combiner can be called in the pipeline with:

inputPCollection.apply(Combine.globally(new CustomCombineFn()));

or for keyed input PCollections:

inputPCollection.apply(Combine.perKey(new CustomCombineFn()));

Please note that combining functions should be associative and commutative. Associativity is required because input values are first broken up into partitions before being combined, and their intermediate results further combined, in an arbitrary tree structure. Commutativity is required because any order of the input values is ignored when breaking up input values into partitions.
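
A quick counterexample shows why associativity matters. Assume, hypothetically, a combiner whose partial result is the mean of its partition and whose merge step averages those partial means: the final value then depends on how the input was split. The plain-Java sketch below (no Beam dependency, illustrative names) compares it with carrying a sum and a count.

```java
import java.util.List;

// Illustrative comparison of a non-associative merge and an associative one.
final class MeanOfMeans {
    // Naive approach: average each partition, then average the partial means.
    // Assumes non-empty partitions. The result depends on the partitioning.
    static double naive(List<List<Double>> partitions) {
        double sumOfMeans = 0.0;
        for (List<Double> partition : partitions) {
            double sum = 0.0;
            for (double value : partition) sum += value;
            sumOfMeans += sum / partition.size();
        }
        return sumOfMeans / partitions.size();
    }

    // Associative approach: carry (sum, count) and divide only once at the end.
    static double sumAndCount(List<List<Double>> partitions) {
        double sum = 0.0;
        long count = 0;
        for (List<Double> partition : partitions) {
            for (double value : partition) {
                sum += value;
                count++;
            }
        }
        return sum / count;
    }
}
```

With the values 1, 2 and 3, the naive version returns 2.25 for the split [1, 2] | [3] but 1.75 for [1] | [2, 3], while the sum-and-count version returns 2.0 in both cases.
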

Also, a note about coders: some form of data encoding is required when using custom types in a CombineFn that do not have well-known coders. Usually, your custom type will just implement Serializable and Beam will provide a coder automatically: Beam relies on a generic CoderProvider that is able to provide a coder for any Serializable type. In cases where Serializable is not efficient, or inapplicable, there are two alternatives for encoding:

  • Default CoderRegistry: for example, implement a coder class explicitly and use the @DefaultCoder annotation. See the CoderRegistry for the numerous ways in which to bind a type to a coder.
  • CombineFn-specific way: while extending CombineFn, override both getAccumulatorCoder(CoderRegistry, Coder<InputT>) and getDefaultOutputCoder(CoderRegistry, Coder<InputT>).

An example

The following example is extracted from the Nexmark auction system. Here, the use case is to calculate the average price of the last 3 closed auctions:

Here, the input PCollection is the collection of the auction-winning bids with their associated price and timestamp. Each bid contains the final price of a given auction. 

The types used in the custom Combine are: InputT=Bid, which implements Serializable; the accumulator class (AccumT) is a list of sorted bids, so AccumT=List<Bid>; and the output is a mean, so OutputT=Double. The Beam SDK provides coders for all these types automatically through the mechanism mentioned above.

As you can see, we added some computation code (sort bids by timestamp then price, and keep the last 3) during both the add step (addInput() implementation) and the merge step (mergeAccumulators() implementation). During the add step, we could have simply added each input to the accumulator without keeping only the last 3, but this would have put more load on the next merging step. It is more efficient to keep only 3 bids as early as the add step: this way, the merging step has fewer values to combine.

The average price is only calculated at the end when the output value is extracted from the final combined accumulator (extractOutput() method implementation).
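
The original Nexmark code is not reproduced here. As a rough, Beam-free model of the same add, merge and extract logic, the sketch below keeps at most 3 bids per accumulator. Bid is a hypothetical stand-in holding only a price and a timestamp, and returning 0.0 for an empty accumulator is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for the Nexmark Bid: only the two fields the combiner needs.
final class Bid {
    final long price;
    final long timestamp;

    Bid(long price, long timestamp) {
        this.price = price;
        this.timestamp = timestamp;
    }
}

// Plain-Java model of the "mean price of the last 3 closed auctions" combiner.
final class LastThreeMean {
    static final int KEEP = 3;
    // Sort by timestamp, then by price.
    static final Comparator<Bid> ORDER =
        Comparator.comparingLong((Bid b) -> b.timestamp).thenComparingLong(b -> b.price);

    static List<Bid> createAccumulator() {
        return new ArrayList<>();
    }

    // Add step: insert the bid, then trim to the 3 most recent bids right away.
    static List<Bid> addInput(List<Bid> accumulator, Bid bid) {
        accumulator.add(bid);
        accumulator.sort(ORDER);
        return keepLast(accumulator);
    }

    // Merge step: union all partial lists, then trim again.
    static List<Bid> mergeAccumulators(Iterable<List<Bid>> accumulators) {
        List<Bid> merged = createAccumulator();
        for (List<Bid> accumulator : accumulators) merged.addAll(accumulator);
        merged.sort(ORDER);
        return keepLast(merged);
    }

    // Extract step: the mean is computed only once, on the final accumulator.
    static double extractOutput(List<Bid> accumulator) {
        if (accumulator.isEmpty()) return 0.0; // assumption of this sketch
        double sum = 0.0;
        for (Bid bid : accumulator) sum += bid.price;
        return sum / accumulator.size();
    }

    private static List<Bid> keepLast(List<Bid> sorted) {
        int from = Math.max(0, sorted.size() - KEEP);
        return new ArrayList<>(sorted.subList(from, sorted.size()));
    }
}
```

For example, if one worker sees bids priced 10 and 20 and another sees 30, 40 and 50 (in increasing timestamp order), the merged accumulator holds the last three bids and the extracted mean is 40.0.
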

Now you know how to create a custom Beam combiner to adapt to whatever use case you might have!

Wednesday, December 2, 2020

Tricky use cases of Apache Beam 2/3: Custom windows

🕥 9 min.


This is the second article of a series of blog posts about tricky use cases of Apache Beam that highlight some of the advanced possibilities of the Beam SDK.

What is Beam windowing?

In streaming systems, the flow of data is continuous. To be able to process it, we need to divide this flow of data into finite chunks. These finite chunks are windows. They are a way to temporally group the elements based on their event timestamp (the time at which the element was produced).

As Beam provides a unified API for batch mode and streaming mode, all the elements of Beam pipelines (even in batch mode) are stored in windows. In batch mode, by default, there is a single window called the Global Window that stores all the elements of the pipeline. It is the default window that you get behind the scenes when you don't specify a window in your pipeline. But, even in batch mode, you could specify another windowing, such as the ones below, based on the timestamps of the elements.

There are several types of windows provided by the Beam SDK:

Fixed windows

* The diagrams show keys but, of course, all these windows also work for non-keyed PCollections.

Fixed windows are defined by the duration of the window. In the example above, they allow expressing the use case: "I want to have the last 30 seconds of data".
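
Fixed-window assignment boils down to rounding a timestamp down to a multiple of the window size. The following is a plain-Java model of that rule, assuming a zero offset; in Beam you would simply use FixedWindows.of(Duration.standardSeconds(30)) and let the SDK do the arithmetic. FixedWindowModel is a hypothetical name.

```java
// Plain-Java model of fixed-window assignment (no Beam dependency, zero offset assumed).
final class FixedWindowModel {
    // Returns {start, end} of the single window [start, end) containing the timestamp.
    static long[] assign(long timestamp, long size) {
        // floorMod keeps negative timestamps in the right window too.
        long start = timestamp - Math.floorMod(timestamp, size);
        return new long[] {start, start + size};
    }
}
```

With 30-second windows, an element at second 42 lands in [30, 60), and an element at second 30 starts a new window rather than closing the previous one.
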

Sliding windows

Sliding windows are similar to fixed windows except that they overlap. They are defined by both a duration and a sliding period. In the example above, they allow expressing the use case: "I want to have the last 60 seconds of data every 30 seconds".
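
For sliding windows, an element belongs to every window whose start is a multiple of the period and lies less than one window size before the element's timestamp. The plain-Java sketch below enumerates those window starts (zero offset assumed, hypothetical class name); the Beam equivalent would be SlidingWindows.of(Duration.standardSeconds(60)).every(Duration.standardSeconds(30)).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of sliding-window assignment (no Beam dependency, zero offset assumed).
final class SlidingWindowModel {
    // Returns the start of every window [start, start + size) that contains the timestamp,
    // from the most recent window to the oldest one.
    static List<Long> windowStarts(long timestamp, long size, long period) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, period);
        for (long start = lastStart; start > timestamp - size; start -= period) {
            starts.add(start);
        }
        return starts;
    }
}
```

With a 60-second duration and a 30-second period, an element at second 45 belongs to both [30, 90) and [0, 60). When the period equals the duration, the loop yields a single window, which is exactly the fixed-window case.
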

Session windows

Session windows are very similar to web sessions. They are defined by a gap duration: if no data arrives for more than the gap duration, the next element to arrive is considered to belong to a new session. Of course, the gap duration is measured in event time. In other words, if the next element's timestamp is more than the gap duration after the previous element's, it will be put into another session window.
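
One way to picture session windows: each element opens a proto-window [timestamp, timestamp + gap), and overlapping proto-windows are merged. Beam's Sessions windowing (created with Sessions.withGapDuration(...)) behaves this way conceptually; the plain-Java model below applies the idea to a list of timestamps. The class name is hypothetical, and treating an element exactly one gap after the previous one as the start of a new session is a choice of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of session windows (no Beam dependency).
final class SessionModel {
    // Returns the sessions as {start, end} pairs, end exclusive.
    static List<long[]> sessions(List<Long> timestamps, long gap) {
        List<Long> sorted = new ArrayList<>(timestamps);
        sorted.sort(null); // natural order: sessions are built in event-time order
        List<long[]> sessions = new ArrayList<>();
        for (long t : sorted) {
            long[] last = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
            if (last != null && t < last[1]) {
                // t falls inside the current session: its proto-window extends the session.
                last[1] = t + gap;
            } else {
                // At least one gap since the previous element: start a new session.
                sessions.add(new long[] {t, t + gap});
            }
        }
        return sessions;
    }
}
```

With a gap of 5, the timestamps 1, 2, 3 and 20 produce two sessions: [1, 8) and [20, 25).
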

Custom windows

So far, the window types that Beam provides are quite similar to what other Big Data streaming systems provide, except perhaps session windows, which few systems provide. But what really sets Beam apart is that it lets users define their own windowing by extending the WindowFn class or one of its subclasses.

Health monitoring system example

To illustrate Beam custom windows, we will consider this use case: let's say we have a monitoring system that reports events about the health of a large and complex e-commerce platform. 

For the sake of simplicity, the event will be a simple class with just an enum status:

A large number of events are produced, so the monitoring is done by a Big Data pipeline built with Beam. It is a streaming pipeline, as the events arrive continuously. We want fine-grained oversight of what is going on, so we set the monitoring granularity to 10 minutes. Thus, in the pipeline, we divide the collection of events into fixed windows of 10 minutes. But there is something to consider about the health of the e-commerce platform: when a failure occurs, it affects the health of the overall platform for about 30 minutes (time for the clusters to fail over, for the load to decrease, etc.), during which the system is in a recovering state. As a consequence, in the monitoring system, we want a failure event to be reported for 30 minutes after its occurrence.


To implement the above desired behavior, we need to assign events to windows depending on the type of these events. This is a perfect use case for Beam custom windows. We will define a MonitoringFixedWindows custom window that assigns HEALTHY events to the current fixed window and FAILURE events to the current and the next 3 fixed windows:

The custom window is similar to FixedWindows, but we do not extend the FixedWindows class because FixedWindows assigns an element to a single window only. In our case, a FAILURE event must be assigned to 4 windows (the current one and the next 3).

The most specific class in the WindowFn hierarchy that we can extend is NonMergingWindowFn. As its name suggests, it represents windows that never merge with each other.

The interesting part of this code is the override of the assignWindows method. This is where the whole essence of custom windows lies: for a given element and its timestamp, it returns the list of windows the element should be assigned to. In our case, we return a list of IntervalWindows (simply windows with start and end boundaries) because our windowing function deals with IntervalWindows.
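
To make the assignment rule concrete without pulling in Beam, here is a plain-Java model of what the assignWindows override computes. In the real class, the status would come from the element and the timestamp from the AssignContext passed to assignWindows; here both are plain parameters, windows are {start, end} pairs in minutes, and Status and MonitoringWindowsModel are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event status mirroring the article's enum.
enum Status { HEALTHY, FAILURE }

// Plain-Java model of the custom assignment (the real class would extend NonMergingWindowFn).
final class MonitoringWindowsModel {
    static final long SIZE = 10;                // window size, in minutes
    static final int EXTRA_FAILURE_WINDOWS = 3; // about 30 minutes of recovery = 3 more windows

    // Returns the {start, end} bounds of every window the event is assigned to.
    static List<long[]> assignWindows(Status status, long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, SIZE); // current fixed window
        int count = status == Status.FAILURE ? 1 + EXTRA_FAILURE_WINDOWS : 1;
        List<long[]> windows = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            long windowStart = start + i * SIZE;
            windows.add(new long[] {windowStart, windowStart + SIZE});
        }
        return windows;
    }
}
```

A HEALTHY event at minute 5 yields only [0, 10), while a FAILURE event at minute 12 yields [10, 20), [20, 30), [30, 40) and [40, 50).
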

The other overrides are quite straightforward:
  • windowCoder: as said above, all the elements in Beam are in windows. They will be serialized at some point, so the window information needs to be serialized as well. windowCoder simply returns a Coder for the custom window. Our custom window deals with IntervalWindow, so we just return the IntervalWindow coder, which is already provided by the SDK.
  • getDefaultWindowMappingFn: this method is for side inputs (data of another PCollection accessible through a view in the Beam transforms). This custom window does not support side inputs, so we just throw an exception.
  • isCompatible: 2 custom windowing functions are compatible if they are equal (same type and same boundaries).
  • equals and hashCode: as Beam will perform window comparisons, we need to override equals and hashCode with the usual code.

Now, let's apply this windowing function to an input streaming PCollection.


The output will be like this: 

Consider that we receive a HEALTHY event with timestamp 5: it will be assigned to the window starting at timestamp 0 and ending at timestamp 10. If we then receive a FAILURE event with timestamp 12, it will be assigned to the window it belongs to (starting at timestamp 10 and ending at timestamp 20) and also to the next three 10-minute windows.

Now you know how Beam windowing can adapt to whatever use case you might have!

Tuesday, November 10, 2020

Tricky use cases of Apache Beam 1/3: incremental join

🕥 7 min.


This is the first article of a series of blog posts about tricky use cases of Apache Beam that highlight some of the advanced possibilities of the Beam SDK.

Incremental join example

Let's say we have a streaming system that records events of an auction platform. Such events can be Persons creating Auctions and placing bids on auctions.

Consider that you have 2 PCollections as input:
  • One of Person details, collected when persons connect to the auction system
  • One of Auction details, collected when auctions are created
Now, you want regular updates on who is selling in particular US states.
So, this problem is a typical incremental join of these 2 PCollections. Such a join in Beam is done through the CoGroupByKey transform of the SDK. This transform works as illustrated below:

This is similar to a SQL join. The 2 PCollections are keyed by the same key: personId is the unique key of the Person object and sellerId is the unique key of the Seller object. Seller and Person are just two different views of the same entity, depending on whether they were created during a connection event or during an auction creation event. So sellerId and personId are the same, and we join by this key.

The first step is to group the 2 PCollections in a KeyedPCollectionTuple before joining. For that, we need to tag the elements of each PCollection with a TupleTag (basically a String id) to differentiate them in the output result. In the diagram above, the Person tag is in green and the Auction tag is in blue.
Then the actual CoGroupByKey transform is applied to end up with an output PCollection that contains for each key a list of elements (either Person or Auction) that have this key in common.
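
The semantics of CoGroupByKey can be sketched in plain Java: for each key, gather the values of each input in a separate list, the way TupleTags keep them apart in the Beam result. This is an illustrative model only (hypothetical CoGroupModel class, in-memory lists), not how the Beam transform is implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative in-memory model of CoGroupByKey over two keyed inputs.
final class CoGroupModel {
    // Per-key result: the values of each input kept apart, like TupleTags do in Beam.
    static final class Joined<A, B> {
        final List<A> left = new ArrayList<>();
        final List<B> right = new ArrayList<>();
    }

    static <K, A, B> Map<K, Joined<A, B>> coGroup(
            List<Map.Entry<K, A>> left, List<Map.Entry<K, B>> right) {
        Map<K, Joined<A, B>> result = new LinkedHashMap<>();
        for (Map.Entry<K, A> e : left) {
            result.computeIfAbsent(e.getKey(), k -> new Joined<>()).left.add(e.getValue());
        }
        for (Map.Entry<K, B> e : right) {
            result.computeIfAbsent(e.getKey(), k -> new Joined<>()).right.add(e.getValue());
        }
        return result;
    }
}
```

A key present in only one input still shows up, with an empty list on the other side: this is why the stateful step described next is needed when the matching element arrives later.
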

Stateful processing

Hey, wait! I said that this was a streaming system, which means that there will be out-of-order data. How do we deal with that during the join? Well, the answer is stateful processing: the idea here is to store the elements in a persistent state while waiting for the corresponding element in the other PCollection. Such a stateful processing API is available in the Beam SDK with the ParDo transform, but presenting it in detail is out of the scope of this article. There are good blog articles and talks about it already, so I will just point to the official Beam stateful processing documentation.

So the architecture of the stateful join in our example will be:
  • The Person element will be stored in a persistent state in order to match future auctions by that person. We also set a timer to clear the person state after a TTL (see the Timer API in the SDK).
  • The Auction elements will be stored in a persistent state until we have seen the corresponding Person element. Then, the stored auctions can be output and the state can be cleared.

The code of the incremental join

This code deserves some explanation:

We receive as input the PCollection containing all the events of the auction system. Beam provides several windowing capabilities, but here we chose to work in processing time, so we do not window the elements into time windows. Instead, we apply the Global window to the elements and set a trigger that outputs the results each time n elements have been processed.
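
The triggering strategy described above (global window, output every n elements) can be modeled in plain Java as a buffer that fires a pane once it holds n elements. In Beam this would typically be expressed with GlobalWindows and a Repeatedly.forever(AfterPane.elementCountAtLeast(n)) trigger; whether fired panes are discarded or accumulated is not stated in the text, so discarding mode is an assumption of this hypothetical sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of an element-count trigger on a single global window.
final class CountTriggerModel<T> {
    private final int n;
    private final List<T> pane = new ArrayList<>();

    CountTriggerModel(int n) {
        this.n = n;
    }

    // Returns the fired pane once n elements have accumulated, otherwise an empty list.
    List<T> add(T element) {
        pane.add(element);
        if (pane.size() < n) return List.of();
        List<T> fired = new ArrayList<>(pane);
        pane.clear(); // discarding mode (assumed): the next pane starts empty
        return fired;
    }
}
```

With n = 2, adding a, b, c, d fires the panes [a, b] and then [c, d]; the window itself never closes, which is what makes the output incremental.
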

We then extract the auctions out of the stream of events and key them by their sellerId. We then do the same for the Persons: extract them out of the events and key them by their personId (which, remember, is the same as the sellerId).

Now comes the actual join: we first group the 2 PCollections in a KeyedPCollectionTuple as said above. We then apply the actual CoGroupByKey transform to the KeyedPCollectionTuple.

Now comes the stateful processing: as explained above, we need to store elements in a persistent state to deal with out-of-order data. This is done by applying a ParDo to the joined output PCollection. The topic of this article is not stateful processing so, if you want details, the complete code of JoinDoFn is in the Beam GitHub repository. I will just say here that JoinDoFn does the following:

  • Defines 2 ValueStates to store the Persons and Auctions
  • Defines 1 Timer that fires after a configured TTL to clear the Person ValueState
  • For each key, iterates over the elements in the output of the join and matches persons to auctions by writing to and reading from the ValueStates
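
The three responsibilities of JoinDoFn can be mimicked in plain Java to see how buffering handles out-of-order arrivals. This is a hypothetical model, not the code from the Beam repository: it processes one element at a time rather than a grouped join result, maps keyed by person id stand in for the per-key states, and an explicit fireTimers(now) call stands in for the TTL timer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the stateful join logic (not Beam's JoinDoFn).
final class StatefulJoinModel {
    private final long ttl;
    private final Map<String, String> personState = new HashMap<>();       // stands in for the Person state
    private final Map<String, List<String>> auctionState = new HashMap<>(); // stands in for the Auctions state
    private final Map<String, Long> personExpiry = new HashMap<>();         // stands in for the TTL timer

    StatefulJoinModel(long ttl) {
        this.ttl = ttl;
    }

    // A person arrives: store it, arm the TTL, and flush the auctions that were waiting for it.
    List<String> onPerson(String key, String person, long now) {
        personState.put(key, person);
        personExpiry.put(key, now + ttl);
        List<String> out = new ArrayList<>();
        for (String auction : auctionState.getOrDefault(key, List.of())) {
            out.add(person + " sells " + auction);
        }
        auctionState.remove(key); // buffered auctions were output, so their state is cleared
        return out;
    }

    // An auction arrives: join immediately if the person is known, otherwise buffer it.
    List<String> onAuction(String key, String auction) {
        String person = personState.get(key);
        if (person != null) return List.of(person + " sells " + auction);
        auctionState.computeIfAbsent(key, k -> new ArrayList<>()).add(auction);
        return List.of();
    }

    // Stand-in for the timer callback: clear person states whose TTL has elapsed.
    void fireTimers(long now) {
        personExpiry.entrySet().removeIf(e -> {
            boolean expired = e.getValue() <= now;
            if (expired) personState.remove(e.getKey());
            return expired;
        });
    }
}
```

An auction that arrives before its seller is held back and released as soon as the person shows up; once the TTL has elapsed, later auctions for that person are buffered again.
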

The final step is to export the result by applying a ParDo that creates the output POJOs that gather persons information and auctions information.


I believe that it is interesting to show the join in a complete use case example. Thus, this code has a lot of boilerplate that is not needed for the actual incremental join and that comes from the use case of the auction system itself:
  • The extraction of auctions out of events
  • The extraction of persons out of events
  • The keying of the auctions PCollection
  • The keying of the persons PCollection
  • The application of the stateful JoinDoFn
  • The export to the output POJO
The actual join resides only in the creation of the KeyedPCollectionTuple and the application of the CoGroupByKey. The incremental part resides in the application of windowing: in this use case, we chose to trigger the output with each new element, but we could have chosen a simpler trigger by simply applying FixedWindows to the input PCollection.

That's it: this is how you can create an incremental join pipeline with Apache Beam.