We will use the following tools to do ETL:
- <span class="emphasis">PostgreSQL</span>: postgres is an open-source relational database management system that is widely used for storing and managing data. We will use postgres to store the data that we extract, transform, and load.
- <span class="emphasis">dbt</span>: dbt is a command-line tool that enables data analysts and engineers to transform data in their warehouse more effectively. dbt allows you to write SQL queries to transform your data and then run those queries in a reproducible and scalable way.
- <span class="emphasis">Airflow</span>: Apache Airflow is an open-source platform that allows you to programmatically author, schedule, and monitor workflows. We will use Apache Airflow to orchestrate our ETL process and ensure that it runs smoothly.
- <span class="emphasis">basic cloud services knowledge</span>: we will use basic cloud services such as AWS S3, Google Cloud Storage, or Azure Blob Storage or DigitalOcean Spaces to store the data that we extract, transform, and load.


## Step 0: upload data to the cloud
Again, <span class="exploration">most of the time you do not need to know how to upload data to the cloud by hand</span>; tools like `s3cmd` can do it for you. Here are a few guides (a small Python sketch follows these links):

- [s3cmd with DigitalOcean Spaces](https://docs.digitalocean.com/products/spaces/reference/s3cmd-usage/){:target="_blank"}
- [s3cmd with AWS S3](https://simplebackups.com/blog/mastering-s3-sync-s3cmd-rclone-ultimate-guide/){:target="_blank"}
- [s3cmd with Google Cloud Storage](https://addshore.com/2022/09/google-cloud-storage-upload-with-s3cmd/){:target="_blank"}
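
If you prefer scripting the upload instead of calling `s3cmd` by hand, the sketch below does the same thing in Python with `boto3` against an S3-compatible endpoint. Note that `boto3`, the bucket name, the endpoint URL, and the environment variable names are placeholders, not part of this post's setup.

```python
# Minimal sketch: upload a local file to an S3-compatible bucket with boto3.
# boto3 is an assumption of this sketch; the post itself only points to s3cmd.
# Bucket name, endpoint, region, and env var names below are placeholders.
import os

import boto3

session = boto3.session.Session()
s3 = session.client(
    "s3",
    region_name="fra1",                                  # placeholder region
    endpoint_url="https://fra1.digitaloceanspaces.com",  # or omit for AWS S3
    aws_access_key_id=os.getenv("SPACES_ACCESS_KEY"),    # hypothetical env vars
    aws_secret_access_key=os.getenv("SPACES_SECRET_KEY"),
)

# upload data/prices-data-2022.csv to s3://my-data-bucket/raw/prices-data-2022.csv
s3.upload_file(
    "data/prices-data-2022.csv",
    "my-data-bucket",            # placeholder bucket name
    "raw/prices-data-2022.csv",
)
```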

Sometimes, you might have a small data file such as a `csv` or `json`. In this case, you can use `psycopg2` to upload the data directly to the Postgres database in the cloud. Here is my project structure:

```plaintext
(venv) ➜ dbt-postgresql tree -L 1
.
β”œβ”€β”€ Holidu # dbt folder
β”œβ”€β”€ data
β”‚   β”œβ”€β”€ prices-data-2022.csv
β”œβ”€β”€ logs
β”œβ”€β”€ poetry.lock
β”œβ”€β”€ pyproject.toml
β”œβ”€β”€ src
β”‚   β”œβ”€β”€ upload_data.py
└── venv
```

We will use `upload_data.py` as a small example to show you how to upload data to the Postgres database in the cloud. For most datasets, the SQL script we run via `psycopg2` can do the data type inference for you. However, `prices-data-2022.csv` (UK house price data) does not have column names in the first row, so we need to specify the column names and data types in the SQL script ourselves. Here is the content of `upload_data.py`:

```python
import os
import timeit
import psycopg2
import pandas as pd
from loguru import logger
from dotenv import load_dotenv


load_dotenv()


db_host = os.getenv("POSTGRESQL_DB_HOST")
db_user = os.getenv("POSTGRESQL_DB_USER")
db_password = os.getenv("POSTGRESQL_DB_PASSWORD")
database_name = "Holidu"  # Database name


def upload_data():
    # read the data for a quick look; the CSV has no header row
    df = pd.read_csv("../data/prices-data-2022.csv", header=None)
    print(df.head())

    # create connection
    pg_conn = psycopg2.connect(
        dbname=database_name, user=db_user, password=db_password, host=db_host
    )
    cur = pg_conn.cursor()

    # the CSV has no header row, so we define every column explicitly;
    # all columns are loaded as text and can be cast downstream
    sql = """
    CREATE TABLE IF NOT EXISTS public.uk_house_price (
        transaction_id text PRIMARY KEY,
        price text,
        date_of_transfer text,
        postcode text,
        property_type text,
        old_or_new text,
        duration text,
        paon text,
        saon text,
        street text,
        locality text,
        town_or_city text,
        district text,
        county text,
        ppd_category_type text,
        record_status text
    )
    """

    cur.execute(sql)
    pg_conn.commit()

    st_time = timeit.default_timer()
    # upload the data with copy_expert;
    # there is no header in the csv file, so no HEADER option is needed
    with open("../data/prices-data-2022.csv", "r") as f:
        cur.copy_expert("COPY uk_house_price FROM STDIN WITH CSV", f)

    pg_conn.commit()
    cur.close()
    ed_time = timeit.default_timer()

    logger.info(f"Time taken to upload data: {ed_time - st_time:.2f} seconds")

    pg_conn.close()

    logger.info("Data uploaded successfully")


if __name__ == "__main__":
    upload_data()
```
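
Once the copy finishes, a quick sanity check is worth doing. The sketch below is not part of `upload_data.py`; it reuses the same environment variables to count the loaded rows and cast the text `price` column to numeric for a quick look.

```python
# A small sanity-check sketch (not part of the original script): count the rows
# in uk_house_price and preview a few of them with price cast to numeric.
# Reuses the same POSTGRESQL_* environment variables as upload_data.py.
import os

import psycopg2
from dotenv import load_dotenv

load_dotenv()

pg_conn = psycopg2.connect(
    dbname="Holidu",
    user=os.getenv("POSTGRESQL_DB_USER"),
    password=os.getenv("POSTGRESQL_DB_PASSWORD"),
    host=os.getenv("POSTGRESQL_DB_HOST"),
)

with pg_conn.cursor() as cur:
    cur.execute("SELECT count(*) FROM public.uk_house_price")
    print("rows loaded:", cur.fetchone()[0])

    # everything was loaded as text, so cast on the way out
    cur.execute(
        """
        SELECT transaction_id, price::numeric, date_of_transfer
        FROM public.uk_house_price
        LIMIT 5
        """
    )
    for row in cur.fetchall():
        print(row)

pg_conn.close()
```

In a dbt-centric workflow these casts would normally live in a staging model rather than an ad-hoc query, but a simple count is enough to confirm the copy worked.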
