Monday, 18 August 2014

Writing to ElasticSearch directly from Hadoop MapReduce

ElasticSearch is a hot topic today. It is a powerful open source search and analytics engine that makes data easy to explore. Several times I have had to load data into ElasticSearch after Hadoop jobs completed. A couple of years ago this was a non-trivial task that required using the binary ElasticSearch client and publishing the data manually. Fortunately, ElasticSearch now provides support for Hadoop.

Let's see how it can be done in the simplest case: we have to put JSON-formatted data into ElasticSearch for further analysis. So our goal is to write a map-only job that populates ElasticSearch with data from a text file (already in JSON).

First of all, let's set up the Configuration object:

        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        conf.set("es.resource", "emailIndex/email"); // index/type
        conf.set("es.nodes", "192.168.12.04"); // host
        conf.set("es.port", "11000"); // port
        conf.set("es.input.json", "yes");

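I guess everything is clear here. Speculative execution is switched off so that duplicate task attempts do not write the same documents twice. The es.* settings name the target index and type, the host and port of an ElasticSearch node, and tell the connector that the input is already JSON.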
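
One detail worth checking before the job starts is the value of es.resource. Elasticsearch only accepts lowercase index names, so a mixed-case name such as emailIndex may be rejected when the first documents are written. Below is a minimal sketch of a pre-flight check in plain Java, assuming the index/type form shown above. The class and method names (EsResourceCheck, validate) are hypothetical and not part of elasticsearch-hadoop.

```java
import java.util.Locale;

/** Hypothetical helper for illustration; not part of elasticsearch-hadoop. */
final class EsResourceCheck {

    /**
     * Checks an "index/type" string as used for es.resource.
     * Returns null when the value looks fine, otherwise a short problem description.
     */
    static String validate(String resource) {
        if (resource == null || resource.isEmpty()) {
            return "es.resource is empty";
        }
        // Keep trailing empty parts so that "index/" is reported as invalid.
        String[] parts = resource.split("/", -1);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            return "expected index/type, got: " + resource;
        }
        // Index names must be lowercase.
        if (!parts[0].equals(parts[0].toLowerCase(Locale.ROOT))) {
            return "index name must be lowercase: " + parts[0];
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(validate("emailIndex/email"));
        System.out.println(validate("email_index/email"));
    }
}
```

Calling validate(conf.get("es.resource")) in the driver before submitting the job would surface this kind of problem early instead of in the task logs.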

It is very important to set the correct output format; pay attention to the letter case of the class name:


        // Set input and output format classes
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class);

        // Specify the type of output keys and values
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

After that we implement the Mapper (it emits only a value, without a key; the ES output format class requires this behavior!):


public static class EmailToEsMapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, NullWritable, Text> {
        private Text output = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String email = value.toString();
            output.set(email);

            context.write(NullWritable.get(), output);
        }

    }
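
Because es.input.json is set, the mapper passes every line through unchanged, so each input line has to be one complete JSON document. A blank or truncated line is likely to fail only at indexing time. The following rough sketch shows a pre-filter that could be called at the top of map(). It only checks the outer braces and is not a JSON parser. The name JsonLineFilter is made up for this illustration.

```java
/** Hypothetical helper for illustration; a crude sanity check, not a JSON parser. */
final class JsonLineFilter {

    /** True if the trimmed line starts with '{' and ends with '}'. */
    static boolean looksLikeJsonObject(String line) {
        if (line == null) {
            return false;
        }
        String t = line.trim();
        return t.length() >= 2 && t.charAt(0) == '{' && t.charAt(t.length() - 1) == '}';
    }

    public static void main(String[] args) {
        String[] lines = { "{\"from\":\"a@example.com\"}", "", "{\"from\":" };
        for (String line : lines) {
            System.out.println(looksLikeJsonObject(line) + " <- [" + line + "]");
        }
    }
}
```

In the mapper this would be a single guard, for example returning early from map() when looksLikeJsonObject(value.toString()) is false.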

Let's go back to the second code snippet. It uses EsOutputFormat; pay attention to the letter case, because there is also an old, deprecated API with the ESOutputFormat class. It may be necessary to add exclusions to the Maven file in order to pull the correct versions of the jars and avoid dependency hell:



        
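        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>1.3.0.M2</version>
            <exclusions>
                <!-- "yarn" appeared at this point in the original listing; the surrounding element is not recoverable -->
                <exclusion>
                    <groupId>cascading</groupId>
                    <artifactId>cascading-hadoop</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>cascading</groupId>
                    <artifactId>cascading-local</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.pig</groupId>
                    <artifactId>pig</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-service</artifactId>
                </exclusion>
            </exclusions>
        </dependency>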

Wednesday, 13 August 2014

Converting Geo Coordinates

I made a discovery while working on my latest task: can you imagine that there are many, many geographical coordinate systems in the world? I couldn't. I was pretty sure there was only one: longitude and latitude.

Surprise! There are many more of them, and they are widely used. Some are used in a particular domain, others are specific to certain countries. For example, you can read more about the Gauss–Krüger coordinate system.

GeoTools can convert a point from one coordinate system to another:


import org.geotools.geometry.GeneralDirectPosition;
import org.geotools.referencing.CRS;
import org.opengis.geometry.DirectPosition;
import org.opengis.referencing.FactoryException;
import org.opengis.referencing.NoSuchAuthorityCodeException;
import org.opengis.referencing.crs.CoordinateReferenceSystem;
import org.opengis.referencing.operation.MathTransform;
import org.opengis.referencing.operation.TransformException;


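// Converts the point (x, y) from one coordinate reference system to another.
// "from" and "to" are authority codes understood by CRS.decode, e.g. "EPSG:4326".
public strictfp double[] translate(String from, String to, double x, double y)
            throws FactoryException, NoSuchAuthorityCodeException, TransformException {

        CoordinateReferenceSystem sourceCRS = CRS.decode( from );
        CoordinateReferenceSystem targetCRS = CRS.decode( to );

        // lenient = true: accept a transform even if datum shift parameters are missing
        MathTransform transform = CRS.findMathTransform(sourceCRS, targetCRS, true);

        DirectPosition expPt = new GeneralDirectPosition(x, y);
        expPt = transform.transform(expPt, null);
        return expPt.getCoordinate();
}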

OK, it looks good. The one time-consuming part is including the correct libraries with Maven: this small piece of code has very wide dependencies, and it took me several hours to find a working combination :)

So, the Maven dependencies:

        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-opengis</artifactId>
            <version>2.7.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-metadata</artifactId>
            <version>2.7.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-referencing</artifactId>
            <version>2.7.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-epsg-hsql</artifactId>
            <version>2.7.0.1</version>
        </dependency>
        <dependency>
            <groupId>javax.media</groupId>
            <artifactId>jai_core</artifactId>
            <version>1.1.3</version>
        </dependency>
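
Going back to the translate method: its from and to arguments are authority codes, for example EPSG:4326 for WGS 84 latitude/longitude. For Gauss–Krüger the matching code depends on the zone, which in turn depends on the longitude of the point. The sketch below picks a code under the assumption that the German DHDN 3-degree zones 2 to 5 (EPSG:31466 to EPSG:31469) are the ones needed. Other countries use different code ranges, and the class name GaussKrugerZones is hypothetical.

```java
/** Hypothetical helper for illustration; covers only DHDN 3-degree Gauss-Kruger zones 2..5. */
final class GaussKrugerZones {

    /** Zone number whose central meridian (zone * 3 degrees east) is nearest to the longitude. */
    static int zoneFor(double longitudeDegrees) {
        return (int) Math.round(longitudeDegrees / 3.0);
    }

    /** EPSG code of the DHDN / 3-degree Gauss-Kruger zone for the given longitude. */
    static String epsgCodeFor(double longitudeDegrees) {
        int zone = zoneFor(longitudeDegrees);
        if (zone < 2 || zone > 5) {
            throw new IllegalArgumentException(
                    "longitude " + longitudeDegrees + " is outside zones 2..5");
        }
        // EPSG:31466 is zone 2, EPSG:31467 is zone 3, and so on.
        return "EPSG:" + (31464 + zone);
    }

    public static void main(String[] args) {
        System.out.println(epsgCodeFor(9.0));
        System.out.println(epsgCodeFor(13.4));
    }
}
```

The result can then be passed as the to argument, as in translate("EPSG:4326", GaussKrugerZones.epsgCodeFor(13.4), ...). GeoTools may expect latitude before longitude for EPSG:4326, so it is worth verifying the axis order with a known point first.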

Thursday, 24 July 2014

Hadoop 2.2 Distributed Cache and Map Join

It is very common to use the Distributed Cache for map joins: it makes it possible to implement an extremely fast join of a huge dataset with one or more small ones. Compared to other join techniques you can gain up to a 1000x speed-up, so map joins are extremely useful and widely used. It is also the easiest way to implement outer joins, non-equi joins and so on. I recommend using a map join whenever possible.

What I don't like about Hadoop is that the API changes very often; each new version brings API changes. The weirdest example is the Mapper interface: it was introduced, then deprecated, and then un-deprecated (in Hadoop 2 it carries no @Deprecated)... Oh, it is quite difficult to keep up with all the changes...

The latest change: DistributedCache is now deprecated, so you should no longer rely on the good old DistributedCache.addCacheFile.

Hadoop 2.x introduces a new approach:
1) add the file to the distributed cache (I'm using a symlink here):
job.addCacheFile(new URI(conf.get("dimension.file")+"#YOUR_DIM"));

2) in your setup method (Mapper or Reducer) the data from the cache can be read with the following instruction:
Path[] files = context.getLocalCacheFiles(); // oh, this method is deprecated again

// loop over all files in cache
for (Path p : files) {
    if (p.getName().equals("YOUR_DIM")) {
         // load cache (for example into Map)
    }
}

That's all. Symlinks are very useful for accessing a file from the cache.
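
The "load cache" step and the join itself can be sketched without any Hadoop classes. In the illustration below the dimension file is assumed to hold tab-separated id and name pairs, and each fact record is assumed to start with the dimension id. Both layouts, as well as the class name MapJoinSketch, are invented for the example. In a real job load would run once in setup() on the file named YOUR_DIM, and join would be called from map().

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of a map-side join; the record layouts are assumptions. */
final class MapJoinSketch {

    /** Builds the in-memory lookup table from "id<TAB>name" lines. */
    static Map<String, String> load(List<String> dimensionLines) {
        Map<String, String> dim = new HashMap<>();
        for (String line : dimensionLines) {
            String[] parts = line.split("\t", 2);
            if (parts.length == 2) {
                dim.put(parts[0], parts[1]);
            }
        }
        return dim;
    }

    /**
     * Joins one "id<TAB>payload" record against the table.
     * Unmatched ids get the given default, which makes this a left outer join.
     */
    static String join(String factLine, Map<String, String> dim, String missing) {
        String id = factLine.split("\t", 2)[0];
        return factLine + "\t" + dim.getOrDefault(id, missing);
    }

    public static void main(String[] args) {
        Map<String, String> dim = load(Arrays.asList("1\tLondon", "2\tLviv"));
        for (String fact : Arrays.asList("1\t42", "3\t7")) {
            System.out.println(join(fact, dim, "UNKNOWN"));
        }
    }
}
```

The small dataset lives entirely in a HashMap, so each input record costs one lookup and no shuffle or reduce phase is needed. This only works while the dimension data fits into the memory of a single task.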

Thursday, 3 July 2014

Running Spark Unit Test on Windows 7

It is a common situation in enterprises that developers work on the Windows platform. When you are working with Hadoop that sounds like f**ing shit, but it is a fact.

Recently I switched to Spark instead of the traditional MapReduce paradigm and needed to implement some kind of unit/integration testing... and of course it had to work under Windows 7.

I wrote a very simple test: run the ETL in memory, without touching Hadoop at all (in the future I'd like to read the input from the local filesystem):

@Test
def testETL() = {
    val conf = new SparkConf()
    val sc = new SparkContext("local", "test", conf)
    try {
        val etl = new IxtoolsDailyAgg() // empty constructor

        val data = sc.parallelize(List("in1", "in2", "in3"))

        etl.etl(data) // rdd transformation, no access to SparkContext or Hadoop
        Assert.assertTrue(true)
    } finally {
        if(sc != null)
            sc.stop()
    }
}

Boom! I got an exception:

java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
 at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
 at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
 at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
 at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
 at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
 at org.apache.hadoop.security.Groups.<init>(Groups.java:77)
 at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
 at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
 at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
 at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:36)
 at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:109)
 at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala)
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:228)
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:97)


What?
org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
I swear I didn't use Hadoop in my code!
Unfortunately, the Hadoop configuration is initialized together with SparkContext :( There is no way to avoid it...
I was advised to install HDP on Windows, but I hate this idea...

I tried the most stupid idea: just provide winutils.exe... I hoped it was only an environment check and that Hadoop functionality would not be used as long as I didn't touch it.
So I downloaded winutils.exe from MSDN (MSDN is still helpful, even for a Hadooper), put it into a newly created directory d:\winutil\bin and then added
System.setProperty("hadoop.home.dir", "d:\\winutil\\")
at the beginning of my unit test.
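
To keep the same test runnable on a non-Windows build server, the property can be set only when the JVM actually runs on Windows and only if nothing else has configured it. Here is a small sketch in Java (the call is the same from Scala). The helper name WinutilsSetup and the idea of passing the OS name as a parameter are my additions for illustration, and d:\winutil\ is simply the directory used above.

```java
import java.util.Locale;

/** Hypothetical helper for illustration; only wraps System.setProperty. */
final class WinutilsSetup {

    /**
     * Sets hadoop.home.dir if the OS name looks like Windows and the property is unset.
     * Returns true when the property was set by this call.
     */
    static boolean configure(String osName, String hadoopHome) {
        boolean windows = osName != null
                && osName.toLowerCase(Locale.ROOT).startsWith("windows");
        if (!windows || System.getProperty("hadoop.home.dir") != null) {
            return false;
        }
        System.setProperty("hadoop.home.dir", hadoopHome);
        return true;
    }

    public static void main(String[] args) {
        boolean set = configure(System.getProperty("os.name"), "d:\\winutil\\");
        System.out.println("hadoop.home.dir set by this run: " + set);
    }
}
```

The call has to happen before the first SparkContext is created, because the stack trace shows that the winutils lookup runs in the static initializer of org.apache.hadoop.util.Shell.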

Thursday, 24 April 2014

Hue Notifier for Hadoop goes wild

Several months ago I developed a Chrome browser plugin for my own needs. As a Hadoop engineer I faced one problem every day: I run a lot of Hive/Pig jobs simultaneously and they take a long time (from several minutes to several hours). So I had to check for job completion by walking through Hue's pages in my browser. Well, it was 1) irritating and 2) a distraction from coding...

As a solution I developed the Hue Notifier for Hadoop plugin for Google Chrome. It "monitors" the state of a job and informs you about its completion, similar to how GMail informs you about new mail (a pop-up over all windows). I have quite limited knowledge of JavaScript and it was the first time I had written a browser plugin... so I'm absolutely sure it can be improved. I tested it with the Hue delivered with Cloudera 4.3 and Cloudera 5 as well as HDP 2.0. The most irritating issue with my code: Chrome notifications must be enabled manually before you start using the plugin :(

The source code is available on GitHub under this repository. You are welcome to fork and improve it. Or, if you just wish to contribute, ping me and I will grant access (and push the changes to Google Play afterwards).

Friday, 18 April 2014

Building BigData ETL with Hive and Oozie

Hive is perhaps the most successful component of today's Hadoop infrastructure. It provides a simple and efficient way of creating Hadoop-based data processing jobs in a comfortable SQL-like language. But, in contrast to Pig, it is not a workflow-friendly language and requires additional effort to create a real multi-step ETL.
Oozie was created to address workflow and scheduling issues, so it can obviously be used to build ETL, and it naturally integrates with Hive.

Tuesday, 1 April 2014

Spark on HDP2

This is my first experience with Apache Spark, running it on Hadoop. I ran into several issues while running my piece of code.
To be honest, I started with the Cloudera CDH5 distribution; they promised that Spark was already included and would be simple to use. But no luck: it didn't work at all, not even on a local machine with their spark-cloudera jar. I didn't want to waste my time, so I just downloaded the Spark distribution to HDP2.
First of all, let's start Spark in standalone mode, according to the documentation:
# start master
./sbin/start-master.sh

# pick up spark://IP:PORT from the log output
# and then run a worker on each node
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT

# more documentation available here https://spark.apache.org/docs/0.9.0/spark-standalone.html

After that I wrote a bit of Scala code that simply counts the lines containing hardcoded words in a document:

package experiment

import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {

  def main(args: Array[String]) {
    val logFile = args(0)
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("My Spark application")
      .set("spark.executor.memory", "1g")
    val sc = new SparkContext(conf)

    // e.g. hdfs:///user/hue/input.txt
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("London")).count()
    val numBs = logData.filter(line => line.contains("Lviv")).count()
    println("Lines with London: %s, Lines with Lviv: %s".format(numAs, numBs))
}