субота, 28 вересня 2013 р.

Hadoop + R via streaming

The short history how R meets Hadoop



R is language for Stats, Math and Data Science created by statisticians for statisticians. It contains 5000+ implemented algorithms and impressive 2M+ users with domain knowledge worldwide. However, it has one big disadvantage - all data is placed into memory ... in one thread.

And there is Hadoop. New, powerful framework for distributed data processing. Hadoop is built upon idea of MapReduce algorithm, this isn't something very specific, a lot of languages have MR capabilities, but Hadoop brought it to the new level.The main idea of MR is:

  1. Map step: Map(k1,v1) → list(k2,v2) 
  2. Magic here 
  3. Reduce step: Reduce(k2, list (v2)) → list(v3)


Hadoop was developed in Java and Java is the main programming languages for Hadoop. Although Java is main language, you can still use any other language to write MR: for example, Python, R or OCaml. It is called "Streaming API"



Of course, not all features available in Java will be available in R, because streaming works through "unix streams", not surprise here. There are several streaming API drawbacks:
  1. while the inputs to the reducer are grouped by key, they are still iterated over line-by-line, and the boundaries between keys must be detected by the user
  1. no possibilities to utilize different mappers in one MapReduce job 
  1. no possibilities to create different outputs from reducer 
  1. not transparent counters update (streaming uses stderr to report counter updates)
Pay attention on 2 and 3, some cases require to have several inputs or outputs for the one mapreduce job. With streaming API there is not this possibility and several MR jobs must be created, as result the performance degrades. Moreover, as stdout/stdin is used to communicate between map and reduce steps, there are additional issues with R, because of R is very "buzzy" language and generates a lot of output into STDOUT. You need to remember about that and control output very strictly.
To apply streaming, two separate files must be created, one of them will represent Mapper (map function) and second will be Reducer (reduce function)


As it was mentioned, mapper reads string from STDIN and emits string to STDOUT, and in the same way reducer works, we need to control key update manually (Hadoop guaranties only sorting by key). 

Let's consider next mapper code, here we just extract required information from input and select key:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
#! /usr/bin/env Rscript
# disable warning so they want
options(warn=-1)
# redirect output
sink("/dev/null")

conn <- file("stdin", open="r")
while (length(next.line <- readLines(conn, n=1, warn=FALSE)) > 0) {
  # split string by Tab and flatten result into vector
  line <- unlist( strsplit(next.line, "\t") )

  # key consist category
  output.key <- c(line[2])
  # value consist datetime, count of mentioning and average sentiment score
  output.value <- c(line[1], line[3], line[4])

  # restore output
  sink()
  cat(output.key, output.value, "\n", sep="\t")
  sink("/dev/null")
}
close(conn)
Reducer collect all values related to the one key and than perform regression to predict value in future period:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#! /usr/bin/env Rscript
#
# One of the disadvantages of using Streaming directly is that while the inputs to the reducer are grouped by key,
# they are still iterated over line-by-line, and the boundaries between keys must be detected by the user.
#
# disable warning so they want
options(warn=-1)
# redirect output
sink("/dev/null")

prev_key <- ''
dates_vector <- vector()
count_vector <- vector()
avg_vector <- vector()

conn <- file("stdin", open="r")
while (length(next.line <- readLines(conn, n=1)) > 0) {
  # split string by Tab and flatten result into vector
  line <- unlist( strsplit(next.line, "\t") )
  cur_key <- line[1]

  # not first processing and key was changed
  if( prev_key != '' && prev_key != cur_key ) {
    # get prediction for tweets count
    mod_count <- lm(count_vector ~ dates_vector)
    rep_date <- dates_vector[length(dates_vector)]
    rep_date <- rep_date+60*60
    prediction_count <- predict(mod_count, data.frame( dates_vector=c( rep_date )))


    # get prediction for average sentiment amount
    mod_avg <- lm(avg_vector ~ dates_vector)
    prediction_avg <- predict(mod_avg, data.frame( dates_vector=c( dates_vector[length(dates_vector)]+1 ) ))

    sink()
    cat(prev_key,  format( rep_date, format="%Y%m%d%H%M" ), floor( prediction_count ),round(prediction_avg, digits=2 ), "\n", sep="\t")
    sink("/dev/null")

    # new key - new vectors
    dates_vector <- vector()
    count_vector <- vector()
    avg_vector <-  vector()
  }

  prev_key <- cur_key

  # setup values from passed line
  dates_vector <- append(dates_vector, as.POSIXct(line[2], tz="0", format="%Y-%m-%d %H:%M:%S"))
  count_vector <- append(count_vector, as.numeric(line[3]))
  avg_vector<- append(avg_vector, as.double(line[4]))
}
close(conn)
This code can be tested in local mode:cat file.txt | mapper.R | sort -k 1,1 | reducer.Ror can be run on Hadoop without changes

Немає коментарів:

Дописати коментар