Monday, April 3, 2023

Flink: Howto test a batch source with the new Source framework

🕥 6 min.

Introducion


The Flink community has designed a new Source framework based on FLIP-27 lately. This article is the continuation of the howto create a batch source with the new Source framework article. Now it is time to test the created source ! As the previous article, this one was built while implementing the Flink batch source for Cassandra.

The goal here is to give field feedback on how to implement tests of a batch source. For details you should read the documentation, the javadocs or the Cassandra connector code. The links are above.

Unit testing the source


Testing the serializers


In the previous article, we created serializers for Split and SplitEnumeratorState. We should now test them in unit tests. To test serde we create an object, serialize it using the serializer and then deserialize it using the same serializer and finally assert on the equality of the two objects. Thus, hascode() and equals() need to be implemented for the serialized objects.

Other unit tests


Of course, we also need to unit test low level processing such as query building for example or any processing that does not require a running backend. 

Integration testing the source


For tests that require a running backend, Flink provides a JUnit5 source test framework. It is composed of different parts gathered in a test suite:


For the test to be integrated to Flink CI, the test class must be called *ITCAse. But it can be called differently if the test belongs to somewhere else. The class extends SourceTestSuiteBase. This test suite provides all the necessary tests already (single split, multiple splits, idle reader, etc...). It is targeted for batch and streaming sources, so for our batch source case here, the tests below need to be disabled as they are targeted for streaming sources. They can be disabled by overriding them in the ITCase and annotating them with @Disabled:
  • testSourceMetrics
  • testSavepoint
  • testScaleUp
  • testScaleDown
  • testTaskManagerFailure 

Of course we can add our own integration tests cases for example tests on limits, tests on low level splitting or any test that requires a running backend. But for most cases we only need to provide Flink test environment classes to configure the ITCase:


We add this annotated field to our ITCase and we're done
@TestEnv 
MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment();

Backend environment

To test the connector we need a backend to run the connector against. This TestEnvironment 
provides everything related to the backend: the container, its configuration, the session to connect to it, 
and all the elements bound to the whole test case (table space, initialization requests ...)  

We add this annotated field to our ITCase
@TestExternalSystem
MyBackendTestEnvironment backendTestEnvironment = new MyBackendTestEnvironment();
To integrate with JUnit5 BackendTestEnvironment implements TestResource. This environment is scoped to the test suite, so it is where we setup the backend and shared resources (session, tablespace, etc...) by implementing startup() and tearDown() methods. For that we advise the use of testContainers that relies on docker images to provide a real backend instance (not a mock) that is representative for integration tests. Several backends are supported out of the box by testContainers. We need to configure test containers that way:
  • Redirect container output (error and standard output) to Flink logs
  • Set the different timeouts to cope with CI server load
  • Set retrial mechanisms for connection, initialization requests etc... for the same reason

Checkpointing semantics


In big data execution engines, there are 2 levels of guarantee regarding source and sinks: 
  • At least once: upon failure and recovery, some records may be reflected multiple times but none will be lost
  • Exactly once: upon failure and recovery, every record will be reflected exactly once

By the following code we verify that the source supports exactly once semantics:
@TestSemantics
CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};


That being said, we could encounter a problem while running the tests : the default assertions in the Flink source test framework assume that the data is read in the same order it was written. This is untrue for most big data backends where ordering is usually not deterministic. To support unordered checks and still use all the framework provided tests, we need to override SourceTestSuiteBase#checkResultWithSemantic in out ITCase: 

@Override
protected void checkResultWithSemantic(
CloseableIterator<Pojo> resultIterator,
List<List<Pojo>> testData,
CheckpointingMode semantic,
Integer limit) {
if (limit != null) {
Runnable runnable =
() ->
CollectIteratorAssertions.assertUnordered(resultIterator)
.withNumRecordsLimit(limit)
.matchesRecordsFromSource(testData, semantic);

assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
} else {
CollectIteratorAssertions.assertUnordered(resultIterator)
.matchesRecordsFromSource(testData, semantic);
}
}


This is a copy-paste of the parent method where CollectIteratorAssertions.assertOrdered() is replaced by CollectIteratorAssertions.assertUnordered()


Test context


The test context provides Flink with means to interact with the backend, like inserting test data, creating tables or constructing the source. It is scoped to the test case (and not to the test suite). It is linked to the ITCase through a factory of TestContext as shown below. 


@TestContext
TestContextFactory contextFactory = new TestContextFactory(testEnvironment);

