Setting up the DAG

Starting with a DAG definition

If you are at an AWS event, or you skipped the manual setup, navigate to the S3 console and note down the name of the S3 bucket that was created for the workshop.

An Airflow DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code.

As a first step in defining the DAG, let's import all the required modules (Operators and Sensors) that we will use as part of the data pipeline:

from datetime import timedelta  
import airflow  
from airflow import DAG  
from airflow.providers.amazon.aws.sensors.s3_prefix import S3PrefixSensor
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator 
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor

# Custom Operators deployed as Airflow plugins
from awsairflowlib.operators.aws_glue_job_operator import AWSGlueJobOperator
from awsairflowlib.operators.aws_glue_crawler_operator import AWSGlueCrawlerOperator
from awsairflowlib.operators.aws_copy_s3_to_redshift import CopyS3ToRedshiftOperator
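
The module paths above match the Amazon provider package version used by the workshop environment. On newer releases of apache-airflow-providers-amazon, the per-operator EMR modules have been consolidated and the S3 prefix sensor has been folded into S3KeySensor. A sketch of the equivalent imports for those versions is shown below; verify the paths against the provider version installed in your environment.

# Equivalent imports on newer apache-airflow-providers-amazon releases
from airflow.providers.amazon.aws.operators.emr import (
    EmrCreateJobFlowOperator,
    EmrAddStepsOperator,
    EmrTerminateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor  # replaces S3PrefixSensor (use wildcard_match=True)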

Next, we will define the default set of arguments for the Airflow DAG. We will also specify the name of the S3 bucket where the data and scripts are stored.

Edit S3_BUCKET_NAME in the code snippet below to match your workshop bucket name.

S3_BUCKET_NAME = "airflow-yourname-bucket"  
  
default_args = {  
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'retries': 0,
    'retry_delay': timedelta(minutes=2),
    'provide_context': True,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}
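
These defaults are applied to every task in the DAG unless a task overrides them explicitly. Note that airflow.utils.dates.days_ago() is deprecated in newer Airflow releases in favor of a fixed, timezone-aware start date; a sketch of the equivalent defaults for Airflow 2.2+ is shown below (the date itself is arbitrary, as long as it lies in the past).

import pendulum
from datetime import timedelta

# Same defaults as above, with a fixed start date instead of days_ago(1).
# 'provide_context' is omitted because Airflow 2 passes the task context
# to Python callables automatically.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': pendulum.datetime(2022, 1, 1, tz="UTC"),
    'retries': 0,
    'retry_delay': timedelta(minutes=2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}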

We will then create a DAG object using the default arguments and specify the schedule interval at which the DAG should run. The cron expression '0 3 * * *' runs the DAG once a day at 03:00 (UTC, Airflow's default time zone).

dag = DAG(  
    'data_pipeline',
    default_args=default_args,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval='0 3 * * *'
)
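
To see how tasks attach to this DAG and inherit the defaults, here is a hypothetical first task: an S3PrefixSensor that waits for input data to land under a prefix in the workshop bucket. The prefix and retry override shown here are placeholders for illustration; the actual pipeline steps are added in the sections that follow.

# Hypothetical example: wait for objects to appear under a prefix in the
# workshop bucket. Any argument set here (retries) overrides the DAG-level
# default from default_args.
s3_sensor = S3PrefixSensor(
    task_id='s3_sensor',
    bucket_name=S3_BUCKET_NAME,
    prefix='data/raw/',
    retries=2,
    dag=dag
)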

Next, we will add code blocks for the individual steps that make up the data pipeline.