Tuesday, November 11, 2014

Spark and Locality Sensitive Hashing, part 2

This is the second part of the series on Locality Sensitive Hashing. Here we build a working example using Apache Spark.

Let's start with the task definition: there are two datasets, bank accounts and web-site visitors. The only field they have in common is the name, and it may be misspelled. Consider the following example:

Bank Accounts

Name           Credit score
Tom Soyer      10
Andy Bin       20
Tom Wiscor     30
Tomas Soyér    40

Web-site Visitors

Name           email
Tom Soyer      1@1
Andrew Bin     2@1
Tom Viscor     3@1
Thomas Soyer   2@2

Friday, November 7, 2014

Spark and Locality Sensitive Hashing, part 1

Locality Sensitive Hashing (LSH) is an algorithm designed to address the complexity of Big Data processing.

Let's consider the following example: assume we have two independent systems. One is a web application that gets the user's profile from a social network, the second is an online payment system. Our idea is to merge the profiles from the social network and the payment system. Of course, a social network user might not be present in the payment system at all, the accounts might have been created at different times, and we definitely don't have a foreign key to match them exactly. There are two issues:

  • there are two huge data sets that must be merged
  • a user's name might look different in the social network and the payment system
The naive approach is to compare every social network user name with every payment system user name, calculate the Hamming distance between them and pick the most similar pair as a successful match. The biggest issue here is the O(n²) complexity of this approach.

We want to minimize the number of comparisons between the two datasets. Fortunately, this problem is addressed by the Locality Sensitive Hashing algorithm. Let's consider simple hashing:
f(str) → x
We can apply a hash function f to a string s (the user name from a profile) and get an integer x; then we only need to compute Hamming distances between strings that have the same x. The problem is picking a really good hash function, which is almost impossible. Fortunately, we are not limited to one function: we can apply several, tens or even hundreds of hash functions. In that case the data is duplicated, because one string is assigned to several buckets (hash values). This increases the number of useless comparisons, but at the same time it gives a better chance of a successful match.

However, this still wouldn't work well enough, because names might contain misprints, or use special letters in the social profile while the payment system uses only plain Latin letters (or vice versa). n-grams and minhashing come in handy in this situation. The main idea is to take all n-grams of a string and apply the minhash algorithm to them. As a result, we get a set of new hash codes based on the n-grams and compare only the strings that were placed into the same buckets by these hash codes.

The step-by-step algorithm is as follows:

  1. Define a collection of hash functions
  2. Calculate the minhash of the n-grams of each profile name with every hash function
  3. Based on equal hash codes, get pairs of similar profiles from the social network and the payment system
  4. Calculate the Hamming distance within each pair to select the most similar match for each case
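Steps 1 to 3 can be sketched in plain Java, without Spark. This is only an illustration under assumptions: the class name MinHashSketch, the seed values, the n-gram size of 2 and the bit mixer (the 32-bit finalizer from MurmurHash3) are choices made for the sketch, not the implementation from the next part.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class MinHashSketch {

    // Step 2a: character n-grams of a lower-cased name.
    static Set<String> ngrams(String name, int n) {
        String s = name.toLowerCase();
        Set<String> grams = new HashSet<>();
        for (int i = 0; i + n <= s.length(); i++) {
            grams.add(s.substring(i, i + n));
        }
        return grams;
    }

    // Bit mixer (32-bit finalizer from MurmurHash3) to spread the seeded hash codes.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Steps 1 and 2b: each seed defines one hash function (assumption of this sketch);
    // the signature keeps the minimum hash value per function.
    static int[] signature(Set<String> grams, int[] seeds) {
        int[] sig = new int[seeds.length];
        Arrays.fill(sig, Integer.MAX_VALUE);
        for (String g : grams) {
            for (int i = 0; i < seeds.length; i++) {
                sig[i] = Math.min(sig[i], mix(g.hashCode() ^ seeds[i]));
            }
        }
        return sig;
    }

    // Step 3: a name lands in one bucket per hash function, keyed by "index:minhash".
    static Map<String, List<String>> buckets(List<String> names, int n, int[] seeds) {
        Map<String, List<String>> result = new HashMap<>();
        for (String name : names) {
            int[] sig = signature(ngrams(name, n), seeds);
            for (int i = 0; i < sig.length; i++) {
                result.computeIfAbsent(i + ":" + sig[i], k -> new ArrayList<>()).add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int[] seeds = {17, 31, 73, 101}; // hypothetical seeds
        List<String> names = Arrays.asList("Tom Soyer", "Tomas Soyér", "Andy Bin", "Andrew Bin");
        // Only names that share a bucket need the expensive pairwise comparison (step 4).
        buckets(names, 2, seeds).forEach((key, group) -> {
            if (group.size() > 1) {
                System.out.println(key + " -> " + group);
            }
        });
    }
}
```

Names that never share a bucket are never compared, which is where the saving over the O(n²) approach comes from. More hash functions mean more buckets per name, so more candidate pairs and fewer missed matches.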

In the next part: a source code example and an implementation on Apache Spark

Friday, October 10, 2014

Tuning the MapReduce job

java.lang.OutOfMemoryError: GC overhead limit exceeded
That's what I got yesterday while running my shiny new MapReduce job.

An OutOfMemoryError in Java has different causes: no more memory available, GC being invoked too often (my case), no more free PermGen space, etc.

To get more information about what happens inside the JVM, we have to tune how the JVM is started. I'm using the Hortonworks distribution, so I went to Ambari, opened the MapReduce configuration tab and found mapreduce.reduce.java.opts. This property is responsible for the reducer's JVM configuration. Let's add garbage collector logging:
-verbose:gc -Xloggc:/tmp/@taskid@.gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
With this setting the GC log is written to the local filesystem in the /tmp folder; the file name is the task id plus the .gc extension.

In general, the following properties are important for JVM tuning:

  • mapred.child.java.opts - Provides JVM options to pass to map and reduce tasks. Usually includes the -Xmx option to specify the maximum heap size. May also specify -Xms to specify the start heap size. 
  • mapreduce.map.java.opts - Overrides mapred.child.java.opts for map tasks.
  • mapreduce.reduce.java.opts - Overrides mapred.child.java.opts for reduce tasks.
After entering a new value for the property, the MapReduce service must be restarted (Hortonworks reminds you with a yellow "Restart" button). The changes are applied only after the restart. The next step is to run the MapReduce job; as a result, a log per task is placed into the /tmp folder on each node.

The log is a bit difficult to read, but fortunately several UI tools exist. I prefer the open source GCViewer, which is a Java application and doesn't require installation. It supports a wide range of JVMs; moreover, it has a command line interface for generating reports, so getting reports can be automated.

An opened GC log gives a detailed overview of the memory state:


  • Green line that shows the length of all GCs
  • Magenta area that shows the size of the tenured generation (not available without PrintGCDetails)
  • Orange area that shows the size of the young generation (not available without PrintGCDetails)
  • Blue line that shows used heap size

Thursday, October 9, 2014

Unit test for Hive query

Sometimes the soul wants something really extraordinary... for example, to write a unit test for a Hive query :)

Let's see how it is possible, step by step. So, to write a unit test for Hive:

First of all, a local Hive instance must be running, and for that we need a local metastore (I propose Apache Derby) and directories for temporary data, logs, etc. As all configuration is read from system properties, I didn't find a better way than to set all of them programmatically...
Be sure to create all of the mentioned directories before starting Hive, for example with Google Guava.
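The directory-creation step can be sketched as follows. This is a minimal sketch that uses java.nio.file from the standard library rather than Guava; the class name HiveTestDirs and the directory names are hypothetical and should be adapted to the HIVE_*_DIR constants used in the test.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

class HiveTestDirs {

    // Creates base/name for every given name (including missing parents)
    // and returns the resulting directories keyed by name.
    static Map<String, File> create(Path base, String... names) throws IOException {
        Map<String, File> dirs = new LinkedHashMap<>();
        for (String name : names) {
            // createDirectories does not fail if the directory already exists
            Path dir = Files.createDirectories(base.resolve(name));
            dirs.put(name, dir.toFile());
        }
        return dirs;
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("hive-test");
        // hypothetical directory names
        Map<String, File> dirs = create(base, "warehouse", "scratch", "local_scratch", "logs", "tmp");
        dirs.forEach((name, dir) -> System.out.println(name + " -> " + dir.getAbsolutePath()));
    }
}
```

A fresh temporary base directory per test run keeps runs isolated from each other.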


After that, register all of them as system properties:

        System.setProperty("javax.jdo.option.ConnectionURL", "jdbc:derby:;databaseName=" + HIVE_METADB_DIR.getAbsolutePath() + ";create=true");
        System.setProperty("hive.metastore.warehouse.dir", HIVE_WAREHOUSE_DIR.getAbsolutePath());
        System.setProperty("hive.exec.scratchdir", HIVE_SCRATCH_DIR.getAbsolutePath());
        System.setProperty("hive.exec.local.scratchdir", HIVE_LOCAL_SCRATCH_DIR.getAbsolutePath());
        System.setProperty("hive.metastore.metadb.dir", HIVE_METADB_DIR.getAbsolutePath());
        System.setProperty("test.log.dir", HIVE_LOGS_DIR.getAbsolutePath());
        System.setProperty("hive.querylog.location", HIVE_TMP_DIR.getAbsolutePath());
        System.setProperty("hadoop.tmp.dir", HIVE_HADOOP_TMP_DIR.getAbsolutePath());
        System.setProperty("derby.stream.error.file", HIVE_BASE_DIR.getAbsolutePath() + sep + "derby.log");

After that, the local Hive executor can be started:
HiveInterface client = new HiveServer.HiveServerHandler();

In fact, we are ready at this point. Now I propose to create a Hive table, load data into it and run some queries. The best practice in the Java world is to keep all test metadata and data in separate files, so in this example I put them under the resources directory. Here is how the data is loaded from a resource text file:
client.execute("LOAD DATA LOCAL INPATH '" +
                this.getClass().getResource("Example/data.csv").getPath() + "' OVERWRITE  INTO TABLE " + tableName);

OK, now the data is in the table and Hive knows about it. Let's run a query:
client.execute("select sum(revenue), avg(revenue) from " + tableName + " group by state");

Even more, we can register a custom function and test it!
client.execute("ADD JAR " + HIVE_BASE_DIR.getAbsolutePath() + jar.getAbsoluteFile());
client.execute("CREATE TEMPORARY FUNCTION TempFun as 'org.my.example.MainFunClass'");

And after that we can call the fresh function:
client.execute("select TempFun(revenue) from " + tableName);
String revenueProcessed = client.fetchOne();

Monday, August 18, 2014

Writing in ElasticSearch directly from Hadoop MapReduce

ElasticSearch is a hot topic today. It is a powerful open source search and analytics engine that makes data easy to explore. Several times I have had to load data into ElasticSearch after Hadoop jobs completed. A couple of years ago this was a non-trivial task that required using the binary ElasticSearch client and publishing the data manually. Fortunately, ElasticSearch now provides support for Hadoop.

Let's see how it can be done in the simplest case: we have to put JSON-formatted data into ElasticSearch for further analysis. So, our goal is to write a map-only job that populates ElasticSearch with data from a text file (already in JSON).

First of all, let's set up the Configuration object:

        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        conf.set("es.resource", "emailIndex/email"); // index/type
        conf.set("es.nodes", ""); // host
        conf.set("es.port", "11000"); // port
        conf.set("es.input.json", "yes");

I guess everything is clear here.

It is very important to set the correct output format; pay attention to the letter case of the class name:

        // Set input and output format classes

        // Specify the type of output keys and values

After that we implement the Mapper (it emits only a value, without a key; this behavior is required by the ES output format class!):

public static class EmailToEsMapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, NullWritable, Text> {
        private Text output = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String email = value.toString();
            // the input line is already JSON, so it is passed through as the value
            output.set(email);
            context.write(NullWritable.get(), output);
        }
}


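Let's go back to the second code snippet. It uses EsOutputFormat; pay attention to the letter case, because there is an old deprecated API with an ESOutputFormat class. It might be necessary to add exclusions to the Maven file to pull the correct versions of the jars and avoid dependency hell:

<!-- yarn -->
<exclusions>
    <exclusion>
        <groupId>cascading</groupId>
        <artifactId>cascading-hadoop</artifactId>
    </exclusion>
    <exclusion>
        <groupId>cascading</groupId>
        <artifactId>cascading-local</artifactId>
    </exclusion>
    <exclusion>
        <groupId>org.apache.pig</groupId>
        <artifactId>pig</artifactId>
    </exclusion>
    <exclusion>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-service</artifactId>
    </exclusion>
</exclusions>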

Wednesday, August 13, 2014

Geo Coordinates converting

I made a discovery while working on my last task: could you imagine that there are many, many, many geographical coordinate systems in the world? I couldn't. I was pretty sure that there is only one: longitude and latitude.

Surprise! There are many more of them and they are widely used. Some of them are used in a particular domain, some are specific to certain countries. For example, you can read more about the Gauss–Krüger coordinate system.

import org.geotools.geometry.GeneralDirectPosition;
import org.geotools.referencing.CRS;
import org.opengis.geometry.DirectPosition;
import org.opengis.referencing.FactoryException;
import org.opengis.referencing.NoSuchAuthorityCodeException;
import org.opengis.referencing.crs.CoordinateReferenceSystem;
import org.opengis.referencing.operation.MathTransform;
import org.opengis.referencing.operation.TransformException;

public strictfp double[] translate(String from, String to, double x, double y)
            throws FactoryException, NoSuchAuthorityCodeException, TransformException {

        // resolve both coordinate reference systems by their codes
        CoordinateReferenceSystem sourceCRS = CRS.decode( from );
        CoordinateReferenceSystem targetCRS = CRS.decode( to );

        // lenient = true
        MathTransform transform = CRS.findMathTransform(sourceCRS, targetCRS, true);

        DirectPosition expPt = new GeneralDirectPosition(x, y);
        expPt = transform.transform(expPt, null);
        return expPt.getCoordinate();
}

OK, it looks good. The one time-consuming issue is including the correct libraries with Maven, because this small piece of code has very wide dependencies and it took several hours to find the right combination :)

So, maven dependencies:


Thursday, July 24, 2014

Hadoop 2.2 Distributed Cache and Map Join

It's very common to use the Distributed Cache for map joins: it makes it possible to implement an extremely fast join of a huge dataset with one or more small ones. Compared to other join techniques you can win up to a 1000x speed-up, so map joins are extremely useful and widely used. It's also the easiest way to implement an outer join, a non-equi join and so on. I'd recommend using a map join whenever it is possible.

What is bad about Hadoop, and what I don't like, is that the API changes very often: each new version brings API changes. The weirdest example is the Mapper interface. It was introduced, then deprecated and then un-deprecated (in Hadoop 2 it has no @Deprecated)... oh, it is quite difficult to keep up with all the changes...

The latest change: DistributedCache is now deprecated, and you can't use the good old DistributedCache.addCacheFile.

Hadoop 2.x introduces a new approach:
1) Add the file to the distributed cache (I'm using a symlink here):
job.addCacheFile(new URI(conf.get("dimension.file") + "#YOUR_DIM"));

2) In your setup method (of the Mapper or Reducer), the data from the cache can be read as follows:
Path[] files = context.getLocalCacheFiles(); // oh, this method is deprecated again

// loop over all files in cache
for (Path p : files) {
    if (p.getName().equals("YOUR_DIM")) {
        // load cache (for example into a Map)
    }
}

That's all. Symlinks are very useful for accessing a file from the cache.
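The join itself, once the cached file is loaded, can be sketched in plain Java without any Hadoop classes. The class name MapJoinSketch, the comma-separated record layout and the inner-join behaviour are assumptions made for this illustration only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MapJoinSketch {

    // What setup() would do once per task: load the small dataset into memory.
    // Each line is assumed to look like "key,value".
    static Map<String, String> loadDimension(List<String> lines) {
        Map<String, String> dim = new HashMap<>();
        for (String line : lines) {
            String[] parts = line.split(",", 2);
            if (parts.length == 2) {
                dim.put(parts[0].trim(), parts[1].trim());
            }
        }
        return dim;
    }

    // What map() would do per record of the huge dataset: an in-memory lookup, no shuffle.
    // Returns null when the key has no match (inner-join semantics).
    static String joinRecord(String record, Map<String, String> dim) {
        String key = record.split(",", 2)[0].trim();
        String value = dim.get(key);
        return value == null ? null : record + "," + value;
    }

    public static void main(String[] args) {
        Map<String, String> dim = loadDimension(Arrays.asList("UA,Ukraine", "UK,United Kingdom"));
        for (String record : Arrays.asList("UA,42", "US,7", "UK,13")) {
            String joined = joinRecord(record, dim);
            if (joined != null) {
                System.out.println(joined);
            }
        }
    }
}
```

Returning the record unchanged instead of null for a missing key would turn this into a left outer join, which is why outer and non-equi joins are easy to express on the map side.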

Thursday, July 3, 2014

Running Spark Unit Test on Windows 7

It's a common situation in enterprises that developers work on the Windows platform. When you are working with Hadoop, it sounds like f**ing shit, but it is a fact.

Recently I switched to Spark in favor of the traditional MapReduce paradigm and needed to implement some kind of unit/integration testing... of course, it had to work under Windows 7.

I wrote a very simple test: run the ETL in memory, without touching Hadoop at all (in the future, I'd like to read the input from the local filesystem):

def testETL() = {
    val conf = new SparkConf()
    val sc = new SparkContext("local", "test", conf)
    try {
        val etl = new IxtoolsDailyAgg() // empty constructor

        val data = sc.parallelize(List("in1", "in2", "in3"))

        etl.etl(data) // rdd transformation, no access to SparkContext or Hadoop
    } finally {
        if (sc != null)
            sc.stop() // release the local Spark context
    }
}

Boom! I got an exception:

java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
 at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
 at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
 at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
 at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
 at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
 at org.apache.hadoop.security.Groups.<init>(Groups.java:77)
 at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
 at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
 at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
 at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:36)
 at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:109)
 at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala)
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:228)
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:97)

I swear, I didn't use Hadoop in my code!
Unfortunately, the Hadoop configuration is initialized together with SparkContext :( there is no way to skip it...
I was advised to install HDP on Windows, but I hate this idea...

I tried the most stupid idea: provide winutils.exe... I hoped that it was only an environment check and that Hadoop functionality wouldn't be used if I didn't touch it.
So, I downloaded winutils.exe from MSDN (MSDN is still helpful even for a Hadooper), put it into a newly created directory d:\winutil\bin and then added
System.setProperty("hadoop.home.dir", "d:\\winutil\\") 
at the beginning of my unit test.

Thursday, April 24, 2014

Hue Notifier for Hadoop goes wild

Several months ago I developed a Chrome browser plugin for my own needs. As a Hadoop engineer I faced one problem every day: I run a lot of Hive/Pig jobs simultaneously and they take a lot of time (from several minutes to several hours). So I had to check job completion by walking through Hue's pages in my browser. Well, it was 1) irritating and 2) distracting from coding...

As a solution I developed the Hue Notifier for Hadoop plugin for Google Chrome. It "monitors" the state of a job and informs you about its completion, similar to how GMail informs you about new mail (a pop-up over all windows). I have quite limited knowledge of JavaScript and it was the first time I wrote a browser plugin... so I'm absolutely sure it can be improved. I tested it with the Hue delivered with Cloudera 4.3 and Cloudera 5 as well as with HDP 2.0. The most irritating issue with my code: Chrome notifications must be enabled manually before you start using the plugin :(

The source code is publicly available on GitHub in this repository. You are welcome to fork and improve it. Or, if you just wish to contribute, ping me and I will grant access (and push the changes to Google Play afterwards).

Friday, April 18, 2014

Building BigData ETL with Hive and Oozie

Hive is perhaps the most successful component of today's Hadoop infrastructure. It provides a simple and efficient way of creating Hadoop-based data processing jobs in a comfortable SQL-like language. But, in contrast to Pig, it's not a workflow-friendly language and requires additional effort to create a real multi-step ETL.
Oozie was created to solve workflow and scheduling issues, so it obviously may be used to create an ETL and it naturally integrates with Hive.

Tuesday, April 1, 2014

Spark on HDP2

This is my first experience with Apache Spark, running it on Hadoop. I ran into several issues while running my piece of code.
To be honest, I started with the Cloudera CDH5 distribution; they promised that Spark was already included and that usage would be simple. But no luck: in fact it didn't work at all, even on a local machine with their spark-cloudera jar. I didn't want to waste my time, so I just downloaded the Spark distribution to HDP2.
First of all, let's start Spark in standalone mode, according to the documentation:
# start master

# pick up in the log output spark://IP:PORT
# and then run a worker on each node
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT

# more documentation available here https://spark.apache.org/docs/0.9.0/spark-standalone.html

After that I wrote a small piece of Scala code that simply counts the lines containing hardcoded words in a document:

package experiment

import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {

  def main(args: Array[String]) {
    val logFile = args(0)
    val conf = new SparkConf()
      .setAppName("My Spark application")
      .set("spark.executor.memory", "1g")
    val sc = new SparkContext(conf)

    // hdfs:///user/hue/input.txt
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("London")).count()
    val numBs = logData.filter(line => line.contains("Lviv")).count()
    println("Lines with London: %s, Lines with Lviv: %s".format(numAs, numBs))
  }
}

Thursday, March 20, 2014

XQuery on Hadoop

Java is the mother tongue of most Hadoop engineers. In recent years Python has become popular, and R is used by data scientists on Hadoop. Pig Latin and HiveQL are the de facto mainstream languages for Hadoop nowadays. Oracle decided not to stop there and made it possible to write MapReduce jobs in XQuery! Unbelievable, XML fans must be happy :)

Let's review a simple example.

First of all, the Oracle BigData Lite VM must be downloaded (it is free, but it takes 25 GB on disk).

After installation, a test dataset must be created. I put 2 files with a sample dataset about website access into the HDFS directory /user/oracle/xquery/input. An example of the content:
2013-10-28T06:00:00, chrome, index.html, 200
2013-10-28T08:30:02, firefox, index.html, 200
2013-10-28T08:32:50, ie9, about.html, 200

Next step: create an XQuery script (my_xquery.xq) to process the data (a simple grouping by the date of the page visit)

import module "oxh:text";

for $line in text:collection("/user/oracle/xquery/input/*.txt")
let $split := fn:tokenize($line, "\s*,\s*")
let $time := xs:dateTime($split[1])
let $day := xs:date($time)
group by $day
return text:put($day || ", " || fn:count($line))

Now the script is ready to be run. Execute it from the command line:
hadoop jar $OXH_HOME/lib/oxh.jar my_xquery.xq -output /user/oracle/xquery/output -clean -ls

-output specifies the output directory
-clean removes the output directory if it exists
-ls lists the content of the output directory after the run

Here is the result:

That's it: the XQuery was translated to MapReduce (similar to Pig Latin or HiveQL). This functionality is part of Oracle BigData Connectors for Hadoop, and more information with examples can be read here

Wednesday, February 19, 2014

How to write good unit test for Hadoop MapReduce?

Without a doubt, it is a very common situation that a unit test (or integration test) is required to check the functionality of a MapReduce job. This approach is a perfect fit for TDD; moreover, it makes it possible to develop MapReduce jobs faster, because there is no need to redeploy the jar on a cluster each time and debugging is easy.

The first line of defence is MRUnit. It is a great framework for unit testing, independent of the input/output format, with the possibility to run and test map and reduce functions separately. Unfortunately, this framework has several significant drawbacks. For example, there is no access to MR counters, and only one Mapper is allowed in an MR test.

Local execution mode may be used to overcome MRUnit's limitations or to create an integration test for a MapReduce job. Let's assume there is a runnable MapReduce tool with several input sources (mappers) and a reducer:

public class ExampleMrDriver extends Configured implements Tool {

    public Job createMRJob(Configuration conf) throws IOException {...}

    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = createMRJob(conf);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) {
        try {
            // run job in an Oozie-friendly manner
            int status = ToolRunner.run(new ExampleMrDriver(), args);
            if (status != 0) {
                System.exit(status); // non-zero exit code marks the action as failed
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

A nice integration test (or unit test, call it and use it as you like) for this Hadoop MapReduce job is listed below:

private String outputDir;

@Before
public void createTmpDir() throws IOException {
    outputDir = new File(System.getProperty("java.io.tmpdir"), "output").getAbsolutePath();
}

@Test
public void test() throws Exception {
    JobConf jobConf = new JobConf();
    jobConf.set("fs.default.name", "file:///");
    jobConf.set("mapred.job.tracker", "local"); // local mode
    jobConf.set("mapred.reduce.tasks", "1"); // only one file is required in output

    // create file w/ input content per mapper in test/resource folder
    jobConf.set("input.dir.2", this.getClass().getResource("/mr/input1").getPath());
    jobConf.set("input.dir.1", this.getClass().getResource("/mr/input2").getPath());
    jobConf.set("input.dir.3", this.getClass().getResource("/mr/input3").getPath());
    // expected output will be placed here
    jobConf.set("output.dir", outputDir);

    ExampleMrDriver driver = new ExampleMrDriver();
    driver.setConf(jobConf); // otherwise getConf() inside run() returns null
    int exitCode = driver.run(new String[]{});

    Assert.assertEquals(0, exitCode);

    // check content of output file, counters, etc
}

@After
public void tearDown() throws IOException {
    new File(outputDir).delete();
}

Thursday, January 16, 2014

Predicted Age of Abalone based on physical measurements

The Abalone dataset has been freely available at the UCI Machine Learning Repository since 1995. It contains the results of abalone research in Australia. The task is to predict the age of abalone from physical measurements. The age of abalone is determined by cutting the shell through the cone, staining it, and counting the number of rings through a microscope, which is a boring and time-consuming task. Other measurements, which are easier to obtain, are used to predict the age. Of course, the task is more complex in real conditions, and further information, such as weather patterns and location (hence food availability), may be required to solve the problem.

So, Age ~ Rings, and it must be predicted from a set of different measurements such as Diameter, Weight, Height, Length, etc. It is a supervised learning task, because a dataset with the relation Result~Features is provided. A simple check shows that the number of rings ranges from 1 to 29, which is a huge range for classification. Another supervised learning algorithm, linear regression, is a better fit.

EDA (exploratory data analysis) is the first step before building any model. Here is the code for loading the dataset into memory and plotting several relations, for example Rings~Diameter

# qplot and ggplot come from the ggplot2 package
library(ggplot2)
# read dataset from local file
abalone <- read.csv("/Users/kostya/Downloads/abalone.data.csv", header=F)
# set names for dataframe columns
colnames(abalone) <- c('Sex', 'Length', 'Diameter', 'Height', 'WholeWeight', 'ShuckedWeight',
                    'VisceraWeight', 'ShellWeight', 'Rings')
# plot histogram
hist(abalone$Rings, freq=F)
# draw all sexes on one plot, with a linear fit per sex
qplot(Diameter, Rings, data=abalone, geom=c("point", "smooth"), method="lm", color=Sex, se=F)

This image (as well as other relations like Rings~WholeWeight, etc.) shows pretty well that the relations differ for each sex, and the first thought is to apply a different regression for each 'sex' or to use 'sex' as a factor.

To go on with different regression models, we need to construct a formula by investigating each relation. For example, here is the Rings~WholeWeight relation

# plot each sex on different plot
ggplot(abalone, aes(VisceraWeight, Rings)) + 
  geom_jitter(alpha=0.25) + 
  geom_smooth(method=lm, se=FALSE) +
  facet_grid(. ~ Sex)

Obviously, for Male and Infant the relation has a logarithmic trend, so it is logical to add 'log' to the formula.

           +Height+VisceraWeight, data=subset(abalone, Sex %in% 'I'))  )
           +Height+VisceraWeight, data=subset(abalone, Sex %in% 'M'))  )
           +Height+VisceraWeight, data=subset(abalone, Sex %in% 'F'))  )

As a result, the following formula may be constructed to predict the number of rings for Infant, based on the coefficients of the linear regression:
Rings = 8.5398 - 7.6755*Length + 8.7707*Diameter^2 + 1.4837*log(WholeWeight) + 2.0745*log(ShellWeight) - 2.3415*log(ShuckedWeight) + 27.8275*Height + 5.9972*VisceraWeight

As was mentioned in the task description, Age = Rings + 1.5
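To make the arithmetic concrete, here is a minimal Java sketch that evaluates the Infant formula and converts rings to age. The coefficients are copied from the formula above; the class name AbaloneAge, the method names and the sample measurements in main are hypothetical.

```java
class AbaloneAge {

    // Predicted number of rings for an Infant abalone, using the regression
    // coefficients quoted in the post. Math.log is the natural logarithm, like log() in R.
    static double predictInfantRings(double length, double diameter, double height,
                                     double wholeWeight, double shuckedWeight,
                                     double visceraWeight, double shellWeight) {
        return 8.5398
                - 7.6755 * length
                + 8.7707 * diameter * diameter
                + 1.4837 * Math.log(wholeWeight)
                + 2.0745 * Math.log(shellWeight)
                - 2.3415 * Math.log(shuckedWeight)
                + 27.8275 * height
                + 5.9972 * visceraWeight;
    }

    // Age = Rings + 1.5, as stated in the dataset description.
    static double ageInYears(double rings) {
        return rings + 1.5;
    }

    public static void main(String[] args) {
        // hypothetical measurements, for illustration only
        double rings = predictInfantRings(0.4, 0.3, 0.1, 0.4, 0.2, 0.1, 0.1);
        System.out.println("Predicted rings: " + rings);
        System.out.println("Predicted age:   " + ageInYears(rings));
    }
}
```

The weights must be strictly positive, otherwise the logarithms are undefined.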