As discussed in the previous module, Airflow lets us author a workflow in Python by defining all the stages of a data pipeline. In this module we will cover the core concepts needed to create these workflows (pipelines).

Key Concepts

DAG (Directed Acyclic Graph): A DAG is a collection of tasks, written in Python, that describes how to run a workflow. A pipeline is designed as a directed acyclic graph by dividing it into tasks that can be executed independently; these tasks are then connected by their dependencies to form the graph. Because the graph is acyclic, no task can depend on itself, directly or indirectly.
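To make the graph idea concrete, here is a minimal sketch that uses only Python's standard library (graphlib, available from Python 3.9), not Airflow itself. The task names extract, transform, validate and load are hypothetical. It shows that a valid run order exists as long as the dependencies contain no cycle:

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical pipeline: each key maps a task to the set of tasks it depends on.
pipeline = {
    "extract": set(),
    "transform": {"extract"},
    "validate": {"extract"},
    "load": {"transform", "validate"},
}

# A valid execution order always places a task after its upstream tasks.
order = list(TopologicalSorter(pipeline).static_order())
print(order)

# Making "extract" depend on "load" closes a loop, so no valid order exists.
cyclic = dict(pipeline, extract={"load"})
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```

Here transform and validate do not depend on each other, so a scheduler would be free to run them in either order or in parallel.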

Task: A Task defines a unit of work within a DAG; it is represented as a node in the DAG graph. Each task is an implementation of an Operator, for example a PythonOperator to execute some Python code, or a BashOperator to run a Bash command. After an operator is instantiated, it’s referred to as a “task.”

Task instance: A task instance represents a specific run of a task characterized by a DAG, a task, and a point in time.
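As a rough illustration of that definition, the sketch below models a task instance as a plain Python record. ToyTaskInstance and its field names are hypothetical and are not Airflow's own classes; the point is only that the same task yields a different task instance for each point in time:

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class ToyTaskInstance:
    """Hypothetical record: one run of one task in one DAG."""
    dag_id: str
    task_id: str
    run_date: datetime


first_run = ToyTaskInstance("DAG-1", "start", datetime(2021, 1, 25))
second_run = ToyTaskInstance("DAG-1", "start", datetime(2021, 1, 26))

# Same DAG and same task, but two different points in time.
same_task = (first_run.dag_id, first_run.task_id) == (second_run.dag_id, second_run.task_id)
distinct_instances = first_run != second_run
print(same_task, distinct_instances)
```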

Operators: Operators are atomic components in a DAG describing a single task in the pipeline. They determine what gets done in that task when a DAG runs. Airflow provides operators for common tasks. It is extensible, so you can define your own custom operators.

Sensors: Sensors are a special type of operator whose purpose is to wait for some external or internal trigger. Some common sensors are:

  • ExternalTaskSensor: waits for a task in a different DAG to complete execution.
  • HivePartitionSensor: waits for a specific partition of a Hive table to be created.
  • S3KeySensor: waits for a specific file or directory (key) to become available in an S3 bucket.
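All of these sensors follow the same wait-and-check pattern. The sketch below shows that pattern in plain Python; it is not Airflow's implementation. The names poke, poke_interval and timeout mirror the terms Airflow uses for sensors, while wait_for and file_has_arrived are hypothetical helpers. Unlike an Airflow sensor, which fails the task on timeout, this sketch simply returns False:

```python
import time
from typing import Callable


def wait_for(poke: Callable[[], bool], poke_interval: float, timeout: float) -> bool:
    """Call poke() repeatedly until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if poke():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poke_interval)


# Hypothetical condition standing in for "the file has arrived":
# it becomes true on the third check.
attempts = {"count": 0}


def file_has_arrived() -> bool:
    attempts["count"] += 1
    return attempts["count"] >= 3


succeeded = wait_for(file_has_arrived, poke_interval=0.01, timeout=1.0)
timed_out = not wait_for(lambda: False, poke_interval=0.01, timeout=0.05)
print(succeeded, attempts["count"], timed_out)
```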

Hooks: Provide a uniform interface to access external services like S3, MySQL, Hive, EMR, etc. Hooks are the building blocks for operators to interact with external services.

Scheduling: DAGs and their tasks can be run on demand, or scheduled to run at a fixed frequency that is defined as a cron expression in the DAG.
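A standard cron expression has five space-separated fields. As a small aid for reading them, the sketch below labels each field; describe_cron is a hypothetical helper written for this illustration, not part of Airflow, and it does not validate the field values:

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict:
    """Label the five fields of a standard cron expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


# "0 6 * * *" means: minute 0 of hour 6, every day of every month.
daily_at_six = describe_cron("0 6 * * *")
print(daily_at_six)
```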

Basics of writing a DAG

Step-1: Import Modules

Step-2: Define default args

Step-3: Instantiate a DAG / Create a DAG object

Step-4: Define Tasks

Step-5: Define Dependencies

Below is a DAG base template which you can use and build upon during this workshop:

# Step-1: Import modules
from datetime import datetime, timedelta

from airflow import DAG
# In Airflow 2.x this import path is deprecated; adjust it to match your installed version.
from airflow.operators.dummy_operator import DummyOperator

# Step-2: Define default args (applied to every task in the DAG)
default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 25),
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Step-3: Instantiate a DAG (create a DAG object)
dag = DAG(dag_id='DAG-1', default_args=default_args)

# Step-4: Define Tasks
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

# Step-5: Define dependencies (start runs before end)
start >> end
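The >> in Step-5 is ordinary Python operator overloading. The toy class below is a sketch of the idea and not Airflow's implementation; ToyTask and its downstream attribute are hypothetical. It shows how >> can record a dependency and return the right-hand task so that several tasks can be chained in one line:

```python
class ToyTask:
    """Hypothetical stand-in for a task, used only to illustrate `>>`."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream = []  # tasks that must run after this one

    def __rshift__(self, other: "ToyTask") -> "ToyTask":
        # `a >> b` records b as downstream of a, then returns b
        # so that `a >> b >> c` keeps chaining left to right.
        self.downstream.append(other)
        return other

    def __repr__(self) -> str:
        return f"ToyTask({self.task_id!r})"


start = ToyTask("start")
process = ToyTask("process")
end = ToyTask("end")

start >> process >> end
print(start.downstream, process.downstream, end.downstream)
```

In Airflow itself, start >> end has the same effect as calling start.set_downstream(end).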