Monday, November 20, 2017


Terraform by HashiCorp enables you to safely and predictably create, change, and improve infrastructure. It is an open source tool that codifies APIs into declarative configuration files that can be shared amongst team members, treated as code, edited, reviewed, and versioned.

Terraform is a simple and reliable way to manage infrastructure in AWS, Google Cloud, Azure, DigitalOcean and other IaaS offerings (providers, in Terraform terms). The main idea of such tools is reproducible infrastructure: Terraform provides a DSL to describe infrastructure and then apply it to different environments. Previously we used a set of Python and bash scripts to describe what to create in AWS, with various conditions checking whether a resource already exists in AWS and creating it if not. Terraform actually does the same under the hood. This is an introduction covering a simple use case: creating the Alluxio cluster I used in the previous post.

Terraform supports a number of different providers, but a Terraform script must be written anew for each provider.
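To give a taste of the DSL, a minimal sketch of an AWS configuration might look like this (the resource name and the AMI id are hypothetical placeholders, not the exact configuration I used):

```hcl
provider "aws" {
  region = "us-west-2"
}

# one of the Alluxio worker nodes; the count and instance type
# mirror the cluster described later in this post
resource "aws_instance" "alluxio_worker" {
  count         = 3
  ami           = "ami-12345678" # placeholder AMI id
  instance_type = "r4.xlarge"

  tags = {
    Name = "alluxio-worker-${count.index}"
  }
}
```

Running terraform plan shows what would be created, and terraform apply actually creates it, which is exactly the "check if exists, create if not" logic we used to script by hand.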

Applying Alluxio to warm up your data

Alluxio, formerly Tachyon, enables any application to interact with any data from any storage system at memory speed. 
states https://www.alluxio.org/. In this article I'd like to describe the general idea behind Alluxio and how it helped me. Alluxio is not known to everyone, yet it has a lot of features to offer and can be a game changer for your project. Alluxio already powers data processing at Barclays, Alibaba, Baidu, ZTE, Intel, etc. The license is Apache 2.0 and the source code can be reviewed here: https://github.com/Alluxio/alluxio .

Alluxio provides a virtual filesystem which creates a layer between your application (i.e. a computational framework) and the real storage, such as HDFS, S3, Google Cloud Storage, Azure Blob Storage and so on. Alluxio has several interfaces: a Hadoop-compatible FS, a native key-value interface, and an NFS interface. From the component point of view, Alluxio has a single Master (plus a Secondary Master, which is similar to the SNN in Hadoop, i.e. it doesn't process requests from clients), multiple Slaves and, obviously, a Client.

My use case was inspired by tiered storage in HDFS: you can configure HDFS to keep specific HDFS paths on Hot storage (say, in memory), Warm storage (~ SSD) or Cold storage (~ HDD). However, cloud usage is growing every day, hardware Hadoop clusters are less and less common, and the issue with clouds (which is, at the same time, a benefit) is that storage is isolated from computation, which makes storage tiers hard or impossible to implement. And that's a very good use case for Alluxio: deploy an Alluxio cluster to play the role of Hot storage where only the most frequently used data is located.

While saving data on S3, we'd like to partition it by year, month and day to speed up access when querying a known time range. However, data is rarely accessed according to a uniform distribution; much more often there are very specific patterns, like:

  • actively access last 3 months
  • actively access last month and the same month of last year
These are natural candidates to put into Alluxio to speed up access to them, while the rest of the data remains available directly from S3.
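For instance, with such partitioning the S3 layout could look like this (the paths are purely illustrative; the test bucket used in the demo below is flat):

```
s3://alluxio-poc/data/year=2017/month=10/day=01/part-00000.snappy.parquet
s3://alluxio-poc/data/year=2017/month=11/day=20/part-00001.snappy.parquet
```

With a layout like this, only the prefixes for the hot months need to be mounted into Alluxio, while older prefixes are read straight from S3.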

Let's see a practical example of working with data stored on S3 using Apache Spark on EMR.

I used Terraform to create an Alluxio cluster with 3 r4.xlarge slaves and one m4.xlarge master. We will also need computational power to run the Spark job, so let's create an AWS EMR cluster:

aws emr create-cluster --name 'Alluxio_EMR_test' \
  --instance-type m4.2xlarge \
  --instance-count 3 \
  --ec2-attributes SubnetId=subnet-131cda0a,KeyName=my-key-name,InstanceProfile=EMR_EC2_DefaultRole \
  --service-role EMR_DefaultRole \
  --applications Name=Hadoop Name=Spark \
  --region us-west-2 \
  --log-uri s3://alluxio-poc/emrlogs \
  --enable-debugging \
  --release-label emr-5.7.0 \
  --emrfs Consistent=true

After that, Alluxio is ready to be started and our data is ready to be pulled in:

[ec2-user@ip-172-16-175-35 ~]$ docker ps
1c876a0ffe4d alluxio "/entrypoint.sh wo..." 9 minutes ago Up 9 minutes cranky_brown
[ec2-user@ip-172-16-175-35 ~]$ docker exec -it 1c876a0ffe4d /bin/sh
/ # cd /opt/alluxio/bin
/opt/alluxio/bin # ./alluxio runTests
/opt/alluxio/bin # ./alluxio fs mkdir /mnt
Successfully created directory /mnt
# the following command mounts an S3 folder into Alluxio (its data will be cached on access)
/opt/alluxio/bin # ./alluxio fs mount -readonly alluxio://localhost:19998/mnt/s3 s3a://alluxio-poc/data
Mounted s3a://alluxio-poc/data at alluxio://localhost:19998/mnt/s3
/opt/alluxio/bin #
/opt/alluxio/bin # ./alluxio fs ls /mnt/s3

-rwx------ pc-nord-account66 pc-nord-account66 410916576 09-22-2017 18:03:10:815 Not In Memory /mnt/s3/part-00084-2e9dafb0-2d7a-428e-b517-b6eb4d70f781.snappy.parquet

Then, go back to the EMR Master and start the Spark shell:

spark-shell --jars ~/alluxio-1.5.0/client/spark/alluxio-1.5.0-spark-client.jar

The following commands register the Alluxio file system in the running Spark context:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.alluxio.impl", "alluxio.hadoop.FileSystem")

val x = spark.read.parquet("alluxio://")

// let's see how fast it's gonna be

x.select($"itemid", $"itemdescription", $"GlobalTransactionID", $"amount").orderBy(desc("amount")).show(20) // 4 sec

x.count() // 3 sec

// now let's compare with s3 dataset

val p = spark.read.parquet("s3a://alluxio-poc/data")
p.select($"itemid", $"itemdescription", $"GlobalTransactionID", $"amount").orderBy(desc("amount")).show(20) // 19 sec

p.count() // 19 sec

To sum up, Alluxio provides a great way to speed up data processing when you need fast access to only a limited part of the dataset. A potential use case: hot data that is accessed and processed 10x more often, but makes up only 10% of the whole dataset, is an ideal candidate to be cached with Alluxio.


Thursday, August 31, 2017

Druid: fixed lambda

Druid is an excellent high-performance, column-oriented, distributed data store, used by IT giants to get sub-second answers from TB (or even PB) datasets. Needless to say, I fell in love with it from day one.

Several examples:

  • Netflix ingests up to 2 TB per hour with the ability to query data as it is being ingested
  • eBay ingests over 100,000 events/sec and supports over 100 concurrent queries without impacting ingest rate or query latency

The main features of Druid that help it stand out from the crowd:

  • Sub-second queries
  • Scalable to PBs
  • Real-time streams
  • Deploy anywhere (can work with Hadoop or without, by processing data from S3)
I'm excited I had an opportunity to work with Druid a year ago. It's really cool, works super fast and delivers excellent results! The JSON-based query language wasn't super hard to learn; I even managed to calculate an average using a post-aggregation :) previous MapReduce experience really helped.

One remark I'd like to add here:
we developed and tested the Druid-based system in the us-east-1 region, everything was good and deployment was automated, so we moved to prod which, surprisingly, was selected to be in the Frankfurt AWS region. We got a pretty nasty error from Druid when the deployment script finished its work there:
 Caused by: io.druid.segment.loading.SegmentLoadingException: S3 fail!

It looks like the problem was in the additional configuration required for non-us-east-1 regions; unfortunately there was no documentation, so I derived it from the source code. It works now:

On each historical node, add the following file /opt/druid/config/_common/jets3t.properties with this content:

The 1st line forces the use of v4 auth.

The 2nd line sets the endpoint; the default is us-east-1, but for Frankfurt it must be s3.eu-central-1.amazonaws.com.
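For reference, the resulting jets3t.properties looks like this (the values come from my Frankfurt setup; double-check the endpoint for your own region):

```properties
# force AWS signature v4 auth (required outside us-east-1)
storage-service.request-signature-version=AWS4-HMAC-SHA256
# region-specific S3 endpoint, here for eu-central-1 (Frankfurt)
s3service.s3-endpoint=s3.eu-central-1.amazonaws.com
```

After placing the file and restarting the historical nodes, segment loading from S3 worked.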

Anyway, Metamarkets team - thank you for a great product! Now going to test Caravel from Airbnb.

Friday, July 29, 2016

Protecting Spark UI, part 2: servlet filter

In the previous post I described how to configure a simple NGINX instance to add basic auth to a Spark job. In this part let's see what Spark itself suggests: implementing a servlet filter.

A filter is a special class which participates in the Java servlet lifecycle and is called on each request (and even response). Using a filter, a resource can be protected by basic authentication from unauthorized access. According to the documentation, the filter must be implemented and then passed (by fully qualified name) as a parameter. Let's pass the valid username and password through environment variables; that should be good enough, as it equals the approach used to pass AWS credentials, for instance. Obviously, these env variables must be set on the instance where the driver is supposed to run. Another option is to pass them as arguments into the filter using spark.<filter class name>.params 'param1=value1,param2=value2,...'
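For example, with the filter class used later in this post, such a configuration could look like this (the login/password parameter names are hypothetical; the filter below reads environment variables instead):

```properties
spark.ui.filters=my.company.filters.BasicAuthFilter
spark.my.company.filters.BasicAuthFilter.params=login=spark,password=secret
```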

Let's imagine our class in the package my.company.filters (it uses several helpers, like commons-codec and commons-lang):

package my.company.filters;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;

public class BasicAuthFilter implements Filter {

  private String login;
  private String pass;

  // this method is called once, on Filter creation
  public void init(FilterConfig config) {
     this.login = System.getenv("SPARK_LOGIN");
     this.pass = System.getenv("SPARK_PASS");
  }

  public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain)
      throws IOException, ServletException {

     HttpServletRequest hreq = (HttpServletRequest) req;
     HttpServletResponse hres = (HttpServletResponse) res;

     String auth = hreq.getHeader("Authorization");
     if (auth != null) {
        int index = auth.indexOf(' ');
        if (index > 0) {
          String[] creds = StringUtils.split(
              new String(Base64.decodeBase64(auth.substring(index + 1)), StandardCharsets.UTF_8), ':');
          if (creds.length == 2 && login.equals(creds[0]) && pass.equals(creds[1])) {
              // auth passed successfully, let the request through
              chain.doFilter(req, res);
              return;
          }
        }
     }

     hres.setHeader("WWW-Authenticate", "Basic realm=\"ProtectedSpark\"");
     hres.sendError(HttpServletResponse.SC_UNAUTHORIZED);
  }

  public void destroy() {
  }
}
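The header-parsing logic above can be checked in isolation. A minimal sketch (the class name is hypothetical, and it uses the JDK's java.util.Base64 instead of commons-codec):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BasicAuthDecode {

  // Decodes an HTTP "Authorization: Basic <base64>" header value
  // into {login, password}, mirroring the filter's parsing logic.
  static String[] decode(String auth) {
    int index = auth.indexOf(' ');
    if (index <= 0) return null;
    byte[] raw = Base64.getDecoder().decode(auth.substring(index + 1));
    String[] creds = new String(raw, StandardCharsets.UTF_8).split(":", 2);
    return creds.length == 2 ? creds : null;
  }

  public static void main(String[] args) {
    // "spark:secret" encodes to "c3Bhcms6c2VjcmV0"
    String[] creds = decode("Basic c3Bhcms6c2VjcmV0");
    System.out.println(creds[0] + " / " + creds[1]); // prints: spark / secret
  }
}
```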

Ok, the next step is to pack this filter into a JAR. After that, we can run our job in a secured manner: execute spark-submit, pass the newly assembled jar with the --jars flag, and through configuration (a *.conf file or the --conf param) pass the full class path: spark.ui.filters=my.company.filters.BasicAuthFilter

Protecting Spark UI, part 1: nginx

The Apache Spark Web UI is a decent place to check cluster health and monitor job performance, and the starting point for almost every performance optimization. The guys from Databricks work hard on improving the UI from version to version.
But it still has one issue which I face on every project and which must be resolved every time: the publicity of this information. Everyone who can reach the port (by default, 8080 or 4040) can access the UI and all the information there (and there is a lot of stuff you want to keep private).

There are several solutions to deal with it:

  1. Close all ports and configure nginx to listen on a specific port and forward requests (of course with basic authentication)
  2. Protect the UI using Spark's built-in method: implementing your own filter
In this post, let's start with how to protect the Spark UI with NGINX.

The instructions below are suitable for protecting the standalone Spark Web UI when the job is executed in client mode (so you can predict where the driver will be up and running).

Let's assume there is a node with both Spark and nginx installed (obviously they can be on different nodes).

First of all, close all Spark-related ports (and there are a lot of them): they must still be accessible in-network. In Amazon, it's easy to do with security groups: just specify an appropriate CIDR mask for each inbound rule. Next, open 2 ports not used by Spark which you're going to make accessible to get into the Spark Master UI or Spark driver UI: for example, let's assume they're 2020 and 2021.

Now only a small part is left: configure nginx to perform basic auth and forward requests to the Spark UI. In this case nginx is in a private network, so the request will be handled by Spark and the UI actually presented to the end user.

Before configuring nginx itself, the file holding the credentials must be created.
It's simple to do with the htpasswd tool, which can be installed by running   sudo yum install -y httpd-tools

Then generate a password and store it into a file (the user name will be spark and the password is entered in the CLI):
sudo htpasswd -c /etc/nginx/.htpasswd spark

The last step is to create a proper nginx configuration (the example only forwards all requests on port 2020 to Spark Master's 8080):
vi /etc/nginx/nginx2001.conf

  events {
     worker_connections 1000;
  }

  http {
    server {
      listen 2020;

      auth_basic "Private Beta";
      auth_basic_user_file /etc/nginx/.htpasswd;

      location / {
        proxy_pass http://localhost:8080;
      }
    }
  }

Actually, that's it. After that, we just need to start nginx:
nginx -c /etc/nginx/nginx2001.conf

And point the browser to HOST:2020 to be asked to enter credentials, and only after that be redirected to the Spark Master UI.

Tuesday, October 6, 2015

Apache Zeppelin: impressions

Notebooks are getting more and more attention from data analysts, data scientists and developers. Jupyter is a famous notebook created by Python guys and widely adopted among different users. At the same time, a new notebook provider was recently born: Apache Zeppelin, with its main focus on integration with the BigData technology stack.

In fact, Apache Zeppelin provides built-in integration with Apache Spark (and Spark SQL), Apache Flink, Hive, Ignite, Tajo (is anyone outside South Korea using that?), definitely markdown and HTML, and even AngularJS. That's the good part about Zeppelin. Also, Ambari integration gives the possibility to install Zeppelin in "a couple of clicks" and get access through Ambari Views. And practically it works very well.

And now I'd like to focus on what's wrong with Apache Zeppelin:

1) Security. Zeppelin 0.5 doesn't have security. Anybody can open any notebook, view it and edit it. That doesn't work for enterprises; moreover, it doesn't even work for R&D. I want to have protected notebooks, I want to have roles and groups, and to give a notebook only to a specific group of people for a specific set of actions.
2) Workspace. A one-level list of notebooks, really? That's awful. Guys, add the possibility to organize them into nested folders, it's really important. Also, the only way to back up notebooks is to back up the underlying folders on the filesystem. Not very good; at least a UI button is required.
3) Security 2. I've already written about notebook security, but data on storage must also be protected. Currently Zeppelin runs everything as the ZEPPELIN user, and I have to share data with the ZEPPELIN user, which is not what I want to do. So it makes sense for each notebook to provide a "run as" setting to specify a particular user for that research. Enterprises really value that.

Personally, I also tried to make it work on Docker (it more or less works) and on EMR (I failed, and everybody failed as far as I know).

To sum up: Zeppelin is an interesting and promising product, but it has too many weaknesses to be seriously considered for production projects, especially for enterprises. So, in a technology radar I would definitely put Zeppelin into the section "Be informed".

Monday, July 13, 2015

How to waste the whole day with Spark Streaming and HBase

The "funny" story of how to waste a whole day debugging a simple case... tips and tricks :)

A Spark Streaming application hangs on an action and nothing changes for hours.
The long story: a custom Receiver accepts events from an external source and stores them into an RDD (actually, a DStream) for future processing. When I ran it I noticed that the action hung! And what was really scary: the messages were being read from the source. After spending a couple of hours trying to find an issue with the Receiver, I realized it works fine and finally found the issue... in how I ran the job!

I did it in a local environment first and then submitted it to YARN with:
--num-executors 2
In fact, it didn't work for me because not a single worker (Spark executor) was able to start processing: the Receiver occupied the available executor slots. So, just by increasing the number of executors to 3, I was able to make everything work.

An HBase-related Spark Streaming application hangs and nothing changes for hours.
Again, the long story: the Spark Streaming application hangs as soon as it touches HBase. I spent several hours (again) and was really surprised when I found the reason: the HBase connection was broken. OMG! I hadn't seen any errors or warnings related to the HBase connection in the logs... what is the reason? In fact, HBase tried to establish the connection again and again without throwing an error. Consider the following piece of code (the first three lines are my original part, while the last three are the update that helped me overcome the issue):
  Configuration config = HBaseConfiguration.create();
  config.set(HConstants.ZOOKEEPER_QUORUM, "host:port");
  config.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");
  config.set("hbase.client.retries.number", Integer.toString(3));
  config.set("zookeeper.session.timeout", Integer.toString(60000));
  config.set("zookeeper.recovery.retry", Integer.toString(0));

It really helps because the number of retries is limited. The default value is 35, which can definitely confuse.