Monday, April 3, 2023

Flink: How to test a batch source with the new Source framework

🕥 6 min.

Introduction


The Flink community recently designed a new Source framework based on FLIP-27. This article continues the previous one on how to create a batch source with the new Source framework. Now it is time to test the created source! Like the previous article, this one was built while implementing the Flink batch source for Cassandra.

The goal here is to give field feedback on how to implement tests of a batch source. For details you should read the documentation, the javadocs or the Cassandra connector code. The links are above.

Unit testing the source


Testing the serializers


In the previous article, we created serializers for Split and SplitEnumeratorState. We should now cover them with unit tests. To test serde, we create an object, serialize it, deserialize it with the same serializer, and finally assert that the two objects are equal. Thus, hashCode() and equals() need to be implemented for the serialized objects.
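The round trip described above can be sketched as follows. This is only an illustration: ExampleSplit, its two fields and ExampleSplitSerializer are hypothetical stand-ins (not the Cassandra connector classes), and the serializer merely mimics the getVersion() / serialize() / deserialize(version, bytes) shape of Flink's SimpleVersionedSerializer so that the example runs with the JDK alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

/** Hypothetical split holding a range; not the real connector class. */
final class ExampleSplit {
    final long rangeStart;
    final long rangeEnd;

    ExampleSplit(long rangeStart, long rangeEnd) {
        this.rangeStart = rangeStart;
        this.rangeEnd = rangeEnd;
    }

    // equals() and hashCode() are what make the round-trip assertion possible.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ExampleSplit)) return false;
        ExampleSplit other = (ExampleSplit) o;
        return rangeStart == other.rangeStart && rangeEnd == other.rangeEnd;
    }

    @Override
    public int hashCode() {
        return Objects.hash(rangeStart, rangeEnd);
    }
}

/** Stand-in that mimics the method shapes of a versioned serializer (assumption). */
final class ExampleSplitSerializer {
    static final int VERSION = 0;

    int getVersion() {
        return VERSION;
    }

    byte[] serialize(ExampleSplit split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(split.rangeStart);
            out.writeLong(split.rangeEnd);
        }
        return bytes.toByteArray();
    }

    ExampleSplit deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION) throw new IOException("Unknown version: " + version);
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            long start = in.readLong();
            long end = in.readLong();
            return new ExampleSplit(start, end);
        }
    }
}

final class SplitSerdeExample {
    /** Serializes the split, then deserializes it with the same serializer. */
    static ExampleSplit roundTrip(ExampleSplit split) {
        ExampleSplitSerializer serializer = new ExampleSplitSerializer();
        try {
            byte[] bytes = serializer.serialize(split);
            return serializer.deserialize(serializer.getVersion(), bytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ExampleSplit original = new ExampleSplit(-100L, 100L);
        ExampleSplit copy = roundTrip(original);
        // The test passes only if the deserialized object equals the original.
        if (!original.equals(copy)) throw new AssertionError("round trip changed the split");
        System.out.println("round trip OK");
    }
}
```

In a real unit test, the body of main() would become a JUnit test method and the if/throw would become an assertion on equality.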

Other unit tests


Of course, we also need to unit test low-level processing, such as query building, and any other processing that does not require a running backend.

Integration testing the source


For tests that require a running backend, Flink provides a JUnit5 source test framework. It is composed of different parts gathered in a test suite:


For the test to be integrated into the Flink CI, the test class must be called *ITCase. It can be named differently if the test lives elsewhere. The class extends SourceTestSuiteBase. This test suite already provides all the necessary tests (single split, multiple splits, idle reader, etc.). It targets both batch and streaming sources, so for our batch source the tests below need to be disabled as they only apply to streaming sources. They can be disabled by overriding them in the ITCase and annotating them with @Disabled:
  • testSourceMetrics
  • testSavepoint
  • testScaleUp
  • testScaleDown
  • testTaskManagerFailure 

Of course, we can add our own integration test cases, for example tests on limits, tests on low-level splitting, or any test that requires a running backend. But in most cases we only need to provide Flink test environment classes to configure the ITCase:


We add this annotated field to our ITCase and we're done:
@TestEnv 
MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment();

Backend environment

To test the connector, we need a backend to run the connector against. This TestEnvironment provides everything related to the backend: the container, its configuration, the session to connect to it, and all the elements bound to the whole test case (tablespace, initialization requests, etc.).

We add this annotated field to our ITCase:
@TestExternalSystem
MyBackendTestEnvironment backendTestEnvironment = new MyBackendTestEnvironment();
To integrate with JUnit5, MyBackendTestEnvironment implements TestResource. This environment is scoped to the test suite, so it is where we set up the backend and shared resources (session, tablespace, etc.) by implementing the startUp() and tearDown() methods. For that, we advise using Testcontainers, which relies on Docker images to provide a real backend instance (not a mock) that is representative for integration tests. Testcontainers supports several backends out of the box. We need to configure the test containers as follows:
  • Redirect container output (error and standard output) to Flink logs
  • Set the different timeouts to cope with CI server load
  • Set retry mechanisms for connection, initialization requests, etc. for the same reason
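As an illustration of the last point, here is a minimal retry sketch using only the JDK. The RetryExample class, the retry() helper and its parameters are hypothetical names chosen for this example; in practice the backend driver or Testcontainers may already offer their own retry and timeout settings, which should be preferred when available.

```java
import java.util.concurrent.Callable;

final class RetryExample {
    /**
     * Calls the action until it succeeds or maxAttempts is reached,
     * sleeping delayMillis between two attempts. Hypothetical helper.
     */
    static <T> T retry(Callable<T> action, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException interrupted) {
                        // Restore the interrupt flag and stop retrying.
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("Interrupted while retrying", interrupted);
                    }
                }
            }
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts", lastFailure);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated backend that only accepts the third connection attempt.
        String session = retry(() -> {
            if (++calls[0] < 3) throw new IllegalStateException("backend not ready");
            return "session";
        }, 5, 10L);
        System.out.println(session + " obtained after " + calls[0] + " attempts");
    }
}
```

In startUp(), such a helper could wrap the session creation or the initialization requests so that a slow CI server does not fail the whole suite on the first refused connection.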

Checkpointing semantics


In big data execution engines, there are two levels of guarantee regarding sources and sinks:
  • At least once: upon failure and recovery, some records may be reflected multiple times but none will be lost
  • Exactly once: upon failure and recovery, every record will be reflected exactly once

With the following code, we tell the test framework to verify that the source supports exactly-once semantics:
@TestSemantics
CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};


That being said, we could encounter a problem while running the tests: the default assertions in the Flink source test framework assume that the data is read in the same order it was written. This is untrue for most big data backends, where ordering is usually not deterministic. To support unordered checks and still use all the tests provided by the framework, we need to override SourceTestSuiteBase#checkResultWithSemantic in our ITCase:

@Override
protected void checkResultWithSemantic(
        CloseableIterator<Pojo> resultIterator,
        List<List<Pojo>> testData,
        CheckpointingMode semantic,
        Integer limit) {
    if (limit != null) {
        Runnable runnable =
                () ->
                        CollectIteratorAssertions.assertUnordered(resultIterator)
                                .withNumRecordsLimit(limit)
                                .matchesRecordsFromSource(testData, semantic);

        assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
    } else {
        CollectIteratorAssertions.assertUnordered(resultIterator)
                .matchesRecordsFromSource(testData, semantic);
    }
}


This is a copy-paste of the parent method where CollectIteratorAssertions.assertOrdered() is replaced by CollectIteratorAssertions.assertUnordered().
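To make the idea of an unordered check concrete, here is a small JDK-only sketch. It is not the implementation of CollectIteratorAssertions; the class and method names are hypothetical, and the at-least-once rule shown (duplicates allowed, nothing lost, nothing unexpected) is an assumption derived from the definitions of the two guarantees given earlier.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class UnorderedCheckExample {
    /** Counts how many times each record appears (a multiset). */
    static <T> Map<T, Integer> countOccurrences(List<T> records) {
        Map<T, Integer> counts = new HashMap<>();
        for (T record : records) {
            counts.merge(record, 1, Integer::sum);
        }
        return counts;
    }

    /** Exactly once: same records with the same multiplicities, in any order. */
    static <T> boolean matchesExactlyOnce(List<T> expected, List<T> actual) {
        return countOccurrences(expected).equals(countOccurrences(actual));
    }

    /** At least once: no record lost, duplicates tolerated, no unexpected record. */
    static <T> boolean matchesAtLeastOnce(List<T> expected, List<T> actual) {
        Map<T, Integer> expectedCounts = countOccurrences(expected);
        Map<T, Integer> actualCounts = countOccurrences(actual);
        if (!expectedCounts.keySet().equals(actualCounts.keySet())) {
            return false;
        }
        for (Map.Entry<T, Integer> entry : expectedCounts.entrySet()) {
            if (actualCounts.get(entry.getKey()) < entry.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> written = Arrays.asList("a", "b", "c");
        List<String> read = Arrays.asList("c", "a", "b");
        // An ordered comparison rejects a correct but reordered result.
        System.out.println("ordered equality: " + written.equals(read));
        System.out.println("unordered exactly-once: " + matchesExactlyOnce(written, read));
    }
}
```

Both checks rely on equals() and hashCode() of the record type, which is one more reason to implement them carefully on the test Pojo.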


Test context


The test context provides Flink with means to interact with the backend, like inserting test data, creating tables or constructing the source. It is scoped to the test case (and not to the test suite). It is linked to the ITCase through a factory of TestContext as shown below. 


@TestContext
TestContextFactory contextFactory = new TestContextFactory(backendTestEnvironment);

TestContext implements DataStreamSourceExternalContext:
  • We don't connect to the backend for each test case, so shared resources such as the session are created by the backend test environment (test suite scoped). They are then passed to the test context through its constructor. The constructor is also where we initialize the backend resources of the test case, such as the test case table.
  • close(): drop the resources created for the test case
  • getProducedType(): specify the test output type of the source, for example a test Pojo
  • getConnectorJarPaths(): provide a list of attached jars. Most of the time, we can return an empty list as Maven already adds the jars to the test classpath
  • createSource(): here we create the source as a user would have done. It will be provided to the test cases by the Flink test framework
  • createSourceSplitDataWriter(): here we create an ExternalSystemSplitDataWriter responsible for writing test data, which comes as a list of objects of the produced type as defined in getProducedType()
  • generateTestData(): produce the list of test data that will be given to the ExternalSystemSplitDataWriter. We must make sure that equals() returns false when two records of this list belong to different splits. The easiest way to do that is to include the split id in the produced type and make sure equals() and hashCode() are properly implemented to include this field.
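The last point can be sketched as follows, using only the JDK. TestRecord, its fields and this simplified generateTestData(splitId, recordCount) signature are hypothetical; the real method of the test context takes different parameters, so this only illustrates how including the split id in equals() and hashCode() keeps records of different splits distinct.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical test record; the split id is part of its identity. */
final class TestRecord {
    final int splitId;
    final int id;
    final String value;

    TestRecord(int splitId, int id, String value) {
        this.splitId = splitId;
        this.id = id;
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TestRecord)) return false;
        TestRecord other = (TestRecord) o;
        // splitId is compared too, so records of different splits never match.
        return splitId == other.splitId && id == other.id && Objects.equals(value, other.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(splitId, id, value);
    }

    @Override
    public String toString() {
        return "TestRecord{splitId=" + splitId + ", id=" + id + ", value=" + value + "}";
    }
}

final class TestRecordExample {
    /** Simplified stand-in for generateTestData(): builds the records of one split. */
    static List<TestRecord> generateTestData(int splitId, int recordCount) {
        List<TestRecord> records = new ArrayList<>(recordCount);
        for (int i = 0; i < recordCount; i++) {
            records.add(new TestRecord(splitId, i, "value-" + i));
        }
        return records;
    }

    public static void main(String[] args) {
        TestRecord fromSplit0 = generateTestData(0, 1).get(0);
        TestRecord fromSplit1 = generateTestData(1, 1).get(0);
        // Same id and value, but different splits.
        System.out.println("equal across splits: " + fromSplit0.equals(fromSplit1));
    }
}
```

Without the split id, two splits generating the same ids and values would produce records that the assertions cannot tell apart.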

Contributing the source to Flink


Recently, the Flink community externalized all the connectors to external repositories that are sub-repositories of the official Apache Flink repository. This is mainly to decouple the release of Flink from the release of the connectors. To distribute the created source, we need to follow this official wiki page.

Conclusion 


This concludes the series of articles about creating a batch source with the new Flink framework. This was needed as, apart from the javadocs, the documentation about testing is missing for now. I hope you enjoyed reading and I hope the Flink community will receive a source PR from you soon :)