Big data chronicles: technical blog on big data technologies, by <a href="http://www.blogger.com/profile/13442320750607525683" target="_blank">Etienne Chauchot</a>.

<h1>Flink: Howto test a batch source with the new Source framework</h1>
<div><span style="font-size: xx-small;">🔥 6 min. (published 2023-04-03, updated 2023-05-22)</span></div>

<h2>Introduction</h2>

<div>The <a href="https://flink.apache.org" target="_blank">Flink</a> community has recently designed <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/" target="_blank">a new Source framework</a> based on <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface" target="_blank">FLIP-27</a>. This article is the continuation of the <a href="https://echauchot.blogspot.com/2023/03/flink-howto-create-batch-source-with.html" target="_blank">howto create a batch source with the new Source framework</a> article. Now it is time to test the created source! Like the previous article, this one was built while implementing the <a href="https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca" target="_blank">Flink batch source</a> for <a href="https://cassandra.apache.org/_/index.html" target="_blank">Cassandra</a>.</div>

<div>The goal here is to give field feedback on how to implement the tests of a batch source. For details you should read the documentation, the javadocs or the Cassandra connector code. The links are above.</div>

<h2>Unit testing the source</h2>

<h3>Testing the serializers</h3>

<div><span style="font-size: x-small;"><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java" target="_blank">Example Cassandra SplitSerializer</a> and <a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java" target="_blank">SplitEnumeratorStateSerializer</a></span></div>

<div>In the previous article, we created <a href="https://echauchot.blogspot.com/2023/03/flink-howto-create-batch-source-with.html#Serializers" target="_blank">serializers</a> for Split and SplitEnumeratorState. We should now cover them with unit tests. To test serde, we create an object, serialize it using the serializer, then deserialize it using the same serializer, and finally assert on the equality of the two objects. Thus, hashCode() and equals() need to be implemented for the serialized objects.</div>
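<div>As an illustration, a minimal round-trip test for the split serializer could look like the sketch below. It reuses the Cassandra connector classes linked above; the token range values are made up for the example, and we assume JUnit 5 and AssertJ are on the test classpath:</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.math.BigInteger;
import org.junit.jupiter.api.Test;

class CassandraSplitSerializerTest {

    @Test
    void testSplitSerdeRoundTrip() throws IOException {
        // a split covering the (made up) token range [0, 100)
        CassandraSplit split = new CassandraSplit(BigInteger.ZERO, BigInteger.valueOf(100));
        byte[] serialized = CassandraSplitSerializer.INSTANCE.serialize(split);
        CassandraSplit deserialized =
                CassandraSplitSerializer.INSTANCE.deserialize(
                        CassandraSplitSerializer.CURRENT_VERSION, serialized);
        // relies on CassandraSplit#equals() and #hashCode()
        assertThat(deserialized).isEqualTo(split);
    }
}
</pre></div>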
<h3>Other unit tests</h3>

<div>Of course, we also need to unit test low-level processing, such as query building, or any processing that does not require a running backend.</div>

<h2>Integration testing the source</h2>

<div>For tests that require a running backend, Flink provides a JUnit5 source test framework. It is composed of different parts gathered in a test suite:</div>
<ul>
<li><a href="#flink-environment">The Flink environment</a></li>
<li><a href="#backend-environment">The backend environment</a></li>
<li><a href="#checkpointing-semantics">The checkpointing semantics</a></li>
<li><a href="#test-context">The test context</a></li>
</ul>

<div><span style="font-size: x-small;"><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java" target="_blank">Example Cassandra SourceITCase</a></span></div>

<div>For the test to be integrated to the Flink CI, the test class must be called *ITCase. It can be named differently if the test lives elsewhere. The class extends <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.html" target="_blank">SourceTestSuiteBase</a>. This test suite already provides all the necessary tests (single split, multiple splits, idle reader, etc.). It targets both batch and streaming sources, so for our batch source the tests below, which target streaming sources, need to be disabled. They can be disabled by overriding them in the ITCase and annotating them with <i>@Disabled</i>:</div>
<ul>
<li>testSourceMetrics</li>
<li>testSavepoint</li>
<li>testScaleUp</li>
<li>testScaleDown</li>
<li>testTaskManagerFailure</li>
</ul>

<div>Of course, we can also add our own integration test cases, for example tests on limits, tests on low-level splitting, or any other test that requires a running backend.</div>
<div>But for most cases we only need to provide Flink test environment classes to configure the ITCase:</div>

<h3 id="flink-environment">Flink environment</h3>

<div>We add this annotated field to our ITCase and we're done:</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
@TestEnv
MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment();
</pre></div>

<h3 id="backend-environment">Backend environment</h3>

<div><span style="font-size: x-small;"><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java" target="_blank">Example Cassandra TestEnvironment</a></span></div>

<div>To test the connector, we need a backend to run the connector against. This TestEnvironment provides everything related to the backend: the container, its configuration, the session to connect to it, and all the elements bound to the whole test case (keyspace, initialization requests, etc.).</div>

<div>We add this annotated field to our ITCase:</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
@TestExternalSystem
MyBackendTestEnvironment backendTestEnvironment = new MyBackendTestEnvironment();
</pre></div>

<div>To integrate with JUnit5, BackendTestEnvironment implements <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html" target="_blank">TestResource</a>. This environment is scoped to the test suite, so it is where we set up the backend and the shared resources (session, keyspace, etc.) by implementing the <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html#startUp--" target="_blank">startUp()</a> and <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html#tearDown--" target="_blank">tearDown()</a> methods. For that, we advise the use of <a href="https://www.testcontainers.org/" target="_blank">testContainers</a>, which relies on Docker images to provide a real backend instance (not a mock) that is representative for integration tests. Several backends are supported out of the box by testContainers. We need to configure testContainers as follows (see the sketch after this list):</div>
<ul>
<li>Redirect the container output (error and standard output) to the Flink logs</li>
<li>Set the different timeouts to cope with CI server load</li>
<li>Set up retry mechanisms for connections, initialization requests, etc. for the same reason</li>
</ul>
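<div>For example, with the Cassandra module of testContainers, such a configuration could look like the following sketch. The image tag, timeout value and number of attempts are illustrative assumptions, not values mandated by the framework:</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

public class BackendContainerFactory {

    private static final Logger LOG = LoggerFactory.getLogger(BackendContainerFactory.class);

    public static CassandraContainer<?> create() {
        return new CassandraContainer<>(DockerImageName.parse("cassandra:4.0"))
                // redirect container output (error and standard output) to the Flink logs
                .withLogConsumer(new Slf4jLogConsumer(LOG))
                // generous startup timeout and retries to cope with CI server load
                .withStartupTimeout(Duration.ofMinutes(5))
                .withStartupAttempts(3);
    }
}
</pre></div>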
<h3 id="checkpointing-semantics">Checkpointing semantics</h3>

<div>In big data execution engines, there are two levels of guarantee regarding sources and sinks:</div>
<ul>
<li>At least once: upon failure and recovery, some records may be reflected multiple times but none will be lost</li>
<li>Exactly once: upon failure and recovery, every record will be reflected exactly once</li>
</ul>

<div>With the following code, we declare that the source supports exactly-once semantics:</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
@TestSemantics
CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
</pre></div>

<div>That being said, we could encounter a problem while running the tests: the default assertions in the Flink source test framework assume that the data is read in the same order it was written. This is untrue for most big data backends, where ordering is usually not deterministic.</div>
<div>To support unordered checks and still use all the tests the framework provides, we need to override <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.html#checkResultWithSemantic-org.apache.flink.util.CloseableIterator-java.util.List-org.apache.flink.streaming.api.CheckpointingMode-java.lang.Integer-" target="_blank">SourceTestSuiteBase#checkResultWithSemantic</a> in our ITCase:</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
@Override
protected void checkResultWithSemantic(
        CloseableIterator<Pojo> resultIterator,
        List<List<Pojo>> testData,
        CheckpointingMode semantic,
        Integer limit) {
    if (limit != null) {
        Runnable runnable =
                () ->
                        CollectIteratorAssertions.assertUnordered(resultIterator)
                                .withNumRecordsLimit(limit)
                                .matchesRecordsFromSource(testData, semantic);

        assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
    } else {
        CollectIteratorAssertions.assertUnordered(resultIterator)
                .matchesRecordsFromSource(testData, semantic);
    }
}
</pre></div>

<div>This is a copy-paste of the parent method where <i>CollectIteratorAssertions.assertOrdered()</i> is replaced by <i>CollectIteratorAssertions.assertUnordered()</i>.</div>

<h3 id="test-context">Test context</h3>

<div><span style="font-size: x-small;"><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java" target="_blank">Example Cassandra TestContext</a></span></div>
href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java" target="_blank">Example Cassandra TestContext</a></span></div><div style="font-weight: 400;"><br /></div><div style="font-weight: 400;"><span style="font-size: medium;">The test context provides Flink with means to interact with the backend, like inserting test </span><span style="font-size: medium;">data, creating tables or constructing the source. It is scoped to the test case (and not to the test </span><span style="font-size: medium;">suite). </span><span style="font-size: medium;">It is linked to the ITCase through a factory of TestContext as shown below. </span></div></h2><h2><div style="font-size: medium; font-weight: 400;"><br /></div><div style="font-weight: 400;"><div><pre style="background-color: #2b2b2b; color: #a9b7c6; font-family: "Source Code Pro", monospace;"><span style="font-size: small;"><span style="color: #bbb529;">@TestContext<br /></span><span style="font-weight: bold;">TestContextFactory </span><span style="color: #9876aa;">contextFactory </span>= <span style="color: #cc7832;">new </span>TestContextFactory(<span style="color: #9876aa;">testEnvironment</span>)<span style="color: #cc7832;">;</span></span></pre></div></div><div style="font-size: medium; font-weight: 400;"><br /></div><div style="font-size: medium; font-weight: 400;">TestContext implements <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/external/source/DataStreamSourceExternalContext.html" target="_blank">DataStreamSourceExternalContext</a>:</div><div style="font-size: medium; font-weight: 400;"><ul><li>We don't connect to the backend at each test case, so the shared resources such as session are created by the backend test environment (test suite scoped). They are then passed to the test context by constructor. It is also in the constructor that we initialize test case backend resources such as test case table.</li><li>close() : drop the created test case resources</li><li><a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.html#getProducedType--" target="_blank">getProducedType()</a>: specify the test output type of the source such as a test Pojo for example</li><li><a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/external/ExternalContext.html#getConnectorJarPaths--" target="_blank">getConnectorJarPaths()</a>: provide a list of attached jars. Most of the time, we can return an empty list as maven already adds the jars to the test classpath</li><li><a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/external/source/DataStreamSourceExternalContext.html#createSource-org.apache.flink.connector.testframe.external.source.TestingSourceSettings-" target="_blank">createSource()</a>: here we create the source as a user would have done. 
<h2>Contributing the source to Flink</h2>

<div>The Flink community has recently externalized all the connectors to repositories that are sub-repositories of the official Apache Flink repository. This is mainly to decouple the release cycle of Flink from the release cycle of the connectors. To distribute the created source, we need to follow <a href="https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development" target="_blank">this official wiki page</a>.</div>

<h2>Conclusion</h2>

<div>This concludes the series of articles about creating a batch source with the new Flink framework. It was needed because, apart from the javadocs, the documentation about testing is missing for now. I hope you enjoyed reading, and I hope the Flink community will receive a source PR from you soon :)</div>
<div><i>Etienne Chauchot</i></div>

<h1>Flink: Howto create a batch source with the new Source framework</h1>
<div><span style="font-size: xx-small;">🔥 10 min. (published 2023-03-30, updated 2023-05-10)</span></div>

<h2>Introduction</h2>

<div>The <a href="https://flink.apache.org" target="_blank">Flink</a> community has recently designed <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/" target="_blank">a new Source framework</a> based on <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface" target="_blank">FLIP-27</a>. Some connectors have already migrated to this new framework. This article is a how-to for creating a batch source using this new framework. It was built while implementing the <a href="https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca" target="_blank">Flink batch source</a> for <a href="https://cassandra.apache.org/_/index.html" target="_blank">Cassandra</a>. If you are interested in contributing or migrating connectors, this blog post is for you!</div>

<h2>Implementing the source components</h2>

<div>The source architecture is depicted in the diagrams below:</div>

<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/a/AVvXsEju9wZwipLs2X1S835liFcbJBpnwtrcdlHvxdiBuj3mZVY0Kzzw0eWdTiBfPO0C-zeCc4XTZAuUyBYXHHqo0AejbhKunLnmk4k3HKMU4VdTOHeLl2sEOZJEbRMuhnktacqBKLQuqn60EUH_HqKsgR_mt6c1NNUlFymWHuyqk887ZJRzSV69oqfYcWwa"><img alt="Source architecture diagram" data-original-height="487" data-original-width="855" height="365" src="https://blogger.googleusercontent.com/img/a/AVvXsEju9wZwipLs2X1S835liFcbJBpnwtrcdlHvxdiBuj3mZVY0Kzzw0eWdTiBfPO0C-zeCc4XTZAuUyBYXHHqo0AejbhKunLnmk4k3HKMU4VdTOHeLl2sEOZJEbRMuhnktacqBKLQuqn60EUH_HqKsgR_mt6c1NNUlFymWHuyqk887ZJRzSV69oqfYcWwa=w640-h365" width="640" /></a></div>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/a/AVvXsEhYIuFwqEq1u_pMFoQuOlplg4Bqme94LVZf0MQIfeFoV8yZCUFXzvELNkh0XslCGlPjjktvR9gqp8Y6wAwMVjt0oyND7QYygU20ksox7j2H3cFzoikMM6y_CAXPozg-ganFq-FxdBv1taUwodN5K655pS5GsJGuW5lzUA4AxjBssgPre6xDQhglADIx"><img alt="Source components diagram" data-original-height="653" data-original-width="1257" height="332" src="https://blogger.googleusercontent.com/img/a/AVvXsEhYIuFwqEq1u_pMFoQuOlplg4Bqme94LVZf0MQIfeFoV8yZCUFXzvELNkh0XslCGlPjjktvR9gqp8Y6wAwMVjt0oyND7QYygU20ksox7j2H3cFzoikMM6y_CAXPozg-ganFq-FxdBv1taUwodN5K655pS5GsJGuW5lzUA4AxjBssgPre6xDQhglADIx=w640-h332" width="640" /></a></div>

<h3>Source</h3>

<div><span style="font-size: x-small;"><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java" target="_blank">Example Cassandra Source</a></span></div>

<div>The Source interface only does the "glue" between all the other components. Its role is to instantiate all of them and to define the source <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/Boundedness.html" target="_blank">Boundedness</a>. We also do the source configuration here, along with user configuration validation.</div>
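<div>As a minimal sketch, assuming hypothetical MySplit, MySplitState and MyEnumeratorState classes (the Pojo output type and all the My* names are illustrative, not part of the Flink API):</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
public class MySource implements Source<Pojo, MySplit, MyEnumeratorState> {

    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED; // batch source: finite data
    }

    @Override
    public SourceReader<Pojo, MySplit> createReader(SourceReaderContext readerContext) {
        return MySourceReaderFactory.create(readerContext);
    }

    @Override
    public SplitEnumerator<MySplit, MyEnumeratorState> createEnumerator(
            SplitEnumeratorContext<MySplit> enumContext) {
        return new MySplitEnumerator(enumContext, null);
    }

    @Override
    public SplitEnumerator<MySplit, MyEnumeratorState> restoreEnumerator(
            SplitEnumeratorContext<MySplit> enumContext, MyEnumeratorState state) {
        return new MySplitEnumerator(enumContext, state);
    }

    @Override
    public SimpleVersionedSerializer<MySplit> getSplitSerializer() {
        return MySplitSerializer.INSTANCE;
    }

    @Override
    public SimpleVersionedSerializer<MyEnumeratorState> getEnumeratorCheckpointSerializer() {
        return MyEnumeratorStateSerializer.INSTANCE;
    }
}
</pre></div>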
<h3>SourceReader</h3>

<div><span style="font-size: x-small;"><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java" target="_blank">Example Cassandra SourceReader</a></span></div>

<div>As shown in the diagrams above, the instances of the <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceReader.html" target="_blank">SourceReader</a> (which we will simply call readers in the rest of this article) run in parallel in task managers to read the actual data, which is divided into <a href="#split-and-splitState">Splits</a>. Readers request splits from the <a href="#splitEnumerator-and-splitEnumeratorState">SplitEnumerator</a> and the resulting splits are assigned to them in return.</div>

<div>Flink provides the <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html" target="_blank">SourceReaderBase</a> implementation that takes care of all the threading. Flink also provides a useful extension of this class for most cases: <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.html" target="_blank">SingleThreadMultiplexSourceReaderBase</a>. This class comes with the threading model already configured: each <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.html" target="_blank">SplitReader</a> instance reads splits using one thread (but several SplitReader instances live among the task managers).</div>

<div>What we have left to do in the SourceReader class is the following (a sketch follows the list):</div>
<ul>
<li>Provide a <a href="#splitReader">SplitReader</a> supplier</li>
<li>Create a <a href="#recordEmitter">RecordEmitter</a></li>
<li>Create the shared resources for the SplitReaders (sessions, etc.). As the SplitReader supplier is created in the SourceReader constructor in a super() call, using a SourceReader factory to create the shared resources and pass them to the supplier is a good idea.</li>
<li>Implement <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceReader.html#start--" target="_blank">start()</a>: here we should ask the enumerator for our first split</li>
<li>Override <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#close--" target="_blank">close()</a> from the SourceReaderBase parent class to free up any created resources (the shared resources for example)</li>
<li>Implement <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#initializedState-SplitT-" target="_blank">initializedState()</a> to create a mutable <a href="#split-and-splitState">SplitState</a> from a Split</li>
<li>Implement <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#toSplitType-java.lang.String-SplitStateT-" target="_blank">toSplitType()</a> to create a Split from the mutable SplitState</li>
<li>Implement <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#onSplitFinished-java.util.Map-" target="_blank">onSplitFinished()</a>: here, as it is a batch source (finite data), we should ask the enumerator for the next split</li>
</ul>
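<div>A minimal sketch of such a reader, with the same hypothetical My* classes as above and MyRecord as the intermediary record format (see the RecordEmitter section):</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
public class MySourceReader
        extends SingleThreadMultiplexSourceReaderBase<MyRecord, Pojo, MySplit, MySplitState> {

    public MySourceReader(
            Supplier<SplitReader<MyRecord, MySplit>> splitReaderSupplier,
            RecordEmitter<MyRecord, Pojo, MySplitState> recordEmitter,
            Configuration config,
            SourceReaderContext readerContext) {
        super(splitReaderSupplier, recordEmitter, config, readerContext);
    }

    @Override
    public void start() {
        // ask the enumerator for our first split
        context.sendSplitRequest();
    }

    @Override
    protected void onSplitFinished(Map<String, MySplitState> finishedSplitIds) {
        // batch source: ask the enumerator for the next split
        context.sendSplitRequest();
    }

    @Override
    protected MySplitState initializedState(MySplit split) {
        return new MySplitState(split);
    }

    @Override
    protected MySplit toSplitType(String splitId, MySplitState splitState) {
        return splitState.toSplit();
    }
}
</pre></div>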
<h3 id="split-and-splitState">Split and SplitState</h3>

<div><span style="font-size: x-small;"><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java" target="_blank">Example Cassandra Split</a></span></div>

<div>The <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceSplit.html" target="_blank">SourceSplit</a> represents a partition of the source data. What defines a split depends on the backend we are reading from. It could be a <i>(partition start, partition end)</i> tuple or an <i>(offset, split size)</i> tuple for example.</div>

<div>In any case, the Split object should be seen as immutable: any update to it should be done on the associated <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html" target="_blank">SplitState</a>. The split state is what will be stored inside the Flink <a href="https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#checkpointing" target="_blank">checkpoints</a>. A checkpoint may happen between two fetches for a single split. So, if we're reading a split, we must store in the split state the current state of the reading process. This state needs to be serializable (because it will be part of a checkpoint) and something the backend source can resume from. That way, in case of failover, the reading can be resumed from where it was left off, ensuring there will be no duplicates or lost data. For example, if the records reading order is deterministic in the backend, then the split state can store the number <i>n</i> of already read records, to restart at <i>n+1</i> after failover.</div>
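<div>A sketch of this pair of classes, with a hypothetical offset-based split:</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
// immutable: describes a partition of the source data
public class MySplit implements SourceSplit {
    private final long startOffset;
    private final long splitSize;

    public MySplit(long startOffset, long splitSize) {
        this.startOffset = startOffset;
        this.splitSize = splitSize;
    }

    @Override
    public String splitId() {
        return startOffset + "-" + splitSize;
    }
}

// mutable: tracks the reading progress, stored in checkpoints
public class MySplitState {
    private final MySplit split;
    private long nbReadRecords; // resume point: restart at nbReadRecords + 1 after failover

    public MySplitState(MySplit split) {
        this.split = split;
    }

    public void markRecordEmitted() {
        nbReadRecords++;
    }

    public MySplit toSplit() {
        // in a real source, the remaining range would be recomputed from the progress
        return split;
    }
}
</pre></div>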
<h3 id="splitEnumerator-and-splitEnumeratorState">SplitEnumerator and SplitEnumeratorState</h3>

<div><span style="font-size: x-small;"><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java" target="_blank">Example Cassandra SplitEnumerator</a> and <a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java" target="_blank">SplitEnumeratorState</a></span></div>

<div>The <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html" target="_blank">SplitEnumerator</a> is responsible for creating the splits and serving them to the readers. Whenever possible, it is preferable to generate the splits lazily, meaning that each time a reader asks the enumerator for a split, the enumerator generates one on demand and assigns it to the reader. For that we implement <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#handleSplitRequest-int-java.lang.String-" target="_blank">SplitEnumerator#handleSplitRequest()</a>. Lazy split generation is preferable to split discovery, in which we pre-generate all the splits and store them while waiting to assign them to the readers. Indeed, in some situations, the number of splits can be enormous and consume a lot of memory, which could be problematic in case of straggling readers. The framework offers the ability to act upon reader registration by implementing <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#addReader-int-" target="_blank">addReader()</a> but, as we do lazy split generation, we have nothing to do there. In some cases, generating a split is too costly, so we can pre-generate a batch (not all) of splits to amortize this cost. The number and size of the batched splits need to be taken into account to avoid consuming too much memory.</div>

<div>Long story short, the tricky part of the source implementation is splitting the source data. The equilibrium to find is to have neither too many splits (which could lead to too much memory consumption) nor too few (which could lead to sub-optimal parallelism). One good way to meet this equilibrium is to evaluate the size of the source data upfront and allow the user to specify the maximum memory a split will take. That way, they can configure this parameter according to the memory available on the task managers. This parameter is optional, so the source needs to provide a default value. Also, the source needs to check that the user-provided max split size is not too small, which would lead to too many splits. The general rule of thumb is to give the user some freedom while protecting them from unwanted behavior. For these safety measures, rigid thresholds don't work well, as the source may start to fail when the thresholds are suddenly exceeded. For example, if we enforced that the number of splits stays below twice the parallelism, and the job was regularly run on a growing table, then at some point there would be more and more splits of max-split-size and the threshold would be exceeded. Of course, the size of the source data needs to be evaluated without reading the actual data. For the Cassandra connector I did it <a href="https://echauchot.blogspot.com/2023/03/cassandra-evaluate-table-size-without.html" target="_blank">like this</a>.</div>
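<div>A sketch of such sizing logic (the guards and the fallback are illustrative assumptions):</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
// decide how many splits to generate from the estimated table size and the
// user provided (or default) maximum split memory size
private long decideOnNumSplits(
        long tableSizeBytes, long maxSplitMemorySizeBytes, int parallelism) {
    if (tableSizeBytes <= 0) {
        // size estimates unavailable or empty table: fall back to one split per reader
        return parallelism;
    }
    // one split should not exceed the configured memory budget...
    long numSplits = tableSizeBytes / maxSplitMemorySizeBytes;
    // ...but we still want at least one split per reader to use all the parallelism
    return Math.max(parallelism, numSplits);
}
</pre></div>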
<div>Another important topic is state. If the job manager fails, the split enumerator needs to recover. For that, as for the split, we need to provide a state for the enumerator that will be part of a checkpoint. Upon recovery, the enumerator is reconstructed and receives <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html" target="_blank">an enumerator state</a> to recover its previous state. Upon checkpointing, the enumerator returns its state when <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#snapshotState-long-" target="_blank">SplitEnumerator#snapshotState()</a> is called. The state must contain everything needed to resume where the enumerator was left off after failover. In a lazy split generation scenario, the state will contain everything needed to generate the next split whenever asked to: for example, the start offset of the next split, the split size, the number of splits still to generate, etc. But the SplitEnumeratorState must also contain a list of splits: not the list of discovered splits, but a list of splits to reassign. Indeed, whenever a reader fails, if it was assigned splits after the last checkpoint, then the checkpoint will not contain those splits. Consequently, upon restoration, the reader won't have the splits assigned anymore. There is a callback to deal with that case: <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#addSplitsBack-java.util.List-int-" target="_blank">addSplitsBack()</a>. There, the splits that were assigned to the failing reader can be put back into the enumerator state for later re-assignment to readers. There is no memory size risk here, as the number of splits to reassign is pretty low.</div>

<div>The above topics are the most important ones regarding splitting. There are two methods left to implement: the usual <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#start--" target="_blank">start()</a>/<a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#close--" target="_blank">close()</a> methods for resource creation and disposal. Regarding start(), the Flink connector framework provides the <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.html#callAsync-java.util.concurrent.Callable-java.util.function.BiConsumer-long-long-" target="_blank">enumeratorContext#callAsync()</a> utility to run long operations asynchronously, such as split preparation or split discovery (if lazy split generation is impossible). Indeed, the start() method runs in the source coordinator thread; we don't want to block it for a long time.</div>
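<div>A sketch of the lazy generation and re-assignment callbacks discussed above (MyEnumeratorState and its helper methods are hypothetical):</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    // first serve the splits that need re-assignment, then lazily generate a new one
    MySplit split = state.pollSplitToReassign();
    if (split == null) {
        split = state.generateNextSplit(); // null when the whole input is covered
    }
    if (split == null) {
        enumContext.signalNoMoreSplits(subtaskId);
    } else {
        enumContext.assignSplit(split, subtaskId);
    }
}

@Override
public void addSplitsBack(List<MySplit> splits, int subtaskId) {
    // splits assigned to a failed reader after the last checkpoint:
    // store them in the state for re-assignment to other readers
    state.addSplitsToReassign(splits);
}

@Override
public MyEnumeratorState snapshotState(long checkpointId) {
    return state;
}
</pre></div>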
<h3 id="splitReader">SplitReader</h3>

<div><span style="font-size: x-small;"><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java" target="_blank">Example Cassandra SplitReader</a></span></div>

<div>This class is responsible for reading the actual splits that it receives when the framework calls <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.html#handleSplitsChanges-org.apache.flink.connector.base.source.reader.splitreader.SplitsChange-" target="_blank">handleSplitsChanges()</a>. The main part of the split reader is the <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.html#fetch--" target="_blank">fetch()</a> implementation, where we read all the splits received and return the read records as a <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.html" target="_blank">RecordsBySplits</a> object. This object contains a map from split ids to the belonging records, and also the ids of the finished splits. Important points need to be considered:</div>

<ul>
<li>The fetch call must be non-blocking. If any call in its code is synchronous and potentially long, an escape from fetch() must be provided. When the framework calls <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.html#wakeUp--" target="_blank">wakeUp()</a> we should interrupt the fetch, for example by setting an AtomicBoolean.</li>
<li>The fetch call needs to be re-entrant: an already read split must not be re-read. We should remove it from the list of splits to read and add its id to the finished splits (along with the ids of empty splits) in the RecordsBySplits that we return.</li>
</ul>

<div>It is totally fine for the implementer to exit the fetch() method early. Also, a failure could interrupt the fetch. In both cases the framework will call fetch() again later on. In that case, the fetch method must resume the reading from where it was left off, using the split state already discussed. If resuming the read of a split is impossible because of backend constraints, then the only solution is to read the splits atomically (either not read a split at all, or read it entirely). That way, in case of an interrupted fetch, nothing will be output and the split can be read again from the beginning at the next fetch call, leading to no duplicates. But if splits are read entirely, there are points to consider (a fetch() sketch follows the list):</div>
<ul>
<li>We should ensure that the total split content (records from the source) fits in memory, for example by specifying a max split size in bytes (see <a href="#splitEnumerator-and-splitEnumeratorState">SplitEnumerator</a>)</li>
<li>The split state becomes useless; only a Split class is needed</li>
</ul>
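<div>A sketch of such a fetch(), assuming atomic per-split reads and a hypothetical readSplit() helper:</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
private final Queue<MySplit> splitsToRead = new ArrayDeque<>();
private final AtomicBoolean wakeup = new AtomicBoolean(false);

@Override
public RecordsWithSplitIds<MyRecord> fetch() {
    RecordsBySplits.Builder<MyRecord> builder = new RecordsBySplits.Builder<>();
    MySplit split;
    // re-entrant: each fully read split is removed from the queue and marked finished
    while ((split = splitsToRead.peek()) != null) {
        if (wakeup.compareAndSet(true, false)) {
            break; // escape: the framework called wakeUp()
        }
        for (MyRecord record : readSplit(split)) { // atomic read of the whole split
            builder.add(split.splitId(), record);
        }
        builder.addFinishedSplit(split.splitId());
        splitsToRead.poll();
    }
    return builder.build();
}

@Override
public void wakeUp() {
    wakeup.set(true);
}

@Override
public void handleSplitsChanges(SplitsChange<MySplit> splitsChange) {
    splitsToRead.addAll(splitsChange.splits());
}
</pre></div>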
<h3 id="recordEmitter">RecordEmitter</h3>

<div><span style="font-size: x-small;"><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java" target="_blank">Example Cassandra RecordEmitter</a></span></div>

<div>The SplitReader reads records in the form of <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.html" target="_blank">an intermediary record format</a> that the implementer provides for each record. It can be the raw format returned by the backend, or any format from which the actual record can be extracted afterwards. It is not the final output format expected by the source; it contains anything needed to do the conversion to the record output format. We need to implement <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/RecordEmitter.html#emitRecord-E-org.apache.flink.api.connector.source.SourceOutput-SplitStateT-" target="_blank">RecordEmitter#emitRecord()</a> to do this conversion. A good pattern here is to initialize the RecordEmitter with a mapping Function. The implementation must be idempotent: the method may be interrupted in the middle, in which case the same set of records will be passed to the record emitter again later.</div>
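<div>A sketch of this pattern, converting the hypothetical MyRecord intermediary format to the Pojo output type:</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
public class MyRecordEmitter implements RecordEmitter<MyRecord, Pojo, MySplitState> {

    private final Function<MyRecord, Pojo> map;

    public MyRecordEmitter(Function<MyRecord, Pojo> map) {
        this.map = map;
    }

    @Override
    public void emitRecord(MyRecord element, SourceOutput<Pojo> output, MySplitState splitState) {
        // idempotent: a pure conversion, no side effect besides tracking the progress
        output.collect(map.apply(element));
        splitState.markRecordEmitted();
    }
}
</pre></div>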
<h3 id="Serializers">Serializers</h3>

<div><span style="font-size: x-small;"><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java" target="_blank">Example Cassandra SplitSerializer</a> and <a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java" target="_blank">SplitEnumeratorStateSerializer</a></span></div>

<div>We need to provide singleton serializers for:</div>
<ul>
<li>Split: splits are serialized when sending them from the enumerator to the readers, and when checkpointing the reader's current state</li>
<li>SplitEnumeratorState: the serializer is used for the result of SplitEnumerator#snapshotState()</li>
</ul>

<div>For both, we need to implement <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/core/io/SimpleVersionedSerializer.html" target="_blank">SimpleVersionedSerializer</a>. Care needs to be taken on some important points:</div>

<ul>
<li>Using Java serialization is <a href="https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization" target="_blank">forbidden</a> in Flink, mainly for migration concerns. We should rather manually write the fields of the objects using ObjectOutputStream. When a class is not supported by the ObjectOutputStream (not a String, Integer, Long...), we should write the size of the object in bytes as an Integer and then write the object converted to byte[]. A similar method is used to serialize collections: first write the number of elements of the collection, then serialize all the contained objects. Of course, for deserialization we do the exact same reading in the same order.</li>
<li>There can be a lot of splits, so we should cache the OutputStream used in the SplitSerializer. We can do so like this:</li>
</ul>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
        ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
</pre></div>

<div>The initial stream size depends on the size of a split.</div>

<h2>Testing the source</h2>

<div>For the sake of concision, testing the source is the subject of <a href="https://echauchot.blogspot.com/2023/04/flink-howto-test-batch-source-with-new.html" target="_blank">the next article</a>. Stay tuned!</div>

<h2>Conclusion</h2>

<div>This article, gathering field feedback from the implementation, was needed as the javadocs cannot cover all the implementation details of high-performance and maintainable sources. I hope you enjoyed reading and that it gave you the desire to contribute a new connector to the Flink project!</div>
<div><i>Etienne Chauchot</i></div>

<h1>Cassandra: evaluate table size without reading the data</h1>
<span style="font-size: xx-small;">π₯ 3 min.</span><br /></div><div><span style="font-size: xx-small;"><br /></span></div><div><h2 style="text-align: left;">Introduction</h2><div><br /></div><div>While developing the <a href="https://cassandra.apache.org/" target="_blank">Cassandra</a> source connector for <a href="https://flink.apache.org/" target="_blank">Flink</a> I needed a way to ensure that the data I was reading fitted in memory. For that I had to evaluate how big the source table was in order to know how to divide it. But, of course, it had to be done without reading the data itself. Here is how it was achieved.</div><div><br /></div><div><h2 style="text-align: left;">Cassandra size estimates statistics</h2></div><div><br /></div><div>Cassandra partitioning is based on tokens arranged into a ring. Cassandra cluster provides statistical information about tables sizes in a system table called<i> system.size_estimates.</i> It provides per-table information on what we will call in this article <i>token ranges</i>: number of partitions taken by the table, mean partition size, start and end tokens. These elements can be used to get a rough estimation of the table size.</div><div><br /></div><div>To get these information, we need to issue this request: </div><div><br /></div><div><div><i>SELECT range_start, range_end, partitions_count, mean_partition_size FROM system.size_estimates WHERE keyspace_name = ? AND table_name = ?</i></div></div><div><br /></div><div>We will receive the token ranges that the table occupies. To get the size of the table, we need to sum them that way:</div><div><span style="font-family: inherit;"><br /></span></div><div><span><div><i>table_size_on_this_node = sum (mean_partition_size * partition_count) </i></div><div style="font-family: inherit;"><br /></div></span></div><div><span style="font-family: inherit;">You see in the formula above that the calculated size is only for one Cassandra node as the system table is the one of the node. </span><span>We need to extrapolate to the whole cluster to avoid requesting all the nodes of the cluster. For that we will calculate the <i>ring fraction</i> this node represents in the cluster. 
<div>The <i>ring fraction</i> is a percentage obtained like this:</div>

<div><i>ring_fraction = sum (token_ranges_size) / ring_size</i></div>

<div><i>ring_size</i> is a constant depending on the configured <a href="https://cassandra.apache.org/doc/4.1/cassandra/configuration/cass_yaml_file.html#partitioner" target="_blank">Cassandra cluster partitioner</a>.</div>

<div><i>token_ranges_size = sum (distance(range_start, range_end))</i></div>

<div>A token range can wrap around the end of the ring (range_end smaller than range_start), so the distance method needs to be a little more complex:</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
private BigInteger distance(BigInteger token1, BigInteger token2) {
    // token2 > token1
    if (token2.compareTo(token1) > 0) {
        return token2.subtract(token1);
    } else {
        return token2.subtract(token1).add(partitioner.ringSize);
    }
}
</pre></div>

<div>So, now that we have our ring fraction, we can extrapolate the node's table size to get the total table size:</div>

<div><i>table_size = table_size_on_this_node / ring_fraction</i></div>

<div>And here we are!</div>
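<div>Putting it all together, a sketch of the computation with the Java driver 4 could look like this. The ring size shown is the one of the Murmur3Partitioner (an assumption; adapt it to your partitioner), and error handling is omitted:</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import java.math.BigInteger;

public class TableSizeEstimator {

    // total number of tokens of the Murmur3Partitioner ring: 2^64
    private static final BigInteger RING_SIZE = BigInteger.valueOf(2).pow(64);

    public static BigInteger estimateTableSize(CqlSession session, String keyspace, String table) {
        ResultSet rs =
                session.execute(
                        "SELECT range_start, range_end, partitions_count, mean_partition_size "
                                + "FROM system.size_estimates "
                                + "WHERE keyspace_name = ? AND table_name = ?",
                        keyspace, table);
        BigInteger sizeOnThisNode = BigInteger.ZERO;
        BigInteger tokenRangesSize = BigInteger.ZERO;
        for (Row row : rs) {
            // table_size_on_this_node = sum (mean_partition_size * partitions_count)
            sizeOnThisNode =
                    sizeOnThisNode.add(
                            BigInteger.valueOf(row.getLong("mean_partition_size"))
                                    .multiply(BigInteger.valueOf(row.getLong("partitions_count"))));
            // token_ranges_size = sum (distance(range_start, range_end))
            tokenRangesSize =
                    tokenRangesSize.add(
                            distance(
                                    new BigInteger(row.getString("range_start")),
                                    new BigInteger(row.getString("range_end"))));
        }
        // table_size = table_size_on_this_node / ring_fraction
        //            = table_size_on_this_node * ring_size / token_ranges_size
        return sizeOnThisNode.multiply(RING_SIZE).divide(tokenRangesSize);
    }

    private static BigInteger distance(BigInteger token1, BigInteger token2) {
        return token2.compareTo(token1) > 0
                ? token2.subtract(token1)
                : token2.subtract(token1).add(RING_SIZE);
    }
}
</pre></div>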
<h2>Updating the statistics</h2>

<div>These size estimates statistics are updated when the Cassandra cluster flushes its <a href="https://cassandra.apache.org/doc/latest/cassandra/architecture/storage_engine.html#memtables" target="_blank">Memtables</a> (tables in memory) into <a href="https://cassandra.apache.org/doc/latest/cassandra/architecture/storage_engine.html#sstables" target="_blank">SSTables</a> (tables on disk). In particular, it updates the partition information. The flush of a table can be triggered through the <i>nodetool</i> command:</div>

<div><i>nodetool flush keyspace table</i></div>

<div>In integration tests, we often want to read the test data just after writing it. In that case, the cluster has not done the flush yet, so the size estimates are not updated. It is worth mentioning that <a href="https://hub.docker.com/_/cassandra" target="_blank">the official Cassandra docker image</a> contains the <i>nodetool</i> binary and that the flush can be done from within the container using <a href="https://www.testcontainers.org/modules/databases/cassandra/" target="_blank">testContainers</a> with the code below:</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
cassandraContainer.execInContainer("nodetool", "flush", keyspace, table);
</pre></div>

<div>Under the hood, a local JMX call is issued; local JMX is enabled by default on the Cassandra cluster.</div>

<h2>Conclusion</h2>

<div>I guess this article is mostly useful for Cassandra connector authors or DevOps people. I hope you enjoyed reading.</div>
<div><i>Etienne Chauchot</i></div>

<h1>Flink: Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API</h1>
<div><span style="font-size: xx-small;">🔥 5 min. (published 2022-11-07, updated 2023-05-09)</span></div>
<h2>Introduction</h2>

<div><a href="https://flink.apache.org/" target="_blank">Flink</a> has been deprecating the DataSet API since version 1.12 as part of the work on <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741" target="_blank">FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API)</a>. This blog article illustrates the migration of a real-life batch DataSet pipeline to a batch DataStream pipeline. All the code presented in this article is available in the <a href="https://github.com/echauchot/tpcds-benchmark-flink" target="_blank">tpcds-benchmark-flink repo</a>. The use case shown here is extracted from a broader work comparing Flink performance across different APIs, by implementing <a href="https://www.tpc.org/tpcds/" target="_blank">TPCDS</a> queries with each of them.</div>

<h2>What is TPCDS?</h2>

<div>TPC-DS is a decision support benchmark that models several generally applicable aspects of a decision support system. The purpose of TPCDS benchmarks is to provide relevant, objective performance data of Big Data engines to industry users.</div>

<h2>Chosen TPCDS query</h2>

<div>The query chosen for this article is <b>Query3</b>, because it contains the most common analytic operators (filter, join, aggregation, group by, order by, limit). It represents an analytic query on store sales. Its SQL code is shown here:</div>

<div><pre style="background-color: #2b2b2b; color: #a9b7c6;">
SELECT dt.d_year, item.i_brand_id brand_id, item.i_brand brand, SUM(ss_ext_sales_price) sum_agg
FROM date_dim dt, store_sales, item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
  AND store_sales.ss_item_sk = item.i_item_sk
  AND item.i_manufact_id = 128
  AND dt.d_moy = 11
GROUP BY dt.d_year, item.i_brand, item.i_brand_id
ORDER BY dt.d_year, sum_agg DESC, brand_id
LIMIT 100
</pre></div>

<h2>The initial DataSet pipeline</h2>

<div>The pipeline we are migrating is <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDataset.java" target="_blank">this</a> batch pipeline, which implements the above query using the DataSet API with <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/types/Row.html" target="_blank">Row</a> as the dataset element type.</div>

<h2>Migrating the DataSet pipeline to a DataStream pipeline</h2>
href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java" target="_blank">here</a> we will rather focus on some key areas of the migration. The code is based on the latest release of Flink at the time this article was written: version 1.16.0. </div><div><br /></div><div>DataStream is a unified API that allows to run pipelines in both batch and streaming modes. To execute a DataStream pipeline in batch mode, it is not enough to set the execution mode in the Flink execution environment, it is also needed to migrate some operations. Indeed, the DataStream API semantics are the ones of a streaming pipeline. The arriving data is thus considered infinite. So, compared to the DataSet API which operates on finite data, there are adaptations to be made on some operations.</div><div><br /></div><h3 style="text-align: left;"><a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L90-L96" target="_blank">Setting the execution environment</a></h3><div><br /></div><div>We start by moving from <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/ExecutionEnvironment.html" target="_blank">ExecutionEnvironment</a> to <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html" target="_blank">StreamExecutionEnvironment</a>. Then, as the source in this pipeline is bounded, we can use either the default streaming <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution_mode/" target="_blank">execution mode</a> or the batch mode. In batch mode the tasks of the job can be separated into stages that can be executed one after another. In streaming mode all tasks need to be running all the time and records are sent to downstream tasks as soon as they are available. 
</div><div><br /></div><div>Here we keep the default streaming mode that gives good performance on this pipeline and that would allow running the same pipeline unchanged on an unbounded source.</div><div><br /></div><h3 style="text-align: left;">Using the streaming sources and datasets</h3><div><br /></div><div><b>Sources</b>: <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/operators/DataSource.html" target="_blank">DataSource<T></a> becomes <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/datastream/DataStreamSource.html" target="_blank">DataStreamSource<T></a> after the call to <i>env.createInput()</i>.</div><div><br /></div><div><b>Datasets</b>: <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/DataSet.html" target="_blank">DataSet<T></a> is now <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/datastream/DataStream.html" target="_blank">DataStream<T></a> and its subclasses.</div><div><br /></div><h3 style="text-align: left;"><a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L129-L135" target="_blank">Migrating the join operation</a></h3><div><br /></div><div>The DataStream join operator does not yet support aggregations in batch mode (see <a href="https://issues.apache.org/jira/browse/FLINK-22587" target="_blank">FLINK-22587</a> for details). Basically, the problem is with the trigger of the default <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.html" target="_blank">GlobalWindow</a> which never fires, so the records are never output. We will work around this problem by applying a custom <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L246-L280" target="_blank">EndOfStream</a> window. It is a window assigner that assigns all the records to a single <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.html" target="_blank">TimeWindow</a>. So, as with the GlobalWindow, all the records are assigned to the same window, except that this window's trigger is based on the end of the window (which is set to <i>Long.MAX_VALUE</i>). 
As we are on a bounded source, at some point the watermark will advance to <i>+INFINITY</i> (Long.MAX_VALUE), thus crossing the end of the time window, firing the trigger and outputting the records.</div><div><br /></div><div>Now that we have working triggering, we need to call a standard join with the <i>Row::join </i>function.</div><div><br /></div><h3 style="text-align: left;"><a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L145-L169" target="_blank">Migrating the group by and reduce (sum) operations</a></h3><div><br /></div><div>The DataStream API no longer has a <i><a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/DataSet.html#groupBy-org.apache.flink.api.java.functions.KeySelector-" target="_blank">groupBy()</a> </i>method; we now use the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#keyBy-org.apache.flink.api.java.functions.KeySelector-" target="_blank">keyBy()</a> method. A downstream aggregation will be applied to elements with the same key exactly as a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/functions/GroupReduceFunction.html" target="_blank">GroupReduceFunction</a> would have done on a DataSet, except it will not need to materialize the collection of data. Indeed, the following operator is a reducer: the summing operation downstream is still done through a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/functions/ReduceFunction.html" target="_blank">ReduceFunction</a>, but this time the operator reduces the elements incrementally instead of receiving the rows as a Collection. To compute the sum, we store the partially aggregated sum in the reduced row. Due to the incremental reduce, we also need to distinguish whether we received an already reduced row (in that case, we read the partially aggregated sum) or a fresh row (in that case we just read the corresponding price field). </div>
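<div><br /></div><div>A simplified sketch of this incremental reduce (the field indexes and the <i>EndOfStreamWindows</i> factory name are illustrative; for simplicity the running sum is kept in the price field itself, which sidesteps the reduced/fresh row distinction of the real code):</div><div><pre>DataStream<Row> sums = joined
    // key by (d_year, i_brand_id), like the SQL GROUP BY
    .keyBy(row -> row.getField(0) + "_" + row.getField(1))
    // the custom EndOfStream window described above for the join
    .window(EndOfStreamWindows.get())
    .reduce((partial, row) -> {
        // partial may already carry a running sum in the price field
        float sum = ((Number) partial.getField(3)).floatValue()
                + ((Number) row.getField(3)).floatValue();
        return Row.of(partial.getField(0), partial.getField(1),
                partial.getField(2), sum);
    });</pre></div>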
We <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L188" target="_blank">set a timer to fire at the end of the EndOfStream window</a> meaning that the timer will fire at the end of the batch.</div><div><br /></div><div>To sort the data, we store the incoming rows inside a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/state/ListState.html" target="_blank">ListState</a> and sort them at output time, when the timer fires in the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.html#onTimer-long-org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext-org.apache.flink.util.Collector-" target="_blank">onTimer()</a> callback. </div><div><br /></div><div>Another thing: to be able to use Flink state, we need to key the datastream beforehand even if there is no group by key because Flink state is designed per-key. Thus, we key by a fake static key so that there is a single state.</div><div><br /></div><h3 style="text-align: left;"><a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L213-L223" target="_blank">Migrating the limit operation</a></h3><div><br /></div><div><div style="text-align: left;"><div>As all the elements of the DataStream were keyed by the same "0" key, they are kept in the same "</div><div>group". So we can implement the SQL LIMIT with</div><div>a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/functions/ProcessFunction.html" target="_blank">ProcessFunction</a> with a counter that will output only the first 100 elements.</div></div><div style="text-align: left;"><br /></div><h3><a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L225-L236" target="_blank">Migrating the sink operation</a></h3></div><div><br /></div><div>As with sources, there were big changes in sinks with recent versions of Flink. We now use the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/connector/sink2/Sink.html" target="_blank">Sink interface</a> that requires an <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/serialization/Encoder.html" target="_blank">Encoder</a>. But the resulting code is very similar to the one using the DataSet API. It's only that <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/serialization/Encoder.html#encode-IN-java.io.OutputStream-" target="_blank">Encoder#encode()</a><span> method writes bytes<span> when </span><a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/io/TextOutputFormat.TextFormatter.html#format-IN-" target="_blank">TextOutputFormat.TextFormatter#format()</a><span> wrote Strings.</span></span></div><div><span><span><br /></span></span></div><div></div><h2 style="text-align: left;">Conclusion</h2><div><br /></div><div>As you saw for the migration of the join operation, the new unified DataStream API has some limitations left in batch mode. 
<div><span><span><br /></span></span></div><div></div><h2 style="text-align: left;">Conclusion</h2><div><br /></div><div>As you saw with the migration of the join operation, the new unified DataStream API still has some limitations in batch mode. In addition, the resulting order by and limit code is quite manual and requires the help of the Flink state API for the migration. For all these reasons, the Flink community recommends using Flink SQL for batch pipelines. It results in much simpler code, good performance and out-of-the-box analytics capabilities. You can find the equivalent Query3 code that uses the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/overview/" target="_blank">Flink SQL/Table API</a> in the <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkSQLCSV.java" target="_blank">Query3ViaFlinkSQLCSV class</a>.</div><div><br /></div><div><br /></div><div><br /></div><div><br /></div><div><br /></div>
Etienne Chauchothttp://www.blogger.com/profile/13442320750607525683noreply@blogger.comtag:blogger.com,1999:blog-1438124655819523792.post-54757530573439874822021-01-05T12:00:00.024+01:002021-01-05T16:30:58.116+01:00Tricky use cases of Apache Beam 3/3: Custom Combine<div>
<span style="font-size: xx-small;">π₯ 5 min.</span>
</div><div><br /></div><h2>Introduction</h2><div><br /></div><div>This is the third article of a series of blog posts about tricky use cases of <a href="https://beam.apache.org/" target="_blank">Apache Beam</a> that highlight some of the advanced possibilities of the Beam SDK.</div><div><br /></div><h2>What is Beam Combine?</h2><div><br /></div><div><b>Combine</b> is the Beam word for the <b>reduce</b> transformation. This transformation combines data spread across workers to make a computation. The computation can be whatever you like. In the Beam SDK, Combine is used under the hood in various built-in transforms: <b>Count</b>, <b>Sum</b>, <b>Mean</b> ... but you can also create your own Combine implementations.</div><div><br /></div><div>The overall architecture of Combine is like this:</div><div><br /></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhfOXJMTFa7wGkstin9dRKXLLljmE3jSaProdUQen5AmmTbWS4I2JohVOIcOvZSuoOiPuE568uG5-73qmS-vWFOwD-51rZX5Hr0SzIzvlmNi7vnT6FeoLmx0eu7krbHNiXky2_CAjctU34/s800/combiner+%25281%2529.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="587" data-original-width="800" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhfOXJMTFa7wGkstin9dRKXLLljmE3jSaProdUQen5AmmTbWS4I2JohVOIcOvZSuoOiPuE568uG5-73qmS-vWFOwD-51rZX5Hr0SzIzvlmNi7vnT6FeoLmx0eu7krbHNiXky2_CAjctU34/s16000/combiner+%25281%2529.png" /></a></div><br /><div>The PCollection is split across the cluster (consider here that we have a cluster of 3 nodes). On each of the nodes, Beam <b>creates an Accumulator</b>. Each input value is <b>added </b>to the local accumulator. Then the accumulators are <b>merged</b> into a single combined output accumulator from which the result is <b>extracted</b>. At each of these 3 steps, the user can add whatever computation code is necessary for their use case.</div><div><br /></div><h2 style="text-align: left;">How to create your own combiner?</h2><div><br /></div><div>To create your own combiner you need to extend <a href="https://beam.apache.org/releases/javadoc/2.26.0/org/apache/beam/sdk/transforms/Combine.CombineFn.html" target="_blank">CombineFn<InputT,AccumT,OutputT></a>. You'll have to override some methods:</div><div><br /></div><div><ul style="text-align: left;"><li><b>createAccumulator() </b>operation is invoked to create a fresh mutable accumulator value of type AccumT, initialized to represent the combination of zero values.</li><li><b>addInput(AccumT, InputT)</b> operation is invoked on each input value to add it to the local accumulator AccumT value. </li><li><b>mergeAccumulators(java.lang.Iterable<AccumT>) </b>operation is invoked to combine a collection of accumulator AccumT values into a single combined output accumulator AccumT value, once the merging accumulators have had all the input values in their partition added to them. 
This operation is invoked repeatedly, until there is only one accumulator value left.</li><li><b>extractOutput(AccumT) </b>operation is invoked on the final accumulator AccumT value to get the output OutputT value.</li></ul><div>Your combiner can be called in the pipeline with:</div><div><br /></div><div><div><i>inputPCollection.apply(Combine.globally(new CustomCombineFn()));</i></div><div><br /></div><div>or for keyed input PCollections:</div><div><br /></div><div><i>inputPCollection.apply(Combine.perKey(new CustomCombineFn()));</i></div></div><div><br /></div><div>Please note that combining functions should be <b>associative</b> and <b>commutative</b>. Associativity is required because input values are first broken up into partitions before being combined, and their intermediate results further combined, in an arbitrary tree structure. Commutativity is required because any order of the input values is ignored when breaking up input values into partitions.</div><div><br /></div><div><div>Also, a note about coders: some form of data encoding is required when using custom types in a CombineFn which do not have well-known coders. Usually, your custom type will just implement <b>Serializable</b> and Beam will provide a coder automatically. Beam relies on the generic <b>CoderProvider</b>, which is able to provide a coder for any Serializable if applicable. In cases where Serializable is not efficient, or inapplicable, there are two alternatives for encoding:</div><div><br /></div><div><ul style="text-align: left;"><li>Default <i>CoderRegistry:</i> for example, implement a coder class explicitly and use the <i>@DefaultCoder</i> annotation. See the CoderRegistry for the numerous ways in which to bind a type to a coder.</li><li>CombineFn specific way: while extending CombineFn, override both <i>getAccumulatorCoder(CoderRegistry, Coder<InputT>)</i> and <i>getDefaultOutputCoder(CoderRegistry, Coder<InputT>)</i>.</li></ul></div></div><div><br /></div><h2 style="text-align: left;">An example</h2></div><div><br /></div><div>The following example is extracted from the <a href="https://echauchot.blogspot.com/p/talks.html#apachecon2017" target="_blank">Nexmark</a> auction system. Here, the use case is to <b>calculate the average price for the last 3 closed auctions:</b></div><div><br /></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiqIp5aFLS4Uvl8uphobo8OLDQn6GcLDOovJDyahsBlUH4w3RQEAHCMN61wm2T_DVYvlgmMEqtKJHtc7ZHBLcJlv5bDe9XMSVznDv1Yjbs6fAYZN3VcPlUSAM1XLAtz0XBMGQDA8hoJTPs/s800/custom+combiner+%25281%2529.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="600" data-original-width="800" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiqIp5aFLS4Uvl8uphobo8OLDQn6GcLDOovJDyahsBlUH4w3RQEAHCMN61wm2T_DVYvlgmMEqtKJHtc7ZHBLcJlv5bDe9XMSVznDv1Yjbs6fAYZN3VcPlUSAM1XLAtz0XBMGQDA8hoJTPs/s16000/custom+combiner+%25281%2529.png" /></a></div><div><br /></div><div>Here, the input PCollection is the collection of the auction-winning bids with their associated price and timestamp. Each bid contains the final price of a given auction. </div><div><br /></div><div>The types used in the custom Combine are: <b>InputT=Bid </b>which implements Serializable. The accumulator class (AccumT) is a list of sorted bids so<b> AccumT=List </b>and the output is a mean so <b>OutputT=Double</b>. The Beam SDK provides coders for all these types automatically through the mechanism mentioned above.</div>
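<div><br /></div><div>As an illustration, here is a simplified sketch of such a CombineFn (assuming a <i>Bid</i> POJO, implementing Serializable, with <i>getTimestamp()</i> and <i>getPrice()</i> accessors; the real Nexmark code differs in details):</div><div><pre>public class AverageLastThreePricesFn extends Combine.CombineFn<Bid, List<Bid>, Double> {

  @Override
  public List<Bid> createAccumulator() {
    return new ArrayList<>();
  }

  @Override
  public List<Bid> addInput(List<Bid> accumulator, Bid input) {
    accumulator.add(input);
    return keepLastThree(accumulator); // prune early to lighten the merge step
  }

  @Override
  public List<Bid> mergeAccumulators(Iterable<List<Bid>> accumulators) {
    List<Bid> merged = new ArrayList<>();
    for (List<Bid> accumulator : accumulators) {
      merged.addAll(accumulator);
    }
    return keepLastThree(merged);
  }

  @Override
  public Double extractOutput(List<Bid> accumulator) {
    // the average is only computed once, on the final combined accumulator
    return accumulator.stream().mapToDouble(Bid::getPrice).average().orElse(0d);
  }

  // sort bids by timestamp then price and keep only the last 3
  private static List<Bid> keepLastThree(List<Bid> bids) {
    bids.sort(Comparator.comparing(Bid::getTimestamp).thenComparing(Bid::getPrice));
    return new ArrayList<>(bids.subList(Math.max(0, bids.size() - 3), bids.size()));
  }
}</pre></div>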
<div><br /></div><div>As you can see, we added some computation code (<i>sort bids by timestamp then price and keep the last 3</i>) during both the add step (<b>addInput()</b> method implementation) and the merge step (<b>mergeAccumulators()</b> method implementation). During the add step we could have simply added each input to the accumulator without keeping only the last 3, but this would have led to more load in the next merging step. It is more efficient to keep only 3 as early as the add step; this way, the merge step will have fewer values to combine.</div><div><br /></div><div>The average price is only calculated at the end, when the output value is extracted from the final combined accumulator (<b>extractOutput() </b>method implementation).</div><div><br /></div><div><br /></div><div><h3>Now you know how to create a custom Beam combiner to adapt to whatever use case you might have!</h3></div>Etienne Chauchothttp://www.blogger.com/profile/13442320750607525683noreply@blogger.comtag:blogger.com,1999:blog-1438124655819523792.post-75202354397988573692020-12-02T10:00:00.007+01:002021-03-18T15:29:05.047+01:00Tricky use cases of Apache Beam 2/3: Custom windows<div>
<span style="font-size: xx-small;">π₯ 9 min.</span>
</div><div><br /></div><h2 style="text-align: left;">Introduction</h2><div><br /></div><div>This is the second article of a serie of blog posts about tricky use cases of <a href="https://beam.apache.org/" target="_blank">Apache Beam</a> that enlight some of the advanced possibilities of the Beam SDK.</div><div><br /></div><h2 style="text-align: left;">What is Beam windowing?</h2><div><br /></div><div>In streaming systems, the flow of data is continuous. To be able to process it, we need to divide this flow of data into finite chunks. These finite chunks are windows. They are a way to temporally group the elements based on their event timestamp (the time at which the element was produced).</div><div><br /></div><div>As Beam provides a unified API for batch mode and streaming mode, all the elements of Beam pipelines (even in batch mode) are stored in windows. In batch mode, by default, there is a single window called the <b>Global Window</b> that stores all the elements of the pipeline. It is the default window that you get behind the scenes when you don't specify a window in your pipeline. But, even in batch mode, you could specify another windowing such as the ones below based on the timestamp of the elements. </div><div><br /></div><div>There are several types of windows provided by the Beam SDK:</div><div><br /></div><div><br /></div><h3 style="text-align: left;">Fixed windows</h3><div><br /></div><div><img src="https://lh5.googleusercontent.com/wMoupLR3q9JLpeou5EPbNDGsOhdUANwBN66_xSXW0Qc7KP-0nLEruGwrI3dz9zYhy3Lb0AOG_ICfJNbvVfhkRDZ5Gu_HXRvHth5jOFf_Ei4idCxI-66hyudPiAdvpnsDsc2GTZfgs4I=s16000" /></div><div><div><span style="font-size: x-small;">* The diagram show keys but, of course, these windows work for non keyed PCollections</span></div><div><br /></div><div><a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/windowing/FixedWindows.html" target="_blank">Fixed windows</a> are defined by the <b>duration</b> of the window. In the example above they allow to express the use case: "I want to have the last 30 seconds of data". </div><div><br /></div><h3 style="text-align: left;">Sliding windows</h3><div><br /></div><div><span id="docs-internal-guid-134b3e87-7fff-a5cf-19ce-0e16b6e96e1a"><img src="https://lh4.googleusercontent.com/qsj6UYdcYjBUNfdGXarEeQfSFsn3el9aeqEAO4KTFDOqhKh9wYlnsjac3yVcDKACa9q1onMBr-skstZv-jQ24KG0rbfxOwMXXv3jZP0TVZP8DedJCafEAfQxyoMvDcv9V-Roa0ZFOoc=s16000" /></span></div><div><span style="font-size: x-small;">* The diagram show keys but, of course, these windows work for non keyed PCollections</span></div><div><br /></div><div><a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/windowing/SlidingWindows.html" target="_blank">Sliding windows</a> are similar to fixed windows except they overlap. They are defined by both a <b>duration</b> and a <b>sliding period.</b> In the example above they allow to express the use case: "I want to have the last 60 seconds of data every 30 seconds". 
</div><div><br /></div><h3 style="text-align: left;">Sessions windows</h3><div><br /></div><div><span id="docs-internal-guid-363a17bc-7fff-6ec3-724e-c310771ef5b3"><img src="https://lh5.googleusercontent.com/FnH-wEZOv2q_RpjC3qiEb7MtDjLxluDI0-2gBG-lzPT65Mzt48chxM6NtPK1Mus8qVnEDhMtf3XY1Rn0bUbvWOh1zZlSUZHdIMQFthkhF8GqrjpCvs2lnc_HjvViOYs4ZOujfA4XYkA=s16000" /></span></div><div><span style="font-size: x-small;">* The diagram show keys but, of course, these windows work for non keyed PCollections</span></div><div><span style="font-size: x-small;"><br /></span></div><div><a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/windowing/Sessions.html" target="_blank">Sessions windows</a> are very similar to web sessions. They are defined by a <b>gap duration:</b> if no data comes for more than the gap duration, then the next element to come is considered belonging to another session. Of course, the gap duration is measured in <b>event time</b>, in other words:<b> </b>if the next element's timestamp is more than gap duration after the previous element, then it will be put into another session window.</div><div><br /></div><h3 style="text-align: left;">Custom windows</h3><div><br /></div><div>So far, the window types that Beam provides are quite similar to what other Big Data streaming systems provide except maybe the sessions windows that not many systems provide. But, what is really different with Beam is that it provides a way for the user to define his own windowing by extending the <a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/windowing/WindowFn.html" target="_blank">WindowFn</a> function class or one of its sub-classes.</div><div><br /></div><h2 style="text-align: left;">Health monitoring system example</h2><div><br /></div><div>To illustrate Beam custom windows, we will consider this use case: let's say we have a monitoring system that reports events about the health of a large and complex e-commerce platform. </div><div><br /></div><div>For the sake of simplicity, the event will be a simple class with just an enum status:</div><div> </div><script src="https://gist.github.com/echauchot/2ea901448ddffcc97ae4b001677f8093.js"></script><div>A big amount of events are produced, so the monitoring is made through a Big Bata pipeline. And this pipeline uses Beam. It is a streaming pipeline as the events are continuously arriving. We want fine oversight on what's going on so we set the granularity of the monitoring to 10 minutes. Thus, in the pipeline, we divide the collections of events into fixed windows of 10 minutes. But there is something to consider about the health of the e-commerce platform: when a failure occurs, it has consequences on the health of the overall platform for about 30 min (time for the clusters to failover, load to decrease etc...), the system is thus in recovered state. As a consequence, in the monitoring system, we want a failure event to be reported for 30 min after its occurrence. </div><div><br /></div><h3 style="text-align: left;">Implementation</h3><div><br /></div><div>To implement the above desired behavior, we need to assign events to windows depending on the type of these events. This is a perfect use case for Beam custom windows. 
We will define a <b>MonitoringFixedWindows</b> custom window that <b>assigns HEALTHY events to the current fixed window and FAILURE events to the current and the next 3 fixed windows:</b></div><div><b><br /></b><div id="custom-window"> <script src="https://gist.github.com/echauchot/4ffcfc9a36bb5adff276ce9f5e0a1e44.js"></script></div>
<div><br /></div>
The custom window is similar to FixedWindows, but we do not extend the FixedWindows class because FixedWindows assigns an element to <b>only a single window</b>. In the case of a FAILURE event, we want it to be assigned to 4 windows (the current and the next 3 ones).<div><br /></div><div>The lowest function class in the <a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/windowing/WindowFn.html" target="_blank">WindowFn</a> hierarchy that we can extend is <a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/windowing/NonMergingWindowFn.html" target="_blank">NonMergingWindowFn</a>. It is simply for windows that do not merge with each other (<a class="callout-trigger" data-lines="1" data-listing="custom-window" data-note="we extend NonMergingWindowFn and our custom windowing function assigns Event elements to IntervalWindows. IntervalWindow is simply a window with boundaries">see the code</a>).</div><div><br /></div><div>The interesting part of this code is the override of the <b>assignWindows</b> method. The whole essence of custom windows resides there. For a given (element) timestamp, it gives the list of windows the element should be assigned to. In our case <a class="callout-trigger" data-lines="22-38" data-listing="custom-window" data-note="We first define the start and end timestamps of the first window of size duration. Then, if the event is of type HEALTHY, we just return a window with these boundary timestamps. And if the event is of type FAILURE, we return this same window plus the next 3 ones.">we return a list of IntervalWindows </a>(simply windows with boundaries) because our windowing function class deals with IntervalWindows.</div><div><br /></div><div>The other overrides are quite straightforward:</div><div><ul style="text-align: left;"><li><b>windowCoder</b> : as said above, all the elements in Beam are in windows. And they will be serialized at some point, so the window information needs to be serialized as well. WindowCoder is simply a <a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/coders/Coder.html" target="_blank">Coder</a> for the custom window. The custom window deals with IntervalWindow so we just return this coder which is already provided by the SDK.</li><li><b>getDefaultWindowMappingFn</b>: this method is for side inputs (data of another PCollection accessible through a view in the Beam transforms). Here, this custom window does not support side inputs so we just throw an exception.</li><li><b>isCompatible</b>: 2 custom windowing functions are compatible if they are equal (same type and same boundaries)</li><li><b>equals</b> and <b>hashcode</b>: as there will be window comparisons issued by Beam, we need to override <i>equals</i> and <i>hashcode</i> with the usual code</li></ul></div><div><br /></div>
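<div>As an illustration, here is a sketch of what the <b>assignWindows</b> override can look like (assuming the Event POJO with its Status enum from the gist above and a configured window <i>duration</i>; the gist holds the real code):</div><div><pre>@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) {
  long size = duration.getMillis();
  // boundaries of the fixed window that contains the element timestamp
  long start = c.timestamp().getMillis() - (c.timestamp().getMillis() % size);
  IntervalWindow current =
      new IntervalWindow(new Instant(start), new Instant(start + size));
  if (c.element().getStatus() == Status.HEALTHY) {
    return Collections.singletonList(current);
  }
  // FAILURE: report the event in the current window and the next 3 ones
  List<IntervalWindow> windows = new ArrayList<>();
  for (int i = 0; i < 4; i++) {
    windows.add(new IntervalWindow(
        new Instant(start + i * size), new Instant(start + (i + 1) * size)));
  }
  return windows;
}</pre></div><div><br /></div>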
Now, we apply this windowing function to an input streaming PCollection: <div><br /><div style="text-align: center;"><i>inputPCollection.apply(MonitoringFixedWindows.of(Duration.standardMinutes(10)));</i></div><div style="text-align: center;"><i><br /></i></div><div>The output will be like this: </div><div><br /><div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEi6YDF9edOvW_4r_XL31hmlZPoB6ETuwGAE58Hc1rvBhcLCwXz0nEnvEh9CviviSXEaeLeszNYFLU5-azRsK7fKolL5zZnHaKQ6l2WbeOEopyPyoE5iFQN3uBUJhkcsyCvcSs186zprTyA/s800/csutom+windows+%25281%2529.png" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="113" data-original-width="800" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEi6YDF9edOvW_4r_XL31hmlZPoB6ETuwGAE58Hc1rvBhcLCwXz0nEnvEh9CviviSXEaeLeszNYFLU5-azRsK7fKolL5zZnHaKQ6l2WbeOEopyPyoE5iFQN3uBUJhkcsyCvcSs186zprTyA/s16000/csutom+windows+%25281%2529.png" /></a></div><br /><div>Consider that we receive a HEALTHY event with timestamp 5: it will be assigned to the window starting at timestamp 0 and ending at timestamp 10. If we then receive a FAILURE event with timestamp 12, it will be assigned to the window it belongs to (starting at timestamp 10 and ending at timestamp 20) and also to the next three 10-minute windows.</div><div><br /></div><h3 style="text-align: left;">Now you know how Beam windowing can adapt to whatever use case you might have!</h3><div><br /><br /><div><br /></div></div></div></div></div>
</div></div>Etienne Chauchothttp://www.blogger.com/profile/13442320750607525683noreply@blogger.comtag:blogger.com,1999:blog-1438124655819523792.post-36270399307833267862020-11-10T12:20:00.015+01:002020-12-02T11:00:40.331+01:00Tricky use cases of Apache Beam 1/3: incremental join<div>
<span style="font-size: xx-small;">π₯ 7 min.</span>
<br />
<span style="font-size: xx-small;"><br /></span><h2 style="text-align: left;">Introduction</h2></div><div><br /></div><div>This is the first article of a serie of blog posts about tricky use cases of <a href="https://beam.apache.org/" target="_blank">Apache Beam</a> that enlight some of the advanced possibilities of the Beam SDK.</div><div><br /></div><h2 style="text-align: left;">Incremental join example</h2><div><br /></div><div>Let's say we have a streaming system that records events of an auction platform. Such events can be <b>Persons</b> creating <b>Auctions</b> and placing bids on auctions.</div><div><br /></div><div>Consider, you have 2 <a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/values/PCollection.html" target="_blank">PCollections</a> as input:</div><div><ul style="text-align: left;"><li>One of the Persons details that are collected at their connection into the auction system</li><li>One of the Auctions details that are are collected at the auction creation</li></ul></div><div>Now, you want regular updates on who is selling in particular US states.</div><div>So, this problem is a typical incremental join of these 2 PCollections. Such a join in Beam is done through the <a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html" target="_blank">CoGroupByKey</a> transform of the SDK. This transform works as illustrated below:</div><div><br /></div><div><br /></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhl8p-D8fUjharvCMdQnSYv3gkjgnP_wDLI4qQ1TWnBrCqaLXN-jwl1oP8V8YJALljdl-Q8NpztmIq0IeLlvi_KFs0meRWaAD6fU3cmJw3dt6HifVG-Ef75VMr8ZoxjaHnNO0LjYc_-O0A/s800/cogroupByKey+updated+for+blog+%25281%2529.png" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="527" data-original-width="800" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhl8p-D8fUjharvCMdQnSYv3gkjgnP_wDLI4qQ1TWnBrCqaLXN-jwl1oP8V8YJALljdl-Q8NpztmIq0IeLlvi_KFs0meRWaAD6fU3cmJw3dt6HifVG-Ef75VMr8ZoxjaHnNO0LjYc_-O0A/s16000/cogroupByKey+updated+for+blog+%25281%2529.png" /></a></div><br /><div class="separator" style="clear: both; text-align: center;"><br /></div><div>This is similar to a SQL Join. The 2 PCollections are keyed by the same key: <b>personId</b> is the unique key of the Person object and <b>sellerId </b>is the unique key of the Seller object. Seller and Person are just two different visions of the same entity depending on either they were created duding a connection event or during an auction creation event. So sellerId and personId are the same, we join by this key.</div><div><br /></div><div>First step is to group the 2 PCollections in a <a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.html" target="_blank">KeyedPCollectionTuple</a> before joining. For that we need to <b>tag</b> the elements of the PCollections with a <a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/values/TupleTag.html" target="_blank">TupleTag</a> (basically a String id) to differentiate them in the output result. 
In the diagram above, the Person tag is in green and the Auction tag is in blue.</div><div>Then the actual <a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html" target="_blank">CoGroupByKey</a> transform is applied to end up with an output PCollection that contains, for each key, a list of elements (either Person or Auction) that have this key in common.</div><div><br /></div><div><br /></div><h2 style="text-align: left;">Stateful processing</h2><div><br /></div><div>Hey, wait! I said that this was a streaming system, which means that there will be out-of-order data. How to deal with that during the join? Well, the answer is with stateful processing: the idea here is to <b>store the elements in a persistent state </b>waiting for the corresponding element in the other PCollection. Such a stateful processing API is available in the Beam SDK with the <a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/ParDo.html" target="_blank">ParDo transform</a> but it is out of the scope of this article to present it in detail. There are good blog articles and talks about that already, so I will just point to <a href="https://beam.apache.org/blog/stateful-processing/" target="_blank">the official Beam stateful processing documentation</a>.</div><div><br /></div><div>So the architecture of the stateful join in our example will be:</div><div><ul style="text-align: left;"><li>The Person element will be stored in a persistent state in order to match future auctions by that person. We also <b>set a timer to clear the person state after a TTL </b>(see the <a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/state/Timer.html" target="_blank">Timer API</a> in the SDK)</li><li>The Auction elements will be stored in a persistent state until we have seen the corresponding Person element. Then, the stored auctions can be output and the state can be cleared </li></ul><h2 style="text-align: left;"><br /></h2><h2 style="text-align: left;">The code of the incremental join</h2></div><div><br /></div>
<div id="incremental_join"><script src="https://gist.github.com/echauchot/a3a12bb3ec52dff6c8a3953711983199.js"></script> </div>
I think it deserves some explanation: <div><br />We receive as input the PCollection of events containing all the events of the auction system. Beam provides several windowing capabilities, but here we chose to work in processing time: we do not window the elements into time windows.
We rather
<a class="callout-trigger" data-lines="4-5" data-listing="incremental_join" data-note="Window the events PCollection by the Global window to have all the elements in a window (there is always a window in Beam for unification of batch and processing APIs)">apply the <b>Global window</b></a>
to the elements and
<a class="callout-trigger" data-lines="1,6-8" data-listing="incremental_join" data-note="set the tigger to output each time 30 elements arrived and set allowed lateness to 0 as we work in processing time">set a trigger</a>
to output the results each time n elements were processed.
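As an illustration, this windowing and triggering step looks like this (using 30 elements, as in the actual code; Event is the auction system's event class):
<div><pre>events.apply(
    Window.<Event>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(30)))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.ZERO));</pre></div>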
<div><br /></div>We then
<a class="callout-trigger" data-lines="12" data-listing="incremental_join" data-note="Extraction is done by applying a simple Filter transform to the PCollection">extract the auctions out of the stream of events</a>
and
<a class="callout-trigger" data-lines="15" data-listing="incremental_join" data-note="key the Auctions by their sellerId. AUCTION_BY_SELLER =
ParDo.of(
new DoFn<Auction, KV<Long, Auction>>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(c.element().seller, c.element()));
}
});">key the auctions by there sellerId</a>
. We then do the same for the Persons: extract them out of the events and key them by their personId (which, remember, is the same as the sellerId).
<div><br /></div>
Now comes the actual join: we first <a class="callout-trigger" data-lines="27-28" data-listing="incremental_join" data-note="Create the KeyedPCollectionTuple with the tags">group the 2 PCollections</a> in a KeyedPCollectionTuple as said above. We then <a class="callout-trigger" data-lines="30" data-listing="incremental_join" data-note="Apply the CoGroupByKey transform to the KeyedPCollectionTuple">apply the CoGroupByKey transform</a> to the KeyedPCollectionTuple.</div><div><div><br /></div>
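<div>In essence, the join boils down to this (variable and tag names are illustrative):</div><div><pre>PCollection<KV<Long, CoGbkResult>> joined =
    KeyedPCollectionTuple.of(personTag, personsById)
        .and(auctionTag, auctionsBySellerId)
        .apply(CoGroupByKey.create());</pre></div><div><br /></div>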
Now comes the stateful processing: as explained above, we need to store elements in a persistent state to deal with out-of-order data. This is done by <a class="callout-trigger" data-lines="31" data-listing="incremental_join" data-note="applying a ParDo with a stateful DoFn to deal with out-of-order data">applying a ParDo</a> to the output joined PCollection. The topic of this article is not stateful processing so, if you want some details, the complete code of JoinDoFn is <a href="https://github.com/apache/beam/blob/0ab52bb2feba6a4bdae4ec77e14a3a213eceeaa5/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java#L158" target="_blank">on Beam github</a>. I will just say here that <b>JoinDoFn</b> does the following:
<div><br /></div><div><ul style="text-align: left;"><li>Defines 2 <a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/state/ValueState.html" target="_blank">ValueState</a> to store the Persons and Auctions</li><li>Defines 1 <a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/state/Timer.html" target="_blank">Timer</a> that fires after a configured TTL to clear the Person ValueState</li><li>For each key, iterates over the elements in the output of the join and matches persons to auctions by writing and reading from/to the ValueStates. </li></ul></div>
<div><br /></div>
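<div>A skeleton sketch of such a stateful DoFn (names are illustrative; the real JoinDoFn linked above also buffers the auction side and emits the matches):</div><div><pre>class JoinDoFn extends DoFn<KV<Long, CoGbkResult>, KV<Person, Auction>> {

  @StateId("person")
  private final StateSpec<ValueState<Person>> personSpec =
      StateSpecs.value(SerializableCoder.of(Person.class));

  @TimerId("personTTL")
  private final TimerSpec ttlSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext context,
      @StateId("person") ValueState<Person> personState,
      @TimerId("personTTL") Timer ttl) {
    // store the person (when present in the join result) to match future
    // auctions, output the auctions whose person is already known, and
    // buffer the auctions seen before their person
    ttl.offset(Duration.standardMinutes(30)).setRelative(); // schedule cleanup
  }

  @OnTimer("personTTL")
  public void onPersonTTL(@StateId("person") ValueState<Person> personState) {
    personState.clear(); // TTL expired: drop the stored person
  }
}</pre></div><div><br /></div>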
The final step is to export the result by <a class="callout-trigger" data-lines="34-45" data-listing="incremental_join" data-note="Apply a ParDo to create NameCityStateId output objects">applying a ParDo</a> that creates the output POJOs gathering the person and auction information.<div><br /></div><h2 style="text-align: left;">Conclusion</h2><div><br /></div><div>I believe that it is interesting to show the join in a complete use case example. Thus, this code has a lot of boilerplate that is not needed for the actual incremental join and that comes from the use case of the auction system itself:</div><div><ul style="text-align: left;"><li>The extraction of auctions out of events</li><li>The extraction of persons out of events</li><li>The keying of the auctions PCollection</li><li>The keying of the persons PCollection</li><li>The apply of the stateful JoinDoFn</li><li>The export to the POJO output </li></ul><div>The actual join resides only in the creation of the <b>KeyedPCollectionTuple</b> and the apply of the <b>CoGroupByKey</b>. The incremental part resides in the apply of <b>windowing</b>: in this use case we chose to trigger the output every n elements, but we could have chosen a simpler trigger by simply applying <a href="https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/windowing/FixedWindows.html" target="_blank">FixedWindows</a> to the input PCollection.</div></div><div><br /></div><h2 style="text-align: left;"><b><span style="font-size: large;">That's it, this is how you can create an incremental join pipeline with Apache Beam</span></b></h2><div>
$(".callout-trigger").codeCallout({ profile: "gist" });
</script>
<script>
/*
Copyright (c) 2013-2020 Dave Leeds and Contributors
License: https://raw.github.com/djleeds/code-callout/master/LICENSE.md
*/
!function(a,b,c){"use strict";a.fn.codeCallout=function(c){function u(a){var b=new o("code-callout-note","content",a.closeButtonText,"close"),c=new p,d=new q(new k,a.getLineSelector,"code-callout-highlighted"),e=new i(a.scrollTargetSelector,a.animationDurationInMs);return new n(b,c,d,e,a.noteOffsetFromLine)}function v(){var b=0;return a(x.scrollTargetSelector).each(function(c,d){var e=a(d).scrollTop();b=e>b?e:b}),b}var w=new e,x=a.extend(r,c),y=s;if(x.profile in t==!1)throw"Unrecognized profile "+x.profile+" was requested.";var z=t[x.profile];"options"in z&&(x=a.extend(x,z.options)),"styles"in z&&(y=a.extend(y,z.styles));var A=new j(x.styleId,y),B=new l(x.listingIdAttribute,x.lineNumbersAttribute,x.noteContentsAttribute),C=u(x);return A.initialize(),a(this).attr("href","#").on(x.triggerEventName,function(b){var c=B.read(this);if(console.log("wrapping",c.listingId,x),x.shouldWrapLines){var d=new m(a("#"+c.listingId),x.wrapperLineClass,x.wrapperNumberClassPrefix,x.filterLinesToWrap,x.beforeLinesOfCodeAreWrapped);d.wrapListing()}b.preventDefault(),w.deactivate(),w=C.create(c,v()),w.activate()}),x.exposeForTest&&(b.__codeCallout={Callout:d,NullCallout:e,Note:f,Listing:g,LineSet:h,Scroller:i,Style:j,LineNumberParser:k,TriggerReader:l,LineWrapper:m,CalloutFactory:n,NoteFactory:o,ListingFactory:p,LineSetFactory:q,options:x}),this};var d=function(a,b,c,d,e,f){var g=this;a.onClose(function(){g.deactivate()}),this.activate=function(){a.activate(),c.highlight(),d.to(g.position())},this.deactivate=function(){a.deactivate(),c.unhighlight(),d.to(e)},this.center=function(){return(c.top()+a.bottom())/2},this.positionCalculator={windowHeight:f.innerHeight,scrollTop:function(){return this.scrollCenter()-this.windowHeight/2},overhang:function(){return Math.max(a.bottom()-b.bottom(),0)},leash:function(){return Math.max(b.height()-this.windowHeight,0)/2},distance:function(){return g.center()-b.center()},clampedDistance:function(){var a=this.leash();return Math.max(Math.min(this.distance(),a+this.overhang()),-a)},scrollCenter:function(){return b.center()+this.clampedDistance()}},this.position=function(){return this.positionCalculator.scrollTop()}},e=function(){this.deactivate=function(){}},f=function(b,c,d){var e="resize.code-callout-note",f=function(){};this.activate=function(){this.attach(),this.positionAt(c()),this.hook(),this.show()},this.deactivate=function(){this.hide(),this.unhook(),this.detach()},this.attach=function(){b.appendTo(a("body"))},this.detach=function(){b.remove()},this.show=function(){b.show()},this.hide=function(){b.hide()},this.hook=function(){d.on(e,function(a){this.position(c())}),b.find("button.close").on("click",function(){f()})},this.unhook=function(){d.off(e)},this.bottom=function(){return b.offset().top+b.outerHeight()},this.positionAt=function(a){b.css(a)},this.onClose=function(a){f=a}},g=function(a){this.top=function(){return a.offset().top},this.bottom=function(){return this.top()+this.height()},this.height=function(){return a.outerHeight()},this.center=function(){return(this.top()+this.bottom())/2}},h=function(a,b){this.highlight=function(){a.addClass(b)},this.unhighlight=function(){a.removeClass(b)},this.top=function(){return a.first().offset().top},this.bottom=function(){return a.last().offset().top+this.lineHeight()},this.left=function(){return a.last().offset().left},this.lineHeight=function(){return a.last().outerHeight()}},i=function(b,c){this.to=function(a){this.selectElement().stop().animate({scrollTop:a},c)},this.selectElement=function(){return 
a(b)}},j=function(b,c){this.initialize=function(){this.hasBeenWritten()||this.write()},this.hasBeenWritten=function(){return a("#"+b).length>0},this.write=function(){a("<style />").attr("type","text/css").attr("id",b).text(this.build()).appendTo(a("body"))},this.build=function(){var a="";for(var b in c){a+=b+"{";for(var d in c[b])a+=d+":"+c[b][d]+";";a+="} "}return a}},k=function(){this.parse=function(a){if(!isNaN(a))return[Number(a)];for(var b=[],c=a.split(","),d=0;d<c.length;d++){var e=c[d];this.isLineRange(e)?b=b.concat(this.parseLineRange(e)):b.push(Number(e))}return b},this.isLineRange=function(a){return a.indexOf("-")>0},this.parseLineRange=function(a){for(var b=[],c=a.split("-"),d=c[0],e=c[1],f=d;e>=f;f++)b.push(Number(f));return b}},l=function(b,c,d){this.read=function(e){var f=a(e);return{listingId:f.data(b),lineNumbers:f.data(c),noteContents:f.data(d)}}},m=function(b,c,d,e,f){var g=this,h="\n";this.isAlreadyWrapped=function(){return b.find("."+c).length>0},this.wrapLine=function(b,e){var f=b+1;return a("<div />").addClass(c).addClass(d+f).html(e+"\n")},this.splitIntoLines=function(){return e(b).html().split(h)},this.replaceListing=function(a){b.html(a.children("*"))},this.wrapListing=function(){if(!this.isAlreadyWrapped()){var c=this.splitIntoLines(b),d=a("<div />");a.each(c,function(a,b){d.append(g.wrapLine(a,b))}),f(b,d),this.replaceListing(d)}}},n=function(a,c,e,f,g){this.create=function(g,h){var i=c.create(g.listingId),j=e.create(g.listingId,g.lineNumbers),k=this.createPositionAdvice(j),l=a.create(g.noteContents,k);return new d(l,i,j,f,h,b)},this.createPositionAdvice=function(a){return function(){return{top:a.bottom()+a.lineHeight()*g,left:a.left()}}}},o=function(c,d,e,g){this.renderContent=function(b){return a("<div />").addClass(d).text(b)},this.renderCloseButton=function(){return a("<button />").addClass(g).text(e)},this.render=function(b){var d=this.renderContent(b),f=this.renderCloseButton(e);return a("<div />").attr("id",c).append(d).append(f)},this.create=function(c,d){var g=this.render(c,e);return new f(g,d,a(b))}},p=function(){this.create=function(a){return new g(this.selectElement(a))},this.selectElement=function(b){return a("#"+b)}},q=function(b,c,d){this.create=function(a,c){var e=b.parse(c),f=this.selectLines(a,e);return new h(f,d)},this.selectLines=function(b,d){var e=a();return a.each(d,function(d,f){e=e.add(a(c(b,f)))}),e}},r={profile:"pre",animationDurationInMs:500,closeButtonText:"Continue reading",scrollTargetSelector:"html,body",triggerEventName:"click",listingIdAttribute:"listing",lineNumbersAttribute:"lines",noteContentsAttribute:"note",styleId:"code-callout-styles",noteOffsetFromLine:.25,exposeForTest:!1,shouldWrapLines:!1,wrapperLineClass:"code-callout-line",wrapperNumberClassPrefix:"number-",lineWrapContainerSubselector:"",getLineSelector:function(a,b){throw"Not Implemented"},filterLinesToWrap:function(a){return a},beforeLinesOfCodeAreWrapped:function(a,b){return b}},s={".code-callout-line":{width:"100%"},".code-callout-highlighted":{"background-color":"#FFA !important"},"#code-callout-note":{position:"absolute","background-color":"#EAEAEA",background:"linear-gradient(to bottom, white 0%, #EAEAEA 100%)","border-radius":"1em",padding:"1em 1em 3em 1em",border:"1px solid #CCC","box-shadow":"0.25em 0.25em 0.5em #CCC","min-width":"7em"},"#code-callout-note button.close":{"font-size":"75%",position:"absolute",bottom:"0",right:"0",margin:"1.5em","white-space":"nowrap"}},t={gist:{options:{getLineSelector:function(a,b){return"#"+a+" 
td[data-line-number='"+b+"'] + td"}}},"jquery-syntax":{options:{getLineSelector:function(a,b){return"#"+a+" .ln"+b}}},pre:{options:{shouldWrapLines:!0,getLineSelector:function(a,b){return"#"+a+" .code-callout-line.number-"+b}}},prism:{options:{shouldWrapLines:!0,lineWrapContainerSubselector:"code",getLineSelector:function(a,b){return"#"+a+" .code-callout-line.number-"+b},filterLinesToWrap:function(a){return a.not(".line-numbers-rows")},beforeLinesOfCodeAreWrapped:function(a,b){return b.append(a.find(".line-numbers-rows"))}}},syntaxhighlighter:{options:{getLineSelector:function(a,b){return"#"+a+" .line.number"+b}},styles:{"div.syntaxhighlighter div.container div.code-callout-highlighted":{"background-color":s[".code-callout-highlighted"]["background-color"]+" !important"}}}}}(jQuery,window);
</script>
</div></div>Etienne Chauchothttp://www.blogger.com/profile/13442320750607525683noreply@blogger.comtag:blogger.com,1999:blog-1438124655819523792.post-62203287312281507392020-10-31T15:05:00.002+01:002020-11-26T10:28:15.957+01:00Watermark architecture proposal for Spark Structured Streaming framework<div>
<span style="font-size: xx-small;">🔥 7 min.</span>
</div><div><br /></div><h2 style="text-align: left;">The multiple aggregations problem with the Spark Structured Streaming framework</h2><div><br /></div><div>While developing the translation layer (called a runner) from <a href="https://beam.apache.org/" target="_blank">Apache Beam</a> to <a href="https://spark.apache.org/" target="_blank">Apache Spark</a>, we faced an issue with the <a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html" target="_blank">Spark Structured Streaming framework</a>: this framework does not support more than one aggregation in a streaming pipeline. For example, you cannot do a group-by followed by a reduce-by in a streaming pipeline. There is <a href="https://issues.apache.org/jira/browse/SPARK-26655" target="_blank">an open ticket</a> in the Spark project, <a href="https://docs.google.com/document/d/1IAH9UQJPUiUCLd7H6dazRK2k1szDX38SnM6GVNZYvUo/" target="_blank">an ongoing design document</a> and <a href="https://github.com/apache/spark/pull/23576">an ongoing PR</a>, but, as of now, they have received no updates since the summer of 2019. As a consequence, the Beam runner based on this framework is on hold, waiting for this feature from the Spark project.</div><div><br /></div><h2 style="text-align: left;">The underlying problem: the global watermark in the Spark Structured Streaming framework</h2><div><br /></div><div>Before explaining this problem, let's first explain what a watermark is.</div><div><br /></div><h3 style="text-align: left;">What is a watermark?</h3><div><br /></div><div>In streaming systems, there is always a lag between the time at which a given element was produced (the event timestamp) and the time at which it was received by the system (the processing timestamp), because there can be network outages, slowdowns, etc. Data can also arrive out of order.</div><div><br /></div><div>To deal with these two facts, streaming systems define the notion of <b>watermark</b>. It is what gives the system a notion of completeness of data in a constant flow of streaming data. It is the point in time past which the system should no longer receive older elements. As streaming systems rely on windowing to divide this stream of data, the watermark can also be defined as the system's notion of when all the data in a certain window can be expected to have arrived in the streaming pipeline. When the watermark passes the end of the window, the system outputs data.</div><div><br /></div><div>If we take the example of a very simple static naïve watermark defined as <b>an offset of 10 min</b>:</div><div><br /></div><div><i><span> </span>at 2:10 the system considers that no element with a timestamp older than 2:00 can arrive</i></div><div><i><br /></i></div><div>A corollary notion is <b>late data</b>: data that arrives behind the watermark. For example, with the previous watermark example, let's say that an element with a timestamp of 1:59 arrives at 2:11. At 2:11 the watermark tells the system that no element with a timestamp older than 2:01 (offset of 10 min) can arrive; this element is behind the watermark, so it is considered late by the system. As a consequence, the system should drop it. Still, we can define a tolerance, called <b>allowed lateness</b>, that permits processing such a late element. For this element not to be dropped, we would need to set an allowed lateness value of at least 2 min (the element arrives 12 min after it was produced, against a watermark offset of 10 min).</div>
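<div><br /></div><div>As an illustration, here is a minimal sketch (hypothetical Java, not the API of any particular engine) of such a naive static watermark and its lateness check:</div><div><br /></div><pre>
/** A minimal sketch of the naive static watermark described above. */
class NaiveWatermark {
    private final long offsetMs;                 // e.g. 10 min
    private long maxEventTsMs = Long.MIN_VALUE;  // newest event timestamp seen

    NaiveWatermark(long offsetMs) { this.offsetMs = offsetMs; }

    /** Watermark = newest event timestamp seen so far minus the fixed offset. */
    long watermarkMs() { return maxEventTsMs - offsetMs; }

    /** Returns true if the element is dropped: late beyond the allowed lateness. */
    boolean isDropped(long eventTsMs, long allowedLatenessMs) {
        boolean dropped = eventTsMs &lt; watermarkMs() - allowedLatenessMs;
        maxEventTsMs = Math.max(maxEventTsMs, eventTsMs); // advances the watermark
        return dropped;
    }
}
</pre><div>With the values of the example: the watermark is 2:01 and the element's timestamp is 1:59, so an allowed lateness of 2 min is just enough for the element to be kept.</div>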
<div><br /></div><h3 style="text-align: left;">The problem with the global watermark in an example</h3><div><br /></div><div>Back to the problem: the Spark Structured Streaming framework does not support more than one aggregation in a streaming pipeline because <b>the watermark scope in the Spark Structured Streaming framework is global to the whole pipeline</b>; there is no watermark per operation. At the end of each microbatch, the global watermark is updated with the newest timestamp received. Why is this a problem? Let's answer with an example:</div><div><br /></div><div>Consider a streaming pipeline with 2 aggregations (Op1 and Op2, in blue in the diagram below). The first aggregation outputs the highest-valued element out of a window of 3 seconds.</div><div><br /></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEheiDI8ji6Cs0ztJjUsnSskMJrUmAM8ZaSH7UibyukCe8Tso_rZh31c6tdX6qW4ycXQyuvQpAUQ3PddFu53i-jw08r5RZpb-yvxOaOqC33noy7Zo4-G65R9JjM0Due41H4IKWLm7YwWquo/s800/2+aggregation+spark+pipeline+current+%25281%2529.png" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="230" data-original-width="800" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEheiDI8ji6Cs0ztJjUsnSskMJrUmAM8ZaSH7UibyukCe8Tso_rZh31c6tdX6qW4ycXQyuvQpAUQ3PddFu53i-jw08r5RZpb-yvxOaOqC33noy7Zo4-G65R9JjM0Due41H4IKWLm7YwWquo/s16000/2+aggregation+spark+pipeline+current+%25281%2529.png" /></a></div><div><br /></div><div>Now let's say that the source sends 3 elements with values 6, 4 and 5 and timestamps 1, 2 and 3. These elements get buffered inside Op1, and Op1 updates the global watermark to 3 (the newest timestamp seen). As the watermark has reached the end of Op1's window (a window of 3 seconds), it is time for Op1 to output its data. The highest-valued element has value 6 but <b>timestamp 1</b>. As a consequence, this element will be considered <b>late</b> by Op2, because Op2 relies on the global watermark, whose value is 3, and the element's timestamp of 1 is behind it. This element will therefore be <b>dropped</b> by Op2, leading to incorrect results.</div><div><br /></div><div>So, to avoid incorrect results being produced by downstream operations, the Spark Structured Streaming framework disables support for more than one aggregation in streaming pipelines.</div><div><br /></div><h2 style="text-align: left;">A possible solution to this problem</h2><div><br /></div><h3 style="text-align: left;">A watermark per operation</h3><div><br /></div><div>A possible solution to this problem is to replace the global watermark with a <b>watermark per transform</b>. The watermark values are propagated <b>from the source</b> through the different operations of the pipeline, and the local watermark values are updated as new data arrives. For each operation we need to define:</div><div><ul style="text-align: left;"><li>an <b>InputWatermark</b> = Min {outputWatermark(previous operations)} <span style="font-size: x-small;">*</span></li><li>an <b>OutputWatermark</b> = Min {inputWatermark, oldest(processed element)}</li></ul></div><div><i><span style="font-size: x-small;">* In a straight pipeline, the InputWatermark is simply the OutputWatermark of the previous operation</span></i></div>
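<div><br /></div><div>As a minimal sketch (hypothetical code, not an actual Spark or Beam API), each operation could maintain its two watermarks according to these rules as follows:</div><div><br /></div><pre>
/** Hypothetical sketch of the per-operation watermark rules defined above. */
class OperationWatermark {
    private long inputWatermark = Long.MIN_VALUE;
    private long oldestProcessed = Long.MAX_VALUE;

    /** InputWatermark = Min {outputWatermark(previous operations)}. */
    void onUpstreamWatermarks(long... upstreamOutputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long watermark : upstreamOutputWatermarks) {
            min = Math.min(min, watermark);
        }
        inputWatermark = min;
    }

    /** Tracks the oldest element processed (buffered) by this operation. */
    void onElement(long eventTimestamp) {
        oldestProcessed = Math.min(oldestProcessed, eventTimestamp);
    }

    /** OutputWatermark = Min {inputWatermark, oldest(processed element)}. */
    long outputWatermark() {
        return Math.min(inputWatermark, oldestProcessed);
    }
}
</pre>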
<h3 style="text-align: left;">Updated example</h3><div><br /></div><div>Let's update the previous example of the streaming pipeline with Op1 and Op2. Only now <b>there is no global watermark anymore: we define a watermark per operation</b>, in green in the diagram below:</div><div><br /></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEj-gLTDD7-FuRE-mITZh8jRo_6eNoYcZ4RUsO0YsAEtWC04AjDG303YztVTKS6X5g6i8UyQ0sm3uyK7X6dVRc03Bwg42M12QRhfIMfdKStF-f79Auh02eu07gHlhLhaGvjiocB6RMYf8-E/s800/Capture+d%25E2%2580%2599%25C3%25A9cran+de+2020-11-03+10-53-29.png" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="241" data-original-width="800" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEj-gLTDD7-FuRE-mITZh8jRo_6eNoYcZ4RUsO0YsAEtWC04AjDG303YztVTKS6X5g6i8UyQ0sm3uyK7X6dVRc03Bwg42M12QRhfIMfdKStF-f79Auh02eu07gHlhLhaGvjiocB6RMYf8-E/s16000/Capture+d%25E2%2580%2599%25C3%25A9cran+de+2020-11-03+10-53-29.png" /></a></div><div><br /></div><div>As said above, the watermark is set by the source. Let's say that the source tells the system that no element older than timestamp 1 can arrive (source watermark = 1). This value is propagated to the input watermark and to the output watermark of Op1 according to the rules defined in the previous paragraph. The watermarks of Op2 are updated as well, according to the same rules. We end up with the green values in the above diagram.</div><div><br /></div><div>Now let's say that we receive the very same 3 elements as before, with values 6, 4 and 5 and timestamps 1, 2 and 3.</div><div><br /></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjKXfOryZMHXjThhSSgBDZ2ZUvLAVi0prvb9mKdvKIs5zL6XEzqbM9si75AGnpkHgzlAyGfHJjpkhSxi8idjraIQBivu95ODf3R3u2JYqPMFwwSxKtxbqc7R9rRcV0DpC4m9ES7KknfMcg/s800/Capture+d%25E2%2580%2599%25C3%25A9cran+de+2020-11-03+10-53-33.png" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="253" data-original-width="800" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjKXfOryZMHXjThhSSgBDZ2ZUvLAVi0prvb9mKdvKIs5zL6XEzqbM9si75AGnpkHgzlAyGfHJjpkhSxi8idjraIQBivu95ODf3R3u2JYqPMFwwSxKtxbqc7R9rRcV0DpC4m9ES7KknfMcg/s16000/Capture+d%25E2%2580%2599%25C3%25A9cran+de+2020-11-03+10-53-33.png" /></a></div><div><br /></div><div>They get buffered in Op1. Op1 updates its <i>oldest processed</i> counter (in red) to the minimum timestamp seen. And, as the output watermark is the minimum between this counter and the input watermark, it remains set to 1.
Since the output watermark of Op1 is not updated, the input and output watermarks of Op2 are not either.</div><div><br /></div><div>Let's say now that the source updates its watermark to 3 (in red in the diagram below), meaning that it should no longer receive elements older than timestamp 3.</div><br /><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjM55H-aJHkFI51n6cDi-KK34DyMQjdM6z1yu5yAL6ZjzlLI_Vy7jitfwdxwL0CHm4MFTySP9zobccTjwTx1_1xuSlKlYpEwHv_jJM5jZlrvZfUSfPEnfSjXTMMFpRcArE2Dk54RGlVOSE/s800/Capture+d%25E2%2580%2599%25C3%25A9cran+de+2020-11-03+10-53-36.png" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="252" data-original-width="800" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjM55H-aJHkFI51n6cDi-KK34DyMQjdM6z1yu5yAL6ZjzlLI_Vy7jitfwdxwL0CHm4MFTySP9zobccTjwTx1_1xuSlKlYpEwHv_jJM5jZlrvZfUSfPEnfSjXTMMFpRcArE2Dk54RGlVOSE/s16000/Capture+d%25E2%2580%2599%25C3%25A9cran+de+2020-11-03+10-53-36.png" /></a></div><div><br /></div><div>So the input watermark of Op1 is updated to the output watermark of the previous step (the source), but the output watermark of Op1 is not updated, as it is the minimum between the oldest processed counter (which is 1) and the input watermark. And since the output watermark of Op1 is not updated, the input and output watermarks of Op2 are not either.</div><div><br /></div><div>But something happens now in Op1: its input watermark has value 3, which has passed the end of the window, so it is time for Op1 to output its data and clear its buffers. It outputs the highest element, with value 6 and timestamp 1, as shown below.</div><div><br /></div><div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiY6yTedaHlmw9PyQ45trfgItxFOusIO3OE9cMSi6Ljcx7CBSm-qaI13jj8sZEQB3HZ3x1OL13oLjogzkaQrSiaozb6x1qx45WJT3lZ8jqjI0EeVjxhDQUBgEhT4xFhWC0_Z_202mM09O0/s800/Capture+d%25E2%2580%2599%25C3%25A9cran+de+2020-11-03+10-53-40.png" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="259" data-original-width="800" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiY6yTedaHlmw9PyQ45trfgItxFOusIO3OE9cMSi6Ljcx7CBSm-qaI13jj8sZEQB3HZ3x1OL13oLjogzkaQrSiaozb6x1qx45WJT3lZ8jqjI0EeVjxhDQUBgEhT4xFhWC0_Z_202mM09O0/s16000/Capture+d%25E2%2580%2599%25C3%25A9cran+de+2020-11-03+10-53-40.png" /></a></div><div><br /></div>But now, <b>this element is no longer dropped by Op2</b>: the input watermark of Op2 is still 1, so Op2 no longer considers the element late.
It just processes it, buffers it and updates its oldest processed counter.<br /><div><br /></div><div>Now, one final thing you may wonder is: when and how does Op2 update its watermarks?</div><div><br /></div><div>The answer is: with the arrival of new data. Let's say we receive a new element from the source with value 7 and timestamp 4, as shown below:</div><div><br /></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjaY7IehnXXWJGWEH5Yp20qEeveKhPFDvQ81mtQ1W1hUG2pV-o3L6T08rhrhXjW5JX9MrGS-Msv5wSUQ2VfHzjqgcyOt2O8QCW6MIJNnhnhxKMS9OGfrOYgunXcG7zRoTek1vNknpctlCc/s800/Capture+d%25E2%2580%2599%25C3%25A9cran+de+2020-11-03+10-53-44.png" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="253" data-original-width="800" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjaY7IehnXXWJGWEH5Yp20qEeveKhPFDvQ81mtQ1W1hUG2pV-o3L6T08rhrhXjW5JX9MrGS-Msv5wSUQ2VfHzjqgcyOt2O8QCW6MIJNnhnhxKMS9OGfrOYgunXcG7zRoTek1vNknpctlCc/s16000/Capture+d%25E2%2580%2599%25C3%25A9cran+de+2020-11-03+10-53-44.png" /></a></div><br /><div>The processing of this element by Op1 updates the oldest processed counter to the timestamp of the element (4), and the output watermark of Op1 is updated because the minimum between the input watermark (3) and the oldest processed counter (4) is now 3. As a consequence of the update of the output watermark of Op1, the watermarks of the downstream Op2 are updated as shown above. This is how watermark values are updated with the arrival of elements.</div><div><br /></div><div>There is one thing left to discuss: the late data case, which we will see in the next paragraph.</div><div><br /></div><h3 style="text-align: left;">What happens to late data in this architecture?</h3><div><br /></div><div>A given element is late for a given operation when its timestamp is lower than the InputWatermark of the operation. As the OutputWatermark of this operation is defined as <i>Min {inputWatermark, oldest(processed element)}</i>, the OutputWatermark of the operation is set to the timestamp of the element. For the next operation, the InputWatermark will be set to the OutputWatermark of the previous operation, and thus to the timestamp of the element. So the watermarks of all downstream operations will be set to the value of this element's timestamp. To sum it up: <b>a late element will delay downstream operations.</b></div><div><b><br /></b></div><h2 style="text-align: left;"><b>Conclusion</b></h2><div><b><br /></b></div><div>This watermark architecture was proposed to the Apache Spark project.</div><div class="separator" style="clear: both; text-align: center;"><br /></div><br /></div>Etienne Chauchothttp://www.blogger.com/profile/13442320750607525683noreply@blogger.comtag:blogger.com,1999:blog-1438124655819523792.post-76925475259038295402020-07-08T13:57:00.088+02:002020-11-23T11:04:38.977+01:00Export metrics from Apache Beam pipelines<span style="font-size: xx-small;">🔥 12 min.</span>
<div><br /></div>
<div>
This blog post is about part of <a href="https://echauchot.blogspot.com/p/talks.html#apachecon2018" target="_blank">this talk</a> that I gave at the ApacheCon 2018 about universal metrics in
<a href="https://beam.apache.org/" target="_blank">Apache Beam</a>. More
precisely, it deals with metrics export: how Beam metrics are exported to the
outside world in a running Beam pipeline.
</div>
<div><br /></div>
<h2 style="text-align: left;">What are Beam metrics?</h2>
<div><br /></div>
<div>
The content below is also available on the <a href="https://beam.apache.org/documentation/programming-guide/#metrics" target="_blank">Beam website</a>, but it is recalled here for the completeness of the article.
</div>
<div><br /></div>
<div>
<div>
In the Beam model, metrics provide some insight into the current state of a
user pipeline, potentially while the pipeline is running. For example, this
allows the user to:
</div>
<div>
<ul style="text-align: left;">
<li>
Check the number of errors encountered while running a specific step in
the pipeline
</li>
<li>Monitor the number of calls made to an external service</li>
<li>
Count the number of elements that have been processed
</li>
<li>…and so on.</li>
</ul>
</div>
</div>
<div>
<div>Metrics are: </div>
<div>
<ul style="text-align: left;">
<li>
<b>Named:</b> Each metric has a name which consists of a namespace
and an actual name. The namespace can be used to differentiate between
multiple metrics with the same name and also allows querying for all
metrics within a specific namespace.
</li>
<li>
<b>Scoped:</b> Each metric is reported against a specific step in the
pipeline (i.e. a specific transform in the pipeline), indicating what
code was running when the metric was declared. This allows reporting the
same metric name in multiple places and identifying the value each
transform reported, as well as aggregating the metric across the entire
pipeline.
</li>
<li>
<b>Dynamically Created:</b> Metrics may be created during runtime
without pre-declaring them, in much the same way a logger could be
created. This makes it easier to produce metrics in utility code and
have them usefully reported.
</li>
<li>
<b>Degrade Gracefully:</b> If a runner doesn't support some part of
reporting metrics, the fallback behavior is to drop the metric updates
rather than failing the pipeline. If a runner doesn't support some part
of querying metrics, the runner will not return the associated data.
</li>
<li>
<b>Attempted/committed metrics: </b>attempted metrics include retries whereas committed metrics do not: if a bundle (a part of a Beam PCollection) is retried, the committed value of a counter will not be incremented by the retry. If you want some details on retries and metrics, please take a look at slides 18/19 of <a href="https://echauchot.blogspot.com/p/talks.html#apachecon2018" target="_blank">the related talk</a>.
</li>
</ul>
</div>
<div>
<blockquote>
Note: It is runner-dependent whether metrics are accessible during
pipeline execution or only after jobs have completed.
</blockquote>
<div> </div>
</div>
<h2 style="text-align: left;">Types of metrics</h2>
</div>
<div><br /></div>
<div>
<div>
Three types of metrics are supported at the moment:
<b>Counter</b>, <b>Distribution</b> and <b>Gauge</b>.
</div>
<div><br /></div>
</div>
<h3 style="text-align: left;">Counter</h3>
<div><br /></div>
<div>
Counter is a metric that reports a single long value and can be incremented or
decremented.
</div>
<div><br /></div>
<script src="https://gist.github.com/echauchot/7be5327d3630680110a3039bb17d4fc5.js"></script>
<div><br /></div>
<h3 style="text-align: left;">Distribution</h3>
<div><br /></div>
<div>
Distribution is a metric that reports information about the distribution of
reported values (min value, max value, mean value, sum of values and count of
values).
</div>
<div><br /></div>
<script src="https://gist.github.com/echauchot/b126c6f0502f896b785f2b1ed100e43e.js"></script>
<div><br /></div>
<h3 style="text-align: left;">Gauge</h3>
<div><br /></div>
<div>
Gauge is a metric that reports the latest value out of reported values.
Since metrics are collected from many workers the value may not be the
absolute last, but one of the latest values.
</div>
<div><br /></div>
<script src="https://gist.github.com/echauchot/ed29143618c6deaccd57f183ff2ab379.js"></script>
<div><br /></div>
<h2 style="text-align: left;">How does a pipeline author use them in Java?</h2>
<div>
<h2 style="text-align: left;"><br /></h2>
<h3 style="text-align: left;">How to query a metric?</h3>
<div><br /></div>
<div>
Users query metrics using <b>PipelineResult</b>, which is the Beam object used to
interact with a pipeline. PipelineResult has a method <b>metrics()</b> which
returns a <b>MetricResults</b> object that allows accessing metrics. The
main method available in MetricResults allows querying for all metrics
matching a given filter.
</div>
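<div><br /></div><div>For instance, here is a minimal sketch of querying the counters matching a name filter (the metric names are illustrative; the gist below shows a complete version):</div><div><br /></div><pre>
// Query the attempted and committed values of counters named
// "elementCount" in namespace "ns1" once the pipeline has run.
PipelineResult result = pipeline.run();
MetricQueryResults metrics = result.metrics().queryMetrics(
    MetricsFilter.builder()
        .addNameFilter(MetricNameFilter.named("ns1", "elementCount"))
        .build());
for (MetricResult&lt;Long&gt; counter : metrics.getCounters()) {
  System.out.println(counter.getName() + " attempted: " + counter.getAttempted());
  System.out.println(counter.getName() + " committed: " + counter.getCommitted());
}
</pre>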
<div><br /></div>
<div>
<script src="https://gist.github.com/echauchot/97e37a51100f8050161a1ff6b3af45a7.js"></script>
</div>
<div><br /></div>
<h3 style="text-align: left;">Complete example</h3>
<div><br /></div>
<div>
Below is a simple example of how to use a <b>Counter</b> metric in a user pipeline.
</div>
<div><br /></div>
</div>
<script src="https://gist.github.com/echauchot/7ae3625a1f6daa780faaf47ac371e51d.js"></script>
<h1 style="text-align: left;"><br /></h1>
<h2 style="text-align: left;">Metrics export: MetricsPusher</h2>
<div><br /></div>
<div>
Here comes the core of the subject: the <b>MetricsPusher</b> feature in Beam!
</div>
<h2 style="text-align: left;"><br /></h2>
<h3 style="text-align: left;">Motivation</h3>
<div><br /></div>
<div>
<div>
MetricsPusher came to life in the Beam project after these observations:
</div>
</div>
<div>
<div>
<ul style="text-align: left;">
<li>
Not all execution engines (Spark, Flink etc...) ship a way to
push the metrics to external sinks, they usually rely on their own
monitoring UI. Even though the execution engine UI is updated with the
Beam metrics, there are some use cases that require to export the
metrics values: What if you need to have the metrics in your own
application UI ? Or if you need to have the metrics in a metrics backend
for reporting ?
</li>
<li>Need for consistency:</li>
<ul>
<li>
There is no common set of supported monitoring backends among execution engines
</li>
<li>
The moment when metrics become available differs: some runners make the
metrics available through PipelineResult during the pipeline execution,
others only at the end of it
</li>
<li>
Beam needs a common metrics flow, no matter the runner, for pipelines to be portable
</li>
</ul>
</ul>
</div>
</div>
<h2 style="text-align: left;"><br /></h2>
<h3 style="text-align: left;">Architecture</h3>
<div><br /></div>
<div>
<h4 style="text-align: left;">Design principles</h4>
<div><br /></div>
<div>MetricsPusher was designed based on these principles:</div>
<div>
<ul style="text-align: left;">
<li>No client polling (e.g. JMX) because:</li>
<ul>
<li>
the infrastructure changes (cluster managers, ...) and should not have to
be known by the users
</li>
<li>
such pulled metrics would be non-aggregated metrics that users would
need to aggregate themselves
</li>
<li>
such a pull mechanism raises timing issues: for example, if a small batch
finishes before the JMX client has had time to pull the metrics, there
will be no metrics to show.
</li>
</ul>
<li>
Metrics are pushed from the runner and not pulled from the SDK because:
</li>
<ul>
<li>
the runner needs to decide when to push: in particular, to support
Beam committed metrics, it needs to push only when a Beam bundle is
committed.
</li>
<li>
runner system metrics are also defined in Beam (but not yet supported
in MetricsPusher) and there again, only the runner knows its internal
system.
</li>
</ul>
<li>
Push aggregated (across parallel workers) metrics periodically rather
than pushing event-based metrics because:
</li>
<ul>
<li>
aggregated metrics avoid the need for the metrics backend to merge
metrics.
</li>
<li>runners know how to merge metrics across their workers.</li>
</ul>
<li>
We chose to let Beam manage the metrics sink IOs, mainly for coherence
because, as said above, execution engines differ in the metrics backends
they support.
</li>
</ul>
<h3 style="text-align: left;"><br /></h3>
<h4 style="text-align: left;">MetricsPusher architecture</h4>
</div>
</div>
<div><br /></div>
<div>
The MetricsPusher service lives as a thread inside the runner; it regularly
requests aggregated metrics and then pushes them through a MetricsSink IO
to a metrics backend.
</div>
<div><br /></div>
<div>
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjX8kt7Zym3pwnsMVXxqxwrVRk3u3jrbaxW1h1rgoFwJwb-0KdmE6wgRWqxDdnkYPWEJWNrwyvPWXzIiQBvkcw4k4Fb0VVaHQzVzBM7e5bKgYshWzktU3Ek6PWZrPr3X4XuC-8rsZNmYVY/s612/simplified+metrics+pusher.png" style="margin-left: 1em; margin-right: 1em; text-align: center;"><img border="0" data-original-height="383" data-original-width="612" height="400" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjX8kt7Zym3pwnsMVXxqxwrVRk3u3jrbaxW1h1rgoFwJwb-0KdmE6wgRWqxDdnkYPWEJWNrwyvPWXzIiQBvkcw4k4Fb0VVaHQzVzBM7e5bKgYshWzktU3Ek6PWZrPr3X4XuC-8rsZNmYVY/w640-h400/simplified+metrics+pusher.png" width="640" /></a>
</div>
<div><br /></div>
<div>
The architecture diagram above focuses on metrics export; if you want more
details on the <b>internal runner metric system</b> part,
please take a look at part 4 of <a href="https://echauchot.blogspot.com/p/talks.html#apachecon2018" target="_blank">the related talk</a>; for the purpose of this article let's just say that it is provided as part
of the <b>runner-core</b> Beam library so that different runners
(Spark, Flink, Dataflow, ...) share the same core metrics architecture.
</div>
<div><br /></div>
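<div><br /></div><div>To fix ideas, here is a simplified, hypothetical sketch of such a push loop (the real implementation lives in the runner-core library and differs in the details; the class and field names here are illustrative):</div><div><br /></div><pre>
// Simplified sketch of the MetricsPusher thread described above.
class MetricsPusherLoop implements Runnable {
  private final MetricResults metricResults; // fed by the runner metrics system
  private final MetricsSink sink;            // e.g. HTTP or Graphite sink
  private final long periodMillis;           // default: 5000

  MetricsPusherLoop(MetricResults metricResults, MetricsSink sink, long periodMillis) {
    this.metricResults = metricResults;
    this.sink = sink;
    this.periodMillis = periodMillis;
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        // query the metrics, already aggregated across workers by the runner
        MetricQueryResults metrics =
            metricResults.queryMetrics(MetricsFilter.builder().build());
        sink.writeMetrics(metrics); // push them to the metrics backend
        Thread.sleep(periodMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (Exception e) {
        // a failed push drops the update; it must not fail the pipeline
      }
    }
  }
}
</pre>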
<div><br /></div>
<h3 style="text-align: left;">How to use Metrics pusher ?</h3>
<div><br /></div>
<div>
There is nothing to code in the user pipeline to use the MetricsPusher
feature. The only thing to do is to configure a <b>MetricsSink</b>. If a
metrics sink is set up in the configuration, the runner will push metrics to
it at a default 5s period.
</div>
<div>
The configuration is held in the
<a href="https://github.com/apache/beam/blob/c7b045010e9dff9fa0f5f603716c3a98de3e828b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsOptions.java#L30" target="_blank">MetricsOptions</a>
class. It contains the push period configuration and also sink-specific options
such as type and URL.
</div>
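<div><br /></div><div>For example, configuring the Graphite sink (described below) could look like the following sketch; the option names come from the MetricsOptions class linked above, so check it for the exact signatures:</div><div><br /></div><pre>
// Sketch: configuring MetricsGraphiteSink through MetricsOptions
// (option names per the MetricsOptions class linked above).
MetricsOptions options = PipelineOptionsFactory.fromArgs(args).as(MetricsOptions.class);
options.setMetricsSink(MetricsGraphiteSink.class);
options.setMetricsGraphiteHost("localhost");
options.setMetricsGraphitePort(2003);
options.setMetricsPushPeriod(5L); // push period, in seconds
Pipeline pipeline = Pipeline.create(options);
</pre>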
<div><br /></div>
As of now, only the <b>Flink</b> and <b>Spark</b> runners are
supported, and the available metrics sinks are:
<ul>
<li>
<b>MetricsHttpSink: </b>a REST client that sends JSON-serialized
metrics to a REST endpoint using an HTTP POST request. Below is an example
of the body of the request sending attempted metrics values of a pipeline
that defines a metric of each type:
<div><br /></div>
<script src="https://gist.github.com/echauchot/39c7f5b3bea575158571fa9535931f09.js"></script>
</li>
<li>
<b>MetricsGraphiteSink: </b>a sink that pushes metrics to Graphite as
Graphite messages. Each metric value is an entry with a timestamp in a
Graphite message. Below is an example of the Graphite message payload
sending attempted metric values of a pipeline that defines a counter
metric (named n1 in namespace ns1 for step s1, with an attempted value of 20,
pushed at 07/07/20 2:32:37 pm, i.e. timestamp 1594132357) and a distribution
metric (named n2 in namespace ns1 for step s2, whose attempted values were
pushed at the same timestamp).
<div><br /></div>
<div id="http_and_graphite_metrics">
<script src="https://gist.github.com/echauchot/9f91214e8490c7118a09a573010684ae.js"></script>
</div>
</li>
</ul>
<h3 style="text-align: left;">
Now you know about Beam metrics and their export!
</h3>
Etienne Chauchothttp://www.blogger.com/profile/13442320750607525683noreply@blogger.comtag:blogger.com,1999:blog-1438124655819523792.post-90254081359354170062020-06-12T11:24:00.004+02:002020-11-23T10:59:21.264+01:00Nexmark: benchmark and CI tool for Apache Beam<span style="font-size: xx-small;">🔥 10 min.</span> <div><br /></div><div>This blog post is about the subject of <a href="https://echauchot.blogspot.com/p/talks.html#apachecon2017" target="_blank">this talk</a> I gave at the ApacheCon 2017. While the talk focuses on building Nexmark for <a href="https://beam.apache.org/" target="_blank">Apache Beam</a>, this article focuses on the use the Beam project has made of Nexmark since then. If you're interested in details about Nexmark, please refer to the talk.</div><div><br /></div><div><h2 style="text-align: left;">What is Nexmark?</h2><div><br /></div><div>Nexmark is originally <a href="https://web.archive.org/web/20100620010601/http://datalab.cs.pdx.edu/niagaraST/NEXMark/" target="_blank">a 2004 research paper</a> about benchmarking queries on a streaming system. In the context of Beam, what we call Nexmark is its adaptation in the form of Beam pipelines that run in both batch and streaming modes.</div><div><br /></div><div>Nexmark is a simulation of an auction system. The synthetic data injected in the system is about <i>Persons</i> placing <i>Auctions</i> on <i>Items</i> and <i>Persons</i> placing <i>Bids</i> on these <i>Auctions</i>.</div></div><div><br /></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjSZHPOOtJvhHoTRVDgWBJ1EbnXPywW-G_YIB9ZbC3RlSW4ITQoMLvQDp7GxN-uBWdM_So_8jKm91WFLkBKayKBG6bIMRtAfujsn_tdO4qOEUe863DJ6OLWfJwehAPA8MvxLEeLjf_ZIkk/s506/nexmark.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="324" data-original-width="506" height="410" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjSZHPOOtJvhHoTRVDgWBJ1EbnXPywW-G_YIB9ZbC3RlSW4ITQoMLvQDp7GxN-uBWdM_So_8jKm91WFLkBKayKBG6bIMRtAfujsn_tdO4qOEUe863DJ6OLWfJwehAPA8MvxLEeLjf_ZIkk/w640-h410/nexmark.png" width="640" /></a></div><div>Nexmark then interrogates the auction system for statistical data by running queries such as the ones below.</div><div><br /></div><div style="text-align: left;">The queries are written as Beam pipelines, and these pipelines are meant to cover 100% of the Beam public API (including Beam SQL) in order to benchmark, and detect regressions on, all of Beam.</div><div><br /></div><table style="border-collapse: collapse;"><tbody><tr><td style="background-color: #e6b8af; border: 1px solid #9e9e9e; padding: 10px;"><b>Query</b></td><td style="background-color: #e6b8af; border: 1px solid #9e9e9e; padding: 10px;"><b>Description</b></td><td style="background-color: #e6b8af; border: 1px solid #9e9e9e; padding: 10px;"><b>Use of Beam API</b></td></tr><tr><td style="border: 1px solid #9e9e9e; padding: 10px;">3</td><td style="border: 1px solid #9e9e9e; padding: 10px;">Who is selling in particular US states?</td><td style="border: 1px solid #9e9e9e; padding: 10px;">Join, State, Timer</td></tr><tr><td style="border: 1px solid #9e9e9e; padding: 10px;">5</td><td style="border: 1px solid #9e9e9e; padding: 10px;">Which auctions have seen the most bids in the last period?</td><td style="border: 1px solid #9e9e9e; padding: 10px;">Sliding Window, Combiners</td></tr><tr><td style="border: 1px solid #9e9e9e; padding: 10px;">6</td><td style="border: 1px solid #9e9e9e; padding: 10px;">What is the average selling price per seller for their last 10 closed auctions?</td><td style="border: 1px solid #9e9e9e; padding: 10px;">Global Window, Custom Combiner</td></tr><tr><td style="border: 1px solid #9e9e9e; padding: 10px;">7</td><td style="border: 1px solid #9e9e9e; padding: 10px;">What are the highest bids per period?</td><td style="border: 1px solid #9e9e9e; padding: 10px;">Fixed Windows, Side Input</td></tr><tr><td style="border: 1px solid #9e9e9e; padding: 10px;">9 *</td><td style="border: 1px solid #9e9e9e; padding: 10px;">Winning bids</td><td style="border: 1px solid #9e9e9e; padding: 10px;">Custom Window</td></tr><tr><td style="border: 1px solid #9e9e9e; padding: 10px;">11 *</td><td style="border: 1px solid #9e9e9e; padding: 10px;">How many bids did a user make in each session he was active?</td><td style="border: 1px solid #9e9e9e; padding: 10px;">Session Window, Triggering</td></tr><tr><td style="border: 1px solid #9e9e9e; padding: 10px;">12 *</td><td style="border: 1px solid #9e9e9e; padding: 10px;">How many bids does a user make within a fixed processing time limit?</td><td style="border: 1px solid #9e9e9e; padding: 10px;">Global Window, working in Processing Time</td></tr></tbody></table><div><br /></div><div style="text-align: left;"><i><font size="2">* Not in the original Nexmark research paper.</font></i></div><div><br /></div><div style="text-align: left;">These queries are particularly suited for benchmarking because they are representative of what a real user could request from a real auction system. They are also quite complex and leverage all the capabilities of Beam <a href="https://echauchot.blogspot.com/2020/02/understand-apache-beam-runners-focus-on.html" target="_blank">runners</a>.</div><div><br /></div><div style="text-align: left;">Throughout the execution of each pipeline, Nexmark gathers metrics (using the Beam metrics system) such as the number of output results, the event rate (input element generation rate) and, obviously, the query execution time.</div><div><br /></div><h2 style="text-align: left;">What is the architecture of Nexmark?</h2><div><br /></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgRWBENq-oIf6AxbPu-tPYiR1ICmSH8FlzggWzoAKyP8zizool_7BKmnfWOxv9Z2D-FFSSK28g9jXLKNAasMdAeGbQ5j7LXu1IKS16isKsWUVG6UXlTGnIcYV6_d_L1iFx3UBtxF-AR0TE/s591/nexmark+components.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="400" data-original-width="591" height="434" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgRWBENq-oIf6AxbPu-tPYiR1ICmSH8FlzggWzoAKyP8zizool_7BKmnfWOxv9Z2D-FFSSK28g9jXLKNAasMdAeGbQ5j7LXu1IKS16isKsWUVG6UXlTGnIcYV6_d_L1iFx3UBtxF-AR0TE/w640-h434/nexmark+components.png" width="640" /></a></div><div class="separator" style="clear: both; text-align: center;"><br /></div><div>Nexmark is a standalone executable which</div><ul style="text-align: left;"><li>creates a source that generates events (such as a <i>Person</i> creating an <i>Auction</i> or placing a <i>Bid</i> on an ongoing <i>Auction</i>),</li><li>runs the queries depending on the workload selected by the user (ingestion rate, number of events generated, selected queries, etc.),</li><li>monitors query execution and collects the execution metrics.</li></ul>
left;"><font style="margin-left: 1em; margin-right: 1em;"><div style="font-size: medium;">Nexmark can run on all the supported Beam runners.</div><div style="font-size: medium;"><div style="font-size: medium;"><br /></div><div style="font-size: medium;">The user can chose to benchmark in batch mode or in streaming mode: Nexmark creates either a Beam <a href="https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/io/BoundedSource.html" target="_blank">BoundedSource</a> to trigger batch mode in the runner or an <a href="https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/io/UnboundedSource.html" target="_blank">UnboundedSource</a> to trigger streaming mode in the runner.</div><div style="font-size: medium;"><br /></div><div style="font-size: medium;">Each query pipeline has this form:</div><div style="font-size: medium;"><br /></div><div style="font-size: medium; text-align: left;">1.Get <b>PCollection<Event></b> as input</div><div style="font-size: medium; text-align: left;">2.Apply <b>ParDo</b> + <b>Filter</b> to extract the object of interest: <i>Bids</i>, <i>Auctions</i>, <i>Person</i></div><div style="font-size: medium; text-align: left;">3.Apply Beam transforms that implement the query business logic: <b>Filter</b>, <b>Count</b>, <b>GroupByKey</b>, <b>Window</b>, etc...</div><div style="font-size: medium; text-align: left;">4.Apply <b>ParDo</b> to output the final <b>PCollection</b>: collection of <i>AuctionPrice</i>, <i>AuctionCount</i> ...</div><div style="font-size: medium;"><br /></div><div><h1 style="text-align: left;"><br /></h1><h2 style="text-align: left;">How is Nexmark useful for Beam ?</h2><div><br /></div><h3 style="text-align: left;"><span> </span>CI setup</h3><div><br /></div><div>With each commit on master, a jenkins script runs in a local in-JVM runner a Nexmark suite of queries with 100 000 generated events:</div></div></div></font></div><ul style="text-align: left;"><li>on all the queries including Beam SQL versions</li><li>on all the supported runners</li><li>in both batch and streaming modes</li></ul><div class="separator" style="clear: both; text-align: left;"><font style="margin-left: 1em; margin-right: 1em;"><div style="font-size: medium;"><div><div><div>The output of this benchmark is stored in Google BigQuery tables per <b>query</b> x <b>runner</b> x <b>mode</b>.</div><div><br /></div><div>Each record of the table contains:</div></div></div></div></font></div><ul style="text-align: left;"><li>the size of the output PCollection</li><li>the execution time of the query</li></ul><div class="separator" style="clear: both; text-align: left;"><font style="margin-left: 1em; margin-right: 1em;"><div style="font-size: medium;"><div><div>The results are then printed into graphs allowing to build performance dashboards and output size dashboards per runner as shown in the images below showing part of the flink dashboards:</div><div><br /></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEj0nA3MwcGPwoSii0B5wcngboYFOhX3g9DdtQB22FaDIsKrBn_jGadI6GB9m55XOtTKSVgXOfqxHsmxK9wNSNfMtp045LN20qvnTUDRhHEzblhfK_IiURDGHhWI7jmTLDgtKUkejVoNr_M/s1876/perf_dashboard.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="795" data-original-width="1876" height="272" 
src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEj0nA3MwcGPwoSii0B5wcngboYFOhX3g9DdtQB22FaDIsKrBn_jGadI6GB9m55XOtTKSVgXOfqxHsmxK9wNSNfMtp045LN20qvnTUDRhHEzblhfK_IiURDGHhWI7jmTLDgtKUkejVoNr_M/w640-h272/perf_dashboard.png" width="640" /></a></div><div class="separator" style="clear: both; text-align: center;"><br /></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgqolsq3ROYxCFtfT9K03yGwiaMsmQMdz51bUMwEYyAymb4iYLA6-XwbTge7SFLVLYHox1Mdmn2WDs5YKhQVem9CKVpJRufdpVJ_5Ek966TDCQuKky5-MI54omAz3g6WAawXqJMqgtRjvc/s1876/output_size_dashboard.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="798" data-original-width="1876" height="272" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgqolsq3ROYxCFtfT9K03yGwiaMsmQMdz51bUMwEYyAymb4iYLA6-XwbTge7SFLVLYHox1Mdmn2WDs5YKhQVem9CKVpJRufdpVJ_5Ek966TDCQuKky5-MI54omAz3g6WAawXqJMqgtRjvc/w640-h272/output_size_dashboard.png" width="640" /></a></div><div class="separator" style="clear: both; text-align: left;"><br /></div><div><br /></div></div><div><h3 style="text-align: left;"><span> </span>Performance assessment</h3><div><br /></div><div>Such dashboards allow to keep track of the performances of the different Beam runners:</div><div><br /></div><h4 style="text-align: left;"><span> </span><span> </span>Detect performance regressions</h4><div><br /></div><div>The performance graphs allow to detect performance regressions. As query exercise a particular part of Beam API, it allows to narrow the field of search. Here query 6 uses <a href="https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/transforms/Combine.html" target="_blank">Combiners</a> so the regression showed below observed on 2018/10/05 on query 6 on the spark runner allowed to point to Combine translation in the spark runner and then fix it.</div><div><br /></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEghNiG3T5ylTC_obJNFYOxWB2nT8fvCwm96duDshJ0FCt_jw2m5pXM2sef1QT0vjKSizr97WXfWaU2q6sbuSmfMmV62vXJ5EJw8fpiUnTyDh9iABJfPkbRw3swA0i-GCfs_u5U-jLKlbtw/s1566/perf_degradation.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="337" data-original-width="1566" height="138" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEghNiG3T5ylTC_obJNFYOxWB2nT8fvCwm96duDshJ0FCt_jw2m5pXM2sef1QT0vjKSizr97WXfWaU2q6sbuSmfMmV62vXJ5EJw8fpiUnTyDh9iABJfPkbRw3swA0i-GCfs_u5U-jLKlbtw/w640-h138/perf_degradation.png" width="640" /></a></div><div><br /></div><div><br /></div><div><h4 style="text-align: left;"><span> </span><span> </span>Measure the impact of performance changes</h4><div><br /></div><div>When performance improvement tasks are run on a particular runner, we can check the impact on the graphs. Here there was a performance improvement on the spark runner on RDD caching that decreased the execution time of query 2 by more than 2. 
This change was committed just after 2018/11/21:</div></div></div><div><br /></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjHAcJkSElgHKFVtPr7et2q9Me0NDvosz_g-jSrP_2aA4PrNwD18-LNQuWMbQh6TLvf_ayH-p42dc92W4Th4PVSBHKpucsiWfZrkvtqSvsBHVxo2RmgiTb7HWbDt_p4oaqwcA3mNxlQ_RE/s1570/performance_improvement.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="340" data-original-width="1570" height="138" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjHAcJkSElgHKFVtPr7et2q9Me0NDvosz_g-jSrP_2aA4PrNwD18-LNQuWMbQh6TLvf_ayH-p42dc92W4Th4PVSBHKpucsiWfZrkvtqSvsBHVxo2RmgiTb7HWbDt_p4oaqwcA3mNxlQ_RE/w640-h138/performance_improvement.png" width="640" /></a></div><div><br /></div><div class="separator" style="clear: both; text-align: center;"><font size="2" style="margin-left: 1em; margin-right: 1em;"><br /></font></div><div class="separator" style="clear: both; text-align: left;"><font style="margin-left: 1em; margin-right: 1em;"><h4 style="text-align: left;"><span> </span><span> </span>Compare performances of the runners</h4><div><br /></div><h3 style="font-size: medium; font-weight: 400; text-align: left;"></h3><h3 style="clear: both;"></h3><h3 style="text-align: left;"><span style="font-size: medium; font-weight: 400;">Each query exercise a different part of the Beam API, so for each one we can compare the performances of the runner translations of that particular part of the API. The graph below, for example, compares the performances of the new Spark runner based on the Structured Streaming framework and the current Spark runner based on RDD/Dstream. The comparison is on query 8 which is a join (CoGroupByKey Beam transform).</span></h3><div><span style="font-size: medium; font-weight: 400;"><br /></span></div><div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiv1vzRC1jztlbAJq2Ko9P7IBQ_YQC9O8qsQOK4vA0A9GuwSnqw_u8glGhNQy8wf_jeK4YyXaAAmVeMWagkJr5m-PzEsYb3f9EGtB4LsCCvBaym5rTohiSIAwIUuTi6CcFrODhfsbkzfKo/s1674/runners_comparision.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="334" data-original-width="1674" height="128" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiv1vzRC1jztlbAJq2Ko9P7IBQ_YQC9O8qsQOK4vA0A9GuwSnqw_u8glGhNQy8wf_jeK4YyXaAAmVeMWagkJr5m-PzEsYb3f9EGtB4LsCCvBaym5rTohiSIAwIUuTi6CcFrODhfsbkzfKo/w640-h128/runners_comparision.png" width="640" /></a></div><span style="font-size: medium; font-weight: 400;"><br /></span></div><div class="separator" style="clear: both; text-align: center;"><br /></div><h3 style="text-align: left;"><span> </span>Functional assessment</h3><h3 style="clear: both;"><div style="font-size: medium; font-weight: 400;"><br /></div><div style="font-size: medium; font-weight: 400;">There are also graphs that print the size of the output PCollection for each query, such graphs allow to check the functional behavior of the runners.</div><div style="font-size: medium; font-weight: 400;"><br /></div></h3><h4 style="text-align: left;"><span> </span><span> </span>Detect functional regressions</h4><div><br /></div><h3 style="clear: both;"><div style="font-size: medium; font-weight: 400;">The output size graphs are useful to detect functional regressions: indeed, such a graph should be a straight line as, whatever the performance, a given query produces always the same results unless there is a bug: if the 
pipeline produces wrong data, then the size of the output PCollection will vary as in the example below. In this example, the flink runner produced wrong data for some time when executing query 12 in streaming mode. </div><div style="font-size: medium; font-weight: 400;"><br /></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhYJZRoB2sjYXyi-WVYsCq0pqsiP08-t7ng6RiRLaMbBP7soag7WYKeOYUWnAxo4I6PULz3LHziFBDglQVdPzp6M3-mEYisLIIWiqtAPGnFXdBhENah3vARJfF7DqgY-p6mPhiEZrRiK7c/s854/functional_regression.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="341" data-original-width="854" height="256" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhYJZRoB2sjYXyi-WVYsCq0pqsiP08-t7ng6RiRLaMbBP7soag7WYKeOYUWnAxo4I6PULz3LHziFBDglQVdPzp6M3-mEYisLIIWiqtAPGnFXdBhENah3vARJfF7DqgY-p6mPhiEZrRiK7c/w640-h256/functional_regression.png" width="640" /></a></div><div style="font-size: medium; font-weight: 400;"><br /></div><div style="font-size: medium; font-weight: 400;"><br /></div><div style="font-size: medium;"><div style="font-size: medium;"><div style="font-size: medium; font-weight: 400;">If a temporary failure raised an exception on a given query, it would lead to a gap in the graph.</div><div style="font-size: medium; font-weight: 400;"><br /></div></div></div></h3><h4 style="text-align: left;"><span> </span><span> </span>Verify compatibility</h4><h3 style="clear: both;"><div style="font-size: medium;"><div style="font-size: medium;"><div style="font-size: medium; font-weight: 400;"><br /></div><div style="font-size: medium; font-weight: 400;">Beam provides the <a href="https://beam.apache.org/documentation/runners/capability-matrix/" target="_blank">Capability matrix</a> that shows the compatibility of the runners with the different part of the Beam SDK. The Nexmark output size graphs are also useful to verify and update the capability matrix. An unsupported API would raise an <i>UnsupportedException</i>. For example query 3 uses Beam state and timer API. This API is not yet supported on the new spark structured streaming runner. Thus, when the query runs, an exception is raised and the pipeline does not finish. 
As a consequence, no BigQuery table is created for this query and it leads to a hole in the dashboard as shown below.</div><div style="font-size: medium; font-weight: 400;"><br /></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEj2suQDCaAWfBsJMkC_2vbhQL64oP_nLUg3WA10dBLdKW9xFfSaBjKn0bfjTSLfiuRk-toh40vYs_Tr0Fs1ou7kPd4EtZJG1R57q6NzZWGMXTc7nI-TfjzXZQVdgtUlsRPtcB1v6e1Nt3M/s1601/unsupported.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="342" data-original-width="1601" height="136" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEj2suQDCaAWfBsJMkC_2vbhQL64oP_nLUg3WA10dBLdKW9xFfSaBjKn0bfjTSLfiuRk-toh40vYs_Tr0Fs1ou7kPd4EtZJG1R57q6NzZWGMXTc7nI-TfjzXZQVdgtUlsRPtcB1v6e1Nt3M/w640-h136/unsupported.png" width="640" /></a></div><div style="font-size: medium; font-weight: 400;"><br /></div><div style="font-size: medium; font-weight: 400;"><br /></div></div></div></h3><h3 style="text-align: left;"><span> </span>Release check</h3><div><br /></div><h3 style="clear: both;"><div style="font-size: medium;"><div style="font-size: medium;"><div style="font-size: medium; font-weight: 400;"><div style="font-size: medium;">All these functions of Nexmark allow us to check candidate releases of Beam: we go through all the dashboards for all the runners in both batch and streaming modes seeking for performance or functional regressions. If any is found, the release process is interrupted and a new candidate release containing the fix is created. This way, the level of confidence in releases is pretty high because, as said, Nexmark aims to cover 100% of the Beam scope. </div></div></div></div></h3></font></div><div style="font-size: medium;"></div></div></font></div><div></div>Etienne Chauchothttp://www.blogger.com/profile/13442320750607525683noreply@blogger.comtag:blogger.com,1999:blog-1438124655819523792.post-13907348621546584832020-04-06T10:44:00.001+02:002021-03-19T10:34:32.983+01:00Code callouts on blogger<div>
<span style="font-size: xx-small;">π₯ 3 min.</span> </div>
<br />
<h2>
Why a code callout component?</h2>
<div>
<br /></div>
I spent quite some time making my <a href="https://echauchot.blogspot.com/2020/03/how-to-create-custom-spark-encoder-in.html" target="_blank">previous article</a> readable. That article contains a long code extract, which is quite unusual because it deals with java code generation and requires a lot of explanations. I therefore searched for a code callout component compatible with github gists.<br />
<br />
<h2>
Code callout component</h2>
<div>
<br /></div>
I came across <a href="https://github.com/djleeds/code-callout" target="_blank">this</a> component by <a href="https://twitter.com/djleeds" target="_blank">Dave Leeds</a>. It is brilliant!<br />
It allows you to put links in the text that highlight lines in a gist snippet (or in other snippet formats). The links integrate well into your template, compared to some other tools that generate both the text and the snippet. See the extract of the article below:<br />
<br />
<div class="separator" style="clear: both; text-align: center;">
</div>
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhvWBBKOS952sqxNQlgMynmsQ8QPBC6YAkua7GN44VQAahMutJ37P3-LaQYhKJwKzvNjEX-zF2GDs7SJlRFyI0rfZQAm6NQClr8KMd3raG_-yr93mbP-cmjOw92oQ2vC-xJnOlVBz-tMsc/s1600/links.png" imageanchor="1" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="183" data-original-width="783" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhvWBBKOS952sqxNQlgMynmsQ8QPBC6YAkua7GN44VQAahMutJ37P3-LaQYhKJwKzvNjEX-zF2GDs7SJlRFyI0rfZQAm6NQClr8KMd3raG_-yr93mbP-cmjOw92oQ2vC-xJnOlVBz-tMsc/s1600/links.png" /></a></div>
<br />
It highlights single lines or separate groups of lines. When you click the link to a line, the page scrolls to that line in the gist and opens a popup with a personalized message and a button to scroll back to the text. See below:<br />
<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhLJaI__uidqvVIePrpiNj7h3dj9aPE_n0yb68YUHJKSu1Q9_HLVe02wpawqUpvVmDGYw9Vhs60psv3pBuXCJl6AcUDznMU3o4NinuuaYYcYtUObYogR4YJdwOLoCe0Co_kgIIE4Wx21R0/s1600/multi-line-selection.png" imageanchor="1" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="374" data-original-width="778" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhLJaI__uidqvVIePrpiNj7h3dj9aPE_n0yb68YUHJKSu1Q9_HLVe02wpawqUpvVmDGYw9Vhs60psv3pBuXCJl6AcUDznMU3o4NinuuaYYcYtUObYogR4YJdwOLoCe0Co_kgIIE4Wx21R0/s1600/multi-line-selection.png" /></a></div>
<div class="separator" style="clear: both; text-align: center;">
</div>
<br />
The separation between the link and the popup message makes it possible to stay concise in the explanation text and to give more details in the popup, next to the code. See below:<br />
<div class="separator" style="clear: both; text-align: center;">
</div>
<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgzV34StiHRvFbLaNLvm96ALiJkMAl2BbC3L3x3nSo6ctmNIsF2-tw52Yj6zmNzzgAAOFN1j7OWMVT5y2Fx-v76ufK3VrIGQSef8LaarM0BoyuGJZjDIQkhMfZbUV4qfdvZZGrhStxaUXI/s1600/text.png" imageanchor="1" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="56" data-original-width="773" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgzV34StiHRvFbLaNLvm96ALiJkMAl2BbC3L3x3nSo6ctmNIsF2-tw52Yj6zmNzzgAAOFN1j7OWMVT5y2Fx-v76ufK3VrIGQSef8LaarM0BoyuGJZjDIQkhMfZbUV4qfdvZZGrhStxaUXI/s1600/text.png" /></a></div>
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEj5uDkC66L3LRH9ngymWOkLnlOOFoxUYuwZfIWIaoJbqQZXoqJjXlFRHgmxiO2ioAq16I9KsrUk4KXaVMn-eEngzUD1L96P8A7h7zfmhUh1uzgkMcBJbZcfcTyirsgHv5WH7hMXEsjpb8M/s1600/message.png" imageanchor="1" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="536" data-original-width="938" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEj5uDkC66L3LRH9ngymWOkLnlOOFoxUYuwZfIWIaoJbqQZXoqJjXlFRHgmxiO2ioAq16I9KsrUk4KXaVMn-eEngzUD1L96P8A7h7zfmhUh1uzgkMcBJbZcfcTyirsgHv5WH7hMXEsjpb8M/s1600/message.png" /></a></div>
<div class="separator" style="clear: both; text-align: center;">
</div>
<div class="separator" style="clear: both; text-align: center;">
<br /></div>
<div class="separator" style="clear: both; text-align: center;">
</div>
I will not explain how to use this component because it is already explained in the README file of the <a href="https://github.com/djleeds/code-callout" target="_blank">github repo</a>; I will rather focus on the integration with blogger.<br />
<br />
<h2>
Integration with blogger</h2>
<div>
<br /></div>
<div>
After a quick change to the jquery selector, the component worked like a charm with the new github gist format. What is more complicated is the integration with <a href="https://www.blogger.com/" target="_blank">blogger</a>. To work, the component requires several things:</div>
<div>
<ul>
<li><b>The jquery lib import</b>: this one can be added to the HTML code of the blogger template. In the head section, paste:</li>
</ul>
<blockquote class="tr_bq">
<i><script src='https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js'/></i></blockquote>
<ul>
<li><b>The code callout script import</b>: the script is not hosted on any CDN, so you need to copy the minified code of the script, available in the <a href="https://github.com/djleeds/code-callout/tree/master/dist" target="_blank">dist</a> directory of the repo, to blogger. Adding this code to the HTML code of the template fails because the blogger HTML editor refuses to take it; it might be a bug of the blogger editor. Adding it to a custom javascript gadget in the blogger layout also fails because the added gadget might not be rendered on mobile devices, in which case the script would not be called and the callout component would not work. The only way I found (it might not be the best) is to paste the code of the script into a <i>script</i> tag at the end of each article that uses it.</li>
<li><b>The call to the method that parses callout links in the article:</b> right after the code callout script (see previous step), paste:</li>
</ul>
<blockquote class="tr_bq">
<script><br />
$(".callout-trigger").codeCallout({ profile: "gist" });<br />
</script></blockquote>
<ul>
<li><b>Your links to code lines</b>: see the <a href="https://github.com/djleeds/code-callout" target="_blank">README</a> in the repo to learn which attributes you need to add to your links and to your gists to make it work. A hypothetical example is sketched below.</li>
</ul>
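<div>
To give an idea of the shape of these links, here is a sketch of what I use in the next article (the gist URL is a placeholder, and the listing id, line numbers and note text are of course yours to adapt). The gist is wrapped in a div whose id is referenced by the trigger link:</div>
<blockquote class="tr_bq">
<i><div id="GIST"><script src="https://gist.github.com/your-user/your-gist-id.js"></script></div></i><br />
<i><a class="callout-trigger" data-listing="GIST" data-lines="12-15" data-note="Explanation shown in the popup next to the code">lines 12 to 15</a></i></blockquote>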
<h2>
Enjoy!</h2>
</div>
<div>
<br /></div>
Etienne Chauchothttp://www.blogger.com/profile/13442320750607525683noreply@blogger.comtag:blogger.com,1999:blog-1438124655819523792.post-1674740328156324802020-03-17T11:02:00.000+01:002020-04-02T16:03:21.437+02:00How to create a custom Spark Encoder in ... java<span style="font-size: xx-small;">π₯ 13 min</span><br />
<br />
<h2>What is a Spark Encoder?</h2><br />
An <a href="https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Encoder.html" target="_blank">Encoder</a> is a wrapper class that specifies how to serialize and deserialize data with the <a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html" target="_blank">Spark Structured Streaming framework</a>.<br />
<br />
In this framework, datasets are typed and schema-aware, so every Spark transformation called on a dataset needs to provide an Encoder for its output type so that the framework knows how to serialize and deserialize its content. For example, the signature of the <b>map</b> transform is as follows:<br />
<br />
<script src="https://gist.github.com/echauchot/971108d9f9e83c28d97b313267beadef.js"></script><br />
<h2>Why this blog post?</h2><br />
Because Spark Structured Streaming Encoders are complex to write, have no java API and are not well documented, I thought that what I learnt writing them could be useful to the community.<br />
<br />
<h2>Catalyst</h2><br />
To understand <b>Encoders</b> we need to focus a bit on <b>Catalyst</b>. Spark Structured Streaming uses the Catalyst optimizer to optimize execution plans, and Encoders are part of the Catalyst plan. Catalyst sees a pipeline as a <b>tree</b> made of <b>TreeNodes</b>. Here is the Catalyst workflow:<br />
<br />
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhSUUQj6XcyuAbzEjarkkm2PS4E2K3V0MFeg4DLqu5cJMX8Z7u1k10zRCRc7jTSFkJFoheYWwGptyIoVlf2vVO5tULNvIzsJfKe-qTz6K6539Y4aksDscJRaWQzV1XH40t2u4yNjlHm8VA/s1600/catalyst_workflow.png" imageanchor="1" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="173" data-original-width="750" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhSUUQj6XcyuAbzEjarkkm2PS4E2K3V0MFeg4DLqu5cJMX8Z7u1k10zRCRc7jTSFkJFoheYWwGptyIoVlf2vVO5tULNvIzsJfKe-qTz6K6539Y4aksDscJRaWQzV1XH40t2u4yNjlHm8VA/s1600/catalyst_workflow.png" /></a></div><br />
<br />
The entry point of Catalyst is either a <b>SQL abstract syntax tree</b> (AST) returned by the SQL parser if the user has used direct <a href="https://spark.apache.org/sql/" target="_blank">Spark SQL</a>, or a <b>Dataframe</b> object if the user has used the Dataframe/Dataset API. The result is an <b>unresolved logical plan</b> with unbound attributes and data types. During the <b>analysis phase</b>, Catalyst does catalog lookups and attribute mappings to fill these placeholders and ends up with a <b>logical plan</b>. This plan is then optimized by applying standard rule-based optimizations to end up with the <b>optimized logical plan</b>. Catalyst then uses rules to turn the optimized logical plan into one or more physical plans with physical operators (for example dataset.map) that match the Spark execution engine. It finally selects a plan using a <b>cost model</b>, which consists of applying cost evaluation rules to the physical plans.<br />
<br />
Where we come back to our subject is with the last phase: the <b>code generation</b>. Among the TreeNode types there are <b>Catalyst Expressions</b>, and custom Encoders are Catalyst Expressions called <b>ExpressionEncoders</b>. Catalyst Expressions contain java code strings that get composed to form the code AST of the pipeline, which is then compiled to java bytecode using the <a href="https://janino-compiler.github.io/janino/" target="_blank">Janino compiler</a>. This bytecode is what is executed when the pipeline runs. Let's see how to write ExpressionEncoders:<br />
<br />
<h2>Custom Encoder (ExpressionEncoder)</h2><br />
<br />
To manage serialization in the Spark Structured Streaming framework, you could use the Encoders available in the <a href="https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Encoders.html" target="_blank">Encoders</a> utility class. They can manage the serialization of primitive types or beans with either java or kryo serialization. But let's say you want to write a custom Encoder because, for example, you develop a framework based on Spark and you want to allow your users to provide serialization code. As an example, let's take Apache Beam, which lets the user define his serialization code in the <a href="https://beam.apache.org/releases/javadoc/2.1.0/org/apache/beam/sdk/coders/Coder.html" target="_blank">Coder</a> class.<br />
<br />
<br />
Spark Encoders have no java API, so if you want to code in java because your code base is in java, you'll need to use some bindings. Maybe there are better ways but, as I'm not a scala developer, I could be unaware of them :)<br />
<br />
<br />
As said, a custom <b>Encoder</b> is an <b>ExpressionEncoder</b><br />
<script src="https://gist.github.com/echauchot/f5be89ab0a651eca8ec5ed35da8b30a6.js"></script><br />
An ExpressionEncoder groups a serializer and a deserializer; you need to specify both as Expression instances. Let's see the serializer part:<br />
<br />
<h3>Serializer</h3><div id="GIST"><script src="https://gist.github.com/echauchot/c42cde2723f13a022128df78843d737a.js"></script><br />
</div>Things to point out in that code:<br />
<ul><li>It is an Encoder, so it has no SQL representation (remember, Catalyst Expressions are broader than Encoders); hence we implement <b>NonSQLExpression</b>. It has one input and one output, so we extend <b>UnaryExpression</b>.</li>
<li>There are several methods to override:</li>
<ul><li><b>child()</b>: the Encoder is part of the Catalyst tree (see above), so we need to keep track of its child (the input of the <b>UnaryExpression</b>)</li>
<li><b>doGenCode()</b>: this method is responsible for producing the java code strings that will be compiled by Janino.</li>
<li><b>dataType()</b>: it is the type of the result of the evaluation of the Expression. In our case binary (because we are serializing an object to binary).</li>
<li>the other overrides, <b>productElement()</b>, <b>productArity()</b>, <b>canEqual()</b> and consequently equals() and hashCode(), are due to the fact that there is no Java API for ExpressionEncoder, so we need to implement the Scala <b>Product</b> specifics.</li>
</ul></ul><h4><br />
</h4><h4>Code generation</h4><br />
Let's focus on the interesting method <b>doGenCode()</b>:<br />
<br />
This method generates the code shown in the comment at <a class="callout-trigger" data-lines="32" data-listing="GIST" data-note="Generated code">line 32</a>, in the form of java strings.<br />
<br />
It returns an <b>ExprCode</b>, see <a class="callout-trigger" data-lines="52" data-listing="GIST" data-note="This ExprCode is constructed from an ExprValue which is the name of the output variable in the generated code and a Block which is the code itself">line 52</a>. The Block it contains is constructed through <a class="callout-trigger" data-lines="34-48" data-listing="GIST" data-note="String interpolation integrates the local variables like javaType or accessCode defined in args into constant strings representing the code defined in parts">string interpolation</a>, and the Block creation itself is managed at <a class="callout-trigger" data-lines="49,50" data-listing="GIST" data-note="BlockHelper.code() method creates the Block and manages the string interpolation with the standard scala StringContext">line 49</a>.<br />
<br />
Now that the global architecture of this method is clearer, there are some pieces that could look weird:<br />
<ul><li>To access an object that is not a local variable of the generated code block, we need to add a Catalyst reference to it, see <a class="callout-trigger" data-lines="24" data-listing="GIST" data-note="Create a reference to an external variable">line 24</a>. In our case we reference the <b>coder</b>, which contains the user-provided serialization code.</li>
</ul><ul><li>As said above, the serializer is a UnaryExpression. This Expression has only one input Expression, which is its child (see <a class="callout-trigger" data-lines="25,32" data-listing="GIST" data-note="child input Expression">lines 25 and 32</a>). We need to concatenate the child code and the actual serialization code so that everything can be compiled by Janino, see <a class="callout-trigger" data-lines="52" data-listing="GIST" data-note="We concatenate our generated code to the input code Block">line 52</a>.</li>
</ul><br />
<h4>Instantiate the serializer</h4><br />
<script src="https://gist.github.com/echauchot/62c921ad073a55a216a492abe7c67d86.js"></script><br />
Here is the code to create the <b>EncodeUsingBeamCoder</b> object (the serializer part of our ExpressionEncoder). To instantiate this class, we need to pass it a reference to its child in the Catalyst tree (remember, the input Expression of the UnaryExpression). To obtain a reference to the Catalyst input Expression, we do it like this: <b>BoundReference(0, new ObjectType(clazz), true)</b>. There is only one input (because EncodeUsingBeamCoder is a UnaryExpression), so we get it at index 0 and we indicate the DataType of the input and its nullability.<br />
<br />
<h4>Spark physical plan</h4><br />
<br />
The serialization part of the physical Catalyst plan looks like this:<br />
<br />
<blockquote class="tr_bq"><i>SerializeFromObject [encodeusingbeamcoder(input[0, org.apache.beam.sdk.util.WindowedValue, true], WindowedValue$FullWindowedValueCoder(VarIntCoder,GlobalWindow$Coder)) AS binaryStructField#11]</i></blockquote><br />
<b>WindowedValue </b>is the type to serialize from (the clazz) and <b>WindowedValue$FullWindowedValueCoder(VarIntCoder,GlobalWindow$Coder) </b>is the Beam Coder<b> </b>provided in the pipeline.<br />
<br />
<h3>Deserializer</h3><br />
The deserializer class <b>DecodeUsingBeamCoder</b>, which you can see in the full code link below, is completely symmetric to the serializer class <b>EncodeUsingBeamCoder</b>. The only thing worth mentioning is its instantiation:<br />
<br />
<h4>Instantiate the deserializer</h4><br />
<script src="https://gist.github.com/echauchot/63a80c85f486f067e5b2df10e045718d.js"></script>Here is the code to create the <b>DecodeUsingBeamCoder </b>object (the deserializer part of our ExpressionEncoder). Here also, to instantiate this class, we need to pass it a reference to its child in the catalyst tree (remember, the input Expression of the UnaryExpression). To obtain a reference to the catalyst node, we do like this: <b>new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType)</b>.<br />
<br />
Here again we get the first input Expression at index 0 (cf. UnaryExpression); it is of type <b>BinaryType</b> because we are deserializing bytes. The Cast lets Catalyst treat the Expression as binary, which allows Spark to process it more efficiently.<br />
<br />
<h4>Spark physical plan</h4><br />
The deserialization part of the physical Catalyst plan looks like this:<br />
<blockquote class="tr_bq"><i>DeserializeToObject decodeusingbeamcoder(cast(binaryStructField#4 as binary), org.apache.beam.sdk.util.WindowedValue, WindowedValue$FullWindowedValueCoder(VarIntCoder,GlobalWindow$Coder)), obj#6: org.apache.beam.sdk.util.WindowedValue</i></blockquote><b>WindowedValue</b><b style="font-weight: 400;"> </b><span style="font-weight: 400;">is the type to deserialize to (the clazz) and </span><i><b>WindowedValue$FullWindowedValueCoder(VarIntCoder,GlobalWindow$Coder)</b></i><b style="font-weight: 400;"> </b><span style="font-weight: 400;">is the Beam </span><b style="font-weight: 400;">Coder </b><span style="font-weight: 400;">provided in the pipeline</span><br />
<span style="font-weight: 400;"><br />
</span> <br />
<h3>Performance</h3><br />
Even though the Janino compiler is fast, compiling java strings to bytecode takes time. I measured a big performance gain when I reduced the size of the generated code and replaced it with as much precompiled code as possible. This is why, in the full code link, the <b>encode()</b> method in the serializer and the <b>decode()</b> method in the deserializer are compiled code and not string code inside the <b>doGenCode()</b> method. Another gain of this approach is that it enables debugging of this compiled part of the code.<br />
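<div>To illustrate the idea, here is a minimal sketch (a hypothetical helper, simplified from the full code linked below) of such a precompiled method: doGenCode() then only needs to generate a one-line call to it instead of the whole serialization logic as a string.</div>
<pre>
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.Coder;

// Precompiled serialization logic: only the call to this method is generated
// as a java string, which keeps the Janino compilation small and makes this
// part of the code debuggable.
public static <T> byte[] encode(Coder<T> coder, T value) {
  try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
    coder.encode(value, baos); // user-provided Beam serialization code
    return baos.toByteArray();
  } catch (IOException e) {
    throw new IllegalStateException("Error encoding value: " + value, e);
  }
}
</pre>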
<br />
<h2>Full code link</h2><br />
<a href="https://github.com/apache/beam/blob/2a9a0ad3c23e6abe14702afb488f79d497f2db2d/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java#L51" target="_blank">EncoderHelpers class in the Apache Beam project</a><br />
<br />
<script>
/*
Copyright (c) 2013-2020 Dave Leeds and Contributors
License: https://raw.github.com/djleeds/code-callout/master/LICENSE.md
*/
!function(a,b,c){"use strict";a.fn.codeCallout=function(c){function u(a){var b=new o("code-callout-note","content",a.closeButtonText,"close"),c=new p,d=new q(new k,a.getLineSelector,"code-callout-highlighted"),e=new i(a.scrollTargetSelector,a.animationDurationInMs);return new n(b,c,d,e,a.noteOffsetFromLine)}function v(){var b=0;return a(x.scrollTargetSelector).each(function(c,d){var e=a(d).scrollTop();b=e>b?e:b}),b}var w=new e,x=a.extend(r,c),y=s;if(x.profile in t==!1)throw"Unrecognized profile "+x.profile+" was requested.";var z=t[x.profile];"options"in z&&(x=a.extend(x,z.options)),"styles"in z&&(y=a.extend(y,z.styles));var A=new j(x.styleId,y),B=new l(x.listingIdAttribute,x.lineNumbersAttribute,x.noteContentsAttribute),C=u(x);return A.initialize(),a(this).attr("href","#").on(x.triggerEventName,function(b){var c=B.read(this);if(console.log("wrapping",c.listingId,x),x.shouldWrapLines){var d=new m(a("#"+c.listingId),x.wrapperLineClass,x.wrapperNumberClassPrefix,x.filterLinesToWrap,x.beforeLinesOfCodeAreWrapped);d.wrapListing()}b.preventDefault(),w.deactivate(),w=C.create(c,v()),w.activate()}),x.exposeForTest&&(b.__codeCallout={Callout:d,NullCallout:e,Note:f,Listing:g,LineSet:h,Scroller:i,Style:j,LineNumberParser:k,TriggerReader:l,LineWrapper:m,CalloutFactory:n,NoteFactory:o,ListingFactory:p,LineSetFactory:q,options:x}),this};var d=function(a,b,c,d,e,f){var g=this;a.onClose(function(){g.deactivate()}),this.activate=function(){a.activate(),c.highlight(),d.to(g.position())},this.deactivate=function(){a.deactivate(),c.unhighlight(),d.to(e)},this.center=function(){return(c.top()+a.bottom())/2},this.positionCalculator={windowHeight:f.innerHeight,scrollTop:function(){return this.scrollCenter()-this.windowHeight/2},overhang:function(){return Math.max(a.bottom()-b.bottom(),0)},leash:function(){return Math.max(b.height()-this.windowHeight,0)/2},distance:function(){return g.center()-b.center()},clampedDistance:function(){var a=this.leash();return Math.max(Math.min(this.distance(),a+this.overhang()),-a)},scrollCenter:function(){return b.center()+this.clampedDistance()}},this.position=function(){return this.positionCalculator.scrollTop()}},e=function(){this.deactivate=function(){}},f=function(b,c,d){var e="resize.code-callout-note",f=function(){};this.activate=function(){this.attach(),this.positionAt(c()),this.hook(),this.show()},this.deactivate=function(){this.hide(),this.unhook(),this.detach()},this.attach=function(){b.appendTo(a("body"))},this.detach=function(){b.remove()},this.show=function(){b.show()},this.hide=function(){b.hide()},this.hook=function(){d.on(e,function(a){this.position(c())}),b.find("button.close").on("click",function(){f()})},this.unhook=function(){d.off(e)},this.bottom=function(){return b.offset().top+b.outerHeight()},this.positionAt=function(a){b.css(a)},this.onClose=function(a){f=a}},g=function(a){this.top=function(){return a.offset().top},this.bottom=function(){return this.top()+this.height()},this.height=function(){return a.outerHeight()},this.center=function(){return(this.top()+this.bottom())/2}},h=function(a,b){this.highlight=function(){a.addClass(b)},this.unhighlight=function(){a.removeClass(b)},this.top=function(){return a.first().offset().top},this.bottom=function(){return a.last().offset().top+this.lineHeight()},this.left=function(){return a.last().offset().left},this.lineHeight=function(){return a.last().outerHeight()}},i=function(b,c){this.to=function(a){this.selectElement().stop().animate({scrollTop:a},c)},this.selectElement=function(){return 
a(b)}},j=function(b,c){this.initialize=function(){this.hasBeenWritten()||this.write()},this.hasBeenWritten=function(){return a("#"+b).length>0},this.write=function(){a("<style />").attr("type","text/css").attr("id",b).text(this.build()).appendTo(a("body"))},this.build=function(){var a="";for(var b in c){a+=b+"{";for(var d in c[b])a+=d+":"+c[b][d]+";";a+="} "}return a}},k=function(){this.parse=function(a){if(!isNaN(a))return[Number(a)];for(var b=[],c=a.split(","),d=0;d<c.length;d++){var e=c[d];this.isLineRange(e)?b=b.concat(this.parseLineRange(e)):b.push(Number(e))}return b},this.isLineRange=function(a){return a.indexOf("-")>0},this.parseLineRange=function(a){for(var b=[],c=a.split("-"),d=c[0],e=c[1],f=d;e>=f;f++)b.push(Number(f));return b}},l=function(b,c,d){this.read=function(e){var f=a(e);return{listingId:f.data(b),lineNumbers:f.data(c),noteContents:f.data(d)}}},m=function(b,c,d,e,f){var g=this,h="\n";this.isAlreadyWrapped=function(){return b.find("."+c).length>0},this.wrapLine=function(b,e){var f=b+1;return a("<div />").addClass(c).addClass(d+f).html(e+"\n")},this.splitIntoLines=function(){return e(b).html().split(h)},this.replaceListing=function(a){b.html(a.children("*"))},this.wrapListing=function(){if(!this.isAlreadyWrapped()){var c=this.splitIntoLines(b),d=a("<div />");a.each(c,function(a,b){d.append(g.wrapLine(a,b))}),f(b,d),this.replaceListing(d)}}},n=function(a,c,e,f,g){this.create=function(g,h){var i=c.create(g.listingId),j=e.create(g.listingId,g.lineNumbers),k=this.createPositionAdvice(j),l=a.create(g.noteContents,k);return new d(l,i,j,f,h,b)},this.createPositionAdvice=function(a){return function(){return{top:a.bottom()+a.lineHeight()*g,left:a.left()}}}},o=function(c,d,e,g){this.renderContent=function(b){return a("<div />").addClass(d).text(b)},this.renderCloseButton=function(){return a("<button />").addClass(g).text(e)},this.render=function(b){var d=this.renderContent(b),f=this.renderCloseButton(e);return a("<div />").attr("id",c).append(d).append(f)},this.create=function(c,d){var g=this.render(c,e);return new f(g,d,a(b))}},p=function(){this.create=function(a){return new g(this.selectElement(a))},this.selectElement=function(b){return a("#"+b)}},q=function(b,c,d){this.create=function(a,c){var e=b.parse(c),f=this.selectLines(a,e);return new h(f,d)},this.selectLines=function(b,d){var e=a();return a.each(d,function(d,f){e=e.add(a(c(b,f)))}),e}},r={profile:"pre",animationDurationInMs:500,closeButtonText:"Continue reading",scrollTargetSelector:"html,body",triggerEventName:"click",listingIdAttribute:"listing",lineNumbersAttribute:"lines",noteContentsAttribute:"note",styleId:"code-callout-styles",noteOffsetFromLine:.25,exposeForTest:!1,shouldWrapLines:!1,wrapperLineClass:"code-callout-line",wrapperNumberClassPrefix:"number-",lineWrapContainerSubselector:"",getLineSelector:function(a,b){throw"Not Implemented"},filterLinesToWrap:function(a){return a},beforeLinesOfCodeAreWrapped:function(a,b){return b}},s={".code-callout-line":{width:"100%"},".code-callout-highlighted":{"background-color":"#FFA !important"},"#code-callout-note":{position:"absolute","background-color":"#EAEAEA",background:"linear-gradient(to bottom, white 0%, #EAEAEA 100%)","border-radius":"1em",padding:"1em 1em 3em 1em",border:"1px solid #CCC","box-shadow":"0.25em 0.25em 0.5em #CCC","min-width":"7em"},"#code-callout-note button.close":{"font-size":"75%",position:"absolute",bottom:"0",right:"0",margin:"1.5em","white-space":"nowrap"}},t={gist:{options:{getLineSelector:function(a,b){return"#"+a+" 
td[data-line-number='"+b+"'] + td"}}},"jquery-syntax":{options:{getLineSelector:function(a,b){return"#"+a+" .ln"+b}}},pre:{options:{shouldWrapLines:!0,getLineSelector:function(a,b){return"#"+a+" .code-callout-line.number-"+b}}},prism:{options:{shouldWrapLines:!0,lineWrapContainerSubselector:"code",getLineSelector:function(a,b){return"#"+a+" .code-callout-line.number-"+b},filterLinesToWrap:function(a){return a.not(".line-numbers-rows")},beforeLinesOfCodeAreWrapped:function(a,b){return b.append(a.find(".line-numbers-rows"))}}},syntaxhighlighter:{options:{getLineSelector:function(a,b){return"#"+a+" .line.number"+b}},styles:{"div.syntaxhighlighter div.container div.code-callout-highlighted":{"background-color":s[".code-callout-highlighted"]["background-color"]+" !important"}}}}}(jQuery,window);
</script><br />
<!-- Here's where we tell the plugin to create callouts for the trigger links, which we hooked by class name. --><br />
<script>
$(".callout-trigger").codeCallout({ profile: "gist" });
</script><br />
Etienne Chauchothttp://www.blogger.com/profile/13442320750607525683noreply@blogger.comtag:blogger.com,1999:blog-1438124655819523792.post-78547955458570761322020-02-07T09:43:00.001+01:002020-02-12T14:55:45.417+01:00Understand Apache Beam runners: focus on the Spark runner<span style="font-size: xx-small;">π₯ 5 min.</span><br />
<div>
<br /></div>
<h2>
Previously on Apache Beam runners</h2>
<div>
<br /></div>
<div>
In the <a href="https://echauchot.blogspot.com/2020/01/introduction-to-apache-beam.html">previous article</a>, we had a brief overview of what an <a href="https://beam.apache.org/" target="_blank">Apache Beam</a> runner is. This article will dig into more details.<br />
<br />
The <a href="https://echauchot.blogspot.com/2020/01/introduction-to-apache-beam.html">previous article</a> introduced this very simple pipeline:</div>
<br />
<script src="https://gist.github.com/echauchot/b036a7e6d02bd78dfbdfc7b8e0c80430.js"></script> We saw that the Beam SDK translates this pipeline into a DAG representing the pipeline in the form of Beam transform nodes. Now let's see how a Beam runner translates this DAG. Let's say that the user choses <a href="https://spark.apache.org/" target="_blank">Apache Spark</a> as the target Big Data platform when he launches his pipeline.<br />
<br />
<h2>
The runner (at last)</h2>
<div>
<br /></div>
<div>
The job of the runner is to translate the pipeline DAG into native pipeline code for the targeted Big Data platform. It is this native code that will be executed by a Big Data cluster. If Spark is chosen, then the runner translates the DAG below into the native Spark pipeline code below. For the sake of simplicity, the Spark pipeline is pseudo-code.<br />
<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjci4oI7HgCzsZ1wJt_DaC_bCE_unuZdtt4PNQ4_jBGIomnkIF4O8Ck_sb2fkUzC7jrgfunQIo8dVrJgHD2IdQprubfwCckWd4RVAp22Hw7zXzxfZ4YF9ZnL6DX8hSLUN9_tH4xs9IqFmw/s1600/expanded_dag.png" imageanchor="1"><img border="0" data-original-height="488" data-original-width="484" height="400" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjci4oI7HgCzsZ1wJt_DaC_bCE_unuZdtt4PNQ4_jBGIomnkIF4O8Ck_sb2fkUzC7jrgfunQIo8dVrJgHD2IdQprubfwCckWd4RVAp22Hw7zXzxfZ4YF9ZnL6DX8hSLUN9_tH4xs9IqFmw/s400/expanded_dag.png" width="396" /></a></div>
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEj8o9BXiWDLZRs4xHciJ_8OxgeUeTmLziRrpJpoHqBh32XQKV0L2BK7hZOut9NY32oCMcmCgR3gXQFptJTrwBJ-PAz0dTz7sgwFDItBYA6wjSM8_JGsCjzF4LYTd5FadxTAYUtKZ9ZsMFQ/s1600/arrow.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="162" data-original-width="40" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEj8o9BXiWDLZRs4xHciJ_8OxgeUeTmLziRrpJpoHqBh32XQKV0L2BK7hZOut9NY32oCMcmCgR3gXQFptJTrwBJ-PAz0dTz7sgwFDItBYA6wjSM8_JGsCjzF4LYTd5FadxTAYUtKZ9ZsMFQ/s1600/arrow.png" /></a></div>
<script src="https://gist.github.com/echauchot/f0172a912281a284af58ae482003b2bf.js"></script><br />
<h3>
Composite and primitive transforms: the level of translation</h3>
<br />
In the <a href="https://echauchot.blogspot.com/2020/01/introduction-to-apache-beam.html">previous article</a> we talked about Beam transforms (<b>primitive transforms</b> and <b>composite transforms</b>). In the remainder of this blog, we will refer to a "composite transform" as just a "composite" and to a "primitive transform" as just a "primitive".<br />
<br />
The DAG above is the expanded DAG: on the left-hand side are the Beam transforms of the user pipeline. But among these transforms only <b>Read</b> is a primitive. The others are implemented by the Beam SDK as composites of other primitives. Composites can also be made of composites themselves (like the <b>Count</b> transform is made of the <b>Combine</b> transform) but in the end they are always made of primitives (<b>ParDo</b> or <b>GroupByKey</b>). And these primitives are what the runner translates.<br />
<br />
But in some cases the runner can choose to translate at a composite level of the graph, not at a primitive level, depending on the capabilities of the target Big Data technology. Indeed, if there is a direct correspondence of the Beam composite in the target API, the runner does not decompose the composite into its primitives and translates the composite directly. The green boxes in the DAG represent the level of translation. In our example, there is a direct equivalent of the Beam <b>Combine</b> composite: a Spark <b>Aggregator</b> (<b>agg</b> in the Spark pipeline), sketched below.<br />
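<div>To give an idea of this correspondence, here is a minimal sketch (hypothetical code, not the actual runner translation) of a Spark Aggregator counting elements, the kind of native construct a Beam Combine composite can be mapped to directly:</div>
<pre>
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;

// A Spark Aggregator<IN, BUF, OUT> that counts String elements.
public class CountAggregator extends Aggregator<String, Long, Long> {
  @Override public Long zero() { return 0L; }                              // initial accumulator
  @Override public Long reduce(Long acc, String element) { return acc + 1; }
  @Override public Long merge(Long acc1, Long acc2) { return acc1 + acc2; }
  @Override public Long finish(Long acc) { return acc; }                   // final result
  @Override public Encoder<Long> bufferEncoder() { return Encoders.LONG(); }
  @Override public Encoder<Long> outputEncoder() { return Encoders.LONG(); }
}
</pre>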
<br />
<h3>
The translation itself</h3>
<br />
The translation occurs when the pipeline is run (<i>pipeline.run()</i> is executed). To translate the DAG, the runner visits the graph. All Beam runners work the same way; only the target API changes with the chosen runner.<br />
<br />
The first step is to detect the translation mode (batch or streaming) by searching for a Beam <b>BoundedSource</b> (like Elasticsearch for example) or an <b>UnboundedSource</b> (like Kafka for example). Knowing the translation mode, the runner can choose<br />
<ul>
<li>the proper Spark <b>DataSourceV2</b> to instantiate either implementing <b>ReadSupport</b> (batch) or <b>MicroBatchReadSupport </b>(streaming)</li>
<li>the proper Spark <b>action</b> to execute on the output dataset either <b>foreach</b> (batch) or <b>writeStream </b>(streaming)</li>
</ul>
<br />
Then the DAG visit continues and each node is translated to the target native API: <b>Read</b> gets translated to a Spark <b>DataSourceV2</b> that creates the input <b>dataset</b>, <b>ParDo</b> gets translated to a Spark <b>flatMap</b> that is applied to the input dataset, and so on until the output dataset. A sketch of such a visit is shown below.<br />
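<div>Here is a minimal sketch (simplified, not the actual Spark runner code) of this DAG visit, using the Beam SDK pipeline visitor. <i>TransformTranslator</i> is a hypothetical interface standing in for the runner's real translator abstraction:</div>
<pre>
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;

// Hypothetical translator abstraction: one implementation per Beam primitive.
interface TransformTranslator {
  void translate(TransformHierarchy.Node node); // emits the native construct
}

class NaiveTranslationVisitor extends Pipeline.PipelineVisitor.Defaults {
  private final Map<Class<? extends PTransform>, TransformTranslator> translators = new HashMap<>();

  NaiveTranslationVisitor() {
    translators.put(ParDo.MultiOutput.class, node -> { /* emit a Spark flatMap */ });
    translators.put(GroupByKey.class, node -> { /* emit a Spark groupByKey */ });
  }

  @Override
  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
    // each primitive node of the DAG is dispatched to its translator
    translators.get(node.getTransform().getClass()).translate(node);
  }
}
// usage: pipeline.traverseTopologically(new NaiveTranslationVisitor());
</pre>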
<br />
<h3>
Pipeline run</h3>
<div>
<br /></div>
When the visit of the DAG is done, the runner applies the action chosen above to the output dataset to run the pipeline. At this point the Spark framework executes the resulting native Spark pipeline.</div>
Etienne Chauchothttp://www.blogger.com/profile/13442320750607525683noreply@blogger.comtag:blogger.com,1999:blog-1438124655819523792.post-88762441417721629092020-01-29T17:12:00.001+01:002020-06-08T15:55:35.971+02:00Introduction to Apache Beam and the runners<span style="font-size: xx-small;">π₯ 7 min.</span><br />
<div>
<br /></div>
<span style="font-family: inherit;">This is a first blog to introduce Apache Beam before going into more details about Beam runners in the next blogs.</span><br />
<span style="font-family: inherit;"><br />
</span> <br />
<h2>
<span style="font-family: inherit;">What is Apache Beam ?</span></h2>
<span style="font-family: inherit;"><br />
</span> <span style="font-family: inherit;">To define <a href="https://beam.apache.org/" target="_blank">Apache Beam</a> let's start with a quote</span><br />
<blockquote class="tr_bq">
<span style="font-family: inherit;">Β«Batch / streaming? Never heard of either.Β»</span><span style="font-family: inherit;"><br />
</span><span style="font-family: inherit;">(Batch is nearly always part of higher-level streaming) </span></blockquote>
<span style="font-family: inherit;"><br />
</span> <span style="font-family: inherit;">Beam is a programming model that allows to create big data pipelines. Its particularity is that its API is unified between batch and streaming because the only difference between a batch pipeline and a streaming pipeline is that batch pipelines deal with finite data whereas streaming pipelines deal with infinite data. So, batch can be seen as a sub-part of the streaming problem. And Beam provides <a href="https://beam.apache.org/documentation/programming-guide/#windowing" target="_blank">windowing</a> features that divide infinite data into finite chunks.</span><br />
<span style="font-family: inherit;"><br />
</span> <span style="font-family: inherit;">Let's take a look at the Beam stack:</span><br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEg-hiZn-MlE-Ppw_9ax60uYOGy9Pj9jVgf1k0Q1aqUqwlg13gngoLX0iq0fCvHMClGG0tQQa1mNQwsA6OPPh7S1zawggxV0wTjkO9DY8MmTGVTGzYFN0HHj2KFY58oLCSrttTVevdFWnXQ/s1600/Etienne+Chauchot+Spark+Structured+Streaming+Runner.png" imageanchor="1"><img border="0" data-original-height="197" data-original-width="528" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEg-hiZn-MlE-Ppw_9ax60uYOGy9Pj9jVgf1k0Q1aqUqwlg13gngoLX0iq0fCvHMClGG0tQQa1mNQwsA6OPPh7S1zawggxV0wTjkO9DY8MmTGVTGzYFN0HHj2KFY58oLCSrttTVevdFWnXQ/s1600/Etienne+Chauchot+Spark+Structured+Streaming+Runner.png" /></a></div>
<br />
The pipeline <b>user code</b> is written using one of the several <b>language SDKs</b> (java, python or go) that Beam provides. The SDK is a set of libraries of <b>transforms</b> and IOs that allow the user to input data into his pipeline, do some computation on this data and then output it to a given destination. Once the pipeline is written, the user chooses a Big Data execution engine such as <a href="https://flink.apache.org/" target="_blank">Apache Flink</a>, <a href="https://spark.apache.org/" target="_blank">Apache Spark</a> or others to run his pipeline. When the pipeline is run, the <b>runner</b> first translates it to native code depending on the chosen Big Data engine. It is this native code that is executed by the Big Data cluster.<br />
<br />
<h2>
Primitive transforms</h2>
<br />
The Beam SDK contains a set of transforms that are the building blocks of the pipelines the user writes. There are only 3 primitives:<br />
<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEis2y7OIKxFoqVkIW0Of8lFktxwNwHH7Z3ZB9Nx1CYBqgNT3WRCYR5i5P0WC8-AoF-2G1pL8PtGidkSmuzFRC8zftkixzie-sIbmVfe75-VR-ub0AdKqypKnoj_R1WY3HUrAYAIp2Nv-K4/s1600/dofn.png" imageanchor="1" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="275" data-original-width="148" height="200" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEis2y7OIKxFoqVkIW0Of8lFktxwNwHH7Z3ZB9Nx1CYBqgNT3WRCYR5i5P0WC8-AoF-2G1pL8PtGidkSmuzFRC8zftkixzie-sIbmVfe75-VR-ub0AdKqypKnoj_R1WY3HUrAYAIp2Nv-K4/s200/dofn.png" width="107" /></a></div>
<br />
<b>ParDo</b>: it is the good old flatmap that processes a collection element by element, in parallel, applying to each element a function called a <b>DoFn</b>. See the sketch below.<br />
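<div class="separator" style="clear: both;"><br /></div>
<div>A minimal sketch (it assumes an existing PCollection<String> called <i>words</i>):</div>
<pre>
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// The DoFn is applied to each element in parallel and may output
// zero, one or several elements (flatmap semantics).
PCollection<String> upperCased =
    words.apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(@Element String word, OutputReceiver<String> out) {
        out.output(word.toUpperCase());
      }
    }));
</pre>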
<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiaWk2EW4QPDeL-b_MGJYwRlzF8XLnrwHJCR-azNEE534pr0Iklp29hsMF965Z22qruuiTcLliWkWUrTphzMRTMN5ej1B0hWP99ROGeht1qJm3d3Dk_wRCa1RJB0gRIGt9fIUV4_btqCHQ/s1600/GBK.png" imageanchor="1" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="286" data-original-width="154" height="200" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiaWk2EW4QPDeL-b_MGJYwRlzF8XLnrwHJCR-azNEE534pr0Iklp29hsMF965Z22qruuiTcLliWkWUrTphzMRTMN5ej1B0hWP99ROGeht1qJm3d3Dk_wRCa1RJB0gRIGt9fIUV4_btqCHQ/s200/GBK.png" width="107" /></a></div>
<br />
<b>GroupByKey</b>: this one groups the elements that have a common key. The groups can then be processed in parallel by the next transforms downstream in the pipeline. See the sketch below.<br />
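<div class="separator" style="clear: both;"><br /></div>
<div>A minimal sketch (it assumes an existing PCollection<KV<String, Integer>> called <i>scores</i>):</div>
<pre>
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Gathers, for each key, all its values into a single iterable.
PCollection<KV<String, Iterable<Integer>>> grouped =
    scores.apply(GroupByKey.<String, Integer>create());
</pre>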
<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhpNhLXhh_gkrUNFKgzNqtrERccz60ZYWP2D9lcJDsy6Si5mfrbCADsSsAwbh7TzpECbVF8Iz1lQ7-zlMCo_N0Xix6jm5S8nK9BiRi3DEShYuxSZXgzx6VMaUhY5gQheRodiJs8Vodzpt8/s1600/read.png" imageanchor="1" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="292" data-original-width="152" height="200" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhpNhLXhh_gkrUNFKgzNqtrERccz60ZYWP2D9lcJDsy6Si5mfrbCADsSsAwbh7TzpECbVF8Iz1lQ7-zlMCo_N0Xix6jm5S8nK9BiRi3DEShYuxSZXgzx6VMaUhY5gQheRodiJs8Vodzpt8/s200/read.png" width="104" /></a></div>
<br />
<b>Read</b>: the Read transform is the way to input data into the pipeline by reading an external source, which can be a batch source (like a database) or a streaming (continuously growing) source (such as a Kafka topic).<br />
<br />
<div class="separator" style="clear: both; text-align: center;">
</div>
<h2>
Composite transforms</h2>
<div>
<br /></div>
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEguDYBmr2pXaXT4yw7mXyVpF4J3MS0QtqGm43AtyvZaAF96_OU13ABCnLw6VkJenGXeeyzDQnZ2BSOaNit3xJni9dqXm0Q6Wdvf7pc_BgnzsA69S5HHOsUs31jmTBnJEgDKTWhQqHe0S_k/s1600/composite.png" imageanchor="1" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><img border="0" data-original-height="292" data-original-width="152" height="200" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEguDYBmr2pXaXT4yw7mXyVpF4J3MS0QtqGm43AtyvZaAF96_OU13ABCnLw6VkJenGXeeyzDQnZ2BSOaNit3xJni9dqXm0Q6Wdvf7pc_BgnzsA69S5HHOsUs31jmTBnJEgDKTWhQqHe0S_k/s200/composite.png" width="104" /></a></div>
<br />
<b>Composite transforms</b>: All the other transforms available in the SDK are actually implemented as composites of the 3 previous ones.<br />
<br />
<div class="separator" style="clear: both; text-align: center;">
</div>
As an example, the <b>Reduce</b> of Beam, which is called <b>Combine</b> and which allows you to do a computation on data spread across the cluster, is implemented like this:<br />
<br />
<script src="https://gist.github.com/echauchot/7cc6b6b4509a42f8dc8b4409a5a62874.js"></script> <br />
Other examples of composite transforms provided by the SDK are <b>FlatMapElements</b>, <b>MapElements</b> or <b>Count</b>, which we will see in the next chapter. But the user can also create his own composites, and composites of composites.<br />
<div>
<br /></div>
<h2>
A simple Beam pipeline</h2>
<div>
<br /></div>
<div>
Let's take a look at a first simple Beam pipeline:<br />
<br />
<script src="https://gist.github.com/echauchot/b036a7e6d02bd78dfbdfc7b8e0c80430.js"></script><br />
This is the usual Hello World type big data pipeline that counts the occurrences of the different words in a text file. It reads a text file from google storage and the result of the count is output to google storage. This is a very simple straight pipeline. Not all pipelines have to be straight like this one; it is there to serve as a baseline example for the continuation of the blog and to illustrate some key concepts of Beam:</div>
<div>
<ul>
<li><b style="font-family: inherit;">Pipeline</b><span style="font-family: inherit;">: the user interaction object.</span></li>
<li><span style="font-family: inherit;"><b>PCollection</b>: Beam abstraction of the collection of elements spread across a cluster.</span></li>
<li><span style="font-family: inherit;"><b>TextIO</b>: this IO is used to read and write from/to the text file. Reading part of the IO is in reality a <b>Read</b> transform and writing part of the IO is in reality a <b>Write</b> transform</span></li>
<li><span style="font-family: inherit;"><b>Count</b>:<b> Combine</b> transform that counts occurrences</span></li>
<li><span style="font-family: inherit;"><b>FlatMapElements </b>and <b>MapElements</b>: They are composite transforms of <b>ParDo </b>that are there for convenience.</span></li>
</ul>
<div>
<br />
<div>
<h2>
<b>The resulting DAG</b></h2>
<div>
<br /></div>
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjoJCmtaOWBIIguefhGgylZP7vXpkUV6XCKqeWvHas9XeyapKtImbg0PAIdBcaVzZ1teHSmFSdeIfCqGg9iEt7Ui88RaEE__ob7gSKxjJD-qHcaISIk_ryohPkGihNC8zcFyvmnEqDwaGo/s1600/dag.png" imageanchor="1" style="clear: left; float: left; margin-bottom: 1em; margin-right: 1em;"><span style="font-family: inherit;"><img border="0" data-original-height="402" data-original-width="185" height="320" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjoJCmtaOWBIIguefhGgylZP7vXpkUV6XCKqeWvHas9XeyapKtImbg0PAIdBcaVzZ1teHSmFSdeIfCqGg9iEt7Ui88RaEE__ob7gSKxjJD-qHcaISIk_ryohPkGihNC8zcFyvmnEqDwaGo/s320/dag.png" width="147" /></span></a></div>
<div>
<span style="font-family: inherit;">When Beam executes the above pipeline code, the SDK first creates the adja<span style="font-family: inherit;">cent gra</span>ph to represent the pipeline. It is known as the <b>DAG</b> (<b>D</b>irect <b>A</b>cyclic <b>G</b>raph). For each transform of the pipeline code, a node of the DAG is created. </span></div>
<div>
<ul>
<li><span style="font-family: inherit;"><b> Read node</b> corresponds to the Read transform of <b>TextIO </b>(the input of the piepeline)</span></li>
<li><span style="font-family: inherit;"><b>Write node</b> corresponds to the Write transform of <b>TextIO </b>(the output of the pipeline)</span></li>
<li><span style="font-family: inherit;"><b>All the other nodes</b> are a direct representation of the transforms of the pipeline.</span></li>
</ul>
Please note that only the Read transform is a primitive transform as described in the above paragraph; all the others are composite transforms.</div>
<div class="separator" style="clear: both; text-align: center;">
</div>
<br />
<h2>
<b>But what about the runner?</b></h2>
</div>
<div>
<br /></div>
This is when the runner enters the scene. The job of the runner is simply to translate the pipeline DAG into native pipeline code for the targeted Big Data platform. It is this code that will be executed on a Big Data cluster by an engine such as Apache Spark or Apache Flink.<br />
<br />
Do you want to know more about the runners? <a href="https://echauchot.blogspot.com/2020/02/understand-apache-beam-runners-focus-on.html">The next article</a> describes what the runner is and uses the Spark runner as an example.</div>
</div>
Etienne Chauchothttp://www.blogger.com/profile/13442320750607525683noreply@blogger.com