Friday, January 23, 2015

Composite join with MapReduce

As everyone knows, map-side join is the most effective technique for joining datasets on Hadoop. However, it only allows you to join ONE BIG dataset with ONE OR MORE SMALL datasets. This is a limitation, because sometimes you want to join TWO HUGE datasets. Typically, this is a use case for reduce-side join, but that causes a Cartesian product, and obviously we would like to avoid such a heavy operation.

And this is time for Composite join: map-side join on huge datasets. In fact, both datasets must meet several requirements in this case:

  1. The datasets are all sorted by the join key
  2. Each dataset has the same number of files (you can achieve that by setting the number of reducers)
  3. File N in each dataset contains the same join key K
  4. Each file is not splittable
In this case you can perform a map-side join of block N from dataset A against block N from dataset B. The Hadoop API provides CompositeInputFormat to achieve this. Example of usage:


// in job configuration you have to set
job.setInputFormatClass(CompositeInputFormat.class);
// inner - reference to inner join (you can specify outer as well)
// d1, d2 - Path to both datasets
job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, d1, d2));
job.setNumReduceTasks(0);



The mapper will receive key-value pairs of type Text and TupleWritable:

@Override
public void map(Text key, TupleWritable value, Context ctx)
        throws IOException, InterruptedException {
  ...
}
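Conceptually, CompositeInputFormat walks the matching files of both datasets in lockstep, like a classic sorted-merge join. A minimal Python sketch of that merge logic (the sample keys and values are made up for illustration):

```python
def merge_join(a, b):
    """Inner join of two (key, value) lists, both sorted by key."""
    i = j = 0
    joined = []
    while i < len(a) and j < len(b):
        if a[i][0] < b[j][0]:
            i += 1                      # key only in dataset A - skip
        elif a[i][0] > b[j][0]:
            j += 1                      # key only in dataset B - skip
        else:
            # same key in both blocks: emit the joined record
            joined.append((a[i][0], (a[i][1], b[j][1])))
            i += 1
            j += 1
    return joined

block_a = [(1, "a"), (2, "b"), (4, "c")]
block_b = [(2, "x"), (3, "y"), (4, "z")]
print(merge_join(block_a, block_b))  # [(2, ('b', 'x')), (4, ('c', 'z'))]
```

Because both inputs are already sorted and aligned file-by-file, no shuffle or Cartesian product is needed - which is exactly why the requirements above matter.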


Bonus: you can use this powerful feature with Hive! To perform a composite join in Hive, the following Hive properties must be set:
hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
hive.optimize.bucketmapjoin=true;
hive.optimize.bucketmapjoin.sortedmerge=true;


Of course, it requires the keys in both tables to be sorted, and both tables must be bucketed into the same number of buckets.
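For illustration, a pair of Hive tables prepared for such a join might be declared like this (table and column names are hypothetical):

```sql
-- both tables: bucketed AND sorted by the join key, same bucket count
CREATE TABLE accounts (id BIGINT, name STRING)
CLUSTERED BY (id) SORTED BY (id ASC) INTO 32 BUCKETS;

CREATE TABLE visits (id BIGINT, url STRING)
CLUSTERED BY (id) SORTED BY (id ASC) INTO 32 BUCKETS;

-- with the properties above set, this join can run as a
-- sort-merge bucket map join, without a shuffle
SELECT a.name, v.url
FROM accounts a JOIN visits v ON a.id = v.id;
```

When loading data into such tables, hive.enforce.bucketing=true ensures the files actually honor the declared bucketing.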

Kafka web console with Docker

My first Dockerfile aims to run Kafka Web Console (an application for monitoring Apache Kafka):


FROM ubuntu:trusty

RUN apt-get update;  apt-get install -y unzip  openjdk-7-jdk wget git docker.io

RUN wget http://downloads.typesafe.com/play/2.2.6/play-2.2.6.zip
RUN unzip play-2.2.6.zip -d /tmp

RUN wget https://github.com/claudemamo/kafka-web-console/archive/master.zip
RUN unzip master.zip -d /tmp

WORKDIR /tmp/kafka-web-console-master

CMD ../play-2.2.6/play "start -DapplyEvolutions.default=true"


The image might be built with the command:
docker build -t kafka/web-console:2.0 .
and run as:
docker run -i -t -p 9000:9000 kafka/web-console:2.0

In the end, Kafka Web Console will be available at host:9000 - ZooKeeper hosts must be added, and Kafka brokers will be discovered automatically.

Tuesday, November 11, 2014

Spark and Locality Sensitive Hashing, part 2

This is the second part of the topic about Locality Sensitive Hashing; here is an example of building a working solution with Apache Spark.

Let's start with the definition of the task: there are two datasets - bank accounts and web-site visitors. The only field they have in common is the name, but it may contain misspellings. Let's consider the following example:

Bank Accounts

Name          Credit score
Tom Soyer     10
Andy Bin      20
Tom Wiscor    30
Tomas Soyér   40

Web-site Visitors

Name          Email
Tom Soyer     1@1
Andrew Bin    2@1
Tom Viscor    3@1
Thomas Soyer  2@2
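Before bringing in Spark, the expected matching can be made concrete with a brute-force Python sketch over the sample tables above (bigrams and Jaccard similarity here are my illustrative choice of distance, not necessarily what the final Spark job uses):

```python
def bigrams(s):
    """Character bigrams of a lower-cased string."""
    s = s.lower()
    return {s[i:i + 2] for i in range(len(s) - 1)}

def jaccard(a, b):
    """Similarity of two names as overlap of their bigram sets."""
    ga, gb = bigrams(a), bigrams(b)
    return len(ga & gb) / len(ga | gb)

accounts = {"Tom Soyer": 10, "Andy Bin": 20, "Tom Wiscor": 30, "Tomas Soyér": 40}
visitors = {"Tom Soyer": "1@1", "Andrew Bin": "2@1", "Tom Viscor": "3@1",
            "Thomas Soyer": "2@2"}

# for every visitor pick the most similar account name despite misspellings
matches = {v: max(accounts, key=lambda a: jaccard(v, a)) for v in visitors}
```

On these four rows every visitor lands on the intended account; the rest of the post is about getting the same result without comparing all pairs.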

Friday, November 7, 2014

Spark and Locality Sensitive Hashing, part 1

Locality Sensitive Hashing is the name of a special algorithm designed to address the complexity of BigData processing.



Let's consider the following example: assume we have two independent systems. One is a web application that gets a user's profile from a social network; the second is an online payment system. Our goal is to merge profiles from the social network and the payment system. Of course, a social network user might not be present in the payment system at all, the accounts may be created at different times, and we definitely don't have a foreign key to match them exactly. There are two main issues:

  • there are two huge datasets that must be merged
  • a user's name might look different in the social network and the payment system
The naive approach is to compare each social network user name with each payment system user name, calculate the Hamming distance between them, and pick the most similar pair as a successful match. The biggest issue here is the O(n²) complexity of this approach.
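That naive approach fits in a few lines of Python (the sample names are invented; names are padded to equal length, since Hamming distance is only defined for strings of the same length):

```python
from itertools import product

def hamming(a, b):
    """Hamming distance over strings padded to equal length."""
    width = max(len(a), len(b))
    return sum(x != y for x, y in zip(a.ljust(width), b.ljust(width)))

social = ["Tom Soyer", "Andy Bin"]
payments = ["Thomas Soyer", "Andrew Bin"]

# O(n*m): every profile from one system is compared with every profile
# from the other - exactly the cost we want to avoid on big datasets
distances = {(s, p): hamming(s, p) for s, p in product(social, payments)}
```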

We want to minimize the number of comparisons between the two datasets. Fortunately, this issue is solved by the Locality Sensitive Hashing algorithm. Let's consider simple hashing:
f(str) → x
We can calculate a hash function f on a string s (the user name from a profile) and get an integer x; then we only need to compare Hamming distances for strings that have the same x. The issue here is picking a very good hash function, which is almost impossible. Luckily, we are not limited to one function: we can apply several/tens/hundreds of hash functions. In this case we get data duplication, because one string is assigned to several buckets (hash values). This increases the number of useless comparisons, but at the same time it gives a bigger chance of a successful match.

However, that still wouldn't work well enough, because names might contain misprints, or use special letters in the social profile while the payment system uses only traditional Latin characters, or vice versa. n-grams and minhashing come in handy in this situation. The main idea is to take all possible n-grams of a string and apply the minhashing algorithm to them. As a result, we aim to get a set of new hash codes based on the n-grams and compare only the strings that were placed into the same buckets by those hash codes.

The step-by-step algorithm is as follows:

  1. Define a collection of hash functions
  2. Calculate the minhash signature over each profile's n-grams
  3. Based on equal hash codes, get pairs of similar profiles from the social network and the payment system
  4. Calculate the Hamming distance within each pair to select the most similar match
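The steps above can be sketched in plain Python before moving to Spark (the seeded MD5 hash functions, bigram shingles and sample data are illustrative assumptions, not the final implementation):

```python
import hashlib
from collections import defaultdict

def ngrams(s, n=2):
    """All character n-grams (shingles) of a string."""
    return {s[i:i + n] for i in range(len(s) - n + 1)}

def minhash(grams, seed):
    """One minhash value: the smallest seeded hash over all n-grams."""
    return min(int(hashlib.md5((str(seed) + g).encode()).hexdigest(), 16)
               for g in grams)

def signature(name, num_hashes=20):
    """Steps 1-2: a tuple of minhashes, one per hash function."""
    grams = ngrams(name.lower())
    return tuple(minhash(grams, seed) for seed in range(num_hashes))

def candidate_pairs(social, payments, num_hashes=20):
    """Step 3: bucket names by (position, minhash value); a social and a
    payment name sharing any bucket become a candidate pair."""
    buckets = defaultdict(lambda: ([], []))
    for side, names in enumerate((social, payments)):
        for name in names:
            for pos, h in enumerate(signature(name, num_hashes)):
                buckets[(pos, h)][side].append(name)
    pairs = set()
    for soc, pay in buckets.values():
        pairs.update((s, p) for s in soc for p in pay)
    return pairs
```

The candidate pairs are then re-checked with a distance measure (step 4), so the occasional false positive from a bucket collision is harmless.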

In the next part: a source code example and an implementation on top of Apache Spark.

Friday, October 10, 2014

Tuning the MapReduce job

java.lang.OutOfMemoryError: GC overhead limit exceeded
that's what I got yesterday while running my shiny new MapReduce job.

OutOfMemoryError in Java has different causes: no more memory available, GC called too often (my case), no more free PermGen space, etc.

To get more information about the JVM internals, we have to tune how the JVM runs. I'm using the Hortonworks distribution, so I went to Ambari, the MapReduce configuration tab, and found mapreduce.reduce.java.opts. This property is responsible for the reducer's JVM configuration. Let's add GarbageCollector logging:
-verbose:gc -Xloggc:/tmp/@taskid@.gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
This writes the GC log to the local filesystem in the /tmp folder; the file name is the task id plus a .gc extension.
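Putting it together, the full property value might look like this (the 2 GB heap is an illustrative assumption; Hadoop substitutes @taskid@ itself):

```
mapreduce.reduce.java.opts=-Xmx2048m -verbose:gc -Xloggc:/tmp/@taskid@.gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
```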

In general, the following properties are important for JVM tuning:

  • mapred.child.java.opts - Provides JVM options to pass to map and reduce tasks. Usually includes the -Xmx option to specify the maximum heap size, and may also include -Xms to specify the starting heap size.
  • mapreduce.map.java.opts - Overrides mapred.child.java.opts for map tasks.
  • mapreduce.reduce.java.opts - Overrides mapred.child.java.opts for reduce tasks.
After entering the new value for the property, the MapReduce service must be restarted (Hortonworks reminds you with a yellow "Restart" button); only after the restart are the changes applied. The next step is to run the MapReduce job; as a result, the per-task logs will be placed into the /tmp folder on each node.

It's a bit difficult to read the log, but luckily several UI tools exist on the market. I prefer the open-sourced GCViewer, which is a Java application and doesn't require installation. It supports a wide range of JVMs; moreover, it has a command-line interface for generating reports, so reporting can be automated.

The opened GC log gives a detailed overview of the memory state:

Legend:

  • Green line that shows the length of all GCs
  • Magenta area that shows the size of the tenured generation (not available without PrintGCDetails)
  • Orange area that shows the size of the young generation (not available without PrintGCDetails)
  • Blue line that shows used heap size

Thursday, October 9, 2014

Unit test for Hive query

Sometimes the soul wants something really extraordinary... for example, to write a unit test for a Hive query :)

Let's see how it's possible, step by step. So, to write a unit test for Hive:

First of all, a local Hive instance must be run, and for that we need a local metastore (I propose Apache Derby) and directories for temporary data, logs, etc. As all configuration will be read from system properties, I didn't find a better way than to set all of them programmatically...
Be sure to create all the mentioned directories before starting Hive, for example with Apache Commons IO:

FileUtils.forceMkdir(HIVE_BASE_DIR);

And after then register all of them in system environment:

        System.setProperty("javax.jdo.option.ConnectionURL", "jdbc:derby:;databaseName=" + HIVE_METADB_DIR.getAbsolutePath() + ";create=true");
        System.setProperty("hive.metastore.warehouse.dir", HIVE_WAREHOUSE_DIR.getAbsolutePath());
        System.setProperty("hive.exec.scratchdir", HIVE_SCRATCH_DIR.getAbsolutePath());
        System.setProperty("hive.exec.local.scratchdir", HIVE_LOCAL_SCRATCH_DIR.getAbsolutePath());
        System.setProperty("hive.metastore.metadb.dir", HIVE_METADB_DIR.getAbsolutePath());
        System.setProperty("test.log.dir", HIVE_LOGS_DIR.getAbsolutePath());
        System.setProperty("hive.querylog.location", HIVE_TMP_DIR.getAbsolutePath());
        System.setProperty("hadoop.tmp.dir", HIVE_HADOOP_TMP_DIR.getAbsolutePath());
        System.setProperty("derby.stream.error.file", HIVE_BASE_DIR.getAbsolutePath() + sep + "derby.log");

After that, the local Hive executor might be started:
HiveInterface client = new HiveServer.HiveServerHandler();

In fact, we are ready at this point. Now I propose to create a Hive table, load data into it and perform some queries. A best practice in the Java world is to put all metadata/data for a test into separate files, so in this example I put them under the resources directory; here is the reading from the resource text files:
client.execute(readResourceFile("/Example/table_ddl.hql"));
client.execute("LOAD DATA LOCAL INPATH '" +
                this.getClass().getResource("Example/data.csv").getPath() + "' OVERWRITE  INTO TABLE " + tableName);

Ok, now the data is in the table and Hive knows about it. Let's perform a query:
client.execute("select sum(revenue), avg(revenue) from " + tableName + " group by state");

Even more, we can register a custom function and test it!
client.execute("ADD JAR " + jar.getAbsolutePath());
client.execute("CREATE TEMPORARY FUNCTION TempFun as 'org.my.example.MainFunClass'");

And after that we can call fresh function:
client.execute("select TempFun(revenue) from " + tableName);
String revenueProcessed = client.fetchOne();

Monday, August 18, 2014

Writing in ElasticSearch directly from Hadoop MapReduce

ElasticSearch is a hot topic today. It is a powerful open-source search and analytics engine that makes data easy to explore. Several times I have faced populating data into ElasticSearch after Hadoop job completion. A couple of years ago it was a non-trivial issue that required using the binary ElasticSearch client and publishing data manually. Luckily, today there is ElasticSearch support for Hadoop.

Let's see how it might be done in the simplest case: we have to put JSON-formatted data into ElasticSearch for further analysis. So, our purpose is to write a map-only job that will populate ElasticSearch with data from a text file (already in JSON).

First of all, let's configure the Configuration object:

        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        conf.set("es.resource", "emailIndex/email"); // index/type
        conf.set("es.nodes", "192.168.12.04"); // host
        conf.set("es.port", "11000"); // port
        conf.set("es.input.json", "yes");

I guess, everything is clear here.

It is very important to set up the correct output format; pay attention to the letter case:


        // Set input and output format classes
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class);

        // Specify the type of output keys and values
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

After that we will implement the Mapper (it emits only a value, without a key - this behavior is required by the ES output format class!):


public static class EmailToEsMapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, NullWritable, Text> {
        private Text output = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String email = value.toString();
            output.set(email);

            context.write(NullWritable.get(), output);          
        }

    }

Let's get back to the second code snippet. It uses EsOutputFormat; pay attention to the letter case, because there is an old deprecated API with an ESOutputFormat class. It might be required to add exclusions to the Maven file, to pull the correct versions of jars and avoid dependency hell:



        
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>1.3.0.M2</version>
            <exclusions>
                <exclusion>
                    <groupId>cascading</groupId>
                    <artifactId>cascading-hadoop</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>cascading</groupId>
                    <artifactId>cascading-local</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.pig</groupId>
                    <artifactId>pig</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-service</artifactId>
                </exclusion>
            </exclusions>
        </dependency>