Setting up the DAG

Starting with a DAG definition

If you are attending an AWS event and/or skipped the manual setup, navigate to the S3 console and note down the name of the S3 bucket that has been created for the workshop.

An Airflow DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code.

As a first step in defining the DAG, let's import all the required modules (operators and sensors) that we will use as part of the data pipeline:

from datetime import timedelta  
import airflow  
from airflow import DAG  
from airflow.sensors.s3_prefix_sensor import S3PrefixSensor  
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator  
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator  
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor  
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator  

# Custom Operators deployed as Airflow plugins
from awsairflowlib.operators.aws_glue_job_operator import AWSGlueJobOperator
from awsairflowlib.operators.aws_glue_crawler_operator import AWSGlueCrawlerOperator
from awsairflowlib.operators.aws_copy_s3_to_redshift import CopyS3ToRedshiftOperator  

Next, we will define the default set of arguments for the Airflow DAG and specify the name of the S3 bucket where the data and scripts will be stored.

Edit S3_BUCKET_NAME in the code snippet below to match your workshop bucket name.

S3_BUCKET_NAME = "airflow-yourname-bucket"  
  
default_args = {  
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'retries': 0,
    'retry_delay': timedelta(minutes=2),
    'provide_context': True,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}
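
Note that airflow.utils.dates.days_ago(1) resolves to midnight UTC of the previous day, giving the DAG a start date in the past so the scheduler can trigger a run as soon as the DAG is enabled. If you want to confirm the resolved value, a throwaway check like the one below (not part of the pipeline script) can be run in a Python shell on the Airflow environment:

from airflow.utils.dates import days_ago

# Prints midnight UTC of the previous day, e.g. 2021-06-01 00:00:00+00:00
print(days_ago(1))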

We will then create a DAG object using the default arguments and specify the schedule interval at which the DAG should be executed. The cron expression 0 3 * * * used below schedules one run per day at 03:00 (UTC, Airflow's default timezone).

dag = DAG(  
    'data_pipeline',
    default_args=default_args,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval='0 3 * * *'
)
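
Every operator we instantiate from here on is attached to this dag object by passing dag=dag, and inherits the default_args defined above unless it overrides them. As an illustration only, the placeholder task below uses a hypothetical DummyOperator that is not part of the workshop pipeline; it simply shows the pattern the real pipeline steps will follow:

from airflow.operators.dummy_operator import DummyOperator

# Hypothetical placeholder task; the actual pipeline tasks are added in the next sections.
# Passing dag=dag attaches the task to the DAG and applies the default_args defined above.
placeholder = DummyOperator(
    task_id='placeholder_task',
    dag=dag
)

Running the script with a plain Python interpreter (for example, python your_dag_file.py, using whatever file name you saved the DAG under) is a quick way to catch syntax errors before uploading it to the DAGs folder.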

Next, we will add the code blocks for the individual steps that make up the data pipeline.