Thursday, July 24, 2014

Hadoop 2.2 Distributed Cache and Map Join

It's very common to use the Distributed Cache for map joins: it lets you implement an extremely fast join of a huge dataset with one or more small ones. Compared to other join techniques you can win up to a 1000x speed-up, so map joins are extremely useful and widely used. It's also the easiest way to implement outer joins, non-equi joins and so on. I'd recommend using a map join whenever possible.
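To make the idea concrete, here is a minimal sketch of a map-side join in plain Java, without any Hadoop classes. It assumes tab-separated fact records and a small dimension table already loaded into memory; the class name `MapJoinSketch`, the record layout and the `UNKNOWN` placeholder are my own illustrative choices, not part of any Hadoop API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapJoinSketch {

    /**
     * Joins each fact record ("key<TAB>value") against the in-memory dimension table.
     * Keys missing from the dimension get an "UNKNOWN" placeholder (left outer join).
     */
    static List<String> join(List<String> facts, Map<String, String> dimension) {
        List<String> out = new ArrayList<>();
        for (String fact : facts) {
            String[] parts = fact.split("\t", 2);
            String key = parts[0];
            String value = parts.length > 1 ? parts[1] : "";
            // A hash lookup per record replaces the shuffle/sort of a reduce-side join
            String dim = dimension.getOrDefault(key, "UNKNOWN");
            out.add(key + "\t" + value + "\t" + dim);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample data, for illustration only
        Map<String, String> dimension = new HashMap<>();
        dimension.put("1", "Kyiv");
        dimension.put("2", "Lviv");

        List<String> facts = Arrays.asList("1\t100", "3\t250");
        join(facts, dimension).forEach(System.out::println);
    }
}
```

In a real job the loop body would live in `map()` and the dimension table would be filled once in `setup()`; the lookup itself stays the same.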

What I don't like about Hadoop is that the API changes very often; each new version brings API changes. The weirdest example is the Mapper interface: it was introduced, then deprecated, and then un-deprecated (in Hadoop 2 it no longer carries @Deprecated)... It's quite difficult to keep up with all the changes.

The latest change: DistributedCache is now deprecated, so the good old DistributedCache.addCacheFile is no longer the recommended way.

Hadoop 2.x introduces a new approach:
1) add the file to the distributed cache (I'm using a symlink here):
job.addCacheFile(new URI(conf.get("dimension.file")+"#YOUR_DIM"));

2) in your setup method (Mapper or Reducer) the data from the cache can be read as follows:
Path[] files = context.getLocalCacheFiles(); // oh, this method is deprecated again ;-)

// loop over all files in cache
for (Path p : files) {
    if (p.getName().equals("YOUR_DIM")) {
        // load cache (for example into Map)
    }
}

That's all. Symlinks are very useful for accessing files from the cache.
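The "load cache into a Map" step could look roughly like the sketch below. It is plain Java with no Hadoop classes, and it assumes the dimension file is tab-separated "key<TAB>value" text that can be opened under its symlink name in the task's working directory. The class name `DimensionLoader` and the file layout are assumptions of mine, so adjust the parsing to your own data.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

public class DimensionLoader {

    /** Reads "key<TAB>value" lines into a map; empty or malformed lines are skipped. */
    static Map<String, String> load(Path file) throws IOException {
        Map<String, String> dimension = new HashMap<>();
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split("\t", 2);
                if (parts.length == 2) {
                    dimension.put(parts[0], parts[1]);
                }
            }
        }
        return dimension;
    }

    public static void main(String[] args) throws IOException {
        // "YOUR_DIM" is the symlink name from the post; pass another path to try it locally
        Path file = Paths.get(args.length > 0 ? args[0] : "YOUR_DIM");
        if (Files.isRegularFile(file)) {
            System.out.println(load(file).size() + " dimension rows loaded");
        } else {
            System.out.println("No such file: " + file);
        }
    }
}
```

Called from `setup()`, the resulting map would be kept in a field of the Mapper so that every `map()` call can do a plain lookup.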

Thursday, July 3, 2014

Running Spark Unit Test on Windows 7

It's a common situation in enterprises: developers work on Windows. When you are working with Hadoop, that sounds like f**ing shit, but it's a fact.

Recently I switched to Spark instead of the traditional MapReduce paradigm and needed to implement some kind of unit/integration testing... and of course it needed to work under Windows 7.

I wrote a very simple test: run the ETL in memory, without touching Hadoop at all (in the future, I'd like to read the input from the local filesystem):

def testETL() = {
    val conf = new SparkConf()
    val sc = new SparkContext("local", "test", conf)
    try {
        val etl = new IxtoolsDailyAgg() // empty constructor

        val data = sc.parallelize(List("in1", "in2", "in3"))

        etl.etl(data) // rdd transformation, no access to SparkContext or Hadoop
    } finally {
        if (sc != null) sc.stop() // always release the local SparkContext
    }
}

Boom! I got an exception:

java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
 at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
 at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
 at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
 at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
 at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
 at org.apache.hadoop.security.Groups.<init>(Groups.java:77)
 at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
 at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
 at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
 at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:36)
 at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:109)
 at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala)
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:228)
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:97)

I swear, I didn't use Hadoop in my code!
Unfortunately, the Hadoop configuration is initialized together with SparkContext :( There's no way to skip it...
I was advised to install HDP on Windows, but I hate that idea...

I tried the most stupid idea: just provide winutils.exe... My hope was that it's only an environment check and that Hadoop functionality won't be used as long as I don't touch it.
So I downloaded winutils.exe from MSDN (MSDN is still helpful, even for a Hadooper), put it into a newly created directory d:\winutil\bin and then added
System.setProperty("hadoop.home.dir", "d:\\winutil\\") 
at the beginning of my unit test
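
If you don't want to hardcode this in every test, the same trick can be wrapped in a small helper. The sketch below is plain Java (callable from Scala tests as well) and only sets hadoop.home.dir when bin\winutils.exe is really there. The class name `WinutilsSetup` and its `configure` method are hypothetical names of mine; the only real pieces are the `hadoop.home.dir` system property and the d:\winutil directory from above. It has to run before the first SparkContext is created, because Hadoop's Shell class does the lookup in its static initializer, as the stack trace shows.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class WinutilsSetup {

    /**
     * Sets hadoop.home.dir to the given directory if it contains bin/winutils.exe.
     * Returns true when the property was set, false when the executable is missing.
     */
    static boolean configure(Path hadoopHome) {
        Path winutils = hadoopHome.resolve("bin").resolve("winutils.exe");
        if (!Files.isRegularFile(winutils)) {
            return false;
        }
        System.setProperty("hadoop.home.dir", hadoopHome.toAbsolutePath().toString());
        return true;
    }

    public static void main(String[] args) {
        boolean windows = System.getProperty("os.name", "").toLowerCase().startsWith("windows");
        if (!windows) {
            System.out.println("Not Windows, nothing to do");
            return;
        }
        // Directory taken from the post; change it to wherever you put winutils.exe
        boolean ok = configure(Paths.get("d:\\winutil"));
        System.out.println(ok ? "hadoop.home.dir set" : "winutils.exe not found");
    }
}
```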