Monday, November 7, 2022

Flink: How to migrate a real-life batch pipeline from the DataSet API to the DataStream API

🕥 5 min.

Introduction


Flink has been deprecating the DataSet API since version 1.12 as part of the work on FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API). This blog article illustrates the migration of a real-life batch DataSet pipeline to a batch DataStream pipeline.
All the code presented in this article is available in the tpcds-benchmark-flink repo.
The use case shown here is extracted from a broader work comparing the performance of Flink's different APIs by implementing TPC-DS queries with each of them.

What is TPC-DS?


TPC-DS is a decision support benchmark that models several generally applicable aspects of a decision support system. The purpose of the TPC-DS benchmark is to provide relevant, objective performance data of Big Data engines to industry users.

Chosen TPC-DS query


The query chosen for this article is Query3 because it contains the most common analytics operators (filter, join, aggregation, group by, order by, limit). It represents an analytic query on store sales. Its SQL code is presented here:
SELECT dt.d_year, item.i_brand_id brand_id, item.i_brand brand, SUM(ss_ext_sales_price) sum_agg
FROM date_dim dt, store_sales, item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
  AND store_sales.ss_item_sk = item.i_item_sk
  AND item.i_manufact_id = 128
  AND dt.d_moy = 11
GROUP BY dt.d_year, item.i_brand, item.i_brand_id
ORDER BY dt.d_year, sum_agg DESC, brand_id
LIMIT 100

The initial DataSet pipeline


The pipeline we are migrating is this batch pipeline, which implements the above query using the DataSet API with Row as the dataset element type.

Migrating the DataSet pipeline to a DataStream pipeline


Rather than going through all of the code, which is available here, we will focus on some key areas of the migration. The code is based on the latest release of Flink at the time this article was written: version 1.16.0.

DataStream is a unified API that allows running pipelines in both batch and streaming modes. To execute a DataStream pipeline in batch mode, it is not enough to set the execution mode in the Flink execution environment; some operations also need to be migrated. Indeed, the DataStream API has the semantics of a streaming pipeline: the arriving data is considered infinite. So, compared to the DataSet API, which operates on finite data, some operations need to be adapted.

Setting the execution environment


We start by moving from ExecutionEnvironment to StreamExecutionEnvironment. Then, as the source in this pipeline is bounded, we can use either the default streaming execution mode or the batch mode. In batch mode, the tasks of the job can be separated into stages that are executed one after the other. In streaming mode, all tasks run all the time and records are sent to downstream tasks as soon as they are available.

Here we keep the default streaming mode, which gives good performance on this pipeline and would allow running the same pipeline unchanged on an unbounded source.
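As a minimal sketch (assuming the rest of the environment setup stays unchanged), the move boils down to:

// DataSet API (before): ExecutionEnvironment.getExecutionEnvironment()
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// We keep the default STREAMING mode; opting into batch mode would be a single call:
// env.setRuntimeMode(RuntimeExecutionMode.BATCH);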

Using the streaming sources and datasets


Sources: DataSource<T> becomes DataStreamSource<T> after the call to env.createInput().

Datasets: DataSet<T> becomes DataStream<T> (and its subclasses).
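A small sketch of the source change, assuming the Row-producing InputFormat of the DataSet pipeline is reused (the buildStoreSalesFormat() helper is hypothetical; depending on the format, the createInput() overload taking an explicit TypeInformation may be needed):

InputFormat<Row, ?> storeSalesFormat = buildStoreSalesFormat(); // hypothetical helper

// DataSet API (before):
// DataSource<Row> storeSales = executionEnvironment.createInput(storeSalesFormat);

// DataStream API (after):
DataStreamSource<Row> storeSales = env.createInput(storeSalesFormat);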

Migrating the join operation


The DataStream join operator does not yet support aggregations in batch mode (see FLINK-22587 for details). Basically, the problem is that the trigger of the default GlobalWindow never fires, so the records are never output. We work around this problem by applying a custom EndOfStream window. It is a window assigner that assigns all the records to a single TimeWindow. So, as with the GlobalWindow, all the records are assigned to the same window, except that this window's trigger is based on the end of the window (which is set to Long.MAX_VALUE). As we are on a bounded source, at some point the watermark will advance to +INFINITY (Long.MAX_VALUE), cross the end of the time window, and consequently fire the trigger and output the records.

Now that we have working triggering, we simply call a standard join with the Row::join function.
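As an illustration, a minimal EndOfStream assigner for Flink 1.16 could look like the sketch below; the class name and its details are illustrative, the actual implementation lives in the repo.

import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/** Assigns every record to a single TimeWindow ending at Long.MAX_VALUE. */
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final TimeWindow SINGLE_WINDOW = new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(SINGLE_WINDOW);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        // Fires when the watermark passes the end of the window, i.e. at the end of the bounded input.
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}

The join itself then reads like storeSales.join(items).where(...).equalTo(...).window(new EndOfStreamWindows()).apply((JoinFunction<Row, Row, Row>) Row::join), with the key selectors picking the ss_item_sk and i_item_sk fields.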

Migrating the group by and reduce (sum) operations


The DataStream API has no groupBy() method; we now use the keyBy() method. A downstream aggregation is applied to elements sharing the same key, exactly as a GroupReduceFunction would have done on a DataSet, except that it does not need to materialize the collection of data. Indeed, the downstream operator is a reducer: the summing operation is still done through a ReduceFunction, but this time the operator reduces the elements incrementally instead of receiving the rows as a Collection. To compute the sum, we store the partially aggregated sum in the reduced row. Because the reduce is incremental, we also need to distinguish whether we received an already reduced row (in that case, we read the partially aggregated sum) or a fresh row (in that case, we just read the corresponding price field).
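A sketch of this step, assuming the joined rows were already projected to (d_year, i_brand, i_brand_id, ss_ext_sales_price) and that the running sum is kept in the same field as the raw price (the real pipeline distinguishes raw rows from partially reduced ones); field positions and the string key are illustrative choices:

DataStream<Row> summed = joined
    // groupBy() becomes keyBy(): one key per (d_year, i_brand, i_brand_id) combination.
    .keyBy(row -> row.getField(0) + "|" + row.getField(1) + "|" + row.getField(2))
    // Incremental reduce: each incoming row is merged into the partially aggregated one.
    // In streaming mode this emits an updated sum for every incoming row; the last one per key is the final result.
    .reduce((aggregated, current) -> Row.of(
        aggregated.getField(0),
        aggregated.getField(1),
        aggregated.getField(2),
        ((Number) aggregated.getField(3)).doubleValue() + ((Number) current.getField(3)).doubleValue()));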

Migrating the order by operation


The sorting of the DataStream is done by applying a KeyedProcessFunction.

But, as said above, the DataStream API has streaming semantics: the arriving data is considered infinite. As such, we need to "divide" the data to define when the output is produced. For that, we set a timer that outputs the resulting data. We register it to fire at the end of the EndOfStream window, meaning that it will fire at the end of the batch.

To sort the data, we store the incoming rows inside a ListState and sort them at output time, when the timer fires in the onTimer() callback. 

One more thing: to be able to use Flink state, we need to key the DataStream beforehand, even though there is no group-by key, because Flink state is scoped per key. Thus, we key by a fake static key so that there is a single state.
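Putting these points together, a sketch of the sorting step could look as follows (field positions and types in the comparator are assumptions of this sketch):

DataStream<Row> sorted = summed
    // Key by a constant so that a single ListState holds all the rows.
    .keyBy(row -> 0)
    .process(new KeyedProcessFunction<Integer, Row, Row>() {

        private transient ListState<Row> rows;

        @Override
        public void open(Configuration parameters) {
            rows = getRuntimeContext().getListState(new ListStateDescriptor<>("rows", Row.class));
        }

        @Override
        public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {
            rows.add(row);
            // Fire at the end of the EndOfStream window, i.e. once the bounded input is exhausted.
            ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE - 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
            List<Row> buffered = new ArrayList<>();
            rows.get().forEach(buffered::add);
            // ORDER BY d_year, sum_agg DESC, brand_id (field positions and types assumed).
            buffered.sort((a, b) -> {
                int byYear = ((Long) a.getField(0)).compareTo((Long) b.getField(0));
                if (byYear != 0) return byYear;
                int bySum = ((Double) b.getField(3)).compareTo((Double) a.getField(3)); // descending
                if (bySum != 0) return bySum;
                return ((Long) a.getField(2)).compareTo((Long) b.getField(2));
            });
            buffered.forEach(out::collect);
        }
    });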

Migrating the limit operation


As all the elements of the DataStream were keyed by the same "0" key, they are kept in the same "group". So we can implement the SQL LIMIT with a ProcessFunction that uses a counter to output only the first 100 elements.
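A sketch of that counting ProcessFunction (the 100-element limit comes straight from the query):

DataStream<Row> limited = sorted.process(new ProcessFunction<Row, Row>() {

    // A plain field is enough here: all rows come from the single-key sorting step upstream.
    private int emitted;

    @Override
    public void processElement(Row row, Context ctx, Collector<Row> out) {
        if (emitted < 100) {
            emitted++;
            out.collect(row);
        }
    }
});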

Migrating the sink operation


As with sources, sinks changed significantly in recent versions of Flink. We now use the Sink interface, which requires an Encoder. But the resulting code is very similar to the DataSet version; the only difference is that the Encoder#encode() method writes bytes where TextOutputFormat.TextFormatter#format() wrote Strings.
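For example, a minimal sketch with the unified FileSink and an Encoder that writes each Row as a line of text (the output path and row formatting are illustrative):

FileSink<Row> sink = FileSink
    .<Row>forRowFormat(
        new Path("query3-output"),
        (row, stream) -> {
            // Encoder#encode() writes bytes where TextFormatter#format() returned a String.
            stream.write(row.toString().getBytes(StandardCharsets.UTF_8));
            stream.write('\n');
        })
    .build();

limited.sinkTo(sink);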

Conclusion


As you saw with the migration of the join operation, the new unified DataStream API still has some limitations in batch mode. In addition, the resulting order by and limit code is quite manual and requires the Flink state API. For all these reasons, the Flink community recommends using Flink SQL for batch pipelines: it results in much simpler code, good performance, and out-of-the-box analytics capabilities. You can find the equivalent Query3 code that uses the Flink SQL/Table API in the Query3ViaFlinkSQLCSV class.