From determining the most convenient rider pickup points to predicting the fastest routes, Uber uses data-driven analytics to create seamless trip experiences. Within engineering, analytics inform decision-making processes across the board. As we expand to new markets, the ability to accurately and quickly aggregate data becomes even more important.

In early 2014, Uber only had several hundred employees worldwide. But by late 2016, we had over two thousand people running more than one hundred thousand analytic queries daily. We needed a data querying system that could keep up with our growth. To run analytic queries against multiple data sources, we designed an analytics system that leverages Presto, an open source distributed SQL engine for large datasets, and Parquet, a columnar storage format for Hadoop.

In this article, we outline our Presto architecture and discuss how we developed a new Parquet reader to power Uber’s robust data analytics using the magic of columnar storage.

 

Using Presto at Uber

We chose Presto as our system’s SQL engine because of its scalability, high performance, and smooth integration with Hadoop. These properties make it a good fit for many of our teams.

Presto architecture

Uber’s Presto ecosystem is made up of a variety of nodes that process data stored in Hadoop. Each Presto cluster has one “coordinator” node that compiles SQL and schedules tasks, as well as a number of “worker” nodes that jointly execute tasks. As detailed in Figure 1, the client sends SQL queries to our Presto coordinator, whose analyzer compiles SQL into an Abstract Syntax Tree (AST).

From there, the planner compiles the AST into a query plan, optimizing it for a fragmenter that then segments the plan into tasks. Next, the scheduler assigns each task—either reading files from the Hadoop Distributed File System (HDFS) or conducting aggregations—to a specific worker, and the node manager tracks their progress. Finally, results of these tasks are streamed to the client.

Figure 1: Uber’s Presto architecture incorporates one coordinator node that analyzes and schedules tasks and several worker nodes that scan and aggregate data for use by the client.

 

Hadoop infrastructure and analytics

All analytic data sets at Uber are captured in our Hadoop warehouse, including event logs replicated by Kafka, service-oriented architecture tables built with MySQL and Postgres, and trip data stored in Schemaless. We run Flink, Pinot, and MemSQL for streaming and real-time analysis of this data.

The Hadoop Distributed File System (HDFS) is our data lake. In this ecosystem, event logs and trip data are ingested using Uber’s internal data ingestion tools, and service-oriented tables are copied to HDFS via Sqoop. With Uber Hoodie, Uber’s incremental updates and inserts library, data is first dumped into our HDFS as nested raw files, and then some of these raw tables are converted into modeled tables via extract, transform, load (ETL) jobs. While batch and ETL jobs run on Hive and Spark, near real-time interactive queries run on Presto.

This robust Hadoop infrastructure is integrated across Uber’s data centers, incorporating existing all-active, observability, cluster management, and security features.

Figure 2: Supported by our tech stack, Uber’s Hadoop infrastructure captures and stores data from a variety of sources.

 

Columnar storage for easy access

Uber data is dumped into HDFS and registered as either raw or modeled tables, both of which are query-able by Presto.

Raw tables do not require preprocessing and are highly nested; it is not uncommon to see more than five levels of nesting. Raw table data ingestion latency is about 30 minutes thanks to the processing power of Hoodie.

On the other hand, modeled tables, which are accessed more frequently, are more carefully selected and flattened using ETL preprocessing jobs. Data ingestion latency for modeled tables is much higher; most modeled tables have an ingestion latency of 8 to 24 hours.

Due to the scale of our data and low latency requirements of our analytics, we store data as columns as opposed to rows, which enables Presto to answer queries more efficiently. By not having to scan and discard unwanted data in rows, columnar storage saves disk space and improves query performance for larger data sets.
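To make the difference concrete, here is a minimal Python sketch. It is not Presto or Parquet code; the three-field trip table and the helper names are hypothetical. It counts how many values each layout has to touch to answer a query that needs only city_id.

```python
# Toy comparison of row-oriented and column-oriented layouts.
# The table contents and function names are hypothetical, for illustration only.

rows = [
    {"driver_uuid": "d1", "city_id": 12, "fare": 9.5},
    {"driver_uuid": "d2", "city_id": 7, "fare": 4.0},
    {"driver_uuid": "d3", "city_id": 12, "fare": 6.25},
]

# Columnar layout: one list per field, values kept in row order.
columns = {name: [row[name] for row in rows] for name in rows[0]}


def count_city_row_layout(rows, city_id):
    """Scan whole rows; every field of every row is touched."""
    values_touched = 0
    matches = 0
    for row in rows:
        values_touched += len(row)
        if row["city_id"] == city_id:
            matches += 1
    return matches, values_touched


def count_city_column_layout(columns, city_id):
    """Scan only the city_id column; the other columns are never read."""
    city_column = columns["city_id"]
    matches = sum(1 for value in city_column if value == city_id)
    return matches, len(city_column)


row_result = count_city_row_layout(rows, 12)
column_result = count_city_column_layout(columns, 12)
```

Both layouts find the same two matching trips, but the columnar scan touches three values instead of nine. On wide, nested tables the gap grows with the number of columns the query does not need.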

Enter Parquet

We chose Parquet for Uber’s Hadoop storage solution because of its compression and encoding functionalities, as well as its ground-up support of nested data sets. These features allow our query engines (including Presto) to reach peak performance and query speed.

Figure 3: Parquet is Uber Engineering’s storage solution for our Hadoop ecosystem, partitioning data horizontally into rows and then vertically into columns for easy compression.

In Parquet, data is first horizontally partitioned into groups of rows; then, within each group, data is vertically partitioned into columns. Data for a particular column is stored together and is compressed and encoded to save space and improve performance. Each Parquet file has a footer that stores codecs and encoding information, as well as column-level statistics, e.g., the minimum and maximum values of each column.
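The layout described above can be sketched in a few lines of Python. This is a toy model, not the Parquet file format or any Parquet library API; the function name write_toy_file, the field names, and the values are assumptions made for illustration.

```python
# Toy model of the layout: row groups, per-column chunks, and footer statistics.
# Not the real Parquet format; all names and values here are hypothetical.

def write_toy_file(rows, rows_per_group):
    """Split rows into row groups, then pivot each group into column chunks."""
    row_groups = []
    footer = []
    for start in range(0, len(rows), rows_per_group):
        group_rows = rows[start:start + rows_per_group]
        # Vertical partitioning: one chunk (list of values) per column.
        chunks = {name: [r[name] for r in group_rows] for name in group_rows[0]}
        row_groups.append(chunks)
        # Footer entry: per-column (min, max) for this row group.
        footer.append({name: (min(vals), max(vals)) for name, vals in chunks.items()})
    return {"row_groups": row_groups, "footer": footer}


rows = [
    {"city_id": city, "fare": fare}
    for city, fare in [(3, 5.0), (10, 7.5), (12, 4.0), (14, 9.0)]
]
toy_file = write_toy_file(rows, rows_per_group=2)
```

With two rows per group, the footer records a city_id range of (3, 10) for the first row group and (12, 14) for the second. A reader can consult those ranges before touching any column data.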

On a theoretical level, Parquet was the perfect match for our Presto architecture, but would this magic transfer to our system’s columnar needs?

 

A new Parquet reader for Presto

Parquet is supported in Presto using the original open source Parquet reader. While working well with open source Presto, this reader neither fully incorporates columnar storage nor employs performance optimizations with Parquet file statistics, making it ineffective for our use case.

To tackle this performance issue, we developed a new Parquet reader for Presto to leverage the potential of Parquet in our data analytics system. Below is an example query to determine which drivers to target in a specific city on a given date based on expected rider demand:

SELECT base.driver_uuid
FROM rawdata.schemaless_mezzanine_trips_rows
WHERE datestr = '2017-03-02' AND base.city_id IN (12)

In this scenario, the nested table rawdata.schemaless_mezzanine_trips_rows stores more than 100 terabytes of raw trip data in Parquet. Using the above example, we will demonstrate how queries are processed using both the original open source reader and an open source reader of our own invention.

 

Original open source Parquet reader

Figure 4: The original open source Parquet reader does not fully incorporate columnar storage, making it inefficient to analyze large sets of Uber data.

The original reader conducts analysis in three steps: (1) reads all Parquet data row by row using the open source Parquet library; (2) transforms row-based Parquet records into columnar Presto blocks in-memory for all nested columns; and (3) evaluates the predicate (base.city_id=12) on these blocks, executing the queries in our Presto engine.

 

Uber’s new Parquet reader

To accommodate Uber data’s size and scale, we created a new open source Parquet reader that uses memory and CPU more efficiently. This new reader implements several optimizations geared towards enhancing performance and speeding up querying.

Nested column pruning

Figure 5: Uber’s new open source reader can skip over unnecessary data via nested column pruning.

One way the new reader optimizes querying is by skipping over unnecessary data, referred to as nested column pruning. As the name suggests, this optimization is most effective when used with nested data.

The new reader executes nested column pruning in three steps: (1) read only required columns in Parquet; (2) transform row-based Parquet records into columnar blocks; and (3) evaluate the predicate on columnar blocks in the Presto engine.
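The pruning step can be illustrated with a small Python sketch. It is not the Presto reader's code: it assumes leaf columns of a nested schema are addressed by dotted path, and every field other than base.driver_uuid and base.city_id is hypothetical.

```python
# Toy illustration of nested column pruning.
# Leaf columns are keyed by dotted path; only paths the query references are read.
# Fields other than base.driver_uuid and base.city_id are invented for the example.

stored_columns = {
    "base.driver_uuid": ["d1", "d2", "d3"],
    "base.city_id": [12, 7, 12],
    "base.route.points": [[1, 2], [3], [4, 5, 6]],
    "base.fare.breakdown.tolls": [0.0, 1.5, 0.0],
}


def read_pruned(stored_columns, required_paths):
    """Return only the requested leaf columns and record which ones were read."""
    columns_read = []
    result = {}
    for path in required_paths:
        result[path] = stored_columns[path]
        columns_read.append(path)
    return result, columns_read


required = ["base.driver_uuid", "base.city_id"]
pruned, columns_read = read_pruned(stored_columns, required)

# Evaluate the predicate base.city_id = 12 on the pruned columns.
selected = [
    uuid
    for uuid, city in zip(pruned["base.driver_uuid"], pruned["base.city_id"])
    if city == 12
]
```

Only two of the four leaf columns are touched, and the deeply nested route and fare fields are never materialized.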

Columnar reads

Figure 6: Our new reader enhances querying by reading columns directly as opposed to row-by-row.

The new reader can also read columns in Parquet directly, instead of reading row by row and then executing a row-to-column transformation, which speeds up querying. It executes columnar reads in two steps: (1) read only required columns in Parquet and build columnar blocks on the fly, saving the CPU and memory otherwise spent transforming row-based Parquet records into columnar blocks, and (2) evaluate the predicate using columnar blocks in the Presto engine.
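As a rough illustration of what the columnar path avoids, the Python sketch below builds the same blocks two ways. It is a simplification under stated assumptions: column chunks are plain lists, "blocks" are dictionaries of lists, and both function names are hypothetical rather than taken from Presto.

```python
# Toy contrast between the row-by-row path and the direct columnar path.
# Column chunks and blocks are modeled as plain lists; names are hypothetical.

column_chunks = {
    "driver_uuid": ["d1", "d2", "d3"],
    "city_id": [12, 7, 12],
}


def blocks_via_rows(column_chunks):
    """Row-by-row path: assemble records, then pivot them back into columns."""
    names = list(column_chunks)
    row_count = len(column_chunks[names[0]])
    records = [
        {name: column_chunks[name][i] for name in names} for i in range(row_count)
    ]
    blocks = {name: [record[name] for record in records] for name in names}
    return blocks, len(records)


def blocks_direct(column_chunks):
    """Columnar path: each chunk becomes a block with no intermediate records."""
    blocks = {name: list(values) for name, values in column_chunks.items()}
    return blocks, 0


via_rows, records_built = blocks_via_rows(column_chunks)
direct, direct_records = blocks_direct(column_chunks)
```

Both paths produce identical blocks, but the row-by-row path allocates one intermediate record per row before pivoting, which is the CPU and memory cost the columnar read removes.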

Predicate pushdowns

Figure 7: Predicate pushdowns allow us to skip reading Parquet row groups to save disk I/O. In this example, the query is looking for city_id = 12; because one row group’s maximum city_id is 10, the new Parquet reader will skip this row group.

With our new reader, we can evaluate SQL predicates while scanning Parquet files. By using Parquet statistics, we can also skip reading parts of the file, thereby saving memory and streamlining processing. Predicate pushdowns are primarily used for “needle in a haystack” queries.

The new reader executes predicate pushdowns by merging three actions into one step: simultaneously read the required columns in Parquet, evaluate columnar predicates on the fly, and build columnar blocks. In this scenario, the reader skips a group of rows when the group’s statistics show that none of its rows can match the predicate.
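A minimal Python sketch of the idea follows. It assumes each row group carries a (min, max) pair per column, as Parquet statistics do, but the data structures, the scan_equals function, and the sample values are hypothetical and not the reader's actual implementation.

```python
# Toy predicate pushdown using per-row-group min/max statistics.
# Structures, names, and values are hypothetical.

row_groups = [
    {
        "stats": {"city_id": (1, 10)},
        "columns": {"city_id": [1, 4, 10], "driver_uuid": ["a", "b", "c"]},
    },
    {
        "stats": {"city_id": (8, 15)},
        "columns": {"city_id": [8, 12, 15], "driver_uuid": ["d", "e", "f"]},
    },
]


def scan_equals(row_groups, column, target, projected):
    """Return projected values where column == target, skipping groups by stats."""
    results = []
    groups_read = 0
    for group in row_groups:
        low, high = group["stats"][column]
        if target < low or target > high:
            continue  # statistics prove no row can match, so skip the group
        groups_read += 1
        values = group["columns"][column]
        outputs = group["columns"][projected]
        results.extend(out for value, out in zip(values, outputs) if value == target)
    return results, groups_read


matches, groups_read = scan_equals(row_groups, "city_id", 12, "driver_uuid")
```

For city_id = 12, the first row group (maximum 10) is skipped without reading any of its column data, and only the second group is scanned.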

Dictionary pushdowns

Figure 8: Like predicate pushdowns, dictionary pushdowns are executed in one step that simultaneously reads and evaluates data columns while building columnar blocks for our Presto engine. In this example, the query is looking for city_id = 12; since one row group’s city_id dictionary contains only the IDs 3, 5, 9, 14, and 21, the new reader will skip this group.

Even if Parquet statistics match the predicate, we can read the dictionary page for each column to determine whether the dictionary can potentially match the predicate. If not, we can skip reading that row group. Like predicate pushdowns, dictionary pushdowns make querying faster and are most effective for “needle in a haystack” queries.

Dictionary pushdowns are also executed by our new reader in only one step: read required columns in Parquet, evaluate columnar predicates on the fly and build columnar blocks. Similar to predicate pushdowns, dictionary pushdowns enable the reader to skip reading groups of rows if dictionary values do not match the predicate.
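The following Python sketch shows why the dictionary check helps even after statistics pass. It models a dictionary-encoded column as a list of distinct values plus a list of indices; this structure, the count_equals function, and the second row group's values are assumptions for illustration, not the reader's real code.

```python
# Toy dictionary pushdown on a dictionary-encoded city_id column.
# Each row's value is dictionary[index]. Structures and names are hypothetical.

row_groups = [
    # Min/max range 3..21 includes 12, but the dictionary does not contain 12.
    {"stats": (3, 21), "dictionary": [3, 5, 9, 14, 21], "indices": [0, 1, 2, 3, 4, 0]},
    # The dictionary contains 12, so this group must be scanned.
    {"stats": (2, 30), "dictionary": [2, 12, 30], "indices": [1, 0, 2, 1]},
]


def count_equals(row_groups, target):
    """Count rows equal to target, skipping groups by stats, then by dictionary."""
    matches = 0
    skipped_by_dictionary = 0
    for group in row_groups:
        low, high = group["stats"]
        if not (low <= target <= high):
            continue  # ordinary statistics-based skip
        if target not in group["dictionary"]:
            skipped_by_dictionary += 1
            continue  # statistics passed, but no stored value can match
        target_index = group["dictionary"].index(target)
        matches += sum(1 for i in group["indices"] if i == target_index)
    return matches, skipped_by_dictionary


matches, skipped_by_dictionary = count_equals(row_groups, 12)
```

The first row group passes the min/max test because 12 lies between 3 and 21, yet it is still skipped once the dictionary shows that 12 never occurs in it.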

Lazy reads

Figure 9: Lazy reads are performed only when they match the predicate, saving CPU and memory.

Our reader can also read projected columns as lazily as possible. This means that we load a projected column only where the predicate matches, thereby speeding up our querying.

Lazy reads are executed in a single step: read the required columns in Parquet, evaluate columnar predicates on the fly, and build columnar blocks only if the predicate matches.
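One way to picture a lazy read is a column wrapper that defers loading until a match is found. The Python sketch below takes that approach; the LazyColumn class, the scan_group function, and the sample values are hypothetical and only approximate what the reader does.

```python
# Toy lazy read: the projected column is loaded only if the predicate matches.
# Class, function, and values are hypothetical.

class LazyColumn:
    """Wraps a loader function and calls it at most once, on first access."""

    def __init__(self, loader):
        self._loader = loader
        self._values = None
        self.loaded = False

    def get(self):
        if not self.loaded:
            self._values = self._loader()
            self.loaded = True
        return self._values


def scan_group(city_ids, lazy_driver_uuids, target):
    """Evaluate the predicate first; touch the projected column only on a match."""
    positions = [i for i, city in enumerate(city_ids) if city == target]
    if not positions:
        return []  # no match, so the projected column is never loaded
    uuids = lazy_driver_uuids.get()
    return [uuids[i] for i in positions]


no_match = LazyColumn(lambda: ["a", "b"])
with_match = LazyColumn(lambda: ["c", "d"])

first = scan_group([3, 5], no_match, 12)
second = scan_group([12, 9], with_match, 12)
```

In the first group no city_id equals 12, so the driver_uuid loader is never called. In the second group it is called once, and only the matching position is returned.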

These optimizations are unique to Uber’s Parquet reader, ensuring the efficient use of our data storage and analytics. Since we put our new Parquet reader into production, data has been processed 2 to 10 times faster than with the original open source reader.

Figure 10: Our new reader demonstrated a 2-10x speedup for Uber’s benchmark SQL queries.

 

Ongoing work

Over time, Presto has emerged as a key component for running interactive SQL queries on big data at scale. Since we deployed it in 2016, our Presto cluster has grown to more than 300 nodes, can access more than five petabytes of data, and completes more than 90 percent of queries within 60 seconds. Who needs to pull a rabbit out of a hat when you have magic like that in your tech stack?

Even with these metrics, our team is still actively improving Presto’s reliability, scalability, and performance to make our data analytics as efficient as possible. Ongoing efforts include: a Presto Elasticsearch connector, multi-tenancy resource management, high availability for Presto coordinators, geospatial function support and performance improvement, and caching HDFS data.

If you are interested in joining our group of analytics magicians, apply for a role on Uber’s Data Infrastructure team.

Zhenxiao Luo is a software engineer on Uber’s Hadoop Infrastructure and Analytics team. Currently, he leads Uber’s Presto development and operations.

Photo Header Credit: “Elephant splashing mud to stay cool” by Conor Myhrvold, Okavango Delta, Botswana.
