Skip to content

Commit

Permalink
Assess and improve performance of observation generation (#37)
Browse files Browse the repository at this point in the history
Summary of changes:
* Parallelise on file entry instead of bucket_date. This leads to
significant performance boost for processing measurements in a day
* Record inside of `oonidata_processing_logs` metrics about how long it
took process a certain daily batch
* Change schema of HIRL to include sent and received
* Various dask related performance tuning
* Additional metric collection inside of statsd

Prior to this refactor we were achieving a rate of 5000 measurements per
second, but were only reaching this speed when processing multiple
bucket_dates at the same time.

This was happening because the parallelisation was happening on a per
day bucket basic. When processing a single day worth of data (which is
what happens in the daily batch), we were only reaching a speed of about
500 measurements per second.

As part of this PR I have added support for parallelising on a per
archived file basis, which allows us to reach a performance of 7-8k
measurements per second on a per day basis (which is actually faster
than what we were doing before). The splitting into batches happens
based on the size of the files so we should be getting a pretty
consistent performance irrespective of the size of the daily batch.

This is the first step towards improving the performance of reprocessing
of legacy buckets, where each daily bucket is too small to benefit from
just splitting based on day (we ideally want to process batches of days
together).

I also implemented as part of this same PR some changes to the schema of
HIRL to make it more useful for analysis.
  • Loading branch information
hellais authored Nov 1, 2023
1 parent 410b5ae commit 691db1c
Show file tree
Hide file tree
Showing 12 changed files with 2,078 additions and 180 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ __pycache__
/output
/attic
/prof
.ipynb_checkpoints/
345 changes: 345 additions & 0 deletions notebooks/observation-generation.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,345 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 8,
"id": "72ab45b0-2c46-4afa-ac2f-64e868ff19d2",
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"from tqdm.auto import tqdm "
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "ec1358fb-cc19-423d-8078-6569e4928787",
"metadata": {},
"outputs": [],
"source": [
"from oonidata.dataclient import list_file_entries_batches, date_interval"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "f50cb2b5-a250-40ba-b7ff-f533ef9a23d4",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"0it [07:39, ?it/s]"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"day 2022-01-10 3749.11GB\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\n"
]
},
{
"ename": "TypeError",
"evalue": "list.append() takes exactly one argument (2 given)",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[10], line 11\u001b[0m\n\u001b[1;32m 9\u001b[0m size_gb \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mround\u001b[39m(size\u001b[38;5;241m/\u001b[39m\u001b[38;5;241m10\u001b[39m\u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39m\u001b[38;5;241m9\u001b[39m, \u001b[38;5;241m2\u001b[39m)\n\u001b[1;32m 10\u001b[0m \u001b[38;5;28mprint\u001b[39m(\u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mday \u001b[39m\u001b[38;5;132;01m{\u001b[39;00mstart_day\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m \u001b[39m\u001b[38;5;132;01m{\u001b[39;00msize_gb\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124mGB\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[0;32m---> 11\u001b[0m \u001b[43msize_list\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mappend\u001b[49m\u001b[43m(\u001b[49m\u001b[43mstart_day\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43msize\u001b[49m\u001b[43m)\u001b[49m\n",
"\u001b[0;31mTypeError\u001b[0m: list.append() takes exactly one argument (2 given)"
]
}
],
"source": [
"from datetime import date, timedelta\n",
"\n",
"size_list = []\n",
"for start_day in tqdm(date_interval(date(2022, 1, 10), date(2022, 3, 1))):\n",
" end_day = start_day + timedelta(days=1)\n",
" _, size = list_file_entries_batches(\n",
" probe_cc=[], test_name=[], start_day='2022-02-01', end_day='2022-03-06'\n",
" )\n",
" size_gb = round(size/10**9, 2)\n",
" print(f\"day {start_day} {size_gb}GB\")\n",
" size_list.append(start_day, size)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "be77dd46-a042-49c0-9356-d16e97d3192f",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 3,
"id": "ce605592-3647-4ddf-978b-c66d3ab31d53",
"metadata": {},
"outputs": [],
"source": [
"batches_2023 = list_file_entries_batches(\n",
" probe_cc=[], test_name=[], start_day='2023-03-05', end_day='2023-03-06'\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "d87c8f38-2ae2-4f40-bb1b-8214986f69a7",
"metadata": {},
"outputs": [],
"source": [
"batches_2022 = list_file_entries_batches(\n",
" probe_cc=[], test_name=[], start_day='2022-03-05', end_day='2022-03-06'\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "bae49016-9969-44b3-a218-3710b539c1e6",
"metadata": {},
"outputs": [],
"source": [
"size_2023 = batches_2023[1]"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "e3f31b15-1029-4d80-a9d7-4020591e1ed5",
"metadata": {},
"outputs": [],
"source": [
"size_2022 = batches_2022[1]"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "2781e7d4-97e6-426d-aaf9-56073d0de445",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2023 size: 25.8GB\n"
]
}
],
"source": [
"print(f\"2023 size: {round(size_2023/10**9, 2)}GB\")"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "95838ce3-5c23-4efe-b2d2-e14a45257c59",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2022 size: 118.21GB\n"
]
}
],
"source": [
"print(f\"2022 size: {round(size_2022/10**9, 2)}GB\")"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "6f15397d-2fff-484b-96ce-7174b278cd96",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"21692"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from itertools import chain\n",
"files_2023 = list(chain.from_iterable(batches_2023[0]))\n",
"len(files_2023)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "97f76db1-1c8b-4da3-b13a-cfc93e931ab5",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"21425"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from itertools import chain\n",
"files_2022 = list(chain.from_iterable(batches_2022[0]))\n",
"len(files_2022)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "ebce59c6-19b2-4fa0-bb66-488d49aad974",
"metadata": {},
"outputs": [],
"source": [
"df_2023 = pd.DataFrame(files_2023, columns=['bucket', 'path', 'ext', 'size'])"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "447eb0ec-9273-42aa-afd0-54d700739a5d",
"metadata": {},
"outputs": [],
"source": [
"df_2022 = pd.DataFrame(files_2022, columns=['bucket', 'path', 'ext', 'size'])"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "4410e9d3-97a0-4ca5-9617-4216c74c3f0a",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"1189216.4224598932"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_2023['size'].mean()"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "fb43ac83-806f-4997-b409-dad941399e75",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"5517357.404900817"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_2022['size'].mean()"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "69995972-c6f7-4140-9547-4b33e3f068e7",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"4.63949105965611"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"5517357/1189216"
]
},
{
"cell_type": "code",
"execution_count": 17,
"id": "87b49029-e2b4-4ea0-9810-af044ccfca5b",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'raw/20230305/23/ZW/webconnectivity/2023030523_ZW_webconnectivity.n1.0.tar.gz'"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_2023.iloc[0]['path']"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9e6c4b17-8263-41e6-b73d-962379f0decf",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.2"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
9 changes: 0 additions & 9 deletions oonidata/cli/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ def sync(
@test_name_option
@start_day_option
@end_day_option
@click.option("--csv-dir", type=Path)
@click.option("--clickhouse", type=str)
@click.option(
"--data-dir",
Expand Down Expand Up @@ -156,7 +155,6 @@ def mkobs(
test_name: List[str],
start_day: date,
end_day: date,
csv_dir: Optional[Path],
clickhouse: Optional[str],
data_dir: Path,
parallelism: int,
Expand All @@ -167,12 +165,6 @@ def mkobs(
"""
Make observations for OONI measurements and write them into clickhouse or a CSV file
"""
if csv_dir:
click.echo(
"When generating CSV outputs we currently only support parallelism of 1"
)
parallelism = 1

if create_tables:
if not clickhouse:
click.echo("--clickhouse needs to be specified when creating tables")
Expand All @@ -195,7 +187,6 @@ def mkobs(
test_name=test_name,
start_day=start_day,
end_day=end_day,
csv_dir=csv_dir,
clickhouse=clickhouse,
data_dir=data_dir,
parallelism=parallelism,
Expand Down
Loading

0 comments on commit 691db1c

Please sign in to comment.