May 5, 2023

Powering Real-Time Analytics with Confluent Kafka and Onehouse

Powering Real-Time Analytics with Confluent Kafka and Onehouse

Introduction

As businesses increasingly rely on real-time data processing, data streaming has become a critical tool for modern applications. Founded in 2014 by the creators of Apache Kafka, Confluent has since emerged as a leading data streaming platform that simplifies building data streaming applications. The managed version of Apache Kafka provided by Confluent allows organizations to focus on their core business processes instead of infrastructure management.

At Onehouse, we’ve developed a managed data lakehouse on Apache Hudi that is industry-leading in data ingestion, management, and optimization, providing significant performance gains for a variety of streaming use cases. By harnessing the strengths of both Confluent and Onehouse, you can seamlessly unlock near real-time analytics right on top of your data lake and move your data freshness from hours to minutes with just a few clicks.

This blog post will walk through a demo of how to ingest data from Confluent Kafka into a Onehouse managed data lakehouse using Onehouse’s native Confluent integration.

Real-time ecommerce order tracking and analytics

In this example, we are running the data team for a large ecommerce store. We want to track orders in real-time to analyze regional popularity for re-stocking inventory, understand how online ads convert to orders, and build dashboards for tracking order volume.

To do this, we have set up a Confluent Kafka topic to track orders and their shipping addresses. Our goal is to ingest the order events into a Onehouse table to enable querying, dashboard creation, and integration with external systems.

Setting up Kafka with Confluent

To start, we will create a Kafka topic called orders-demo that will log events when users place or update an order on our ecommerce site. Confluent makes it easy to create and manage this Kafka topic.

Confluent connects to hundreds of data sources, making it easy to bring your events into Kafka topics for streaming. These connectors allow you to stream events from a transactional database, SaaS application, IoT device, and more. For the demo, we are using Confluent's Datagen Source connector to generate sample messages for our Kafka topic, representing orders for the ecommerce store.

The Datagen Source connector will now generate messages in our orders-demo Kafka topic.

When working with Kafka, it’s important to ensure that your messages all match an expected schema. Confluent offers a managed Schema Registry that you can use to store the schemas for your topics. We have set up a Schema Registry for the orders-demo topic – this will be useful for letting Onehouse know the schema of the data to ingest.

Now that the Kafka topic is actively receiving messages and the Schema Registry is configured, we can grab our credentials from Confluent and jump into Onehouse.

Connecting Confluent to Onehouse

With messages streaming into our Kafka topic, we can now connect Confluent to Onehouse to ingest our ecommerce order events into the data lakehouse.

We simply add a new Confluent Kafka source in Onehouse, then enter the Kafka servers and authentication keys from Confluent. We also add the credentials for our Confluent Schema Registry that we can later use for schema validation in the Onehouse ingestion pipeline.

For the demo, we will demonstrate two approaches to ingest the orders into Onehouse:

  • Approach 1: Ingest orders directly into a mutable table, allowing updates to order details, such as shipping addresses
  • Approach 2: Use Onehouse as a cost-efficient database to persist Kafka messages beyond the expiration period set in Confluent for analytics like month-over-month comparisons. Ingest raw messages into an append-only table, then incrementally ingest into a mutable table. This creates two tables: one for historical data and one for the latest data.

Approach 1: Ingesting Orders Directly into a Mutable Table

In this approach, we'll ingest the orders data directly into a mutable table within Onehouse. This allows us to update records in the table when an order changes (e.g. when a user updates their shipping address).

We start by creating a Stream Capture, which actively detects new data from a source and ingests the data into a Onehouse table. For this Stream Capture, we select the Confluent Kafka source and select the mutable write mode. This allows us to select a Record Key (similar to a primary key in a relational database) for messages in the Kafka topic. When new messages come in with the same record key, Onehouse updates the record in the table.

Now we select our orders-demo Kafka topic and configure the Stream Capture. First we enable Pipeline Quarantine – which allows us to automatically write invalid records to a separate “quarantine” table rather than failing the pipeline or ingesting bad data into our tables.

We add a Data Quality Validation to ensure incoming messages match the expected schema. Through the Confluent integration, Onehouse pulls in the schema that we’d previously added to our Confluent Schema Registry. Now, records that don’t match the schema will be written to the quarantine table.