TestContext implements DataStreamSourceExternalContext:
  • We don't connect to the backend at each test case, so the shared resources such as session are created by the backend test environment (test suite scoped). They are then passed to the test context by constructor. It is also in the constructor that we initialize test case backend resources such as test case table.
  • close() : drop the created test case resources
  • getProducedType(): specify the test output type of the source such as a test Pojo for example
  • getConnectorJarPaths(): provide a list of attached jars. Most of the time, we can return an empty list as maven already adds the jars to the test classpath
  • createSource(): here we create the source as a user would have done. It will be provided to the test cases by the Flink test framework
  • createSourceSplitDataWriter(): here we create an ExternalSystemSplitDataWriter responsible for writing test data which comes as a list of produced type objects such as defined in getProducedType()
  • generateTestData(): produce the list of test data that will be given to the ExternalSystemSplitDataWriter. We must make sure that equals() returns false when 2 records of this list belong to different splits. The easier for that is to include the split id into the produced type and make sure equals() and hashcode() are properly implemented to include this field.

Contributing the source to Flink


Lately, the Flink community has externalized all the connectors to external repositories that are sub-repositories of the official Apache Flink repository. This is mainly to decouple the release of Flink to the release of the connectors. To distribute the created source, we need to follow this official wiki page.

Conclusion 


This concludes the series of articles about creating a batch source with the new Flink framework. This was needed as, apart from the javadocs, the documentation about testing is missing for now. I hope you enjoyed reading and I hope the Flink community will receive a source PR from you soon :) 

Thursday, March 30, 2023

Flink: Howto create a batch source with the new Source framework

🕥 10 min.

Introducion


The Flink community has designed a new Source framework based on FLIP-27 lately. Some connectors have migrated to this new framework. This article is a how-to for creating a batch source using this new framework. It was built while implementing the Flink batch source for Cassandra. If you are interested in contributing or migrating connectors, this blog post is for you!

Implementing the source components


The source architecture is depicted in the diagrams below:





Source

The source interface only does the "glue" between all the other components. Its role is to instantiate all of them and to define the source Boundedness. We also do the source configuration here along with user configuration validation.

SourceReader

As shown in the graphic above, the instances of the SourceReader (which we will call simply readers in the continuation of this article) run in parallel in task managers to read the actual data which is divided into Splits. Readers request splits from the SplitEnumerator and the resulting splits are assigned to them in return.

Flink provides the SourceReaderBase implementation that takes care of all the threading. Flink also provides a useful extension to this class for most cases: SingleThreadMultiplexSourceReaderBase. This class has the threading model already configured: each SplitReader instance reads splits using one thread (but there are several SplitReader instances that live among task managers).
What we have left to do in the SourceReader class is:
  • Provide a SplitReader supplier
  • Create a RecordEmitter
  • Create the shared resources for the SplitReaders (sessions, etc...). As the SplitReader supplier is created in the SourceReader constructor in a super() call, using a SourceReader factory to create the shared resources and pass them to the supplier is a good idea.
  • Implement start(): here we should ask the enumerator for our first split
  • Override close() in SourceReaderBase parent class to free up any created resources (the shared resources for example)
  • Implement initializedState() to create a mutable SplitState from a Split
  • Implement toSplitType() to create a Split from the mutable SplitState
  • Implement onSplitFinished(): here, as it is a batch source (finite data), we should ask the Enumerator for next split


Split and SplitState

The SourceSplit represents a partition of the source data. What defines a split depends on the backend we are reading from. It could be a (partition start, partition end) tuple or an (offset, split size) tuple for example. 

In any case, the Split object should be seen as an immutable object: any update to it should be done on the associated SplitState. The split state is the one that will be stored inside the Flink checkpoints. A checkpoint may happen between 2 fetches for 1 split. So, if we're reading a split, we must store in the split state the current state of the reading process. This current state needs to be something serializable (because it will be part of a checkpoint) and something that the backend source can resume from. That way, in case of failover, the reading could be resumed from where it was left off. Thus we ensure there will be no duplicates or lost data.  For example, if the records reading order is deterministic in the backend, then the split state can store the number n of already read records to restart at n+1 after failover

SplitEnumerator and SplitEnumeratorState

