In our previous post, we discussed the design of the multi-modal index, a serverless and high-performance indexing subsystem for the lakehouse architecture that boosts both query and write performance. In this blog, we discuss the machinery needed to build out such powerful indexes: the design of an asynchronous indexing mechanism, similar to those in popular database systems like PostgreSQL and MySQL, that supports index building without blocking writers.
Apache Hudi adds transactions and updates/deletes/change streams to tables on top of elastic cloud storage and open file formats. A key component inside Hudi is the transactional database kernel, which coordinates reads and writes to the Hudi table. Indexing is the newest subsystem of that kernel. All indexes are stored in an internal Hudi Merge-On-Read (MOR) table, the metadata table, which is kept transactionally up to date with the data table, even in the face of failures. The metadata table is also built to be self-managed by Hudi's table services, just like the data table.
While Hudi currently supports three types of indexes, files, column_stats, and bloom_filter, the volume and variety of big data make it imperative to add more indexes to further reduce I/O costs and query latency. One way to build a new index is to stop all writers, build the new index partition inside the metadata table, and then resume the writers. As we add more indexes, this approach falls short because a) it requires downtime, and b) it will not scale with more indexes. So, there is a need to dynamically add and remove indexes on a table concurrently with the writers. Being able to index asynchronously has two benefits: improved write latency and decoupling of failures. Those familiar with `CREATE INDEX` in database systems will appreciate how easy it is to create an index without worrying about ongoing writes. Adding asynchronous indexing to Hudi's rich set of table services is an attempt to bring the same database-like ease of use, reliability, and performance to the lakehouse.
At its core, asynchronous indexing with ongoing writers is the problem of ensuring that those writers can perform consistent updates to the index even as the historical data is being indexed in the background. One way to deal with this is to lock the index partition completely until the historical data is indexed, and then catch up. However, holding locks across long-running transactions only increases the probability of conflicts. The solution to this problem relies on three pillars of Hudi's transactional kernel design:
Data files in a Hudi table are organized into file groups, where each file group contains one or more file slices. Each slice consists of a base file produced at a certain commit, along with a set of log files that contain updates to that base file. This makes fine-grained concurrency control possible, as we will see in the next section: once a file group has been initialized and its base file is being written, another writer can concurrently log updates to the same file group, and a new file slice will be created.
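To make the file group/file slice layering concrete, here is a minimal Python sketch of the data model. This is purely illustrative (Hudi itself is implemented in Java, and class and field names here are assumptions, not Hudi's API); the key property it shows is that writers append log files to a slice without ever rewriting the base file, which is what lets two processes touch the same file group concurrently.

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class FileSlice:
    """One version of a file group: a base file plus log files holding updates."""
    base_instant: str                 # commit time at which the base file is produced
    base_file: Optional[str] = None   # may still be absent while being written
    log_files: List[str] = field(default_factory=list)

@dataclass
class FileGroup:
    file_id: str
    slices: List[FileSlice] = field(default_factory=list)

    def latest_slice(self) -> FileSlice:
        return self.slices[-1]

    def log_update(self, log_file: str) -> None:
        # Writers only append log files; they never rewrite the base file.
        # This is what allows an indexer to produce the base file while
        # regular writers keep logging updates to the same file group.
        self.latest_slice().log_files.append(log_file)

# A file group initialized at instant t10; a writer logs updates while the
# base file of that slice is still being written by another process.
fg = FileGroup("fg-0001", [FileSlice(base_instant="t10")])
fg.log_update(".fg-0001_t10.log.1")
fg.log_update(".fg-0001_t10.log.2")
print(fg.latest_slice().log_files)  # ['.fg-0001_t10.log.1', '.fg-0001_t10.log.2']
```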
Asynchronous indexing uses a mix of optimistic concurrency control and log-based concurrency control models. The indexing is divided into two phases: scheduling and execution.
During scheduling, the indexer (an external process responsible for creating a new index) takes a short lock and generates an indexing plan covering data files up to the last commit instant `t`. It initializes the metadata partitions corresponding to the requested index and releases the lock once this phase is complete. This phase takes only a few seconds, and no index files are written at this stage.
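The scheduling phase can be sketched as follows. This is a simplified model under assumed names (`Timeline`, `schedule_indexing`, etc. are illustrative, not Hudi's real classes): under a short lock, the indexer snapshots the last completed instant `t`, plans indexing of all data files committed up to `t`, and records the request on the timeline.

```python
import threading

class Timeline:
    """Minimal stand-in for Hudi's timeline; names are illustrative."""
    def __init__(self, commits):
        self.commits = commits            # {instant: [data files committed]}
        self.events = []

    def last_completed_instant(self):
        return max(self.commits)

    def files_up_to(self, t):
        return [f for i, fs in self.commits.items() if i <= t for f in fs]

    def add(self, event, payload):
        self.events.append((event, payload))

def schedule_indexing(timeline, index_type, lock):
    # Scheduling phase: under a short lock, snapshot the latest completed
    # instant t and plan indexing of all data files committed up to t.
    # No index files are written here; only the plan is recorded.
    with lock:
        t = timeline.last_completed_instant()
        plan = {"index": index_type, "up_to": t, "files": timeline.files_up_to(t)}
        timeline.add("indexing.requested", plan)
    return plan

tl = Timeline({"t01": ["a.parquet"], "t02": ["b.parquet"]})
plan = schedule_indexing(tl, "column_stats", threading.Lock())
print(plan["up_to"], len(plan["files"]))  # t02 2
```

The lock is held only for the duration of the snapshot and plan generation, which is why scheduling completes in seconds regardless of table size.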
During execution, the indexer executes the plan, writing the index base files (corresponding to data files up to instant `t`) to the metadata partitions. Meanwhile, regular in-flight writers continue to log updates to the log files within the same file group as the base file in the metadata partition. After writing the base file, the indexer checks all commit instants completed after `t` to ensure each of them added entries per its indexing plan; otherwise, it simply aborts gracefully. This is where optimistic concurrency control kicks in: the indexer takes a metadata-table lock to check whether writers have impacted overlapping files and, if a conflict exists, aborts. The graceful abort ensures that indexing can be retried idempotently.
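The catch-up check at the end of the execution phase can be sketched like this. Again, this is a conceptual model with assumed names, not Hudi's implementation: the short lock is taken only for the final validation, never while the (potentially long) index base files are being written.

```python
import threading

def finish_indexing(plan_instant, completed_instants, indexed_instants, lock):
    """Optimistic check the indexer runs after writing the index base files.

    plan_instant:       instant t the indexing plan covered
    completed_instants: commits that completed on the data table after scheduling
    indexed_instants:   commits whose writers also logged their index entries
    (All names here are illustrative, not Hudi's real API.)
    """
    with lock:  # short lock for the check only, not while writing index files
        missing = [t for t in completed_instants
                   if t > plan_instant and t not in indexed_instants]
        if missing:
            return "ABORTED"    # a conflicting writer skipped index entries; retry
        return "COMPLETED"

lock = threading.Lock()
# Writers at t11 and t12 both logged index updates -> indexing completes.
print(finish_indexing("t10", ["t11", "t12"], {"t11", "t12"}, lock))  # COMPLETED
# t12 finished without index entries -> graceful abort, safe to retry.
print(finish_indexing("t10", ["t11", "t12"], {"t11"}, lock))         # ABORTED
```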
Hudi maintains a timeline of all actions performed on the table at different instants. Think of it as an event log that serves as the central piece of inter-process coordination. Hudi implements a fine-grained, log-based concurrency protocol on this timeline. To differentiate indexing from other write operations, we introduced a new action called `indexing` on the timeline. The state transitions of this action are handled by the indexer. Scheduling adds an `indexing.requested` instant to the timeline; the execution phase transitions it to the `inflight` state while the plan is executed, and finally to the `completed` state once indexing finishes. The indexer takes a lock only while adding events to the timeline, not while writing index files.
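The state transitions of the `indexing` action form a small, strictly forward-moving machine. A minimal sketch (illustrative only; on the real timeline each transition is a file-naming convention, and only the transition itself is done under a lock):

```python
class IndexingAction:
    """Illustrative state machine for the `indexing` action on the timeline:
    requested -> inflight -> completed, with no transitions backwards."""
    TRANSITIONS = {None: "requested", "requested": "inflight", "inflight": "completed"}

    def __init__(self):
        self.state = None

    def transition(self):
        if self.state == "completed":
            raise ValueError("indexing already completed")
        self.state = self.TRANSITIONS[self.state]
        return self.state

action = IndexingAction()
print(action.transition())  # requested  (scheduling phase, under a short lock)
print(action.transition())  # inflight   (execution phase, no lock held)
print(action.transition())  # completed  (after catch-up and conflict check)
```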
The advantage of this design: with the timeline as the event log, this hybrid of the two concurrency models provides both great scalability and the asynchrony needed for the indexing process to run concurrently with writers, in tandem with other table services such as compaction and clustering.
The asynchronous indexing feature, a first of its kind in the lakehouse architecture, is still evolving. While an index can be created concurrently with writers, dropping an index requires a table-level lock because the table will typically be in use by other reader/writer threads. One line of work is to overcome this limitation by lazily dropping the index and to increase the degree of asynchrony so that multiple indexes can be created or dropped at the same time. Another line of work is to enhance the usability of the indexer: integration with SQL, and support for other types of indexes, such as a bloom index on secondary keys and a Lucene-based secondary index (RFC-52). We welcome more ideas and contributions from the community.
Hudi's multi-modal index and asynchronous indexing features show that there is more to transactional data lakes than just a table format and metadata. The fundamental principles of distributed storage systems apply to the lakehouse architecture as well, but the challenges occur at a different scale, where asynchronous indexing will soon become a necessity. We discussed a design that is extensible to other index types, scalable, and non-blocking, and we will continue to build on this framework to add more capabilities to the indexing subsystem.