By Xiaobing Li & Ankur Bansal
As Uber continues to scale, our systems generate ever more events, interservice messages, and logs. That data needs to go through Kafka to be processed. How does our platform audit all these messages in real time?
To monitor our Kafka pipeline health and each message passing through, we rely on our auditing system called Chaperone. Since January 2016 Chaperone has been a key piece of Uber Engineering’s multi–data center infrastructure, currently handling about a trillion messages a day. Here’s how it works and why we built it.
Uber’s Kafka Pipeline Overview
At Uber, services run in multiple data centers in active-active mode. Apache Kafka, and specifically uReplicator, is our message bus connecting different parts of the Uber Engineering ecosystem:
Operating Kafka at Uber’s scale while delivering data almost instantaneously to many downstream consumers is difficult. We use batching aggressively and rely on asynchronous processing wherever possible for high throughput. Services use in-house client libraries to publish messages to Kafka proxies, which batch and forward them to regional Kafka clusters. Some Kafka topics are directly consumed from regional clusters, while many others are combined with data from other data centers into an aggregate Kafka cluster using uReplicator for scalable stream or batch processing.
Uber’s Kafka pipeline has four tiers spanning a few data centers. The Kafka proxy and its clients are the first two tiers. They act as the gateway to the next tier, the regional Kafka cluster within each data center. Some data may be copied from regional clusters into the aggregate cluster, which is the last tier of the pipeline.
Data in the Kafka pipeline follows a path of batching and acking (sending acknowledgments):
Data flowing from the proxy client to the Kafka brokers goes through several stages:
- The application sends a message to the proxy client by calling the produce function.
- The proxy client puts the message into the client buffer and returns to the application.
- The proxy client batches messages in the buffer and flushes them to the proxy server.
- The proxy server puts messages into the producer buffer and acks to the proxy client. The batch is then partitioned and placed in corresponding buffers per topic name.
- The proxy server batches messages in the buffer and flushes to a regional broker.
- The regional broker appends the messages to its local log and acks to the proxy server (with acks=1).
- uReplicator fetches messages from the regional broker and flushes them to the aggregate broker.
- The aggregate broker appends the messages to its local log and acks to uReplicator (with acks=1).
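The first three stages can be sketched as a buffer that accepts messages immediately and forwards them in batches. This is a minimal, single-threaded illustration, not Uber’s client library: the class name, `batch_size`, and the `send_batch` callback are all assumptions, and a real client would flush from a background thread rather than inside `produce`.

```python
from typing import Callable, List


class BufferedProxyClient:
    """Hypothetical client that buffers messages and flushes them in batches."""

    def __init__(self, send_batch: Callable[[List[bytes]], bool], batch_size: int = 3):
        self._send_batch = send_batch  # assumed to return True when the next tier acks
        self._batch_size = batch_size
        self._buffer: List[bytes] = []

    def produce(self, message: bytes) -> None:
        # The message is only buffered here; delivery happens at flush time.
        self._buffer.append(message)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        if not self._buffer:
            return
        batch, self._buffer = self._buffer, []
        if not self._send_batch(batch):
            # No ack: keep the batch at the front so the next flush retries it.
            self._buffer = batch + self._buffer
```

Because `produce` returns before any ack arrives, whatever is still sitting in the buffer is lost if the process dies. That is the kind of silent loss an audit of per-tier counts is meant to surface.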
Our Kafka setup optimizes for high throughput, which introduces tradeoffs. Thousands of microservices handling hundreds of thousands of concurrent trips (and growing) using Kafka extensively introduces the potential for problems. The purpose of Chaperone is to ingest each and every message from all the topics and record the counts in a given time period, at each stage in the data pipeline, to detect early and quantify precisely the data loss, lag or duplication along the path that data takes at Uber.
An Overview of Chaperone
Chaperone consists of four components: the AuditLibrary, ChaperoneService, ChaperoneCollector, and the WebService.
The AuditLibrary implements the audit algorithm and periodically aggregates and outputs time window statistics. The other three components all use this library for auditing. Its output module is pluggable (Kafka, HTTP, etc.). At the proxy client, the auditing metrics are sent to the Kafka proxy. At the other tiers, the metrics are emitted directly to a dedicated Kafka topic.
Key to the AuditLibrary is the audit algorithm: Chaperone uses 10-minute tumbling (time) windows to aggregate the messages of each topic continuously. The event time inside each message decides which window the message goes into. For each window of messages, Chaperone calculates statistics such as the total count and p99 latency. Periodically, Chaperone wraps the statistics of each window into an auditing message and sends it to the plugged-in backend, which can be the Kafka proxy or a Kafka broker, as stated above.
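The windowing step above can be sketched as follows. This is an illustration under stated assumptions, not the AuditLibrary itself: the class, the field names in the emitted dictionaries, and the nearest-rank p99 calculation are all hypothetical. It also keeps every latency in memory for simplicity, where a production implementation would more likely use a compact histogram.

```python
import math
from collections import defaultdict

WINDOW_SECONDS = 600  # 10-minute tumbling windows


def window_start(event_time: float) -> int:
    """Start (epoch seconds) of the tumbling window containing event_time."""
    return int(event_time // WINDOW_SECONDS) * WINDOW_SECONDS


class WindowAuditor:
    """Hypothetical per-tier auditor keyed by (topic, window start)."""

    def __init__(self, tier: str):
        self.tier = tier
        self._latencies = defaultdict(list)  # (topic, window start) -> latencies

    def track(self, topic: str, event_time: float, now: float) -> None:
        # The window is chosen by event time, not by the time of processing.
        self._latencies[(topic, window_start(event_time))].append(now - event_time)

    def audit_messages(self) -> list:
        out = []
        for (topic, start), latencies in sorted(self._latencies.items()):
            ordered = sorted(latencies)
            rank = max(1, math.ceil(0.99 * len(ordered)))  # nearest-rank p99
            out.append({
                "topic": topic,
                "tier": self.tier,
                "window_start": start,
                "window_end": start + WINDOW_SECONDS,
                "count": len(ordered),
                "p99_latency": ordered[rank - 1],
            })
        return out
```

Keying windows by event time means a late message is still counted in the window it was produced in, so the same window can be compared across tiers.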
The tier field in the auditing message identifies where the auditing happened, and therefore whether messages have arrived at that location. By comparing the message counts from different tiers for a specific period, we can determine whether messages generated during that period have been successfully delivered.
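A minimal sketch of that comparison, for one topic and one time window, might look like this. The tier names and the function are hypothetical; only the idea of comparing counts between tiers comes from the text.

```python
# Hypothetical tier names, ordered from the producer side to the aggregate cluster.
TIERS = ["proxy-client", "proxy-server", "regional", "aggregate"]


def tier_gaps(counts: dict) -> list:
    """Compare adjacent tiers for one topic and window.

    Returns (upstream, downstream, difference) for every pair whose counts differ.
    A positive difference means fewer messages downstream (loss or lag);
    a negative one means more messages downstream (duplication).
    """
    gaps = []
    for upstream, downstream in zip(TIERS, TIERS[1:]):
        difference = counts.get(upstream, 0) - counts.get(downstream, 0)
        if difference != 0:
            gaps.append((upstream, downstream, difference))
    return gaps


window_counts = {"proxy-client": 100, "proxy-server": 97, "regional": 97, "aggregate": 98}
gaps = tier_gaps(window_counts)
```

In this example the proxy server is three messages short of the proxy client, and the aggregate cluster holds one more message than the regional cluster.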
ChaperoneService is the workhorse component and carries the heaviest load. It consumes each and every message from Kafka and records a timestamp for audit. ChaperoneService is built using HelixKafkaConsumer from uReplicator, which has already proven more reliable and easier to operate than the Kafka high-level consumer (as of Kafka 0.8.2). ChaperoneService periodically produces auditing messages to a dedicated Kafka topic to record the state.
ChaperoneCollector listens on the dedicated Kafka topic to fetch all the auditing messages and stores them in the database. It also populates multiple dashboards:
In the top figure above, we see the total message counts of a topic for each tier, aggregated across all data centers. When there is no data loss, all lines coincide perfectly. Gaps show up whenever messages are dropped between tiers. For example, in the bottom figure, some messages were dropped by the Kafka proxy, but no loss happened after that tier. With this dashboard, it’s easy to determine the loss window so that the relevant action can be taken.
Each tier’s message count also comes with a latency, so we know how fresh messages are and whether a tier is delaying them. Instead of navigating Kafka broker or uReplicator dashboards, users get end-to-end visibility into their topics’ health in a single dashboard, as shown below:
Finally, WebService is a REST web front end for querying the metrics collected by Chaperone. It lets us, for example, quantify loss accurately. Once we know the time window for loss, we query Chaperone for exact counts:
Our Two Design Requirements for Chaperone
In designing Chaperone, we focused on two must-do tasks to achieve accurate auditing results:
1) Count each message exactly once
To ensure that each message is audited exactly once, ChaperoneService uses a write-ahead log (WAL). Every time ChaperoneService is ready to emit captured Kafka stats, it composes an auditing message and tags it with a UUID. This message, along with the associated offsets, is persisted in the WAL before it is sent to Kafka. Once Kafka acknowledges it, the entry in the WAL is marked as done. This way, if ChaperoneService crashes, it can use the WAL to resend any unmarked auditing messages and to find the last audited offset from which to resume consumption. The WAL ensures exactly-once auditing of each Kafka message and at-least-once delivery of each auditing message.
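The WAL protocol described above can be sketched with an append-only file of JSON lines. This is an illustration only: the class, the record layout, and the `"trips-0"` partition key are assumptions, and the actual on-disk format used by ChaperoneService is not described here.

```python
import json
import os
import tempfile
import uuid


class AuditWAL:
    """Hypothetical append-only write-ahead log stored as JSON lines."""

    def __init__(self, path: str):
        self.path = path

    def _append(self, record: dict) -> None:
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")
            f.flush()
            os.fsync(f.fileno())  # make the entry durable before sending to Kafka

    def begin(self, stats: dict, offsets: dict) -> str:
        audit_id = str(uuid.uuid4())
        self._append({"type": "pending", "id": audit_id, "stats": stats, "offsets": offsets})
        return audit_id

    def mark_done(self, audit_id: str) -> None:
        self._append({"type": "done", "id": audit_id})

    def recover(self):
        """Return (entries never marked done, last audited offsets)."""
        pending, last_offsets = {}, {}
        if os.path.exists(self.path):
            with open(self.path, encoding="utf-8") as f:
                for line in f:
                    record = json.loads(line)
                    if record["type"] == "pending":
                        pending[record["id"]] = record
                        last_offsets.update(record["offsets"])
                    else:
                        pending.pop(record["id"], None)
        return list(pending.values()), last_offsets


path = os.path.join(tempfile.mkdtemp(), "audit.wal")
wal = AuditWAL(path)
first = wal.begin({"count": 10}, {"trips-0": 10})
wal.mark_done(first)  # Kafka acknowledged the first auditing message
second = wal.begin({"count": 5}, {"trips-0": 15})
# Simulated crash: the second auditing message was never acknowledged.
to_resend, offsets = AuditWAL(path).recover()
```

After the simulated crash, `to_resend` holds only the second entry and `offsets` points at the last audited position. A resent entry keeps its original UUID, so a downstream consumer can recognize it as a duplicate.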
Next, ChaperoneCollector uses the UUID tagged by ChaperoneService to remove any duplicates. With the UUID and the WAL together, we ensure exactly-once auditing. It’s hard to implement similar guarantees in the proxy client and server because they must keep overhead low. We rely on their graceful shutdown to flush out state.
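The collector-side deduplication can be sketched in a few lines. The class and the `id` field name are hypothetical, and the in-memory set is only a stand-in: a real collector would more plausibly enforce uniqueness in the database itself.

```python
class DedupingCollector:
    """Hypothetical collector that stores each auditing message at most once."""

    def __init__(self):
        self._seen_ids = set()
        self.rows = []  # stands in for the database table

    def handle(self, audit_message: dict) -> bool:
        """Store the message unless its UUID was already seen. Returns True if stored."""
        audit_id = audit_message["id"]
        if audit_id in self._seen_ids:
            return False  # a resend after a crash; drop it
        self._seen_ids.add(audit_id)
        self.rows.append(audit_message)
        return True
```

At-least-once delivery from the sender combined with this idempotent write on the receiver is what yields an exactly-once result.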
2) Use a consistent timestamp to audit a message across tiers
Since Chaperone looks at the same Kafka message in multiple tiers, it’s important for messages to have embedded timestamps. Without them, we would see a time shift in counting. At Uber, most of the data produced to Kafka is either encoded with an Avro-like schema or sent as JSON. For messages encoded with a schema, the event time can be extracted in constant time. JSON messages, however, have to be decoded to extract the event time. To speed this up, we implemented a stream-based JSON parser that can scan for timestamps without paying the upfront cost of decoding the whole message. This efficient parser is used in ChaperoneService but is still too expensive to use in the proxy client and server. Therefore, we use the processing timestamp in those two tiers. However, the discrepancy in message counts across tiers caused by inconsistent timestamps may trigger false-positive alerts on data loss. We’re working on addressing timestamp inconsistency and plan to publish a follow-up article on our solution.
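To illustrate the idea of scanning for a timestamp instead of decoding the whole message, here is a small sketch. It is not Uber’s parser, whose design is not described here. It assumes well-formed JSON whose event time is a numeric top-level field named `ts` (a hypothetical name), and it stops as soon as that field is found. Pure Python will not make this fast; the point is the early exit.

```python
from typing import Optional

WHITESPACE = " \t\r\n"


def scan_timestamp(raw: str, key: str = "ts") -> Optional[float]:
    """Return the numeric value of a top-level `key` without a full JSON decode."""
    depth = 0
    i, n = 0, len(raw)
    while i < n:
        ch = raw[i]
        if ch == '"':
            # Find the closing quote, stepping over escaped characters.
            j = i + 1
            while j < n and raw[j] != '"':
                j += 2 if raw[j] == "\\" else 1
            token = raw[i + 1:j]
            i = j + 1
            k = i
            while k < n and raw[k] in WHITESPACE:
                k += 1
            # A string followed by ':' directly inside the outer object is a top-level key.
            if depth == 1 and token == key and k < n and raw[k] == ":":
                k += 1
                while k < n and raw[k] in WHITESPACE:
                    k += 1
                end = k
                while end < n and raw[end] in "+-.eE0123456789":
                    end += 1
                try:
                    return float(raw[k:end])
                except ValueError:
                    return None  # the value is not a number
            continue
        if ch in "{[":
            depth += 1
        elif ch in "}]":
            depth -= 1
        i += 1
    return None


message = '{"note": "fake \\"ts\\": 1", "payload": {"ts": 5}, "ts": 1467331200.5, "big": [1, 2, 3]}'
event_time = scan_timestamp(message)
```

The scanner tracks only string boundaries and nesting depth, so it ignores the `ts` text inside the `note` string and the nested `ts` under `payload`, and it never looks at the `big` array that follows the match.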
Chaperone’s Two Main Uses at Uber
1. Detect data loss
Before Chaperone was built, the first indicator of data loss was consumers of the data complaining about it. By that time, it was already too late, and we also didn’t know which part of the pipeline had incurred the loss. With Chaperone, we built a loss detection job that periodically polls metrics from Chaperone and alerts as soon as it sees a discrepancy in counts between tiers. The alert provides end-to-end coverage for the Kafka pipeline, uncovering issues that the system metrics of each pipeline component can hardly expose. The job automatically discovers new topics, and you can configure different alert rules based on data importance and loss thresholds. Loss notifications are sent out via various channels, such as a paging system, enterprise chat, or email, so that you find out quickly.
2. Read data beyond offsets available in Kafka
In most of our production clusters, we still use Kafka 0.8, which doesn’t natively support a timestamp-to-offset index. Thus, we built our own with Chaperone. The index powers our time range query for Kafka messages, so you are not limited to reading by offset; you can read data using the timestamps provided by Chaperone.
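One way to picture such an index is as a sorted list of (timestamp, offset) pairs searched by bisection. The sketch below is an assumption about the general shape, not Chaperone’s implementation: the class and method names are hypothetical, and it assumes timestamps never decrease as offsets grow, which event time does not strictly guarantee.

```python
import bisect


class TimeOffsetIndex:
    """Hypothetical sparse index for one topic partition."""

    def __init__(self):
        self._timestamps = []  # kept in non-decreasing order
        self._offsets = []     # first offset recorded for each timestamp

    def add(self, timestamp: float, offset: int) -> None:
        # Entries are assumed to arrive in non-decreasing timestamp order.
        self._timestamps.append(timestamp)
        self._offsets.append(offset)

    def offset_range(self, start_ts: float, end_ts: float):
        """Offsets [lo, hi) to read for start_ts <= timestamp < end_ts.

        hi is None when the range runs to the end of the indexed log.
        """
        i = bisect.bisect_right(self._timestamps, start_ts) - 1
        lo = self._offsets[max(i, 0)] if self._offsets else None
        j = bisect.bisect_left(self._timestamps, end_ts)
        hi = self._offsets[j] if j < len(self._offsets) else None
        return lo, hi


index = TimeOffsetIndex()
index.add(0, 0)
index.add(600, 120)
index.add(1200, 300)
middle = index.offset_range(650, 1200)
tail = index.offset_range(1300, 2000)
```

Because the index is sparse, the lower bound is rounded down to the nearest indexed entry, so a reader may fetch a few messages from before `start_ts` and should filter them by timestamp.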
Even though Kafka has limited retention, we back up older data and keep the offsets of messages intact. The backed-up topics, paired with the index created by Chaperone, let users read data well beyond what currently exists in Kafka, using time range queries on the same interface. With this feature, Kafka users can inspect the messages within any period of the topic’s lifetime to debug issues in their service and backfill the messages if necessary. When there is a discrepancy between the auditing results from downstream systems and those from Chaperone, the specific set of messages can be dumped out for fine-grained comparison to locate the root cause.
We built Chaperone to answer the following types of questions:
- Is there any data loss happening? If so, how much, and where in the pipeline is it dropped?
- What is the end-to-end latency? If there is lag, where is it originating?
- Is there any data duplication?
Chaperone not only gives us a good picture of system health; it also alerts us in the event of data loss. For instance, uReplicator once had a dead-loop bug that was triggered when brokers responded with unexpected errors. Neither uReplicator nor the Kafka brokers triggered any alerts, but the data loss detection job kicked in and quickly exposed the bug.
If you’re interested in learning more, try it out for yourself: we’ve open sourced Chaperone, and the source code is available on GitHub.
Xiaobing Li wrote this article with Ankur Bansal. They are software engineers at Uber on the streaming platform within our Core Infrastructure org.