The SplitEnumerator is responsible for creating the splits and serving them to the readers. Whenever possible, it is preferable to generate the splits lazily, meaning that each time a reader asks the enumerator for a split, the enumerator generates one on demand and assigns it to the reader. For that we implement SplitEnumerator#handleSplitRequest(). Lazy splits generation  is preferable to splits discovery, in which we pre-generate all the splits and store them waiting to assign them to the readers. Indeed, in some situations, the number of splits can be enormous and consume a lot a memory which could be problematic in case of straggling readers. The framework offers the ability to act upon reader registration by implementing addReader() but, as we do lazy splits generation, we have nothing to do there. In some cases, generating a split is too costly, so we can pre-generate a batch (not all) of splits to amortize this cost. The number/size of batched splits need to be taken into account to avoid consuming too much memory. 

Long story short,  the tricky part of the source implementation is splitting the source data. The good equilibrium to find is not to have to many splits (which could lead to too much memory consumption) nor too few (which could lead to sub-optimal parallelism). One good way to meet this equilibrium is to evaluate the size of the source data upfront and allow the user to specify the maximum memory a split will take. That way they can configure this parameter accordingly to the memory available on the task managers. This parameter is optional so the source needs to provide a default value. Also, the source needs to control that the user provided max-split-size is not too little which would lead to too many splits. The general rule of thumb is to let the user some freedom but protect him from unwanted behavior. For these safety measures, rigid thresholds don't work well as the source may start to fail when the thresholds are suddenly exceeded. For example if we enforce that the number of splits is below twice the parallelism, if the job is regularly run on a growing table, at some point there will be more and more splits of max-split-size and the threshold will be exceeded. Of course, the size of the source data needs to be evaluated without reading the actual data.  For the Cassandra connector I did like this

Another important topic is state. If the job manager fails, the split enumerator needs to recover. For that, as for the split, we need to provide a state for the enumerator that will be part of a checkpoint. Upon recovery, the enumerator is reconstructed and receives an enumerator state for recovering its previous state. Upon checkpointing, the enumerator returns its state when SplitEnumerator#snapshotState()
is called. The state must contain everything needed to resume where the enumerator was left off after failover. In lazy split generation scenario, the state will contain everything needed to generate the next split whenever asked to. It can be for example the start offset of next split, split size, number of splits still to generate etc... But the SplitEnumeratorState must also contain a list of splits, not the list of discovered splits, but a list of splits to reassign. Indeed, whenever a reader fails, if it was assigned splits after last checkpoint, then the checkpoint will not contain those splits. Consequently, upon restoration, the reader won't have the splits assigned anymore. There is a callback to deal with that case: addSplitsBack(). There, the splits that were assigned to the failing reader, can be put back into the enumerator state for later re-assignment to readers. There is no memory size risk here as the number of splits to reassign is pretty low.

The above topics are the more important regarding splitting. There are 2 methods left to implement: the usual start()/close() methods for resources creation/disposal. Regarding implementing start(), the Flink connector framework provides enumeratorContext#callAsync() utility to run long processing asynchronously such as splits preparation or splits discovery (if lazy splits generation is impossible). Indeed, the start() method runs in the source coordinator thread, we don't want to block it for a long time.

SplitReader

This class is responsible for reading the actual splits that it receives when the framework calls handleSplitsChanges(). The main part of the split reader is the fetch() implementation where we read all the splits received and return the read records as a RecordsBySplits object. This object contains a map of the split ids to the belonging records and also the ids of the finished splits. Important points need to be considered:

  • The fetch call must be non-blocking. If any call in its code is synchronous and potentially long, an escape from the fetch() must be provided. When the framework calls wakeUp() we should interrupt the fetch for example by setting an AtomicBoolean.
  • Fetch call needs to be re-entrant: an already read split must not be re-read. We should remove it from the list of splits to read and add its id to the finished splits (along with empty splits) in the RecordsBySplits that we return.

It is totally fine for the implementer to exit the fetch() method early. Also a failure could interrupt the fetch. In both cases the framework will call fetch() again later on. In that case, the fetch method must resume the reading from where it was left off using the split state already discussed. If resuming the read of a split is impossible because of backend constraints, then the only solution is to read splits atomically (either not read the split at all, or read it entirely). That way, in case of interrupted fetch, nothing will be output and the split could be read again from the beginning at next fetch call leading to no duplicates. But if the split is read entirely, there are points to consider:
  • We should ensure that the total split content (records from the source) fits in memory for example by specifying a max split size in bytes (see SplitEnumarator)
  • The split state becomes useless, only a Split class is needed

RecordEmitter

