Backend, Data / ML, Engineering

Building Scalable Streaming Pipelines for Near Real-Time Features

August 24, 2021 / Global
Figure 1: Simplified Overview of the Architecture
Figure 2: Rings 0, 1, and 2 of Hexagon A
Figure 3: Aggregation of a 2-minute Window for Hexagon A
Figure 4: Logical DAG of Demand Pipeline
Operator | Functionalities
Kring Smooth on 1-Min | This operator applies the Kring Smooth algorithm.
2-Min, 4-Min, 8-Min, 16-Min, 32-Min | These operators aggregate demand for those window sizes, each sliding by 1 minute, using the smoothed demand returned from Kring Smooth (see the sketch after this table).
Merge Windows | This operator merges the aggregation results of all the windows for the same hexagon.
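The windowing described above maps naturally onto Flink's sliding windows. The following is a minimal sketch, not the production job: the (hexagonId, smoothedDemand) tuple shape, the in-line example source, and the monotonic watermark strategy are assumptions made purely for illustration.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MultiWindowDemandSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // (hexagonId, smoothedDemand) pairs as they would come out of the Kring Smooth step.
    DataStream<Tuple2<Long, Integer>> smoothed = env
        .fromElements(Tuple2.of(1L, 3), Tuple2.of(1L, 2), Tuple2.of(2L, 5))
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.<Tuple2<Long, Integer>>forMonotonousTimestamps()
                .withTimestampAssigner((event, ts) -> System.currentTimeMillis()));

    // One aggregation per window length (2, 4, 8, 16, 32 minutes), each sliding by 1 minute.
    for (int windowMinutes : new int[] {2, 4, 8, 16, 32}) {
      DataStream<Tuple2<Long, Integer>> aggregated = smoothed
          .keyBy(value -> value.f0)
          .window(SlidingEventTimeWindows.of(Time.minutes(windowMinutes), Time.minutes(1)))
          .reduce(new ReduceFunction<Tuple2<Long, Integer>>() {
            @Override
            public Tuple2<Long, Integer> reduce(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b) {
              return Tuple2.of(a.f0, a.f1 + b.f1); // sum smoothed demand per hexagon
            }
          });
      aggregated.print(); // in the real pipeline these streams feed the Merge Windows operator
    }

    env.execute("multi-window demand sketch");
  }
}
```

Because every window slides by 1 minute, each hexagon emits one value per window length every minute, which is what the Merge Windows operator then combines.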
Figure 5: Dashboard of Used Memory
Figure 6: Dashboard of Job Lag
Figure 7: The Framework of Performance Tuning
Domain | Major Impacted Areas | Remarks
Parallelism | |
Partition | |
Remote Call | |
Algorithm | |
CPU | |
Garbage Collector | |

Use Cases:
Technique | Area / Domain | Explanations
Fields exclusion | Algorithm | Raw Kafka messages have many fields that are not used by the streaming pipeline; hence we add a mapper to filter out unused fields at the very beginning of the job DAG.
Key encoding / decoding | Algorithm | The global product type UUID is used as part of the output key. We encode the UUID (128 bits) as a single byte (8 bits) via an internal encoding, and convert it back to a UUID only when writing the output to the sink, which reduces both the memory footprint and the network payload.
Dedup | Algorithm | We introduced a 1-minute window to keep only one demand event per distinct rider before the Kring Smooth. The dedup, implemented with a reduce function (sketched after this table), significantly reduced the message rate reaching the expensive Kring Smooth calculation to 8k/s. The trade-off is that the dedup window adds one more minute of latency, which is mitigated with other techniques.
Field Type Selection | Algorithm | We refactored the algorithm so that we could choose Integer rather than Double as the data type for the intermediate computation values, which further reduces the message size from 451 bytes to 237 bytes.
Merge Window Output | Remote Call | We have two options when writing the output into the sink:
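As a concrete reading of the Dedup row above, here is the promised sketch of a 1-minute window with a reduce function that keeps a single demand event per rider. The (riderId, hexagonId) event shape and the keying by rider are illustrative assumptions; only the "one event per distinct rider per minute" behaviour comes from the table.

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public final class DedupSketch {

  /** (riderId, hexagonId) demand events in; at most one event per rider per minute out. */
  public static DataStream<Tuple2<String, Long>> dedup(DataStream<Tuple2<String, Long>> demands) {
    return demands
        .keyBy(event -> event.f0)                              // key by rider id
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))  // the 1-minute dedup window
        .reduce(new ReduceFunction<Tuple2<String, Long>>() {
          @Override
          public Tuple2<String, Long> reduce(Tuple2<String, Long> first, Tuple2<String, Long> duplicate) {
            return first; // keep the first event per rider, drop the rest of the window
          }
        });
  }
}
```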
Technique | Area / Domain | Explanations
Object Reuse | Garbage Collector | By default, Flink does not reuse objects when passing messages between upstream and downstream operators. We have enabled object reuse when possible to avoid message cloning (see the sketch after this table).
Increase Containers | Partition | Due to the large data volume and state size (each container can consume around 6 GB of memory), we have used 128 containers, each with one vcore, for the streaming pipeline.
Key encoding / decoding | Algorithm | The global product type UUID is used as the key of the output. We encode the UUID (128 bits) as a single byte (8 bits) via an internal encoding, and convert it back to a UUID before writing the output to the sink, which reduces the memory size.
Fields projection | Algorithm | Raw Kafka messages have many fields that are not used by the streaming pipeline; hence we add a mapper to filter out unused fields at the entry point of the job DAG, reducing the messages' memory load on the computation.
Field Type Selection | Algorithm | We have refactored the algorithm to choose Integer rather than Double as the data type for the intermediate computation values, reducing the message size from 451 to 237 bytes.
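The Object Reuse row refers to Flink's object-reuse mode, which is a real ExecutionConfig switch; everything else in this sketch is boilerplate.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class ObjectReuseSketch {

  public static StreamExecutionEnvironment newEnvironment() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Let Flink hand the same record instance to chained operators instead of deep-copying
    // it between them. Safe only if user functions neither cache nor mutate input records.
    env.getConfig().enableObjectReuse();
    return env;
  }
}
```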
Technique | Area / Domain | Explanations
Message in Tuple | CPU | Flink provides the Tuple type, which is more efficient than a POJO at serialization because fields are accessed directly, without reflection. We have chosen Tuple for messages passed between operators (see the sketch after this table).
Avoid boxing / unboxing | CPU | The streaming pipeline calls an internal library (H3) to retrieve the neighbours of a given hexagon. The API returns an array of Long, leading to unnecessary boxing/unboxing during the computation. We have added another API that returns an array of the primitive type instead.
Cache for API | Remote Call | We have enabled an in-memory cache to improve performance when converting the global product type, as the value returned by the remote API changes infrequently.
Hexagon Index Type | Algorithm | We converted the default hexagon data type from String to Long, which reduced the window aggregation function's time by 50%.
Kring Smooth | Algorithm | We have re-implemented the Kring Smooth algorithm to make it more efficient.
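To make the "Message in Tuple" and "Hexagon Index Type" rows concrete, here is a hedged sketch of a mapper that emits a Flink Tuple3 carrying the hexagon index as a Long. The RawDemandEvent class and its fields are hypothetical; the only non-obvious fact used is that H3 string addresses are the hexadecimal form of a 64-bit index.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;

public final class ToTupleMessage
    implements MapFunction<ToTupleMessage.RawDemandEvent, Tuple3<Long, Integer, Integer>> {

  /** Hypothetical raw event; the real Kafka message carries many more fields. */
  public static final class RawDemandEvent {
    public String hexagon;          // H3 index as a hexadecimal String
    public int encodedProductType;  // global product type already encoded into one byte
    public int demand;
  }

  @Override
  public Tuple3<Long, Integer, Integer> map(RawDemandEvent event) {
    // Keep only the fields downstream operators need, and carry the hexagon index as a
    // Long: H3 string addresses are the hexadecimal form of a 64-bit index.
    return Tuple3.of(Long.parseUnsignedLong(event.hexagon, 16), event.encodedProductType, event.demand);
  }
}
```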
Figure 8: Final DAG of Demand Pipeline
Figure 9: Dashboard Showing the Lag After Optimization
Figure 10: Dashboard Showing Memory Usage After Optimization
Figure 11: Write QPS is not stable with one row per API call
Configuration | Write QPS
First version | 150
After fixing the metrics cardinality problem | 16k
After disabling writes to the Kafka topic | 17.6k
Batch size 50 | 34k
Batch size 100 | 37k
Batch size 200 | 38k
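The jump from 17.6k to 34k–38k writes per second in the table above comes from batching multiple rows into a single sink call instead of one row per API call (Figure 11). Below is a minimal, store-agnostic sketch of that batching; SinkClient is a hypothetical stand-in interface, not the actual storage API.

```java
import java.util.ArrayList;
import java.util.List;

public final class BatchingWriter<T> {

  /** Stand-in for the sink API: one remote call writes a whole batch of rows. */
  public interface SinkClient<R> {
    void writeBatch(List<R> rows);
  }

  private final SinkClient<T> client;
  private final int batchSize;                     // the 50 / 100 / 200 knob from the table
  private final List<T> buffer = new ArrayList<>();

  public BatchingWriter(SinkClient<T> client, int batchSize) {
    this.client = client;
    this.batchSize = batchSize;
  }

  /** Buffer a row; flush once the configured batch size is reached. Not thread-safe. */
  public void write(T row) {
    buffer.add(row);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Issue one sink call for everything buffered so far (also called on checkpoint/close). */
  public void flush() {
    if (!buffer.isEmpty()) {
      client.writeBatch(new ArrayList<>(buffer));
      buffer.clear();
    }
  }
}
```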
Job Parallelism | Batch Size | Write QPS
256 | 200 | 75k
1024 | 200 | 112k
1024 | 50 | 120k
Thread Pool Size | Job Parallelism | Batch Size | Write QPS
32 | 128 | 50 | 64k
128 | 128 | 50 | 62k
128 | 256 | 50 | 80k
16 | 256 | 50 | 120k
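The last table adds thread pool size as a tuning knob, and shows that bigger is not better: at parallelism 256 and batch size 50, a 16-thread pool reaches 120k QPS while a 128-thread pool reaches only 80k. As a hedged sketch of that knob, batched writes can be issued from a bounded pool so concurrency toward the sink is capped independently of job parallelism; the Consumer-based interface below is illustrative only.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public final class PooledBatchWriter<T> {

  private final ExecutorService pool;          // bounded pool: the "Thread Pool Size" knob
  private final Consumer<List<T>> writeBatch;  // stand-in for one batched call to the sink

  public PooledBatchWriter(int threadPoolSize, Consumer<List<T>> writeBatch) {
    this.pool = Executors.newFixedThreadPool(threadPoolSize);
    this.writeBatch = writeBatch;
  }

  /** Hand one batch to the pool; at most threadPoolSize batches are written concurrently. */
  public void submit(List<T> batch) {
    pool.submit(() -> writeBatch.accept(batch));
  }

  public void shutdown() {
    pool.shutdown();
  }
}
```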
Figure 12: Job lag keeps increasing
Figure 13: Topology of the job, with backpressure at the custom partition stage
Table | No compression | With compression
driver_pricing | 96.3 TB | 36.8 TB
driver_pricing_map | 105.6 TB | 43.6 TB
driver_pricing_map_group | 328 TB | 132.6 TB
Figure 14: Serving latency drops from 150ms to 10ms after the optimization
Feng Xu

Feng Xu is a Senior Software Engineer II at Uber. He leads the streaming computation framework in Gairos and uMetric.

Gang Zhao

Gang Zhao is a former Senior Software Engineer II who led Gairos optimization and uMetric consumption, focusing on the storage layer (Elasticsearch, Apache Pinot, Apache Cassandra, Docstore) and query-layer optimization.

Posted by Feng Xu, Gang Zhao