ci: Add dags folder to tests (#28753)
tkaemming authored Feb 17, 2025
1 parent b2d5c96 commit da60e84
Showing 5 changed files with 76 additions and 127 deletions.
2 changes: 1 addition & 1 deletion .github/actions/run-backend-tests/action.yml

@@ -161,7 +161,7 @@ runs:
       pytest ${{
         inputs.person-on-events == 'true'
         && './posthog/clickhouse/ ./posthog/queries/ ./posthog/api/test/test_insight* ./posthog/api/test/dashboards/test_dashboard.py'
-        || 'posthog'
+        || 'posthog dags'
       }} ${{ inputs.person-on-events == 'true' && 'ee/clickhouse/' || 'ee/' }} -m "not async_migrations" \
         --ignore=posthog/temporal \
         --ignore=common/hogvm/python/test \
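With inputs.person-on-events unset (the default), the pytest selection now resolves to both top-level test trees, roughly:

    pytest posthog dags ee/ -m "not async_migrations" --ignore=posthog/temporal --ignore=common/hogvm/python/test ...

(trailing flags elided, as in the view above). The person-on-events branch is unchanged and still runs only the listed posthog subpaths plus ee/clickhouse/.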
7 changes: 6 additions & 1 deletion dags/tests/conftest.py

@@ -7,9 +7,14 @@

 import pytest

+from posthog.test.base import reset_clickhouse_database
 from posthog.clickhouse.cluster import ClickhouseCluster, get_cluster


 @pytest.fixture
 def cluster(django_db_setup) -> Iterator[ClickhouseCluster]:
-    yield get_cluster()
+    reset_clickhouse_database()
+    try:
+        yield get_cluster()
+    finally:
+        reset_clickhouse_database()
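The fixture now resets ClickHouse before yielding and again in a finally block, so each test sees a clean database even if the test body raises. A minimal sketch of a consuming test (the test name and query are illustrative; Query is imported as in test_materialized_columns.py below):

    from posthog.clickhouse.cluster import ClickhouseCluster, Query

    def test_example(cluster: ClickhouseCluster) -> None:
        # The fixture has already reset ClickHouse by this point.
        cluster.map_all_hosts(Query("SELECT 1")).result()
        # On exit, pass or fail, the finally block resets the database again.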
6 changes: 0 additions & 6 deletions dags/tests/test_deletes.py

@@ -4,7 +4,6 @@
 import pytest
 from clickhouse_driver import Client

-from django.conf import settings
 from dags.deletes import (
     deletes_job,
     PendingPersonEventDeletesTable,

@@ -23,11 +22,6 @@ def test_full_job(cluster: ClickhouseCluster):

     events = [(i, f"distinct_id_{i}", UUID(int=i), timestamp - timedelta(hours=i)) for i in range(event_count)]

-    def truncate_events(client: Client) -> None:
-        client.execute(f"TRUNCATE TABLE sharded_events ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'")
-
-    cluster.any_host(truncate_events).result()
-
     def insert_events(client: Client) -> None:
         client.execute(
             """INSERT INTO writable_events (team_id, distinct_id, person_id, timestamp)
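This truncation became redundant with the conftest.py change above: the cluster fixture now resets the database around every test, so sharded_events starts empty. For reference, the deleted helper used the same single-host dispatch pattern that insert_events still uses:

    def truncate_events(client: Client) -> None:
        # ON CLUSTER applies the truncation to every shard and replica.
        client.execute(f"TRUNCATE TABLE sharded_events ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'")

    cluster.any_host(truncate_events).result()

Dropping it also removes the file's only use of django.conf.settings, hence the deleted import.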
6 changes: 3 additions & 3 deletions dags/tests/test_materialized_columns.py

@@ -15,7 +15,7 @@
     materialize_column,
     run_materialize_mutations,
 )
-from posthog.clickhouse.cluster import ClickhouseCluster
+from posthog.clickhouse.cluster import ClickhouseCluster, Query
 from posthog.test.base import materialized

@@ -66,10 +66,10 @@ def populate_test_data(client: Client) -> None:
     cluster.any_host(populate_test_data).result()

     # stop merges on all hosts so that we don't inadvertently write data for the materialized column due to merges
-    cluster.map_all_hosts(lambda c: c.execute(f"SYSTEM STOP MERGES")).result()
+    cluster.map_all_hosts(Query("SYSTEM STOP MERGES")).result()

     # try our best to make sure that merges resume after this test, even if we throw in the test block below
-    resume_merges = cluster.map_all_hosts(lambda c: c.execute("SYSTEM START MERGES")).result()
+    resume_merges = lambda: cluster.map_all_hosts(Query("SYSTEM START MERGES")).result()
     exit_handlers = contextlib.ExitStack()
     exit_handlers.callback(resume_merges)
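Two fixes here. First, the ad-hoc lambda c: c.execute(...) callables are replaced with the Query helper now imported from posthog.clickhouse.cluster. Judging only by how it is called, Query is presumably a small wrapper that turns a statement into a callable over a client, along these lines (a hypothetical sketch, not the actual implementation):

    from clickhouse_driver import Client

    class Query:
        # Hypothetical reconstruction of the imported helper.
        def __init__(self, query: str) -> None:
            self.query = query

        def __call__(self, client: Client):
            return client.execute(self.query)

Second, the old resume_merges line was a real bug: calling .result() immediately restarted merges during setup and bound the returned value, not a callable, as the ExitStack callback. Wrapping it in a lambda defers SYSTEM START MERGES until the exit handlers actually run. The pointless f-prefix on the old "SYSTEM STOP MERGES" string also goes away.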
182 changes: 66 additions & 116 deletions posthog/test/base.py

@@ -1019,6 +1019,70 @@ def run_clickhouse_statement_in_parallel(statements: list[str]):
         j.join()


+def reset_clickhouse_database() -> None:
+    run_clickhouse_statement_in_parallel(
+        [
+            DROP_RAW_SESSION_MATERIALIZED_VIEW_SQL(),
+            DROP_RAW_SESSION_VIEW_SQL(),
+            DROP_SESSION_MATERIALIZED_VIEW_SQL(),
+            DROP_SESSION_VIEW_SQL(),
+        ]
+    )
+    run_clickhouse_statement_in_parallel(
+        [
+            DROP_CHANNEL_DEFINITION_DICTIONARY_SQL,
+            DROP_CHANNEL_DEFINITION_TABLE_SQL,
+            DROP_DISTRIBUTED_EVENTS_TABLE_SQL,
+            DROP_EVENTS_TABLE_SQL(),
+            DROP_PERSON_TABLE_SQL,
+            DROP_RAW_SESSION_TABLE_SQL(),
+            DROP_SESSION_RECORDING_EVENTS_TABLE_SQL(),
+            DROP_SESSION_REPLAY_EVENTS_TABLE_SQL(),
+            DROP_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL(),
+            DROP_SESSION_TABLE_SQL(),
+            TRUNCATE_COHORTPEOPLE_TABLE_SQL,
+            TRUNCATE_GROUPS_TABLE_SQL,
+            TRUNCATE_PERSON_DISTINCT_ID2_TABLE_SQL,
+            TRUNCATE_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL,
+            TRUNCATE_PERSON_DISTINCT_ID_TABLE_SQL,
+            TRUNCATE_PERSON_STATIC_COHORT_TABLE_SQL,
+            TRUNCATE_PLUGIN_LOG_ENTRIES_TABLE_SQL,
+        ]
+    )
+    run_clickhouse_statement_in_parallel(
+        [
+            CHANNEL_DEFINITION_DICTIONARY_SQL(),
+            CHANNEL_DEFINITION_TABLE_SQL(),
+            EVENTS_TABLE_SQL(),
+            PERSONS_TABLE_SQL(),
+            RAW_SESSIONS_TABLE_SQL(),
+            SESSIONS_TABLE_SQL(),
+            SESSION_RECORDING_EVENTS_TABLE_SQL(),
+            SESSION_REPLAY_EVENTS_TABLE_SQL(),
+            SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE_SQL(),
+        ]
+    )
+    run_clickhouse_statement_in_parallel(
+        [
+            DISTRIBUTED_EVENTS_TABLE_SQL(),
+            DISTRIBUTED_RAW_SESSIONS_TABLE_SQL(),
+            DISTRIBUTED_SESSIONS_TABLE_SQL(),
+            DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL(),
+            DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL(),
+            SESSION_REPLAY_EVENTS_V2_TEST_DISTRIBUTED_TABLE_SQL(),
+        ]
+    )
+    run_clickhouse_statement_in_parallel(
+        [
+            CHANNEL_DEFINITION_DATA_SQL(),
+            RAW_SESSIONS_TABLE_MV_SQL(),
+            RAW_SESSIONS_VIEW_SQL(),
+            SESSIONS_TABLE_MV_SQL(),
+            SESSIONS_VIEW_SQL(),
+        ]
+    )
+
+
 class ClickhouseDestroyTablesMixin(BaseTest):
     """
     To speed up tests we normally don't destroy the tables between tests, so clickhouse tables will have data from previous tests.

@@ -1027,125 +1091,11 @@ class ClickhouseDestroyTablesMixin(BaseTest):

     def setUp(self):
         super().setUp()
-        run_clickhouse_statement_in_parallel(
-            [
-                DROP_SESSION_MATERIALIZED_VIEW_SQL(),
-                DROP_RAW_SESSION_MATERIALIZED_VIEW_SQL(),
-                DROP_SESSION_VIEW_SQL(),
-                DROP_RAW_SESSION_VIEW_SQL(),
-            ]
-        )
-        run_clickhouse_statement_in_parallel(
-            [
-                DROP_DISTRIBUTED_EVENTS_TABLE_SQL,
-                DROP_EVENTS_TABLE_SQL(),
-                DROP_PERSON_TABLE_SQL,
-                TRUNCATE_PERSON_DISTINCT_ID_TABLE_SQL,
-                TRUNCATE_PERSON_DISTINCT_ID2_TABLE_SQL,
-                TRUNCATE_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL,
-                DROP_SESSION_RECORDING_EVENTS_TABLE_SQL(),
-                DROP_SESSION_REPLAY_EVENTS_TABLE_SQL(),
-                TRUNCATE_GROUPS_TABLE_SQL,
-                TRUNCATE_COHORTPEOPLE_TABLE_SQL,
-                TRUNCATE_PERSON_STATIC_COHORT_TABLE_SQL,
-                TRUNCATE_PLUGIN_LOG_ENTRIES_TABLE_SQL,
-                DROP_CHANNEL_DEFINITION_TABLE_SQL,
-                DROP_CHANNEL_DEFINITION_DICTIONARY_SQL,
-                DROP_SESSION_TABLE_SQL(),
-                DROP_RAW_SESSION_TABLE_SQL(),
-            ]
-        )
-        run_clickhouse_statement_in_parallel(
-            [
-                EVENTS_TABLE_SQL(),
-                PERSONS_TABLE_SQL(),
-                SESSION_RECORDING_EVENTS_TABLE_SQL(),
-                SESSION_REPLAY_EVENTS_TABLE_SQL(),
-                CHANNEL_DEFINITION_TABLE_SQL(),
-                CHANNEL_DEFINITION_DICTIONARY_SQL(),
-                SESSIONS_TABLE_SQL(),
-                RAW_SESSIONS_TABLE_SQL(),
-            ]
-        )
-        run_clickhouse_statement_in_parallel(
-            [
-                DISTRIBUTED_EVENTS_TABLE_SQL(),
-                DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL(),
-                DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL(),
-                SESSION_REPLAY_EVENTS_V2_TEST_DISTRIBUTED_TABLE_SQL(),
-                DISTRIBUTED_SESSIONS_TABLE_SQL(),
-                DISTRIBUTED_RAW_SESSIONS_TABLE_SQL(),
-            ]
-        )
-        run_clickhouse_statement_in_parallel(
-            [
-                CHANNEL_DEFINITION_DATA_SQL(),
-                SESSIONS_TABLE_MV_SQL(),
-                RAW_SESSIONS_TABLE_MV_SQL(),
-                SESSIONS_VIEW_SQL(),
-                RAW_SESSIONS_VIEW_SQL(),
-            ]
-        )
+        reset_clickhouse_database()

     def tearDown(self):
         super().tearDown()

-        run_clickhouse_statement_in_parallel(
-            [
-                DROP_SESSION_MATERIALIZED_VIEW_SQL(),
-                DROP_RAW_SESSION_MATERIALIZED_VIEW_SQL(),
-                DROP_SESSION_VIEW_SQL(),
-                DROP_RAW_SESSION_VIEW_SQL(),
-            ]
-        )
-        run_clickhouse_statement_in_parallel(
-            [
-                DROP_DISTRIBUTED_EVENTS_TABLE_SQL,
-                DROP_EVENTS_TABLE_SQL(),
-                DROP_PERSON_TABLE_SQL,
-                TRUNCATE_PERSON_DISTINCT_ID_TABLE_SQL,
-                TRUNCATE_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL,
-                DROP_SESSION_RECORDING_EVENTS_TABLE_SQL(),
-                DROP_SESSION_REPLAY_EVENTS_TABLE_SQL(),
-                DROP_SESSION_REPLAY_EVENTS_V2_TEST_TABLE_SQL(),
-                DROP_CHANNEL_DEFINITION_TABLE_SQL,
-                DROP_CHANNEL_DEFINITION_DICTIONARY_SQL,
-                DROP_SESSION_TABLE_SQL(),
-                DROP_RAW_SESSION_TABLE_SQL(),
-            ]
-        )
-        run_clickhouse_statement_in_parallel(
-            [
-                EVENTS_TABLE_SQL(),
-                PERSONS_TABLE_SQL(),
-                SESSION_RECORDING_EVENTS_TABLE_SQL(),
-                SESSION_REPLAY_EVENTS_TABLE_SQL(),
-                SESSION_REPLAY_EVENTS_V2_TEST_DATA_TABLE_SQL(),
-                CHANNEL_DEFINITION_TABLE_SQL(),
-                CHANNEL_DEFINITION_DICTIONARY_SQL(),
-                SESSIONS_TABLE_SQL(),
-                RAW_SESSIONS_TABLE_SQL(),
-            ]
-        )
-        run_clickhouse_statement_in_parallel(
-            [
-                DISTRIBUTED_EVENTS_TABLE_SQL(),
-                DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL(),
-                DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL(),
-                SESSION_REPLAY_EVENTS_V2_TEST_DISTRIBUTED_TABLE_SQL(),
-                DISTRIBUTED_SESSIONS_TABLE_SQL(),
-                DISTRIBUTED_RAW_SESSIONS_TABLE_SQL(),
-            ]
-        )
-        run_clickhouse_statement_in_parallel(
-            [
-                SESSIONS_TABLE_MV_SQL(),
-                RAW_SESSIONS_TABLE_MV_SQL(),
-                SESSIONS_VIEW_SQL(),
-                RAW_SESSIONS_VIEW_SQL(),
-                CHANNEL_DEFINITION_DATA_SQL(),
-            ]
-        )
+        reset_clickhouse_database()


 def snapshot_clickhouse_queries(fn_or_class):
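reset_clickhouse_database() consolidates the two hand-maintained drop/recreate sequences from setUp and tearDown, which had drifted apart (for example, only setUp truncated person_distinct_id2, groups, and cohortpeople, while only tearDown dropped the session replay v2 test table). The batches are ordered by dependency: views and materialized views are dropped first, then tables are dropped or truncated, then base tables are recreated, then distributed tables, and finally the views, materialized views, and dictionary data that sit on top of them.

Each batch runs through run_clickhouse_statement_in_parallel, whose body sits outside this hunk (only the trailing j.join() is visible). A plausible reconstruction, assuming the conventional thread-per-statement shape and a sync_execute helper:

    import threading

    from posthog.clickhouse.client import sync_execute  # assumed import path

    def run_clickhouse_statement_in_parallel(statements: list[str]) -> None:
        # One thread per statement; join() blocks until every statement finishes.
        jobs = [threading.Thread(target=sync_execute, args=(statement,)) for statement in statements]
        for job in jobs:
            job.start()
        for job in jobs:
            job.join()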
