EMR Job

The first step here is to copy the EMR job script, nyc_aggregations.py (downloaded earlier), to the S3 path s3://airflow-yourname-bucket/scripts/emr/. You will need to create the emr folder under scripts if it was not created during setup.
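
If you prefer to script this upload rather than use the AWS console, the snippet below is a minimal boto3 sketch; the bucket name is a placeholder, so substitute your own, and note that the emr/ "folder" is created implicitly when the first object is uploaded under that prefix.

import boto3

s3 = boto3.client("s3")

# Upload the Spark script to the scripts/emr/ prefix of your bucket.
# Replace the bucket name below with your own.
s3.upload_file(
    "nyc_aggregations.py",
    "airflow-yourname-bucket",
    "scripts/emr/nyc_aggregations.py",
)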

Now, Apache Airflow ships with a default AWS Hook that can be used to interact with the different services in Amazon Web Services. It also comes with EMR operators and sensors (contributed by the developer community).
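
The code below assumes that these operators and sensors have been imported, and that S3_BUCKET_NAME and the dag object are defined earlier in the DAG file. A sketch of the imports on Airflow 1.10.x is shown here; on Airflow 2.x with the Amazon provider package installed, the same classes are imported from the airflow.providers.amazon.aws modules instead.

from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

# Assumed to be defined in the DAG file; shown here only for context.
S3_BUCKET_NAME = "airflow-yourname-bucket"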

The configuration required to create the EMR cluster will, by default, be picked up from the emr_default connection. You can either edit emr_default from the Airflow UI or define an override as shown below -

# Jinja template macro used to give each cluster run a unique name
execution_date = "{{ execution_date }}"
  
JOB_FLOW_OVERRIDES = {
    "Name": "Data-Pipeline-" + execution_date,
    "ReleaseLabel": "emr-5.29.0",
    "LogUri": "s3://{}/logs/emr/".format(S3_BUCKET_NAME),
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Master nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1
            },
            {
                "Name": "Slave nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "CORE",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 2
            }
        ],
        "TerminationProtected": False,
        "KeepJobFlowAliveWhenNoSteps": True
    }
}

Next, we will define the Job steps that will be submitted to the EMR cluster.

S3_URI = "s3://{}/scripts/emr/".format(S3_BUCKET_NAME)

SPARK_TEST_STEPS = [
    {
        # Copy the Spark script from S3 to the cluster's local filesystem
        'Name': 'setup - copy files',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['aws', 's3', 'cp', '--recursive', S3_URI, '/home/hadoop/']
        }
    },
    {
        # Run the aggregation job, passing the input and output S3 paths as arguments
        'Name': 'Run Spark',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit',
                     '/home/hadoop/nyc_aggregations.py',
                     's3://{}/data/transformed/green'.format(S3_BUCKET_NAME),
                     's3://{}/data/aggregated/green'.format(S3_BUCKET_NAME)]
        }
    }
]

Once we have created the definitions for the EMR cluster and the Spark job, it's time to define the tasks that will be part of the DAG. The first task creates the EMR cluster -

cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_emr_cluster',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag
)

Next, we will create a task that submits the step (the Spark job) to the EMR cluster once the cluster has been created -

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=SPARK_TEST_STEPS,
    dag=dag
)

We will need to add a watch task that waits for the job to reach a terminal state (successful or failed execution) before we terminate the cluster -

step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

After the job has completed its execution, the DAG will run the cluster termination task -

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

The above set of tasks added to the DAG will create an EMR cluster, run the transformation job as a step, and terminate the cluster once the job has completed.
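
For completeness, here is a minimal sketch of how these tasks can be chained together in the DAG file so that they run in the order described above (the dag object itself is assumed to be defined earlier in the file):

# Create cluster -> submit step -> wait for step to finish -> terminate cluster
cluster_creator >> step_adder >> step_checker >> cluster_remover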