AWS: Create EMR Cluster with Java SDK Examples

Today I am providing some basic examples of creating an EMR cluster and adding steps to that cluster with the AWS Java SDK.

This tutorial shows how to create an EMR cluster in eu-west-1 with 1x m3.xlarge master node and 2x m3.xlarge core nodes, with Hive and Spark installed, and how to submit a simple wordcount job to it via a Step.

In the second example, I will show how to submit work to an already running EMR cluster via a Step.

Pre-Requisites:
https://aws.amazon.com/sdk-for-java/

EMR Java SDK Examples:

  • Create EMR Cluster
  • Submit Steps to a Running EMR Cluster

Create EMR Cluster:

import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.Application;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;

/**
 * Launches an EMR cluster in eu-west-1 with Hive and Spark installed and queues two steps
 * on it: the EMR "enable debugging" step and a Hadoop wordcount example.
 *
 * <p>The placeholders {@code <bucket>}, {@code <subnet-id>} and {@code <key-name>} must be
 * replaced with real values before running.
 */
public class main {

    /**
     * Builds and submits the RunJobFlow request, then prints the id of the new cluster.
     *
     * @param args unused
     * @throws AmazonClientException if the "default" credentials profile cannot be loaded
     */
    public static void main(String[] args) {
        AWSCredentials credentials = loadDefaultCredentials();

        AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials);
        emr.setRegion(Region.getRegion(Regions.EU_WEST_1));

        // StepFactory defaults to the us-east-1 resource bucket; use the bucket that belongs
        // to the region the cluster runs in ("<region>.elasticmapreduce").
        StepFactory stepFactory = new StepFactory("eu-west-1.elasticmapreduce");
        StepConfig enableDebugging = new StepConfig()
                .withName("Enable debugging")
                .withActionOnFailure("TERMINATE_JOB_FLOW")
                .withHadoopJarStep(stepFactory.newEnableDebuggingStep());

        // wordcount arguments: input path first, output path second.
        HadoopJarStepConfig wordCountJar = new HadoopJarStepConfig()
                .withJar("/usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar")
                .withMainClass("wordcount")
                .withArgs("file:///etc/services", "/data");

        StepConfig wordCountStep = new StepConfig()
                .withName("Example Step")
                .withActionOnFailure("CONTINUE")
                .withHadoopJarStep(wordCountJar);

        Application hive = new Application().withName("Hive");
        Application spark = new Application().withName("Spark");

        // Three instances in total: one of the master type, the rest of the slave type.
        JobFlowInstancesConfig instances = new JobFlowInstancesConfig()
                .withEc2SubnetId("<subnet-id>")
                .withEc2KeyName("<key-name>")
                .withInstanceCount(3)
                .withKeepJobFlowAliveWhenNoSteps(true)
                .withMasterInstanceType("m3.xlarge")
                .withSlaveInstanceType("m3.xlarge");

        RunJobFlowRequest request = new RunJobFlowRequest()
                .withName("Cluster with Java SDK")
                .withReleaseLabel("emr-5.3.1")
                .withSteps(enableDebugging, wordCountStep)
                .withApplications(hive, spark)
                .withLogUri("s3://<bucket>/logs/")
                .withServiceRole("EMR_DefaultRole")
                .withJobFlowRole("EMR_EC2_DefaultRole")
                .withInstances(instances);

        RunJobFlowResult result = emr.runJobFlow(request);

        // Print only the cluster (job flow) id rather than the whole result object.
        System.out.println("ClusterId: " + result.getJobFlowId());
    }

    /**
     * Loads the credentials of the "default" profile from the shared credentials file.
     *
     * @return the credentials of the "default" profile
     * @throws AmazonClientException wrapping the underlying failure if the profile cannot be read
     */
    private static AWSCredentials loadDefaultCredentials() {
        try {
            return new ProfileCredentialsProvider("default").getCredentials();
        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. " +
                    "Please make sure that your credentials file is at the correct " +
                    "location (/home/<user>/.aws/credentials), and is in valid format.",
                    e);
        }
    }
}

The expected output should show the cluster id:
ClusterId: j-xxxxxxxx

Submit Steps to a Running EMR Cluster:

import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;

/**
 * Submits a Hadoop wordcount step to an already running EMR cluster in eu-west-1.
 *
 * <p>The placeholder {@code <cluster-id>} must be replaced with the id of the target cluster
 * before running.
 */
public class main {

    /**
     * Builds the wordcount step, adds it to the cluster and prints the id of each step created.
     *
     * @param args unused
     * @throws AmazonClientException if the "default" credentials profile cannot be loaded
     */
    public static void main(String[] args) {
        AWSCredentials credentials = loadDefaultCredentials();

        AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials);
        emr.setRegion(Region.getRegion(Regions.EU_WEST_1));

        // wordcount arguments: input path first, output path second.
        HadoopJarStepConfig wordCountJar = new HadoopJarStepConfig()
                .withJar("/usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar")
                .withMainClass("wordcount")
                .withArgs("file:///etc/services", "/data2");

        StepConfig wordCountStep = new StepConfig()
                .withName("Example Step")
                .withActionOnFailure("CONTINUE")
                .withHadoopJarStep(wordCountJar);

        AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
                .withJobFlowId("<cluster-id>")
                .withSteps(wordCountStep);

        AddJobFlowStepsResult result = emr.addJobFlowSteps(request);

        // One step id per line instead of the list's bracketed toString form.
        for (String stepId : result.getStepIds()) {
            System.out.println(stepId);
        }
    }

    /**
     * Loads the credentials of the "default" profile from the shared credentials file.
     *
     * @return the credentials of the "default" profile
     * @throws AmazonClientException wrapping the underlying failure if the profile cannot be read
     */
    private static AWSCredentials loadDefaultCredentials() {
        try {
            return new ProfileCredentialsProvider("default").getCredentials();
        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. " +
                    "Please make sure that your credentials file is at the correct " +
                    "location (/home/<user>/.aws/credentials), and is in valid format.",
                    e);
        }
    }
}

The expected output should show the step id:
s-xxxxxxxxxx