Monday, 13 July 2015

How to waste the whole day with Spark Streaming and HBase

The "funny" story of how to waste a whole day debugging a simple case... tips and tricks :)

Spark Streaming application hangs on an action and nothing changes for hours.
The long story is: a custom Receiver accepts events from an external source and stores them into an RDD (actually, a DStream) for further processing. When I ran it, I noticed that the action hung! And what was really scary: the messages were still being read from the source. After spending a couple of hours trying to find the issue in the Receiver, I realized it worked fine and finally found the issue... in how I ran the job!

I ran it in a local environment first and then submitted it to YARN with:
--num-executors 2
In fact, that did not work for me because not a single worker (Spark executor) was able to start! Just by increasing the number of executors to 3, I got everything working.

HBase-related Spark Streaming application hangs and nothing changes for hours.
Again, the long story is that the Spark Streaming application hangs as soon as it touches HBase. I spent several hours (again) and was really surprised when I found the reason: the HBase connection was broken. OMG! I had not seen any errors or warnings related to the HBase connection in the logs... so why? In fact, HBase tried to establish the connection again and again without throwing an error. Consider the following piece of code (the three retry and timeout settings at the end are the update that helped me overcome the issue; the rest is my original code):
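  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HConstants;

  // original part: where to find HBase
  Configuration config = HBaseConfiguration.create();
  config.set(HConstants.ZOOKEEPER_QUORUM, "host:port");
  config.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");
  // update: limit retries so a broken connection fails fast
  config.set("hbase.client.retries.number", Integer.toString(3));
  config.set("zookeeper.session.timeout", Integer.toString(60000));
  config.set("zookeeper.recovery.retry", Integer.toString(0));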

This really helps because the number of retries is now limited. The default value is 35, which can definitely be confusing.

Tuesday, 9 June 2015

How to present XML in Hive flat table after XSLT transformation

Let's start by defining the task. Imagine that the dataset is a set of XML files and the requirement is to present some specific information from these files as a simple flat structure. Let's illustrate:

Definitely, we can use a SerDe for XML, but what if the XML structure is not defined beforehand and we want to give the end user a chance to control the parsing process? One possible solution is to use XSLT to transform the XML into the desired format.

Friday, 30 January 2015

Demystify BloomFilter on Hadoop

I believe most of you have seen the BloomFilter class. But how do you use it correctly?

According to Wikipedia, "Bloom filter is a space-efficient probabilistic data structure, conceived by Burton Howard Bloom in 1970, that is used to test whether an element is a member of a set. False positive matches are possible, but false negatives are not, thus a Bloom filter has a 100% recall rate. In other words, a query returns either "possibly in set" or "definitely not in set"."

Also, I found this site, which gives a very good description of the Bloom filter with a perfect visualization; please check it out.

As is clear from the Bloom filter definition, this data structure can really help when we need to filter records, particularly when performing a join: we can turn the small dataset into a filter and then apply the filter on the map stage of a second MR job, which performs the real join. In other words, we will have 2 MR jobs: the 1st creates the filter, and the 2nd performs filtration on map and the join on reduce.

Ok, the first MapReduce job contains 2 stages, mapper and reducer, because as a result we should get exactly one Bloom filter object:

  1. initialize a BloomFilter object as a Mapper class member: BloomFilter filter = new BloomFilter(10000, 10, Hash.MURMUR_HASH)
  2. on each record, add it to the filter: filter.add( new Key(str.getBytes()) );
  3. emit data only in the cleanup method; for example, you can just write a file without using the context at all

Your filter is now prepared; it can be deserialized anywhere and used for data filtration.
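To make the "possibly in set" / "definitely not in set" behaviour concrete, here is a minimal plain-Java sketch of the idea. It is not Hadoop's BloomFilter: the class name SimpleBloomFilter, the double-hashing scheme and the sample keys are all assumptions chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.BitSet;

// Hypothetical stand-in for Hadoop's BloomFilter, for illustration only.
class SimpleBloomFilter {
    private final BitSet bits;
    private final int vectorSize; // number of bits in the filter
    private final int nbHash;     // number of bit positions set per key

    SimpleBloomFilter(int vectorSize, int nbHash) {
        this.bits = new BitSet(vectorSize);
        this.vectorSize = vectorSize;
        this.nbHash = nbHash;
    }

    // FNV-1a, used here only as a second, independent base hash.
    private static int fnv1a(byte[] data) {
        int hash = 0x811c9dc5;
        for (byte b : data) {
            hash ^= (b & 0xff);
            hash *= 0x01000193;
        }
        return hash;
    }

    // Derives the i-th bit position from two base hashes (double hashing).
    private int position(byte[] key, int i) {
        int h1 = Arrays.hashCode(key);
        int h2 = fnv1a(key);
        return Math.floorMod(h1 + i * h2, vectorSize);
    }

    void add(String value) {
        byte[] key = value.getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < nbHash; i++) {
            bits.set(position(key, i));
        }
    }

    // false means "definitely not in set"; true means "possibly in set".
    boolean mightContain(String value) {
        byte[] key = value.getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < nbHash; i++) {
            if (!bits.get(position(key, i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // "First job": build the filter from the join keys of the small dataset.
        SimpleBloomFilter filter = new SimpleBloomFilter(10000, 10);
        for (String key : new String[] {"user-1", "user-2"}) {
            filter.add(key);
        }
        // "Second job", map stage: drop records whose key cannot be in the small dataset.
        for (String key : new String[] {"user-1", "user-7", "user-2"}) {
            System.out.println(key + " passes filter: " + filter.mightContain(key));
        }
    }
}
```

A key that was added always passes the filter; a key that was never added is usually rejected, but may occasionally pass, which is why the real join on the reduce side is still needed.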

Friday, 23 January 2015

Composite join with MapReduce

As everyone knows, a map-side join is the most effective technique for joining datasets on Hadoop. However, it only lets you join ONE BIG dataset with ONE OR MORE SMALL datasets. This is a limitation, because sometimes you want to join TWO HUGE datasets. Typically, this is the use case for a reduce-side join, but it causes a Cartesian product, and obviously we would like to avoid such a heavy operation.

And this is the time for a composite join: a map-side join on huge datasets. In fact, both datasets must meet several requirements in this case:

  1. The datasets are all sorted by the join key
  2. Each dataset has the same number of files (you can achieve that by setting the number of reducers)
  3. File N in each dataset contains the same join keys (a given join key K is always in file N)
  4. Each file is not splittable
In this case you can perform a map-side join that joins a block from dataset A against the matching block from dataset B. The Hadoop API provides CompositeInputFormat to achieve this. Example of usage:

// in job configuration you have to set
// inner - reference to inner join (you can specify outer as well)
// d1, d2 - Path to both datasets
job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, d1, d2));

The mapper will receive key-value pairs of type Text, TupleWritable:

public void map(Text key, TupleWritable value, Context ctx) {

Bonus: you can use this powerful feature with Hive! To do a composite join in Hive, the following Hive properties must be set:

Of course, this requires the keys to be sorted in both tables, and both tables must be bucketed into the same number of buckets.
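To see why sorted and identically partitioned files make a map-side join of two huge datasets possible, here is a minimal plain-Java sketch of a merge join over one pair of matching files. It is not how CompositeInputFormat is implemented; the class name SortedMergeJoin, the {key, value} array layout and the sample data are assumptions, and each key is assumed to occur at most once per file.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of joining file N of dataset A with file N of dataset B.
class SortedMergeJoin {

    // Inner join of two record lists that are already sorted by key.
    // Each record is a {key, value} pair; keys are assumed unique within a list.
    static List<String> innerJoin(String[][] left, String[][] right) {
        List<String> joined = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < left.length && j < right.length) {
            int cmp = left[i][0].compareTo(right[j][0]);
            if (cmp < 0) {
                i++;            // key only on the left side: skip it
            } else if (cmp > 0) {
                j++;            // key only on the right side: skip it
            } else {
                joined.add(left[i][0] + "\t" + left[i][1] + "\t" + right[j][1]);
                i++;
                j++;
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        String[][] fileA = {{"k1", "a1"}, {"k2", "a2"}, {"k4", "a4"}};
        String[][] fileB = {{"k2", "b2"}, {"k3", "b3"}, {"k4", "b4"}};
        for (String row : innerJoin(fileA, fileB)) {
            System.out.println(row);
        }
    }
}
```

Both inputs are read once, front to back, and nothing has to be held in memory or shuffled, which is what the sorting and partitioning requirements buy you.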

Kafka web console with Docker

My first Dockerfile aims to run Kafka Web Console (an application for monitoring Apache Kafka):

FROM ubuntu:trusty

RUN apt-get update && apt-get install -y unzip openjdk-7-jdk wget git docker.io

RUN wget http://downloads.typesafe.com/play/2.2.6/play-2.2.6.zip
RUN unzip play-2.2.6.zip -d /tmp

RUN wget https://github.com/claudemamo/kafka-web-console/archive/master.zip
RUN unzip master.zip -d /tmp

WORKDIR /tmp/kafka-web-console-master

CMD ../play-2.2.6/play "start -DapplyEvolutions.default=true"

The Dockerfile can be built with the command:
docker build -t kafka/web-console:2.0 .
and run as:
docker run -i -t -p 9000:9000 kafka/web-console:2.0

At the end, Kafka Web Console will be available at host:9000. ZooKeeper hosts must be added, and the Kafka brokers will be discovered automatically.

Tuesday, 11 November 2014

Spark and Locality Sensitive Hashing, part 2

This is the second part of the topic on Locality Sensitive Hashing, and here is a working example built with Apache Spark.

Let's start by defining the task: there are two datasets, bank accounts and web-site visitors. The only thing they have in common is the name, and it may be misspelled. Let's consider the following example:

Bank Accounts

Name           Credit score
Tom Soyer      10
Andy Bin       20
Tom Wiscor     30
Tomas Soyér    40

Web-site Visitors

Name           email
Tom Soyer      1@1
Andrew Bin     2@1
Tom Viscor     3@1
Thomas Soyer   2@2

Friday, 7 November 2014

Spark and Locality Sensitive Hashing, part 1

Locality Sensitive Hashing is the name of a special algorithm designed to address the complexity of Big Data processing.

Let's consider the following example: assume we have two independent systems; one is a web application that gets a user's profile from a social network, the second is an online payment system. Our idea is to merge profiles from the social network and the payment system. Of course, a social network user might not be present in the payment system at all, the accounts may have been created at different times, and we definitely don't have a foreign key to match them exactly. There are two possible issues:

  • there are two huge data sets that must be merged
  • a user's name might look different in the social network and the payment system
The naive approach is to compare the social network user names with the payment system user names, calculate the Hamming distance between them, and pick the most similar pair as successfully matched. The biggest issue here is the O(n^2) complexity of this approach.
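As a rough illustration of the naive approach, the sketch below compares every name on one side with every name on the other. The class name NaiveNameMatcher and the sample names are hypothetical. Since the Hamming distance is only defined for strings of equal length, this sketch assumes that every extra character of the longer string counts as one mismatch.

```java
// Hypothetical illustration of the naive all-pairs matching.
class NaiveNameMatcher {

    // Assumption: strings are compared position by position over the shorter
    // length, and every extra character of the longer string is a mismatch.
    static int hamming(String a, String b) {
        int common = Math.min(a.length(), b.length());
        int distance = Math.abs(a.length() - b.length());
        for (int i = 0; i < common; i++) {
            if (a.charAt(i) != b.charAt(i)) {
                distance++;
            }
        }
        return distance;
    }

    // Scans all candidates for one name; doing this for every name is O(n^2).
    static String closest(String name, String[] candidates) {
        String best = null;
        int bestDistance = Integer.MAX_VALUE;
        for (String candidate : candidates) {
            int d = hamming(name, candidate);
            if (d < bestDistance) {
                bestDistance = d;
                best = candidate;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        String[] social = {"Tom Soyer", "Andy Bin", "Tom Wiscor"};
        String[] payments = {"Tom Soyer", "Andrew Bin", "Tom Viscor"};
        for (String name : social) {
            System.out.println(name + " -> " + closest(name, payments));
        }
    }
}
```

With two datasets of millions of names each, the nested scan is exactly the cost that the rest of this post tries to avoid.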

We want to minimize the number of comparisons between the two datasets. Fortunately, this problem was addressed by the invention of the Locality Sensitive Hashing algorithm. Let's consider simple hashing:
f(str) → x
We can calculate a hash function f on a string str (the user name from a profile) and get an integer x; then we only need to compare Hamming distances for strings that have the same x. The problem is picking a very good hash function, which is almost impossible. Fortunately, we are not limited to one function: we can apply several, tens, or hundreds of hash functions. In this case we get data duplication, because one string is assigned to several buckets (hash values). This increases the number of useless comparisons, but at the same time gives us a better chance of a successful match.

However, this still would not work well enough, because names may contain misprints, or use special characters in the social profile while the payment system uses only traditional Latin letters, or vice versa. N-grams and minhashing come in handy here. The main idea is to get all possible n-grams of a string and apply the minhashing algorithm to them. As a result, we get a set of new hash codes based on the n-grams and compare only the strings that were placed into the same buckets based on these hash codes.

The step-by-step algorithm is as follows:

  1. Define a collection of hash functions
  2. Calculate the minhash of the n-grams of each profile name using the minhash algorithm
  3. Based on equal hash codes, get pairs of similar profiles from the social network and the payment system
  4. Calculate the Hamming distance within the pairs to select the most similar match in each case
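The first three steps above can be sketched in plain Java as follows. This is only an illustration under stated assumptions, not the Spark implementation promised for the next part: the class name MinHashSketch, the seed-based family of hash functions, the mixing constant and the n-gram size of 2 are all choices made for this example.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of minhashing over character n-grams.
class MinHashSketch {

    // All character n-grams of the lower-cased string.
    // Strings shorter than n produce an empty set.
    static Set<String> nGrams(String s, int n) {
        Set<String> grams = new HashSet<>();
        String t = s.toLowerCase();
        for (int i = 0; i + n <= t.length(); i++) {
            grams.add(t.substring(i, i + n));
        }
        return grams;
    }

    // Step 1: one hash function per seed (a simple mixing scheme, assumed here).
    static int hash(String gram, int seed) {
        int h = gram.hashCode() ^ seed;
        h *= 0x9E3779B1;
        return h ^ (h >>> 16);
    }

    // Step 2: signature[i] is the minimum of hash function i over all n-grams.
    static int[] signature(String s, int n, int[] seeds) {
        Set<String> grams = nGrams(s, n);
        int[] sig = new int[seeds.length];
        for (int i = 0; i < seeds.length; i++) {
            int min = Integer.MAX_VALUE;
            for (String gram : grams) {
                min = Math.min(min, hash(gram, seeds[i]));
            }
            sig[i] = min;
        }
        return sig;
    }

    // Step 3: every position where two signatures agree is a shared bucket,
    // so a result above zero makes the two names a candidate pair.
    static int sharedBuckets(int[] a, int[] b) {
        int shared = 0;
        for (int i = 0; i < a.length; i++) {
            if (a[i] == b[i]) {
                shared++;
            }
        }
        return shared;
    }

    public static void main(String[] args) {
        int[] seeds = {11, 23, 37, 41, 53};
        int[] social = signature("Tom Soyer", 2, seeds);
        int[] payment = signature("Thomas Soyer", 2, seeds);
        System.out.println("shared buckets: " + sharedBuckets(social, payment));
    }
}
```

Only the names that share at least one bucket need to go through step 4, the pairwise distance calculation, which is where the saving over the naive all-pairs comparison comes from.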

In the next part: a source code example and an implementation on Apache Spark.