This is an optional module. Include this task in your DAG if you have set up a new Redshift cluster as part of this workshop, or have access to an existing Redshift cluster.
We can now add a final step to the DAG to copy the aggregated data from S3 into a table defined in Redshift. As a prerequisite, the `redshift_default` connection needs to be set up in Airflow connections to point to the Redshift cluster.
To complete this setup:

1. Go to the Redshift console, click on the cluster created, and copy the Endpoint.
2. In the Airflow UI, edit the connection with `redshift_default` as the Conn Id.
3. For the Host value, enter only the Redshift endpoint name, without the port/schema suffix (i.e. `:5439/dev`).
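The endpoint copied from the console includes the port and database name; only the hostname portion goes into the Host field. A quick way to strip it, shown here with a hypothetical endpoint value:

```python
# Hypothetical endpoint value copied from the Redshift console (example only).
endpoint = "workshop-cluster.abc123xyz.us-east-1.redshift.amazonaws.com:5439/dev"

# Keep only the hostname for the Airflow connection's Host field,
# dropping the ":5439/dev" port/schema suffix.
host = endpoint.split(":")[0]
print(host)  # workshop-cluster.abc123xyz.us-east-1.redshift.amazonaws.com
```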
If you have previously created the Redshift table during setup, then skip this step.
```sql
-- Create nyc schema.
CREATE SCHEMA nyc;

-- Create agg_green_rides table.
CREATE TABLE IF NOT EXISTS nyc.green (
    pulocationid      bigint,
    trip_type         bigint,
    payment_type      bigint,
    total_fare_amount float
);
```
Once that’s completed, add the below block of code into the DAG script.
Replace the `iam_role_arn` with the Redshift IAM role attached to your cluster.
To get the Redshift IAM role ARN, navigate to the IAM console > Roles, search for `AmazonMWAA-workshop-redshift-role`, click on the role listed, and copy the Role ARN to replace in the snippet below.
```python
copy_agg_to_redshift = CopyS3ToRedshiftOperator(
    task_id='copy_to_redshift',
    schema='nyc',
    table='green',
    s3_bucket=S3_BUCKET_NAME,
    s3_key='data/aggregated',
    iam_role_arn='arn:aws:iam::1111111111111:role/AmazonMWAA-workshop-redshift-role',
    copy_options=["FORMAT AS PARQUET"],
    dag=dag,
)
```
The plugins deployed to the Airflow instance also include a custom operator that copies data from S3 to Redshift using the IAM role ARN.
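Under the hood, such an operator ultimately issues a Redshift `COPY` command. Below is a minimal sketch of how that statement might be assembled from the operator's parameters; the function name, exact SQL layout, and the bucket name are illustrative assumptions, not the plugin's actual code:

```python
def build_copy_statement(schema, table, s3_bucket, s3_key, iam_role_arn, copy_options):
    """Assemble a Redshift COPY command from operator-style parameters.

    NOTE: hypothetical helper for illustration; the workshop plugin's
    internals may differ.
    """
    return (
        f"COPY {schema}.{table} "
        f"FROM 's3://{s3_bucket}/{s3_key}' "
        f"IAM_ROLE '{iam_role_arn}' "
        + " ".join(copy_options)
    )

sql = build_copy_statement(
    schema="nyc",
    table="green",
    s3_bucket="my-workshop-bucket",  # placeholder bucket name
    s3_key="data/aggregated",
    iam_role_arn="arn:aws:iam::1111111111111:role/AmazonMWAA-workshop-redshift-role",
    copy_options=["FORMAT AS PARQUET"],
)
print(sql)
```

The operator would then execute this statement against the cluster through the `redshift_default` connection configured earlier.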
With this, we have completed adding all the individual steps in the data pipeline DAG.
Proceed to the next step to chain the tasks together and execute the data pipeline.