Stitching the DAG tasks together

Finally, specify the task dependencies.

# set the dependencies between tasks
s3_sensor >> preprocess_task >> train_model_task >> batch_transform_task
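To make the chaining above concrete, here is a minimal sketch of what the >> operator does. The Task class is a hypothetical stand-in written for illustration only; it is not the Airflow API, and the task ids simply reuse the variable names from the line above. The one behaviour it mirrors is that a >> b registers b as downstream of a and evaluates to b, which is what lets several tasks be chained in a single expression.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator (illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # `a >> b` records b as downstream of a and returns b,
        # so `a >> b >> c` links a -> b and then b -> c.
        self.downstream.append(other)
        return other


def run_order(first):
    """Walk a linear chain of tasks and return their ids in execution order."""
    order, current = [], first
    while current is not None:
        order.append(current.task_id)
        current = current.downstream[0] if current.downstream else None
    return order


s3_sensor = Task("s3_sensor")
preprocess_task = Task("preprocess_task")
train_model_task = Task("train_model_task")
batch_transform_task = Task("batch_transform_task")

s3_sensor >> preprocess_task >> train_model_task >> batch_transform_task

print(run_order(s3_sensor))
```

In the real DAG, the scheduler uses these upstream/downstream links to decide when each task is allowed to start.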

That brings us to the end of the DAG definition. We can put all the steps together into a single DAG file, ml_pipeline.py, and copy it along with config.py and preprocess.py into the s3://airflow-yourname-bucket/dags folder.
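If you would rather script the copy than use the console, the following is one possible sketch. It assumes the three files sit in the current directory, that airflow-yourname-bucket is replaced with your own bucket name, and that boto3 is installed with AWS credentials configured. The helper names plan_dag_uploads and upload_dag_files are made up for this example.

```python
import os
import posixpath

BUCKET = "airflow-yourname-bucket"  # placeholder bucket name from the text
DAG_FILES = ["ml_pipeline.py", "config.py", "preprocess.py"]


def plan_dag_uploads(files, bucket=BUCKET, prefix="dags"):
    """Return one (local_path, bucket, key) tuple per file to upload."""
    return [
        (path, bucket, posixpath.join(prefix, os.path.basename(path)))
        for path in files
    ]


def upload_dag_files(files):
    """Upload the files to S3. Needs boto3 and AWS credentials."""
    import boto3  # imported lazily so the planning step works without it

    s3 = boto3.client("s3")
    for local_path, bucket, key in plan_dag_uploads(files):
        s3.upload_file(local_path, bucket, key)


# Print the destinations without touching AWS.
for _, bucket, key in plan_dag_uploads(DAG_FILES):
    print(f"s3://{bucket}/{key}")
```

Calling upload_dag_files(DAG_FILES) performs the actual upload; the loop at the end only lists where each file would land.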

Once the script is deployed, it appears in the Airflow UI within a minute or so.

Data Pipeline in Action

By default, the DAG is initially disabled. Enable it with the On/Off toggle button so that the scheduler picks it up. We can also trigger the DAG manually.

To trigger the DAG, click on the play button shown below, and on the next page choose Trigger.

The Airflow DAG will now wait for the files to arrive in S3 in the path s3://airflow-yourname-bucket/raw/.

To initiate the DAG run:

  1. Go to S3 Console
  2. Click on your bucket created for the workshop
  3. Click on Create folder
  4. Enter raw for the Folder name, and click on Create folder
  5. Copy the train_1.csv file into the s3://airflow-yourname-bucket/raw/ folder created above.

Once the file is copied into the folder, the s3_sensor task will be marked as complete, and execution will pass to the next tasks in the DAG.
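Conceptually, a sensor task repeatedly checks a condition and succeeds once it holds. The sketch below is a simplified stand-in for that polling loop, not Airflow's sensor implementation. The function name wait_for_file, its parameters, and the default intervals are all assumptions chosen for illustration. The check itself is passed in as a callable, so the loop can be tried without S3 access.

```python
import time


def wait_for_file(exists, poke_interval=60.0, timeout=3600.0, sleep=time.sleep):
    """Call `exists()` until it returns True; return the number of checks made.

    Raises TimeoutError once `timeout` seconds of waiting have elapsed
    without a successful check.
    """
    waited, checks = 0.0, 0
    while True:
        checks += 1
        if exists():
            return checks
        if waited >= timeout:
            raise TimeoutError(f"file did not arrive within {timeout} seconds")
        sleep(poke_interval)
        waited += poke_interval


# Simulated checks: the file is missing twice, then present.
answers = iter([False, False, True])
checks = wait_for_file(lambda: next(answers), sleep=lambda seconds: None)
print(f"file found after {checks} checks")
```

In the workshop DAG, the equivalent check is whether anything has arrived under s3://airflow-yourname-bucket/raw/.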

If the preprocessing Airflow task is triggered before the training file upload has completed, the task will be marked as failed. To reset its status, click on the preprocessing task in the Airflow UI (under Graph View) and click on Clear under Task Actions in the popup window.

The entire DAG will complete in a few minutes. During the DAG run, you can visit the Amazon SageMaker console to view the job details:

  • Training Job - Here
  • Transform Job - Here
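The same job details can also be read programmatically. The sketch below uses the boto3 SageMaker client calls describe_training_job and describe_transform_job. It assumes boto3 is installed and AWS credentials and a region are configured. The helper names are made up for this example, and the job names are values you supply yourself, for instance by copying them from the console.

```python
def job_status(client, kind, name):
    """Return the status string of a SageMaker training or transform job."""
    if kind == "training":
        response = client.describe_training_job(TrainingJobName=name)
        return response["TrainingJobStatus"]
    if kind == "transform":
        response = client.describe_transform_job(TransformJobName=name)
        return response["TransformJobStatus"]
    raise ValueError(f"unknown job kind: {kind}")


def print_job_statuses(training_job_name, transform_job_name):
    """Print the status of both jobs. Needs boto3 and AWS credentials."""
    import boto3  # imported lazily; only needed when actually querying AWS

    sagemaker = boto3.client("sagemaker")
    print("training:", job_status(sagemaker, "training", training_job_name))
    print("transform:", job_status(sagemaker, "transform", transform_job_name))
```

Call print_job_statuses with the names of your own training and transform jobs while the DAG is running to follow their progress.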

At the end of the DAG run, the result of the batch transform job will be written to the S3 path s3://airflow-yourname-bucket/transform/.

The trained model artifact will also be saved to the S3 path s3://airflow-yourname-bucket/xgboost/output/.

You can download the result CSV file to look at the predicted values for the fare prices.
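As a quick way to inspect the downloaded file, the sketch below computes a few summary statistics. It assumes the result file contains only numeric predictions, one or more per line and separated by commas; the actual layout of your output may differ. The function name summarize_predictions and the sample values are made up for illustration.

```python
import csv
import io
import statistics


def summarize_predictions(lines):
    """Parse numeric predictions from CSV lines and return basic statistics."""
    values = [
        float(cell)
        for row in csv.reader(lines)
        for cell in row
        if cell.strip()
    ]
    return {
        "count": len(values),
        "min": min(values),
        "max": max(values),
        "mean": statistics.mean(values),
    }


# Made-up sample standing in for the downloaded file.
sample = io.StringIO("12.5\n7.0\n10.5\n")
print(summarize_predictions(sample))
```

To run it on the real file, open the downloaded CSV with open(path, newline="") and pass the file object in place of the sample.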


In this module, you have seen that building an ML workflow involves quite a bit of preparation, but it helps improve the rate of experimentation, engineering productivity, and maintenance of repetitive ML tasks.

Airflow Amazon SageMaker Operators provide a convenient way to build ML workflows and integrate with Amazon SageMaker. You can extend the workflows by customizing the Airflow DAGs with any tasks that better fit your ML workflows, such as feature engineering, creating an ensemble of training models, creating parallel training jobs, and retraining models to adapt to changes in the data distribution.