With more than five million Uber trips taken daily worldwide, it is important for Uber engineers to ensure that data is accurate. If used correctly, metadata and aggregate data can quickly detect platform abuse, from spam to fake accounts and payment fraud. Amplifying the right data signals makes detection more precise and thus more reliable.

To address this challenge in our systems and others, Uber Engineering and Databricks worked together to contribute Locality Sensitive Hashing (LSH) to Apache Spark 2.1. LSH is a randomized algorithm and hashing technique commonly used in large-scale machine learning tasks including clustering and approximate nearest neighbor search.

In this article, we will demonstrate how this powerful tool is used by Uber to detect fraudulent trips at scale.

 

Why LSH?

Before Uber Engineering implemented LSH, we used the N^2 approach (comparing every trip against every other trip) to sift through trips; while accurate, the N^2 approach was ultimately too time-consuming, volume-intensive, and hardware-reliant for Uber’s size and scale.

The general idea of LSH is to use a family of functions (known as LSH families) to hash data points into buckets so that data points near each other are located in the same buckets with high probability, while data points far from each other are likely in different buckets. This makes it easier to identify trips with various degrees of overlap.
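To make the bucket idea concrete, here is a minimal plain-Scala sketch of one LSH family, MinHash. It is an illustration only, not the Spark implementation: the object name, the modulus, and the seed are assumptions chosen for the example, and element IDs are assumed to be non-negative integers. Two sets agree on a given MinHash value with probability equal to their Jaccard similarity, so the fraction of agreeing values estimates how much the sets overlap.

```scala
import scala.util.Random

object MinHashSketch {
  // Modulus of the hash family: the Mersenne prime 2^31 - 1 (an illustrative choice).
  val Prime: Long = 2147483647L

  // Each (a, b) pair defines one hash function h(x) = (a * x + b) mod Prime.
  def randomCoefficients(numHashes: Int, seed: Long): Seq[(Long, Long)] = {
    val rng = new Random(seed)
    Seq.fill(numHashes)((1L + rng.nextInt(Int.MaxValue - 1), rng.nextInt(Int.MaxValue).toLong))
  }

  // For each hash function, keep the smallest hash value over the (non-empty) set.
  def signature(set: Set[Int], coeffs: Seq[(Long, Long)]): Seq[Long] =
    coeffs.map { case (a, b) => set.map(x => (a * x + b) % Prime).min }

  // Fraction of positions where two signatures agree: an estimate of Jaccard similarity.
  def estimatedSimilarity(s1: Seq[Long], s2: Seq[Long]): Double =
    s1.zip(s2).count { case (x, y) => x == y }.toDouble / s1.length

  def main(args: Array[String]): Unit = {
    val coeffs = randomCoefficients(128, seed = 42L)
    val near1 = (1 to 100).toSet
    val near2 = ((1 to 90) ++ (101 to 110)).toSet // shares 90 elements with near1
    val far = (1000 to 1100).toSet                // shares nothing with near1
    println(estimatedSimilarity(signature(near1, coeffs), signature(near2, coeffs)))
    println(estimatedSimilarity(signature(near1, coeffs), signature(far, coeffs)))
  }
}
```

Treating each signature position as a bucket key, overlapping sets land in the same bucket often, while disjoint sets never do under this particular hash family.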

For reference, LSH is a multi-use technology with myriad applications, including:

  • Near-duplicate detection: LSH is commonly used to de-duplicate large quantities of documents, webpages, and other files.
  • Genome-wide association study: Biologists often use LSH to identify similar gene expressions in genome databases.
  • Large-scale image search: Google used LSH along with PageRank to build their image search technology VisualRank.
  • Audio/video fingerprinting: In multimedia technologies, LSH is widely used as a fingerprinting technique for A/V data.

LSH at Uber

The primary LSH use case at Uber is detecting similar trips based on their spatial properties, a method of identifying fraudulent drivers. Uber engineers presented on this use case during Spark Summit 2016, where they discussed our team’s motivations behind using LSH on the Spark framework to broadcast join all trips and sift through fraudulent ones. Our motivations for using LSH on Spark are threefold:

  1. Spark is integral to Uber’s operations, and many internal teams currently use Spark for various types of complex data processing including machine learning, spatial data processing, time series computation, analytics and prediction, and ad hoc data science exploration. In fact, Uber uses almost all Spark components such as MLlib, Spark SQL, Spark Streaming, and direct RDD processing on both YARN and Mesos. Since our infrastructure and tools are built around Spark, Uber engineers can create and manage Spark applications easily.
  2. Spark makes it efficient to do data cleaning and feature engineering before any actual machine learning is conducted, making the number-crunching much faster. Uber’s high volume of collected data makes solving this problem by basic approaches unscalable and very slow.
  3. We do not need an exact solution to this problem, so there is no need to purchase and maintain additional hardware. Approximations provide us with enough information to make judgment calls on potentially fraudulent activity and, in this case, are good enough to solve our problems. LSH allows us to trade some precision for large savings in hardware resources.

For these reasons, solving the problem by deploying LSH on Spark was the right choice for our business goals: scale, scale, and scale again.

At a high level, our approach to using LSH has three steps. First, we create a feature vector for each trip by breaking it down into area segments of equal size. Then, we hash the vectors with MinHash, the LSH family for the Jaccard distance function. Lastly, we either run a similarity join in batch or a k-nearest neighbor search in real time. Compared to the basic brute-force approach of detecting fraud, LSH enabled Spark jobs on our datasets to finish faster by a full order of magnitude (from about 55 hours with the N^2 method to 4 hours using LSH).
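The first step and the distance function can be sketched in plain Scala. This is a simplified illustration under our own assumptions, not Uber’s production code: the trip representation (a sequence of latitude/longitude points), the cell size, and every name below are hypothetical.

```scala
object TripFeatures {
  // Hypothetical trip representation: a sequence of (latitude, longitude) points.
  type Point = (Double, Double)

  // Map a point to the ID of the equal-sized grid cell that contains it.
  // cellSizeDeg is an assumed cell width in degrees, not a value from the article.
  def cellOf(p: Point, cellSizeDeg: Double): (Long, Long) =
    (math.floor(p._1 / cellSizeDeg).toLong, math.floor(p._2 / cellSizeDeg).toLong)

  // A trip's feature set is the set of grid cells it passes through.
  def cells(trip: Seq[Point], cellSizeDeg: Double): Set[(Long, Long)] =
    trip.map(cellOf(_, cellSizeDeg)).toSet

  // Jaccard distance = 1 - |A intersect B| / |A union B|.
  def jaccardDistance[T](a: Set[T], b: Set[T]): Double = {
    val unionSize = (a union b).size
    if (unionSize == 0) 0.0 else 1.0 - (a intersect b).size.toDouble / unionSize
  }

  def main(args: Array[String]): Unit = {
    val tripA = Seq((37.7705, -122.4105), (37.7715, -122.4105), (37.7725, -122.4105))
    val tripB = Seq((37.7715, -122.4105), (37.7725, -122.4105), (37.7735, -122.4105))
    // The two trips share two of four distinct cells.
    println(jaccardDistance(cells(tripA, 0.001), cells(tripB, 0.001)))
  }
}
```

Each set of cells would then be encoded as a binary sparse vector, which is the input format MinHash expects.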

API Tutorial

To best demonstrate how LSH works, we will walk through an example of using MinHashLSH on the Wikipedia Extraction (WEX) dataset to find similar articles.

Each LSH family is linked to its metric space. In Spark 2.1, there are two LSH estimators:

  • BucketedRandomProjectionLSH for Euclidean distance
  • MinHashLSH for Jaccard distance

In this scenario, we use MinHashLSH since we will work with feature vectors of word counts, which MinHashLSH treats as sets: every non-zero entry counts as a member.

Load Raw Data

First, we need to launch an EMR (Elastic MapReduce) cluster and mount a WEX dataset as an EBS (Elastic Block Store) volume. Additional details on this process are available in the AWS documentation on EMR and EBS.

After setting up the test environment, we upload a sample of WEX data to HDFS based on the EMR cluster size. In the Spark shell, we load the sample data in HDFS:

   // Read the tab-delimited sample from HDFS into a DataFrame
   import org.apache.spark.ml.feature._
   import org.apache.spark.ml.linalg._
   import org.apache.spark.sql.functions._
   import org.apache.spark.sql.types._
   val df = spark.read.option("delimiter", "\t").csv("/user/hadoop/testdata.tsv")

   // Keep the title and content columns and drop rows with missing content
   val dfUsed = df.select(col("_c1").as("title"), col("_c4").as("content")).filter(col("content").isNotNull)
   dfUsed.show()

Figure 1: Wikipedia articles are represented as titles and content in Spark.

Figure 1 shows the results of our previous code, displaying articles by title and content. We will use the content as our hashing keys to find approximately similar Wikipedia articles in the following experiments.

Prepare Feature Vectors

MinHash is a very common LSH technique for quickly estimating how similar two sets are to each other. With MinHashLSH implemented in Spark, we represent each set as a binary sparse vector. In this step, we will convert the contents of Wikipedia articles into vectors.

Using the following code for feature engineering, we split the article content into words (Tokenizer), create feature vectors of word counts (CountVectorizer), and remove empty articles:

   // Tokenize the wiki content
   val tokenizer = new Tokenizer().setInputCol("content").setOutputCol("words")
   val wordsDf = tokenizer.transform(dfUsed)

   // Word count to vector for each wiki content
   val vocabSize = 1000000
   val cvModel: CountVectorizerModel = new CountVectorizer().setInputCol("words").setOutputCol("features").setVocabSize(vocabSize).setMinDF(10).fit(wordsDf)

   // Drop articles with an empty feature vector; MinHash needs at least one non-zero entry
   val isNoneZeroVector = udf({v: Vector => v.numNonzeros > 0}, DataTypes.BooleanType)
   val vectorizedDf = cvModel.transform(wordsDf).filter(isNoneZeroVector(col("features"))).select(col("title"), col("features"))

   vectorizedDf.show()

Figure 2: After feature engineering, the contents of Wikipedia articles are converted to binary sparse vectors.

Fit and Query an LSH Model

In order to use MinHashLSH, we first fit a MinHashLSH model on our featurized data with the command below:

   val mh = new MinHashLSH().setNumHashTables(3).setInputCol("features").setOutputCol("hashValues")
   val model = mh.fit(vectorizedDf)

We can make several types of queries with our LSH model, but for the purposes of this tutorial, we first run a feature transformation on the dataset:  

model.transform(vectorizedDf).show()

This command provides us with the hash values, which can be useful for manual joins and for feature generation.

Figure 3: MinHashLSH adds a new column to store hashes, with each hash represented as an array of vectors.

Next, we run an approximate nearest neighbor search to find the data points closest to our target. For the sake of demonstration, we search for articles with content approximately matching the phrase “united states.”

   val key = Vectors.sparse(vocabSize, Seq((cvModel.vocabulary.indexOf("united"), 1.0), (cvModel.vocabulary.indexOf("states"), 1.0)))
   val k = 40
   model.approxNearestNeighbors(vectorizedDf, key, k).show()

Figure 4: An approximate nearest neighbor search finds Wikipedia articles related to the “united states.”
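One caveat when building a query key by hand: Array.indexOf returns -1 for a word that is missing from the vocabulary, and -1 is not a valid index for a sparse vector. A small helper can filter such words out first. The sketch below is plain Scala, and the object and method names are hypothetical, not part of Spark.

```scala
object QueryKey {
  // Look up each query word in the vocabulary and keep only the words that are present.
  // Returns (index, value) pairs sorted by index, ready to build a sparse vector.
  def sparseEntries(vocabulary: Array[String], words: Seq[String]): Seq[(Int, Double)] =
    words.distinct
      .map(w => vocabulary.indexOf(w))
      .filter(_ >= 0) // indexOf returns -1 for out-of-vocabulary words
      .sorted
      .map(i => (i, 1.0))

  def main(args: Array[String]): Unit = {
    val vocabulary = Array("the", "united", "states", "of")
    println(sparseEntries(vocabulary, Seq("united", "states", "zzz")))
  }
}
```

In the Spark shell, the result of QueryKey.sparseEntries(cvModel.vocabulary, Seq("united", "states")) could be passed to Vectors.sparse in place of the hand-written Seq.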

Finally, we run an approximate similarity join to find similar pairs of articles within the same dataset:

   // Self Join
   val threshold = 0.8
   model.approxSimilarityJoin(vectorizedDf, vectorizedDf, threshold).filter("distCol != 0").show()

Note that while we use a self join above, we could also join two different datasets in the same way.

Figure 5: An approximate similarity join lists similar Wikipedia articles, computed with the number of hash tables set to three.

The results in Figure 5 depend on the number of hash tables, which we set with setNumHashTables when creating the estimator. For both approximate nearest neighbor search and approximate similarity join, the number of hash tables trades off running time against the false negative rate. Adding more hash tables increases accuracy, but also increases the program’s communication cost and running time. By default, the number of hash tables is set to one.
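A short calculation shows why more hash tables help. Assuming each hash table uses a single MinHash function and the tables are independent, two articles with Jaccard similarity s collide in a given table with probability s, so they collide in at least one of n tables with probability 1 - (1 - s)^n. The plain-Scala sketch below evaluates this formula; the object and method names are our own.

```scala
object OrAmplification {
  // Probability that two items with Jaccard similarity s share a hash value
  // in at least one of numHashTables independent single-function MinHash tables.
  def collisionProbability(s: Double, numHashTables: Int): Double =
    1.0 - math.pow(1.0 - s, numHashTables)

  def main(args: Array[String]): Unit = {
    for (s <- Seq(0.2, 0.5, 0.8); n <- Seq(1, 3, 5)) {
      println(f"similarity=$s%.1f tables=$n%d collision=${collisionProbability(s, n)}%.3f")
    }
  }
}
```

Under these assumptions, a pair sitting right at the distance threshold of 0.8 (similarity 0.2) is caught by one table 20 percent of the time, by three tables about 49 percent of the time, and by five tables about 67 percent of the time, at the cost of more hash values to compute and shuffle.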

To gain additional practice using LSH in Spark 2.1, you can also run the smaller examples in the Spark distribution for BucketedRandomProjectionLSH and MinHashLSH.

Performance Tests

In order to gauge performance, we benchmark our implementation of MinHashLSH on the WEX dataset. On AWS, we task 16 executors (m3.xlarge instances) with performing an approximate nearest neighbor search and an approximate similarity join on a sample of the WEX dataset.

In the tables below, we can see that approximate nearest neighbor ran 2x faster than full scan with the number of hash tables set to five, while approximate similarity join ran 3x-5x faster depending on the number of output rows and hash tables:

Figure 6: With numHashTables=5, approximate nearest neighbor ran 2x faster than full scan (as shown on right). With numHashTables=3, approximate similarity join ran 3x-5x faster than full join and filter (as shown on left).

Our experiment also shows that despite their short runtime, the algorithms achieved high accuracy compared to the brute-force results, which we treat as ground truth. Approximate nearest neighbor search achieved 85 percent accuracy for the 40 returned rows, while our approximate similarity join successfully found 93 percent of the nearby row pairs. This speed-accuracy trade-off has made LSH a powerful tool for detecting fraudulent trips from terabytes of data every day.
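One way to compute such an accuracy figure is to measure the fraction of brute-force results that the approximate query also returns. The plain-Scala sketch below assumes this definition; the article does not spell out the exact metric used in the benchmark, and the names are hypothetical.

```scala
object ApproxQuality {
  // Fraction of ground-truth items that the approximate result also returned.
  def recall[T](approx: Set[T], exact: Set[T]): Double =
    if (exact.isEmpty) 1.0 else (approx intersect exact).size.toDouble / exact.size

  def main(args: Array[String]): Unit = {
    // Hypothetical article IDs: the exact top 40, and an approximate
    // answer that recovers 34 of them plus 6 other articles.
    val exact = (1 to 40).toSet
    val approx = (1 to 34).toSet ++ (101 to 106).toSet
    println(recall(approx, exact))
  }
}
```

Read this way, 85 percent accuracy on 40 returned rows would mean that 34 of the 40 true nearest neighbors were found.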

 

Next Steps

While our LSH model has helped Uber identify fraudulent driver activity, our work is far from complete. During our initial implementation of LSH, we planned a number of features to deploy in future releases. The high-priority features include:

  1. SPARK-18450: Besides specifying the number of hash tables needed to complete the search, this new feature lets users define the number of hash functions in each hash table. This change will also provide full support for AND/OR-compound amplification.
  2. SPARK-18082 & SPARK-18083: There are other LSH families we want to implement. These two updates will enable bit sampling for the Hamming distance between two data points and signs of random projection for cosine distance, both of which are commonly used in machine learning tasks.
  3. SPARK-18454: A third feature will improve the API of the approximate nearest neighbor search. This new multi-probe similarity search can improve the search quality without the requirement for a large number of hash tables.
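As an illustration of one of the planned families, signs of random projection for cosine distance can be sketched in plain Scala. This is not the planned Spark API; the object name, method names, and seed are assumptions made for the example. Each bit records which side of a random hyperplane a vector falls on, and the probability that two vectors disagree on a bit equals the angle between them divided by pi.

```scala
import scala.util.Random

object SignRandomProjection {
  // Draw numBits random hyperplanes, each given by a Gaussian normal vector.
  def hyperplanes(numBits: Int, dim: Int, seed: Long): Seq[Array[Double]] = {
    val rng = new Random(seed)
    Seq.fill(numBits)(Array.fill(dim)(rng.nextGaussian()))
  }

  // Each bit is true when the vector lies on the non-negative side of a hyperplane.
  def signature(v: Array[Double], planes: Seq[Array[Double]]): Seq[Boolean] =
    planes.map(p => p.zip(v).map { case (x, y) => x * y }.sum >= 0.0)

  // Number of differing bits; divided by the signature length it estimates angle / pi.
  def hammingDistance(a: Seq[Boolean], b: Seq[Boolean]): Int =
    a.zip(b).count { case (x, y) => x != y }

  def main(args: Array[String]): Unit = {
    val planes = hyperplanes(64, 3, seed = 7L)
    val v = Array(1.0, 2.0, 3.0)
    val scaled = v.map(_ * 2.0)  // same direction, so cosine distance is zero
    val opposite = v.map(x => -x) // opposite direction
    println(hammingDistance(signature(v, planes), signature(scaled, planes)))
    println(hammingDistance(signature(v, planes), signature(opposite, planes)))
  }
}
```

Comparing the resulting bit signatures by Hamming distance is also where bit sampling, the other planned family, would apply.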

We welcome your feedback as we continue to develop and scale our project to incorporate the above features and many others.

Yun Ni is a software engineer on Uber’s Machine Learning Platform team, Kelvin Chu is technical lead engineer on Uber’s Complex Data Processing/Spark team, and Joseph Bradley is a software engineer on Databricks’ Machine Learning team.

Photo Header Credit: “Identifying Okavango Elephants from the Air” by Conor Myhrvold, Botswana.
