Skip to content

Commit

Permalink
Merge pull request #104 from ooni/fix/analysis
Browse files (browse the repository at this point in the history)
Fix analysis generation in airflow
  • Loading branch information
hellais authored Jan 9, 2025
2 parents 7c14a71 + e1502a6 commit a49cb5f
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 8 deletions.
46 changes: 40 additions & 6 deletions dags/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def run_make_observations(


def run_make_analysis(
clickhouse_url: str,
probe_cc: List[str],
test_name: List[str],
day: str,
Expand All @@ -40,7 +41,9 @@ def run_make_analysis(
make_analysis_in_a_day,
)

params = MakeAnalysisParams(probe_cc=probe_cc, test_name=test_name, day=day)
params = MakeAnalysisParams(
probe_cc=probe_cc, test_name=test_name, day=day, clickhouse_url=clickhouse_url
)
make_analysis_in_a_day(params)


Expand All @@ -59,14 +62,14 @@ def run_make_analysis(
start_date=datetime.datetime(2012, 12, 4),
schedule="@daily",
catchup=False,
) as dag:
) as dag_full:
start_day = "{{ ds }}"
op_make_observations = PythonVirtualenvOperator(
task_id="make_observations",
python_callable=run_make_observations,
op_kwargs={
"probe_cc": dag.params["probe_cc"],
"test_name": dag.params["test_name"],
"probe_cc": dag_full.params["probe_cc"],
"test_name": dag_full.params["test_name"],
"clickhouse_url": Variable.get("clickhouse_url", default_var=""),
"data_dir": Variable.get("data_dir", default_var=""),
"bucket_date": start_day,
Expand All @@ -79,12 +82,43 @@ def run_make_analysis(
task_id="make_analysis",
python_callable=run_make_analysis,
op_kwargs={
"probe_cc": dag.params["probe_cc"],
"test_name": dag.params["test_name"],
"probe_cc": dag_full.params["probe_cc"],
"test_name": dag_full.params["test_name"],
"clickhouse_url": Variable.get("clickhouse_url", default_var=""),
"day": start_day,
},
requirements=REQUIREMENTS,
system_site_packages=False,
)

op_make_observations >> op_make_analysis

# DAG that runs only the analysis step. It has no schedule (schedule=None),
# so it is not run on a timer the way the "@daily" DAG above is.
with DAG(
    dag_id="batch_analysis_only",
    default_args=dict(
        retries=3,
        retry_delay=datetime.timedelta(minutes=30),
    ),
    params=dict(
        probe_cc=Param(default=[], type=["null", "array"]),
        test_name=Param(default=[], type=["null", "array"]),
    ),
    start_date=datetime.datetime(2012, 12, 4),
    schedule=None,
    catchup=False,
) as dag_analysis:
    # Template placeholder forwarded as the "day" argument of the task.
    start_day = "{{ ds }}"

    op_make_analysis_only = PythonVirtualenvOperator(
        task_id="make_analysis",
        python_callable=run_make_analysis,
        # Keyword arguments handed to run_make_analysis; the ClickHouse URL
        # is read from the "clickhouse_url" Variable (empty string if unset).
        op_kwargs={
            "probe_cc": dag_analysis.params["probe_cc"],
            "test_name": dag_analysis.params["test_name"],
            "clickhouse_url": Variable.get("clickhouse_url", default_var=""),
            "day": start_day,
        },
        requirements=REQUIREMENTS,
        system_site_packages=False,
    )

    # Single task in this DAG, so there are no dependencies to wire up;
    # this bare reference has no effect.
    op_make_analysis_only
3 changes: 2 additions & 1 deletion oonipipeline/src/oonipipeline/tasks/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

@dataclass
class MakeAnalysisParams:
clickhouse_url: str
probe_cc: List[str]
test_name: List[str]
day: str
Expand All @@ -23,7 +24,7 @@ def make_analysis_in_a_day(params: MakeAnalysisParams):

probe_cc = params.probe_cc
test_name = params.test_name
db = ClickhouseConnection(config.clickhouse_url)
db = ClickhouseConnection(params.clickhouse_url)

write_analysis_web_fuzzy_logic(
db=db,
Expand Down
2 changes: 1 addition & 1 deletion oonipipeline/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_full_workflow(
result = cli_runner.invoke(
cli,
[
"backfill",
"run",
"--start-at",
"2022-10-21",
"--end-at",
Expand Down
1 change: 1 addition & 0 deletions oonipipeline/tests/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def test_make_file_entry_batch(datadir, db):
make_analysis_in_a_day(
MakeAnalysisParams(
probe_cc=["IR"],
clickhouse_url=db.clickhouse_url,
test_name=["webconnectivity"],
day=date(2023, 10, 31).strftime("%Y-%m-%d"),
),
Expand Down

0 comments on commit a49cb5f

Please sign in to comment.