Let's start with the definition of the task: there are two datasets, bank accounts and website visitors. The only field they have in common is the name, and names may be misspelled. Consider the following example:
Bank Accounts
Name | Tom Soyer | Andy Bin | Tom Wiscor | Tomas Soyér |
Credit score | 10 | 20 | 30 | 40 |
Website Visitors
Name | Tom Soyer | Andrew Bin | Tom Viscor | Thomas Soyer |
     | 1@1 | 2@1 | 3@1 | 2@2 |
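We have to join these two datasets by name. Since names may be misspelled, I use the edit (Levenshtein) distance to pick the most similar name among the candidates that land in the same MinHash bucket. (Hamming distance would not work here: it is defined only for strings of equal length and counts only substitutions.) The distance function is the following: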
private def stringDistance(s1: String, s2: String): Int = {
  def min(a: Int, b: Int, c: Int) = Math.min(Math.min(a, b), c)

  // classic recursive Levenshtein distance over the two character lists
  def sd(s1: List[Char], s2: List[Char]): Int = (s1, s2) match {
    case (_, Nil) => s1.length
    case (Nil, _) => s2.length
    case (c1 :: t1, c2 :: t2) =>
      min(sd(t1, s2) + 1,                         // delete c1
          sd(s1, t2) + 1,                         // insert c2
          sd(t1, t2) + (if (c1 == c2) 0 else 1))  // keep or substitute
  }

  sd(s1.toList, s2.toList)
}
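The recursive version above is correct, but it recomputes the same suffix pairs again and again, so its running time grows exponentially with the length of the names. A minimal sketch of the usual dynamic-programming alternative (the object name `EditDistance` is hypothetical) computes the same distance in time proportional to the product of the two lengths, keeping only two rows of the table in memory:

```scala
object EditDistance {
  /** Levenshtein distance with two rolling rows: O(|s1| * |s2|) time, O(|s2|) memory. */
  def levenshtein(s1: String, s2: String): Int = {
    // at the start of iteration i, prev(j) = distance(first i-1 chars of s1, first j chars of s2)
    var prev = Array.tabulate(s2.length + 1)(j => j)
    var cur = new Array[Int](s2.length + 1)
    for (i <- 1 to s1.length) {
      cur(0) = i // turning i chars into the empty string takes i deletions
      for (j <- 1 to s2.length) {
        val cost = if (s1(i - 1) == s2(j - 1)) 0 else 1
        cur(j) = math.min(
          math.min(prev(j) + 1,    // delete s1(i - 1)
                   cur(j - 1) + 1), // insert s2(j - 1)
          prev(j - 1) + cost)       // keep or substitute
      }
      val tmp = prev; prev = cur; cur = tmp // the finished row becomes the previous one
    }
    prev(s2.length)
  }
}
```

It has the same signature shape as `stringDistance`, so it could be dropped in as its body, for example `EditDistance.levenshtein("Tom Wiscor", "Tom Viscor")` gives 1.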
The second step is to define the family of hash functions that will be used to hash the data into MinHash signatures:
/**
 * Fowler–Noll–Vo (FNV) hash function
 * @see http://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function
 */
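Only the header comment of the FNV-based hash survives here; the `LshHash` class that `populateMinHashes` instantiates is not shown. The following is a minimal sketch of what it might look like, assuming only what the later code implies: `LshHash` takes two `Int` seeds and is usable as an `(Any) => Int` function. The hashing core is standard 32-bit FNV-1a; the way the two seeds are mixed in (`seedA * h + seedB`) is our assumption, not the author's code:

```scala
/** Hypothetical LshHash: 32-bit FNV-1a followed by a seeded affine mix. */
final case class LshHash(seedA: Int, seedB: Int) extends (Any => Int) {
  private val FnvOffsetBasis = 0x811c9dc5L.toInt // 2166136261 as a wrapped 32-bit value
  private val FnvPrime = 16777619                // 0x01000193

  /** Plain 32-bit FNV-1a over the UTF-8 bytes of the string. */
  def fnv1a(s: String): Int = {
    var hash = FnvOffsetBasis
    for (b <- s.getBytes("UTF-8")) {
      hash ^= (b & 0xff) // XOR in the unsigned byte value
      hash *= FnvPrime   // Int multiplication wraps modulo 2^32, as FNV expects
    }
    hash
  }

  /** Assumed seeding scheme: h(x) = seedA * fnv1a(x) + seedB, with Int overflow wrapping. */
  override def apply(x: Any): Int = seedA * fnv1a(x.toString) + seedB
}
```

Being a case class, this sketch is also `Serializable`, which matters because Spark ships the functions to the executors inside the closures that use them.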
Now we create the set of these functions:
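import scala.collection.mutable
import scala.util.Random

// array of MinHash functions, each initialized with seed values drawn from rnd
private val minHashFuns = new mutable.ArrayBuffer[Any => Int]()

// the same seed is required to generate the same sequence on different machines
@transient private val rnd = new Random(2014)

private def populateMinHashes(): Unit = {
  for (i <- 1 to signatureSize * signatureGroups) {
    minHashFuns += LshHash(rnd.nextInt(), rnd.nextInt())
  }
}

// must run once, when the service is constructed and after signatureSize and
// signatureGroups are initialized; otherwise minHashFuns stays empty
populateMinHashes()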
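Because `scala.util.Random` wraps `java.util.Random`, whose algorithm is fully specified, a fixed seed yields the same sequence on every JVM, so driver and executors agree on the hash family. A small sketch that makes this explicit and draws all seed pairs eagerly, so there is no separate populate step that can be forgotten (the object `MinHashSeeds` and its `draw` method are hypothetical):

```scala
import scala.util.Random

object MinHashSeeds {
  /** Draws one (seedA, seedB) pair per hash function: signatureSize * signatureGroups in total. */
  def draw(signatureSize: Int, signatureGroups: Int, seed: Long = 2014L): Vector[(Int, Int)] = {
    val rnd = new Random(seed) // fixed seed => identical sequence on every machine
    Vector.fill(signatureSize * signatureGroups) {
      val a = rnd.nextInt()
      val b = rnd.nextInt()
      (a, b)
    }
  }
}
```

In the service class this could replace the mutable buffer with something like `private val minHashFuns = MinHashSeeds.draw(signatureSize, signatureGroups).map { case (a, b) => LshHash(a, b) }`, which is initialized as soon as the class is constructed.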
And this is how we apply the MinHash functions:
private def applyMinHashed[T <: NGramEnabled](rdd: RDD[T]): RDD[(String, T)] =
  rdd.flatMap { e =>
    // compute the n-grams once per element, not once per signature group
    val tokens = NGrams.getNGramms(e.getStringForNGram())
    // one (bucketKey, element) pair per signature group; the group number is part of
    // the key so that signatures from different groups never share a bucket
    (0 until signatureGroups).map { i => (s"$i#" + getMinHashSignatureAsStr(tokens, i), e) }
  }

private def getMinHashSignatureAsStr(tokens: Set[String], signatureGroupNum: Int): String =
  getMinHashSignature(tokens, signatureGroupNum).mkString("_")

private def getMinHashSignature(tokens: Set[String], signatureGroupNum: Int): Array[Int] = {
  val minHashValues = Array.fill[Int](signatureSize)(Int.MaxValue)
  // tokens is a Set, so each distinct token is hashed only once per function
  for (token <- tokens; j <- 0 until signatureSize) {
    // apply the j-th LSH function of this group and keep the minimum value
    val currentHashValue = minHashFuns(signatureGroupNum * signatureSize + j)(token)
    if (currentHashValue < minHashValues(j)) minHashValues(j) = currentHashValue
  }
  minHashValues
}
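To see what this bucketing step produces without a cluster, here is a local sketch on plain Scala collections. It rests on two assumptions: character bigrams stand in for `NGrams.getNGramms` (whose definition is not shown), and `scala.util.hashing.MurmurHash3.stringHash` with a per-function seed stands in for `LshHash`. Each name yields one key per signature group, and two names become join candidates when any of their keys coincide, which is far more likely when they share most of their bigrams:

```scala
import scala.util.hashing.MurmurHash3

object LocalMinHash {
  // hypothetical stand-in for NGrams.getNGramms: the set of character bigrams of a lower-cased name
  def bigrams(s: String): Set[String] = s.toLowerCase.sliding(2).toSet

  /** One bucket key per signature group, shaped "<group>#h1_h2_..._hk". */
  def bucketKeys(name: String, signatureSize: Int, signatureGroups: Int): Seq[String] = {
    val tokens = bigrams(name)
    (0 until signatureGroups).map { g =>
      val signature = (0 until signatureSize).map { j =>
        val seed = g * signatureSize + j // one distinct hash function per (group, position)
        // MinHash: the smallest hash value over all tokens, starting from Int.MaxValue
        tokens.foldLeft(Int.MaxValue)((m, t) => math.min(m, MurmurHash3.stringHash(t, seed)))
      }
      s"$g#" + signature.mkString("_")
    }
  }
}
```

With `signatureSize = 3` and `signatureGroups = 4`, `LocalMinHash.bucketKeys("Tom Soyer", 3, 4)` returns four keys starting with `0#`, `1#`, `2#` and `3#`. Larger groups make buckets stricter; more groups give similar names more chances to collide.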
Now we are ready to put all the code together and deliver a solution for joining the two RDDs:
def join(accounts: RDD[BankAccount], visitors: RDD[Visitor]): RDD[(Visitor, BankAccount)] = {
  // join and groupByKey come from PairRDDFunctions, available on RDDs of Tuple2;
  // before Spark 1.3 this required import org.apache.spark.SparkContext._
  // to enable Spark's implicit conversions
  applyMinHashed(accounts)
    .join(applyMinHashed(visitors))                              // pairs sharing a bucket
    .map { case (_, (account, visitor)) => (visitor, account) }
    .groupByKey()                                                // candidate accounts per visitor
    .map { case (visitor, candidates) =>
      // keep the account whose name is closest to the visitor's name
      (visitor, candidates.minBy(a => stringDistance(visitor.name, a.name)))
    }
}
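The same pipeline (join on bucket key, group per visitor, keep the closest name) can be checked locally before running it on Spark. This sketch replaces the RDDs with Scala collections that already carry their bucket keys; the case classes are hypothetical minimal versions of `BankAccount` and `Visitor`, and the distance function is a parameter so that `stringDistance` or any other edit distance can be plugged in:

```scala
object LocalFuzzyJoin {
  final case class BankAccount(name: String, score: Double) // hypothetical minimal fields
  final case class Visitor(name: String)

  /** Inner-join on bucket key, group candidates per visitor, keep the closest account. */
  def join(accounts: Seq[(String, BankAccount)],
           visitors: Seq[(String, Visitor)],
           distance: (String, String) => Int): Map[Visitor, BankAccount] = {
    val accountsByKey = accounts.groupBy(_._1)
    // like rdd.join: one (visitor, account) pair per shared bucket key
    val candidates = for {
      (key, visitor) <- visitors
      (_, account) <- accountsByKey.getOrElse(key, Seq.empty)
    } yield (visitor, account)
    candidates
      .groupBy(_._1) // like groupByKey(): visitor -> all candidate pairs
      .map { case (visitor, pairs) =>
        visitor -> pairs.map(_._2).minBy(a => distance(visitor.name, a.name))
      }
  }
}
```

Note the consequence of the inner join: a visitor who shares no bucket with any account does not appear in the result at all. If such visitors must be reported, a `leftOuterJoin` on the Spark side would be the natural place to start.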
Finally, the code that joins the two RDDs and prints the result to the console:
val acc2vis = service.join(accounts, visitors)
for ((v, a) <- acc2vis.collect()) {
  println(f"Visitor ${v.name}%s has score level ${a.score}%2.2f (${a.name}%s)")
}
Good overview. Can you please elaborate more on the second and third steps?
Looks like populateMinHashes() is not called anywhere in the above code. Did I miss something here?