Introduction
The Flink community recently designed a new Source framework based on FLIP-27. This article is the continuation of the article on how to create a batch source with the new Source framework. Now it is time to test the created source! Like the previous article, this one was written while implementing the Flink batch source for Cassandra.
The goal here is to give field feedback on how to implement tests of a batch source. For details you should read the documentation, the javadocs or the Cassandra connector code. The links are above.
Unit testing the source
Testing the serializers
In the previous article, we created serializers for Split and SplitEnumeratorState. We should now cover them with unit tests. To test serialization and deserialization (serde), we create an object, serialize it with the serializer, deserialize it with the same serializer, and finally assert that the two objects are equal. Thus, hashCode() and equals() need to be implemented for the serialized objects.
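The round trip described above can be sketched with plain JDK classes. This is only an illustration: RangeSplit, RangeSplitSerializer and SerdeRoundTrip are hypothetical names that do not come from the Cassandra connector, and the serializer is a simplified stand-in for a real Flink serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

/** Hypothetical split: a range identified by its two bounds. */
final class RangeSplit {
    final long start;
    final long end;

    RangeSplit(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // equals() and hashCode() are what make the round-trip assertion meaningful.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof RangeSplit)) {
            return false;
        }
        RangeSplit other = (RangeSplit) o;
        return start == other.start && end == other.end;
    }

    @Override
    public int hashCode() {
        return Objects.hash(start, end);
    }
}

/** Hypothetical serializer (assumption: object to bytes and back, nothing more). */
final class RangeSplitSerializer {
    byte[] serialize(RangeSplit split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(split.start);
            out.writeLong(split.end);
        }
        return bytes.toByteArray();
    }

    RangeSplit deserialize(byte[] serialized) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            long start = in.readLong();
            long end = in.readLong();
            return new RangeSplit(start, end);
        }
    }
}

final class SerdeRoundTrip {
    /** Serializes then deserializes the split with the same serializer. */
    static RangeSplit roundTrip(RangeSplit split) {
        RangeSplitSerializer serializer = new RangeSplitSerializer();
        try {
            return serializer.deserialize(serializer.serialize(split));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        RangeSplit original = new RangeSplit(-42L, 1337L);
        RangeSplit copy = roundTrip(original);
        if (!original.equals(copy)) {
            throw new AssertionError("round trip changed the split");
        }
        System.out.println("round trip ok");
    }
}
```

In a real unit test, the check in main() would be an assertion from the test library, and the same pattern would be repeated for the enumerator state.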
Other unit tests
Of course, we also need to unit test low-level processing, such as query building, or any other processing that does not require a running backend.
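As an illustration of a backend-free unit test, here is a minimal sketch of a query-building helper and its check. RangeQueryBuilder and withRange() are hypothetical names invented for this example; the actual query generation of a connector will differ.

```java
/** Hypothetical helper that restricts a user query to the range covered by one split. */
final class RangeQueryBuilder {

    /**
     * Appends a range filter on the given column.
     * Assumption: the base query has no WHERE clause and may end with a semicolon.
     */
    static String withRange(String baseQuery, String column, long start, long end) {
        String trimmed = baseQuery.trim();
        if (trimmed.endsWith(";")) {
            trimmed = trimmed.substring(0, trimmed.length() - 1).trim();
        }
        return trimmed + " WHERE " + column + " >= " + start + " AND " + column + " < " + end + ";";
    }

    public static void main(String[] args) {
        String query = withRange("SELECT * FROM t;", "id", 0L, 10L);
        // No backend is needed: the generated string is compared with the expected one.
        if (!"SELECT * FROM t WHERE id >= 0 AND id < 10;".equals(query)) {
            throw new AssertionError("unexpected query: " + query);
        }
        System.out.println(query);
    }
}
```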
Integration testing the source
For tests that require a running backend, Flink provides a JUnit5 source test framework. It is composed of different parts gathered in a test suite:
For the test to be integrated into the Flink CI, the test class must be named *ITCase. It can be named differently if the test lives somewhere else. The class extends SourceTestSuiteBase. This test suite already provides all the necessary tests (single split, multiple splits, idle reader, etc.). It targets both batch and streaming sources, so for our batch source, the tests below need to be disabled because they only apply to streaming sources. They can be disabled by overriding them in the ITCase and annotating them with @Disabled:
- testSourceMetrics
- testSavepoint
- testScaleUp
- testScaleDown
- testTaskManagerFailure
Of course, we can add our own integration test cases, for example tests on limits, tests on low-level splitting, or any test that requires a running backend. But in most cases, we only need to provide Flink test environment classes to configure the ITCase. They are described in the following sections.
Flink environment
We add this annotated field to our ITCase and we're done:
@TestEnv
MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment();
Backend environment
To test the connector we need a backend to run the connector against. This TestEnvironment
provides everything related to the backend: the container, its configuration, the session to connect to it,
and all the elements bound to the whole test case (table space, initialization requests ...)
We add this annotated field to our ITCase:
@TestExternalSystem
MyBackendTestEnvironment backendTestEnvironment = new MyBackendTestEnvironment();
To integrate with JUnit5, MyBackendTestEnvironment implements TestResource. This environment is scoped to the test suite, so it is where we set up the backend and the shared resources (session, tablespace, etc.) by implementing the startUp() and tearDown() methods. For that, we advise using Testcontainers, which relies on Docker images to provide a real backend instance (not a mock) that is representative for integration tests. Testcontainers supports several backends out of the box. We need to configure the test containers as follows:
- Redirect container output (error and standard output) to Flink logs
- Set the different timeouts to cope with CI server load
- Set retry mechanisms for connection, initialization requests, etc. for the same reason
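The retry point in the list above can be sketched with the JDK alone. Retry and withRetries() are hypothetical names; the container library or the backend driver may already offer their own retry and timeout settings, which should be preferred when they exist.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: retries an action a fixed number of times with a fixed delay. */
final class Retry {

    static <T> T withRetries(Callable<T> action, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleep(delayMillis);
                }
            }
        }
        // All attempts failed: surface the last failure as the cause.
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts", last);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated backend that only accepts the third connection attempt.
        String session =
                withRetries(
                        () -> {
                            if (++calls[0] < 3) {
                                throw new IllegalStateException("backend not ready");
                            }
                            return "session";
                        },
                        5,
                        10L);
        System.out.println(session + " obtained after " + calls[0] + " attempts");
    }
}
```

In startUp(), the session creation and the initialization requests would be wrapped in such a call so that a slow CI server does not fail the whole suite on the first refused connection.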
Checkpointing semantics
In big data execution engines, there are two levels of guarantee regarding sources and sinks:
- At least once: upon failure and recovery, some records may be reflected multiple times but none will be lost
- Exactly once: upon failure and recovery, every record will be reflected exactly once
With the following code, we verify that the source supports exactly-once semantics:
@TestSemantics
CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
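If the backend does not return records in a deterministic order, an ordered comparison of the results fails. In that case, we override checkResultWithSemantic() in the ITCase:
@Override
protected void checkResultWithSemantic(
        CloseableIterator<Pojo> resultIterator,
        List<List<Pojo>> testData,
        CheckpointingMode semantic,
        Integer limit) {
    if (limit != null) {
        // Limited number of records: run the check asynchronously so that it can time out.
        Runnable runnable =
                () ->
                        CollectIteratorAssertions.assertUnordered(resultIterator)
                                .withNumRecordsLimit(limit)
                                .matchesRecordsFromSource(testData, semantic);
        assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
    } else {
        // No limit: compare all the records, ignoring their order.
        CollectIteratorAssertions.assertUnordered(resultIterator)
                .matchesRecordsFromSource(testData, semantic);
    }
}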
This is a copy of the parent method in which CollectIteratorAssertions.assertOrdered() is replaced by CollectIteratorAssertions.assertUnordered().
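To make the two guarantees concrete, here is a minimal sketch of what an order-insensitive comparison could check under each semantic. It is not the implementation of CollectIteratorAssertions: Semantic and UnorderedMatcher are hypothetical names, and Semantic is a simplified stand-in for CheckpointingMode.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for the checkpointing mode (assumption made for this sketch). */
enum Semantic {
    AT_LEAST_ONCE,
    EXACTLY_ONCE
}

final class UnorderedMatcher {

    /** Compares the records read with the records written, ignoring their order. */
    static <T> boolean matches(List<T> expected, List<T> actual, Semantic semantic) {
        Map<T, Integer> expectedCounts = count(expected);
        Map<T, Integer> actualCounts = count(actual);
        if (semantic == Semantic.EXACTLY_ONCE) {
            // Every record must appear exactly as many times as it was written.
            return expectedCounts.equals(actualCounts);
        }
        // AT_LEAST_ONCE: no lost and no unknown record, but duplicates are tolerated.
        if (!expectedCounts.keySet().equals(actualCounts.keySet())) {
            return false;
        }
        for (Map.Entry<T, Integer> entry : expectedCounts.entrySet()) {
            if (actualCounts.get(entry.getKey()) < entry.getValue()) {
                return false;
            }
        }
        return true;
    }

    private static <T> Map<T, Integer> count(List<T> records) {
        Map<T, Integer> counts = new HashMap<>();
        for (T record : records) {
            counts.merge(record, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> written = Arrays.asList("a", "b", "c");
        List<String> readWithDuplicate = Arrays.asList("c", "a", "b", "a");
        System.out.println(matches(written, readWithDuplicate, Semantic.AT_LEAST_ONCE));
        System.out.println(matches(written, readWithDuplicate, Semantic.EXACTLY_ONCE));
    }
}
```

The comparison relies on equals() and hashCode() of the record type, which is one more reason to implement them carefully on the test Pojo.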
Test context
The test context provides Flink with the means to interact with the backend, such as inserting test data, creating tables or constructing the source. It is scoped to the test case (and not to the test suite). It is linked to the ITCase through a factory of TestContext, as shown below.
@TestContext
TestContextFactory contextFactory = new TestContextFactory(backendTestEnvironment);
TestContext implements DataStreamSourceExternalContext:
- Constructor: we don't connect to the backend for each test case, so the shared resources such as the session are created by the backend test environment (test-suite scoped). They are then passed to the test context through its constructor. The constructor is also where we initialize the test-case backend resources, such as the test-case table.
- close(): drop the created test-case resources
- getProducedType(): specify the test output type of the source, for example a test Pojo
- getConnectorJarPaths(): provide a list of attached jars. Most of the time, we can return an empty list because Maven already adds the jars to the test classpath
- createSource(): here we create the source as a user would. It will be provided to the test cases by the Flink test framework
- createSourceSplitDataWriter(): here we create an ExternalSystemSplitDataWriter responsible for writing the test data, which comes as a list of objects of the produced type defined in getProducedType()
- generateTestData(): produce the list of test data that will be given to the ExternalSystemSplitDataWriter. We must make sure that equals() returns false when two records of this list belong to different splits. The easiest way to do that is to include the split id in the produced type and to make sure equals() and hashCode() are properly implemented to include this field.
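The advice on generateTestData() can be sketched as follows. TestRecord and TestDataGenerator are hypothetical names, and the method signature is simplified for this example (a split index, a seed and a record count); it is not presented as the exact signature of the Flink interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Random;

/** Hypothetical test Pojo that carries the id of the split it belongs to. */
final class TestRecord {
    final int splitId;
    final int value;

    TestRecord(int splitId, int value) {
        this.splitId = splitId;
        this.value = value;
    }

    // splitId is part of equals() and hashCode(), so two records with the same
    // value but written for different splits are never considered equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TestRecord)) {
            return false;
        }
        TestRecord other = (TestRecord) o;
        return splitId == other.splitId && value == other.value;
    }

    @Override
    public int hashCode() {
        return Objects.hash(splitId, value);
    }

    @Override
    public String toString() {
        return "TestRecord{splitId=" + splitId + ", value=" + value + "}";
    }
}

final class TestDataGenerator {

    /** Generates reproducible test records for one split (simplified, assumed signature). */
    static List<TestRecord> generateTestData(int splitIndex, long seed, int count) {
        Random random = new Random(seed);
        List<TestRecord> records = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            records.add(new TestRecord(splitIndex, random.nextInt(100)));
        }
        return records;
    }

    public static void main(String[] args) {
        // Same seed, different splits: the values are identical but the records differ.
        System.out.println(generateTestData(0, 42L, 3));
        System.out.println(generateTestData(1, 42L, 3));
    }
}
```

Using the seed for the random generator keeps the data reproducible from one run to the next, which makes a failing test easier to investigate.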