Skip to content

Commit

Permalink
implement remaining tests clickhouse (#95)
Browse files Browse the repository at this point in the history
* fix: Correct test_arrow_mmap_to_db_merge_without_incremental logic for ClickHouse

* remove unwanted message

* update doc

* make http_port optional

* fix typo
  • Loading branch information
sanjushahgupta authored Jan 26, 2025
1 parent e9e9d8d commit 2f870a2
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 16 deletions.
Binary file modified docs/media/clickhouse_img.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
9 changes: 4 additions & 5 deletions docs/supported-sources/clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,22 @@ Ingestr supports ClickHouse as a destination.
The URI format for ClickHouse as a destination is as follows:

```plaintext
clickhouse://<username>:<password>@<host>:<port>/<database>
clickhouse://<username>:<password>@<host>:<port>?<http_port>
```
## URI parameters:
- `username` (required): The username is required to authenticate with the ClickHouse server.
- `password` (required): The password is required to authenticate the provided username.
- `host` (required): The hostname or IP address of the ClickHouse server where the database is hosted.
- `port` (required): The TCP port number used by the ClickHouse server.
- `database` (required): The name of the ClickHouse database to connect to.
- `http_port` (optional): The port number to use when connecting to the ClickHouse server's HTTP interface. Make sure your ClickHouse server is configured to accept HTTP connections on the port specified by http_port. By default, ClickHouse uses port 8123.


ClickHouse requires a `username`, `password`, `host`, `port`, and `database` to connect to the ClickHouse server. For more information, read [here](https://dlthub.com/docs/dlt-ecosystem/destinations/clickhouse#2-setup-clickhouse-database). Once you've completed the guide, you should have all the above-mentioned credentials.
ClickHouse requires a `username`, `password`, `host` and `port` to connect to the ClickHouse server. For more information, read [here](https://dlthub.com/docs/dlt-ecosystem/destinations/clickhouse#2-setup-clickhouse-database). Once you've completed the guide, you should have all the above-mentioned credentials.

```
ingestr ingest \
--source-uri "stripe://?api_key=key123" \
--source-table 'event' \
--dest-uri "clickhouse://user_123:pass123@localhost:9000/DB_123" \
--dest-uri "clickhouse://user_123:pass123@localhost:9000" \
--dest-table 'stripe.event'
```

Expand Down
19 changes: 8 additions & 11 deletions ingestr/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
from testcontainers.core.waiting_utils import wait_for_logs # type: ignore
from testcontainers.kafka import KafkaContainer # type: ignore
from testcontainers.localstack import LocalStackContainer # type: ignore
from testcontainers.mysql import MySqlContainer # type: ignore
from testcontainers.postgres import PostgresContainer # type: ignore
from typer.testing import CliRunner
from testcontainers.mysql import MySqlContainer # type: ignore

from ingestr.main import app
from ingestr.src.appstore.errors import (
Expand Down Expand Up @@ -497,6 +497,7 @@ def stop_fully(self):
"clickhouse+native": clickHouseDocker,
}


@pytest.fixture(scope="session", autouse=True)
def manage_containers():
# Run all tests
Expand Down Expand Up @@ -526,7 +527,6 @@ def test_create_replace(source, dest):
dest_future = executor.submit(dest.start)
source_uri = source_future.result()
dest_uri = dest_future.result()

db_to_db_create_replace(source_uri, dest_uri)
source.stop()
dest.stop()
Expand Down Expand Up @@ -1363,17 +1363,14 @@ def test_arrow_mmap_to_db_delete_insert(dest):

def run_command(df: pd.DataFrame, incremental_key: Optional[str] = None):
table = pa.Table.from_pandas(df)
if "clickhouse" in dest_uri:
pytest.skip("")
with tempfile.NamedTemporaryFile(suffix=".arrow", delete=True) as tmp:
with pa.OSFile(tmp.name, "wb") as f:
writer = ipc.new_file(f, table.schema)
writer.write_table(table)
writer.close()

if "clickhouse" in dest_uri:
pytest.skip("clickhouse is not supported for this test")

create_clickhouse_database(dest_uri, schema)

res = invoke_ingest_command(
f"mmap://{tmp.name}",
"whatever",
Expand All @@ -1387,6 +1384,7 @@ def run_command(df: pd.DataFrame, incremental_key: Optional[str] = None):
return res

dest_uri = dest.start()
create_clickhouse_database(dest_uri, schema)
dest_engine = sqlalchemy.create_engine(dest_uri)

# let's start with a basic dataframe
Expand Down Expand Up @@ -1492,7 +1490,7 @@ def run_command(df: pd.DataFrame):
writer = ipc.new_file(f, table.schema)
writer.write_table(table)
writer.close()
create_clickhouse_database(dest_uri, schema)

res = invoke_ingest_command(
f"mmap://{tmp.name}",
"whatever",
Expand All @@ -1501,12 +1499,11 @@ def run_command(df: pd.DataFrame):
inc_strategy="merge",
primary_key="id",
)
if "clickhouse" in dest_uri:
pytest.skip("")
assert res.exit_code == 0
return res

dest_uri = dest.start()
create_clickhouse_database(dest_uri, schema)
dest_engine = sqlalchemy.create_engine(dest_uri)

# let's start with a basic dataframe
Expand Down Expand Up @@ -1553,6 +1550,7 @@ def run_command(df: pd.DataFrame):

with dest_engine.begin() as conn:
res = conn.execute(f"select count(*) from {schema}.output").fetchall()

assert res[0][0] == row_count + 1000

res = conn.execute(
Expand All @@ -1572,7 +1570,6 @@ def run_command(df: pd.DataFrame):
"value": ["a"] * 1000,
}
)

run_command(old_rows)
with dest_engine.begin() as conn:
res = conn.execute(f"select count(*) from {schema}.output").fetchall()
Expand Down

0 comments on commit 2f870a2

Please sign in to comment.