Add support for hourly runs and reprocessing
* Fixes off-by-one in workflow trigger: #116
* Closes: #109
* Closes: #111
hellais committed Jan 23, 2025
1 parent bc33f8f commit 40ecc64
Showing 12 changed files with 252 additions and 62 deletions.
69 changes: 62 additions & 7 deletions dags/pipeline.py
@@ -34,17 +34,20 @@ def run_make_analysis(
clickhouse_url: str,
probe_cc: List[str],
test_name: List[str],
day: str,
timestamp: str,
):
from oonipipeline.tasks.analysis import (
MakeAnalysisParams,
make_analysis_in_a_day,
make_analysis,
)

params = MakeAnalysisParams(
probe_cc=probe_cc, test_name=test_name, day=day, clickhouse_url=clickhouse_url
probe_cc=probe_cc,
test_name=test_name,
timestamp=timestamp,
clickhouse_url=clickhouse_url,
)
make_analysis_in_a_day(params)
make_analysis(params)


REQUIREMENTS = [str((pathlib.Path(__file__).parent.parent / "oonipipeline").absolute())]
@@ -53,18 +56,21 @@ def run_make_analysis(
dag_id="batch_measurement_processing",
default_args={
"retries": 3,
"retry_delay": datetime.timedelta(minutes=30),
"retry_delay": datetime.timedelta(minutes=10),
},
params={
"probe_cc": Param(default=[], type=["null", "array"]),
"test_name": Param(default=[], type=["null", "array"]),
},
start_date=datetime.datetime(2012, 12, 4),
schedule="@daily",
# We offset the schedule by 30 minutes so that we give time for the uploader
# to finish
schedule="30 0 * * *",
catchup=False,
concurrency=1,
max_active_runs=1
max_active_runs=1,
) as dag_full:
# YYYY-MM-DD
start_day = "{{ ds }}"
op_make_observations = PythonVirtualenvOperator(
task_id="make_observations",
@@ -94,3 +100,52 @@ def run_make_analysis(
)

op_make_observations >> op_make_analysis

with DAG(
dag_id="hourly_batch_measurement_processing",
default_args={
"retries": 3,
"retry_delay": datetime.timedelta(minutes=5),
},
params={
"probe_cc": Param(default=[], type=["null", "array"]),
"test_name": Param(default=[], type=["null", "array"]),
},
start_date=datetime.datetime(2012, 12, 4),
# We offset the schedule by 30 minutes so that we give time for the uploader
# to finish
schedule="0 30 * * *",
catchup=False,
concurrency=1,
max_active_runs=1,
) as dag_full:
# YYYY-MM-DDTHH
start_hour = "{{ ts }}"[:13]
op_make_observations_hourly = PythonVirtualenvOperator(
task_id="make_observations_hourly",
python_callable=run_make_observations,
op_kwargs={
"probe_cc": dag_full.params["probe_cc"],
"test_name": dag_full.params["test_name"],
"clickhouse_url": Variable.get("clickhouse_url", default_var=""),
"data_dir": Variable.get("data_dir", default_var=""),
"bucket_date": start_hour,
},
requirements=REQUIREMENTS,
system_site_packages=False,
)

op_make_analysis_hourly = PythonVirtualenvOperator(
task_id="make_analysis",
python_callable=run_make_analysis,
op_kwargs={
"probe_cc": dag_full.params["probe_cc"],
"test_name": dag_full.params["test_name"],
"clickhouse_url": Variable.get("clickhouse_url", default_var=""),
"timestamp": start_hour,
},
requirements=REQUIREMENTS,
system_site_packages=False,
)

op_make_observations_hourly >> op_make_analysis_hourly
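
The two DAGs differ only in schedule and in the bucket identifier they pass downstream: the daily DAG uses `{{ ds }}` (YYYY-MM-DD), while the hourly DAG slices the rendered `{{ ts }}` value down to the hour. A minimal sketch of that slicing, using a hypothetical logical date not taken from the commit:

```python
# What the hourly DAG's template produces once Airflow renders it
# (hypothetical logical date of 2025-01-23 14:00 UTC).
ts = "2025-01-23T14:00:00+00:00"  # rendered value of {{ ts }}
start_hour = ts[:13]              # -> "2025-01-23T14", i.e. YYYY-MM-DDTHH

# The daily DAG instead uses {{ ds }}, which renders to "2025-01-23".
```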
21 changes: 8 additions & 13 deletions oonidata/src/oonidata/dataclient.py
@@ -539,7 +539,7 @@ def get_file_entries_hourly(
if from_cans == True:
prefix_list = get_can_prefixes(start_day, end_day) + prefix_list

log.info(f"using prefix list {prefix_list}")
log.debug(f"using prefix list {prefix_list}")
file_entries = []
prefix_idx = 0
total_prefixes = len(prefix_list)
@@ -600,21 +600,16 @@ def get_file_entries(


def list_file_entries_batches(
start_day: Union[date, str],
end_day: Union[date, str],
start_hour: datetime,
end_hour: datetime,
probe_cc: CSVList = None,
test_name: CSVList = None,
from_cans: bool = True,
) -> Tuple[List[List[Tuple]], int]:
if isinstance(start_day, str):
start_day = datetime.strptime(start_day, "%Y-%m-%d").date()
if isinstance(end_day, str):
end_day = datetime.strptime(end_day, "%Y-%m-%d").date()

t = PerfTimer()
file_entries = get_file_entries(
start_day=start_day,
end_day=end_day,
file_entries = get_file_entries_hourly(
start_hour=start_hour,
end_hour=end_hour,
test_name=test_name,
probe_cc=probe_cc,
from_cans=from_cans,
@@ -624,7 +619,7 @@ def list_file_entries_batches(
60_000_000, int(total_file_entry_size / 100)
) # split into approximately 100 batches or 60 MB each batch, whichever is greater

log.info(
log.debug(
f"took {t.pretty} to get {len(file_entries)} entries (batch size: {round(max_batch_size/10**6, 2)}MB)"
)
batches = []
@@ -641,7 +636,7 @@ def list_file_entries_batches(
total_size += fe.size
current_batch.append((fe.bucket_name, fe.s3path, fe.ext, fe.size))
log.debug(
f"batch size for {start_day}-{end_day} ({probe_cc},{test_name}): {len(current_batch)}"
f"batch size for {start_hour}-{end_hour} ({probe_cc},{test_name}): {len(current_batch)}"
)
batches.append(current_batch)
current_batch = []
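
The batching rule above targets roughly 100 batches of at least 60 MB each; a small worked example of the arithmetic (the totals are made up):

```python
# Worked example of the batch-size rule in list_file_entries_batches
# (the totals below are hypothetical).
total_file_entry_size = 9_000_000_000  # ~9 GB of file entries
max_batch_size = max(60_000_000, int(total_file_entry_size / 100))
print(max_batch_size)  # 90000000 -> ~90 MB per batch, roughly 100 batches

# With a small total the 60 MB floor wins instead:
print(max(60_000_000, int(1_000_000_000 / 100)))  # 60000000
```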
1 change: 1 addition & 0 deletions oonipipeline/src/oonipipeline/analysis/web_analysis.py
@@ -522,6 +522,7 @@ def get_analysis_web_fuzzy_logic(
db: ClickhouseConnection,
start_time: datetime,
end_time: datetime,
bucket_date: str,
probe_cc: List[str],
# We are only doing web_connectivity for the moment
test_name: List[str] = ["web_connectivity"],
91 changes: 64 additions & 27 deletions oonipipeline/src/oonipipeline/cli/commands.py
@@ -6,19 +6,22 @@
import click
from click_loglevel import LogLevel

from oonipipeline.cli.utils import build_timestamps
from oonipipeline.db.maintenance import (
optimize_all_tables_by_partition,
list_partitions_to_delete,
list_duplicates_in_buckets,
)
from oonipipeline.tasks.common import OptimizeTablesParams, optimize_tables
from oonipipeline.tasks.observations import (
MakeObservationsParams,
make_observations,
)
from oonipipeline.tasks.analysis import (
MakeAnalysisParams,
make_analysis_in_a_day,
make_analysis,
)
from tqdm import tqdm

from ..__about__ import VERSION
from ..db.connections import ClickhouseConnection
@@ -44,22 +47,6 @@ def _parse_csv(ctx, param, s: Optional[str]) -> List[str]:
callback=_parse_csv,
help="test_name you care to process, can be comma separated for a list (eg. web_connectivity,whatsapp). If omitted will select process all test names.",
)
start_day_option = click.option(
"--start-day",
default=(date.today() - timedelta(days=14)).strftime("%Y-%m-%d"),
help="""the timestamp of the day for which we should start processing data (inclusive).
Note: this is the upload date, which doesn't necessarily match the measurement date.
""",
)
end_day_option = click.option(
"--end-day",
default=(date.today() + timedelta(days=1)).strftime("%Y-%m-%d"),
help="""the timestamp of the day for which we should start processing data (inclusive).
Note: this is the upload date, which doesn't necessarily match the measurement date.
""",
)
start_at_option = click.option(
"--start-at",
type=click.DateTime(),
@@ -118,6 +105,18 @@ def cli(log_level: int):
@probe_cc_option
@test_name_option
@click.option("--workflow-name", type=str, required=True, default="observations")
@click.option(
"--only-observations",
is_flag=True,
default=False,
help="should we only run the observations generation workflow?",
)
@click.option(
"--only-analysis",
is_flag=True,
default=False,
help="should we only run the analysis workflow?",
)
@click.option(
"--create-tables",
is_flag=True,
@@ -134,43 +133,81 @@ def run(
workflow_name: str,
start_at: datetime,
end_at: datetime,
only_observations: bool,
only_analysis: bool,
create_tables: bool,
drop_tables: bool,
):
"""
Process OONI measurements and write them into clickhouse
"""
click.echo(f"Runnning worfklow {workflow_name}")
click.echo(f"Runnning workflow {workflow_name}")

maybe_create_delete_tables(
clickhouse_url=config.clickhouse_url,
create_tables=create_tables,
drop_tables=drop_tables,
)
date_range = [start_at + timedelta(days=i) for i in range((end_at - start_at).days)]
for day in date_range:
click.echo(f"Processing {day}")
start_day = day.strftime("%Y-%m-%d")
if workflow_name == "observations":

last_month = None
for timestamp, current_day in tqdm(build_timestamps(start_at, end_at)):
click.echo(f"Processing {timestamp}")
if not only_analysis:
make_observations(
MakeObservationsParams(
probe_cc=probe_cc,
test_name=test_name,
clickhouse=config.clickhouse_url,
data_dir=config.data_dir,
fast_fail=False,
bucket_date=start_day,
bucket_date=timestamp,
)
)
elif workflow_name == "analysis":
make_analysis_in_a_day(
click.echo("finished running make_observations")

if not only_observations:
make_analysis(
MakeAnalysisParams(
clickhouse_url=config.clickhouse_url,
probe_cc=probe_cc,
test_name=test_name,
day=start_day,
timestamp=timestamp,
)
)
click.echo("finished running make_analysis")

def optimize_params(partition_str: str):
return OptimizeTablesParams(
clickhouse=config.clickhouse_url,
table_names=[
"obs_web",
"obs_web_ctrl",
"obs_http_middlebox",
"analysis_web_measurement",
],
partition_str=partition_str,
)

# optimize tables at the end of the month
# this is done using the PARTITION key to remove duplicate entries that
# may have been inserted during reprocessing
if last_month is None:
last_month = current_day.month
elif last_month != current_day.month:
partition_str = current_day.strftime("%Y%m")
click.echo(f"optimizing tables with {partition_str}")
optimize_tables(optimize_params(partition_str))
click.echo("finished optimizing tables")
last_month = current_day.month

# Ensure the last month in the range is also optimized
partition_str = current_day.strftime("%Y%m")
click.echo(f"optimizing tables with {partition_str}")
optimize_tables(optimize_params(partition_str))
click.echo("finished optimizing tables")

click.echo("finished all runs")


@cli.command()
@click.option(
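
The reprocessing loop in `run` triggers an OPTIMIZE whenever the month of the timestamp being processed changes, using a YYYYMM partition string to collapse duplicate rows left behind by reprocessing. A standalone sketch of that month-boundary check (the dates are hypothetical):

```python
# Sketch of the month-boundary check used in `run` (hypothetical dates).
from datetime import datetime

days = [datetime(2024, 1, 30), datetime(2024, 1, 31), datetime(2024, 2, 1)]
last_month = None
for current_day in days:
    if last_month is None:
        last_month = current_day.month
    elif last_month != current_day.month:
        # month changed: optimize using the current YYYYMM partition string
        print("optimize partition", current_day.strftime("%Y%m"))  # -> 202402
        last_month = current_day.month

# After the loop the final month in the range is optimized as well.
print("optimize partition", days[-1].strftime("%Y%m"))  # -> 202402
```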
42 changes: 42 additions & 0 deletions oonipipeline/src/oonipipeline/cli/utils.py
@@ -0,0 +1,42 @@
from datetime import datetime, timedelta
from typing import Tuple


def build_timestamps(
start_at: datetime, end_at: datetime
) -> list[Tuple[str, datetime]]:
"""
Construct a list of timestamps between start_at and end_at.
They are constructed in such a way that whole days are expressed as
YYYY-MM-DD, while hourly intervals are expressed as YYYY-MM-DDTHH.
For example, given the range 2024-01-01T00 -> 2024-01-03T03 we will get:
[
2024-01-01T00,
2024-01-01T01,
2024-01-01T02,
...
2024-01-02,
2024-01-03T00,
2024-01-03T01,
2024-01-03T02,
]
"""
timestamps = []
current = start_at

while current < end_at:
if (
current.hour == 0
and current != start_at
and current < end_at.replace(hour=0)
):
timestamps.append((current.strftime("%Y-%m-%d"), current))
current += timedelta(days=1)
else:
timestamps.append((current.strftime("%Y-%m-%dT%H"), current))
current += timedelta(hours=1)

return timestamps
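
A usage sketch of `build_timestamps` over the docstring's example range, showing how whole interior days collapse to a single YYYY-MM-DD entry while partial days stay hourly:

```python
# Usage sketch for build_timestamps over the docstring's example range.
from datetime import datetime

ts = build_timestamps(datetime(2024, 1, 1, 0), datetime(2024, 1, 3, 3))
print([label for label, _ in ts[:2]])   # ['2024-01-01T00', '2024-01-01T01']
print([label for label, _ in ts[-4:]])  # ['2024-01-02', '2024-01-03T00', '2024-01-03T01', '2024-01-03T02']
```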
1 change: 1 addition & 0 deletions oonipipeline/src/oonipipeline/db/create_tables.py
@@ -204,6 +204,7 @@ def make_create_queries():
ENGINE = ReplacingMergeTree
PRIMARY KEY measurement_uid
ORDER BY (measurement_uid, measurement_start_time, probe_cc, probe_asn)
PARTITION BY substring(measurement_uid, 1, 6)
SETTINGS index_granularity = 8192
""",
"analysis_web_measurement",
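
The new PARTITION BY clause keys analysis_web_measurement partitions off the first six characters of measurement_uid, which, assuming the uid begins with the measurement's YYYYMMDD timestamp (as the YYYYMM partition strings built in cli/commands.py suggest), is the year and month:

```python
# Sketch of what the new partition key evaluates to (the uid below is made up,
# and the leading-timestamp format of measurement_uid is an assumption).
measurement_uid = "20250123110000.123456_IT_webconnectivity_abcdef"
partition = measurement_uid[0:6]  # ClickHouse substring(measurement_uid, 1, 6) -> "202501"
```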