Building a Data Lake for the Enterprise
In this talk, I will go over some of the implementation details of how we built a Data Lake for Clari by using a federated query engine built on Trino & Airflow while using Iceberg as the data storage format.
Clari is an Enterprise Revenue Orchestration Platform that helps customers run their revenue cadences and helps sales teams close deals more efficiently. As an enterprise company, we have strict legal requirements to follow data governance policies.
I will cover how to design a scalable architecture for building data ingestion pipelines that bring together data from various sources for AI and ML.
I will also cover some of the use cases this data lake has unlocked within the company, particularly around building agentic frameworks.
Transcript
AI-generated, accuracy is not 100% guaranteed.
Demetrios - 00:00:06
Folks, when I wake up and I'm not feeling all right, I grab myself some of these pills and get working. Well, my bad, wrong virtual conference. I'm gonna bring up Pushkar right now. <laugh>, how you doing, dude? What's happening?
Pushkar Garg - 00:00:21
Hey, Demetrios. I'm doing great. How are you?
Demetrios - 00:00:24
I'm great. I am excited for your talk. As always. I know that we've had quite a few of your talks in the ML ops community and on the virtual conferences, and they never disappoint, so I'm just gonna get outta the way and I'll see you back in a few minutes.
Pushkar Garg - 00:00:41
Hey everyone. My name is Pushkar Garg. I work as a staff machine learning engineer at Clari. Today I'm here to present on the topic of how we built a data lake for the enterprise use case. So yeah, let's get started.
A little bit about Clari. Clari started as a revenue forecasting company and has now evolved into a revenue orchestration platform. We use AI to generate insights for the deals in the pipeline that any of our customers may have.
The business need for building a data lake was that we needed a centralized, scalable data repository for our AI and ML workloads, and we also wanted to integrate the data sources from the acquisitions we've made over time. Bringing those sources together helps make our AI even more accurate, because we gain insights from the additional data we acquire over time.
The primary data source we ingest from is Rev DB. Rev DB is our state-of-the-art revenue database, populated from the upstream CRM systems, for example Salesforce, which contain the data about deals and other information about customers and sales. We ingest that into Rev DB.
The other data source is MongoDB. Every deal has some kind of conversation happening around it, whether over Zoom or via emails exchanged, and we needed to incorporate those additional data sources into our data lake as well.
The key requirements for building the data lake were reliable data replication with good data quality checks and support for schema evolution. The upstream CRM systems work off a config, and that config gets updated over time, which shows up as schema changes in Rev DB. We needed to be flexible enough to support those upstream config changes so that our AI workloads keep working without failures.
Another requirement was efficient querying. The data persisted in Rev DB may not be well partitioned for the AI use cases, which mostly query by timestamp, so we wanted to be able to recreate those partitions to make querying the data more efficient.
Being an enterprise company, obviously we want to have proper data governance and lifecycle management of the data available in the data lake.
Going through the high-level architecture of what we built, we deployed our own instance of Trino on Kubernetes. We built connectors to S3, to Rev DB, which runs on Postgres, and to MongoDB. We use Airflow for orchestrating the ingestion pipelines and for creating them dynamically.
Our use case is that for every customer of Clari, we have a different pipeline for ingestion. We use the managed instance of Airflow for orchestrating and managing those pipelines. We use Trino to run queries which give us the data, and then we persist that in the data lake in the form of Iceberg tables.
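To make the federated part concrete, here is a minimal sketch of a cross-catalog query issued through the Trino Python client; the host, catalog, schema, and table names are all hypothetical, not Clari's actual setup.

```python
# Minimal sketch: a federated query through Trino that joins Rev DB (Postgres)
# data with conversation data in MongoDB. All names here are hypothetical.
import trino

conn = trino.dbapi.connect(
    host="trino.internal.example.com",  # self-hosted Trino on Kubernetes
    port=8080,
    user="ingestion-pipeline",
)
cur = conn.cursor()

cur.execute("""
    SELECT o.opportunity_id, o.amount, c.transcript
    FROM revdb.public.opportunity AS o
    JOIN mongodb.conversations.calls AS c
      ON o.opportunity_id = c.opportunity_id
    WHERE o.org_id = 401
""")
rows = cur.fetchall()
```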
Downstream consumption from the data lake happens in several different ways. First, we use SageMaker to run our training jobs. We use AnyScale to deploy our models, and that is how inference gets its data from the data lake.
Most recently, we've deployed some AI agents, like a deal inspection agent, which also get data from the data lake for the bootstrap use case. When an agent goes live for a particular organization, it needs all of the historical data for all the deals that may be in the pipeline, and that is powered by the data lake as well.
In terms of more details about the architecture, we use S3 for the storage layer. We use Apache Iceberg as a table format to support schema evolution and efficient querying.
We use AWS Glue as the metadata store. We create one database per org, which helps us with lifecycle management of the data and keeps the data lake multi-tenant while still isolating data across different orgs.
We use Trino as the query engine. Trino has catalogs built for S3, for Rev DB, and for MongoDB, as well as a catalog built for the data lake itself. Any queries running on top of the data lake are supported by both Trino and Athena.
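A minimal sketch of what the per-org isolation and timestamp partitioning could look like through Trino's Iceberg connector; the catalog, bucket, and table names are hypothetical, and `cur` is assumed to be a Trino cursor like the one in the earlier sketch.

```python
# Minimal sketch: one Glue database per org, with the org's Iceberg tables
# partitioned by timestamp for the AI query patterns. Names are hypothetical.
def create_org_tables(cur, org_id: int):
    # One Glue database per org keeps the lake multi-tenant but isolated.
    cur.execute(f"""
        CREATE SCHEMA IF NOT EXISTS datalake.org_{org_id}
        WITH (location = 's3://clari-data-lake/org_{org_id}/')
    """)
    # Repartition on ingest so timestamp-based queries can prune files.
    cur.execute(f"""
        CREATE TABLE IF NOT EXISTS datalake.org_{org_id}.opportunity
        WITH (partitioning = ARRAY['day(updated_at)'])
        AS SELECT * FROM revdb.public.opportunity WHERE org_id = {org_id}
    """)
```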
In terms of orchestration, we use Airflow for pipeline management. We have something really interesting, which is the dynamic DAG creation. Our use case is that we need separate pipelines for each organization or each customer of Clari.
We use the same code and a config to add any new organization that becomes a customer of Clari over time. Because we need to run models and generate insights for each organization, the ingestion pipelines are created dynamically, so we don't need to create them manually.
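A minimal sketch of that dynamic DAG pattern in Airflow, where one piece of code plus a config yields one ingestion DAG per org; `load_org_configs`, `run_ingestion`, the config path, and the schedule are all hypothetical placeholders.

```python
# Minimal sketch: generate one ingestion DAG per org from a shared config.
# load_org_configs(), run_ingestion(), and the config path are placeholders.
import json
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def load_org_configs():
    # In practice this could come from a config file or an internal API.
    with open("/opt/airflow/config/orgs.json") as f:
        return json.load(f)  # e.g. [{"org_id": 401, "tables": ["opportunity"]}]


def run_ingestion(org_id, tables):
    ...  # issue the Trino queries that land this org's tables as Iceberg


for org in load_org_configs():
    dag_id = f"ingest_org_{org['org_id']}"
    with DAG(
        dag_id=dag_id,
        start_date=datetime(2024, 1, 1),
        schedule="@daily",  # Airflow 2.4+ parameter name
        catchup=False,
    ) as dag:
        PythonOperator(
            task_id="ingest",
            python_callable=run_ingestion,
            op_kwargs={"org_id": org["org_id"], "tables": org["tables"]},
        )
    # Register each generated DAG so the Airflow scheduler picks it up.
    globals()[dag_id] = dag
```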
We have separate pipelines for ingestion and validation that help us keep track of data quality checks.
In terms of ingestion modes, we have full refresh, which recreates the tables whenever there is a schema change, and incremental updates, where, if there is no change in the schema, we append the new data since the last ingestion timestamp.
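A minimal sketch of how those two modes might translate into Trino SQL, assuming a hypothetical watermark column `updated_at` and the table names used in the earlier sketches:

```python
# Minimal sketch: choose between full refresh and incremental append.
# Table names and the watermark mechanism are hypothetical.
def build_ingestion_sql(schema_changed: bool, last_ingested_at: str) -> list[str]:
    if schema_changed:
        # Full refresh: rebuild the Iceberg table with the new schema.
        return [
            "DROP TABLE IF EXISTS datalake.org_401.opportunity",
            """CREATE TABLE datalake.org_401.opportunity
               WITH (partitioning = ARRAY['day(updated_at)'])
               AS SELECT * FROM revdb.public.opportunity WHERE org_id = 401""",
        ]
    # Incremental: append only rows newer than the last ingestion timestamp.
    return [
        f"""INSERT INTO datalake.org_401.opportunity
            SELECT * FROM revdb.public.opportunity
            WHERE org_id = 401
              AND updated_at > TIMESTAMP '{last_ingested_at}'"""
    ]
```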
One thing we identified was that the ML platform team was becoming the bottleneck for building these ingestion pipelines. So we decided to create self-serve APIs that our customers, the internal data scientists and machine learning engineers, use to update the config file that drives the creation of the ingestion pipelines.
In this example, a user can call this particular API to add an opportunity table to org 401's ingestion pipeline, and they can do the same for other orgs as well.
I won't go into the details of how that happens, but there are several DAGs and databases involved, which ultimately result in the creation, update, and adaptation of ingestion pipelines.
It's all democratized, and that has helped with adoption.
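The talk doesn't show the implementation, but a minimal sketch of such a self-serve endpoint could look like the following; FastAPI, the payload shape, and the in-memory config store are all assumptions made for illustration.

```python
# Minimal sketch: a self-serve endpoint that adds a table to an org's
# ingestion config, which the dynamic DAG factory later picks up.
# FastAPI, the payload shape, and the in-memory store are assumptions.
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

# Stand-in for whatever config store actually drives the pipelines.
ORG_CONFIGS = {401: {"tables": [{"name": "account", "mode": "incremental"}]}}


class AddTableRequest(BaseModel):
    org_id: int
    table: str                 # e.g. "opportunity"
    mode: str = "incremental"  # or "full_refresh"


@app.post("/ingestion/tables")
def add_table(req: AddTableRequest):
    # Update the org's config; the DAG factory adapts its pipeline on the
    # next parse, so no manual pipeline work is needed from the platform team.
    config = ORG_CONFIGS.setdefault(req.org_id, {"tables": []})
    config["tables"].append({"name": req.table, "mode": req.mode})
    return {"status": "accepted", "org_id": req.org_id, "table": req.table}
```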
Finally, in terms of data quality and governance, we have a validation pipeline that runs data quality checks: it validates the row counts between the source and the target, detects duplicates within the target tables, and stores the results in a dedicated management table for tracking.
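A minimal sketch of those checks as they might be expressed through a Trino cursor; the key column, the management table, and the schema names are hypothetical.

```python
# Minimal sketch: row-count and duplicate checks between source and target,
# with results written to a management table. All names are hypothetical.
def validate_table(cur, org_id: int = 401):
    cur.execute(
        f"SELECT count(*) FROM revdb.public.opportunity WHERE org_id = {org_id}"
    )
    source_count = cur.fetchone()[0]

    cur.execute(f"SELECT count(*) FROM datalake.org_{org_id}.opportunity")
    target_count = cur.fetchone()[0]

    # Duplicate detection on the primary key in the target Iceberg table.
    cur.execute(f"""
        SELECT count(*) FROM (
            SELECT opportunity_id
            FROM datalake.org_{org_id}.opportunity
            GROUP BY opportunity_id
            HAVING count(*) > 1
        ) AS dup
    """)
    duplicate_keys = cur.fetchone()[0]

    # Persist the results in a dedicated management table for tracking.
    cur.execute(f"""
        INSERT INTO datalake.management.validation_results
        VALUES ({org_id}, 'opportunity', {source_count}, {target_count},
                {duplicate_keys}, current_timestamp)
    """)
```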
Like any organization, we need to be compliant when it comes to data. We have a high-level churn service that handles deletion of data if any org or customer of Clari churns.
We have integrated with that service: it sends a notification in case of churn, and whenever we receive one, we automatically delete the physical data files, the metadata from Glue, and all of the ingestion artifacts for that particular org or customer of Clari.
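A minimal sketch of what a churn handler could do on notification, using boto3 for the S3 data and the Glue metadata; the bucket name, the database naming scheme, and the config helper are hypothetical.

```python
# Minimal sketch: on a churn notification, remove the org's physical data,
# its Glue metadata, and its ingestion artifacts. Names are hypothetical.
import boto3


def handle_churn(org_id: int):
    s3 = boto3.resource("s3")
    glue = boto3.client("glue")

    # 1. Delete the physical Iceberg data and metadata files from S3.
    s3.Bucket("clari-data-lake").objects.filter(Prefix=f"org_{org_id}/").delete()

    # 2. Drop the org's Glue database (one database per org in this design).
    glue.delete_database(Name=f"org_{org_id}")

    # 3. Remove ingestion artifacts, e.g. the org's entry in the pipeline
    #    config, so the dynamic DAG factory stops generating its DAGs.
    remove_org_from_config(org_id)


def remove_org_from_config(org_id: int):
    ...  # drop the org from whatever config store drives DAG creation
```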
In terms of table maintenance, we clean up stale table versions and only keep the two latest table versions. We have retention policies for historical data, so anything older than six months gets deleted.
We run optimize commands from time to time to merge small files into larger files so that queries remain more efficient.
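A minimal sketch of those maintenance steps as Trino Iceberg statements; the table name and thresholds are hypothetical, and Trino's `expire_snapshots` is time-based, so keeping exactly the two latest versions is approximated here by a retention window.

```python
# Minimal sketch: periodic Iceberg table maintenance issued through Trino.
# The table name, thresholds, and retention window are hypothetical.
MAINTENANCE_SQL = [
    # Compact small files into larger ones so queries stay efficient.
    "ALTER TABLE datalake.org_401.opportunity EXECUTE optimize",
    # Expire old snapshots (stale table versions); Trino's expire_snapshots
    # is time-based, so a retention window stands in for "keep the latest two".
    """ALTER TABLE datalake.org_401.opportunity
       EXECUTE expire_snapshots(retention_threshold => '7d')""",
    # Enforce the six-month retention policy on historical rows.
    """DELETE FROM datalake.org_401.opportunity
       WHERE updated_at < current_date - INTERVAL '6' MONTH""",
]


def run_maintenance(cur):
    for statement in MAINTENANCE_SQL:
        cur.execute(statement)
```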
Yeah, that's it. That's the lightning talk. <laugh> Thank you.
Demetrios - 00:10:25
Boom. That was lightning fast. If you can promise to answer this one in the next two minutes, I'm gonna throw it at you. All right. So what technology are you using for the Data Lake Warehouse itself, or did I miss that in your slides?
Pushkar Garg - 00:10:40
Yeah. We are using Iceberg, and we are using Trino for the querying. We are using Iceberg for the data storage, and we are using Airflow, like I mentioned, for the orchestration of the pipelines.