The last piece of code in the DAG defines the flow of dependencies between all the tasks added to the DAG. We can set up the dependencies as shown below:
s3_sensor >> glue_crawler >> glue_task >> cluster_creator >> step_adder >> step_checker >> cluster_remover >> copy_agg_to_redshift
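To see why the chain reads left to right, here is a minimal sketch of the `>>` mechanics. It uses a stand-in class rather than Airflow itself. In Airflow, `a >> b` sets `b` downstream of `a` and evaluates to `b`, so the next `>>` continues from there. The `Task` class, its `downstream` attribute, and the `execution_order` helper are hypothetical names used only for illustration.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator (illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that run after this one

    def __rshift__(self, other):
        # a >> b: register b as downstream of a, then return b so chaining continues
        self.downstream.append(other)
        return other


names = [
    "s3_sensor", "glue_crawler", "glue_task", "cluster_creator",
    "step_adder", "step_checker", "cluster_remover", "copy_agg_to_redshift",
]
tasks = {name: Task(name) for name in names}

# Same shape as the dependency line in the DAG
(tasks["s3_sensor"] >> tasks["glue_crawler"] >> tasks["glue_task"]
 >> tasks["cluster_creator"] >> tasks["step_adder"] >> tasks["step_checker"]
 >> tasks["cluster_remover"] >> tasks["copy_agg_to_redshift"])


def execution_order(first):
    """Walk the linear chain from the first task and collect task ids."""
    order, current = [], first
    while current is not None:
        order.append(current.task_id)
        current = current.downstream[0] if current.downstream else None
    return order


print(execution_order(tasks["s3_sensor"]))
```

Because every task has exactly one downstream task, the pipeline is strictly sequential: no task starts until the one before it has succeeded.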
That brings us to the end of the DAG definition. We can put all the steps together into a single DAG file, data_pipeline.py, and copy the script into the s3://airflow-yourname-bucket/dags folder.
After uploading the DAG code to S3, it can take up to a minute before the DAG shows up in the Airflow UI. You can access the Airflow UI by clicking on the link in the Airflow Console.
The DAG is disabled by default.
You will need to enable the DAG (by switching the On/Off toggle) for it to be picked up by the scheduler. Once the DAG is enabled, we can also run it manually.
To trigger the DAG, click on the play button shown below. On the next page you will be asked for the trigger configuration. Nothing needs to be changed here; simply click the Trigger button in the UI to get the DAG running.
The Airflow DAG will now wait for the files to arrive in S3 in the path s3://airflow-yourname-bucket/data/raw/green/.
To let the DAG run proceed, copy the data file into the specified path using the AWS CLI s3 command from the Cloud9 console.
Edit the S3 bucket name in the copy command below to match your workshop bucket.
aws s3 cp s3://nyc-tlc/trip\ data/green_tripdata_2020-06.csv s3://airflow-yourname-bucket/data/raw/green/
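If you would rather script the copy, the same operation can be sketched in Python with boto3. This assumes boto3 is installed and AWS credentials are configured in your environment. `split_s3_uri` and `copy_trip_file` are hypothetical helper names, and the destination bucket is the workshop placeholder that you must replace.

```python
def split_s3_uri(uri):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    if not uri.startswith("s3://"):
        raise ValueError(f"not an S3 URI: {uri}")
    bucket, _, key = uri[len("s3://"):].partition("/")
    return bucket, key


def copy_trip_file(source_uri, dest_prefix_uri):
    """Server-side copy of one object into the destination prefix."""
    import boto3  # imported lazily so split_s3_uri works without boto3

    src_bucket, src_key = split_s3_uri(source_uri)
    dest_bucket, dest_prefix = split_s3_uri(dest_prefix_uri)
    # Keep the original file name under the destination prefix
    dest_key = dest_prefix.rstrip("/") + "/" + src_key.rsplit("/", 1)[-1]
    boto3.client("s3").copy_object(
        Bucket=dest_bucket,
        Key=dest_key,
        CopySource={"Bucket": src_bucket, "Key": src_key},
    )
    return dest_key


# Example call (commented out because it needs AWS credentials):
# copy_trip_file(
#     "s3://nyc-tlc/trip data/green_tripdata_2020-06.csv",
#     "s3://airflow-yourname-bucket/data/raw/green/",
# )
```

Note that the Python version takes the object key with a plain space (`trip data/...`); the backslash in the shell command is only there to escape the space for the shell.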
Once the files are copied into the folder, the s3_sensor task will be marked as complete, and execution will pass to the next task in the DAG.
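Conceptually, a sensor task behaves like a polling loop: it checks a condition at a fixed interval and succeeds as soon as the condition holds, or fails once a timeout elapses. The sketch below illustrates that idea in plain Python without Airflow. The parameter names `poke_interval` and `timeout` are borrowed from Airflow's sensor options, but `wait_for`, `file_has_arrived`, and the simulated arrivals are hypothetical, and this is not how the workshop's sensor is configured.

```python
import time


def wait_for(check, poke_interval=1.0, timeout=10.0,
             sleep=time.sleep, clock=time.monotonic):
    """Call check() every poke_interval seconds until it returns True.

    Raises TimeoutError if the condition is still False after timeout seconds.
    """
    deadline = clock() + timeout
    while True:
        if check():
            return True
        if clock() >= deadline:
            raise TimeoutError("condition not met before timeout")
        sleep(poke_interval)


# Simulated bucket checks: the file "arrives" on the third check
arrivals = iter([False, False, True])
pokes = []


def file_has_arrived():
    result = next(arrivals)
    pokes.append(result)
    return result


wait_for(file_has_arrived, poke_interval=0.01, timeout=5.0)
print(f"condition met after {len(pokes)} checks")
```

In the real DAG, the check is whether an object exists under s3://airflow-yourname-bucket/data/raw/green/, which is why the run sits at s3_sensor until the copy finishes.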
If a task runs into an issue, it will be marked as Failed (red). You can click on the task and select View Log to identify the reason for the failure.
The entire DAG will complete in a few minutes. During the DAG run, you can visit the individual service consoles to view the execution details.
On successful completion, you will see all the tasks marked in green, as below:
You can now query the data copied into Redshift. You can also use Amazon Athena to query the intermediate transformation results and the raw data stored in S3.
To query the data with Athena, you will need to create a crawler pointing to the transformed and aggregated folders in S3, as was done previously for the raw folder. You can run it manually from the console or include it as a step in the Airflow DAG.
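As a rough sketch, such a crawler could also be created and started from Python with boto3, using the Glue `create_crawler` and `start_crawler` calls. Every value below is an assumption for illustration: the crawler name, IAM role, Glue database, and the two S3 folder paths are placeholders that must be replaced with the ones from your workshop setup. `build_crawler_request` and `create_and_start_crawler` are hypothetical helper names.

```python
def build_crawler_request(name, role, database, s3_paths):
    """Build the keyword arguments for the Glue create_crawler call."""
    return {
        "Name": name,
        "Role": role,
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": path} for path in s3_paths]},
    }


def create_and_start_crawler(request):
    """Create the crawler and start one run (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so build_crawler_request works without boto3

    glue = boto3.client("glue")
    glue.create_crawler(**request)
    glue.start_crawler(Name=request["Name"])


# All values below are placeholders; replace them with your workshop resources.
request = build_crawler_request(
    name="airflow-workshop-processed-crawler",  # hypothetical crawler name
    role="AWSGlueServiceRoleDefault",           # hypothetical IAM role
    database="default",                         # hypothetical Glue database
    s3_paths=[
        "s3://airflow-yourname-bucket/data/transformed/",  # assumed folder
        "s3://airflow-yourname-bucket/data/aggregated/",   # assumed folder
    ],
)
# create_and_start_crawler(request)  # uncomment to run against your account
```

Once the crawler run finishes, the tables it registers in the Glue Data Catalog become queryable from Athena.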
That’s it! We have orchestrated a data pipeline which gets triggered when the data files arrive in S3, registers the metadata, transforms the data into Parquet format with AWS Glue, performs aggregation with EMR Spark, and loads the results into Redshift!