пʼятниця, 30 січня 2015 р.

Demystify BloomFilter on Hadoop

I believe most of you have seen BloomFilter class. But how to correctly use it?

Accordint to Wikipedia, "Bloom filter is a space-efficient probabilistic data structure, conceived by Burton Howard Bloom in 1970, that is used to test whether an element is a member of a setFalse positive matches are possible, but false negatives are not, thus a Bloom filter has a 100% recall rate. In other words, a query returns either "possibly in set" or "definitely not in set"."

Also, I found this site wich give a very goo description of Bloom filter with perfect visualization, please check



As it is clear from Bloom filter definition, this datastructure can really help when we need to filter some records. Particularly, performing join: in this case we can transform small dataset into filter, and then apply filter on map stage in second MR, which perform a real join. In other words, we will have 2 MR when 1st is used for creating filter and 2nd is used to perform filtrtion on map and join on reduce.

Ok, first MepReduce contains 2 stages: mapper and reducer, because in result we should got exactly one Bloom filter object:

  1. initialize BloomFilter object as Mapper clas member: BloomFilter = new BloomFilter(10000, 10, hash.MURMUR_HASH)
  2. on each record, add it to filter: filter.add( new Key(str.getBytes()) );
  3. emmit data only in cleanup method, for example you can just write file withoutusing context at all

Your filter is prepared now, it can be desiarilized at any place and used for data filtration.


пʼятниця, 23 січня 2015 р.

Composite join with MapReduce

As everyone knows, map-side join is the most effective techniques to join datasets on Hadoop. However, at the same time it gives a possibility to join ONE BIG dataset and ONE OR MORE SAMLL datasets. This is the limitation, because sometimes you wish to join TWI HUGE datasets. Typically, this is the use case for reducer-side join, but it cause Cartesian product and obviously we would like to ommit so heavy operation.

And this is time for Composite join: map-side join on huge datasets. In fact, both datasets must meet several requirements in this case:

  1. The datasets are all sorted by the join key
  2. Each dataset has the same number of file (you can achive that by setting reducers number)
  3. File N in each dataset contains the same join key K
  4. Each file is not splitable
In this case you can perform map join to join block from dataset A versus block from dataset B. Hadoop API provides CompositeInputFormat to achive this requirement. Example of usage:


// in job configuration you have to set
job.setInputFormatClass(CompositeInputFormat.class);
// inner - reference to inner join (you can specify outer as well)
// d1, d2 - Path to both datasets
job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, d1, d2));
job.setNumReduceTasks(0);



The mapper with have key-value pair of type Text, TupleWritable:

@Override
public void map(Text key, TupleWritable value, Context ctx) {
  ...
}


Bonus: you can use this powerful feature with Hive! Composite join in Hive: To do that, the following hive properties must be set:
hive.input.format=org.apache.hadoop.give.ql.io.BucketizedHiveInputFormat;
hive.optimize.bucketmapjoin=truel
hive.optimize.bucketmapjoin.sortedmerge=true;


Ofcourse, it requires all the keys to be sorted in both tables and then must be bucketized in the same number of buckets

Kafka web console with Docker

My first Docker file aims to run Kafka Web Console (application for monitoring Apache Kafka):


FROM ubuntu:trusty

RUN apt-get update;  apt-get install -y unzip  openjdk-7-jdk wget git docker.io

RUN wget http://downloads.typesafe.com/play/2.2.6/play-2.2.6.zip
RUN unzip play-2.2.6.zip -d /tmp

RUN wget https://github.com/claudemamo/kafka-web-console/archive/master.zip
RUN unzip master.zip -d /tmp

WORKDIR /tmp/kafka-web-console-master

CMD ../play-2.2.6/play "start -DapplyEvolutions.default=true"


Dockerfile might be buid with command:
docker build -t kafka/web-console:2.0 .
and run as:
docker run -i -t -p 9000:9000 kafka/web-console:2.0

At the end, Kafka Web Console will be available at host:9000 - zookeeper hosts must be  added and Kafka brokers will be discovered aautomatically