Adding dbt transformation layer
pevolution-ahmed committed Sep 15, 2022
1 parent 91fe219 commit 3298018
Showing 21 changed files with 277 additions and 20 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
.env
stock-dataops
required_env_vars.txt
.vscode/
15 changes: 15 additions & 0 deletions Makefile
@@ -0,0 +1,15 @@
install:
pip install --upgrade pip &&\
pip install -r requirement.txt

start:
airflow webserver & airflow scheduler

format:
	black *.py

lint:
pylint --disable=R,C ./dags/*.py

# test:
#	python -m pytest -vv --cov=
14 changes: 10 additions & 4 deletions README.md
@@ -1,7 +1,13 @@
# Stock Market Analytics DataOps Workflow

## Description:
A DataOps pipeline that automates the ETL process for stock market data and then builds a BI product on top of it, whether a dashboard or a predictive forecasting model.

## TO-DO:
- Save stocks ticker data from yahoo finance to Google BigQuery
- Create a Great Expectations Suite and Checkpoints using the Great Expectations package to validate and test the loaded data (test suite)
- Add the following tasks:
    - a task to format the code using black
    - a task to check linting using pylint
    - a task to run unit tests using pytest and pytest-cov
- Set up a dbt-core project as a transformation layer on top of the source data

Binary file modified dags/__pycache__/etl_dag.cpython-310.pyc
Binary file not shown.
38 changes: 23 additions & 15 deletions dags/etl_dag.py
Expand Up @@ -10,34 +10,42 @@
GreatExpectationsOperator,
)
from packages.stock_data_access.stock_data import _save_stocks_data_to_gbq
from pathlib import Path
import os

DATASET_NAME = 'stocks_storage'
sa_path = os.environ.get('BQ_SERVICE_ACCOUNT_JSON')
BASE_DIR = Path('.').parent.parent.parent.absolute()
GE_ROOT_DIR = Path(BASE_DIR, "great_expectations")
DBT_ROOT_DIR = Path(BASE_DIR, "dbt_transformations")

with DAG(
"stocks-etl",
start_date=datetime(2022,8,1),
schedule_interval="@once",
catchup=False
) as dag:
# TODO: check whether the dataset exists using the get_dataset operator and a branch operator:
#   if the dataset does not exist, create it with an operator
#   if it does exist, use another branch operator to check whether the table exists:
#     if the table does not exist, create it, then continue the load
#     if it does exist, proceed straight to the load

upload_stocks_data_to_gbq = PythonOperator(
task_id = "save_stocks_data_to_gbq",
python_callable=_save_stocks_data_to_gbq,
op_args=['AMZN',2022,1,1,2022,2,1,sa_path]
)
validate_stocks_data = GreatExpectationsOperator(
task_id="validate_stocks_data",
checkpoint_name="stocks_expectations",
data_context_root_dir=GE_ROOT_DIR,
fail_task_on_validation_failure=True,
return_json_dict=True
)
# validate_source_stocks_data = GreatExpectationsOperator(
# task_id="validate_source_stocks_data",
# checkpoint_name="stocks_expectations",
# data_context_root_dir=GE_ROOT_DIR,
# fail_task_on_validation_failure=True,
# return_json_dict=True
# )
# run_dbt_dag = BashOperator(
# task_id="run_dbt_dag",
# bash_command=f'cd {DBT_ROOT_DIR} && dbt run'
# )
# test_dbt_dag = BashOperator(
# task_id="test_dbt_dag",
# bash_command=f'cd {DBT_ROOT_DIR} && dbt test'
# )
# Main Stream
upload_stocks_data_to_gbq >> validate_stocks_data
# upload_stocks_data_to_gbq
# >> validate_source_stocks_data >> run_dbt_dag >> test_dbt_dag
4 changes: 4 additions & 0 deletions dbt_transformations/.gitignore
@@ -0,0 +1,4 @@

target/
dbt_packages/
logs/
15 changes: 15 additions & 0 deletions dbt_transformations/README.md
@@ -0,0 +1,15 @@
Welcome to your new dbt project!

### Using the starter project

Try running the following commands:
- dbt run
- dbt test


### Resources:
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
- Find [dbt events](https://events.getdbt.com) near you
- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices
Empty file.
38 changes: 38 additions & 0 deletions dbt_transformations/dbt_project.yml
@@ -0,0 +1,38 @@

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'dbt_transformations'
version: '1.0.0'
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: 'dbt_transformations'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_packages"


# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/ directory
# as tables. These settings can be overridden in the individual model files
# using the `{{ config(...) }}` macro.
models:
dbt_transformations:
# Config indicated by + and applies to all files under models/example/
stocks_metrics:
+materialized: view
Empty file.
10 changes: 10 additions & 0 deletions dbt_transformations/models/sources.yml
@@ -0,0 +1,10 @@
version: 2

sources:
- name: stocks_storage
freshness:
warn_after: {count: 13, period: hour}
error_after: {count: 24, period: hour}
loaded_at_field: Date
tables:
- name: stocks_data
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{{ config(
materialized='view'
)
}}

with avg_std_of_stocks as (
    select
        avg(Adj_Close) as mean,
        stddev(Adj_Close) as std,
        date(Date) as date
    from
        {{ source('stocks_storage', 'stocks_data') }}
    group by
        date(Date)
)

select
    std / mean as cov,
    date
from
    avg_std_of_stocks
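For reference, the coefficient of variation is conventionally the standard deviation divided by the mean, so the per-day metric computed by this model can be sanity-checked in plain Python. A minimal sketch with made-up `Adj_Close` samples for a single day (the real model reads them from BigQuery):

```python
import statistics

def coefficient_of_variation(prices):
    """Coefficient of variation = stddev / mean of a day's adjusted closes."""
    mean = statistics.mean(prices)
    std = statistics.stdev(prices)
    return std / mean

# Hypothetical intraday Adj_Close samples for one date
adj_close = [112.0, 110.5, 114.2, 111.8]
print(round(coefficient_of_variation(adj_close), 4))
```

A higher value means more price dispersion per unit of average price, which is why the schema describes it as an investment-risk metric.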
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
version: 2

models:
- name: coefficient_of_variation
description: >
The metric that measures the investment risk of a particular stock
columns:
- name: date
description: ""

- name: country
description: We add a case when country is 'CS' for the tests to pass. CS is the country code for Czechoslovakia, which is split in Czechia (CZ) and Slovakia. We choose to associate CS data to Czechia rather than Slovakia since it contains more (recent) data.
tests:
- not_null
- dbt_utils.relationships_where:
to: ref('seed_country_codes')
field: two_letter_iso_code

- name: sales
description: ""

- name: refunds
description: ""

tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- date
- country
18 changes: 18 additions & 0 deletions dbt_transformations/models/stocks_metrics/return_on_investment.sql
@@ -0,0 +1,18 @@
{{ config(
materialized='view'
)
}}

with start_and_last_stock_values as (
    select
        first_value(Adj_Close) over (order by Date) as start_val,
        last_value(Adj_Close) over (
            order by Date
            rows between unbounded preceding and unbounded following
        ) as last_val,
        date(Date) as date
    from
        {{ source('stocks_storage', 'stocks_data') }}
)
select
    ((last_val - start_val) / start_val) as return_on_investment,
    date
from
    start_and_last_stock_values
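The same return-on-investment calculation can be sketched in plain Python over an ordered price series (illustrative numbers, not real AMZN data):

```python
def return_on_investment(prices):
    """ROI over a period: (last price - first price) / first price."""
    start_val, last_val = prices[0], prices[-1]
    return (last_val - start_val) / start_val

# Hypothetical Adj_Close series, oldest first
adj_close = [100.0, 104.0, 98.5, 110.0]
print(return_on_investment(adj_close))  # 0.1, i.e. a 10% return
```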
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{{ config(
materialized='view'
)
}}

with prices as (
select
Date,
Adj_Close as current_price,
lag(Adj_Close, 1) over ( order by Date) as previous_price
from
{{ source('stocks_storage', 'stocks_data') }}
)
,daily_returns as (
    select
        Date,
        (current_price / cast(previous_price as float64)) - 1 as daily_return
    from
        prices
)
select
    date(Date) as date,
    avg(daily_return) as total_return_over_time
from
    daily_returns
where
    date(Date) >= start_date
    and date(Date) <= current_date()
group by
    date(Date)
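The lag-based daily return can be checked in plain Python. A minimal sketch with invented prices, where `None` stands in for the SQL NULL that `lag` produces on the first row:

```python
def daily_returns(prices):
    """Mirror lag(price, 1): return_t = price_t / price_{t-1} - 1.

    The first row has no previous price, so its return is None.
    """
    returns = [None]
    for prev, cur in zip(prices, prices[1:]):
        returns.append(cur / prev - 1)
    return returns

prices = [100.0, 102.0, 96.9]
print([None if r is None else round(r, 4) for r in daily_returns(prices)])
# rounded: [None, 0.02, -0.05]
```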
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@

version: 2

models:
- name: return_on_investment

columns:
- name: date
description: ""

- name: country
description: We add a case when country is 'CS' for the tests to pass. CS is the country code for Czechoslovakia, which is split in Czechia (CZ) and Slovakia. We choose to associate CS data to Czechia rather than Slovakia since it contains more (recent) data.
tests:
- not_null
- dbt_utils.relationships_where:
to: ref('seed_country_codes')
field: two_letter_iso_code

- name: sales
description: ""

- name: refunds
description: ""

tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- date
- country
Empty file.
Empty file.
Empty file.
36 changes: 36 additions & 0 deletions logs/dbt.log
@@ -0,0 +1,36 @@


============================== 2022-09-08 15:18:10.778242 | 71d3d40b-e12c-4115-b52f-5c06b7cfcd39 ==============================
15:18:10.778259 [info ] [MainThread]: Running with dbt=1.2.1
15:18:10.779122 [debug] [MainThread]: running dbt with arguments {'write_json': True, 'use_colors': True, 'printer_width': 80, 'version_check': True, 'partial_parse': True, 'static_parser': True, 'profiles_dir': '/home/ahmed/.dbt', 'send_anonymous_usage_stats': True, 'event_buffer_size': 100000, 'quiet': False, 'no_print': False, 'project_name': 'dbt_transformations', 'skip_profile_setup': False, 'which': 'init', 'indirect_selection': 'eager'}
15:18:10.780087 [debug] [MainThread]: Tracking: tracking
15:18:10.782557 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'start', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f0f911096c0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f0f91109ab0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f0f91109b70>]}
15:18:10.783511 [info ] [MainThread]: Creating dbt configuration folder at /home/ahmed/.dbt
15:18:10.830750 [debug] [MainThread]: Flushing usage events


============================== 2022-09-08 15:19:53.009374 | 4b583e44-b492-48e6-be23-805f68899600 ==============================
15:19:53.009389 [info ] [MainThread]: Running with dbt=1.2.1
15:19:53.010062 [debug] [MainThread]: running dbt with arguments {'write_json': True, 'use_colors': True, 'printer_width': 80, 'version_check': True, 'partial_parse': True, 'static_parser': True, 'profiles_dir': '/home/ahmed/.dbt', 'send_anonymous_usage_stats': True, 'event_buffer_size': 100000, 'quiet': False, 'no_print': False, 'project_name': 'dbt_transformations', 'skip_profile_setup': False, 'which': 'init', 'indirect_selection': 'eager'}
15:19:53.010543 [debug] [MainThread]: Tracking: tracking
15:19:53.012843 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'start', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f0acb609840>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f0acb6099c0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f0acb7af580>]}
15:19:53.016007 [debug] [MainThread]: Starter project path: /media/ahmed/New Volume/pevolution-ahmed/stock-market-analytics-dataops/stock-dataops/lib/python3.10/site-packages/dbt/include/starter_project
15:22:24.316499 [info ] [MainThread]: Profile dbt_transformations written to /home/ahmed/.dbt/profiles.yml using target's profile_template.yml and your supplied values. Run 'dbt debug' to validate the connection.
15:22:24.317706 [info ] [MainThread]:
Your new dbt project "dbt_transformations" was created!

For more information on how to configure the profiles.yml file,
please consult the dbt documentation here:

https://docs.getdbt.com/docs/configure-your-profile

One more thing:

Need help? Don't hesitate to reach out to us via GitHub issues or on Slack:

https://community.getdbt.com/

Happy modeling!

15:22:24.319253 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'end', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f0acb6099c0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f0acb7af4f0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f0acb7af940>]}
15:22:24.320294 [debug] [MainThread]: Flushing usage events
