How Our Team Used Open Table Formats to Boost Querying and Reduce Latency for Faster Data Access
In this talk, Amaresh Bingumalla shares how his team used Apache Hudi to enhance data querying on blob storages such as S3. He describes how Hudi helped them cut ETL time and costs, enabling efficient querying without taxing production RDS instances. He also talks about how they leveraged open-source tools such as Apache Hudi™, Apache Spark™, and Apache Kafka™ to build near real-time data pipelines. He explains how they improved their ML and analytics workflows by using key Hudi features such as ACID compliance, time travel queries, and incremental reads, and how, paired with DataHub, they boosted data discoverability for downstream systems.
Transcript
AI-generated, accuracy is not 100% guaranteed.
Adam - 00:00:07
I am very excited to have Amaresh join us. Amaresh, can you hear me okay?
Amaresh Bingumalla - 00:00:11
Yep. All good.
Adam - 00:00:12
You're on the stage. Alright, Amaresh, where are you dialing in from?
Amaresh Bingumalla - 00:00:16
I'm from the Bay Area.
Adam - 00:00:18
From the Bay Area. And you're at Peloton?
Amaresh Bingumalla - 00:00:22
Correct.
Adam - 00:00:22
Yes. You're gonna talk to us about how open table formats can actually be used to reduce latency.
Amaresh Bingumalla - 00:00:32
Yep.
Adam - 00:00:33
I think a lot of people will have questions for that, and we'll be back in about 10 minutes. We'll see you soon.
Amaresh Bingumalla - 00:00:39
Yep, sure thing. Hey all, I'm Amaresh Bingumalla, currently working as a senior data platform engineer at Peloton. Thanks for joining my talk. Today I'm gonna be talking about how our team used open table formats, mainly to boost querying and reduce latency for faster data access. Before I dive into how we achieved that, I want to give a brief overview of what the existing architecture was and what changes we made to actually get that impact.
So if you see here on the left, these are the Peloton devices. Peloton is a digital fitness platform. We have Peloton bikes, Peloton treadmills, Peloton rowers, and the Peloton apps on the web, iOS, and Android. These Peloton devices, of course, talk to microservices, and then these microservices do CRUD operations on RDS databases. We also use DynamoDB tables for our NoSQL needs. I'll talk about that in a bit.
But yeah, these microservices do CRUD operations on RDS, and most of our databases do have read replicas depending on the load and the need. These read replicas are used for our recommendation systems or the analytical use cases. We use Redshift primarily for our analytical workloads. On these read replicas, we have daily snapshots and hourly incremental snapshots. We use the hourly incrementals to send data to Redshift so that data engineers or data analysts can run analytical workloads and then push them to Looker for operational dashboards.
We also have the daily snapshots from these databases. These go in certain S3 buckets, and these are used by recommendation systems for training ML models and push the recommendations to the devices. So this is the existing architecture.
As you can see, there might be some challenges or some flaws in this. The main one, or the first obvious one, is the dependency on the daily snapshots. Both the reports and dashboards and the recommendation systems are constrained by the daily snapshot. If for any reason the daily snapshot is not done, the recommendation training pipelines and the dashboards are stale or cannot move forward. It could be any reason: there is load on the database, or maintenance is happening on the database, so the snapshots are delayed.
This tight coupling between the online and analytical systems is not ideal. Due to maintenance on the database, I shouldn't have a delay on my dashboards or training pipelines. There should be decoupling and there should be a better way than this.
There are higher costs and load on the database read replicas, because doing a data dump on an RDS instance, like a Postgres database, puts load on your primary database, which is actually backing your production service. Any ad hoc queries or even a data dump for snapshots increase the load on the database, and that's not an ideal situation, because it can actually affect your customers.
And the last one: during a data migration, like when you're moving to a new database or trying to split off into a new microservice, there is no easy way to do this without the client handling dual writes. There should be a better way to assist the services or the product engineers to do it quicker, so they don't have to worry about how they are going to migrate from one database to another or split off into a new service.
So the solution, or a major part of the solution, is open table formats. A quick refresher: open table formats are open-source standards for storing data in blob storages like S3 or Google Cloud Storage, on top of existing file formats like Parquet, Avro, or ORC.
On the right, if you see, Apache Hudi, Iceberg, and Delta Lake are the top three formats used across the industry. All three share similar fundamentals at the base level. They store data in the file formats that we already know; most of them use Parquet or Avro. Along with the data, they also store metadata, which contains information about the data: what operations were done, what commits were made, the commit time and transaction level, schema information, partition information, how you are distributing a key, and where data for a specific partition exists in your S3 buckets or Google Cloud Storage.
Why should you choose open table formats and not just go with plain Parquet or CSV files? The first thing is ACID transactions. Storing data on blob storages as plain Parquet, you don't get any ACID transaction guarantees while you're writing data to S3 buckets. If something happens and your job fails, you have to rewrite and create your own process, or manually go and delete the half-written data or some corrupted Parquet or CSV file.
Among open table formats like Hudi, Iceberg, or Delta, we primarily use Apache Hudi in production. We have been using it for about two to three years now, and we have had no issues. We love Hudi for that.
Hudi and the other open table formats provide ACID transactions and CRUD operations. If you're storing data in plain Parquet and you want to update or delete a single row, there's no easy way to do it. You have to read the entire Parquet file, update or delete that specific row, and rewrite the entire file. Open table formats instead let you write that update or delete into a separate file. There are additional table services, mainly on Apache Hudi, like compaction and cleaning. What happens is you write your updates, deletes, or creates into a new file, and then a service picks up these files and merges them into the existing files, all in the background.
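To make that concrete, here is a minimal PySpark sketch of a Hudi upsert and delete on a hypothetical workouts table on S3; the table path, column names, and exact option set are illustrative and vary by Hudi and Spark version.

```python
# Minimal sketch of Hudi upserts and deletes -- table name, S3 path, and
# column names are hypothetical; the Hudi Spark bundle must be on the
# classpath (e.g. via --packages).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("hudi-upsert-sketch")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

hudi_options = {
    "hoodie.table.name": "workouts",
    "hoodie.datasource.write.recordkey.field": "workout_id",
    "hoodie.datasource.write.precombine.field": "updated_at",
    "hoodie.datasource.write.partitionpath.field": "workout_date",
    "hoodie.datasource.write.operation": "upsert",
}

updates = spark.createDataFrame(
    [("w-123", "cycling", "2024-05-01", "2024-05-01T09:30:00")],
    ["workout_id", "type", "workout_date", "updated_at"],
)

# Upsert: only the changed rows land in new files; compaction and cleaning
# merge them into the base files in the background.
(updates.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save("s3://my-bucket/lake/workouts"))

# Delete a single row without rewriting the whole file.
delete_options = {**hudi_options, "hoodie.datasource.write.operation": "delete"}
(updates.write.format("hudi")
    .options(**delete_options)
    .mode("append")
    .save("s3://my-bucket/lake/workouts"))
```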
Data versioning: if you're using Parquet files, to do data versioning you have to rewrite the entire file and keep a map of which file corresponds to which version. But with open table formats, as with transactions, there is a commit every time you write data to the data lake. This commit includes information on what changes were made, which files the data actually landed in, and how to reference those files.
With this data versioning, this opens up time traveling. Every commit represents a specific point in time in the timeline. Anybody querying the data can query the table as of that specific timestamp. Hudi or other formats look into the metadata and find the files corresponding to that specific timestamp or commit and return the data.
Time travel is huge for us. For example, if you want to recommend an evening workout based on what someone did in the morning, querying the table as of the commit closest to that timestamp lets us time travel and get the data.
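As a rough illustration of what such a time travel read can look like with Hudi's Spark datasource, assuming the hypothetical table from the earlier sketch:

```python
# Sketch of a Hudi time travel read -- the path and instant are hypothetical.
# "as.of.instant" also accepts full commit timestamps, not just dates.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

snapshot = (
    spark.read.format("hudi")
    .option("as.of.instant", "2024-05-01")
    .load("s3://my-bucket/lake/workouts")
)
snapshot.createOrReplaceTempView("workouts_as_of")
spark.sql("SELECT count(*) FROM workouts_as_of").show()
```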
Partition management: if you're using plain Parquet or CSV, the clients need to handle the partitioning. They have to manage how to store the data, whether partitioning by time or something else. Everything needs to be in the client code. But with these table formats, you just specify the partition key or distribution key, and the metadata keeps track of how to partition the data, which files contain which partitions, and everything.
Partition management helps with query planning and optimization. When querying a specific key or a specific WHERE condition, the engine can easily skip over many files instead of going through every single file. Your read queries are much quicker, and the amount of data scanned drops a lot. That matters because, for example, Athena charges by the amount of data scanned to get data out.
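A small sketch of how that looks on the reader side, again with a hypothetical table and partition column; because the partition layout lives in the table metadata, the filter lets the engine skip files for other dates instead of scanning everything:

```python
# Partition pruning sketch -- path and partition column are hypothetical.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

may_first = (
    spark.read.format("hudi")
    .load("s3://my-bucket/lake/workouts")
    .filter("workout_date = '2024-05-01'")  # only files for this partition are read
)
may_first.show()
```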
Schema enforcement and evolution: if you're using plain Parquet or CSV, nothing checks what data you are appending to an existing file. With open table formats, if you try to write a string to an int column, the write is rejected with a schema mismatch, and you have to change the schema. When new data comes in and you do want to change your schema, there is a way to merge schemas so the schema evolves based on your needs.
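A hedged sketch of schema evolution on the same hypothetical table: a batch that adds a nullable column is written with Hudi's schema-reconcile option enabled. The option name and behavior vary by Hudi version, so treat it as an assumption; a type mismatch (for example, a string into an int column) would instead be rejected at write time.

```python
# Schema evolution sketch -- table, columns, and the reconcile option are
# assumptions and depend on the Hudi version in use.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

with_new_column = spark.createDataFrame(
    [("w-124", "yoga", "2024-05-02", "2024-05-02T07:00:00", 45)],
    ["workout_id", "type", "workout_date", "updated_at", "duration_minutes"],
)

(with_new_column.write.format("hudi")
    .options(**{
        "hoodie.table.name": "workouts",
        "hoodie.datasource.write.recordkey.field": "workout_id",
        "hoodie.datasource.write.precombine.field": "updated_at",
        "hoodie.datasource.write.partitionpath.field": "workout_date",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.reconcile.schema": "true",  # assumption: version-dependent
    })
    .mode("append")
    .save("s3://my-bucket/lake/workouts"))
```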
There are async services used for cleaning or compacting your data, like compacting small files or removing older versions. These run asynchronously or inline while writing data. If running async, there might be a case where two writers touch the same file at the same time, leading to problems like read queries failing or returning wrong data.
Hudi and these table formats internally handle concurrency and know which writer to prioritize, so they don't return stale data or try to access a file that doesn't exist. Hudi supports this via optimistic concurrency control.
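For reference, here is a sketch of the kind of configuration Hudi's optimistic concurrency control involves in a multi-writer setup. The lock provider class and lock table are assumptions, and the exact option names vary by Hudi version and bundle; the DynamoDB-based lock provider is just one possible choice.

```python
# Optimistic concurrency control sketch -- values are illustrative only.
occ_options = {
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    "hoodie.cleaner.policy.failed.writes": "LAZY",
    "hoodie.write.lock.provider": "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
    "hoodie.write.lock.dynamodb.table": "hudi-locks",   # hypothetical lock table
    "hoodie.write.lock.dynamodb.region": "us-east-1",
}
# These are passed alongside the normal write options shown earlier, e.g.
# df.write.format("hudi").options(**{**hudi_options, **occ_options}) ...
```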
So do you still want to use plain Parquet or CSV files? I think you should rethink your answer if you say yes.
The solution, the new architecture: we started using Apache Hudi and CDC. CDC is change data capture. Our devices still talk to microservices and then to RDS instances or DynamoDB tables. We still take snapshots and push them into S3 buckets mainly for backups, rollbacks, or recovery.
But we started using CDC on these DynamoDB tables and RDS instances. Every change made on these tables gets captured and pushed into MSK. MSK is Amazon's managed Kafka service; we use MSK for our Kafka needs. Once the data is in MSK, we have Spark jobs running both in streaming fashion and at intervals of 10 to 15 minutes. These pick up the changes from Kafka topics, transform them, and put them into S3 in Hudi table format.
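A hedged sketch of what such a Kafka-to-Hudi job can look like with Spark Structured Streaming; the broker address, topic, event schema, paths, and trigger interval are hypothetical, and the talk does not spell out the exact implementation.

```python
# CDC path sketch: read change events from a Kafka (MSK) topic and upsert
# them into a Hudi table on S3 in micro-batches.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.appName("cdc-to-hudi-sketch").getOrCreate()

event_schema = StructType([
    StructField("workout_id", StringType()),
    StructField("type", StringType()),
    StructField("workout_date", StringType()),
    StructField("updated_at", StringType()),
])

raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "b-1.msk.example:9092")  # hypothetical brokers
    .option("subscribe", "rds.workouts.cdc")                    # hypothetical CDC topic
    .option("startingOffsets", "latest")
    .load()
)

events = (
    raw.select(from_json(col("value").cast("string"), event_schema).alias("e"))
    .select("e.*")
)

hudi_options = {
    "hoodie.table.name": "workouts",
    "hoodie.datasource.write.recordkey.field": "workout_id",
    "hoodie.datasource.write.precombine.field": "updated_at",
    "hoodie.datasource.write.partitionpath.field": "workout_date",
    "hoodie.datasource.write.operation": "upsert",
}

def write_batch(batch_df, batch_id):
    # Each micro-batch is upserted into the Hudi table.
    (batch_df.write.format("hudi")
        .options(**hudi_options)
        .mode("append")
        .save("s3://my-bucket/lake/workouts"))

(events.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "s3://my-bucket/checkpoints/workouts")
    .trigger(processingTime="10 minutes")
    .start())
```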
Once in Hudi table format, we register the tables with the Glue catalog, and then they're open for querying on Athena. The tables are gated by Lake Formation permissions, so not everybody has access to all tables.
Once the data is on S3 in Hudi format, you can register with any catalog that supports Hive metastore and query with Presto engines. Hudi or these table formats are queryable anywhere Parquet is supported. Most query engines support Hudi, Iceberg, or Delta Lake these days.
These Spark jobs also push some information to DataHub. DataHub is our discovery engine. We push metadata on schema, latest updates, schema evolution, and row counts. We use this for discovery and lineage to understand where the table is coming from, which topic it corresponds to, which source table and RDS it corresponds to, and everything.
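One possible way to push that kind of metadata is DataHub's Python REST emitter; the talk only says the jobs call DataHub's APIs, so the GMS address, dataset name, and properties below are assumptions.

```python
# Sketch of emitting dataset metadata to DataHub with the acryl-datahub SDK.
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DatasetPropertiesClass

# Hypothetical DataHub GMS endpoint running on the Kubernetes cluster.
emitter = DatahubRestEmitter(gms_server="http://datahub-gms:8080")

dataset_urn = make_dataset_urn(platform="hudi", name="lake.workouts", env="PROD")
properties = DatasetPropertiesClass(
    description="Hudi table built from the workouts CDC topic",  # hypothetical
    customProperties={
        "source_topic": "rds.workouts.cdc",
        "source_table": "workouts",
        "last_updated": "2024-05-01T09:30:00Z",
    },
)

emitter.emit(MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=properties))
```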
Once the data is in Hudi in S3, we have more Spark jobs that read incremental data written in the last 10 minutes and push it to Redshift so that our Looker dashboards have near real-time information coming from changes on RDS or Dynamo tables. Similarly, recommendation systems now have access to near real-time data or changes and can run their pipelines anytime during the day and push them to devices as needed.
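A sketch of a Hudi incremental read that pulls only the rows committed after a given instant; the begin instant and path are hypothetical, and the Redshift load itself (via JDBC or a Redshift connector) is not shown.

```python
# Incremental read sketch -- the begin instant would come from the table's
# timeline (here a hypothetical commit time), so only new changes are read.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

incremental = (
    spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "20240501093000")
    .load("s3://my-bucket/lake/workouts")
)
incremental.show()  # these rows would then be pushed to Redshift downstream
```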
The impact: definitely shorter time for dashboards. They are much more recent and near real-time. Recommendation systems can query data with near to no latency. We have both batch and streaming pipelines, so based on the need, they can consume data as they want.
Near real-time access to data via query engines: we use Athena primarily, but you can use any Presto-based query engine. Open table formats enable efficient storage and querying, with partition management for faster queries. You don't have to rewrite files for every single update or delete; you just write updates and deletes to a new file. Async compaction services take care of combining files, so rewrites for every single change are minimized.
Costs and load on the primary database and read replicas go down: no analytical workloads ever have to hit the primary database. Everything talks to data in the blob store.
Time travel queries for training ML models and analytics: ML models can train on data in the middle of the day. For example, we can recommend workouts based on what workout you did in the morning. If you did yoga in the morning, we can recommend a strength workout in the evening.
This helped us reduce latency and provide faster data access for both operational dashboards and recommendation systems. Thanks everybody for tuning into my talk today. You can reach me on LinkedIn, and here are my socials.
Adam - 00:15:44
Folks, awesome. Amaresh, thank you so much. We do have at least a couple of questions. I'm gonna be passing them to you rapidly. First, how do you push metadata to DataHub?
Amaresh Bingumalla - 00:15:58
DataHub is hosted on our Kubernetes clusters. We have APIs open on them, so we push metadata. The metadata includes where the data exists, what topics it relates to, what source table it is, and what kind of fields it has, including privacy fields that need to be abstracted or scrubbed out.
Adam - 00:16:22
We got another one here from Jesus. What about costs? You showed us your initial architecture, and then after 20 minutes we looked at the next one. This next one obviously looks much more robust and rigorous, but potentially expensive. Is there something you can say about the cost increment?
Amaresh Bingumalla - 00:16:41
Our major costs are storage on S3 and EMR costs. Everything else is open source, like Hudi is open source and Spark is open source. So it's mainly compute and storage costs. Storage costs are unavoidable, but in our case, we decided to go with compute costs because we wanted faster data access. It's a trade-off: cheaper versus stale data or spending a little more for faster data access.
Adam - 00:17:19
We've got a couple of other ones. For ML models, is there a feature store separate from this architecture?
Amaresh Bingumalla - 00:17:26
Yes. ML teams have a separate feature store. They consume the data and build feature stores on top for A/B testing and everything.
Adam - 00:17:36
Curious, is that something you guys custom built, or are you using an existing feature store?
Amaresh Bingumalla - 00:17:41
They use Feast. Feast is another open source feature store. They use Feast and self-host it.
Adam - 00:17:50
Moham here asks: if someone has to learn this, where might you recommend they pick up some of these skills? A lot of people might not have the scale to start using a lakehouse or read off a read replica. If you're not yet at that scale, how do you pick up the skills or ideas for how to do it?
Amaresh Bingumalla - 00:18:20
You don't need a read replica itself. You can spin up a Postgres server in a Docker container, get a CSV or anything with, like, a hundred thousand rows, and do an insert. You can find dummy data online. The MLOps community has a course around data engineering. You can push the data into your local Postgres and then do a snapshot.
You can start off with AWS ClickOps. You don't have to worry about CLI or anything. Just get the snapshot into a Parquet file or even a CSV file, click upload, and push it to S3. That's it. You have your data in S3 now.
Once it's in S3, you can try to build a simple Hudi table from this single file. With Hudi, you can read and write data to S3 in Hudi table format. That's it. It's open to querying on Athena.
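A minimal sketch of that exercise, assuming a hypothetical CSV with an id and updated_at column already uploaded to S3:

```python
# "First Hudi table" sketch -- bucket, file, and column names are hypothetical.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("first-hudi-table").getOrCreate()

df = spark.read.option("header", "true").csv("s3://my-bucket/raw/dummy_data.csv")

(df.write.format("hudi")
    .options(**{
        "hoodie.table.name": "dummy_data",
        "hoodie.datasource.write.recordkey.field": "id",          # assumes an "id" column
        "hoodie.datasource.write.precombine.field": "updated_at", # assumes an "updated_at" column
    })
    .mode("overwrite")
    .save("s3://my-bucket/lake/dummy_data"))

# From here the table can be registered in the Glue catalog (for example via
# Hudi's hive sync or a Glue crawler) and queried from Athena.
```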
Adam - 00:19:20
Amaresh, thank you very much for coming and sharing with us.