Today, I am providing some basic examples of creating an EMR Cluster and adding steps to the cluster with the AWS Java SDK.
This tutorial will show how to create an EMR Cluster in eu-west-1 with 1x m3.xlarge Master Node and 2x m3.xlarge Core nodes, with Hive and Spark and also submit a simple wordcount via a Step.
In the second example, I will show how to send work to your running EMR Cluster via a Step.
Pre-Requisites:
https://aws.amazon.com/sdk-for-java/
EMR Java SDK Examples:
- Create EMR Cluster
- Submit Steps to a Running EMR Cluster
Create EMR Cluster:
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.Application;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;
public class main {
    /**
     * Launches an EMR 5.3.1 cluster in eu-west-1 (1x m3.xlarge master,
     * 2x m3.xlarge core) with Hive and Spark installed, enables debugging,
     * submits a wordcount example step, and prints the RunJobFlowResult
     * (which contains the new cluster id).
     */
    public static void main(String[] args) {
        // Read credentials from the "default" profile in ~/.aws/credentials.
        AWSCredentials credentials = null;
        try {
            credentials = new ProfileCredentialsProvider("default").getCredentials();
        } catch (Exception e) {
            throw new AmazonClientException(
                "Cannot load the credentials from the credential profiles file. " +
                "Please make sure that your credentials file is at the correct " +
                "location (/home/<user>/.aws/credentials), and is in valid format.",
                e);
        }

        AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials);
        emr.setRegion(Region.getRegion(Regions.EU_WEST_1));

        // Step 1: enable debugging so step logs are browsable in the console.
        StepFactory stepFactory = new StepFactory();
        StepConfig enableDebugging = new StepConfig()
            .withName("Enable debugging")
            .withActionOnFailure("TERMINATE_JOB_FLOW")
            .withHadoopJarStep(stepFactory.newEnableDebuggingStep());

        // Step 2: run the stock wordcount example over /etc/services, writing
        // the result to /data on HDFS. The SDK's varargs withArgs(...) appends
        // to the argument list, so both arguments can be passed in one call.
        HadoopJarStepConfig wordcountJar = new HadoopJarStepConfig()
            .withJar("/usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar")
            .withMainClass("wordcount")
            .withArgs("file:///etc/services", "/data");
        StepConfig wordcountStep = new StepConfig()
            .withName("Example Step")
            .withActionOnFailure("CONTINUE")
            .withHadoopJarStep(wordcountJar);

        // Cluster hardware: 3 instances total (1 master + 2 core), kept alive
        // after the submitted steps finish.
        JobFlowInstancesConfig instances = new JobFlowInstancesConfig()
            .withEc2SubnetId("<subnet-id>")
            .withEc2KeyName("<key-name>")
            .withInstanceCount(3)
            .withKeepJobFlowAliveWhenNoSteps(true)
            .withMasterInstanceType("m3.xlarge")
            .withSlaveInstanceType("m3.xlarge");

        // withApplications(...) also appends, so Hive and Spark go in together.
        RunJobFlowRequest request = new RunJobFlowRequest()
            .withName("Cluster with Java SDK")
            .withReleaseLabel("emr-5.3.1")
            .withSteps(enableDebugging, wordcountStep)
            .withApplications(new Application().withName("Hive"),
                              new Application().withName("Spark"))
            .withLogUri("s3://<bucket>/logs/")
            .withServiceRole("EMR_DefaultRole")
            .withJobFlowRole("EMR_EC2_DefaultRole")
            .withInstances(instances);

        RunJobFlowResult result = emr.runJobFlow(request);
        System.out.println("This is result: " + result.toString());
    }
}
The expected output should show the cluster id:
ClusterId: j-xxxxxxxx
Submit Steps to a Running EMR Cluster:
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;
public class main {
    /**
     * Submits a wordcount example step to an already-running EMR cluster
     * (identified by <cluster-id>) and prints the id(s) of the added step(s).
     */
    public static void main(String[] args) {
        // Read credentials from the "default" profile in ~/.aws/credentials.
        AWSCredentials credentials_profile = null;
        try {
            credentials_profile = new ProfileCredentialsProvider("default").getCredentials();
        } catch (Exception e) {
            throw new AmazonClientException(
                "Cannot load the credentials from the credential profiles file. " +
                "Please make sure that your credentials file is at the correct " +
                "location (/home/<user>/.aws/credentials), and is in valid format.",
                e);
        }

        AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials_profile);
        emr.setRegion(Region.getRegion(Regions.EU_WEST_1));

        // NOTE(review): removed the dead "install Hive" StepConfig that was built
        // here but never submitted. On release-label clusters (emr-4.x and later)
        // Hive is installed via Applications at cluster creation;
        // StepFactory.newInstallHiveStep() targets the old AMI-version clusters.

        // Wordcount over /etc/services, output to /data2 on HDFS. The varargs
        // withArgs(...) appends to the list, so both arguments fit in one call.
        HadoopJarStepConfig runExampleConfig = new HadoopJarStepConfig()
            .withJar("/usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar")
            .withMainClass("wordcount")
            .withArgs("file:///etc/services", "/data2");

        StepConfig customExampleStep = new StepConfig()
            .withName("Example Step")
            .withActionOnFailure("CONTINUE")  // keep the cluster alive if the step fails
            .withHadoopJarStep(runExampleConfig);

        AddJobFlowStepsResult result = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
            .withJobFlowId("<cluster-id>")
            .withSteps(customExampleStep));
        System.out.println(result.getStepIds());
    }
}
The expected output should show the step id:
s-xxxxxxxxxx
Comments