Managing Uber’s Data Workflows at Scale

At Uber’s scale, thousands of microservices serve millions of rides and deliveries a day, generating more than a hundred petabytes of raw data. Internally, engineering and data teams across the company leverage this data to improve the Uber experience. Applications of this data span several use cases, including calculating more efficient uberPOOL routes and making sure Uber Eats orders are delivered while the food is still hot. Each of these is a real-world event represented by data in our infrastructure.

Making this data actionable involves ingestion, transformation, dispersal, and orchestration so that it can be applied widely across areas such as traditional business intelligence, machine learning, model training, visualization, and reporting. However, during Uber’s early years of rapid growth, we onboarded a broad range of data workflow systems, with users having to choose from several overlapping tools for each use case. While this large toolbox allowed for agile and responsive growth, it proved difficult to manage and maintain, requiring engineers to learn duplicative data workflow systems as they took on different projects. We needed a central tool that could author, manage, schedule, and deploy data workflows.

Leveraging a variety of previously deployed tools at Uber, including an Airflow-based platform, we began developing a system in line with Uber’s scale. This work led us to develop Piper, Uber’s centralized workflow management system, which has allowed us to democratize data workflows at Uber and enable everyone from city operations teams to machine learning engineers to carry out their work faster and more efficiently.

 

The road to a unified workflow management system

Up until a few years ago, teams at Uber used multiple data workflow systems, some based on open source projects such as Apache Oozie, Apache Airflow, and Jenkins, and others custom-built solutions written in Python and Clojure. Every user who needed to move data around had to learn about and choose from these systems, depending on the specific task they needed to accomplish. Each system also carried its own maintenance and operational burden: keeping it running, troubleshooting issues, fixing bugs, and educating users.

After some reflection, we decided to converge on a single workflow system. Evaluating the data workflow tools available in the industry, we weighed the pros and cons of each and considered several factors, such as ease of use, stability, open source ecosystem, dependencies on the Hadoop ecosystem, expressiveness of the domain-specific language (DSL), and the programming language used (as compared to the language skills of our user base).

In our new system, we looked for the following qualities:

  • Workflows that are easy to author via code, yet expressive enough to be generated dynamically.
  • A development process that engineers are accustomed to, including developing data workflows as code and tracking them via revision control.
  • Workflows that are easy to visualize and manage.
  • Logs that are easily accessible for both past and present runs of a workflow.

After this evaluation, and with the goal in mind of converging on a single workflow system capable of supporting Uber’s scale, we settled on an Airflow-based system. The Airflow-based DSL provided the best trade-off of flexibility, expressiveness, and ease of use while being accessible for our broad range of users, which includes data scientists, developers, machine learning experts, and operations employees.

As we converged on a single multi-tenant data workflow system, we began decommissioning the other systems we had in place. Deprecating these systems made it significantly easier for users to write workflows, and for our team to manage and iteratively improve the system over the long run.

 

Choosing a deployment model: centralized multi-tenant platform

In deploying our system, we had a choice between a single, centrally managed installation and a separate installation for each team or organization in the company. We found examples of both in the industry, from the single, managed installation of Amazon’s AWS Data Pipeline to the separate Airflow installations allowed by Google Cloud Composer. In the latter case, a user or administrator is responsible for the setup and configuration of the system, with only limited support provided by the vendor.

To make this decision, we considered several factors:

  • How is the system upgraded when changes are introduced?
  • Who is responsible for on-call and system support when bugs or infrastructure issues occur?
  • Who is responsible for scaling the system as the number of workflows increases?
  • How do we avoid snowflake clusters and nodes, that is, different versions of the system running with various configurations?
  • Does the system require any maintenance or configuration by the team using it?

After considering the factors above, we chose a centrally deployed model: a single installation per data center, supported by our Data Workflow Management team. To the end user, Piper, our implementation of this system, is self-serve and reliable. Behind the scenes, our team manages Piper, ensuring it is updated and able to scale reliably with workflow growth across the company. We also support end users, who do not need to know the internal workings of the system, through office hours, feature requests, documentation, and training.

In choosing the central deployment model, we needed to ensure that Piper scales to a much higher number of workflows than it would need for a per-team deployment, in which each installation only requires a limited number of workflows. Piper also needed to provide better isolation and multi-tenancy capabilities compared to the many-installation model.

 

System architecture

While we based Piper on the original open source Airflow architecture, we re-architected most of the system to make it more performant, highly available, and fit into Uber’s infrastructure. The diagram below details our initial Piper architecture:

Figure 1: Our initial Piper architecture was based on Airflow, an open source workflow authoring, scheduling, and monitoring solution.

 

The original Piper architecture consists of the following five components:

  • Web server: Application server which services HTTP requests, including those for UI endpoints, as well as JSON API endpoints.
  • Scheduler: Responsible for scheduling workflows and tasks. The scheduler takes into account various factors such as schedule interval, task dependencies, trigger rules and retries, and uses this information to calculate the next set of tasks to run. Once resources are available, it will queue the task for execution in the appropriate executor (in our case, Celery).
  • Celery worker: The workers execute all workflow tasks. Each worker pulls the next task for execution from the queue (in our case, Redis) and executes the task locally. An executable task is identified by its workflow ID, task ID, and execution date.
  • Metadata database: Source of truth for all entities in the system such as workflows, tasks, connections, variables, and XCOMs, as well as execution status for the workflows.
  • Python workflows: Python files written by users to define workflows, tasks, and libraries.
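
To make the scheduler's role more concrete, here is a minimal sketch (not Piper's actual code) of how a scheduler might calculate the next set of runnable tasks from task dependencies. The names `TaskKey` and `ready_tasks`, the state strings, and the `daily_trips` workflow are hypothetical; the key mirrors the workflow ID, task ID, and execution date identifier described above. The sketch covers only an "all upstream tasks succeeded" rule and ignores schedule intervals, retries, and resource limits.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class TaskKey:
    """Identifies one executable task instance (hypothetical name)."""
    workflow_id: str
    task_id: str
    execution_date: date


def ready_tasks(workflow_id, execution_date, upstream, states):
    """Return keys of unscheduled tasks whose upstream tasks all succeeded.

    upstream: dict mapping task_id -> set of upstream task_ids
    states:   dict mapping task_id -> state string such as "success";
              a task with no entry has not been scheduled yet
    """
    ready = []
    for task_id in sorted(upstream):
        if task_id in states:
            continue  # already queued, running, or finished
        if all(states.get(dep) == "success" for dep in upstream[task_id]):
            ready.append(TaskKey(workflow_id, task_id, execution_date))
    return ready


# Example: extract -> transform -> load, with extract already finished.
upstream = {"extract": set(), "transform": {"extract"}, "load": {"transform"}}
states = {"extract": "success"}
queue = ready_tasks("daily_trips", date(2020, 1, 1), upstream, states)
print([key.task_id for key in queue])  # ['transform']
```

In a real deployment, each returned key would be handed to the executor's queue rather than printed.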

 

Isolation from user code

An important lesson we learned through operating Piper in production was the need for isolating user code from system code. One advantage of the workflow DSL is that workflows can be defined by any arbitrary Python constructs, such as looping over configuration files on disk, calling out to external services to get configuration data, or shelling out to the command line. However, this flexibility conflicts with system stability and reliability, as user code can run arbitrary logic, be slow to execute, and potentially cause system errors. In general, good system design encourages isolating user code from the system code wherever possible.
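
As an illustration of that flexibility, the sketch below generates one workflow per configuration entry in a plain Python loop. It does not use the real Airflow or Piper DSL: `Workflow`, `Task`, `build_workflows`, and the `CITY_CONFIGS` entries are hypothetical stand-ins. In practice the configuration might come from files on disk or an external service, which is exactly where slow or failing user code can creep in.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    task_id: str
    command: str
    upstream: list = field(default_factory=list)


@dataclass
class Workflow:
    workflow_id: str
    schedule: str
    tasks: list = field(default_factory=list)


# Hypothetical configuration; could equally be loaded from disk or a service.
CITY_CONFIGS = [
    {"city": "sf", "schedule": "@daily"},
    {"city": "nyc", "schedule": "@hourly"},
]


def build_workflows(configs):
    """Generate one two-task workflow per configuration entry."""
    workflows = []
    for cfg in configs:
        city = cfg["city"]
        extract = Task("extract", f"extract --city {city}")
        report = Task("report", f"report --city {city}", upstream=["extract"])
        workflows.append(
            Workflow(f"trip_report_{city}", cfg["schedule"], [extract, report])
        )
    return workflows


workflows = build_workflows(CITY_CONFIGS)
print([w.workflow_id for w in workflows])  # ['trip_report_sf', 'trip_report_nyc']
```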

As Figure 2, below, conveys, the original architecture relies on executing user code in all of the system components, which include the Scheduler, Web servers, and Celery workers.

Figure 2: User code is executed in all system components, which can negatively impact Piper’s availability and performance.

 

One of the goals of Piper was to schedule tasks as reliably and quickly as possible. However, the fact that user code was running in these various components presented several issues for our system stability. In our environment, workflow drivers can generate thousands of workflows for a single Python file, sometimes waiting on external services to retrieve configuration, and can be slow to load, thereby negatively impacting system availability and performance.

After consideration, we realized that we could decouple the metadata representation of the workflow from the Python definition. A single workflow definition file can be decomposed into two separate representations:

  1. Metadata representation of the workflows and tasks: A serialized representation which includes workflow/task properties, as well as the dependency graph between tasks. This representation can be used by system components such as the scheduler and web servers, which only need to know high-level metadata about each workflow but do not need to load or execute the user pipeline definition files.
  2. Fully instantiated workflow: Fully hydrated Python task and workflow representation supplied by the user which conforms to the DSL specifications. We can use this representation to extract the workflow metadata and execute the tasks on the Celery worker.

Figure 3: With Piper, pipeline definitions can be thought of as two representations: a serialized metadata representation and a fully executable workflow representation.

 

In order to implement the metadata decomposition, we introduced a new component into the system whose role is to load user Python workflow definitions, extract them, and then store their serialized metadata representation in the database. The metadata representation can then be used by other components of the system such as the scheduler and web servers without having to load any user code. In splitting the metadata representation of workflows from the executable representation, we were able to decouple many of the system components from having to load the Python workflow definitions, which resulted in a more reliable and performant system.
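
The split can be sketched roughly as follows. This is an illustrative assumption rather than Piper's serializer: the function names `extract_metadata` and `root_tasks`, the JSON layout, and the `daily_trips` workflow are all hypothetical. The point is that the serializer side sees the user's Python callables, while the scheduler side works only from the stored JSON.

```python
import json


def extract_metadata(workflow_id, schedule, tasks):
    """Serializer side: reduce a fully instantiated workflow to metadata.

    tasks: dict mapping task_id -> {"callable": <function>, "upstream": [...]}
    The Python callables are dropped; only properties and the dependency
    graph are kept, so the result is plain JSON.
    """
    return json.dumps({
        "workflow_id": workflow_id,
        "schedule": schedule,
        "tasks": {
            task_id: {"upstream": sorted(spec["upstream"])}
            for task_id, spec in tasks.items()
        },
    }, sort_keys=True)


def root_tasks(serialized):
    """Scheduler side: reads the stored JSON and never imports user code."""
    meta = json.loads(serialized)
    return sorted(t for t, spec in meta["tasks"].items() if not spec["upstream"])


# User-defined workflow (hypothetical), including arbitrary Python callables.
user_tasks = {
    "extract": {"callable": lambda: "rows", "upstream": []},
    "load": {"callable": lambda: None, "upstream": ["extract"]},
}

# In this sketch, `stored` stands in for the row written to the metadata database.
stored = extract_metadata("daily_trips", "@daily", user_tasks)
print(root_tasks(stored))  # ['extract']
```

Because the stored form is plain data, any component (or any language) can consume it without a Python interpreter loading user files.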

Figure 4: User code no longer needs to be loaded in the scheduler or web servers. With Piper, the workflow serializer provides isolation by extracting metadata from user code.

 

 

Re-architecting for high availability and horizontal scalability

After achieving isolation from user code through metadata serialization, we wanted to further improve Piper’s scalability and system availability. Our goals were to achieve:

  • Improved system efficiency and language support: At Uber, we standardized on Go and Java for our microservices, and chose to follow this standard in Piper. We also wanted lower scheduling latency and better performance, while still keeping the DSL in Python.
  • High availability and elimination of single point of failure: Uber hosts services on its Apache Mesos/μDeploy system, which runs services within Docker containers on an Apache Mesos cluster. These services must gracefully handle container crashes, restarts, and container relocations without any downtime. With the existing system architecture, scheduling was a single point of failure: if the scheduler node disappeared, the system stopped scheduling any tasks. This also occurred during reallocation of nodes, typically caused by deployments, hardware maintenance, or resource shortages.
  • Horizontal scalability for scheduling: The existing system only supported running a single scheduler at any time. As new workflows were added, the scheduling delay tended to increase over time. We wanted the ability to add extra schedulers that could automatically take over a portion of the active workflows. This would provide automatic failover, reduced scheduling delays, and the ability to load balance job scheduling across multiple nodes.

To accomplish these goals, we applied distributed systems concepts to improve Piper’s availability and scalability. We introduced the use of a distributed coordination service to provide primitives we can use to harden the system, and re-architected the system with the changes noted below:

  • Rewrite Piper’s scheduler and executor components in Java: Since launching Piper, we decoupled the scheduler and executor from loading user Python code, and so were now free to use any platform or tool that is best suited for the job. As Uber standardized on Java and Go, we rewrote both the scheduler and executor components in Java, which allowed us to use Java’s more performant concurrency semantics for improved system efficiency.
  • Leverage leader election: For any system components that are meant to run as a singleton, such as the serializer and executor, we have added leader election capability. If the leader becomes unavailable, a new leader is automatically elected from the available back-up nodes. This eliminated the single points of failure and also reduced any downtime during deployments, node restarts, or node relocations in Apache Mesos.
  • Introduce work partitioning: Recall that we had the goal of being able to add additional schedulers, which would then be assigned a portion of the workflows automatically and schedule them. Using the distributed coordination service, we were able to implement efficient work partitioning for task scheduling. This approach makes it possible to add a new scheduler to Piper at any time. As the new scheduler comes online, a set of workflows is automatically assigned to it, and it can start scheduling them. As scheduler nodes come online or go offline, the set of workflows assigned to each is automatically adjusted, giving us high availability and horizontal scalability for task scheduling.

Figure 5: Designed to be highly-available, decomposed, and fully distributed, our Piper architecture supports multiple active schedulers, while eliminating single points of failure.
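
This post does not specify which partitioning scheme Piper uses. One common way to get the behavior described in the work partitioning item above is rendezvous (highest-random-weight) hashing, sketched below as an assumption. The function `assign` and the scheduler and workflow names are hypothetical, and in a real deployment the list of live schedulers would come from the coordination service rather than a literal list.

```python
import hashlib


def _weight(scheduler, workflow_id):
    """Deterministic pseudo-random weight for a (scheduler, workflow) pair."""
    digest = hashlib.sha256(f"{scheduler}:{workflow_id}".encode()).hexdigest()
    return int(digest, 16)


def assign(workflow_ids, schedulers):
    """Map each workflow to the live scheduler with the highest weight."""
    return {
        wf: max(schedulers, key=lambda s: _weight(s, wf))
        for wf in workflow_ids
    }


workflows = [f"workflow_{i}" for i in range(1000)]
before = assign(workflows, ["scheduler-a", "scheduler-b", "scheduler-c"])
# scheduler-c goes offline: only its workflows should be reassigned.
after = assign(workflows, ["scheduler-a", "scheduler-b"])

moved = [wf for wf in workflows if before[wf] != after[wf]]
print(all(before[wf] == "scheduler-c" for wf in moved))  # True
```

The useful property of this scheme is that when a scheduler joins or leaves, only the workflows owned by that scheduler change hands; every other assignment stays put.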

 

Once the system was developed and tested in staging, we decided that we would use a partial migration strategy rather than migrating all workflows in one go. We first deployed both the Python and distributed Java schedulers side-by-side with the ability to toggle scheduling modes at the workflow level. With this strategy, we were able to fully migrate all our workflows without any impact on the end user. With the changes above, we have now gained high availability for all system components and horizontal scalability for scheduling, along with improved performance through Java concurrency.  

 

Additional platform enhancements

While the sections above covered the major re-architecture of our scheduler and workflow serialization, we have also integrated several other enhancements into the platform, outlined below:

  • Multiple data center semantics: We currently run one installation of Piper per data center. We have added workflow semantics that let a user specify whether to run a workflow in single compute mode or dual compute mode. In the case of data center failover, our system automatically transfers the workflow to a different data center without user intervention.
  • Multi-tenancy: As we have thousands of users and hundreds of teams using our system, we needed to make Piper multi-tenant. We have added additional semantics for permissioning entities such as workflows, connections, and variables to ensure access control for the appropriate owners.
  • Auditing: User actions such as editing connections, editing variables, and toggling workflows are saved to an audit store which can later be searched if needed.
  • Backfill: We have implemented a generic backfill feature which lets users create and manage backfills for any existing workflows with a few clicks in the UI.
  • Visual authoring tools: Several categories of users at Uber need to create workflows, and some of them may not be familiar with Python development. We therefore provide several ways of authoring workflows, typically through domain-specific UIs that create workflows dynamically. These UI authoring tools are specific to verticals such as machine learning, dashboarding, and ingestion. We are also currently developing a generalized visual drag-and-drop tool for workflow creation on top of our system.
  • Workflow definition REST API: We have added the ability to dynamically create workflows via a REST API call with no Python code required. This is similar to the Apache Storm Flux API.
  • Continuous deployment: We use a monorepo to store our workflow definition code. We ensure that the monorepo is continuously deployed into the necessary system components without user intervention.
  • Continuous Integration: Each user commit into the workflow monorepo runs through a suite of unit tests to ensure that no bugs are introduced and to ensure that the workflow is valid.
  • Metrics and monitoring: We have plugged in metrics and monitoring using Uber’s M3 and uMonitor systems. We have also added canary workflows to assess system health and performance, gather information on system and infrastructure stats, and use these metrics to alert our team when there is a system outage.
  • Logging: We have reworked task logging to fit into Uber logging infrastructure and to ensure it is reliable and instantaneously available to the end user, without compromising any of our system availability or reliability.
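
The continuous integration item above mentions checking that each workflow is valid, but does not list the actual tests. As one example of a check such a suite might run, the sketch below rejects workflows whose task dependencies reference an undefined task or contain a cycle. The function name `validate_workflow` and the input shape are hypothetical; the cycle check uses Kahn's algorithm for topological sorting.

```python
from collections import deque


def validate_workflow(upstream):
    """Return a list of problems found in a task dependency graph.

    upstream: dict mapping task_id -> list of upstream task_ids
    """
    problems = []
    for task_id, deps in upstream.items():
        for dep in deps:
            if dep not in upstream:
                problems.append(f"{task_id} depends on undefined task {dep}")
    if problems:
        return problems

    # Kahn's algorithm: repeatedly remove tasks with no unmet dependencies.
    remaining = {task_id: len(set(deps)) for task_id, deps in upstream.items()}
    downstream = {task_id: [] for task_id in upstream}
    for task_id, deps in upstream.items():
        for dep in set(deps):
            downstream[dep].append(task_id)

    queue = deque(t for t, n in remaining.items() if n == 0)
    visited = 0
    while queue:
        task_id = queue.popleft()
        visited += 1
        for child in downstream[task_id]:
            remaining[child] -= 1
            if remaining[child] == 0:
                queue.append(child)

    # Any task never reached sits on, or behind, a cycle.
    if visited != len(upstream):
        problems.append("dependency cycle detected")
    return problems


print(validate_workflow({"a": [], "b": ["a"], "c": ["b"]}))  # []
print(validate_workflow({"a": ["c"], "b": ["a"], "c": ["b"]}))  # ['dependency cycle detected']
```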

 

Key takeaways

From our initial deployment of the system until today, we have gone from dozens of workflows to tens of thousands of workflows running hundreds of thousands of tasks a day, managed by thousands of users. We have done this while keeping the system stable and performant, and have improved availability, scalability, and usability. We have re-architected the system to support this scale by following a few principles:

  • Prioritize user-friendliness and expressiveness of writing workflows. This includes the ability to easily manage workflows and instantaneously access workflow logs.
  • Whenever possible, converge to unified products across the company. Converging into our unified system drastically reduced our maintenance burden, on-call incidents, and user confusion, while unifying the team around a single product which can be iteratively improved.
  • Choose a central multi-tenant deployment model. In our case, using such a model allowed us to better support our users and provide an easy upgrade path without requiring teams to be aware of system internals or configuration in standing up new instances of the system.
  • Avoid running user code in system components whenever possible. Separating workflow metadata from the executable workflow definition drastically increased the reliability of our systems and gave us additional flexibility, allowing us to rewrite the scheduling component in Java.
  • Eliminate any single points of failure in order to ensure uptime and system availability.
  • Use distributed systems concepts such as leader election, failover, and work partitioning for higher availability and improved scalability.

 

If working on distributed computing and data challenges appeals to you, consider applying for a role on our team!

 

Acknowledgments

We would like to thank engineering managers Pawan Dixit and Anthony Asta, as well as Data Workflow team members Alex Kira, Ankit Mody, Atasi Panda, and Prakhar Garg.

 
