Friday, May 31, 2013

Incredibly simple JMX example

With JMX you can implement management interfaces for Java applications. Let's look at the simplest and most common example. Imagine you have created some ElectroCar managed by a Java application. You want to read/monitor some of its parameters and, moreover, set some of them.

OK, you need to create an MBean (a managed bean that will be controlled through JMX) that gives you read access to two car parameters and lets you change the max speed limit at runtime. To describe it, you need the following interface:

public interface ElectroCarMBean {
    public void setMaxSpeed(int maxSpeed);
    public int getMaxSpeed();
    public int getCurrentSpeed();
}

The implementation is short and simple:
import java.util.Random;

public class ElectroCar implements ElectroCarMBean {

    private int maxSpeed = 150;
    private final Random rnd = new Random();

    public void setMaxSpeed(int maxSpeed) {
        this.maxSpeed = maxSpeed;
    }

    public int getMaxSpeed() {
        return this.maxSpeed;
    }

    public int getCurrentSpeed() {
        // simulate a sensor reading below the current limit
        return rnd.nextInt(this.maxSpeed);
    }
}

And after that you need to register your MBean (yep, manually):
// Get the platform MBeanServer
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();

// managed bean instance
ElectroCar car = new ElectroCar();

// Uniquely identify the MBean and register it with the platform MBeanServer
ObjectName carBean = new ObjectName("FOO:name=jmxsample.ElectroCar");
mbs.registerMBean(car, carBean);

After that, don't forget to add the following JVM arguments when you run your application:
-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.port=1617 \
-Dcom.sun.management.jmxremote.authenticate=false

This runs the application with JMX enabled on port 1617, but without authentication (anyone can connect). So, your application is ready now! Run jconsole and check the result:

You can see that each attribute has a set of properties, the most important of which are the readable/writable flags. If the writable property is 'true', you can set a new value from jconsole; otherwise you can only read the value. In this particular example, MaxSpeed can be changed at runtime, and the change affects the car's max speed. CurrentSpeed, however, is read-only, so you can only monitor it (click 'Refresh' to update the value).
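The same read/write behaviour can also be exercised programmatically through the standard JDK MBeanServer API. Below is a self-contained sketch; the nested Car/CarMBean pair is a stand-in for the ElectroCar above (note the standard MBean naming rule: the interface name must be the implementation class name plus "MBean"):

```java
import java.lang.management.ManagementFactory;
import java.util.Random;
import javax.management.Attribute;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxDemo {

    // standard MBean naming: interface = implementation class name + "MBean"
    public interface CarMBean {
        int getMaxSpeed();
        void setMaxSpeed(int maxSpeed);
        int getCurrentSpeed();
    }

    public static class Car implements CarMBean {
        private volatile int maxSpeed = 150;
        private final Random rnd = new Random();
        public int getMaxSpeed() { return maxSpeed; }
        public void setMaxSpeed(int maxSpeed) { this.maxSpeed = maxSpeed; }
        public int getCurrentSpeed() { return rnd.nextInt(maxSpeed); }
    }

    public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("FOO:name=jmxsample.Car");
        mbs.registerMBean(new Car(), name);

        // read an attribute, exactly like jconsole does
        System.out.println(mbs.getAttribute(name, "MaxSpeed"));   // 150

        // MaxSpeed is writable, so a new value can be pushed in
        mbs.setAttribute(name, new Attribute("MaxSpeed", 42));
        System.out.println(mbs.getAttribute(name, "MaxSpeed"));   // 42
    }
}
```

jconsole is just a remote client for exactly these getAttribute/setAttribute calls.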

As I said, the MBean must be registered manually. This can be an issue if you use a container for bean creation (like Spring). For example, Spring provides a special bean, MBeanExporter, that lets you expose your classes as MBeans; read more here.
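Spring configuration isn't shown in the post, but a minimal sketch of such an MBeanExporter setup might look like this (the bean id and the ObjectName key here are my assumptions, not taken from the post):

```xml
<!-- hypothetical Spring XML configuration -->
<bean id="electroCar" class="jmxsample.ElectroCar"/>

<bean class="org.springframework.jmx.export.MBeanExporter">
    <property name="beans">
        <map>
            <!-- key becomes the ObjectName, value is the bean to expose -->
            <entry key="FOO:name=jmxsample.ElectroCar" value-ref="electroCar"/>
        </map>
    </property>
</bean>
```

With this, Spring registers the bean with the platform MBeanServer on context startup, so no manual registerMBean call is needed.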

Friday, May 17, 2013

Fix PigUnit issue on Windows

PigUnit is a nice and extremely easy way to test your Pig scripts. Read more here.
However, it doesn't run on Windows at all. When you run your first PigUnit test, you will get the following exception:

Exception in thread "main" java.io.IOException: Cannot run program "chmod": CreateProcess error=2, The system cannot find the file specified :

In fact, it means Cygwin is not correctly installed. To fix it, download and install Cygwin, then edit the PATH variable and add the path to the Cygwin directory.

Try to run again. The next possible error will be:

ERROR mapReduceLayer.Launcher: Backend error message during job submission java.io.IOException: Failed to set permissions of path: \tmp\hadoop-MyUsername\mapred\staging\MyUsername1049214732.staging to 0700

It means your temporary directory is not set correctly (or not set at all). To be honest, I tried to set this temporary directory with the following code:

        pigServer.getPigContext().getProperties().setProperty("pig.temp.dir", "D:/TMP");
        pigServer.getPigContext().getProperties().setProperty("hadoop.tmp.dir", "D:/TMP");

Unfortunately, it doesn't work. The solution is to set a system property instead. There are a lot of ways to do it; one of them is to tune the Java run configuration used to run your test, just add:
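The exact snippet is missing from the post; judging from the properties tried above, it is presumably the same temporary directory passed as a JVM system property, something like:

```
-Dhadoop.tmp.dir=D:/TMP
```

This is an assumption on my part; the point is that the property must reach Hadoop as a JVM system property, not as a Pig script property.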

OK, that's much better, but we are not finished yet; there is one more error:
java.io.IOException: Failed to set permissions of path: file:/tmp/hadoop-iwonabb/mapred/staging/iwonabb-1931875024/.staging to 0700 
at org.apache.hadoop.fs.RawLocalFileSystem.checkReturnValue(RawLocalFileSystem.java:526) 

This happens because of a bug in the Hadoop code.

There are several ways to fix this bug (it has been present in Hadoop for years... :( ). One of them is to use this patch, or to fix the code and recompile. Neither was a good fit for me (in particular, such a fix applies only to a specific version, and it is difficult to maintain across several clusters, dev machines, and so on).
So, I decided to change the code at runtime with Javassist.

So, the solution is very simple and self-describing:
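The original snippet is lost here; below is a sketch of what such a runtime patch could look like with Javassist, assuming the goal is to turn the permission check from the stack trace above into a no-op (the class name HadoopPermissionPatch is my own; the patched class and method come from the stack trace):

```java
import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtMethod;

public class HadoopPermissionPatch {

    /**
     * Rewrites RawLocalFileSystem.checkReturnValue at runtime so it no longer
     * throws when chmod-style permission setting fails on Windows.
     * Must run BEFORE the Hadoop class is loaded by the JVM.
     */
    public static void apply() throws Exception {
        ClassPool pool = ClassPool.getDefault();
        CtClass cls = pool.get("org.apache.hadoop.fs.RawLocalFileSystem");
        CtMethod method = cls.getDeclaredMethod("checkReturnValue");
        method.setBody("{ /* do nothing: skip the permission check */ }");
        cls.toClass(); // load the patched class into the current classloader
    }
}
```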

To apply it, just call it from your test before running the test. I'd recommend doing it right after the PigTest instance is created.

Tuesday, May 7, 2013

Storm, distributed and fault-tolerant realtime computation, part 2

Read the first part here.

To illustrate the main Storm concepts, let's walk through them with an example.
The idea of the application is to scan a web-server log (to be honest, this log is generated by us, but it's very similar to a real-world log), push each event into a Storm topology, calculate some statistics, and update those statistics in a database. A contrived use case for Storm, but good enough to show all the main concepts.

Let's start with a log line; it looks something like the following:

Friday, May 3, 2013

Storm, distributed and fault-tolerant realtime computation, part 1

Storm is a big data solution for soft real-time processing. In the Hadoop world, batch processing is the main limitation. Let me explain. Imagine you track some event. With Hadoop, an event that happens at 11:00 AM is collected, then processed in the next half-hour batch (at 11:30 AM, because Hadoop is batch processing), and this event influences statistics (or whatever) only at 12:00. How can we make it more "real-time"?

Storm comes into play here.

Storm is built on the idea of streaming data, and this is the big difference compared with Hadoop. Not an advantage, but a difference. Hadoop is a great batch processing system. Data is introduced into the Hadoop file system (by Flume or any other way) and distributed across nodes for processing. Then processing starts; it involves a lot of shuffle operations between nodes, a lot of IO operations. Afterwards, the result must be picked up from HDFS.
Storm works with unterminated streams of data. Storm jobs, unlike Hadoop jobs, never stop; they keep processing data as it arrives. So you can think of Storm as a powerful integration framework (like Apache Camel or Spring Integration), if you are familiar with those.

So, when an event is produced, it is introduced into Storm and goes through a set of jobs...
Let's look at Storm terminology:
  • A Tuple is a data structure that represents standard data types (such as ints, floats, and byte arrays) or user-defined types with some additional serialization code (there is no direct analog in the Java language, but tuples are common in Python and especially in Erlang);
  • A Stream is an unterminated data flow, a sequence of tuples;
  • A Spout is a special component that feeds data from an external source into Storm;
  • A Bolt is a Storm job that can be linked into a chain and performs any tuple transformation;
  • A Topology is a structure of spouts and bolts connected by streams (see picture below).
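To make the terminology concrete, here is a rough sketch of how spouts and bolts are wired into a topology. LogSpout and StatsBolt are hypothetical classes, and the builder calls are from the backtype.storm API current at the time of writing; treat this as an illustration, not the post's actual code:

```java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class LogTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // spout: feeds log events into the topology as a stream of tuples
        builder.setSpout("log-spout", new LogSpout());

        // bolt: consumes the spout's stream and updates statistics;
        // shuffleGrouping distributes tuples randomly across bolt instances
        builder.setBolt("stats-bolt", new StatsBolt())
               .shuffleGrouping("log-spout");

        // run everything in-process, convenient for development
        new LocalCluster().submitTopology("log-stats", new Config(),
                builder.createTopology());
    }
}
```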

By default, streams are implemented as ØMQ queues, but that's under the hood and you shouldn't worry about it (except at cluster installation time).
One feature that I discovered during development but didn't see mentioned in the tutorial: a Bolt has to be serializable, so all its class members must be serializable too.