Today I would like to dive into Job Performance with Hadoop, running on the Managed Hadoop Framework of Amazon Web Services, which is Elastic MapReduce (EMR).

Hadoop does not deal well with lots of small files, and I would like to demonstrate how to reduce the duration of a simple wordcount job from 1 hour and 12 minutes to 1 minute.

EMR Environment, Logging and Dataset Information:

EMR Cluster Nodes Info:

* Master: m3.xlarge, 8 vCPU, 15 GiB memory, 80 GB SSD storage, EBS Storage: none
* Core-1: m3.xlarge, 8 vCPU, 15 GiB memory, 80 GB SSD storage, EBS Storage: none
* Core-2: m3.xlarge, 8 vCPU, 15 GiB memory, 80 GB SSD storage, EBS Storage: none

When Logging is enabled, the Log Output Structure on S3 will look like the following:

$ aws s3 ls s3://ruanbekker/logs/elasticmapreduce/j-<CLUSTERID>/
                           PRE containers/
                           PRE hadoop-mapreduce/
                           PRE node/
                           PRE steps/
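
The Step logs, for example, live under the steps/ prefix and can be listed with the same LogUri and the same <CLUSTERID> placeholder as above:

$ aws s3 ls s3://ruanbekker/logs/elasticmapreduce/j-<CLUSTERID>/steps/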

The Dataset that we will be working with consists of:

* 1003 CSV files 
* Each file is less than 1MB

Listing the Number of Files in the Input Data Path of our S3 Bucket:

$ aws s3 ls s3://ruanbekker.repo/datasets/text-data/ | wc -l
1003
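
As an optional check, the CLI can also summarize the total size of the input prefix (it appends a Total Objects and a Total Size line to the listing):

$ aws s3 ls s3://ruanbekker.repo/datasets/text-data/ --summarize --human-readable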

Having a Look at the Size of one Object:

$ aws s3 ls s3://ruanbekker.repo/datasets/text-data/ | head -1
2017-02-15 09:54:59      46122 people-data_1021-1487152182.csv
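
To preview that object locally, we can first copy it down into the current working directory:

$ aws s3 cp s3://ruanbekker.repo/datasets/text-data/people-data_1021-1487152182.csv .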

A Preview of the data:

$ head -5 people-data_1021-1487152182.csv
2667832, james
6211837, james
9199776, stefan
3345497, james
6874734, stefan
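
As a rough local approximation of what the Wordcount Job will produce, we can count the occurrences of the name column in this one file with standard shell tools (this is only an illustration on a single file, not the MapReduce job itself):

$ awk -F', ' '{print $2}' people-data_1021-1487152182.csv | sort | uniq -c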

Creating an EMR Cluster for our Wordcount Job:

Let's create our first EMR Cluster, on which we will launch a Wordcount Job via a Step. A Step in EMR will execute as soon as the EMR Cluster has provisioned and is in a WAITING State.

Our Wordcount Job will read the 1003 CSV files and use Hadoop MapReduce to count the words, and we have set the Output Path to HDFS.

We have also set the cluster to auto-terminate once the Step has finished, to save money.

$ aws emr create-cluster \
--termination-protected \
--applications Name=Hadoop Name=Hive Name=Pig Name=Hue Name=Spark Name=Zeppelin Name=Ganglia Name=HBase Name=Presto \
--tags 'Stage=Test' 'Name=Wordcount Cluster' \
--ec2-attributes '
{
  "KeyName":"MyKey",
  "InstanceProfile":"EMR_EC2_DefaultRole",
  "SubnetId":"subnet-12345678",
  "EmrManagedSlaveSecurityGroup":"sg-12345678",
  "EmrManagedMasterSecurityGroup":"sg-12345678"
  }
' \
--release-label emr-5.6.0 \
--log-uri 's3n://ruanbekker/logs/elasticmapreduce/' \
--steps '
[
  {
    "Args":[
      "com.amazonaws.support.training.emr.Wordcount",
      "s3://ruanbekker.repo/datasets/text-data/",
      "/wordcount-output/"
      ],
    "Type":"CUSTOM_JAR",
    "ActionOnFailure":"TERMINATE_CLUSTER",
    "Jar":"s3://support.elasticmapreduce/training/jobs/Wordcount-1.0-SNAPSHOT-job.jar",
    "Properties":"",
    "Name":"Wordcount Step to Process Small Files"
    }
  ]
' \
--instance-groups '
[
  {
    "InstanceCount":2,
    "InstanceGroupType":"CORE",
    "InstanceType":"m3.xlarge",
    "Name":"Core - 2"
    },
  {
    "InstanceCount":1,
    "InstanceGroupType":"MASTER",
    "InstanceType":"m3.xlarge",
    "Name":"Master - 1"
    }
  ]
' \
--configurations '
[
  {
    "Classification":"hbase",
    "Properties":{
      "hbase.emr.storageMode":"s3"
      },
    "Configurations":[]
    },
  {
    "Classification":"hbase-site",
    "Properties":{
      "hbase.rootdir":"s3://ruanbekker/hbase/store/"
      },
    "Configurations":[]
    }
  ]
' \
--auto-terminate \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--service-role EMR_DefaultRole \
--enable-debugging \
--name 'Wordcount Cluster with Small Files' \
--scale-down-behavior TERMINATE_AT_INSTANCE_HOUR \
--region eu-west-1
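
While the Cluster is provisioning and the Step is running, the Step's state can be monitored from the CLI as an optional check, using the Cluster ID returned by create-cluster:

$ aws emr list-steps --cluster-id j-<CLUSTERID> --region eu-west-1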

EMR Cluster Output Results of Processing Small Files:

Our EMR Cluster's Results for our Wordcount Job:

* Elapsed time: 1 hour, 12 minutes

Which is horrible.

The reason for this long runtime is that Hadoop reads the input data path and creates a Map task for each input file that exists in the given input path.

We can confirm this by looking at the Number of splits value in the Application Master's logs, either from the S3 LogUri or on the Cluster itself by logging onto the master node.

Downloading and Viewing the logs from S3:

$ aws s3 cp s3://ruanbekker/logs/elasticmapreduce/j-<CLUSTERID>/containers/application_1498387632693_0001/container_1498387632693_0001_01_000001/syslog.gz .

$ zcat syslog.gz | grep 'Number of splits'
2017-06-25 10:53:59,817 INFO [main] org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl: Input size for job job_1498387632693_0001 = 46693837. Number of splits = 1003

We can see from the above output that the job has 1003 input splits, which means 1003 Map tasks will be spawned.
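
As mentioned, the same value can also be checked on the Cluster itself. Assuming you have SSH'd onto the master node and the application has finished (so its logs have been aggregated), yarn can query them directly:

$ yarn logs -applicationId application_1498387632693_0001 | grep 'Number of splits'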

The Problem with Hadoop and Small Files:

Hadoop does not work well with small files; have a look at the Hadoop Small Files Problem for more information.

Essentially, the way to solve this problem is to concatenate the data into bigger chunks. On EMR, the default dfs.block.size is 128MB.

If you have much larger datasets, it makes sense to tune the target size as close to this block size as possible, but as my dataset is not that big, I will concatenate mine into chunks of 64MB.
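
As a side note, the effective block size can be confirmed on a running Cluster with hdfs getconf, which returns the value in bytes (assuming you are logged onto the master node):

$ hdfs getconf -confKey dfs.blocksize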

The Resolution: Concatenate the Data:

Let's use s3-dist-cp to concatenate the data into files of 64MB in size:

$ s3-dist-cp \
--src=s3://ruanbekker.repo/datasets/text-data/ \
--dest=s3://ruanbekker.repo/datasets/concat-64/ \
--groupBy=".*(people-data).*" \
--targetSize=64

17/06/26 14:27:05 INFO s3distcp.S3DistCp: S3DistCp args: --src=s3://ruanbekker.repo/bigdata/datasets/text-data/ --dest=s3://ruanbekker.repo/datasets/concat-64/ --groupBy=.*(people-data).* --targetSize=64

[...]
17/06/26 14:28:40 INFO mapreduce.Job: Job job_1498481935615_0004 completed successfully

Having a look at our exported data, we can see that the first two files are more or less 67MB and the last one about 18MB:

$ aws s3 ls s3://ruanbekker.repo/datasets/concat-64/
2017-06-26 14:28:35   67170381 people-data
2017-06-26 14:28:35   67138206 people-data1
2017-06-26 14:27:52   18944000 people-data2
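
As a side note, s3-dist-cp does not have to be run interactively from the master node; it can also be submitted as a Step via command-runner.jar. A sketch of that approach is shown below, where the Step name is made up and the Cluster ID placeholder needs to be replaced with an existing Cluster:

$ aws emr add-steps \
--cluster-id j-<CLUSTERID> \
--steps 'Type=CUSTOM_JAR,Name=S3DistCpConcat,ActionOnFailure=CONTINUE,Jar=command-runner.jar,Args=[s3-dist-cp,--src=s3://ruanbekker.repo/datasets/text-data/,--dest=s3://ruanbekker.repo/datasets/concat-64/,--groupBy=.*(people-data).*,--targetSize=64]' \
--region eu-west-1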

Running another EMR Cluster to Process Concatenated Data:

Now let's create another EMR Cluster, similar to the previous one, except that we have updated the S3 Input Path to point to our Concatenated Data, which is the destination path of the s3-dist-cp job's output:

$ aws emr create-cluster \
--termination-protected \
--applications Name=Hadoop Name=Hive Name=Pig Name=Hue Name=Spark Name=Zeppelin Name=Ganglia Name=HBase Name=Presto \
--tags 'Stage=Test' 'Name=Wordcount Cluster: 64MB' \
--ec2-attributes '
{
  "KeyName":"MyKey",
  "InstanceProfile":"EMR_EC2_DefaultRole",
  "SubnetId":"subnet-12345678",
  "EmrManagedSlaveSecurityGroup":"sg-12345678",
  "EmrManagedMasterSecurityGroup":"sg-12345678"
  }
' \
--release-label emr-5.6.0 \
--log-uri 's3n://ruanbekker/logs/elasticmapreduce/' \
--steps '
[
  {
    "Args":[
      "com.amazonaws.support.training.emr.Wordcount",
      "s3://ruanbekker.repo/datasets/concat-64/",
      "/wordcount-output/"
      ],
    "Type":"CUSTOM_JAR",
    "ActionOnFailure":"TERMINATE_CLUSTER",
    "Jar":"s3://support.elasticmapreduce/training/jobs/Wordcount-1.0-SNAPSHOT-job.jar",
    "Properties":"",
    "Name":"Wordcount with Concatenated Text Files"
    }
  ]
' \
--instance-groups '
[
  {
    "InstanceCount":2,
    "InstanceGroupType":"CORE",
    "InstanceType":"m3.xlarge",
    "Name":"Core - 2"
    },
  {
    "InstanceCount":1,
    "InstanceGroupType":"MASTER",
    "InstanceType":"m3.xlarge",
    "Name":"Master - 1"
    }
  ]
' \
--configurations '
[
  {
    "Classification":"hbase",
    "Properties":{
      "hbase.emr.storageMode":"s3"
      },
    "Configurations":[]
    },
  {
    "Classification":"hbase-site",
    "Properties":{
      "hbase.rootdir":"s3://ruanbekker/hbase/store/"
      },
    "Configurations":[]
    }
  ]
' \
--auto-terminate \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--service-role EMR_DefaultRole \
--enable-debugging \
--name 'Wordcount Cluster with 64MB Concat Data' \
--scale-down-behavior TERMINATE_AT_INSTANCE_HOUR \
--region eu-west-1

EMR Cluster Results:

Look at these Results:

* Elapsed Time: 1 minute

If we look at the step logs, we can see the job's results from the controller log:

$ zcat logs/elasticmapreduce/j-<CLUSTERID>/steps/s-<STEPID>/controller.gz
..
2017-06-26T14:49:22.838Z INFO Step created jobs: job_1498488118826_0001
2017-06-26T14:49:22.839Z INFO Step succeeded with exitCode 0 and took 72 seconds
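
Alternatively, the Step's timing can be pulled from the CLI without touching the logs, as a quick check using the same Cluster ID and Step ID placeholders:

$ aws emr describe-step --cluster-id j-<CLUSTERID> --step-id s-<STEPID> --region eu-west-1 --query 'Step.Status.Timeline'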

From 72 minutes to 1 minute, that is one massive improvement!

Although this was just test data and my nodes were not working that hard, imagine a real-life example where you have massive amounts of data that take longer to process.

And if it's a case where lots of your data is small in size, using s3-dist-cp could potentially save you a lot of money and get the job done in a much shorter time.

Resources:

For more performance tips and best practices, have a look at the following resources: