Skip to content

Commit

Permalink
Merge pull request #135 from andrewm4894/add-clickhouse
Browse files Browse the repository at this point in the history
Add clickhouse
  • Loading branch information
andrewm4894 authored Feb 15, 2025
2 parents 0b2c173 + de93bc4 commit 319c98e
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 1 deletion.
7 changes: 7 additions & 0 deletions .example.env
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,10 @@ ANOMSTACK_MOTHERDUCK_TOKEN=
# turso related env vars
ANOMSTACK_TURSO_DATABASE_URL=
ANOMSTACK_TURSO_AUTH_TOKEN=

# clickhouse related env vars
ANOMSTACK_CLICKHOUSE_HOST=localhost
ANOMSTACK_CLICKHOUSE_PORT=8123
ANOMSTACK_CLICKHOUSE_USER=anomstack
ANOMSTACK_CLICKHOUSE_PASSWORD=anomstack
ANOMSTACK_CLICKHOUSE_DATABASE=default
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ Supported sources and databases for your metrics to live in and be queried from:
<th align="center"><a href="./anomstack/fn/run.py" target="_blank">Python</a></th>
<th align="center"><a href="./anomstack/external/gcp/bigquery.py" target="_blank">BigQuery</a></th>
<th align="center"><a href="./anomstack/external/snowflake/snowflake.py" target="_blank">Snowflake</a></th>
<th align="center"><a href="./anomstack/external/clickhouse/clickhouse.py" target="_blank">ClickHouse</a></th>
<th align="center"><a href="./anomstack/external/duckdb/duckdb.py" target="_blank">DuckDB</a></th>
<th align="center"><a href="./anomstack/external/sqlite/sqlite.py" target="_blank">SQLite</a></th>
<th align="center"><a href="./anomstack/external/duckdb/duckdb.py" target="_blank">MotherDuck</a></th>
Expand All @@ -63,6 +64,7 @@ Supported sources and databases for your metrics to live in and be queried from:
<td align="center">✅</td>
<td align="center">✅</td>
<td align="center">✅</td>
<td align="center">✅</td>
<td align="center">🚧</td>
</tr>
</tbody>
Expand Down
4 changes: 3 additions & 1 deletion anomstack/df/save.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from anomstack.external.gcp.bigquery import save_df_bigquery
from anomstack.external.snowflake.snowflake import save_df_snowflake
from anomstack.external.sqlite.sqlite import save_df_sqlite

from anomstack.external.clickhouse.clickhouse import save_df_clickhouse

