March 1, 2022

Hudi Sink Connector for Kafka

Hudi provides a set of data-plane components to build and operate optimized, self-managed data lakes. More importantly, Hudi provides the primitives to power an end-to-end streaming architecture by optimizing for fast upserts and change streams, which enables incremental processing with latencies of a few minutes.

A key part of the incremental data processing stack is the ability to ingest data from real-time streaming sources such as Kafka. Today, this can be done with Deltastreamer, which runs within the Spark engine to pull records from Kafka and ingest them into Hudi tables. To give users another option, as of Hudi v0.10.0 we are excited to announce the availability of a Hudi Sink Connector for Kafka. It gives current users of Kafka Connect (with S3, HDFS sinks, etc.) greater flexibility to readily ingest their Kafka data into a Hudi data lake, leveraging the power of Hudi's platform.


Kafka Connect is a free, open-source component of Apache Kafka. It standardizes the integration of Kafka with data systems, providing both source connectors that write data from external systems to Kafka and sink connectors that write data from Kafka into external systems. The Connect platform can scale out by adding more workers across a cluster and rebalancing the workload across tasks, each of which is responsible for processing one or more Kafka partitions. It also provides APIs for easy management and abstracts away low-level functionality such as offset management.


The Kafka Connect Sink for Hudi has the following key properties.  

  • It guarantees exactly-once delivery and no missing records, so no de-dup is required.  
  • It supports running multiple data and file optimization services concurrently while the incoming data from Kafka is written. This keeps the target Hudi table's query performance optimal despite streaming/incremental ingestion.
  • It supports multiple tasks: this allows the connector to scale horizontally with data load.
  • As of 0.10.0, only append-only, immutable data is supported. We plan to support mutable data, with updates and deletes, in future releases.

Designing the Hudi Kafka Connector

Before proposing the design, we need to understand the design requirements. Kafka sink connectors run in a distributed environment with multiple tasks processing data in parallel across Kafka partitions. We need a concurrency model that lets each task write to the Hudi table concurrently. While Hudi supports multiple writers using optimistic concurrency control (OCC), this requires provisioning a distributed locking service and potentially limits the effective concurrency and write performance. In this context, since we control every writer in the Sink connector, we can centrally coordinate the write transactions (the start and end of each transaction) and ensure that each task writes records to non-overlapping files, avoiding conflicts without the overhead of locking.

In addition, Hudi has multiple data and file management services and optimizations, such as compaction, clustering, and cleaning, that have to be coordinated with the write transactions. Here, Hudi supports the MVCC model, where such services can run concurrently with the main writer/ingestion. However, the services have to be scheduled and planned by a single writer to avoid conflicts and race conditions. Hence, in addition to coordinating the writers, we need to schedule and plan such services centrally as well.

Building upon these requirements, we designed the Hudi Sink connector as shown in the figure below. We build on the principles of a two-phase commit protocol, with a Coordinator and one or more Participants. The Coordinator always runs on the Kafka Connect task that handles partition 0, which avoids the need to implement leader election. The Coordinator is responsible both for scheduling the transactional writes across the Participants and for scheduling the data and table services.

Each Kafka partition is handled by an instance of the Participant. A dedicated control topic on the same Kafka cluster is used for Coordinator-Participant communication. When the Coordinator starts a new transaction, all Participants start reading records from their assigned Kafka partition and append the records to non-overlapping file groups in the Hudi table. The Kafka partition index is embedded in the file ID so that two Participants never write to the same file, even when they write to the same Hudi partition. Records are written to a Hudi Merge-on-Read (MOR) table. Compared with the Copy-on-Write (COW) type, MOR provides lower write latency and lower write amplification, which suits the streaming use case of Kafka ingestion. After the Coordinator ends the transaction, all Participants stop writing records and send back their write statuses. If successful write statuses are received from all Participants, the Coordinator commits all the written records.
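The transaction flow described above can be sketched in a few lines of Python. This is only an illustrative, in-memory model, not the connector's actual code: the `Participant`, `Coordinator`, and `WriteStatus` classes, the file ID format, and the commit record layout are all hypothetical, and the control topic is replaced by direct method calls.

```python
from dataclasses import dataclass


@dataclass
class WriteStatus:
    kafka_partition: int
    file_ids: set
    last_offset: int
    success: bool


class Participant:
    """Hypothetical participant that owns exactly one Kafka partition."""

    def __init__(self, kafka_partition, fail=False):
        self.kafka_partition = kafka_partition
        self.fail = fail  # hook to simulate a failed write

    def write(self, records):
        """records: list of (offset, hudi_partition, payload) tuples."""
        file_ids, last_offset = set(), -1
        for offset, hudi_partition, _payload in records:
            # The Kafka partition index is part of the (made-up) file ID, so
            # two participants never target the same file in a Hudi partition.
            file_ids.add(f"{hudi_partition}/file-{self.kafka_partition}")
            last_offset = offset
        return WriteStatus(self.kafka_partition, file_ids, last_offset, not self.fail)


class Coordinator:
    """Hypothetical coordinator that starts, ends, and commits transactions."""

    def __init__(self, participants):
        self.participants = participants
        self.commits = []  # stands in for the Hudi timeline

    def run_transaction(self, commit_time, batches):
        # Between "start" and "end", each participant writes its own batch.
        statuses = [p.write(batches.get(p.kafka_partition, []))
                    for p in self.participants]
        # Commit only if every participant reported success.
        if not all(s.success for s in statuses):
            return False
        self.commits.append({
            "commit_time": commit_time,
            "kafka_offsets": {s.kafka_partition: s.last_offset for s in statuses},
        })
        return True


batches = {
    0: [(0, "2022/03/01", "a"), (1, "2022/03/01", "b")],
    1: [(0, "2022/03/01", "c")],
}
healthy = Coordinator([Participant(0), Participant(1)])
committed = healthy.run_transaction("20220301000000", batches)

degraded = Coordinator([Participant(0), Participant(1, fail=True)])
aborted = not degraded.run_transaction("20220301000100", batches)
```

In this model a single failed write status is enough to leave the whole transaction uncommitted, which mirrors the all-or-nothing commit rule in the text.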

In the case of worker or task failures, the Kafka Connect platform reassigns the Kafka partitions across the remaining tasks. To keep the protocol simple when either the Coordinator or a Participant fails, the entire transaction is left uncommitted, and the records written during that transaction are deleted later by the Hudi cleaner service. Moreover, the Kafka offsets for each partition are stored in the Hudi commit file. This allows the system to recover from the offset of the last committed record of each Kafka partition by reading the latest Hudi commit file.
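As a rough illustration of this recovery rule, the sketch below picks the latest commit and derives the offset at which each Kafka partition would resume. The commit record layout (`commit_time`, `kafka_offsets`) and the `resume_offsets` helper are hypothetical, and the sketch assumes the stored value is the offset of the last committed record, so reading resumes one past it. Starting from offset 0 when no commit exists is also just an assumption of this example.

```python
def resume_offsets(commits, kafka_partitions):
    """Return the next offset to read for each Kafka partition."""
    if not commits:
        # No commit yet: this sketch simply starts from the beginning.
        return {p: 0 for p in kafka_partitions}
    # Fixed-width timestamps compare correctly as strings.
    latest = max(commits, key=lambda c: c["commit_time"])
    # Resume one past the last committed record of each partition.
    return {p: latest["kafka_offsets"].get(p, -1) + 1 for p in kafka_partitions}


# Only committed transactions appear here; records from an uncommitted
# transaction leave no entry and are therefore re-read after a failure.
commits = [
    {"commit_time": "20220301000000", "kafka_offsets": {0: 41, 1: 17}},
    {"commit_time": "20220301000100", "kafka_offsets": {0: 99, 1: 58}},
]
next_offsets = resume_offsets(commits, [0, 1])
```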

If you are interested in further details, please refer to the RFC.

Deploying the connector

The following figure shows one instance of an end-to-end deployment of the Hudi Sink Connector. Applications or Kafka source connectors bring external data into Kafka and can optionally register the schema of the data with a schema registry. The Hudi sink connector reads the data, along with the latest schema from the registry, and writes the data from each Kafka topic into a Hudi table. If Hive integration is configured, the Hudi sink continuously syncs the Hudi metadata with the Hive Metastore. The Hudi tables can then be queried via different query engines, such as Presto and Trino.

The detailed steps to deploy the end-to-end system shown above can be found here.  The steps specific to configuring the Hudi sink are listed below:

  1. The Hudi sink connector relies on a dedicated control topic in the Kafka cluster for exchanging messages across the Coordinator and the Participants. If auto-create is enabled in the Kafka cluster, this step can be ignored.
./bin/ --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092

  2. We provide a properties file with default configuration parameters to start a Hudi connector. Multiple workers can be run if required.
./bin/ $HUDI_DIR/hudi-kafka-connect/demo/

  3. Once the Connector has started, it does not run the Sink until the Hudi sink is added through the REST API. The following curl commands delete any existing Hudi sink and add a new one. A default configuration is also provided for the Hudi sink, which can be changed to suit the desired properties.
curl -X DELETE http://localhost:8083/connectors/hudi-sink
curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect/demo/config-sink.json http://localhost:8083/connectors
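For readers who prefer to script this step, the two curl calls can be mirrored with Python's standard library. This is a minimal sketch: the `build_sink_requests` helper is hypothetical, and it only builds the requests against the same `http://localhost:8083/connectors` endpoint and the same JSON file used above, so nothing is sent until you pass a request to `urllib.request.urlopen`.

```python
import json
import urllib.request

# Same Kafka Connect endpoint as in the curl commands.
CONNECT_URL = "http://localhost:8083/connectors"


def build_sink_requests(config_path, name="hudi-sink"):
    """Build the DELETE and POST requests without sending them."""
    with open(config_path, "rb") as f:
        body = f.read()
    json.loads(body)  # fail early if the file is not valid JSON
    delete_req = urllib.request.Request(f"{CONNECT_URL}/{name}", method="DELETE")
    post_req = urllib.request.Request(
        CONNECT_URL,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    return delete_req, post_req


# Usage (requires a running Connect worker, so it is left commented out):
# delete_req, post_req = build_sink_requests("config-sink.json")
# urllib.request.urlopen(post_req)
```

Note that `urlopen` raises `urllib.error.HTTPError` on error responses, for example when the DELETE targets a connector that does not exist yet, so a real script would need to handle that case.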

  4. Table Services: When using Merge-On-Read (MOR) as the table type, async compaction and clustering can be scheduled while the Sink is running. Inline compaction and clustering are disabled by default to speed up ingestion. Async compaction scheduling is enabled by default; you can disable it by setting `hoodie.kafka.compaction.async.enable` to `false`.

Async clustering scheduling is disabled by default, and you can enable it by setting `hoodie.clustering.async.enabled` to `true`.
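One way to apply these two settings is to patch the sink configuration before posting it. The sketch below assumes the properties live in the `config` object of the Kafka Connect JSON payload. The `with_table_service_overrides` helper is hypothetical, and its defaults follow the defaults stated above.

```python
import json


def with_table_service_overrides(sink_json, async_compaction=True,
                                 async_clustering=False):
    """Return the sink JSON with the two scheduling flags set explicitly."""
    sink = json.loads(sink_json)
    config = sink.setdefault("config", {})
    # Property names are taken from the text; values are "true"/"false" strings.
    config["hoodie.kafka.compaction.async.enable"] = str(async_compaction).lower()
    config["hoodie.clustering.async.enabled"] = str(async_clustering).lower()
    return json.dumps(sink, indent=2)


# Example: keep async compaction scheduling on and also schedule clustering.
patched = with_table_service_overrides(
    '{"name": "hudi-sink", "config": {}}', async_clustering=True
)
```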

The Sink only schedules compaction and clustering when necessary; to keep ingestion fast, it does not execute them. You need to execute the scheduled compaction and clustering using separate Spark jobs or the Hudi CLI.

You can run the scheduled async compaction job with `HoodieCompactor` via `spark-submit`:

spark-submit \
  --class org.apache.hudi.utilities.HoodieCompactor \
  hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.10.0-SNAPSHOT.jar \
  --base-path /tmp/hoodie/hudi-test-topic \
  --table-name hudi-test-topic \
  --schema-file /Users/user/repo/hudi/docker/demo/config/schema.avsc \
  --instant-time 20211111111410 \
  --parallelism 2 \
  --spark-memory 1g

Note that `--instant-time` is optional. If you omit it, the earliest scheduled compaction is executed.

You can run the scheduled async clustering job with `HoodieClusteringJob` via `spark-submit`:

spark-submit \
  --class org.apache.hudi.utilities.HoodieClusteringJob \
  hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.10.0-SNAPSHOT.jar \
  --props \
  --mode execute \
  --base-path /tmp/hoodie/hudi-test-topic \
  --table-name sample_table \
  --instant-time 20211111111813 \
  --spark-memory 1g

  5. Hive Integration: If enabled, the Hudi sink connector syncs the Hudi metadata of a table with the Hive server or Hive metastore after each commit. To enable Hive integration, we provide a separate sink configuration that should be used in step 3.
