September 11, 2024

Table Optimizer: The Optimal Way to Execute Table Services

Crafting a solid foundation for a data lakehouse begins with acknowledging its roots in cloud storage: a sprawling collection of files. Without proper file optimization, tasks such as retrieving specific datasets, applying filters, or joining dimensional data may require repeated, inefficient scans through files, like searching for a needle in a haystack.

Ensuring optimal file sizes, clustering related data, strategically partitioning data, and other data locality techniques are not merely technical minutiae; they hold the key to unlocking the true performance potential of a Universal Data Lakehouse. Lakehouse metadata layers such as Apache Hudi, Apache Iceberg, or Delta Lake help by adding abstraction layers around files, making some of these optimizations possible.

If you are using Hudi, Iceberg, or Delta, you have likely spent countless hours monitoring and trial-and-error tuning a wide array of configurations. Even after you complete a point-in-time optimization, your teams still carry the ongoing operational burden of scheduling and running these optimizations, diagnosing failures, and managing the underlying infrastructure.

At Onehouse we have decades of collective experience pioneering and operating these advanced optimizations at scale for some of the largest data lakes on the planet. We are excited to gather many of these innovations into a brand-new product: Onehouse Table Optimizer.

Table Optimizer is a solution designed to streamline and optimize Apache Hudi table management, empowering data engineers to focus on core business logic while ensuring optimal table performance and reliability.

Table Optimizer runs alongside the pipelines you already have in place, writing to the same tables and applying optimizations that improve performance and reduce cost. This blog post describes some of the engineering design and implementation challenges we faced in creating Table Optimizer, and how we solved them.

Background

Data lakehouse architecture combines the best features of a data lake and a data warehouse. The term "table services" represents a set of functionalities and tools that manage and optimize the data stored in the data lakehouse. Some examples of these functionalities include data compaction, data clustering, data cleanup, and metadata sync. (For more depth on these and other Hudi topics, see our free ebook, Apache Hudi: From Zero to One.) Hudi offers a complete spectrum of table services for Hudi tables.

Hudi also offers different ways to deploy these table services, ranging from a completely inline model, to just scheduling inline (async mode), to a fully standalone model. Most users in the open source community deploy these table services inline, to keep things simple and to avoid the overhead of managing complicated multi-writer concurrency control methods.
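
As a quick illustration, the following is a minimal sketch of how the first two modes are expressed as standard Hudi write options in Spark; the table name and path are illustrative, and a real writer would also set record key and precombine configurations.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Fully inline: compaction runs synchronously as part of the ingestion
// commit, blocking the writer until it completes.
def writeInline(df: DataFrame): Unit =
  df.write.format("hudi")
    .option("hoodie.table.name", "events")                  // illustrative
    .option("hoodie.compact.inline", "true")
    .option("hoodie.compact.inline.max.delta.commits", "5")
    .mode(SaveMode.Append)
    .save("s3://bucket/lake/events")                        // illustrative

// Schedule-only inline (async mode): the writer merely records a compaction
// plan on the timeline; a separate process executes it later.
def writeScheduleOnly(df: DataFrame): Unit =
  df.write.format("hudi")
    .option("hoodie.table.name", "events")
    .option("hoodie.compact.inline", "false")
    .option("hoodie.compact.schedule.inline", "true")
    .mode(SaveMode.Append)
    .save("s3://bucket/lake/events")
```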

Challenges Hudi Users Face 

Hudi users face a few challenges when using inline/async table services:

  1. High and unpredictable ingestion latency. In many real-time or near real-time machine learning use cases, such as ad targeting and fraud detection, customers are very sensitive to the latency of ETL/ELT workflows. One of the best features of Hudi is its ability to handle data updates quickly and flexibly, allowing users to choose streaming updates with superior data freshness over batch updates. However, when a service is called in inline mode, the data ingestion will not run while the service is executing, impacting the freshness of the data for downstream consumers. 
  2. Cascading failures and complexity. For example, in async mode, a huge clustering task may take a large chunk of the resources from the cluster that also runs data ingestion jobs, which can cause the data ingestion to fail. This also adds complexity to mitigation and debugging.
  3. Suboptimal resource management. Data ingestion and table services are normally different types of workloads. In general, data ingestion is more frequent, with small- or medium-size workloads, but table services run less frequently with larger workloads. When these two kinds of workloads are blended, it becomes difficult to make them share resources fairly and efficiently. 

Offloading table services onto a separate set of resources mitigates all of the above challenges at once. For example, clustering is normally a time-consuming task. By running clustering tasks outside of the ingestion pipeline, ingestion latency can be dramatically reduced and made more predictable. Meanwhile, any issues with clustering tasks, such as memory problems, will not block ingestion tasks. As a result, more ingestion jobs can be scheduled concurrently to fully utilize resources, reducing costs.

At Onehouse, we have therefore developed an independent table service management component that can schedule and execute table services asynchronously, without blocking the ingestion writers. This frees pipeline owners to focus on data ingestion. (When integrated with ingestion services within the Onehouse Cloud product, this component also frees pipeline owners from tuning the performance of these table services for different workloads.) This component is Table Optimizer.

High-Level Design

Table Optimizer is executed inside a Spark application, deployed through Kubernetes on the customer's AWS EKS or GCP GKE cluster. The main logic of Table Optimizer runs inside the Spark driver, and each individual table service task is executed in Spark executors. Using Spark supports our goal of delivering system scalability and reliability.

Table Optimizer serves as the orchestration layer for Hudi table service tasks. Table service tasks for one table are independent of those for other tables. Meanwhile, due to the thoughtful design of Hudi, table service tasks for a specific Hudi table can be executed concurrently. Therefore, the architecture of Table Optimizer is essentially a task-dispatcher system, which can easily achieve high throughput and reliability. The following diagram shows the architecture of Table Optimizer.

A metastore stores the table specifications. Whether a table is under Table Optimizer management is specified in a manifest file stored in a distributed file system; customers update the manifest file through the Onehouse Cloud user interface.

Table Service Manager: Table Service Manager maintains a set of handles to the tables under management, and each table handle contains necessary information for scheduling and executing table services, such as write configurations, table status, and task status. Table Service Manager periodically updates these handles by fetching the manifest file. Meanwhile, Table Service Manager is responsible for selecting active tables and scheduling tasks for each table, based on business logic and Hudi internal constraints.

Coordinator Task: The Coordinator task processes active tables and schedules specific table service tasks for each of them. Before each scheduling operation, the Hudi write client is refreshed with the latest table configurations and timeline information.

Table Service Tasks: The major table service categories include compaction, clustering, cleaning, timeline archive, and metadata sync. More categories can be added easily.

Resource Manager: Resource Manager consolidates the allocation and release of all resources for Table Optimizer, such as thread pools and the Hudi timeline service. This module prevents resource leaks at runtime.
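
To make the division of responsibilities concrete, here is a hypothetical Scala sketch of this dispatcher structure; all names, fields, and pool sizes are illustrative, not Onehouse's actual implementation.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}

// Table service categories managed by Table Optimizer.
sealed trait TableServiceKind
case object Compaction extends TableServiceKind
case object Clustering extends TableServiceKind
case object Cleaning extends TableServiceKind
case object TimelineArchive extends TableServiceKind
case object MetadataSync extends TableServiceKind

// One handle per managed table: write configs plus table/task status.
final case class TableHandle(
    basePath: String,
    writeConfig: Map[String, String],
    var active: Boolean)

class TableServiceManager {
  private val handles  = new ConcurrentHashMap[String, TableHandle]()
  private val taskPool = Executors.newFixedThreadPool(16) // sized per cluster

  // Periodically refresh handles from the manifest file, then submit one
  // Coordinator task per active table into the task pool.
  def dispatch(manifest: Map[String, TableHandle]): Unit = {
    manifest.foreach { case (id, handle) => handles.put(id, handle) }
    handles.forEach((id, handle) =>
      if (handle.active) taskPool.submit(new Runnable {
        def run(): Unit = coordinate(handle)
      }))
  }

  // Schedules and executes table services; see "Overall Execution Flow".
  private def coordinate(handle: TableHandle): Unit = ()
}
```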

Overall Execution Flow

After Resource Manager allocates resources, Table Service Manager creates or updates its table handles by fetching the latest manifest file. For active tables, it submits a Coordinator task instance into the task pool.

Table Optimizer ensures that only one Coordinator task runs for a given table at any time, so that all table service tasks for the table are scheduled in the proper order to avoid data inconsistency. Generally, the Coordinator task first checks whether any compaction or clustering tasks have failed and, if so, reschedules them. It then checks whether any new compaction, clustering, cleaning/archival, or metadata sync tasks need to be scheduled and, if so, schedules them in the proper order.
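
In sketch form, a single Coordinator pass might look like the following; retryFailed and scheduleIfNeeded are hypothetical helpers standing in for calls into Hudi's table service scheduling and execution APIs.

```scala
// A hypothetical sketch of one Coordinator pass over a single table; the
// helper bodies are placeholders, not real Hudi calls.
object CoordinatorPass {
  private def retryFailed(service: String): Unit =
    println(s"re-submitting failed $service plan")       // placeholder
  private def scheduleIfNeeded(service: String): Unit =
    println(s"scheduling new $service task if needed")   // placeholder

  def run(): Unit = {
    // 1. Reschedule any failed compaction or clustering tasks first.
    Seq("compaction", "clustering").foreach(retryFailed)
    // 2. Then schedule new work in a safe order: compaction before
    //    clustering, then cleaning/archival, then metadata sync.
    Seq("compaction", "clustering", "cleaning", "archival", "metadata sync")
      .foreach(scheduleIfNeeded)
  }
}
```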

All scheduled table service tasks are executed asynchronously. A distributed locking mechanism and Hudi's internal concurrency control work together to ensure the expected system behavior.

For tables whose table services are paused, Table Service Manager removes their scheduled tasks and kills their running tasks. Tables whose status is set to stopped are removed from the manifest file.

Challenges in Creating Table Optimizer

Besides common implementation challenges, such as scalability and reliability, here are a few interesting challenges we encountered when implementing and deploying Table Optimizer:

  1. The first and primary challenge is to ensure no data corruption for any Hudi table, which means Table Optimizer needs to rigorously detect and resolve conflicts between data ingestion writers and table service writers, or between different table service writers. Since we can run table services outside the data ingestion environment, this is a complex distributed concurrency control problem. In the worst case, the data ingestion could happen outside of Onehouse Cloud, leaving us no control over when, where, and how the ingestion might happen. To prevent conflicts, our solution relies on a distributed lock provider, such as AWS DynamoDB or Apache ZooKeeper, to coordinate data access; meanwhile, various checks are enforced to ensure the correct lock configurations are applied to all writers (see the configuration sketch after this list).
  2. After we solved the above high-level concurrency issue, we had to validate and consolidate Table Optimizer behavior when resolving conflicts among concurrent writers. We rigorously studied the expected Hudi behavior between any pair of table services to design the optimal order of scheduling. Also, we had to write a very large number of tests to simulate different concurrency conflict scenarios, which checked whether Hudi behaved as expected. For example: the data ingestion writer starts writing data into a table before a clustering task is scheduled by Table Optimizer. In this scenario, the default conflict resolution strategy prefers to fail the ingestion writer rather than the clustering task, since clustering is normally much more expensive. We made improvements to Hudi to ensure this behavior, and added a set of unit tests and functional tests to prevent any regressions from underlying Hudi changes.
  3. Another challenge in launching Table Optimizer was to ensure that it delivered the same user experience as the equivalent functionality within Onehouse Cloud. In Onehouse Cloud, table services ran in inline mode within the ingestion workflow from the very start. After migrating this capability to Table Optimizer, we carefully verified that the functionality behaves as before. For example, table service task metrics, such as average task duration and the status of the table services, are the same in Table Optimizer and Onehouse Cloud.
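
For reference, here is a minimal sketch of the shared lock configuration that every writer must carry, using standard Hudi options; the DynamoDB table name, partition key, and region values are assumptions for illustration.

```scala
// Multi-writer lock configuration shared by ingestion and table services.
val lockOpts: Map[String, String] = Map(
  "hoodie.write.concurrency.mode" -> "optimistic_concurrency_control",
  // Lazy failed-write cleaning, so a crashed concurrent writer is cleaned
  // up safely instead of blocking other writers.
  "hoodie.cleaner.policy.failed.writes" -> "LAZY",
  // DynamoDB-based lock provider (ships in the hudi-aws module):
  "hoodie.write.lock.provider" ->
    "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
  "hoodie.write.lock.dynamodb.table" -> "hudi-locks",      // illustrative
  "hoodie.write.lock.dynamodb.partition_key" -> "events",  // illustrative
  "hoodie.write.lock.dynamodb.region" -> "us-east-1"       // illustrative
)
// A ZooKeeper alternative instead sets hoodie.write.lock.provider to
// org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider,
// plus the hoodie.write.lock.zookeeper.* connection options.
```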

Benefits from Table Optimizer

We continually seek to deliver a number of concrete benefits to those who use Table Optimizer (and the parallel functionality in Onehouse Cloud): 

  1. Once Table Optimizer is enabled for a new user, all inline table services can be scheduled and executed reliably and asynchronously. As a result, not only is the latency of the ingestion workflow dramatically reduced, but the ingestion pipelines also become simpler from the customer's perspective.
  2. Table Optimizer has a global view of the table status, such as the timeline and all tasks to be scheduled, so it can schedule these tasks in an optimal order. For example, a compaction task can normally be scheduled before a clustering task to reduce the number of files consumed by clustering, which is usually the more expensive task. Table Optimizer also prefers to retry certain failed tasks before archival, since failed tasks block the archival task, increasing the cost of reading the active Hudi timeline.
  3. Table Optimizer achieves better automation. When a table service task fails, Table Optimizer can decide whether a retry should be employed, or an auto-healing strategy should be used, or a manual intervention should be requested. When table services run inline with ingestion, such logic is hard to apply.
  4. The optimizer requires minimal changes to a customer's existing Spark ingestion jobs. Even when the ingestion workflow is not under Table Optimizer management, after setting up a distributed lock provider, Table Optimizer can start scheduling and executing table services.

Results from Table Optimizer

Table Optimizer is available today, serving Hudi users who want a managed service to handle time-consuming tasks such as data compaction, clustering, and cleanup. Interoperability with Apache Iceberg and Delta Lake is available via Apache XTable (Incubating).

For a free trial of Table Optimizer, or to learn more, contact Onehouse.