def save_df(
df: pd.DataFrame, db: str, table_key: str, if_exists: str = "append"
Expand All @@ -35,6 +35,8 @@ def save_df(
df = save_df_duckdb(df, table_key)
elif db == "sqlite":
df = save_df_sqlite(df, table_key)
elif db == "clickhouse":
df = save_df_clickhouse(df, table_key)
else:
raise ValueError(f"Unknown db: {db}")

Expand Down
126 changes: 126 additions & 0 deletions anomstack/external/clickhouse/clickhouse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""
Some helper functions for ClickHouse using clickhouse-connect.
"""

import os
import pandas as pd
from dagster import get_dagster_logger
from clickhouse_connect import get_client


def map_dtype(dtype) -> str:
    """
    Map a Pandas dtype to the corresponding ClickHouse column type.

    Falls back to ``String`` for anything that is not integer, float,
    bool, or datetime-like (e.g. object/str columns).
    """
    # Ordered (predicate, clickhouse_type) pairs; first match wins.
    dtype_checks = (
        (pd.api.types.is_integer_dtype, "Int64"),
        (pd.api.types.is_float_dtype, "Float64"),
        (pd.api.types.is_bool_dtype, "UInt8"),
        (pd.api.types.is_datetime64_any_dtype, "DateTime"),
    )
    for predicate, ch_type in dtype_checks:
        if predicate(dtype):
            return ch_type
    return "String"


def get_clickhouse_client():
    """
    Build a ClickHouse client from the ANOMSTACK_CLICKHOUSE_* environment
    variables, falling back to local-development defaults when unset.

    Returns:
        A configured clickhouse-connect client instance.
    """
    logger = get_dagster_logger()

    env = os.environ.get
    conn_params = {
        "host": env("ANOMSTACK_CLICKHOUSE_HOST", "localhost"),
        "port": int(env("ANOMSTACK_CLICKHOUSE_PORT", "8123")),
        "username": env("ANOMSTACK_CLICKHOUSE_USER", "anomstack"),
        "password": env("ANOMSTACK_CLICKHOUSE_PASSWORD", "anomstack"),
        "database": env("ANOMSTACK_CLICKHOUSE_DATABASE", "default"),
    }

    # Deliberately omit the password from the log line.
    logger.info(
        f"ClickHouse connection: {conn_params['host']}:{conn_params['port']}/{conn_params['database']}"
    )

    return get_client(**conn_params)


def read_sql_clickhouse(sql: str) -> pd.DataFrame:
    """
    Execute a SQL query against ClickHouse and return its result set.

    Args:
        sql (str): The SQL query to execute.

    Returns:
        pd.DataFrame: Query rows, with columns named after the result set.
    """
    query_result = get_clickhouse_client().query(sql)
    return pd.DataFrame(
        query_result.result_set, columns=query_result.column_names
    )


def save_df_clickhouse(df: pd.DataFrame, table_key: str) -> pd.DataFrame:
    """
    Save a Pandas DataFrame to ClickHouse.

    The target table is created up front with ``CREATE TABLE IF NOT EXISTS``
    (schema derived from the DataFrame dtypes via ``map_dtype``), so the
    insert never relies on catching a broad ``Exception`` to detect a missing
    table — previously any insert failure (auth, network, schema mismatch)
    was misreported as "table may not exist" and blindly retried.

    Args:
        df (pd.DataFrame): The DataFrame to save.
        table_key (str): The table name to save the DataFrame as.

    Returns:
        pd.DataFrame: The input DataFrame, unchanged.
    """
    client = get_clickhouse_client()

    # Idempotently ensure the table exists. Backticks around column names
    # guard against reserved words.
    columns_str = ", ".join(
        f"`{col}` {map_dtype(dtype)}" for col, dtype in df.dtypes.items()
    )
    create_sql = (
        f"CREATE TABLE IF NOT EXISTS {table_key} ({columns_str}) "
        "ENGINE = MergeTree() ORDER BY tuple()"
    )
    client.command(create_sql)

    # Nothing to insert for an empty frame (the table still gets created).
    if df.empty:
        return df

    # clickhouse-connect expects row-oriented data plus column names.
    client.insert(
        table=table_key,
        data=df.values.tolist(),
        column_names=list(df.columns),
    )

    return df


def run_sql_clickhouse(sql: str) -> None:
    """
    Execute a non-returning SQL statement in ClickHouse.

    Args:
        sql (str): The SQL statement to execute.

    Raises:
        Exception: Re-raised after logging if the statement fails.
    """
    client = get_clickhouse_client()
    try:
        client.command(sql)
    except Exception as err:
        get_dagster_logger().error(
            f"Error executing SQL statement in ClickHouse: {err}"
        )
        raise
10 changes: 10 additions & 0 deletions anomstack/sql/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from anomstack.external.gcp.bigquery import read_sql_bigquery
from anomstack.external.snowflake.snowflake import read_sql_snowflake
from anomstack.external.sqlite.sqlite import read_sql_sqlite, run_sql_sqlite
from anomstack.external.clickhouse.clickhouse import (
read_sql_clickhouse,
run_sql_clickhouse,
)
from anomstack.sql.translate import db_translate

pd.options.display.max_columns = 10
Expand Down Expand Up @@ -62,6 +66,12 @@ def read_sql(sql: str, db: str, returns_df: bool = True) -> pd.DataFrame:
elif not returns_df:
run_sql_sqlite(sql)
df = pd.DataFrame()
elif db == "clickhouse":
if returns_df:
df = read_sql_clickhouse(sql)
elif not returns_df:
run_sql_clickhouse(sql)
df = pd.DataFrame()
else:
raise ValueError(f"Unknown db: {db}")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
db: "clickhouse"
metric_batch: "python_ingest_simple"
alert_methods: "email,slack"
ingest_cron_schedule: "*/2 * * * *"
Expand Down
1 change: 1 addition & 0 deletions requirements.compile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
anomaly-agent
boto3
clickhouse-connect
dagit
dagster
dagster-docker
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
anomaly-agent
boto3
clickhouse-connect
dagit
dagster
dagster-docker
Expand Down

0 comments on commit 319c98e

Please sign in to comment.