How Our Team Used Open Table Formats to Boost Querying and Reduce Latency for Faster Data Access
In this talk, Amaresh Bingumalla shares how his team utilized Apache Hudi to enhance data querying on blob storage such as S3. He describes how Hudi helped them cut ETL time and costs, enabling efficient querying without taxing production RDS instances. He also talks about how they leveraged open-source tools such as Apache Hudi™, Apache Spark™, and Apache Kafka™ to build near real-time data pipelines. He explains how they improved their ML and analytics workflows by using key Hudi features such as ACID compliance, time travel queries, and incremental reads, and how, paired with DataHub, they boosted data discoverability for downstream systems.
Transcript
AI-generated, accuracy is not 100% guaranteed.
Speaker 1 00:00:07
I am very excited to have Amaresh join us. Amaresh, can you hear me okay?
Speaker 2 00:00:11
Yep. All good.
Speaker 1 00:00:12
You're on the stage. Alright, Amaresh, where are you dialing in from?
Speaker 2 00:00:16
Uh, I'm from the Bay Area.
Speaker 1 00:00:18
From the Bay Area. And today you're at Peloton? Yep.
Speaker 2 00:00:22
Correct.
Speaker 1 00:00:22
Yes. You're gonna talk to us about how open table formats can actually be used to reduce latency.
Speaker 2 00:00:32
Yep.
Speaker 1 00:00:33
I think a lot of people will have questions about that, and we'll be back in about 10 minutes. We'll see you soon.
Speaker 2 00:00:39
Yep, sure thing. Hey y'all, I'm Amaresh Bingumalla, currently working as a senior data platform engineer at Peloton. Thanks for joining my talk. Today I'm gonna be talking about how our team used open table formats, mainly to boost querying and reduce latency for faster data access. Before I dive into how we achieved that, I wanna give a little bit of an overview of what the existing architecture was and what changes we made to actually get that impact. So if you see here on the left, these are the Peloton devices. Peloton is a digital fitness platform, so we have Peloton bikes, Peloton treadmills, Peloton rows, and the Peloton apps on the web, iOS, and Android. These Peloton devices talk to microservices, and then these microservices do CRUD operations on RDS databases. We also use DynamoDB tables for our NoSQL needs.
Speaker 2 00:01:40
I'll talk about that in a bit. But yeah, these microservices do CRUD operations on RDS, and most of our databases do have read replicas, depending on the load and the need. These read replicas are used for our recommendation systems or the analytical use cases. We use Redshift primarily for our analytical workloads. On these read replicas, we do have daily snapshots and hourly incremental snapshots. We use the hourly incrementals to send data to Redshift, so that data engineers or data analysts can do analytical workloads and then push them to Looker for operational dashboards. We also have the daily snapshots from these databases; these go into S3, and they are used by the recommendation systems for training ML models and pushing recommendations to the devices. So this is the existing architecture.
Speaker 2 00:02:40
As you can see, there might be some challenges or some flaws in this. The main, or the first obvious one, is the dependency on the daily snapshots. Both the reports and dashboards and the recommendation systems are constrained by the daily snapshot. If for any reason the daily snapshot is not done, the recommendation or training pipelines and the dashboards are stale or cannot move forward. It could be any reason: there is load on the database, or maintenance is happening on the database, so the snapshots are delayed. This tight coupling between the online and analytical systems is not ideal. Due to maintenance on a database, I shouldn't have a delay on my dashboards or training pipelines. These should be decoupled, and there should be a better way than this. There are also higher costs and load on the database read replicas.
Speaker 2 00:03:33
Because doing a data dump on an RDS instance, like a Postgres or anything, puts load on your primary database, which is actually backing your production service. So any ad hoc queries, or even a data dump for snapshots, do increase the load on the database, and that's not an ideal situation because it can actually affect your customers. And the last one: during a data migration, like when you're moving to a new database or trying to split off into a new microservice, there is no easy way to do this without the client handling the dual writes. So there should be a better way to assist the services or the product engineers to do it quicker, so they don't have to worry about how they are gonna migrate from one database to another database or split off into a new service.
Speaker 2 00:04:25
So the solution, or the major part of the solution: open table formats. A quick refresher: open table formats are an open-source standard for storing data in blob storage like S3 or Google Cloud Storage, on top of existing file formats like Parquet or Avro or ORC. On the right, if you see, Apache Hudi, Iceberg, and Delta Lake are the top three formats used across the industry. All three share similar fundamentals at the base level. They store data in the same file formats that we know; most of them use Parquet and Avro. Along with the data, they also store metadata, which contains information about the data it contains: what operations were done, what commits were done, what the commit time was, the transaction level, and schema information or partition information, how you are distributing a key,
Speaker 2 00:05:21
and where the data for a specific partition exists in your S3 buckets or Google Cloud Storage. So why should you choose open table formats and not just go with plain Parquet or CSV files? The first thing is ACID transactions. If you're storing data on blob storage, just putting it there as plain Parquet, you don't get any ACID transaction guarantees while you're writing data to S3 buckets. If something happens and your job fails, you just have to rewrite, and you'll have to create your own process or manually go and delete the half-written data or some corrupted Parquet file or CSV file. With open table formats like Hudi or Iceberg or Delta, you get those guarantees. We primarily use Apache Hudi in production. We have been using it for about two, three years now, and we have had no issues. Yeah, we love Hudi for that.
Speaker 2 00:06:14
So Hudi, or any other open table format, gives you ACID transactions and the CRUD operations. If you're storing data in plain Parquet and you want to go and update a single row or delete a single row, there's no easy way to do it. You have to read the entire Parquet file, then update or delete that specific row, and rewrite the entire file. But open table formats intelligently let you write that update or delete into a separate file. And then there are additional table services, mainly on Apache Hudi; they give you table services like compaction and cleaning. So what happens is you write your updates or deletes, or even creates, into a new file, and then an async service will pick up these files and merge them into the existing files, everything happening in the background.
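For readers who want to see what this looks like in practice, here is a minimal PySpark sketch of a Hudi upsert. The table name, record key, S3 path, and sample rows are illustrative (not Peloton's actual job), and it assumes the Hudi Spark bundle is on the classpath; the hoodie.* options shown are standard Hudi write configs.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("hudi-upsert-sketch")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Pretend this dataframe holds a batch of changed rows (e.g. rows updated upstream)
updates = spark.createDataFrame(
    [(1, "yoga", "2024-05-01 08:00:00"), (2, "strength", "2024-05-01 09:30:00")],
    ["workout_id", "workout_type", "updated_at"],
)

hudi_options = {
    "hoodie.table.name": "workouts",                           # hypothetical table name
    "hoodie.datasource.write.recordkey.field": "workout_id",   # which column identifies a row
    "hoodie.datasource.write.precombine.field": "updated_at",  # latest record wins on conflict
    "hoodie.datasource.write.operation": "upsert",             # update-or-insert semantics
    # Merge-on-Read keeps updates in small delta files; compaction merges them later
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
}

(
    updates.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save("s3://my-example-bucket/lake/workouts/")  # hypothetical table path
)
```

With a Merge-on-Read table like this, the updates land as small delta files and the compaction service described above folds them into the base files in the background.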
Speaker 2 00:07:00
So, data versioning. If you're using Parquet files and you have to do data versioning, you just have to rewrite the entire file and then keep a map of which file corresponds to which version and everything. But with open table formats, as with the transactions, there is a way to do a commit every time you write data to a data lake. This commit will include information on what changes were made as part of this commit, where the data actually lands, in which files, and how to reference those files and everything. With this data versioning, this opens up time traveling. Every commit represents a specific point in time on the timeline, and anybody querying the data can query the table as of that specific timestamp, and Hudi or the other formats
Speaker 2 00:07:52
will look into the metadata, try to find the files corresponding to that specific timestamp or that specific commit, and then return the data. So time travel is huge for us. In our case, if you want to do recommendations for workouts, say after your morning workout you want to recommend something for the evening, querying the commit specific to that timestamp helps us do a time travel and get the data. Next, partition management. If you're using plain Parquet or CSV, the clients need to handle the partitioning. They have to manage how to store the data: are they partitioning by time, or are they partitioning by something specific? Everything needs to be written in the client code. But with these table formats, you can just specify the partition key or distribution key,
Speaker 2 00:08:38
and then the metadata keeps track of how to partition this data, which files contain which specific key partitions, and everything. This partition management definitely helps with query planning and optimization. When you're trying to query a specific key or a specific WHERE condition, it can easily skip over a bunch of files instead of going through every single file and getting the data out of it. So your read queries are much quicker, and the file scanning reduces a lot, which matters because if you're querying on Athena, Athena charges by the scan, the number of files that you're actually touching to get your data out. Next, schema enforcement and evolution. Again, if you're using plain Parquet or CSV, it doesn't recognize what data you are appending to an existing file.
Speaker 2 00:09:32
With open table formats, if you're trying to write a string to an int column, it just rejects it and says the schema doesn't match and you have to change the schema. Or there is a way to merge schemas: whenever a new kind of data is coming in and you actually want to change your schema, you can do a merge schema and then your schema evolves based on your needs. And then concurrency control. I said there are async services that are used for cleaning or compacting your data, like compacting your small files or removing your older versions. These run async, or you can even run them inline while you're writing the data. But if you're running async, there might be a case where two writers are touching a specific file at the exact same point in time. That leads to all kinds of problems: during a read query, a cleaning might be running, and then your read query just exits or aborts or gives wrong data.
Speaker 2 00:10:29
So what happens is Hudi, or any of these table formats, internally handles the concurrency, and they know which one to prioritize first, so that they don't return stale data or try to access a file that doesn't exist. Hudi supports this via optimistic concurrency control. It understands which query to prioritize first so that it gives the right data. So, do you still wanna use plain Parquet or CSV files? I think you should rethink your answer if you are saying, yes, I wanna use plain files. So, the solution, the new architecture: we started using Apache Hudi and CDC. CDC is change data capture. Here, if you see, our devices still talk to microservices, and then those talk to our RDS instances or DynamoDB tables. We still take snapshots and push them into S3 buckets,
Speaker 2 00:11:20
mainly for backups or rollbacks or recovery or anything. But we started using CDC on these DynamoDB tables and RDS instances. So every change that's made on these tables gets captured and then pushed into MSK. MSK is Amazon's managed Kafka; we use MSK for our Kafka needs. Once the data is in MSK, we have Spark jobs running both in a streaming fashion and also at intervals of 10 to 15 minutes. These basically pick the changes from the Kafka topics, transform them, and put them into S3 in Hudi table format. Once they are in Hudi table format, we register them with the Glue catalog, and then they're open for querying on Athena. The tables are gated by Lake Formation permissions, so not everybody has access to all the tables; it's gated by Lake Formation.
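The Kafka-to-Hudi leg described here can be sketched roughly as follows with Spark Structured Streaming. Broker addresses, topic name, event schema, and S3 paths are placeholders, and it assumes the Kafka and Hudi Spark packages are available; this is an illustration of the pattern, not the team's actual job.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

spark = SparkSession.builder.appName("cdc-to-hudi-sketch").getOrCreate()

# Assumed shape of a flattened CDC event; the real payload depends on the CDC tool
cdc_schema = StructType([
    StructField("user_id", StringType()),
    StructField("workout_type", StringType()),
    StructField("updated_at", TimestampType()),
])

raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "my-msk-broker:9092")  # placeholder MSK brokers
    .option("subscribe", "rds.public.workouts")               # placeholder CDC topic
    .option("startingOffsets", "latest")
    .load()
)

# Kafka values arrive as bytes; parse them into typed columns
changes = raw.select(
    from_json(col("value").cast("string"), cdc_schema).alias("c")
).select("c.*")

hudi_options = {
    "hoodie.table.name": "workouts",
    "hoodie.datasource.write.recordkey.field": "user_id",
    "hoodie.datasource.write.precombine.field": "updated_at",
    "hoodie.datasource.write.operation": "upsert",
}

(
    changes.writeStream.format("hudi")
    .options(**hudi_options)
    .option("checkpointLocation", "s3://my-example-bucket/checkpoints/workouts/")
    .outputMode("append")
    .start("s3://my-example-bucket/lake/workouts/")
    .awaitTermination()
)
```

The same write could equally be run as a batch job on a 10-to-15-minute schedule, which matches the dual batch-and-streaming setup described in the talk.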
Speaker 2 00:12:15
Once the data is on S3 in Hudi format, you can actually register it with any catalog that supports a Hive metastore, and then you can query it with Presto engines. So Hudi, or these table formats, are queryable anywhere Parquet is supported. I think most query engines do support Hudi or Iceberg or Delta Lake these days. These Spark jobs also push some information to DataHub. DataHub is our discovery engine. We push information on what the schema is, what the latest update was, how the schema evolved, and how many rows exist. We use this for discovery and lineage, to understand where this table is coming from, like which topic it corresponds to, and which source table in RDS it corresponds to, and everything.
Speaker 2 00:13:08
And again, once the data is in Hudi in S3, we have more Spark jobs that read the incremental data, like what data has been written in the last 10 minutes, and then we push it to Redshift, so that our Looker dashboards, our operational dashboards, have near real-time information coming from a change that just happened on RDS or Dynamo tables. Similarly, recommendation systems now have access to near real-time data or changes, and they can run their pipelines at any point in time during the day and push recommendations to devices as needed. So, the impact: definitely a shorter time to dashboards; they are much more recent and up to date. Near real-time recommendation systems: they can query data with near to no latency. Like I said, we have both batch and streaming pipelines,
Speaker 2 00:14:05
so based on the need, they can consume the data as they want. Again, near real-time access to data via query engines; we use Athena primarily, but you can use any Presto-based query engine. And open table formats enable efficient storage and querying: partition management for faster querying, and for storage, you don't have to rewrite for every single update or delete. You just write the updates and deletes to a new file, and then async services, or async compaction services, take care of combining the files. So there are no rewrites for every single change; they are batched together, and the rewriting is minimized. The costs and the load on the primary database are reduced, so no analytical workloads ever have to hit the primary database at any time. Everything just talks to the data in the blob store.
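The incremental reads mentioned a moment ago (pulling only what changed in the last few minutes and pushing it on to Redshift) look roughly like this in Spark. The begin-instant value and paths are illustrative; the hoodie.datasource.* options are standard Hudi read configs.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hudi-incremental-sketch").getOrCreate()

# Pull only the commits made after the instant we last processed
# (instant format is yyyyMMddHHmmss; the value here is illustrative).
incremental = (
    spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "20240501080000")
    .load("s3://my-example-bucket/lake/workouts/")  # placeholder table path
)

# From here the delta could be staged and loaded into Redshift,
# e.g. via a COPY from S3 or a Spark-Redshift connector.
incremental.show()
```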
Speaker 2 00:14:59
And the next one is time travel queries for training ML models and analytics. Like I said, ML models can now train on data during midday. Basically, in our case, we can recommend a workout based on what workout you did in the morning; we can give a recommendation for the evening. Let's say you did yoga in the morning, we can say, hey, do you wanna try doing a strength workout in the evening? And so on. So this helped us reduce latency and get faster data access for both operational dashboards and recommendation systems. Thanks, everybody, for tuning into my talk today. You can reach out to me on LinkedIn or X, and here are my socials.
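As a reference for the time travel queries mentioned above, here is a hedged example of reading a Hudi table as of a specific instant. The timestamp, path, and column names are placeholders.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hudi-time-travel-sketch").getOrCreate()

# Read the table as it existed at a given point in time,
# e.g. "the state of the data as of this morning's workouts".
as_of_morning = (
    spark.read.format("hudi")
    .option("as.of.instant", "2024-05-01 10:00:00")  # illustrative instant
    .load("s3://my-example-bucket/lake/workouts/")   # placeholder table path
)

as_of_morning.createOrReplaceTempView("workouts_as_of_morning")
spark.sql(
    "SELECT workout_type, COUNT(*) AS n FROM workouts_as_of_morning GROUP BY workout_type"
).show()
```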
Speaker 1 00:15:44
Folks, awesome. Thank you so much. We do have at least a couple of questions, so I'm gonna be passing them to you rapid fire. First, how do you push metadata to DataHub?
Speaker 2 00:15:58
So, DataHub: we host DataHub on our Kubernetes clusters. We have APIs open on it, so we push metadata. The metadata includes where the data exists, what topic it ties to, what the source table is, and what kind of fields or privacy fields it has that need to be abstracted out or scrubbed out, and everything.
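A rough sketch of what such a metadata push can look like with the acryl-datahub Python SDK's REST emitter is below. The GMS URL, platform, dataset name, and properties are placeholders, not Peloton's actual setup, and exact call shapes may vary by SDK version.

```python
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DatasetPropertiesClass

# Self-hosted DataHub GMS endpoint (placeholder URL)
emitter = DatahubRestEmitter(gms_server="http://datahub-gms.internal:8080")

# Example dataset properties: a description plus lineage-style hints as custom properties
properties = DatasetPropertiesClass(
    description="Hudi table built from a CDC topic (example)",
    customProperties={
        "source_topic": "rds.public.workouts",  # hypothetical Kafka topic
        "source_table": "workouts",             # hypothetical RDS table
        "last_commit": "20240501080000",        # hypothetical Hudi commit instant
    },
)

# Emit the aspect for a (hypothetical) Hudi dataset registered in DataHub
emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn=make_dataset_urn(platform="hudi", name="lake.workouts", env="PROD"),
        aspect=properties,
    )
)
```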
Speaker 1 00:16:22
We got another one here from Jesus: what about costs? First you showed us your initial architecture, and then after 20 minutes we looked at the next one <laugh>. This next one obviously looks much more robust and rigorous, but potentially expensive. Is there something you can say about the cost increment?
Speaker 2 00:16:41
So our major costs in this case are the storage on S3 and the EMR costs. Everything else is open source: Hudi is open source, and Spark is open source. So it's mainly the compute cost and the storage cost. Storage cost is something you cannot avoid, but in our case, with EMR, we decided to accept the compute cost because we wanted faster data access. It's again a trade-off: do you want cheaper but stale data, or do you spend a bit more money and get faster data access?
Speaker 1 00:17:19
We've got a couple of other ones. For ML models, is there a feature store separate from this architecture <crosstalk>?
Speaker 2 00:17:26
Yes. The ML teams have a separate feature store, so they consume the data and then build feature stores on top of that for their A/B testing and everything.
Speaker 1 00:17:36
Just curious, is that something that you guys custom built, or are you using something?
Speaker 2 00:17:41
No, they use Feast. Feast is another open-source feature store. They use Feast and self-host it on our systems.
Speaker 1 00:17:50
Moham here is someone who has to learn, okay. Where might you recommend somebody pick up some of these skills? Because a lot of people might not have the kind of scale to start using a lakehouse; they might not be reading off of a read replica. So if you're not yet at that scale but you wanna pick up the skills, any ideas for how to do it?
Speaker 2 00:18:20
Yeah. So you don't need a read replica itself. You can just spin up a Postgres server in a Docker container, get a CSV or anything with like a hundred thousand rows, and do an insert. You can of course find dummy data anywhere online; I think even the MLOps Community has a course around data engineering. So you can just push it into your local Postgres and then do a snapshot. You can start off with ClickOps on AWS; you don't even have to worry about the CLI or anything. Just get the snapshot into a Parquet file, or even a CSV file, click upload, and push it to S3. That's it, you have your data in S3 now <laugh>. And once it's in S3, you can try and build out a simple Hudi table that just points to this single file on S3. With Hudi you can just read the data and write it back into S3 in Hudi table format. And that's it, it's open to querying on Athena.
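A small local-first sketch of that learning path (CSV in, Hudi table out) might look like this. The file name, columns, and output path are made up; swap the local path for an s3:// bucket when you're ready, and note that the Hudi Spark bundle needs to be available to Spark.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("first-hudi-table")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Any dummy CSV with an id-like column and a timestamp-like column will do
df = spark.read.option("header", "true").csv("workouts_dump.csv")

(
    df.write.format("hudi")
    .option("hoodie.table.name", "workouts_demo")
    .option("hoodie.datasource.write.recordkey.field", "id")           # assumes an 'id' column
    .option("hoodie.datasource.write.precombine.field", "updated_at")  # assumes an 'updated_at' column
    .mode("overwrite")
    .save("file:///tmp/hudi/workouts_demo")  # swap for an s3:// path later
)

# Read it back to confirm the table is queryable
spark.read.format("hudi").load("file:///tmp/hudi/workouts_demo").show()
```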
Speaker 1 00:19:20
Amaresh, thank you very much for coming and sharing with us.