By Mrina Natarajan & Naveen Somasundaram
While Uber moves people and packages around the world, data moves Uber. Systems like Hadoop and Spark power data decisions both large and small in the company. The Uber data engineering team builds big data solutions on top of these systems to support Uber’s growth. Here we look at Hadoop data ingestion, and how Uber Engineering streams diverse data into a cohesive layer for querying in near real-time.
Putting Big Data to Daily Use
At Uber, data analysis spans many functions, like data science, machine learning, fraud detection, and marketing spend, to name a few. Uber data includes information about trips, billing, and the health of the infrastructure and services behind our apps. Teams like city operations use data to accurately calculate driver incentive payments or predict other real-time events. Data ingestion is the whole process of streaming data into Vertica or Hive, the analytics platforms that give the right people and services the right type of data at the right time.
Understanding Data Ingestion
The sheer range of data types and databases we deal with makes ingesting data pretty complicated. Our data comes from many sources:
- Kafka, the event messaging system
- Service-oriented architecture (SOA) database tables
- Schemaless, our in-house, custom-designed data store
We’ve also built several in-house tools to pull data and make it accessible for analysis. Why build them when there are commercial or open source alternatives? First, our Schemaless database necessitates a custom solution. Second, we internally stream data as opposed to injecting database snapshots into Hadoop. Third, we need to get the data into Hadoop in a reliable way from many sources. Finally, we chart our own data destiny, since we can change the code as needed to solve Uber’s unique big data challenges.
Data Ingestion with Streamific
So how do we route Uber data through the Hadoop pipeline? We use Streamific to stream Schemaless and Kafka analytics data to HDFS and HBase. Streamific is Uber Engineering’s in-house Scala-based lightweight continuous streaming service. It maps and transports large data streams from one place to another. As it funnels data across different parts of the Hadoop pipeline, Streamific serves two primary purposes:
- Streamific funnels Schemaless data changes to a Schemaless Kafka cluster and then to HDFS and HBase. We call it the Schemaless Kafka cluster for a reason. Unlike a standard Kafka cluster, we configured this one for higher availability and consistency for transporting primary data.
- Streamific transports data from general-purpose analytics Kafka clusters to HDFS.
Streamific uses Apache Helix and Akka. Akka provides asynchronous messaging, while Helix provides fault tolerance and distributes resources on a cluster. As part of its clustering, Helix assigns Schemaless or Kafka shards evenly to Streamific nodes. For example, with two Streamific nodes and six shards, Helix maps three shards to each node, in both the Schemaless and Kafka parts of the setup. Since Streamific manages Kafka partitions, if a node fails, a different Streamific node picks up the failed node’s partitions. Helix redistributes the shards right away and notifies the Streamific servers of any changes. This also makes scaling Streamific easy: because new Streamific instances register themselves with the existing cluster automatically, all Helix has to do is redistribute the shards.
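As a rough illustration of the even shard distribution described above, here is a minimal Python sketch. It does not use the Helix API, and Helix's real rebalancing logic is not shown here. The function name `assign_shards`, the node names, and the round-robin strategy are all assumptions made for the example.

```python
from collections import defaultdict


def assign_shards(nodes, shards):
    """Spread shards over live nodes round-robin, so counts differ by at most one.

    Hypothetical helper for illustration only; this is not how Helix is invoked.
    """
    if not nodes:
        raise ValueError("at least one live node is required")
    ordered_nodes = sorted(nodes)
    assignment = defaultdict(list)
    for index, shard in enumerate(sorted(shards)):
        assignment[ordered_nodes[index % len(ordered_nodes)]].append(shard)
    return dict(assignment)


shards = range(6)

# Two nodes, six shards: three shards each.
before = assign_shards(["node-a", "node-b"], shards)

# node-a fails: the surviving node picks up all of its shards.
after_failure = assign_shards(["node-b"], shards)

# A new node registers: the same shards are spread over three nodes.
after_scale = assign_shards(["node-a", "node-b", "node-c"], shards)

print(before)
print(after_failure)
print(after_scale)
```

Recomputing the whole assignment on every membership change keeps the sketch short. A production rebalancer would normally also try to limit how many shards move between nodes.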
Akka is another player in the Streamific setup. Streamific uses Akka to deliver messages consistently between its components with high throughput. Using the Helix mapping information, Streamific nodes easily identify which partitions to read from Schemaless or Kafka.
Inside Streamific, components called actors talk to each other through event-driven messages:
- The source stream actor (SSA) maps to the origin, in this case the Schemaless or the Kafka analytics cluster.
- The destination stream actor (DSA) maps to the target: Kafka, HDFS, or HBase.
- The routing actor (RA) manages the state, checkpoints, and mapping for SSA and DSA. The checkpoint is the last read state of the Schemaless Kafka cluster.
- The Streamific server maintains the Streamific cluster.
Here’s how the actors run Streamific:
- Helix assigns Schemaless partitions to the Streamific node.
- The RA on an active Streamific node asks the SSA what messages to write.
- Meanwhile, the SSA sends the messages to the RA.
- Next the RA streams the messages to the DSA. When it hits a threshold based on time or row count, the RA checkpoints the last read state, but only once the DSA deems it safe.
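The checkpointing step in the flow above can be sketched in a few lines of Python. This is not Streamific's code (which is Scala and Akka based) but a simplified, single-threaded stand-in. The class name `RoutingActor`, its parameters, and the `is_safe` callback are hypothetical names chosen for the example.

```python
import time


class RoutingActor:
    """Hypothetical stand-in for the RA: forwards messages and checkpoints on a threshold."""

    def __init__(self, write, is_safe, max_rows=3, max_seconds=60.0, clock=time.monotonic):
        self.write = write              # stands in for handing a message to the DSA
        self.is_safe = is_safe          # stands in for the DSA saying a checkpoint is safe
        self.max_rows = max_rows        # row-count threshold
        self.max_seconds = max_seconds  # time threshold
        self.clock = clock
        self.checkpoint = None          # last read offset that is safe to resume from
        self._last_offset = None
        self._rows_since_checkpoint = 0
        self._last_checkpoint_time = clock()

    def on_message(self, offset, message):
        """Forward one message from the source side to the destination side."""
        self.write(message)
        self._last_offset = offset
        self._rows_since_checkpoint += 1
        self._maybe_checkpoint()

    def _maybe_checkpoint(self):
        elapsed = self.clock() - self._last_checkpoint_time
        due = self._rows_since_checkpoint >= self.max_rows or elapsed >= self.max_seconds
        # Only record the checkpoint when the destination reports it is safe.
        if due and self.is_safe():
            self.checkpoint = self._last_offset
            self._rows_since_checkpoint = 0
            self._last_checkpoint_time = self.clock()


written = []
ra = RoutingActor(write=written.append, is_safe=lambda: True, max_rows=3)
for offset, message in enumerate(["trip-1", "trip-2", "trip-3", "trip-4"]):
    ra.on_message(offset, message)

print(ra.checkpoint)  # offset of the third message; the fourth is not yet checkpointed
```

Because the checkpoint only advances after the destination confirms, a restart would replay the fourth message rather than lose it. That trade-off (possible duplicates, no gaps) is an assumption of this sketch, not a statement about Streamific's guarantees.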
Pros and Cons of the Streamific Approach
Previously we saw that Streamific uses Kafka as an intermediary rather than routing data directly from Schemaless to HBase and HDFS. This approach has its advantages and disadvantages. The first advantage is that Kafka consolidates the 4096 Schemaless shards into roughly 32. A one-to-one mapping exists between the shards and the HDFS files we open, and HDFS struggles when many files are open simultaneously, such as 4096 at a time per source. By grouping the shards into a smaller set, we only open a limited number of HDFS files at any given time (typically 32 or fewer).
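To make the arithmetic concrete, the following Python sketch folds 4096 shard ids onto 32 partitions. The source does not say how shards are actually mapped, so the modulo rule and the name `partition_for` are assumptions used only to show the effect on the number of open files.

```python
NUM_SHARDS = 4096      # Schemaless shard count from the text
NUM_PARTITIONS = 32    # approximate partition count from the text


def partition_for(shard_id: int) -> int:
    """Assumed mapping for illustration: fold shard ids onto partitions with a modulo."""
    if not 0 <= shard_id < NUM_SHARDS:
        raise ValueError(f"shard id out of range: {shard_id}")
    return shard_id % NUM_PARTITIONS


shards_per_partition = {}
for shard in range(NUM_SHARDS):
    partition = partition_for(shard)
    shards_per_partition[partition] = shards_per_partition.get(partition, 0) + 1

# One open HDFS file per partition instead of one per shard.
open_files = len(shards_per_partition)
print(open_files, shards_per_partition[0])
```

Under this assumed mapping each partition carries 4096 / 32 = 128 shards, and the writer holds 32 files open instead of 4096.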
Second, Kafka serializes the data from different data centers into the same partition, which prevents conflicting updates downstream. Kafka also retains the last read Schemaless data, so we can reprocess it without revisiting Schemaless and affecting the live database performance.
But one disadvantage of this approach is the added overhead. Operationally, it costs time and money to maintain the Schemaless Kafka cluster. Long-term, we want to streamline the ingestion process more and read directly from Schemaless to HDFS and HBase.
A third advantage of Streamific is its consistency in pulling data from different sources. We designed Streamific so that it can read from any data source and write into the Hadoop pipeline. That is, it streams data from Schemaless or Kafka to HDFS and HBase in the same way. There are slight differences in how it interacts with each system, but the core function remains the same. As we continue to extend Streamific to stream from other data sources in the future, this feature becomes more valuable.
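One common way to get this kind of source-agnostic behavior is a small pair of interfaces that every source and destination implements. The Python sketch below illustrates that idea only. The names `Source`, `Destination`, `ListSource`, `ListDestination`, and `stream` are hypothetical and are not Streamific's actual abstractions.

```python
from typing import Iterable, Protocol


class Source(Protocol):
    def read(self) -> Iterable[dict]: ...


class Destination(Protocol):
    def write(self, record: dict) -> None: ...


class ListSource:
    """In-memory stand-in for a source such as Schemaless or Kafka."""

    def __init__(self, records):
        self.records = list(records)

    def read(self):
        return iter(self.records)


class ListDestination:
    """In-memory stand-in for a destination such as HDFS or HBase."""

    def __init__(self):
        self.records = []

    def write(self, record):
        self.records.append(record)


def stream(source: Source, destination: Destination) -> int:
    """Core loop shared by every source and destination pair; returns rows copied."""
    count = 0
    for record in source.read():
        destination.write(record)
        count += 1
    return count


schemaless_like = ListSource([{"trip": 1}, {"trip": 2}])
kafka_like = ListSource([{"event": "ping"}])
hdfs_like = ListDestination()

copied = stream(schemaless_like, hdfs_like) + stream(kafka_like, hdfs_like)
print(copied, hdfs_like.records)
```

The system-specific differences live inside each source or destination class, while `stream` stays unchanged when a new data source is added.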
The life cycle of an Uber trip from a data ingestion perspective.
The Final Leg of the Hadoop Pipeline
Although Streamific covers a large part of the data ingestion process, we use other in-house tools to process the data further. After we get the data into HDFS and HBase, we run ETL (extract, transform, and load) jobs to stitch, prune, and validate it down the pipeline. ETL jobs load this transformed data into Vertica or Hive, the analytics warehouse databases. Once there, internal consumers run SQL queries on the big data through Query Builder, the in-house user interface.
We’ve got big plans to refine this process further this year. If you’d like to learn more about how we use data to optimize the Uber experience, consider joining us.
At the Strata Hadoop conference in March 2016, we shared how we revamped our underlying infrastructure with Hadoop and Spark. We also presented an overview of the current Uber data ecosystem and discussed architecture, future roadmap, and initiatives to reduce data latency to Hadoop. See our talk here.
Photo Credit: “Elephant Playing with Water” by Conor Myhrvold, Botswana.
Header Explanation: This elephant, while as happy as the Hadoop one, is ingesting and streaming water instead of data.