Tuesday, November 11, 2014

Spark and Locality Sensitive Hashing, part 2

This is the second part of the topic about Locality Sensitive Hashing, and here is a working example built with Apache Spark.

Let's start with the definition of the task: there are two datasets, bank accounts and web-site visitors. The only field they have in common is the name, and it may contain misspellings. Consider the following example:

Bank Accounts

    Name         | Tom Soyer | Andy Bin | Tom Wiscor | Tomas Soyér
    Credit score | 10        | 20       | 30         | 40

Web-site Visitors

    Name  | Tom Soyer | Andrew Bin | Tom Viscor | Thomas Soyer
    Email | 1@1       | 2@1        | 3@1        | 2@2
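
The record types are never shown in the original post; from how they are used later, a minimal sketch could look like the following (score is assumed to be a Double so it can be formatted with %2.2f at the end; both classes will additionally need the NGramEnabled trait that appears further down):

    // hypothetical record types, inferred from the code later in the post
    case class BankAccount(name: String, score: Double)
    case class Visitor(name: String, email: String)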

Well, we have to join these two datasets by name, and since misspellings are possible, I will use the Levenshtein (edit) distance to find the most similar names within each bucket. The distance function is the following:


// Levenshtein (edit) distance: the minimum number of single-character
// insertions, deletions and substitutions needed to turn s1 into s2.
// The naive recursion is exponential, but fine for short names.
private def stringDistance(s1: String, s2: String): Int = {
        def min(a: Int, b: Int, c: Int) = Math.min(Math.min(a, b), c)
        def sd(s1: List[Char], s2: List[Char]): Int = (s1, s2) match {
            case (_, Nil) => s1.length                      // delete the rest of s1
            case (Nil, _) => s2.length                      // insert the rest of s2
            case (c1 :: t1, c2 :: t2) => min(
                sd(t1, s2) + 1,                             // deletion
                sd(s1, t2) + 1,                             // insertion
                sd(t1, t2) + (if (c1 == c2) 0 else 1))      // substitution (or match)
        }
        sd(s1.toList, s2.toList)
    }
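
A quick sanity check on the sample names (the method is private, so imagine it called from inside the class or from a test):

    stringDistance("Tom Wiscor", "Tom Viscor")   // == 1: substitute 'W' -> 'V'
    stringDistance("Tom Soyer", "Thomas Soyer")  // == 3: insert 'h', 'a', 's'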


The second thing to do is to define the family of hash functions that will be used to boil the data down to MinHash signatures:


/**
     *  Fowler–Noll–Vo (FNV) hash function
     *  @see http://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function
     */
    private[impl] def LshHash[T](seedOne: Int, seedTwo: Int)(input: T): Int = {
        var hash = 2166136261L.asInstanceOf[Int]   // offset_basis for FNV-1
        hash = hash * 16777619 ^ seedOne           // 16777619 is the FNV_prime
        hash = hash * 16777619 ^ seedTwo
        hash = hash * 16777619 ^ input.hashCode()
        hash
    }
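
Because the method is curried, partially applying LshHash with a pair of seeds (from inside the defining class) yields a ready (Any) => Int hash function; each seed pair selects a different member of the same hash family:

    // illustrative only: two differently seeded functions of the family
    val h1 = LshHash[Any](42, 7) _
    val h2 = LshHash[Any](13, 99) _
    h1("Tom Soyer")   // some Int; h1 and h2 disagree on most inputs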
Now we have to create a set of these functions:

private val minHashFuns = new mutable.ArrayBuffer[(Any) => Int]() // the family of minhash functions, each fixed by its pair of seeds
@transient private val rnd = new Random(2014) // a fixed seed, so every machine generates the same sequence
private def populateMinHashes() = {
        for( i <- 1 to signatureSize*signatureGroups) {
            minHashFuns += ( LshHash(rnd.nextInt(), rnd.nextInt()) )
        }
    }
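
Note that populateMinHashes() is never invoked in the snippets themselves; it has to run once before any signature is computed. A minimal sketch, assuming the members above live in a service class parameterized by signatureSize and signatureGroups (the enclosing class is not shown in the original post, so its name and shape are illustrative):

    // hypothetical wrapper, shown only to illustrate the initialization order
    class LshJoinService(val signatureSize: Int, val signatureGroups: Int) extends Serializable {
        // ... the fields and methods above go here ...
        populateMinHashes()   // fill minHashFuns eagerly in the constructor
    }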

And here is how we apply the minhash functions:

private def applyMinHashed[T <: NGramEnabled](rdd: RDD[T]): RDD[(String, T)] = {
        rdd.flatMap { e =>
            // emit one (signature, element) pair per signature group, so that
            // similar elements land in the same bucket at least once
            (0 until signatureGroups).map { i =>
                (getMinHashSignatureAsStr(NGrams.getNGramms(e.getStringForNGram()), i), e)
            }
        }
    }

    private def getMinHashSignatureAsStr(tokens: scala.collection.immutable.Set[String], signatureGroupNum: Int): String = {
        return getMinHashSignature(tokens, signatureGroupNum).mkString("_")
    }

    private def getMinHashSignature(tokens: scala.collection.immutable.Set[String], signatureGroupNum: Int): Array[Int] = {
        val minHashValues = Array.fill[Int](signatureSize)(Int.MaxValue)

        // tokens is already a Set, so every token is hashed exactly once
        for (token <- tokens) {
            // apply each LSH function of this signature group to the token,
            // keeping the minimum hash value seen at each position
            for (j <- 0 until signatureSize) {
                val currentHashValue = minHashFuns(signatureGroupNum * signatureSize + j)(token)
                if (currentHashValue < minHashValues(j)) {
                    minHashValues(j) = currentHashValue
                }
            }
        }

        minHashValues
    }
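
Neither the NGramEnabled trait nor the NGrams helper is shown in the original post. A minimal sketch of what they could look like, assuming character trigrams (any n-gram size works):

    // hypothetical definitions, inferred from how they are used above
    trait NGramEnabled extends Serializable {
        def getStringForNGram(): String   // the field to bucket on, e.g. the name
    }

    object NGrams {
        // the set of character n-grams of a string (n = 3 is an assumption)
        def getNGramms(s: String, n: Int = 3): Set[String] =
            if (s.length < n) Set(s) else s.sliding(n).toSet
    }

    // the record types from the beginning would then mix the trait in, e.g.:
    // case class Visitor(name: String, email: String) extends NGramEnabled {
    //     override def getStringForNGram(): String = name
    // }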

And now we are ready to merge all the code and deliver the solution for joining the two RDDs:

 def join(accounts: RDD[BankAccount], visitors: RDD[Visitor]): RDD[(Visitor, BankAccount)] = {
        /*  In Scala, these operations are automatically available on RDDs
            containing Tuple2 objects (the built-in tuples in the language, created by
            simply writing (a, b)), as long as you import
            org.apache.spark.SparkContext._ in your program to enable Spark’s implicit
            conversions.*/
        return applyMinHashed(accounts).join( applyMinHashed(visitors) ).map{
            case (key, (account, visitor)) =>
                (visitor, account)
        }.groupByKey()
        .map{
            case (visitor, accounts) => {
                var closestAccount: BankAccount = null
                var bestEditDistance = Int.MaxValue
                for (a <- accounts) {
                    val curEditDist = stringDistance(visitor.name, a.name)
                    if (curEditDist < bestEditDistance) {
                        bestEditDistance = curEditDist
                        closestAccount = a
                    }
                }
                (visitor, closestAccount)
            }
        }
    }
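
A side note on the design: groupByKey ships every candidate account for a visitor to one place before the minimum is picked. The same result can be obtained with reduceByKey, which combines candidates map-side first; a sketch of this alternative ending (not from the original post):

    // alternative to the groupByKey ending: carry the distance along and reduce
    applyMinHashed(accounts).join(applyMinHashed(visitors)).map {
        case (_, (account, visitor)) =>
            (visitor, (account, stringDistance(visitor.name, account.name)))
    }.reduceByKey { (a, b) =>
        if (a._2 <= b._2) a else b   // keep the closer candidate
    }.map { case (visitor, (account, _)) => (visitor, account) }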

Finally, the code that joins the two RDDs and prints the result to the console:
        val acc2vis = service.join(accounts, visitors)
        for ((v, a) <- acc2vis.collect()) {
            println( f"Visitor ${v.name}%s has score level ${a.score}%2.2f (${a.name}%s)" )
        }
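
For completeness, a minimal driver that wires everything together, under the assumption that the methods above live in the LshJoinService class sketched earlier (the object name, signature sizes and the local master are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._   // implicit conversions for pair-RDD joins

    object LshJoinApp {
        def main(args: Array[String]): Unit = {
            val sc = new SparkContext(new SparkConf().setAppName("lsh-join").setMaster("local[2]"))

            val accounts = sc.parallelize(Seq(
                BankAccount("Tom Soyer", 10), BankAccount("Andy Bin", 20),
                BankAccount("Tom Wiscor", 30), BankAccount("Tomas Soyér", 40)))
            val visitors = sc.parallelize(Seq(
                Visitor("Tom Soyer", "1@1"), Visitor("Andrew Bin", "2@1"),
                Visitor("Tom Viscor", "3@1"), Visitor("Thomas Soyer", "2@2")))

            val service = new LshJoinService(signatureSize = 2, signatureGroups = 4)
            val acc2vis = service.join(accounts, visitors)
            for ((v, a) <- acc2vis.collect()) {
                println(f"Visitor ${v.name}%s has score level ${a.score}%2.2f (${a.name}%s)")
            }
            sc.stop()
        }
    }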

2 comments:

  1. Good overview. Can you please elaborate more on the second and third steps?

  2. Looks like the populateMinHashes() function is never called anywhere in the above code. Did I miss something here?
