середа, 19 лютого 2014 р.

How to write good unit test for Hadoop MapReduce?

Without a doubt, there is avery common situation when UnitTest (or IntegrationTest) is required to test functionality of MapReduce job. This approach perfect fit TDD, moreover, it gives opportunity to develop MapReduce jobs faster, because there is no needs to redeploy jar on a cluster each time and debugging is easy to use.

The first line of defence is MRUnit. Great framework for unit testing, input/output format independent with possibility to run/test map and reduce functions separately. Unfortunately, this framework has a several meaningful drawbacks. For example, no access to MR counters, or during the MR test only one Mapper allowed.

Local execution mode may be used to overcome MRUnit limitations or create integration test for mapreduce job. Let's assume there is runnable MapReduce tool with several input sources (mappers) and reducer:

public class ExampleMrDriver extends Configured implements Tool {

 public  Job createMRJob(Configuration conf) throws IOException {...}

 @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = createMRJob(conf);
        return job.waitForCompletion(true) ? 0 : -1;
    }


 public static void main(String[] args) {
        try {
         // run job in a Oozie-friendly manner
            int status = ToolRunner.run(new ExampleMrDriver(), args);
            if(status!=0) {
                System.exit(status);
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

}


Nice integration test (or unit, call and use it as you like) for this Hadoop MapReduce a listed bellow:

private String outputDir;

@BeforeClass
public void createTmpDir() throws IOException {
    outputDir = System.getProperty("java.io.tmpdir"); + "output";
}

@Test
public void test() throws Exception {
    JobConf jobConf = new JobConf();
    jobConf.set("fs.default.name", "file:///"); 
    jobConf.set("mapred.job.tracker", "local"); // local mode
    jobConf.set("mapred.reduce.task", "1"); // only one file is required in output

    // create file w/ input content per mapper in test/resource folder
    jobConf.set("input.dir.2", this.getClass().getResource("/mr/inpu1").getPath());
    jobConf.set("input.dir.1", this.getClass().getResource("/mr/input2").getPath());
    jobConf.set("input.dir.3", this.getClass().getResource("/mr/input3").getPath());
    // expected output will be placed here
    jobConf.set("output.dir", outputDir);

    ExampleMrDriver driver = new ExampleMrDriver();
    driver.setConf(jobConf);
    int exitCode = driver.run(new String[]{});

    Assert.assertEquals(0, exitCode);

    // check content of output file, counters, etc
}

@AfterClass
public void tearDown() throws IOException {
    new File(outputDir).delete();
}