Uber’s Analytics Pipeline

At Uber, we use Apache Kafka as a message bus for connecting different parts of the ecosystem. We collect system and application logs as well as event data from the rider and driver apps. Then we make this data available to a variety of downstream consumers via Kafka.

Data flows through Kafka pipelines to power many of Uber’s analytical use cases on the right.

Data flows through Kafka pipelines to power many of Uber’s analytical use cases on the right.

Data in Kafka feeds both real-time pipelines and batch pipelines. The former data is for activities like computing business metrics, debugging, alerting, and dashboarding. The batch pipeline data is more exploratory, such as ETL into Apache Hadoop and HP Vertica.

In this article, we describe uReplicator, Uber’s open source solution for replicating Apache Kafka data in a robust and reliable manner. This system extends the original design of Kafka’s MirrorMaker to focus on extremely high reliability, a zero-data-loss guarantee, and ease of operation. Running in production since November 2015, uReplicator is a key piece of Uber’s multi–data center infrastructure.

 

What’s a Mirror Maker, and Why Do We Need One?

Given the large-scale use of Kafka within Uber, we end up using multiple clusters in different data centers. For a variety of use cases, we need to look at the global view of this data. For instance, in order to compute business metrics related to trips, we need to gather information from all data centers and analyze it in one place. To achieve this, we have historically used the open source MirrorMaker tool shipped with the Kafka package to replicate data across data centers, as shown below.

Uber’s data pipeline mirrors data across multiple data centers.

Uber’s data pipeline mirrors data across multiple data centers.

MirrorMaker (as part of Kafka 0.8.2) itself is quite simple. It uses a high-level Kafka consumer to fetch the data from the source cluster, and then it feeds that data into a Kafka producer to dump it into the destination cluster.

Kafka’s MirrorMaker Limitations at Uber

Although our original MirrorMaker setup started out sufficient, we soon ran into scalability issues. As the number of topics and the data rate (bytes/second) grew, we started seeing delayed data delivery or complete loss of data coming into the aggregate cluster, resulting in production issues and reducing data quality. Some of the major issues with the existing MirrorMaker tool (as of 0.8.2) for Uber’s particular use cases are listed below:

  • Expensive rebalancing. As mentioned before, each MirrorMaker worker uses a high-level consumer. These consumers often go through a process of rebalance. They negotiate among themselves to decide who gets to own which topic-partition (done via Apache Zookeeper). This process can take a long time; we’ve observed about 5–10 minutes of inactivity in certain situations. This is a problem, as it violates our end-to-end latency guarantee. In addition, the consumers can give up after 32 rebalancing attempts and get stuck forever. Unfortunately, we saw this happen firsthand a few times. After every rebalance attempt, we saw a similar traffic pattern:
Kafka MirrorMaker yields inactivity issues when consumers try to rebalance.

Kafka MirrorMaker yields inactivity issues when consumers try to rebalance.

After the inactivity during the rebalance, MirrorMaker had a massive backlog of data that it had to catch up with. This resulted in a traffic spike on the destination cluster and, subsequently, all downstream consumers, leading to production outages and increased end-to-end latency.

  • Difficulty adding topics. At Uber, we must specify a whitelist of topics within our mirroring tool to control how much data flows across the WAN link. With Kafka MirrorMaker, this whitelist was completely static, and we needed to restart the MirrorMaker cluster to add new topics. Restart is expensive, since it forces the high-level consumers to rebalance. This became an operational nightmare!
  • Possible data loss. The old MirrorMaker had a problem—it seems to be fixed in the latest release—with automatic offset commit that could have resulted in data loss. The high-level consumer automatically committed the offsets for fetched messages. If a failure were to occur before MirrorMaker could verify that it actually wrote the messages to the destination cluster, then those messages would be lost.
  • Metadata sync issues. We also ran into an operational issue with the way config was updated. To add or delete topics from the whitelist, we listed all the final topic names in a config file, which was read during MirrorMaker initialization. Sometimes the config failed to update on one of the nodes. This brought down the entire cluster, since the various MirrorMaker workers did not agree on the list of topics to replicate.

 

Why We Developed uReplicator

We considered the following alternatives for solving the aforementioned problems:

A. Split into multiple MirrorMaker clusters. Most of the problems listed above resulted from the high-level consumer rebalance process. One way to reduce its impact is to restrict the number of topic-partitions replicated by one MirrorMaker cluster. Thus, we would end up with several MirrorMaker clusters, each replicating a subset of the topics to be aggregated.

Pros:

– Adding new topics is easy. Just create a new cluster.

– MirrorMaker cluster restart happens quickly.

Cons:

– It’s another operational nightmare: we have to deploy and maintain multiple clusters.

B. Use Apache Samza for replication. Since the problem is with the high-level consumer (as of 0.8.2), one solution is using the Kafka SimpleConsumer and adding the missing pieces of leader election and partition assignment. Apache Samza, a stream processing framework, already statically assigns partitions to workers. We can then simply use a Samza job to replicate and aggregate data to the destination.

Pros:

– It’s highly stable and reliable.

