Tuesday, November 11, 2014

Spark and Locality Sensitive Hashing, part 2

This is the second part of the topic about Locality Sensitive Hashing; here is a working example built with Apache Spark.

Let's start with the definition of the task: there are two datasets - bank accounts and web-site visitors. The only field they have in common is the name, and misspellings are possible. Consider the following example:

Bank Accounts

Name         | Tom Soyer | Andy Bin | Tom Wiscor | Tomas Soyér
Credit score | 10        | 20       | 30         | 40

Web-site Visitors

Name  | Tom Soyer | Andrew Bin | Tom Viscor | Thomas Soyer
email | 1@1       | 2@1        | 3@1        | 2@2
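
Throughout the snippets below I will assume the records are modelled as simple case classes that expose the string to be shingled. This is only a sketch of mine: the names BankAccount, Visitor and NGramEnabled match the code further down, but the exact fields are an assumption based on the tables above.

trait NGramEnabled extends Serializable {
    def getStringForNGram(): String
}

// the name is the only field the two datasets share,
// so that is what will be shingled and minhashed
case class BankAccount(name: String, score: Double) extends NGramEnabled {
    override def getStringForNGram(): String = name
}

case class Visitor(name: String, email: String) extends NGramEnabled {
    override def getStringForNGram(): String = name
}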

Well, we have to join these two data sets by name, and since misspellings are possible, I will use the Levenshtein (edit) distance to find the most similar names within a bucket. The distance function is the following:


// classic recursive Levenshtein (edit) distance; fine for short strings such as names
private def stringDistance(s1: String, s2: String): Int = {
    def min(a: Int, b: Int, c: Int) = Math.min(Math.min(a, b), c)
    def sd(s1: List[Char], s2: List[Char]): Int = (s1, s2) match {
        case (_, Nil) => s1.length                  // s2 exhausted: insert the rest of s1
        case (Nil, _) => s2.length                  // s1 exhausted: insert the rest of s2
        case (c1 :: t1, c2 :: t2) => min(
            sd(t1, s2) + 1,                         // deletion
            sd(s1, t2) + 1,                         // insertion
            sd(t1, t2) + (if (c1 == c2) 0 else 1))  // substitution (free on a match)
    }
    sd(s1.toList, s2.toList)
}
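
A quick illustrative check on the sample names (this snippet is mine, not part of the original code):

println(stringDistance("Tom Soyer", "Thomas Soyer"))  // small distance: most likely the same person
println(stringDistance("Tom Soyer", "Andrew Bin"))    // large distance: clearly different people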


The second thing to do is to define the family of hash functions that will be used to compute the MinHash signatures:


/**
 *  Fowler–Noll–Vo (FNV) hash function
 *  @see http://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function
 */
private[impl] def LshHash[T](seedOne: Int, seedTwo: Int)(input: T): Int = {
    var hash = 2166136261L.asInstanceOf[Int]  // offset_basis for FNV-1
    hash = hash * 16777619 ^ seedOne          // 16777619 is the FNV_prime
    hash = hash * 16777619 ^ seedTwo
    hash = hash * 16777619 ^ input.hashCode()
    return hash
}
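
Because the function is curried, fixing the two seeds yields a ready-to-use (Any) => Int function, i.e. one member of the hash family. A small illustration with made-up seed values:

val h1: (Any) => Int = LshHash(42, 7)    // one member of the hash family
val h2: (Any) => Int = LshHash(13, 99)   // another, independent member
println(h1("tom"))                       // the same token...
println(h2("tom"))                       // ...gets a different hash value from each member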
Now we have to create a set of these functions:

// array of minhash functions, initialized with fixed seed values
private val minHashFuns = new mutable.ArrayBuffer[(Any) => Int]()

// the same seed is required to generate the same sequence on different machines
@transient private val rnd = new Random(2014)

private def populateMinHashes() = {
    for (i <- 1 to signatureSize * signatureGroups) {
        minHashFuns += LshHash(rnd.nextInt(), rnd.nextInt())
    }
}
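
Note that populateMinHashes() has to be invoked once before any RDD is processed; in a service class it would naturally sit in the constructor body. A hypothetical skeleton (the class name and constructor parameters are my assumption):

class LshJoinService(val signatureSize: Int, val signatureGroups: Int) extends Serializable {

    // ... the fields and methods shown throughout this post ...

    // populate the hash family at construction time, so the closures shipped
    // to the executors already carry a fully initialized minHashFuns array
    populateMinHashes()
}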

And here is how we apply the minhashes:

private def applyMinHashed[T <: NGramEnabled](rdd: RDD[T]): RDD[(String, T)] = {
    return rdd.flatMap {
        e =>
            (0 until signatureGroups).by(1).map {
                i => Array(getMinHashSignatureAsStr(NGrams.getNGramms(e.getStringForNGram()), i), e)
            }
    }.map {
        x =>
            (x(0).asInstanceOf[String], x(1).asInstanceOf[T])
    }
}

    private def getMinHashSignatureAsStr(tokens: scala.collection.immutable.Set[String], signatureGroupNum: Int): String = {
        return getMinHashSignature(tokens, signatureGroupNum).mkString("_")
    }

    private def getMinHashSignature(tokens: scala.collection.immutable.Set[String], signatureGroupNum: Int): Array[Int] = {
        val minHashValues = Array.fill[Int](signatureSize)(Int.MaxValue)

        // we don't need to hash the same token more than once, so remember all hashed tokens
        val uniqueTokens = new mutable.HashSet[String]()
        for (token <- tokens) {
            if (uniqueTokens.add(token)) {
                // apply each LSH function to the token
                for (j <- 0 until signatureSize) {
                    val currentHashValue = minHashFuns(signatureGroupNum*signatureSize + j)(token)
                    if( currentHashValue < minHashValues(j) ) {
                        minHashValues(j) = currentHashValue
                    }
                }

            }
        }

        return minHashValues
    }
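
The NGrams.getNGramms helper is not shown in the post; a minimal sketch of what it could look like, assuming character trigram shingling over the lower-cased string:

object NGrams {
    // split a string into overlapping character n-grams (shingles),
    // e.g. "tom soyer" => Set("tom", "om ", "m s", " so", "soy", "oye", "yer")
    def getNGramms(s: String, n: Int = 3): Set[String] = {
        val normalized = s.toLowerCase.trim
        if (normalized.length <= n) Set(normalized)
        else normalized.sliding(n).toSet
    }
}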

And now we are ready to put all the code together and deliver a solution for joining the two RDDs:

 def join(accounts: RDD[BankAccount], visitors: RDD[Visitor]): RDD[(Visitor, BankAccount)] = {
        /*  In Scala, these operations are automatically available on RDDs
            containing Tuple2 objects (the built-in tuples in the language, created by
            simply writing (a, b)), as long as you import
            org.apache.spark.SparkContext._ in your program to enable Spark’s implicit
            conversions.*/
        return applyMinHashed(accounts).join( applyMinHashed(visitors) ).map{
            case (key, (account, visitor)) =>
                (visitor, account)
        }.groupByKey()
        .map{
            case (visitor, accounts) => {
                var closestAccount: BankAccount = null
                var bestEditDistance = Int.MaxValue
                for (a <- accounts) {
                    val curEditDist = stringDistance(visitor.name, a.name)
                    if (curEditDist < bestEditDistance) {
                        bestEditDistance = curEditDist
                        closestAccount = a
                    }
                }
                (visitor, closestAccount)
            }
        }
    }
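
As a side note, the inner loop above is just a minimum search; the body of the last map could equally be written with minBy (a behaviour-preserving alternative, safe because a group produced by the join is never empty):

case (visitor, accounts) =>
    // pick the account whose name is closest to the visitor's name
    (visitor, accounts.minBy(a => stringDistance(visitor.name, a.name)))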

Finally, the code to join the two RDDs and print the result to the console:

        val acc2vis = service.join(accounts, visitors)
        for ((v, a) <- acc2vis.collect()) {
            println(f"Visitor ${v.name}%s has score level ${a.score}%2.2f (${a.name}%s)")
        }
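
For completeness, here is how the Spark context, the sample RDDs and the service used in the snippet above could be wired up. This driver is a sketch of my own: the LshJoinService name and the parameter values are assumptions, only service.join comes from the code above.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // implicit conversions for pair-RDD operations (Spark 1.x)

val conf = new SparkConf().setAppName("lsh-join-example").setMaster("local[2]")
val sc   = new SparkContext(conf)

// the sample data from the tables at the top of the post
val accounts = sc.parallelize(Seq(
    BankAccount("Tom Soyer", 10), BankAccount("Andy Bin", 20),
    BankAccount("Tom Wiscor", 30), BankAccount("Tomas Soyér", 40)))

val visitors = sc.parallelize(Seq(
    Visitor("Tom Soyer", "1@1"), Visitor("Andrew Bin", "2@1"),
    Visitor("Tom Viscor", "3@1"), Visitor("Thomas Soyer", "2@2")))

// parameter values are illustrative only
val service = new LshJoinService(signatureSize = 4, signatureGroups = 5)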

2 comments:

  1. Good overview and can u please elaborate more on second step and 3rd steps

  2. Look like no where populateMinHashes() function is called in the above code. Did i miss something here.