The SplitReader reads records in the form of an intermediary record format that the implementer provides for each record. It can be the raw format returned by the backend or any format allowing to extract the actual record afterwards. This format is not the final output format expected by the source. It contains anything needed to do the conversion to the record output format. We need to implement RecordEmitter#emitRecord() to do this conversion. A good pattern here is to initialize the RecordEmitter with a mapping Function. The implementation must be idempotent. Indeed the method maybe interrupted in the middle. In that case, the same set of records will be passed to the record emitter again later.

Serializers

We need to provide singleton serializers for:
  • Split: splits are serialized when sending them from enumerator to reader, and when checkpointing the reader's current state
  • SplitEnumeratorState:  the serializer is used for the result of the SplitEnumerator#snapshotState() 

For both, we need to implement SimpleVersionedSerializer. Care needs to be taken at some important points:

  • Using Java serialization is forbidden in Flink mainly for migration concerns. We should rather manually write the fields of the objects using ObjectOutputStream. When a class is not supported by the ObjectOutputStream (not String, Integer, Long...), we should write the size of the object in bytes as an Integer and then write the object converted to byte[]. Similar method is used to serialize collections. First write the number of elements of the collection, then serialize all the contained objects. Of course, for deserialization we do the exact same reading with the same order. 
  • There can be a lot of splits, so we should cache the OutputStream used in SplitSerializer. We can do so by using.
ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
The initial stream size depends on the size of a split. 

Testing the source


For the sake of concision of this article, testing the source will be the object of the next article. Stay tuned !
    

Conclusion


This article gathering the implementation field feedback was needed as the javadocs cannot cover all the implementation details for high-performance and maintainable sources. I hope you enjoyed reading and that it gave you the desire to contribute a new connector to the Flink project !

Tuesday, March 21, 2023

Cassandra: evaluate table size without reading the data

🕥 3 min.

Introduction


While developing the Cassandra source connector for Flink I needed a way to ensure that the data I was reading fitted in memory. For that I had to evaluate how big the source table was in order to know how to divide it. But, of course, it had to be done without reading the data itself. Here is how it was achieved.

Cassandra size estimates statistics


Cassandra partitioning is based on tokens arranged into a ring. Cassandra cluster provides statistical information about tables sizes in a system table called system.size_estimates. It provides per-table information on what we will call in this article token ranges: number of partitions taken by the table, mean partition size, start and end tokens. These elements can be used to get a rough estimation of the table size.

To get these information, we need to issue this request: 

SELECT range_start, range_end, partitions_count, mean_partition_size FROM system.size_estimates WHERE keyspace_name = ? AND table_name = ?

We will receive the token ranges that the table occupies. To get the size of the table, we need to sum them that way:

table_size_on_this_node = sum (mean_partition_size * partition_count) 

You see in the formula above that the calculated size is only for one Cassandra node as the system table is the one of the node. We need to extrapolate to the whole cluster to avoid requesting all the nodes of the cluster. For that we will calculate the ring  fraction this node represents in the cluster. The ring fraction is a percentage obtained like this: 

ring_fraction = sum (token_ranges_size) / ring_size

ring_size is a constant depending on the configured Cassandra cluster partitioner.

token_ranges_size = sum(distance(range_start, range_end))

There can be overlap between tokens so the distance method needs to be a little more complex:

private BigInteger distance(BigInteger token1, BigInteger token2) {
// token2 > token1
if (token2.compareTo(token1) > 0) {
return token2.subtract(token1);
} else {
return token2.subtract(token1).add(partitioner.ringSize);
}
}
So, now that we have our ring fraction, we can extrapolate the node table size to get the total table size: 

table_size = table_size_on_this_node / ring_fraction

And here we are !

Updating the statistics


These size estimates statistics are updated when the Cassandra cluster flushes its Memtables (tables in memory) into SSTabes (tables on disc). Especially, it updates the partitions information. The flush of the table can be done through the nodetool command : 

nodetool flush keyspace table

    In integration tests, we often want to read the test data just after writing it. In that case, the cluster has not done the flush yet so the size estimates are not updated. It is worth mentioning that the official Cassandra docker image contains nodetool binary and that the flush can be done from within the container using testContainers with the code below:

cassandraContainer.execInContainer("nodetool", "flush", keyspace, table);
In that case, a local JMX call is issued and local JMX is enabled by default on the Cassandra cluster.

Conclusion


I guess this article is mostly useful for Cassandra connector authors or DevOps people. I hope you enjoyed reading.