Let's see how it might be done in the simplest case: we have to put JSON-formatted data into Elasticsearch for further analysis. So, our goal is to write a map-only job that populates Elasticsearch with data from a text file (already in JSON, one document per line).
First of all, let's configure the Configuration object:
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
conf.set("es.resource", "emailIndex/email"); // index/type
conf.set("es.nodes", "192.168.12.04");       // host
conf.set("es.port", "11000");                // port
conf.set("es.input.json", "yes");
Everything here should be self-explanatory. Note that speculative execution is disabled: otherwise the same task could run twice and write duplicate documents to Elasticsearch.
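As a small illustration of the `es.resource` setting (my own sketch, not code from the job): the value follows an `index/type` pattern, where the part before the slash is the Elasticsearch index and the part after it is the document type.

```java
public class EsResourceDemo {
    public static void main(String[] args) {
        String resource = "emailIndex/email"; // value passed to es.resource
        String[] parts = resource.split("/");
        String index = parts[0]; // Elasticsearch index name
        String type  = parts[1]; // document type within that index
        System.out.println("index=" + index + ", type=" + type);
    }
}
```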
It is very important to set the correct output format class; pay attention to the letter case:
// Set input and output format classes
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(EsOutputFormat.class);

// Specify the type of output keys and values
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
After that we implement the Mapper. It emits only a value, without a key — this behavior is required by the ES output format class:
public static class EmailToEsMapper extends
        org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, NullWritable, Text> {

    private Text output = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is already a complete JSON document,
        // so it is passed through unchanged as the value.
        String email = value.toString();
        output.set(email);
        context.write(NullWritable.get(), output);
    }
}
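For completeness, the snippets above can be tied together in a single driver class. This is a minimal sketch, not code from the original post: the class name `EmailToEsJob`, the job name, and taking the input path from the command line are my assumptions.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class EmailToEsJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Disable speculative execution so the same document
        // is not indexed twice by duplicate task attempts.
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        conf.set("es.resource", "emailIndex/email"); // index/type
        conf.set("es.nodes", "192.168.12.04");       // host
        conf.set("es.port", "11000");                // port
        conf.set("es.input.json", "yes");            // input lines are already JSON

        Job job = Job.getInstance(conf, "email-to-es");
        job.setJarByClass(EmailToEsJob.class);

        job.setMapperClass(EmailToEsMapper.class);
        job.setNumReduceTasks(0); // map-only job

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Input path is assumed to come from the command line.
        FileInputFormat.addInputPath(job, new Path(args[0]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Note that no output path is set: EsOutputFormat writes directly to the cluster described by `es.nodes`/`es.port`, so the usual `FileOutputFormat.setOutputPath` call is not needed.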
Let's go back to the second code snippet. It uses EsOutputFormat — pay attention to the letter case, because the old, deprecated API has an ESOutputFormat class. You might also need to add an exclusion to your Maven POM to pull in the correct versions of the jars and avoid dependency hell:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>1.3.0.M2</version>
</dependency>