We set the Record Key field to orderid so each order will get its own record in the table. When a user updates the shipping address for their order in the upstream application, Kafka will send a message with the same orderid and a new shipping_address. Onehouse will update the existing record with that orderid so it has the new shipping_address.

The Partition Key helps us improve Write performance by grouping updates together within partitions. Orders with similar ordertime are likely updated around the same time, so we use ordertime as the Partition Key to reduce extra re-writing of other partitions.

We also use ordertime as the Precombine Key field. This tells Onehouse to retain the record with the most recent ordertime when multiple records with the same orderid arrive concurrently.

These keys are an important differentiator for Onehouse and Apache Hudi when compared to other data lakehouse technologies. Using these keys helps us dedupe incoming records and effortlessly handle late-arriving data (a common challenge in data pipelines).

Lastly, we sync the table to a catalog so we can explore and query the data in other tools. For this demo, we sync to a Glue Catalog from AWS so we can later query our Onehouse tables in Amazon Athena.

Now we create the Stream Capture, and watch as Kafka messages are ingested as records in the Onehouse table!

Approach 2: Landing Raw Messages with a Multi-Stage Pipeline

In this approach, we'll use a multi-stage pipeline to ingest raw messages into an append-only Onehouse table before processing them into a mutable table. This method allows us to persist historical data in a table, while maintaining a separate table with the latest update to each order.

First, we create a Stream Capture in Append-only mode to land the raw messages from Kafka into a Onehouse table. This will store all the historical messages in the table, so we can analyze past data if customers have updated their orders.

Now that we’ve ingested the raw data into a Onehouse table, we can use that Onehouse table as a source for another Stream Capture. Similar to how we created a Confluent Kafka source, we now create a Onehouse table source.

We create a second Stream Capture using the orders_raw Onehouse table as the source. For this Stream Capture, we choose mutable and set a Record Key, similar to what we did in Approach 1. When new messages are ingested into the orders_raw table, Onehouse will trigger an incremental pipeline to update the downstream table with the new records.

Now we have tables for both the historical data and the latest data, all with one incremental pipeline!

Query the data with Amazon Athena

Now that the data is ingested into Onehouse tables, we can query it from our query engine of choice. Since Apache Hudi is highly compatible across the ecosystem, Onehouse tables can sync with many catalogs and can be queried by countless query engines.

In this demo, we use Amazon Athena as our query engine so once the data reaches Onehouse, it never leaves our AWS account.

In AWS Glue, we see the orders_demo Onehouse tables appear as expected. Since Onehouse stores tables in the Apache Hudi Merge-on-Read format, Glue actually shows us two tables: orders_demo_ro and orders_demo_rt. We can query the _ro (read-optimized) table for the fastest queries or the _rt (real-time) table to guarantee the latest data freshness.

Finally, we want to see which households placed the most orders from our ecommerce store. We crack open Amazon Athena and query the Onehouse table with SQL.

From here, the sky's the limit! Some additional steps we can do with our Onehouse data:

  1. Build additional Onehouse pipelines to aggregate data for a KPI dashboard to present for monthly investor updates
  2. Transform the data right in Onehouse to pull out zip code for efficient aggregation
  3. Use a tool like Hex to query the data and visualize orders for each region on a map


Conclusion

As demonstrated, streaming data from Confluent Kafka into Onehouse is a breeze with our pre-built Confluent integration. With data now in Onehouse, we can:

  • Query the data using engines like Amazon Athena, Trino, Spark, and more
  • Build dashboards with Preset, Redash, Hex, and more
  • Catalog the data with Glue, Datahub, BigQuery, and more

What previously took teams months to DIY, we accomplished within an hour, thanks to managed solutions from Onehouse and Confluent. Without this integration, we'd be on the hook to manage our own clusters for Kafka messaging, data ingestion pipelines, and table maintenance, which would be time-consuming and resource-intensive.

Now is the time to unlock the full potential of your real-time data, so you can make informed decisions and drive growth for your business!

Read More:

Subscribe to the Blog

Be the first to read new posts

Thank you! Your submission has been received!
Oops! Something went wrong while submitting the form.