Thursday, March 30, 2023

Flink: Howto create a batch source with the new Source framework

🕥 10 min.

Introduction


The Flink community has recently designed a new Source framework based on FLIP-27. Some connectors have already migrated to this new framework. This article is a how-to for creating a batch source using this new framework. It was built while implementing the Flink batch source for Cassandra. If you are interested in contributing or migrating connectors, this blog post is for you!

Implementing the source components


The source architecture is depicted in the diagrams below:





Source

The Source interface is only the "glue" between all the other components. Its role is to instantiate all of them and to define the source's Boundedness. The source configuration and the user configuration validation are also done here.
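
To make this concrete, here is a minimal sketch of what such a glue class could look like. It assumes hypothetical MySourceConfig, MySplit, MyEnumeratorState, MySourceReaderFactory, MySplitEnumerator and serializer classes (some of them are sketched in the following sections); it is not the implementation of any existing connector.

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;

/** Glue class: instantiates the other components and declares the boundedness. */
public class MySource implements Source<String, MySplit, MyEnumeratorState> {

    private final MySourceConfig config; // hypothetical user configuration object

    public MySource(MySourceConfig config) {
        // validate the user configuration as early as possible
        this.config = config.validate();
    }

    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED; // batch source: finite data
    }

    @Override
    public SourceReader<String, MySplit> createReader(SourceReaderContext readerContext) {
        // a factory can create the resources shared by the SplitReaders (see the SourceReader section)
        return MySourceReaderFactory.create(readerContext, config);
    }

    @Override
    public SplitEnumerator<MySplit, MyEnumeratorState> createEnumerator(
            SplitEnumeratorContext<MySplit> enumContext) {
        return new MySplitEnumerator(enumContext, config, null); // fresh start: no restored state
    }

    @Override
    public SplitEnumerator<MySplit, MyEnumeratorState> restoreEnumerator(
            SplitEnumeratorContext<MySplit> enumContext, MyEnumeratorState state) {
        return new MySplitEnumerator(enumContext, config, state); // resume from checkpointed state
    }

    @Override
    public SimpleVersionedSerializer<MySplit> getSplitSerializer() {
        return MySplitSerializer.INSTANCE;
    }

    @Override
    public SimpleVersionedSerializer<MyEnumeratorState> getEnumeratorCheckpointSerializer() {
        return MyEnumeratorStateSerializer.INSTANCE;
    }
}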

SourceReader

As shown in the diagrams above, the instances of the SourceReader (which we will simply call readers in the rest of this article) run in parallel in task managers to read the actual data, which is divided into splits. Readers request splits from the SplitEnumerator and the resulting splits are assigned to them in return.

Flink provides the SourceReaderBase implementation that takes care of all the threading. Flink also provides a useful extension of this class for most cases: SingleThreadMultiplexSourceReaderBase. This class has the threading model already configured: each SplitReader instance reads splits using one thread (but several SplitReader instances live among the task managers).
What is left to do in the SourceReader class is the following (a sketch follows the list):
  • Provide a SplitReader supplier
  • Create a RecordEmitter
  • Create the shared resources for the SplitReaders (sessions, etc.). As the SplitReader supplier is created in the SourceReader constructor in a super() call, it is a good idea to use a SourceReader factory that creates the shared resources and passes them to the supplier.
  • Implement start(): here we should ask the enumerator for our first split
  • Override close() in SourceReaderBase parent class to free up any created resources (the shared resources for example)
  • Implement initializedState() to create a mutable SplitState from a Split
  • Implement toSplitType() to create a Split from the mutable SplitState
  • Implement onSplitFinished(): here, as it is a batch source (finite data), we should ask the enumerator for the next split
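
Putting these points together, a minimal SourceReader sketch could look like the following. The MyRecord intermediary type, MySharedResources, MyRecordEmitter, MySplit and MySplitState classes are illustrative assumptions (some of them are sketched in the next sections).

import java.util.Map;
import java.util.function.Supplier;

import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;

/** Reads the splits assigned by the enumerator; the threading is handled by the parent class. */
public class MySourceReader
        extends SingleThreadMultiplexSourceReaderBase<MyRecord, String, MySplit, MySplitState> {

    private final MySharedResources sharedResources; // e.g. a session to the backend

    public MySourceReader(
            Supplier<SplitReader<MyRecord, MySplit>> splitReaderSupplier,
            MyRecordEmitter recordEmitter,
            MySharedResources sharedResources,
            Configuration config,
            SourceReaderContext context) {
        super(splitReaderSupplier, recordEmitter, config, context);
        this.sharedResources = sharedResources;
    }

    @Override
    public void start() {
        // batch source: ask the enumerator for our first split
        context.sendSplitRequest();
    }

    @Override
    protected void onSplitFinished(Map<String, MySplitState> finishedSplitIds) {
        // finite data: request the next split when the current one is done
        context.sendSplitRequest();
    }

    @Override
    protected MySplitState initializedState(MySplit split) {
        return new MySplitState(split); // mutable state wrapping the immutable split
    }

    @Override
    protected MySplit toSplitType(String splitId, MySplitState splitState) {
        return splitState.toSplit(); // freeze the mutable state back into a split
    }

    @Override
    public void close() throws Exception {
        super.close();
        sharedResources.close(); // free the resources shared by the SplitReaders
    }
}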


Split and SplitState

The SourceSplit represents a partition of the source data. What defines a split depends on the backend we are reading from. It could be a (partition start, partition end) tuple or an (offset, split size) tuple for example. 

In any case, the Split object should be seen as an immutable object: any update to it should be done on the associated SplitState. The split state is the one that will be stored inside the Flink checkpoints. A checkpoint may happen between 2 fetches for 1 split. So, if we are reading a split, we must store the current state of the reading process in the split state. This state needs to be serializable (because it will be part of a checkpoint) and something the backend source can resume from. That way, in case of failover, reading can be resumed from where it left off, ensuring there are no duplicates or lost data. For example, if the records reading order is deterministic in the backend, then the split state can store the number n of already read records to restart at n+1 after failover.
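
For illustration, here is a minimal sketch of a split defined by an offset range, together with its mutable state (each class would live in its own file; names and fields are illustrative assumptions):

import org.apache.flink.api.connector.source.SourceSplit;

/** Immutable description of a partition of the source data. */
public class MySplit implements SourceSplit {

    private final long startOffset; // inclusive
    private final long endOffset;   // exclusive

    public MySplit(long startOffset, long endOffset) {
        this.startOffset = startOffset;
        this.endOffset = endOffset;
    }

    @Override
    public String splitId() {
        return startOffset + "-" + endOffset;
    }

    public long getStartOffset() {
        return startOffset;
    }

    public long getEndOffset() {
        return endOffset;
    }
}

/** Mutable counterpart stored in checkpoints: tracks where the reading of the split stands. */
public class MySplitState {

    private final MySplit split;
    private long nextOffset; // first offset not read yet; reading resumes here after failover

    public MySplitState(MySplit split) {
        this.split = split;
        this.nextOffset = split.getStartOffset();
    }

    public void setNextOffset(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    public long getNextOffset() {
        return nextOffset;
    }

    public MySplit toSplit() {
        // re-create an immutable split representing the remaining data to read
        return new MySplit(nextOffset, split.getEndOffset());
    }
}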

SplitEnumerator and SplitEnumeratorState

The SplitEnumerator is responsible for creating the splits and serving them to the readers. Whenever possible, it is preferable to generate the splits lazily, meaning that each time a reader asks the enumerator for a split, the enumerator generates one on demand and assigns it to the reader. For that, we implement SplitEnumerator#handleSplitRequest(). Lazy splits generation is preferable to splits discovery, in which we pre-generate all the splits and store them while waiting to assign them to the readers. Indeed, in some situations the number of splits can be enormous and consume a lot of memory, which could be problematic in case of straggling readers. The framework offers the ability to act upon reader registration by implementing addReader() but, as we do lazy splits generation, we have nothing to do there. In some cases, generating a split is too costly, so we can pre-generate a batch (not all) of splits to amortize this cost. The number/size of batched splits needs to be taken into account to avoid consuming too much memory.

Long story short, the tricky part of the source implementation is splitting the source data. The right balance to find is to have neither too many splits (which could lead to too much memory consumption) nor too few (which could lead to sub-optimal parallelism). One good way to strike this balance is to evaluate the size of the source data upfront and to allow the user to specify the maximum memory a split will take. That way, they can configure this parameter according to the memory available on the task managers. This parameter is optional, so the source needs to provide a default value. Also, the source needs to check that the user-provided max-split-size is not too small, which would lead to too many splits. The general rule of thumb is to give the user some freedom but to protect them from unwanted behavior. For these safety measures, rigid thresholds don't work well, as the source may start to fail when a threshold is suddenly exceeded. For example, if we enforce that the number of splits stays below twice the parallelism and the job is regularly run on a growing table, at some point there will be more and more splits of max-split-size and the threshold will be exceeded. Of course, the size of the source data needs to be evaluated without reading the actual data; for the Cassandra connector, this is done as described in the second article below.

Another important topic is state. If the job manager fails, the split enumerator needs to recover. For that, as for the splits, we need to provide a state for the enumerator that will be part of a checkpoint. Upon recovery, the enumerator is reconstructed and receives an enumerator state to recover its previous state. Upon checkpointing, the enumerator returns its state when SplitEnumerator#snapshotState() is called. The state must contain everything needed to resume where the enumerator left off after a failover. In a lazy split generation scenario, the state will contain everything needed to generate the next split whenever asked to: for example the start offset of the next split, the split size, the number of splits still to generate, etc. But the SplitEnumeratorState must also contain a list of splits: not the list of discovered splits, but a list of splits to reassign. Indeed, whenever a reader fails, if it was assigned splits after the last checkpoint, then the checkpoint will not contain those splits. Consequently, upon restoration, the reader will no longer have those splits assigned. There is a callback to deal with that case: addSplitsBack(). There, the splits that were assigned to the failing reader can be put back into the enumerator state for later re-assignment to readers. There is no memory size risk here, as the number of splits to reassign is pretty low.
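
Here is a minimal sketch of such an enumerator doing lazy splits generation; MyEnumeratorState, MySourceConfig and their methods are illustrative assumptions:

import java.util.List;

import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

/** Generates splits lazily and assigns them to readers on request. */
public class MySplitEnumerator implements SplitEnumerator<MySplit, MyEnumeratorState> {

    private final SplitEnumeratorContext<MySplit> context;
    private final MyEnumeratorState state; // next start offset, end of data, splits to reassign

    public MySplitEnumerator(
            SplitEnumeratorContext<MySplit> context,
            MySourceConfig config,
            MyEnumeratorState restoredState) { // null on a fresh start
        this.context = context;
        this.state = restoredState != null ? restoredState : MyEnumeratorState.initial(config);
    }

    @Override
    public void start() {
        // nothing to do: splits are generated lazily in handleSplitRequest()
    }

    @Override
    public void handleSplitRequest(int subtaskId, String requesterHostname) {
        // first re-assign splits that were given back after a reader failure
        MySplit split = state.pollSplitToReassign();
        if (split == null) {
            split = state.generateNextSplit(); // lazy generation, null when the data is exhausted
        }
        if (split != null) {
            context.assignSplit(split, subtaskId);
        } else {
            context.signalNoMoreSplits(subtaskId); // batch source: tell the reader it is done
        }
    }

    @Override
    public void addSplitsBack(List<MySplit> splits, int subtaskId) {
        // splits assigned to a failed reader after the last checkpoint: keep them for re-assignment
        state.addSplitsToReassign(splits);
    }

    @Override
    public void addReader(int subtaskId) {
        // nothing to do: readers pro-actively request splits
    }

    @Override
    public MyEnumeratorState snapshotState(long checkpointId) {
        return state; // everything needed to resume split generation after a failover
    }

    @Override
    public void close() {
        // free resources opened against the backend, if any
    }
}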

The above topics are the most important ones regarding splitting. There are two methods left to implement: the usual start()/close() methods for resource creation/disposal. Regarding start(), the Flink connector framework provides SplitEnumeratorContext#callAsync() to run long operations asynchronously, such as splits preparation or splits discovery (if lazy splits generation is impossible). Indeed, the start() method runs in the source coordinator thread; we don't want to block it for a long time.
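
As an illustration, the empty start() of the enumerator sketch above could be replaced by something like the following if splits had to be prepared upfront; prepareSplits() and addSplitsToReassign() are hypothetical helpers:

@Override
public void start() {
    // run the potentially long splits preparation off the coordinator thread
    context.callAsync(
            () -> prepareSplits(), // hypothetical: e.g. evaluate the data size, pre-generate a batch of splits
            (preparedSplits, throwable) -> {
                if (throwable != null) {
                    throw new RuntimeException("Failed to prepare splits", throwable);
                }
                state.addSplitsToReassign(preparedSplits); // hypothetical: store them for later assignment
            });
}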

SplitReader

This class is responsible for reading the actual splits that it receives when the framework calls handleSplitsChanges(). The main part of the split reader is the fetch() implementation, where we read all the splits received and return the read records as a RecordsBySplits object. This object contains a map from split ids to their records and also the ids of the finished splits. Important points need to be considered (a SplitReader sketch addressing them follows this section):

  • The fetch call must be non-blocking. If any call in its code is synchronous and potentially long, an escape from the fetch() must be provided. When the framework calls wakeUp() we should interrupt the fetch for example by setting an AtomicBoolean.
  • The fetch call needs to be re-entrant: an already read split must not be re-read. We should remove it from the list of splits to read and add its id to the finished splits (along with the ids of empty splits) in the RecordsBySplits that we return.

It is totally fine for the implementer to exit the fetch() method early. Also, a failure could interrupt the fetch. In both cases, the framework will call fetch() again later on. In that case, the fetch method must resume reading from where it left off, using the split state already discussed. If resuming the read of a split is impossible because of backend constraints, then the only solution is to read splits atomically (either not read the split at all, or read it entirely). That way, in case of an interrupted fetch, nothing is output and the split can be read again from the beginning at the next fetch call, leading to no duplicates. But if the split is read entirely, there are points to consider:
  • We should ensure that the total split content (records from the source) fits in memory, for example by specifying a max split size in bytes (see the SplitEnumerator section)
  • The split state becomes useless; only a Split class is needed
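
Here is a minimal SplitReader sketch that reads each split atomically and honors wakeUp(); MyRecord, MySharedResources and its read() method are illustrative assumptions:

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;

/** Reads the records of the splits it was assigned. */
public class MySplitReader implements SplitReader<MyRecord, MySplit> {

    private final Queue<MySplit> splitsToRead = new ArrayDeque<>();
    private final AtomicBoolean wakeup = new AtomicBoolean(false);
    private final MySharedResources sharedResources; // e.g. the session to the backend

    public MySplitReader(MySharedResources sharedResources) {
        this.sharedResources = sharedResources;
    }

    @Override
    public void handleSplitsChanges(SplitsChange<MySplit> splitsChanges) {
        splitsToRead.addAll(splitsChanges.splits());
    }

    @Override
    public RecordsWithSplitIds<MyRecord> fetch() throws IOException {
        wakeup.compareAndSet(true, false);
        RecordsBySplits.Builder<MyRecord> output = new RecordsBySplits.Builder<>();
        while (!splitsToRead.isEmpty()) {
            if (wakeup.get()) {
                // the framework asked us to stop: exit early, fetch() will be called again later
                return output.build();
            }
            MySplit split = splitsToRead.poll(); // remove the split so that it is not re-read
            for (MyRecord record : sharedResources.read(split)) { // hypothetical backend call
                output.add(split.splitId(), record);
            }
            output.addFinishedSplit(split.splitId());
        }
        return output.build();
    }

    @Override
    public void wakeUp() {
        wakeup.set(true);
    }

    @Override
    public void close() throws Exception {
        // per-reader resources would be closed here; shared resources are closed by the SourceReader
    }
}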

RecordEmitter

The SplitReader reads records in the form of an intermediary record format that the implementer provides for each record. It can be the raw format returned by the backend or any format that allows extracting the actual record afterwards. This format is not the final output format expected by the source, but it contains everything needed to do the conversion to the output format. We need to implement RecordEmitter#emitRecord() to do this conversion. A good pattern here is to initialize the RecordEmitter with a mapping Function. The implementation must be idempotent: the method may be interrupted in the middle, in which case the same set of records will be passed to the record emitter again later.
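
A minimal RecordEmitter sketch following this pattern could look like this (MyRecord and the injected mapping Function are illustrative assumptions):

import java.util.function.Function;

import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;

/** Converts the intermediary record format produced by the SplitReader into the output format. */
public class MyRecordEmitter implements RecordEmitter<MyRecord, String, MySplitState> {

    private final Function<MyRecord, String> mapper; // the conversion is injected at construction

    public MyRecordEmitter(Function<MyRecord, String> mapper) {
        this.mapper = mapper;
    }

    @Override
    public void emitRecord(MyRecord element, SourceOutput<String> output, MySplitState splitState) {
        // idempotent conversion: the same records may be passed again if emission was interrupted
        output.collect(mapper.apply(element));
        // a source that can resume mid-split would also update the split state here, e.g.
        // splitState.setNextOffset(element.getOffset() + 1) with a hypothetical per-record offset
    }
}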

Serializers

We need to provide singleton serializers for:
  • Split: splits are serialized when sending them from enumerator to reader, and when checkpointing the reader's current state
  • SplitEnumeratorState: the serializer is used for the result of SplitEnumerator#snapshotState()

For both, we need to implement SimpleVersionedSerializer. Care needs to be taken at some important points:

  • Using Java serialization is forbidden in Flink, mainly for migration concerns. We should rather manually write the fields of the objects using ObjectOutputStream. When a class is not supported by ObjectOutputStream (not a String, Integer, Long...), we should write the size of the object in bytes as an Integer and then write the object converted to byte[]. A similar method is used to serialize collections: first write the number of elements of the collection, then serialize all the contained objects. Of course, for deserialization, we do the exact same reading in the same order.
  • There can be a lot of splits, so we should cache the output stream used in the SplitSerializer. We can do so like this:

private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
        ThreadLocal.withInitial(() -> new DataOutputSerializer(64));

The initial stream size depends on the size of a split.
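
Putting it together, here is a minimal sketch of a split serializer for the MySplit class sketched earlier; the singleton and field layout are illustrative assumptions:

import java.io.IOException;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

/** Singleton serializer for MySplit, avoiding Java serialization. */
public class MySplitSerializer implements SimpleVersionedSerializer<MySplit> {

    public static final MySplitSerializer INSTANCE = new MySplitSerializer();

    // splits can be numerous: reuse the output buffer instead of allocating one per split
    private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
            ThreadLocal.withInitial(() -> new DataOutputSerializer(64));

    private static final int CURRENT_VERSION = 0;

    @Override
    public int getVersion() {
        return CURRENT_VERSION;
    }

    @Override
    public byte[] serialize(MySplit split) throws IOException {
        DataOutputSerializer out = SERIALIZER_CACHE.get();
        out.clear(); // reset the reused buffer
        out.writeLong(split.getStartOffset());
        out.writeLong(split.getEndOffset());
        return out.getCopyOfBuffer();
    }

    @Override
    public MySplit deserialize(int version, byte[] serialized) throws IOException {
        // read the fields back in the exact same order they were written
        DataInputDeserializer in = new DataInputDeserializer(serialized);
        return new MySplit(in.readLong(), in.readLong());
    }
}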

Testing the source


For the sake of brevity, testing the source will be the subject of the next article. Stay tuned!
    

Conclusion


This article gathers field feedback from implementing a source; it was needed because the javadocs cannot cover all the implementation details required for high-performance and maintainable sources. I hope you enjoyed reading it and that it gave you the desire to contribute a new connector to the Flink project!

Tuesday, March 21, 2023

Cassandra: evaluate table size without reading the data

🕥 3 min.

Introduction


While developing the Cassandra source connector for Flink, I needed a way to ensure that the data I was reading fit in memory. For that, I had to evaluate how big the source table was in order to know how to divide it. But, of course, it had to be done without reading the data itself. Here is how it was achieved.

Cassandra size estimates statistics


Cassandra partitioning is based on tokens arranged into a ring. A Cassandra cluster provides statistical information about table sizes in a system table called system.size_estimates. For each table, it provides information on what we will call token ranges in this article: the number of partitions the table occupies, the mean partition size, and the start and end tokens. These elements can be used to get a rough estimation of the table size.

To get this information, we need to issue this request:

SELECT range_start, range_end, partitions_count, mean_partition_size FROM system.size_estimates WHERE keyspace_name = ? AND table_name = ?

We will receive the token ranges that the table occupies. To get the size of the table on this node, we need to sum over them this way:

table_size_on_this_node = sum(mean_partition_size * partitions_count)

You can see in the formula above that the calculated size is only for one Cassandra node, as the system table is local to the node. We need to extrapolate to the whole cluster without requesting all the nodes of the cluster. For that, we will calculate the ring fraction this node represents in the cluster. The ring fraction is the fraction of the token ring owned by this node, obtained like this:

ring_fraction = sum (token_ranges_size) / ring_size

ring_size is a constant depending on the configured Cassandra cluster partitioner.

token_ranges_size = sum(distance(range_start, range_end))

A token range can wrap around the end of the ring (its range_end can then be lower than its range_start), so the distance method needs to be a little more complex:

private BigInteger distance(BigInteger token1, BigInteger token2) {
    // token2 > token1: the range does not wrap around the ring
    if (token2.compareTo(token1) > 0) {
        return token2.subtract(token1);
    } else {
        // the range wraps around the end of the ring
        return token2.subtract(token1).add(partitioner.ringSize);
    }
}
So, now that we have our ring fraction, we can extrapolate the node table size to get the total table size: 

table_size = table_size_on_this_node / ring_fraction
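
To sum up the whole computation, here is a minimal Java sketch. It assumes the rows of system.size_estimates have already been fetched into a simple TokenRange holder; class and field names are illustrative, this is not the actual connector code:

import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.MathContext;
import java.util.List;

/** One row of system.size_estimates for the table, on the queried node. */
class TokenRange {
    final BigInteger rangeStart;
    final BigInteger rangeEnd;
    final long partitionsCount;
    final long meanPartitionSize;

    TokenRange(BigInteger rangeStart, BigInteger rangeEnd, long partitionsCount, long meanPartitionSize) {
        this.rangeStart = rangeStart;
        this.rangeEnd = rangeEnd;
        this.partitionsCount = partitionsCount;
        this.meanPartitionSize = meanPartitionSize;
    }
}

class TableSizeEstimator {

    private final BigInteger ringSize; // depends on the configured partitioner

    TableSizeEstimator(BigInteger ringSize) {
        this.ringSize = ringSize;
    }

    /** Estimated size in bytes of the whole table across the cluster. */
    BigInteger estimateTableSize(List<TokenRange> ranges) {
        BigInteger sizeOnThisNode = BigInteger.ZERO;
        BigInteger tokenRangesSize = BigInteger.ZERO;
        for (TokenRange range : ranges) {
            // table_size_on_this_node = sum(mean_partition_size * partitions_count)
            sizeOnThisNode = sizeOnThisNode.add(
                    BigInteger.valueOf(range.meanPartitionSize)
                            .multiply(BigInteger.valueOf(range.partitionsCount)));
            tokenRangesSize = tokenRangesSize.add(distance(range.rangeStart, range.rangeEnd));
        }
        // ring_fraction = sum(token_ranges_size) / ring_size
        BigDecimal ringFraction =
                new BigDecimal(tokenRangesSize).divide(new BigDecimal(ringSize), MathContext.DECIMAL128);
        // table_size = table_size_on_this_node / ring_fraction
        return new BigDecimal(sizeOnThisNode)
                .divide(ringFraction, MathContext.DECIMAL128)
                .toBigInteger();
    }

    private BigInteger distance(BigInteger token1, BigInteger token2) {
        // handle token ranges that wrap around the end of the ring
        return token2.compareTo(token1) > 0
                ? token2.subtract(token1)
                : token2.subtract(token1).add(ringSize);
    }
}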

And here we are!

Updating the statistics


These size estimate statistics are updated when the Cassandra cluster flushes its Memtables (tables in memory) into SSTables (tables on disk). In particular, the flush updates the partition information. The flush of a table can be done through the nodetool command:

nodetool flush keyspace table

In integration tests, we often want to read the test data just after writing it. In that case, the cluster has not done the flush yet, so the size estimates are not updated. It is worth mentioning that the official Cassandra docker image contains the nodetool binary and that the flush can be done from within the container using Testcontainers with the code below:

cassandraContainer.execInContainer("nodetool", "flush", keyspace, table);
In that case, a local JMX call is issued; local JMX is enabled by default on the Cassandra cluster.

Conclusion


I guess this article is mostly useful for Cassandra connector authors or DevOps people. I hope you enjoyed reading.