– It’s easy to maintain. We can replicate a lot of topics using one job.

– Job restart has minimal impact on replication traffic.

Cons:

– It’s still very static. We need to restart the job to add and/or delete topics.

– We need to restart the job to add more workers (as of Samza 0.9).

– Topic expansion needs to be explicitly handled.

C. Use an Apache Helix-based Kafka consumer. Ultimately, we decided to use a Helix-based Kafka consumer. In this case, we’re using Apache Helix to assign partitions to workers, and each worker uses the SimpleConsumer to replicate data.

Pros:

– Adding and deleting topics is very easy.

– Adding and deleting nodes to the MirrorMaker cluster is very easy.

– We never need to restart the cluster for an operational reason (just for upgrades).

– It’s highly reliable and stable.

Cons:

– This introduces a dependency on Helix. (This is fine, since Helix itself is very stable and we can use one Helix cluster for multiple MirrorMaker clusters.)

 

uReplicator Overview

Kafka MirrorMaker yields inactivity issues when consumers try to rebalance.

uReplicator’s various components work in different ways toward reliability and stability:

1. The Helix uReplicator controller, actually a cluster of nodes, has several responsibilities:

  • Distribute and assign topic partitions to each worker process
  • Handle addition/deletion of topics/partitions
  • Handle addition/deletion of uReplicator workers
  • Detect node failures and redistribute those specific topic-partitions

The controller uses Zookeeper to accomplish all of these tasks. It also exposes a simple REST API in order to add/delete/modify topics to be mirrored.

2. A uReplicator worker, similar to a worker process in Kafka’s mirroring feature, replicates a certain set of topic partitions from source cluster to destination cluster. Instead of a rebalance process, uReplicator controller determines uReplicator’s assignment. Also, instead of using the Kafka high-level consumer, we use a simplified version called DynamicKafkaConsumer.

3. A Helix agent for each uReplicator worker gets notified whenever there’s a change (addition/deletion of topic-partitions). In turn, it notifies the DynamicKafkaConsumer to add/remove topic-partitions.

4. A DynamicKafkaConsumer instance, which is a modification of the high-level consumer, exists on each uReplicator worker. It removes the rebalance part and adds a mechanism to add/delete topic-partitions on the fly.

For instance, let’s say we want to add a new topic to an existing uReplicator cluster. The flow of events is as follows:

  • Kafka admin adds the new topic to the controller using the following command:

image05

  • uReplicator controller figures out the number of partitions for testTopic and maps topic-partitions to active workers. It then updates the Zookeeper metadata to reflect this mapping.
  • Each corresponding Helix agent receives a callback with notification of the addition of these topic-partitions. In turn, this agent invokes the addFetcherForPartitions function of DynamicKafkaConsumer.
  • The DynamicKafkaConsumer subsequently registers these new partitions, finds the corresponding leader brokers, and adds them to fetcher threads to start the data mirroring.

For more details regarding the implementation, please refer to the uReplicator Design wiki.

 

Impact on Overall Stability

Since uReplicator’s initial launch in production at Uber about eight months ago, we haven’t seen a single prod issue with it (contrasted with an outage of some sort almost every week before its implementation). The graph below depicts the scenario of adding new topics to the mirroring tool whitelist in production. The first graph shows the total topic-partitions owned by each uReplicator worker. This count increases for each new topic being added.

img3

The second graph shows the corresponding uReplicator traffic flowing to the destination cluster. There’s no period of inactivity or load spikes, as with the old Kafka MirrorMaker:

uReplicator maintains stable operation in the presence of changes.

Overall, the benefits of uReplicator include:

  • Stability: Rebalancing now happens only during startup and when a node is added or deleted. In addition, it only affects a subset of the topic-partitions instead of causing complete inactivity like before.
  • Easier scalability: Adding a new node to an existing cluster is now much simpler. Since partition assignment is now static, we can intelligently move only a subset of partitions to the new node. Other topic-partitions remain unaffected.
  • Easier operation: Uber’s new mirroring tool supports dynamic whitelisting. We now don’t need to restart the cluster when adding/deleting/expanding Kafka topics.
  • Zero data loss: uReplicator guarantees zero data loss, since it commits checkpoints only after the data has been persisted on the destination cluster.

Since inception, uReplicator has been a valuable addition to the streaming platform team’s mission of connecting different parts of the Uber Engineering ecosystem together via messaging and the publish-subscribe model (using Kafka). As part of this mission, we are building a novel analytics platform for computing business metrics on top of streaming data. Sound interesting? See our real-time data infrastructure openings on the Uber Careers page if you’re interested in participating in the next chapter of this story.

 

Chinmay Soman is a software engineer on Streaming Platform in the Core Infrastructure group within Uber Engineering and wrote this article with fellow Streaming Platform engineers Yuanchi Ning, Xiang Fu, and Hongliang Xu.

Photo Credit: Zebra waterhole reflections by Conor Myhrvold, Etosha National Park, Namibia.

Editor’s note Sep 30, 2016: Uber’s uReplicator tool was formerly called uMirrorMaker. ~ Conor Myhrvold

0 Comments