Data Preparation

Data preparation is done with a Pandas DataFrame. The following steps are performed in the data preparation task:

  • Identify missing data / null values.
  • Perform outlier detection.
  • Perform data transformation (date format).
  • Split the cleaned data set into train and test data sets.
  • Copy the prepared files to an Amazon S3 bucket for training the model.

Create a file named preprocess.py with the following code -

import pandas as pd
import numpy as np
import boto3
import os 

def preprocess(bucket_name):    # this function will be called from the main DAG file via the PythonOperator

    my_region = boto3.session.Session().region_name # set the region of the instance

    # set an output path where the trained model will be saved.
    prefix = 'xgboost'
    output_path ='s3://{}/{}/output'.format(bucket_name, prefix)

    # Download file from S3 bucket and load in dataframe (model_data)
    prefix_1 = 'raw'   # Enter the folder where you will upload your dataset file
    data_file = 'train_1.csv'    # Enter the name of your dataset file
    data_location = 's3://{}/{}/{}'.format(bucket_name,prefix_1,data_file)

    df = pd.read_csv(data_location)

    # Check for missing data / null values
    print(df.isnull().sum())

    # The 'pickup_datetime' column is read in as an object (string) -- we need to convert it to a datetime type to use it in ML algorithms.
    # Pandas can do that easily.
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'], format= '%Y-%m-%d %H:%M:%S UTC')


    # Some latitude and longitude values fall outside the valid ranges for NYC -- we will drop the rows that fall outside these ranges.
    # We can filter the data using the df.loc function in Pandas.
    df = df.loc[df['pickup_latitude'].between(40,42)]
    df = df.loc[df['pickup_longitude'].between(-75,-72)]
    df = df.loc[df['dropoff_latitude'].between(40,42)]
    df = df.loc[df['dropoff_longitude'].between(-75,-72)]


    # Now let's fix "fare_amount" and "passenger_count"
    df = df.loc[df['fare_amount'] > 2.5]   # US$ 2.50 is the minimum fare a taxi will charge, so we keep only rows where the fare is above $2.50.
    df = df.loc[df['passenger_count'] > 0]


    # There are outliers in passenger_count (e.g. 9 passengers), which looks incorrect.
    # Let's drop those outliers.
    df = df.loc[df['passenger_count'] <=6]

    # Let's create new columns 'year', 'month', 'day', etc. from the single column "pickup_datetime".
    df['year']=df.pickup_datetime.dt.year
    df['month']=df.pickup_datetime.dt.month
    df['day']=df.pickup_datetime.dt.day
    df['weekday']=df.pickup_datetime.dt.weekday
    df['hour']=df.pickup_datetime.dt.hour


    # Now let's calculate the trip distance.
    def haversine_np(lon1, lat1, lon2, lat2):

        """
        Calculate the great circle distance between two points
        on the earth (specified in decimal degrees)

        All args must be of equal length.    

        """
        lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])

        dlon = lon2 - lon1
        dlat = lat2 - lat1

        a = np.sin(dlat/2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2.0)**2

        c = 2 * np.arcsin(np.sqrt(a))
        km = 6367 * c
        return km


    # Now let's create one more column, 'distance'.
    df['distance'] = haversine_np(df['pickup_longitude'],df['pickup_latitude'],df['dropoff_longitude'],df['dropoff_latitude'])


    # Some rows have a distance of zero -- let's drop those rows.
    df = df.loc[df['distance'] > 0]

    # Before we pass our dataset to an algorithm to create a model, let's drop the features we don't need.
    # For example 'key' and 'pickup_datetime' -- because we have already extracted their information into other columns.

    del df['pickup_datetime']
    if 'key' in df.columns:     # drop the raw 'key' identifier column if it is present in your file
        del df['key']


    # Train / validation / test split: shuffle the rows and split 60% / 20% / 20%.
    train_data, validation_data, test_data = np.split(df.sample(frac=1, random_state=1729), [int(.6*len(df)), int(.8*len(df))])
    print(train_data.shape, validation_data.shape, test_data.shape)


    # Let's create a CSV file from 'train_data' and upload it to the S3 bucket under the 'xgboost' prefix.
    train_data.to_csv('train.csv', index=False, header=False)
    boto3.Session().resource('s3').Bucket(bucket_name).Object(os.path.join(prefix, 'train/train.csv')).upload_file('train.csv')

    # Let's perform the same steps for the validation data.
    validation_data.to_csv('validate.csv', index=False, header=False)
    boto3.Session().resource('s3').Bucket(bucket_name).Object(os.path.join(prefix, 'validate/validate.csv')).upload_file('validate.csv')


    # Drop the target column 'fare_amount' from the test data -- this is the value the model will predict.
    del test_data['fare_amount']

    test_data.to_csv('test.csv', index=False, header=False)
    boto3.Session().resource('s3').Bucket(bucket_name).Object(os.path.join(prefix, 'test/test.csv')).upload_file('test.csv')


The preprocess step will write CSV files for training, validation and test into the S3 path s3://airflow-yourname-bucket/xgboost/.
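
If you want to confirm that the files landed where expected, a quick listing with boto3 works. This is a minimal sketch, assuming the bucket name shown above -- replace it with your own bucket name:

import boto3

s3 = boto3.client('s3')

# List every object the preprocess step wrote under the 'xgboost/' prefix
response = s3.list_objects_v2(Bucket='airflow-yourname-bucket', Prefix='xgboost/')
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])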

We will deploy preprocess.py along with the DAGs to Airflow.
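
For reference, wiring the preprocess function into a DAG with the PythonOperator could look roughly like the sketch below. The DAG id, schedule, and bucket name are illustrative assumptions; the actual DAG file is defined separately.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from preprocess import preprocess   # the function defined above

with DAG(
    dag_id='ml_pipeline',                 # placeholder DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,               # run on manual trigger only
    catchup=False,
) as dag:

    preprocess_task = PythonOperator(
        task_id='preprocess',
        python_callable=preprocess,
        op_kwargs={'bucket_name': 'airflow-yourname-bucket'},   # replace with your bucket name
    )

Here op_kwargs passes the bucket name into preprocess(bucket_name) at run time, so the same function can be reused against different buckets.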