This repository contains the steps and code to perform ETL (Extract, Transform, Load) on NYC yellow taxi data from 2017. The process involves setting up the necessary AWS infrastructure, importing the data into a relational database, and then processing the data for analysis.
- An AWS subscription with access to RDS and EMR
- Python 3
The datasets for this project (NYC yellow taxi trip records, January–June 2017) can be downloaded from the S3 links shown in the download step below.
- Create an AWS EMR instance (`m4.xlarge`) with 20 GB storage and the following services:
  - Hadoop
  - HBase
  - Sqoop
- Download all the datasets to `/home/hadoop` on the EMR master node:

```bash
wget https://nyc-tlc-upgrad.s3.amazonaws.com/yellow_tripdata_2017-01.csv
wget https://nyc-tlc-upgrad.s3.amazonaws.com/yellow_tripdata_2017-02.csv
wget https://nyc-tlc-upgrad.s3.amazonaws.com/yellow_tripdata_2017-03.csv
wget https://nyc-tlc-upgrad.s3.amazonaws.com/yellow_tripdata_2017-04.csv
wget https://nyc-tlc-upgrad.s3.amazonaws.com/yellow_tripdata_2017-05.csv
wget https://nyc-tlc-upgrad.s3.amazonaws.com/yellow_tripdata_2017-06.csv
```
- Rename the files for convenience:

```bash
mv yellow_tripdata_2017-01.csv jan.csv
mv yellow_tripdata_2017-02.csv feb.csv
mv yellow_tripdata_2017-03.csv mar.csv
mv yellow_tripdata_2017-04.csv apr.csv
mv yellow_tripdata_2017-05.csv may.csv
mv yellow_tripdata_2017-06.csv jun.csv
```
- Create an RDS instance with a MySQL database.
- Set up connectivity between the RDS instance and the EMR master node (EC2 instance), e.g., by allowing inbound MySQL traffic (port 3306) from the EMR master's security group in the RDS security group.
- Connect to RDS from your EMR master node:

```bash
mysql -h your_rds_endpoint -P 3306 -u admin -p
```
- Create a new database named `nyctaxi`:

```sql
CREATE DATABASE nyctaxi;
USE nyctaxi;
```
- Create the table schema for the taxi datasets:

```sql
CREATE TABLE taxi_2017 (
    VendorID INT,
    tpep_pickup_datetime DATETIME,
    tpep_dropoff_datetime DATETIME,
    passenger_count INT,
    trip_distance FLOAT,
    RatecodeID INT,
    store_and_fwd_flag VARCHAR(1),
    PULocationID INT,
    DOLocationID INT,
    payment_type INT,
    fare_amount FLOAT,
    extra FLOAT,
    mta_tax FLOAT,
    tip_amount FLOAT,
    tolls_amount FLOAT,
    improvement_surcharge FLOAT,
    total_amount FLOAT,
    congestion_surcharge FLOAT,
    airport_fee FLOAT
);

DESC taxi_2017;
```
- Load the data into the RDS table from the downloaded CSV files. Note that `LOAD DATA LOCAL INFILE` requires the `local_infile` option to be enabled; if the statement is rejected, reconnect with `mysql --local-infile=1`.

```sql
LOAD DATA LOCAL INFILE '/home/hadoop/jan.csv' INTO TABLE taxi_2017
FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' IGNORE 1 LINES;

LOAD DATA LOCAL INFILE '/home/hadoop/feb.csv' INTO TABLE taxi_2017
FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' IGNORE 1 LINES;
```
- Add an auto-incrementing primary key to the table, to be used later as the row key in the HBase table:

```sql
ALTER TABLE taxi_2017
ADD COLUMN id INT AUTO_INCREMENT PRIMARY KEY;
```
- Verify that the data is loaded properly:

```sql
SELECT * FROM taxi_2017 LIMIT 5;
```
- Set up Sqoop to connect to RDS by installing the MySQL JDBC connector:

```bash
sudo -i
wget https://de-mysql-connector.s3.amazonaws.com/mysql-connector-java-8.0.25.tar.gz
tar -xvf mysql-connector-java-8.0.25.tar.gz
cd mysql-connector-java-8.0.25/
cp mysql-connector-java-8.0.25.jar /usr/lib/sqoop/lib/
```
- In the HBase shell, create the HBase table with two column families:

```
create 'taxidata', {NAME => 'trip_info'}, {NAME => 'fare_info'}
```
- Ingest data from RDS into HBase using Sqoop:

```bash
# Import trip_info columns
sqoop import \
--connect jdbc:mysql://your_rds_endpoint:3306/nyctaxi \
--username your_username \
--password your_password \
--table taxi_2017 \
--columns "id,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type" \
--hbase-table taxidata --column-family trip_info \
--hbase-row-key id \
--fields-terminated-by '|' \
-m 8

# Import fare_info columns
sqoop import \
--connect jdbc:mysql://your_rds_endpoint:3306/nyctaxi \
--username your_username \
--password your_password \
--table taxi_2017 \
--columns "id,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee" \
--hbase-table taxidata --column-family fare_info \
--hbase-row-key id \
--fields-terminated-by '|' \
-m 8
```
- Install Happybase on the EMR master node:

```bash
sudo -i
yum update
yum install gcc
yum install python3-devel
pip install happybase
```

  Check the running Java processes with `jps` and ensure the HBase Thrift server is running; if it is not, start it with `hbase thrift start`. A quick connection check follows.
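Before bulk loading, a quick smoke test of the Thrift connection (assuming the default Thrift port 9090 on the master node) can save debugging time later:

```python
import happybase

# Connect through the HBase Thrift server (default port 9090) and list tables.
connection = happybase.Connection("localhost", port=9090)
print(connection.tables())  # should include b'taxidata'
connection.close()
```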
- Bulk-import data from specific CSV files (e.g., `mar.csv` and `apr.csv`) on your EMR cluster into the HBase table using `batch_insert.py`; a sketch of the idea follows.
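`batch_insert.py` ships with the repository; the following is only a minimal sketch of the same idea with Happybase. The starting row keys, batch size, and file paths here are assumptions, chosen so the generated keys do not collide with the ids Sqoop already wrote for January and February.

```python
# Minimal sketch of batch_insert.py -- not the repository's exact script.
# Assumes the Thrift server on localhost:9090, the 'taxidata' table created
# above, and the CSV column order of the taxi_2017 schema.
import csv
import happybase

CSV_COLUMNS = [
    "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "passenger_count", "trip_distance", "RatecodeID", "store_and_fwd_flag",
    "PULocationID", "DOLocationID", "payment_type", "fare_amount", "extra",
    "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge",
    "total_amount", "congestion_surcharge", "airport_fee",
]
FARE_COLUMNS = {
    "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount",
    "improvement_surcharge", "total_amount", "congestion_surcharge",
    "airport_fee",
}

def load_csv(path, row_key_start):
    connection = happybase.Connection("localhost", port=9090)
    table = connection.table("taxidata")
    row_key = row_key_start
    # Buffer puts and flush every 1000 rows instead of one RPC per row.
    with table.batch(batch_size=1000) as batch, open(path) as f:
        reader = csv.reader(f)
        next(reader)  # skip the header row
        for values in reader:
            data = {}
            for name, value in zip(CSV_COLUMNS, values):
                family = "fare_info" if name in FARE_COLUMNS else "trip_info"
                data[f"{family}:{name}".encode()] = value.encode()
            batch.put(str(row_key).encode(), data)
            row_key += 1
    connection.close()

if __name__ == "__main__":
    # Hypothetical offsets: must start above the highest id Sqoop exported.
    load_csv("/home/hadoop/mar.csv", row_key_start=20000000)
    load_csv("/home/hadoop/apr.csv", row_key_start=40000000)
```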
- Install MRJob on the EMR master node:

```bash
sudo -i
pip install mrjob
```
- Use the locally downloaded files as input and run the MapReduce tasks (e.g., `mrtask_a.py`) for data processing; an illustrative job is sketched below. Command to run a task:

```bash
python mrtask_a.py -r hadoop path_to_input_data_folder > path_to_output_folder/out_a.txt
```
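The `mrtask_*.py` scripts live in the repository; as an illustration of the pattern they follow (not the actual task logic), a minimal mrjob job counting trips per vendor, assuming `VendorID` is the first CSV column, could look like this:

```python
# mrtask_example.py -- illustrative sketch only; the real mrtask_*.py files
# implement the project's specific questions.
from mrjob.job import MRJob

class MRTripsPerVendor(MRJob):
    def mapper(self, _, line):
        fields = line.split(",")
        # Skip header lines and malformed rows.
        if fields and fields[0].isdigit():
            yield fields[0], 1

    def reducer(self, vendor_id, counts):
        yield vendor_id, sum(counts)

if __name__ == "__main__":
    MRTripsPerVendor.run()
```

It runs the same way as the repository's tasks: `python mrtask_example.py -r hadoop path_to_input_data_folder > out_example.txt`.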
- Export the results of the MapReduce tasks from HDFS to RDS for the tasks that need it (e.g., tasks c, d, e, f), for example with `sqoop export`; a sketch follows.
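The export commands are not shown in the original steps; as a sketch, assuming a results table named `mrtask_c_out` already exists in the `nyctaxi` database and that the job wrote tab-separated records to `/user/hadoop/output_c` (both the table name and the HDFS path are hypothetical placeholders), a Sqoop export would look like:

```bash
# Hypothetical example: table name and HDFS path are placeholders.
sqoop export \
--connect jdbc:mysql://your_rds_endpoint:3306/nyctaxi \
--username your_username \
--password your_password \
--table mrtask_c_out \
--export-dir /user/hadoop/output_c \
--input-fields-terminated-by '\t' \
-m 1
```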
- Run the `transform.py` file to transform the output of `mrtask_f`; a hypothetical sketch follows.
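`transform.py` itself is part of the repository and specific to `mrtask_f`'s output; purely as a hypothetical illustration of the shape such a script takes, converting mrjob's tab-separated, JSON-encoded output lines into plain CSV might look like this:

```python
# Hypothetical sketch only -- the repository's transform.py is tailored to
# mrtask_f's actual output format. This converts mrjob's default output
# ("key<TAB>value", both JSON-encoded) into plain CSV rows.
import csv
import json
import sys

def main(in_path, out_path):
    with open(in_path) as src, open(out_path, "w", newline="") as dst:
        writer = csv.writer(dst)
        for line in src:
            key, value = line.rstrip("\n").split("\t", 1)
            writer.writerow([json.loads(key), json.loads(value)])

if __name__ == "__main__":
    main(sys.argv[1], sys.argv[2])
```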
- Create the necessary tables in RDS for the outputs of the MapReduce tasks by running the `results.sql` file from the MySQL client:

```sql
source /home/hadoop/results.sql
```
- Create reports in Power BI by connecting to the results database in RDS.
This ETL project demonstrates the end-to-end process of extracting, transforming, and loading NYC taxi data for analysis: infrastructure setup, data ingestion into RDS, transfer into HBase using Sqoop, MapReduce processing, and export of the results back to RDS.

Please ensure you have the necessary permissions and access rights for the AWS services and databases mentioned in this project, and adjust the commands and settings to your specific environment and requirements.