September 27, 2022

How Apache Hudi Simplifies MPP Data Warehouse Migrations

Introduction

When organizations with on-prem MPP data warehouses want to modernize their data architecture on the cloud, many choose to adopt a Lakehouse. Designing the new architecture is exciting, but executing the migration for large warehouses is no simple task. In this blog we will share a solution pattern and describe how Apache Hudi helps simplify the migration process to a cloud data lake. For convenience, we will use Teradata as the example source and a Lakehouse on Amazon S3 as the target. However, the concepts and processes are equally applicable to other MPP data warehouses and other public clouds.

The Solution Pattern

The following diagram outlines a solution pattern that many customers have used successfully to execute large migrations.

Figure 1:  High level MPP data warehouse migration pattern

To ensure a successful migration, you want to run and audit your new cloud Lakehouse in parallel with your existing on-prem data warehouse to validate parity in your pipelines. For this you need to develop dual-target ETL pipelines that serve dual-target BI and application layers. Once these components are ready, one of the most challenging parts is synchronizing your data warehouse and the target data lake with minimal to zero downtime on your production workloads. In this blog we will discuss a two-stage synchronization process that covers:

  1. How to quickly extract a large amount of data from the source Teradata system and load it into the Lakehouse. This is the Point-In-Time (PIT) Full Extract and Load Phase (T0). 
  2. How to capture incremental data changes (CDC) in a consistent and fast way to minimize the downtime of an often mission-critical Teradata PROD cluster. This is the Incremental Extract and Merge Phase (T1). 

Common CDC Methods

Database Change Data Capture (CDC) techniques are used to identify changes in a source database and apply those changes to a target. CDC can be used to replicate from one database to another database or to cloud storage, keeping an audit trail of changes. The audit trail of changes may subsequently be used to update a Lakehouse or another database. There are three common methods to perform CDC in a database: Modify Timestamp, Log-Based, and Database Trigger. For more details, refer to this blog: Change data capture: What it is and how to use it. Unfortunately, none of these common CDC approaches works well with MPP data warehouses at scale.

T0 Point-In-Time Full Extract and Bootstrap Phase 

An effective method to migrate an MPP data warehouse is to follow a wave-based migration approach which moves workloads from the source MPP data warehouse to the target Lakehouse in a series of carefully defined waves. This requires both the source MPP data warehouse and the target Lakehouse cluster to be running in parallel for a period of time before the source can be retired.  

At T0, a full extract from the source Teradata data warehouse needs to be performed using either the AWS Schema Conversion Tool (SCT) or the Teradata Parallel Transporter (TPT) utility. All of the extracted files (usually gzipped CSVs) need to be transported and uploaded into S3 first (e.g. using AWS Snowball); then they can be converted/bootstrapped into Hudi datasets using either the AWS Glue for Apache Hudi Connector or Hudi on Amazon EMR/Spark. During this phase, the source Teradata system can remain active and no downtime is required.
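
For illustration, here is a minimal PySpark sketch of that conversion step, assuming Hudi is available to Spark (for example on Amazon EMR, or via the Glue connector). The bucket names, table name, and record key / precombine / partition columns are hypothetical placeholders, not part of the original extract.

    # Minimal sketch of the T0 bootstrap: read the extracted gzipped CSVs and
    # write them out as a Hudi dataset using bulk_insert (fast initial load).
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("t0-full-load")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate()
    )

    # Gzipped CSVs uploaded from the TPT/SCT extract (hypothetical path).
    df = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("s3://my-raw-bucket/teradata-extract/t0/orders/")
    )

    hudi_options = {
        "hoodie.table.name": "orders",                                # hypothetical table
        "hoodie.datasource.write.recordkey.field": "order_id",        # assumed primary key
        "hoodie.datasource.write.precombine.field": "last_update_timestamp",
        "hoodie.datasource.write.partitionpath.field": "order_date",  # assumed partition column
        "hoodie.datasource.write.operation": "bulk_insert",           # cheapest way to lay down T0
    }

    (
        df.write.format("hudi")
        .options(**hudi_options)
        .mode("overwrite")
        .save("s3://my-transformed-bucket/hudi/orders/")
    )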

Common Challenges for Incremental CDC

Before the advent of modern transactional data lake or Lakehouse technologies like Apache Hudi, many customers struggled with applying updates/deletes in their append-only data lakes built on top of immutable cloud storage services such as Amazon S3. There was no simple way to support data mutations in these traditional data lakes, often resulting in huge file rewrites (aka write amplification), excessive resource usage, wasted CPU cycles, poor performance, and high data latency (hours to days).

The following is a list of some of the most common challenges and issues faced by data lakes before Apache Hudi was created:

  • Customers prefer to keep a single copy of their data in the data lake. They typically would rather not load ALL the data into a cloud data warehouse such as Amazon Redshift or Snowflake. 
  • All the large-scale data mutation needs to happen in the data lake on top of the immutable cloud storage layer such as Amazon S3. Bulk loading, truncating, and re-loading TBs of data each time do NOT scale. 
  • To compare two sets of files in S3 to check for changes, both sets of files need to be listed, and this process can be slow when the number of files becomes large. 
  • Without record-level indexes, it is hard to implement file-level skipping in S3. 
  • There is no Upsert/Delete support. Customers often have to rely on using S3 file names for change identification and replacement. 
  • T0 and T1 are not real-time instants but rather time periods (days to weeks). See Figure 1. Incremental pull and sync up may need to happen multiple times. Minimizing the downtime of a customer's busy production MPP data warehouse becomes a huge challenge.

Apache Hudi supports a rich set of database-like operations, such as Upsert, Bulk Insert, and Insert Overwrite, and performs record-level de-duplication. It can also automatically optimize file sizing (to avoid exposing a large number of small files in S3 to query engines, which degrades performance). See https://hudi.apache.org/docs/write_operations for more details.
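
As a hedged illustration rather than a prescriptive configuration, the write behavior is chosen per job through Hudi's write configs; the operation names and file-sizing options below are standard Hudi settings, while the specific values and the split between T0 and T1 jobs simply reflect this blog's pattern:

    # T0: fast initial load of the full extract.
    t0_options = {
        "hoodie.datasource.write.operation": "bulk_insert",
    }

    # T1: record-level merge/de-duplication, with Hudi's automatic file sizing
    # folding small files into well-sized Parquet files at write time.
    t1_options = {
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.parquet.max.file.size": str(120 * 1024 * 1024),     # target ~120 MB files (example value)
        "hoodie.parquet.small.file.limit": str(100 * 1024 * 1024),  # expand files below ~100 MB (example value)
    }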

T1 Incremental Extract and Merge using Hudi

At T1 the source Teradata system will have experienced some data changes since T0. Depending on the use case, this gap (T1 - T0) can be anywhere from days to weeks. The challenge is how to properly identify these data changes (CDC) between T0 and T1 and “merge” them into the target Lakehouse. This phase typically requires some downtime for the source Teradata system, or at least for the set of source tables involved in the T1 extract process. The purpose is to guarantee that at the end of this T1 phase, complete data synchronization is achieved for a given set of tables between the source Teradata system and the target Lakehouse. At that point, parallel ETL jobs can be triggered to run on both the source and target, allowing customers to perform functional, performance, and system integration tests. Because the source Teradata system must be paused, it is critical that this process finishes as quickly as possible to minimize the downtime.

Different tables may be treated differently based on their size. Again using Teradata as an example, a sample table classification framework is:

Small (< 50 GB), Medium (50-100 GB), and Large (> 100 GB)
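
A tiny, hypothetical helper mirroring this classification (sizes assumed to be in GB, taken from your Teradata inventory):

    def classify_table(size_gb: float) -> str:
        # Small and Medium tables are fully re-extracted at T1; only Large
        # tables need partition keys or change identifiers for incremental CDC.
        if size_gb < 50:
            return "Small"
        if size_gb <= 100:
            return "Medium"
        return "Large"

    print(classify_table(320))  # -> "Large"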

It is important to make sure all Large Teradata source tables have either partition keys or some data change identifiers (e.g. last_update_timestamp/period_id/batch_id).

To perform extraction of data from the source Teradata data warehouse at T1, follow the steps below (Figure 2): 

Step 1: Estimate the downtime required for the source Teradata PROD system based on lessons learned from the previous extract processes. Work with business users to secure a time slot to pause the source Teradata PROD system to perform the T1 Incremental Extract and Merge process.

Step 2: For Small and Medium sized tables, there is no need to worry about CDC; simply perform a full bulk extract. 

For Large tables that are partitioned in Teradata, identify and extract only the partitions that have changed since T0, and store the extracted files in S3 (using a consistent S3 prefix convention). 

For Large tables that are NOT partitioned, consider the maximum possible length of historical data impact since T0 (e.g. 6 months or 1 year). Extract the required data using an update timestamp field (see the sketch after this step). It is to be expected that some of the data extracted in this manner has NOT changed between T0 and T1; the subsequent Hudi merge will de-duplicate it. 

At this time, a new set of gzipped CSV files will be collected and stored in an S3 Raw bucket (Bronze), which will then need to be “merged” into the initial set of files from T0. 
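
As a hypothetical sketch of how the T1 extract predicates might be built (the actual extraction still runs through TPT/SCT; table names, columns, and the T0 cutoff are placeholders):

    from datetime import datetime

    T0_CUTOFF = datetime(2022, 6, 1)  # timestamp of the T0 full extract (placeholder)

    def partitioned_extract_sql(table: str, partition_col: str, changed_partitions: list) -> str:
        """Large, partitioned tables: pull only the partitions changed since T0."""
        parts = ", ".join(f"'{p}'" for p in changed_partitions)
        return f"SELECT * FROM {table} WHERE {partition_col} IN ({parts})"

    def timestamp_extract_sql(table: str, ts_col: str = "last_update_timestamp") -> str:
        """Large, non-partitioned tables: pull everything touched since T0.
        Some unchanged rows will come along; the Hudi upsert de-duplicates them."""
        return f"SELECT * FROM {table} WHERE {ts_col} >= TIMESTAMP '{T0_CUTOFF:%Y-%m-%d %H:%M:%S}'"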

Step 3: You can use the AWS Glue for Apache Hudi Connector, Hudi on Amazon EMR/Spark, or Hudi's CSV Source utility to convert these incremental raw CSV files (T1) into Hudi Parquet files and store them in an S3 Transformed (Silver) bucket. 

Step 4: The next step is to upsert or merge these incremental Hudi files from the S3 Transformed (Silver) bucket into the Curated (Gold) Hudi dataset, ready for downstream consumption. 
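
A minimal PySpark sketch of this merge, assuming the same hypothetical table, keys, and buckets as the T0 example and Hudi available to Spark:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("t1-merge").getOrCreate()

    # Incremental Hudi records produced in Step 3 (Silver bucket, hypothetical path).
    incr_df = spark.read.format("hudi").load("s3://my-transformed-bucket/hudi/orders_t1/")

    upsert_options = {
        "hoodie.table.name": "orders",
        "hoodie.datasource.write.recordkey.field": "order_id",
        "hoodie.datasource.write.precombine.field": "last_update_timestamp",
        "hoodie.datasource.write.partitionpath.field": "order_date",
        "hoodie.datasource.write.operation": "upsert",  # record-level merge into the Gold dataset
    }

    (
        incr_df.write.format("hudi")
        .options(**upsert_options)
        .mode("append")
        .save("s3://my-curated-bucket/hudi/orders/")
    )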

Step 5: Confirm that all these Hudi datasets in the Gold bucket are properly registered in the AWS Glue Data Catalog. 
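
One hedged way to keep that registration current is Hudi's built-in hive sync at write time, assuming the cluster's Hive metastore is backed by the AWS Glue Data Catalog; the database and table names below are placeholders:

    # Merge these into the Step 4 write options so every Hudi write also
    # creates/refreshes the table definition in the Glue Data Catalog.
    glue_sync_options = {
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.mode": "hms",             # sync via the (Glue-backed) metastore
        "hoodie.datasource.hive_sync.database": "curated",     # hypothetical Glue database
        "hoodie.datasource.hive_sync.table": "orders",         # hypothetical table name
        "hoodie.datasource.hive_sync.partition_fields": "order_date",
    }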

Step 6: Use tools such as Amazon Athena and Amazon Redshift Spectrum to query the curated datasets and audit parity with the source. 
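
For example, a simple row-count parity check against the source can be submitted to Athena via boto3; the database, table, region, and results location are placeholders:

    import boto3

    athena = boto3.client("athena", region_name="us-east-1")

    # Count rows in the registered Gold table and compare against the Teradata source.
    resp = athena.start_query_execution(
        QueryString="SELECT COUNT(*) FROM curated.orders",
        QueryExecutionContext={"Database": "curated"},
        ResultConfiguration={"OutputLocation": "s3://my-query-results/athena/"},
    )
    print(resp["QueryExecutionId"])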

   Figure 2: Incremental Extraction (T1) and Merge using Apache Hudi

Conclusion

In this blog, we have proposed a new approach to sync up the data between a massive source MPP data warehouse (e.g. Teradata) and a target Lakehouse powered by Apache Hudi.  Though we used Teradata and AWS Lakehouse as examples, this design can also work for other MPP data warehouses (Cloudera/Netezza/Greenplum/Vertica, etc.) and other public clouds such as Microsoft Azure and Google Cloud Platform (GCP).

If you would like a 1:1 consultation to dive deep into your use cases and architecture, please reach out at info@onehouse.ai. To learn more about Apache Hudi, visit https://hudi.apache.org.
