Building the DAG

As a first step in defining the DAG, let's import all the required modules (operators and sensors) that we will use as part of the data pipeline:

from datetime import timedelta

import airflow
from airflow import DAG
from airflow.sensors.s3_prefix_sensor import S3PrefixSensor

# airflow operators
from airflow.operators.python_operator import PythonOperator

# airflow sagemaker operators
from airflow.contrib.operators.sagemaker_training_operator \
    import SageMakerTrainingOperator
from airflow.contrib.operators.sagemaker_tuning_operator \
    import SageMakerTuningOperator
from airflow.contrib.operators.sagemaker_transform_operator \
    import SageMakerTransformOperator
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.utils.trigger_rule import TriggerRule

# sagemaker sdk
import boto3
import sagemaker
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.estimator import Estimator
from sagemaker.tuner import HyperparameterTuner

# airflow sagemaker configuration
from sagemaker.workflow.airflow import training_config
#from sagemaker.workflow.airflow import tuning_config
from sagemaker.workflow.airflow import transform_config_from_estimator

Next, add the custom scripts and files to be imported:


# Import preprocess file.
from preprocess import preprocess

# Import config file.
import config as cfg


# helper to look up the ARN of the IAM role that SageMaker will assume
def get_sagemaker_role_arn(role_name, region_name):
    iam = boto3.client('iam', region_name=region_name)
    response = iam.get_role(RoleName=role_name)
    return response["Role"]["Arn"]

Next, we will define the default set of parameters for the Airflow DAG. We will also specify the S3 bucket where the data and scripts will be stored.

Edit S3_BUCKET_NAME and REGION_NAME in the code snippet below to match your workshop bucket and region.


S3_BUCKET_NAME = "airflow-yourname-bucket"
REGION_NAME = "us-east-1"
  
default_args = {  
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'retries': 0,
    'retry_delay': timedelta(minutes=2),
    'provide_context': True,
    'email': ['airflow@iloveairflow.com'],
    'email_on_failure': True,
    'email_on_retry': False
}

Next, we are going to set up all the configuration used by the DAG: we define the estimator, then derive the training and transform configs from it.

#-----------
### Setup Train and Tuning Tasks.
#-----------

# read config file
config = cfg.get_config(S3_BUCKET_NAME, REGION_NAME)

# set configuration for tasks
hook = AwsHook(aws_conn_id='airflow-sagemaker')
region = config["job_level"]["region_name"]
sess = hook.get_session(region_name=region)
role = get_sagemaker_role_arn(
    config["train_model"]["sagemaker_role"],
    sess.region_name)
container = get_image_uri(sess.region_name, 'xgboost', repo_version='1.0-1')


hpo_enabled = False

train_input = config["train_model"]["inputs"]["train"]
csv_train_input = sagemaker.session.s3_input(train_input, content_type='csv')

validation_input = config["train_model"]["inputs"]["validation"]
csv_validation_input = sagemaker.session.s3_input(validation_input, content_type='csv')

training_inputs = {"train": csv_train_input, "validation": csv_validation_input}

output_path = config["train_model"]["estimator_config"]["output_path"]

fm_estimator = Estimator(image_name=container, 
    role=role,
    train_instance_count=1, 
    train_instance_type='ml.m5.2xlarge', 
    train_volume_size=5, # 5 GB 
    output_path=output_path,
    sagemaker_session=sagemaker.session.Session(sess),
    #train_use_spot_instances=True,
    #train_max_run=300,
    #train_max_wait=600
    )

fm_estimator.set_hyperparameters(max_depth=5,
    eta=0.2,
    #gamma=4,
    #min_child_weight=300,
    #subsample=0.8,
    #silent=0,
    objective='reg:linear',
    early_stopping_rounds=10,
    num_round=150)


# train_config specifies SageMaker training configuration
train_config = training_config(
    estimator=fm_estimator,
    inputs=training_inputs)


# create transform config
transform_config = transform_config_from_estimator(
    estimator=fm_estimator,
    task_id="model_tuning" if hpo_enabled else "model_training",
    task_type="tuning" if hpo_enabled else "training",
    **config["batch_transform"]["transform_config"]
)

We will then create the DAG object using the default arguments, specifying the schedule interval at which the DAG should be executed. We also add an S3 prefix sensor that waits for the raw data to arrive under the raw/ prefix of the bucket before the rest of the pipeline runs.


#-------
### Start creating DAGs
#-------

dag = DAG(  
    'ml_pipeline',
    default_args=default_args,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval='0 3 * * *'
)

s3_sensor = S3PrefixSensor(
    task_id='s3_sensor',
    bucket_name=S3_BUCKET_NAME,
    prefix='raw/',
    dag=dag
)

Preprocessing

We will add the preprocessing task, which generates the training and test data sets.

# Create python operator to call our preprocess function (preprocess.py file).
preprocess_task = PythonOperator(
    task_id='preprocessing',
    provide_context=False,
    python_callable=preprocess,
    op_kwargs={'bucket_name': S3_BUCKET_NAME},
    dag=dag)
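
The preprocess function itself lives in the workshop's preprocess.py; a minimal sketch of the signature the task above expects, given the op_kwargs it passes (the body is a placeholder, not the workshop's actual preprocessing code):

# preprocess.py (sketch)
def preprocess(bucket_name):
    """Read the raw data from s3://<bucket_name>/raw/, split it into
    training and test sets, and upload the prepared CSV files back to
    the bucket for the training and batch transform tasks."""
    # the workshop file implements the actual cleaning, feature
    # engineering, and train/test split here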

Model training

  • We’ll train a model with the Amazon SageMaker XGBoost algorithm by launching a training job using the Airflow Amazon SageMaker operators.
  • Use SageMakerTrainingOperator to run the training job with the hyperparameters known to work for your data.
# launch sagemaker training job and wait until it completes
train_model_task = SageMakerTrainingOperator(
    task_id='model_training',
    dag=dag,
    config=train_config,
    aws_conn_id='airflow-sagemaker',
    wait_for_completion=True,
    check_interval=30
)

Model inference

Using the Airflow SageMakerTransformOperator, create an Amazon SageMaker batch transform job that performs batch inference on the test dataset to evaluate the performance of the model.

# launch sagemaker batch transform job and wait until it completes
batch_transform_task = SageMakerTransformOperator(
    task_id='predicting',
    dag=dag,
    config=transform_config,
    aws_conn_id='airflow-sagemaker',
    wait_for_completion=True,
    check_interval=30,
    trigger_rule=TriggerRule.ONE_SUCCESS
)

You could also include a SageMakerTuningOperator to run a hyperparameter tuning job, which finds the best model by running many training jobs that test a range of hyperparameters on your dataset; a sketch is shown below. Refer to the blog post for more details.
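
For reference, here is a minimal sketch of such a tuning task, assuming the imports above; the objective metric, parameter ranges, and job counts are illustrative assumptions rather than tuned recommendations.

from sagemaker.tuner import ContinuousParameter, IntegerParameter
from sagemaker.workflow.airflow import tuning_config

# define a tuner around the same estimator, searching a small hyperparameter space
fm_tuner = HyperparameterTuner(
    estimator=fm_estimator,
    objective_metric_name='validation:rmse',
    objective_type='Minimize',
    hyperparameter_ranges={
        'eta': ContinuousParameter(0.01, 0.3),
        'max_depth': IntegerParameter(3, 9)
    },
    max_jobs=4,
    max_parallel_jobs=2)

# tuning_config builds the SageMaker tuning job configuration for the operator
tuner_config = tuning_config(
    tuner=fm_tuner,
    inputs=training_inputs)

# launch sagemaker hyperparameter tuning job and wait until it completes
tune_model_task = SageMakerTuningOperator(
    task_id='model_tuning',
    dag=dag,
    config=tuner_config,
    aws_conn_id='airflow-sagemaker',
    wait_for_completion=True,
    check_interval=30
)

With hpo_enabled set to True, the transform config defined earlier reads its model from the model_tuning task instead of model_training.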

Those are all the tasks we will include in the ML pipeline. The next step is to chain them together, deploy the DAG to Airflow, and run it; a sketch of the chaining is shown below.
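
For orientation, once the tasks are chained the dependencies will look roughly like the following (the exact wiring, including the optional tuning branch implied by the ONE_SUCCESS trigger rule, is covered in the next step):

# set the task dependencies for the pipeline
s3_sensor >> preprocess_task >> train_model_task >> batch_transform_task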