Incremental processing is a key technique that data systems use to handle large volumes of data efficiently. Rather than handling all the data at once, it pulls subsets of data from a source and processes them separately. For example, a data system that does incremental processing pulls the changed data, such as inserts and updates within a short time range, and then incrementally updates the records in a downstream data system. Reading and writing this way improves data freshness, lowers compute resource usage, and makes errors easier to diagnose and pinpoint.
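To make the idea concrete, here is a minimal sketch in plain Python, not Hudi code. The `Change` record, the `pull_changes` and `apply_changes` helpers, and the integer commit times are all hypothetical names chosen for illustration. Each run pulls only the changes inside a time window and upserts them downstream, then advances a checkpoint.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Change:
    key: str          # record key
    value: int        # latest value for the key
    commit_time: int  # when the change was written at the source


def pull_changes(source, begin, end):
    """Return changes with begin < commit_time <= end, oldest first."""
    window = [c for c in source if begin < c.commit_time <= end]
    return sorted(window, key=lambda c: c.commit_time)


def apply_changes(target, changes):
    """Upsert each change into the downstream table (a dict keyed by record key)."""
    for change in changes:
        target[change.key] = change.value
    return target


# Hypothetical source log: two inserts, then an update and a new insert.
source_log = [
    Change("a", 1, 100),
    Change("b", 2, 100),
    Change("a", 5, 200),  # update to "a"
    Change("c", 7, 300),  # new insert
]

downstream = {}
checkpoint = 0
for window_end in (100, 200, 300):
    batch = pull_changes(source_log, checkpoint, window_end)
    apply_changes(downstream, batch)
    checkpoint = window_end  # the next run resumes from here

print(downstream)  # {'a': 5, 'b': 2, 'c': 7}
```

Because each run touches only the records in its window, a failed run can be retried from its checkpoint without reprocessing the whole source.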
Constructing custom incremental pipelines is no easy task. To maximize efficiency, you need to design the processing stages thoughtfully, considering factors like data size, traffic patterns, and latency requirements. You will likely run into common issues such as ensuring data consistency across all stages of the pipeline. To detect problems quickly, it is also crucial to set up proper monitoring and data validation. Furthermore, you may face operational complications arising from backfill requests and dependencies between pipeline stages. Building efficient tooling to pause, resume, restart, and backfill pipelines will dramatically reduce the operational burden.
An alternative to building custom incremental pipelines is using Hudi, a lakehouse platform designed explicitly with incremental processing capabilities. Hudi’s concepts of timeline and instants naturally support incremental processing by providing timestamps and file paths to the changed data. Hudi also provides configurations for managing the parameters that incremental queries require, such as begin time and end time. Hudi 0.13.0 added Change-Data-Capture (CDC), a richer form of incremental processing that returns finer-grained data to enable more capable downstream processing. In the remaining sections of this blog, we will walk through some code examples to give you a quick overview of incremental processing with Hudi and, hopefully, a starting point for building your own incremental pipelines.
Let’s start with a simple incremental query example using the sample stock data from the Hudi repo. We’ll prepare a table with 2 commits, with the 2nd commit containing updates. From there, we’ll illustrate how to set up the query options and analyze the results.
Clone the Hudi repo and navigate to the root directory:
Execute the code snippet below. Be sure to update the Hudi Spark Bundle version to what’s used in your Spark shell:
Execute the snippet to create the table:
In a separate terminal window, list the Hudi timeline to see the 2 commits:
From the 1st commit to the 2nd, 1 record was updated. Now, we can run an incremental query to see what changed between these two commits:
The result is the changed record with the version matching the latest commit.
Note: We can also specify `hoodie.datasource.read.end.instanttime`, which limits the returned versions of changed records to those written up to the specified commit.
If we set `hoodie.datasource.read.begin.instanttime=0` and omit `hoodie.datasource.read.end.instanttime`, it will effectively return all the records written to the table.
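The options discussed above can be collected in a small helper. This is a minimal sketch, assuming the Spark datasource option names from the Hudi 0.13.x era; `incremental_read_options` is a hypothetical helper name, and the commit timestamp is made up for illustration.

```python
def incremental_read_options(begin_instant, end_instant=None):
    """Build Spark datasource options for a Hudi incremental query."""
    options = {
        "hoodie.datasource.query.type": "incremental",
        "hoodie.datasource.read.begin.instanttime": begin_instant,
    }
    if end_instant is not None:
        # Optional upper bound on the commits to read.
        options["hoodie.datasource.read.end.instanttime"] = end_instant
    return options


# Hypothetical commit timestamp, for illustration only.
first_commit = "20230101000000000"

since_first_commit = incremental_read_options(first_commit)
everything = incremental_read_options("0")  # begin at 0, no end: all records

# With a SparkSession `spark` and a table path `base_path` (both assumed),
# the read would look like:
#   spark.read.format("hudi").options(**since_first_commit).load(base_path)
print(everything)
```

Keeping the option names in one place makes it harder to mistype a key, which matters here because an unrecognized option could be ignored rather than rejected.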
Once the changed records have been incrementally retrieved, we might want to join them with other dimension tables and update a broader table containing more information for analysis. Let’s proceed with processing the data from Example 1 to demonstrate an incremental join.
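Conceptually, an incremental join enriches only the changed rows and upserts them into the wider table. The sketch below shows that in plain Python rather than Spark. The field names (`symbol`, `close`, `company`), the sample values, and the `incremental_join` helper are hypothetical and are not taken from the Hudi sample data.

```python
def incremental_join(changed_rows, dimension, wide_table):
    """Enrich changed fact rows with dimension attributes and upsert them."""
    for row in changed_rows:
        # Left join: keep the row even when the dimension has no match.
        attrs = dimension.get(row["symbol"], {})
        wide_table[row["symbol"]] = {**row, **attrs}
    return wide_table


# Hypothetical dimension table keyed by stock symbol.
companies = {
    "GOOG": {"company": "Alphabet"},
    "MSFT": {"company": "Microsoft"},
}

# Wide table as it stood after the first commit.
stock_wide = {
    "GOOG": {"symbol": "GOOG", "close": 100.0, "company": "Alphabet"},
    "MSFT": {"symbol": "MSFT", "close": 250.0, "company": "Microsoft"},
}

# Only the record changed by the second commit is pulled and joined.
changed = [{"symbol": "GOOG", "close": 101.5}]
incremental_join(changed, companies, stock_wide)

print(stock_wide["GOOG"])  # {'symbol': 'GOOG', 'close': 101.5, 'company': 'Alphabet'}
```

The unchanged `MSFT` row is never read or rewritten, which is where the compute savings come from.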
Copy and paste this code snippet in the terminal:
In the preceding 2 examples, we could pull the changed records as of the latest version or up to a specified end commit time. However, we could not view the previous state of the records or determine whether some records were hard-deleted. To accomplish this, we need to leverage the full change-data-capture (CDC) capabilities in Hudi.
Hudi 0.13.0 introduced the CDC feature, which logs the before and after images of changed records along with the associated write operation type (insert, update, or delete).
Company XYZ has offices in the US, India and China. At time `1000`, there is 1 employee in each office. At time `1100`, there are 2 new hires in the US office and 1 new hire in each of the India and China offices. At time `1200`, a new office in Singapore is established and an employee from the US office moves to it.
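To illustrate what before and after images carry, the following plain-Python sketch diffs two table snapshots into CDC-style events. It models the concept only and is not how Hudi implements CDC. The `cdc_events` helper, the event layout, and the one-letter operation codes (`i`, `u`, `d`) are assumptions made for this example.

```python
def cdc_events(before_snapshot, after_snapshot):
    """Diff two snapshots (dicts keyed by record key) into CDC-style events."""
    events = []
    for key in sorted(before_snapshot.keys() | after_snapshot.keys()):
        before = before_snapshot.get(key)
        after = after_snapshot.get(key)
        if before is None:
            op = "i"  # insert: no before image
        elif after is None:
            op = "d"  # delete: no after image
        elif before != after:
            op = "u"  # update: both images present and different
        else:
            continue  # unchanged records produce no event
        events.append({"op": op, "key": key, "before": before, "after": after})
    return events


# Hypothetical table contents at two consecutive commits.
commit_1 = {"k1": {"price": 10}, "k2": {"price": 20}, "k3": {"price": 30}}
commit_2 = {"k1": {"price": 11}, "k3": {"price": 30}, "k4": {"price": 40}}

for event in cdc_events(commit_1, commit_2):
    print(event)
```

Here `k2` surfaces as a delete with only a before image, which is exactly the information a plain incremental query cannot provide.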
The company's HR department wants to keep records of how many employees each office has.
We can model this scenario by having an input stream of employee id, office country and time. A key requirement for this example is to continuously update a table of the office country, headcount and time.
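The aggregation logic for this scenario can be sketched in plain Python before bringing in Spark. This is a simplified model under stated assumptions: the `hire` and `move` helpers, the employee ids, and the event layout with `before` and `after` images are hypothetical. Each event decrements the office in its before image and increments the office in its after image.

```python
from collections import Counter


def hire(emp_id, country):
    """CDC-style insert event: no before image."""
    return {"op": "i", "before": None, "after": {"id": emp_id, "country": country}}


def move(emp_id, old_country, new_country):
    """CDC-style update event: carries both before and after images."""
    return {
        "op": "u",
        "before": {"id": emp_id, "country": old_country},
        "after": {"id": emp_id, "country": new_country},
    }


def apply_cdc(headcount, events):
    """Update per-country headcount in place from CDC events."""
    for event in events:
        if event["before"] is not None:
            headcount[event["before"]["country"]] -= 1  # employee left this office
        if event["after"] is not None:
            headcount[event["after"]["country"]] += 1   # employee joined this office
    return headcount


# Input batches keyed by time, following the Company XYZ scenario.
batches = {
    1000: [hire(1, "US"), hire(2, "India"), hire(3, "China")],
    1100: [hire(4, "US"), hire(5, "US"), hire(6, "India"), hire(7, "China")],
    1200: [move(1, "US", "Singapore")],
}

headcount = Counter()
for ts in sorted(batches):
    apply_cdc(headcount, batches[ts])
    print(ts, dict(headcount))
```

Without the before image, the move at time `1200` would add an employee to Singapore but never subtract one from the US, so the totals would drift.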
The code snippet below shows how to simulate the input data, and perform incremental CDC processing and aggregation using Hudi and Spark streaming:
The above code snippet was taken from an existing test class in the Hudi repo, contributed by Bi Yan. For more on Hudi’s CDC features, please refer to the guide on the official website.
In this blog, we briefly covered the motivations for and challenges of building incremental processing pipelines, and illustrated with sample code how Hudi supports incremental processing in different scenarios. Uber’s recent blog “Setting Uber’s Transactional Data Lake in Motion with Incremental ETL Using Apache Hudi” is an excellent reference for operating incremental pipelines in production. The Hudi community will continue to enhance the CDC feature, further advancing Hudi’s incremental processing capabilities.