Showing posts with label flink. Show all posts

Monday, April 3, 2023

Flink: Howto test a batch source with the new Source framework

🕥 6 min.

Introduction


The Flink community recently designed a new Source framework based on FLIP-27. This article continues the previous how-to on creating a batch source with the new Source framework. Now it is time to test the created source! Like the previous article, this one was written while implementing the Flink batch source for Cassandra.

The goal here is to give field feedback on how to implement tests of a batch source. For details you should read the documentation, the javadocs or the Cassandra connector code. The links are above.

Unit testing the source


Testing the serializers


In the previous article, we created serializers for Split and SplitEnumeratorState. We should now cover them with unit tests. To test serde, we create an object, serialize it with the serializer, deserialize it with the same serializer, and finally assert that the two objects are equal. Thus, hashCode() and equals() need to be implemented for the serialized objects.
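As an illustration, here is a minimal sketch of such a round-trip check in plain Java, with no Flink dependency. MySplit, MySplitSerializer and SerdeRoundTrip are hypothetical names, and the split fields are made up. The serializer only mirrors the method shape of Flink's SimpleVersionedSerializer (getVersion(), serialize(), deserialize(version, bytes)) instead of implementing it, so treat it as a sketch under those assumptions rather than the connector's actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

// Hypothetical split: a (start, end) range identified by an id.
final class MySplit {
    final String id;
    final long start;
    final long end;

    MySplit(String id, long start, long end) {
        this.id = id;
        this.start = start;
        this.end = end;
    }

    // equals() and hashCode() are what make the round-trip assertion meaningful.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MySplit)) return false;
        MySplit other = (MySplit) o;
        return start == other.start && end == other.end && id.equals(other.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, start, end);
    }
}

// Mirrors the method shape of SimpleVersionedSerializer without depending on Flink.
final class MySplitSerializer {
    static final int VERSION = 1;

    int getVersion() {
        return VERSION;
    }

    byte[] serialize(MySplit split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(split.id);
            out.writeLong(split.start);
            out.writeLong(split.end);
        }
        return bytes.toByteArray();
    }

    MySplit deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION) throw new IOException("Unknown version: " + version);
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            // Fields are read in exactly the order they were written.
            return new MySplit(in.readUTF(), in.readLong(), in.readLong());
        }
    }
}

// Serializes then deserializes with the same serializer, as a unit test would.
final class SerdeRoundTrip {
    static MySplit roundTrip(MySplitSerializer serializer, MySplit split) {
        try {
            return serializer.deserialize(serializer.getVersion(), serializer.serialize(split));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real test, the result of the round trip would simply be compared to the original object with the assertion library of your choice.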

Other unit tests


Of course, we also need to unit test low-level processing, such as query building, and any other processing that does not require a running backend.

Integration testing the source


For tests that require a running backend, Flink provides a JUnit5 source test framework. It is composed of different parts gathered in a test suite:


For the test to be integrated into Flink CI, the test class must be named *ITCase. It can be named differently if the test lives somewhere else. The class extends SourceTestSuiteBase. This test suite already provides all the necessary tests (single split, multiple splits, idle reader, etc.). It targets both batch and streaming sources, so for our batch source the tests below need to be disabled because they only apply to streaming sources. They can be disabled by overriding them in the ITCase and annotating them with @Disabled:
  • testSourceMetrics
  • testSavepoint
  • testScaleUp
  • testScaleDown
  • testTaskManagerFailure 

Of course, we can add our own integration test cases, for example tests on limits, tests on low-level splitting, or any test that requires a running backend. But for most cases we only need to provide Flink test environment classes to configure the ITCase:


We add this annotated field to our ITCase and we're done:
@TestEnv 
MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment();

Backend environment

To test the connector, we need a backend to run the connector against. This TestEnvironment provides everything related to the backend: the container, its configuration, the session to connect to it, and all the elements bound to the whole test case (table space, initialization requests, etc.).

We add this annotated field to our ITCase
@TestExternalSystem
MyBackendTestEnvironment backendTestEnvironment = new MyBackendTestEnvironment();
To integrate with JUnit5, BackendTestEnvironment implements TestResource. This environment is scoped to the test suite, so it is where we set up the backend and the shared resources (session, tablespace, etc.) by implementing the startUp() and tearDown() methods. For that, we advise using Testcontainers, which relies on Docker images to provide a real backend instance (not a mock) that is representative for integration tests. Testcontainers supports several backends out of the box. We need to configure the test containers as follows:
  • Redirect container output (error and standard output) to Flink logs
  • Set the different timeouts to cope with CI server load
  • Set retry mechanisms for connection, initialization requests, etc. for the same reason
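For the last point, a small retry helper is often enough. The sketch below is plain Java and does not use any Testcontainers or Flink API. The Retry class, its method name and its parameters are hypothetical, and the fixed delay between attempts is an assumption (a real setup may prefer a backoff).

```java
import java.util.concurrent.Callable;

final class Retry {
    // Calls the action until it succeeds or maxAttempts is reached, sleeping between attempts.
    static <T> T withRetries(Callable<T> action, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException ie) {
                        // Restore the interrupt flag and stop retrying.
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("Interrupted while retrying", ie);
                    }
                }
            }
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts", last);
    }
}
```

Opening the session or running an initialization request would then be wrapped in a call such as Retry.withRetries(() -> openSession(), 5, 1000L), where openSession() stands for whatever connects to your backend.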

Checkpointing semantics


In big data execution engines, there are 2 levels of guarantee regarding sources and sinks:
  • At least once: upon failure and recovery, some records may be reflected multiple times but none will be lost
  • Exactly once: upon failure and recovery, every record will be reflected exactly once

With the following code, we verify that the source supports exactly-once semantics:
@TestSemantics
CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};


That being said, we could encounter a problem while running the tests: the default assertions in the Flink source test framework assume that the data is read in the same order it was written. This is untrue for most big data backends, where ordering is usually not deterministic. To support unordered checks and still use all the tests provided by the framework, we need to override SourceTestSuiteBase#checkResultWithSemantic in our ITCase:

@Override
protected void checkResultWithSemantic(
        CloseableIterator<Pojo> resultIterator,
        List<List<Pojo>> testData,
        CheckpointingMode semantic,
        Integer limit) {
    if (limit != null) {
        Runnable runnable =
                () ->
                        CollectIteratorAssertions.assertUnordered(resultIterator)
                                .withNumRecordsLimit(limit)
                                .matchesRecordsFromSource(testData, semantic);

        assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
    } else {
        CollectIteratorAssertions.assertUnordered(resultIterator)
                .matchesRecordsFromSource(testData, semantic);
    }
}


This is a copy-paste of the parent method where CollectIteratorAssertions.assertOrdered() is replaced by CollectIteratorAssertions.assertUnordered().
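To make the idea of an order-insensitive check concrete, here is a standalone sketch in plain Java. It is not how CollectIteratorAssertions is implemented; it only shows one common way to compare two collections regardless of order, by counting occurrences. UnorderedCheck and its method names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class UnorderedCheck {
    // Counts occurrences of each record; relies on equals()/hashCode() of T.
    private static <T> Map<T, Integer> countOccurrences(List<T> records) {
        Map<T, Integer> counts = new HashMap<>();
        for (T record : records) {
            counts.merge(record, 1, Integer::sum);
        }
        return counts;
    }

    // True when both lists hold the same records the same number of times, in any order.
    static <T> boolean sameRecordsIgnoringOrder(List<T> expected, List<T> actual) {
        return countOccurrences(expected).equals(countOccurrences(actual));
    }
}
```

Counting occurrences, rather than comparing sets, keeps duplicates visible: a record read twice is still detected, which matters when checking exactly-once behavior.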


Test context


The test context provides Flink with means to interact with the backend, like inserting test data, creating tables or constructing the source. It is scoped to the test case (and not to the test suite). It is linked to the ITCase through a factory of TestContext as shown below. 


@TestContext
TestContextFactory contextFactory = new TestContextFactory(backendTestEnvironment);

TestContext implements DataStreamSourceExternalContext:
  • We don't connect to the backend at each test case, so the shared resources such as session are created by the backend test environment (test suite scoped). They are then passed to the test context by constructor. It is also in the constructor that we initialize test case backend resources such as test case table.
  • close() : drop the created test case resources
  • getProducedType(): specify the test output type of the source such as a test Pojo for example
  • getConnectorJarPaths(): provide a list of attached jars. Most of the time, we can return an empty list as Maven already adds the jars to the test classpath
  • createSource(): here we create the source as a user would have done. It will be provided to the test cases by the Flink test framework
  • createSourceSplitDataWriter(): here we create an ExternalSystemSplitDataWriter responsible for writing test data which comes as a list of produced type objects such as defined in getProducedType()
  • generateTestData(): produce the list of test data that will be given to the ExternalSystemSplitDataWriter. We must make sure that equals() returns false when 2 records of this list belong to different splits. The easiest way to do that is to include the split id in the produced type and make sure equals() and hashCode() are properly implemented to include this field.
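The last point can be sketched as follows. TestPojo and its fields are hypothetical; the only thing that matters is that the split id takes part in equals() and hashCode(), so that two records carrying the same value but belonging to different splits are never considered equal.

```java
import java.util.Objects;

// Hypothetical test record; field names are illustrative.
final class TestPojo {
    final int splitId;
    final String value;

    TestPojo(int splitId, String value) {
        this.splitId = splitId;
        this.value = value;
    }

    // The split id is part of equality on purpose.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TestPojo)) return false;
        TestPojo other = (TestPojo) o;
        return splitId == other.splitId && Objects.equals(value, other.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(splitId, value);
    }
}
```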

Contributing the source to Flink


Recently, the Flink community externalized all the connectors to external repositories that are sub-repositories of the official Apache Flink repository. This is mainly to decouple the release of Flink from the release of the connectors. To distribute the created source, we need to follow this official wiki page.

Conclusion 


This concludes the series of articles about creating a batch source with the new Flink framework. This was needed as, apart from the javadocs, the documentation about testing is missing for now. I hope you enjoyed reading and I hope the Flink community will receive a source PR from you soon :) 

Thursday, March 30, 2023

Flink: Howto create a batch source with the new Source framework

🕥 10 min.

Introduction


The Flink community has designed a new Source framework based on FLIP-27 lately. Some connectors have migrated to this new framework. This article is a how-to for creating a batch source using this new framework. It was built while implementing the Flink batch source for Cassandra. If you are interested in contributing or migrating connectors, this blog post is for you!

Implementing the source components


The source architecture is depicted in the diagrams below:





Source

The source interface only does the "glue" between all the other components. Its role is to instantiate all of them and to define the source Boundedness. We also do the source configuration here along with user configuration validation.

SourceReader

As shown in the diagram above, the instances of the SourceReader (which we will simply call readers in the rest of this article) run in parallel in task managers to read the actual data, which is divided into Splits. Readers request splits from the SplitEnumerator and the resulting splits are assigned to them in return.

Flink provides the SourceReaderBase implementation that takes care of all the threading. Flink also provides a useful extension to this class for most cases: SingleThreadMultiplexSourceReaderBase. This class has the threading model already configured: each SplitReader instance reads splits using one thread (but there are several SplitReader instances that live among task managers).
What we have left to do in the SourceReader class is:
  • Provide a SplitReader supplier
  • Create a RecordEmitter
  • Create the shared resources for the SplitReaders (sessions, etc...). As the SplitReader supplier is created in the SourceReader constructor in a super() call, using a SourceReader factory to create the shared resources and pass them to the supplier is a good idea.
  • Implement start(): here we should ask the enumerator for our first split
  • Override close() in SourceReaderBase parent class to free up any created resources (the shared resources for example)
  • Implement initializedState() to create a mutable SplitState from a Split
  • Implement toSplitType() to create a Split from the mutable SplitState
  • Implement onSplitFinished(): here, as it is a batch source (finite data), we should ask the Enumerator for the next split


Split and SplitState

The SourceSplit represents a partition of the source data. What defines a split depends on the backend we are reading from. It could be a (partition start, partition end) tuple or an (offset, split size) tuple for example. 

In any case, the Split object should be seen as an immutable object: any update to it should be done on the associated SplitState. The split state is the one that will be stored inside the Flink checkpoints. A checkpoint may happen between 2 fetches for 1 split. So, if we're reading a split, we must store the current state of the reading process in the split state. This current state needs to be serializable (because it will be part of a checkpoint) and it must be something the backend can resume from. That way, in case of failover, the reading can be resumed from where it left off. Thus we ensure there will be no duplicates or lost data. For example, if the records reading order is deterministic in the backend, then the split state can store the number n of already read records to restart at n+1 after failover.
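The record-counting example can be sketched in plain Java as follows. ResumableRead and SplitState are hypothetical names, and the backend is simulated by a list whose order is deterministic; a real source would translate the counter into whatever its backend can resume from.

```java
import java.util.ArrayList;
import java.util.List;

final class ResumableRead {
    // Mutable progress for one split; only this counter would need to be checkpointed.
    static final class SplitState {
        final String splitId;
        long recordsRead;

        SplitState(String splitId, long recordsRead) {
            this.splitId = splitId;
            this.recordsRead = recordsRead;
        }
    }

    // Reads at most maxRecords from a backend with deterministic order, skipping what was already read.
    static List<String> fetch(List<String> backend, SplitState state, int maxRecords) {
        List<String> out = new ArrayList<>();
        int from = (int) state.recordsRead;
        int to = Math.min(backend.size(), from + maxRecords);
        for (int i = from; i < to; i++) {
            out.add(backend.get(i));
            state.recordsRead++;
        }
        return out;
    }
}
```

After a failover, rebuilding a SplitState from the checkpointed counter and calling fetch() again continues right after the last record that was read, which is what avoids duplicates and losses.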

SplitEnumerator and SplitEnumeratorState

The SplitEnumerator is responsible for creating the splits and serving them to the readers. Whenever possible, it is preferable to generate the splits lazily, meaning that each time a reader asks the enumerator for a split, the enumerator generates one on demand and assigns it to the reader. For that we implement SplitEnumerator#handleSplitRequest(). Lazy split generation is preferable to split discovery, in which we pre-generate all the splits and store them until they are assigned to the readers. Indeed, in some situations, the number of splits can be enormous and consume a lot of memory, which could be problematic in case of straggling readers. The framework offers the ability to act upon reader registration by implementing addReader() but, as we do lazy split generation, we have nothing to do there. In some cases, generating a split is too costly, so we can pre-generate a batch (not all) of splits to amortize this cost. The number and size of the batched splits need to be taken into account to avoid consuming too much memory.
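Stripped of any Flink dependency, lazy generation boils down to keeping just enough information to compute the next split on demand. The sketch below assumes splits are plain (start, end) ranges over a data set of known size; LazySplitGenerator and its fields are hypothetical. In an actual enumerator, nextSplit() would be called from handleSplitRequest(), and the empty result would translate into telling the reader there are no more splits.

```java
import java.util.Optional;

final class LazySplitGenerator {
    private final long totalSize;
    private final long splitSize;
    private long nextStart = 0;

    LazySplitGenerator(long totalSize, long splitSize) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        this.totalSize = totalSize;
        this.splitSize = splitSize;
    }

    // Generates one split as {start, end} (end exclusive) each time a reader asks for work.
    Optional<long[]> nextSplit() {
        if (nextStart >= totalSize) {
            return Optional.empty(); // nothing left to hand out
        }
        long end = Math.min(nextStart + splitSize, totalSize);
        long[] split = {nextStart, end};
        nextStart = end;
        return Optional.of(split);
    }
}
```

Only two numbers are held in memory here, whatever the number of splits, which is the point of the lazy approach.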

Long story short, the tricky part of the source implementation is splitting the source data. The right balance is to have neither too many splits (which could lead to too much memory consumption) nor too few (which could lead to sub-optimal parallelism). One good way to reach this balance is to evaluate the size of the source data upfront and allow the user to specify the maximum memory a split will take. That way they can configure this parameter according to the memory available on the task managers. This parameter is optional, so the source needs to provide a default value. Also, the source needs to check that the user-provided max-split-size is not too small, which would lead to too many splits. The general rule of thumb is to leave the user some freedom but protect them from unwanted behavior. For these safety measures, rigid thresholds don't work well, as the source may start to fail when the thresholds are suddenly exceeded. For example, if we enforce that the number of splits is below twice the parallelism and the job is regularly run on a growing table, at some point there will be more and more splits of max-split-size and the threshold will be exceeded. Of course, the size of the source data needs to be evaluated without reading the actual data. For the Cassandra connector I did it like this
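The sketch below is not the Cassandra connector's code. It is only a generic illustration, in plain Java, of deriving a number of splits from an estimated data size and an optional user-provided maximum split size. SplitSizing, the default of 64 MB, the floor of 1 MB and the choice of using the parallelism as a lower bound are all assumptions made for the example.

```java
final class SplitSizing {
    // Both constants are assumed values for the example, not Flink or connector defaults.
    static final long DEFAULT_MAX_SPLIT_BYTES = 64L * 1024 * 1024;
    static final long MIN_SPLIT_BYTES = 1024L * 1024;

    // userMaxSplitBytes may be null when the user did not configure it.
    static long numSplits(long estimatedBytes, Long userMaxSplitBytes, int parallelism) {
        long maxSplitBytes =
                userMaxSplitBytes != null ? userMaxSplitBytes : DEFAULT_MAX_SPLIT_BYTES;
        if (maxSplitBytes < MIN_SPLIT_BYTES) {
            // Validate the user setting up front instead of failing later on the split count.
            throw new IllegalArgumentException(
                    "max split size must be at least " + MIN_SPLIT_BYTES + " bytes");
        }
        // Ceiling division: enough splits so that none exceeds maxSplitBytes.
        long bySize = (estimatedBytes + maxSplitBytes - 1) / maxSplitBytes;
        // Assumption: never create fewer splits than readers, to keep all of them busy.
        return Math.max(bySize, parallelism);
    }
}
```

Note that the check applies to the configured split size, which does not change from run to run, and not to the resulting number of splits, which grows with the table.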

Another important topic is state. If the job manager fails, the split enumerator needs to recover. For that, as for the split, we need to provide a state for the enumerator that will be part of a checkpoint. Upon recovery, the enumerator is reconstructed and receives an enumerator state for recovering its previous state. Upon checkpointing, the enumerator returns its state when SplitEnumerator#snapshotState() is called. The state must contain everything needed to resume from where the enumerator left off after failover. In the lazy split generation scenario, the state will contain everything needed to generate the next split whenever asked to. It can be, for example, the start offset of the next split, the split size, the number of splits still to generate, etc. But the SplitEnumeratorState must also contain a list of splits: not the list of discovered splits, but a list of splits to reassign. Indeed, whenever a reader fails, if it was assigned splits after the last checkpoint, then the checkpoint will not contain those splits. Consequently, upon restoration, the reader won't have the splits assigned anymore. There is a callback to deal with that case: addSplitsBack(). There, the splits that were assigned to the failing reader can be put back into the enumerator state for later reassignment to readers. There is no memory size risk here as the number of splits to reassign is pretty low.
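A possible shape for such a state, again as a plain Java sketch with hypothetical names: EnumeratorStateSketch holds the offset of the next split to generate plus the splits to reassign. Splits are represented as "start-end" strings only to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class EnumeratorStateSketch {
    private final long splitSize;
    private final long totalSize;
    private long nextOffset = 0; // where the next lazily generated split starts
    private final Deque<String> splitsToReassign = new ArrayDeque<>();

    EnumeratorStateSketch(long splitSize, long totalSize) {
        this.splitSize = splitSize;
        this.totalSize = totalSize;
    }

    // Plays the role of addSplitsBack(): keep the splits of a failed reader for later reassignment.
    void addSplitsBack(List<String> splits) {
        splitsToReassign.addAll(splits);
    }

    // Returned splits are served first; new ones are generated only when none are pending.
    String nextSplit() {
        if (!splitsToReassign.isEmpty()) {
            return splitsToReassign.poll();
        }
        if (nextOffset >= totalSize) {
            return null; // no more splits
        }
        long end = Math.min(nextOffset + splitSize, totalSize);
        String split = nextOffset + "-" + end;
        nextOffset = end;
        return split;
    }

    // Together with nextOffset, this is what a snapshot of the state would capture.
    List<String> pendingSplits() {
        return new ArrayList<>(splitsToReassign);
    }
}
```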

The above topics are the most important ones regarding splitting. There are 2 methods left to implement: the usual start()/close() methods for resource creation and disposal. Regarding start(), the Flink connector framework provides the enumeratorContext#callAsync() utility to run long processing asynchronously, such as split preparation or split discovery (if lazy split generation is impossible). Indeed, the start() method runs in the source coordinator thread, and we don't want to block it for a long time.

SplitReader

This class is responsible for reading the actual splits that it receives when the framework calls handleSplitsChanges(). The main part of the split reader is the fetch() implementation, where we read all the splits received and return the read records as a RecordsBySplits object. This object contains a map of the split ids to their records and also the ids of the finished splits. Some important points need to be considered:

  • The fetch call must be non-blocking. If any call in its code is synchronous and potentially long, an escape from the fetch() must be provided. When the framework calls wakeUp() we should interrupt the fetch for example by setting an AtomicBoolean.
  • Fetch call needs to be re-entrant: an already read split must not be re-read. We should remove it from the list of splits to read and add its id to the finished splits (along with empty splits) in the RecordsBySplits that we return.
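The wakeUp() mechanism of the first point can be sketched without Flink as follows. InterruptibleFetcher is a hypothetical class, the backend is simulated by an iterator, and the records are plain strings; a real SplitReader would return a RecordsBySplits object instead of a list.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class InterruptibleFetcher {
    private final AtomicBoolean wakeup = new AtomicBoolean(false);
    private final Iterator<String> backend;

    InterruptibleFetcher(Iterator<String> backend) {
        this.backend = backend;
    }

    // May be called from another thread to ask the current fetch to return early.
    void wakeUp() {
        wakeup.set(true);
    }

    // Reads records until the backend is exhausted or a wake-up was requested.
    List<String> fetch() {
        List<String> records = new ArrayList<>();
        while (backend.hasNext()) {
            // compareAndSet also resets the flag so that the next fetch() starts fresh.
            if (wakeup.compareAndSet(true, false)) {
                break;
            }
            records.add(backend.next());
        }
        return records;
    }
}
```

Because the iterator keeps its position, a later call to fetch() continues with the records that were not read yet instead of re-reading the previous ones.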

It is totally fine for the implementer to exit the fetch() method early. Also a failure could interrupt the fetch. In both cases the framework will call fetch() again later on. In that case, the fetch method must resume the reading from where it was left off using the split state already discussed. If resuming the read of a split is impossible because of backend constraints, then the only solution is to read splits atomically (either not read the split at all, or read it entirely). That way, in case of interrupted fetch, nothing will be output and the split could be read again from the beginning at next fetch call leading to no duplicates. But if the split is read entirely, there are points to consider:
  • We should ensure that the total split content (records from the source) fits in memory, for example by specifying a max split size in bytes (see SplitEnumerator)
  • The split state becomes useless, only a Split class is needed

RecordEmitter

The SplitReader reads records in the form of an intermediary record format that the implementer provides for each record. It can be the raw format returned by the backend or any format that allows extracting the actual record afterwards. This format is not the final output format expected by the source. It contains anything needed to do the conversion to the record output format. We need to implement RecordEmitter#emitRecord() to do this conversion. A good pattern here is to initialize the RecordEmitter with a mapping Function. The implementation must be idempotent. Indeed, the method may be interrupted in the middle. In that case, the same set of records will be passed to the record emitter again later.
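The mapping Function pattern can be illustrated with a few lines of plain Java. MappingEmitter is a hypothetical class that does not implement Flink's RecordEmitter interface; the Consumer parameter merely stands in for the output the framework would provide.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// RAW is the intermediary format read by the split reader, OUT the final record type.
final class MappingEmitter<RAW, OUT> {
    private final Function<RAW, OUT> mapper;

    MappingEmitter(Function<RAW, OUT> mapper) {
        this.mapper = mapper;
    }

    // Converts one intermediary record and hands the result to the output.
    // The emitter keeps no state of its own, so replaying a record gives the same result.
    void emitRecord(RAW raw, Consumer<OUT> output) {
        output.accept(mapper.apply(raw));
    }
}
```

Keeping the emitter stateless and the mapping function free of side effects is one simple way to get the idempotence mentioned above.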

Serializers

We need to provide singleton serializers for:
  • Split: splits are serialized when sending them from enumerator to reader, and when checkpointing the reader's current state
  • SplitEnumeratorState:  the serializer is used for the result of the SplitEnumerator#snapshotState() 

For both, we need to implement SimpleVersionedSerializer. Care needs to be taken at some important points:

  • Using Java serialization is forbidden in Flink, mainly for migration concerns. We should rather manually write the fields of the objects using ObjectOutputStream. When a class is not supported by the ObjectOutputStream (not String, Integer, Long...), we should write the size of the object in bytes as an Integer and then write the object converted to byte[]. A similar method is used to serialize collections: first write the number of elements of the collection, then serialize all the contained objects. Of course, for deserialization we read the same elements in the same order.
  • There can be a lot of splits, so we should cache the OutputStream used in SplitSerializer. We can do so by using:
ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
The initial stream size depends on the size of a split. 
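The size-prefixed technique of the first point can be sketched as follows. This example uses plain java.io data streams rather than Flink's DataOutputSerializer, and BigInteger as an example of a type without a dedicated write method; CollectionSerde is a hypothetical helper class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

final class CollectionSerde {
    // Writes the element count, then each element as (length, bytes).
    static byte[] write(List<BigInteger> values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(values.size());
            for (BigInteger value : values) {
                byte[] raw = value.toByteArray();
                out.writeInt(raw.length);
                out.write(raw);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads back in exactly the same order: count, then (length, bytes) per element.
    static List<BigInteger> read(byte[] serialized) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            int count = in.readInt();
            List<BigInteger> values = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                byte[] raw = new byte[in.readInt()];
                in.readFully(raw);
                values.add(new BigInteger(raw));
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```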

Testing the source


For the sake of brevity, testing the source is covered in the next article. Stay tuned!

Conclusion


This article, which gathers implementation field feedback, was needed as the javadocs cannot cover all the implementation details for high-performance and maintainable sources. I hope you enjoyed reading it and that it made you want to contribute a new connector to the Flink project!

Monday, November 7, 2022

Flink: Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

🕥 5 min.

Introduction


Flink has been deprecating the DataSet API since version 1.12 as part of the work on FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API). This blog article illustrates the migration of a real-life batch DataSet pipeline to a batch DataStream pipeline.
All the code presented in this article is available in the tpcds-benchmark-flink repo.
The use case shown here is extracted from a broader work comparing the performance of different Flink APIs by implementing TPCDS queries using these APIs.

What is TPCDS?


TPC-DS is a decision support benchmark that models several generally applicable aspects of a decision support system. The purpose of TPCDS benchmarks is to provide relevant, objective performance data of Big Data engines to industry users.

Chosen TPCDS query


The chosen query for this article is Query3 because it contains the most common analytics operators (filter, join, aggregation, group by, order by, limit). It represents an analytic query on store sales. Its SQL code is presented here:
SELECT dt.d_year, item.i_brand_id brand_id, item.i_brand brand, SUM(ss_ext_sales_price) sum_agg
FROM date_dim dt, store_sales, item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
AND store_sales.ss_item_sk = item.i_item_sk
AND item.i_manufact_id = 128
AND dt.d_moy=11
GROUP BY dt.d_year, item.i_brand, item.i_brand_id
ORDER BY dt.d_year, sum_agg desc, brand_id
LIMIT 100

The initial DataSet pipeline


The pipeline we are migrating is this batch pipeline that implements the above query using the DataSet API and Row as dataset element type.

Migrating the DataSet pipeline to a DataStream pipeline


Instead of going through all of the code, which is available here, we will focus on some key areas of the migration. The code is based on the latest release of Flink at the time this article was written: version 1.16.0.

DataStream is a unified API that allows running pipelines in both batch and streaming modes. To execute a DataStream pipeline in batch mode, it is not enough to set the execution mode in the Flink execution environment; some operations also need to be migrated. Indeed, the DataStream API semantics are the ones of a streaming pipeline. The arriving data is thus considered infinite. So, compared to the DataSet API, which operates on finite data, some operations need to be adapted.

Setting the execution environment


We start by moving from ExecutionEnvironment to StreamExecutionEnvironment. Then, as the source in this pipeline is bounded, we can use either the default streaming execution mode or the batch mode. In batch mode the tasks of the job can be separated into stages that can be executed one after another. In streaming mode all tasks need to be running all the time and records are sent to downstream tasks as soon as they are available. 

Here we keep the default streaming mode, which gives good performance on this pipeline and would allow running the same pipeline with no change on an unbounded source.

Using the streaming sources and datasets


Sources: DataSource<T> becomes DataStreamSource<T> after the call to env.createInput().

Datasets: DataSet<T> are now DataStream<T> and subclasses.

Migrating the join operation


The DataStream join operator does not yet support aggregations in batch mode (see FLINK-22587 for details). Basically, the problem is with the trigger of the default GlobalWindow, which never fires, so the records are never output. We will work around this problem by applying a custom EndOfStream window. It is a window assigner that assigns all the records to a single TimeWindow. So, as with the GlobalWindow, all the records are assigned to the same window, except that this window's trigger is based on the end of the window (which is set to Long.MAX_VALUE). As we are on a bounded source, at some point the watermark will advance to +INFINITY (Long.MAX_VALUE) and will thus cross the end of the time window, consequently firing the trigger and outputting the records.

Now that we have a working trigger, we need to call a standard join with the Row::join function.

Migrating the group by and reduce (sum) operations


The DataStream API no longer has a groupBy() method; we now use the keyBy() method. A downstream aggregation will be applied on elements with the same key, exactly as a GroupReduceFunction would have done on a DataSet, except that it will not need to materialize the collection of data. Indeed, the following operator is a reducer: the summing operation downstream is still done through a ReduceFunction, but this time the operator reduces the elements incrementally instead of receiving the rows as a Collection. To compute the sum, we store the partially aggregated sum in the reduced row. Because the reduce is incremental, we also need to distinguish whether we received an already reduced row (in that case, we read the partially aggregated sum) or a fresh row (in that case, we just read the corresponding price field).
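The distinction between fresh and already reduced rows can be sketched in plain Java as follows. This is not the code of the benchmark repository: SalesRow and IncrementalSum are hypothetical classes that replace Flink's Row and ReduceFunction, and using a nullable partialSum field to mark reduced rows is an assumption made for the example.

```java
// Hypothetical row: either a fresh row carrying a price, or a reduced row carrying a partial sum.
final class SalesRow {
    final String brand;
    final double price;      // meaningful for a fresh row only
    final Double partialSum; // non-null only for an already reduced row

    private SalesRow(String brand, double price, Double partialSum) {
        this.brand = brand;
        this.price = price;
        this.partialSum = partialSum;
    }

    static SalesRow fresh(String brand, double price) {
        return new SalesRow(brand, price, null);
    }

    static SalesRow reduced(String brand, double partialSum) {
        return new SalesRow(brand, 0.0, partialSum);
    }

    // Reads the partial sum of a reduced row, or the price of a fresh row.
    double amount() {
        return partialSum != null ? partialSum : price;
    }
}

final class IncrementalSum {
    // Same contract as a reduce function: combines two rows of the same key into one.
    static SalesRow reduce(SalesRow a, SalesRow b) {
        return SalesRow.reduced(a.brand, a.amount() + b.amount());
    }
}
```

Since the output of reduce() is fed back as an input of the next call, both arguments have to be handled through amount(): either of them may already be a partial result.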

Migrating the order by operation


The sort of the datastream is done by applying a KeyedProcessFunction.

But, as said above, the DataStream semantics are the ones of a streaming pipeline. The arriving data is thus considered infinite. As such we need to "divide" the data to have output times. For that we need to set a timer to output the resulting data. We set a timer to fire at the end of the EndOfStream window meaning that the timer will fire at the end of the batch.

To sort the data, we store the incoming rows inside a ListState and sort them at output time, when the timer fires in the onTimer() callback. 

Another thing: to be able to use Flink state, we need to key the datastream beforehand even if there is no group by key because Flink state is designed per-key. Thus, we key by a fake static key so that there is a single state.

Migrating the limit operation


As all the elements of the DataStream were keyed by the same "0" key, they are kept in the same "group". So we can implement the SQL LIMIT with a ProcessFunction with a counter that will output only the first 100 elements.

Migrating the sink operation


As with sources, there were big changes in sinks in recent versions of Flink. We now use the Sink interface, which requires an Encoder. But the resulting code is very similar to the code using the DataSet API. The only difference is that the Encoder#encode() method writes bytes, whereas TextOutputFormat.TextFormatter#format() wrote Strings.

Conclusion


As you saw for the migration of the join operation, the new unified DataStream API has some limitations left in batch mode. In addition, the resulting order by and limit code is quite manual and requires the help of the Flink state API. For all these reasons, the Flink community recommends using Flink SQL for batch pipelines. It results in much simpler code, good performance and out-of-the-box analytics capabilities. You can find the equivalent Query3 code that uses the Flink SQL/Table API in the Query3ViaFlinkSQLCSV class.