Redshift

This is an optional module. Include this task in your DAG if you have set up a new Redshift cluster as part of this workshop or have access to an existing Redshift cluster.

We can now add a final step to the DAG to copy the aggregated data from S3 into a table defined in Redshift. As a prerequisite, the redshift_default connection needs to be set up through Airflow connections so that it points to the Redshift cluster.

To complete this setup through the Airflow UI (a programmatic alternative is sketched after these steps):

  • Go to the Airflow Web UI (you can find the link in the MWAA console for your Airflow environment)
  • Click on Admin > Connections
  • Click on the + sign to add a new connection
  • Enter redshift_default as the Conn Id
  • Select Postgres from the Conn Type drop-down
  • Enter the Redshift endpoint of your cluster as the Host

    Go to the Redshift console » click on the cluster you created and copy the Endpoint.
    Use only the endpoint hostname for the Host value above, without the trailing port and database (i.e. drop :5439/dev).

  • Schema - dev
  • Login - admin
  • Password - Enter the password for the admin user (the workshop default is Password1)
  • Port - 5439
  • Click on Save

Airflow Connections
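
For MWAA the UI steps above are the usual route, but if you are working against a self-managed Airflow environment with access to the metadata database, the same connection can also be created programmatically. The following is a minimal sketch, assuming Airflow 2.x import paths; the host and password values are placeholders you would replace with your own.

from airflow import settings
from airflow.models import Connection

# Define redshift_default with the same values entered in the UI steps above.
# The host and password below are placeholders for your cluster endpoint and admin password.
conn = Connection(
    conn_id="redshift_default",
    conn_type="postgres",
    host="<your-redshift-endpoint>",  # endpoint hostname only, without :5439/dev
    schema="dev",
    login="admin",
    password="<your-admin-password>",
    port=5439,
)

session = settings.Session()
session.add(conn)
session.commit()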


Create the schema and Redshift table.

If you have previously created the Redshift table during setup, then skip this step.

  • Go to: Redshift Query Editor
  • Enter the appropriate database connection details (the default database name is dev unless you changed it)
  • Click Connect
  • Execute the queries below to create the schema and table for the aggregated data (a scripted alternative is sketched after the SQL).
-- Create the nyc schema.
CREATE SCHEMA nyc;
-- Create the green table for the aggregated ride data.
CREATE TABLE IF NOT EXISTS nyc.green (
  pulocationid      bigint,
  trip_type         bigint,
  payment_type      bigint,
  total_fare_amount float
);
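
Alternatively, once the redshift_default connection exists, the same DDL can be executed from a one-off script or Airflow task through the PostgresHook. This is a minimal sketch, assuming the hook import path matches your Airflow version.

# In newer Airflow versions this hook lives at airflow.providers.postgres.hooks.postgres.
from airflow.hooks.postgres_hook import PostgresHook

# Run the schema and table DDL against the cluster behind the redshift_default connection.
ddl = """
CREATE SCHEMA IF NOT EXISTS nyc;
CREATE TABLE IF NOT EXISTS nyc.green (
  pulocationid      bigint,
  trip_type         bigint,
  payment_type      bigint,
  total_fare_amount float
);
"""
PostgresHook(postgres_conn_id="redshift_default").run(ddl)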

Once that is done, add the block of code below to the DAG script.

Replace the iam_role_arn value with the ARN of the Redshift IAM role attached to your cluster.

To get the Redshift IAM role ARN, navigate to the IAM console » Roles, search for AmazonMWAA-workshop-redshift-role, click on the role listed, and copy the Role ARN to replace the placeholder in the snippet below.
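
If you prefer to look up the role ARN programmatically, a minimal boto3 sketch (assuming your credentials are allowed to call iam:GetRole and the role uses the default workshop name) could look like this:

import boto3

# Fetch the ARN of the workshop Redshift role so it can be pasted into the DAG.
# Assumes the role was created with the default workshop name.
iam = boto3.client("iam")
role = iam.get_role(RoleName="AmazonMWAA-workshop-redshift-role")
print(role["Role"]["Arn"])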

# Copy the aggregated Parquet output from S3 into the nyc.green table in Redshift.
copy_agg_to_redshift = CopyS3ToRedshiftOperator(
    task_id='copy_to_redshift',
    schema='nyc',
    table='green',
    s3_bucket=S3_BUCKET_NAME,
    s3_key='data/aggregated',
    iam_role_arn='arn:aws:iam::111111111111:role/AmazonMWAA-workshop-redshift-role',
    copy_options=["FORMAT AS PARQUET"],
    dag=dag,
)

The plugins deployed to the Airflow environment also include a custom operator, CopyS3ToRedshiftOperator, which copies data from S3 to Redshift using the IAM role ARN.
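
For reference, below is a minimal sketch of what such an operator might do internally: it issues a Redshift COPY statement through the redshift_default connection. This is an illustrative assumption, not the exact plugin code deployed with the workshop.

from airflow.models import BaseOperator
# In newer Airflow versions this hook lives at airflow.providers.postgres.hooks.postgres.
from airflow.hooks.postgres_hook import PostgresHook


class CopyS3ToRedshiftSketchOperator(BaseOperator):
    """Illustrative sketch: load Parquet files from S3 into a Redshift table via COPY."""

    def __init__(self, schema, table, s3_bucket, s3_key, iam_role_arn,
                 copy_options=None, redshift_conn_id="redshift_default", **kwargs):
        super().__init__(**kwargs)
        self.schema = schema
        self.table = table
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.iam_role_arn = iam_role_arn
        self.copy_options = copy_options or []
        self.redshift_conn_id = redshift_conn_id

    def execute(self, context):
        # Build a COPY statement that loads the S3 prefix using the cluster's IAM role.
        copy_sql = (
            f"COPY {self.schema}.{self.table} "
            f"FROM 's3://{self.s3_bucket}/{self.s3_key}' "
            f"IAM_ROLE '{self.iam_role_arn}' "
            + " ".join(self.copy_options)
        )
        # The redshift_default connection configured earlier provides the cluster credentials.
        PostgresHook(postgres_conn_id=self.redshift_conn_id).run(copy_sql)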

With this, we have completed adding all the individual steps in the data pipeline DAG.

Proceed to the next step to chain the tasks together and execute the data pipeline.