From 8f7e1e9bb45e020d6fc058a4084dbd312892c798 Mon Sep 17 00:00:00 2001 From: Josh Moore Date: Wed, 2 Oct 2024 11:49:11 +0200 Subject: [PATCH 1/6] First parse.py implementation --- stats/parse.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100755 stats/parse.py diff --git a/stats/parse.py b/stats/parse.py new file mode 100755 index 0000000..c69fea2 --- /dev/null +++ b/stats/parse.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python +from __future__ import annotations + +import shutil +import tempfile + +import fastparquet +import pandas as pd + +cols = {"source", "origin", "url", "written_human_readable", "written"} +valid = [] +with tempfile.TemporaryDirectory() as tmpdirname: + print("created temporary directory", tmpdirname) + + df = pd.read_csv("ngff_samples.csv") + urls = [] + for index, row in df.iterrows(): + src = row["source"] + url = row["url"] + print(url) + urls.append(url) + + df2 = pd.read_csv(f"{url}") + if not set(df2.columns).issubset(cols): + print(f"invalid csv: {url}") + else: + filename = f"{tmpdirname}/index.pq" + df2.to_parquet(filename) + valid.append(filename) + + fastparquet.writer.merge([filename for filename in valid]) + df_all = pd.read_parquet("all.pq") + print(df_all) + +shutil.rmtree(tmpdirname) From ea52d58eeef7720d0eb4deefc42083340687d1fe Mon Sep 17 00:00:00 2001 From: Josh Moore Date: Wed, 2 Oct 2024 11:49:25 +0200 Subject: [PATCH 2/6] Second parse.py implementation --- stats/parse.py | 62 ++++++++++++++++++++++++++------------------------ 1 file changed, 32 insertions(+), 30 deletions(-) mode change 100755 => 100644 stats/parse.py diff --git a/stats/parse.py b/stats/parse.py old mode 100755 new mode 100644 index c69fea2..6146f0b --- a/stats/parse.py +++ b/stats/parse.py @@ -1,35 +1,37 @@ #!/usr/bin/env python -from __future__ import annotations - -import shutil -import tempfile - import fastparquet import pandas as pd -cols = {"source", "origin", "url", "written_human_readable", "written"} +cols = [ 'source', 'origin', 'url', 'written_human_readable', 'written'] valid = [] -with tempfile.TemporaryDirectory() as tmpdirname: - print("created temporary directory", tmpdirname) - - df = pd.read_csv("ngff_samples.csv") - urls = [] - for index, row in df.iterrows(): - src = row["source"] - url = row["url"] - print(url) - urls.append(url) - - df2 = pd.read_csv(f"{url}") - if not set(df2.columns).issubset(cols): - print(f"invalid csv: {url}") - else: - filename = f"{tmpdirname}/index.pq" - df2.to_parquet(filename) - valid.append(filename) - - fastparquet.writer.merge([filename for filename in valid]) - df_all = pd.read_parquet("all.pq") - print(df_all) - -shutil.rmtree(tmpdirname) + +df = pd.read_csv('ngff_samples.csv') +urls = [] +for index, row in df.iterrows(): + + src = row['source'] + url = row['url'] + urls.append(url) + + df2 = pd.read_csv(f'{url}') + if not set(df2.columns).issubset(set(cols)): + print(f"invalid csv: {url}") + else: + + size = len(df2.index) + + df2["csv"] = [url] * size + df2["who"] = [src] * size + for colname in cols: + if colname not in df2.columns: + df2[colname] = [None] * size + + df2 = df2.reindex(columns=["who", "csv"] + cols) + + filename = f"all.pq/csv={index}" + df2.to_parquet(filename) + valid.append(filename) + +fastparquet.writer.merge([filename for filename in valid]) +df_all = pd.read_parquet('all.pq') +print(df_all) From 7727d3d004a41c01adb19bfbf94136f12f65a2a8 Mon Sep 17 00:00:00 2001 From: Josh Moore Date: Wed, 2 Oct 2024 11:49:36 +0200 Subject: [PATCH 3/6] Third parse.py implementation 
--- stats/parse.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/stats/parse.py b/stats/parse.py index 6146f0b..7fb6fa8 100644 --- a/stats/parse.py +++ b/stats/parse.py @@ -28,10 +28,6 @@ df2 = df2.reindex(columns=["who", "csv"] + cols) - filename = f"all.pq/csv={index}" + filename = f"all.pq/{index}.pq" df2.to_parquet(filename) valid.append(filename) - -fastparquet.writer.merge([filename for filename in valid]) -df_all = pd.read_parquet('all.pq') -print(df_all) From 6413eddbcea8edb5e32eef07caba452868bce3a5 Mon Sep 17 00:00:00 2001 From: Josh Moore Date: Wed, 2 Oct 2024 11:49:46 +0200 Subject: [PATCH 4/6] Fourth parse.py implementation (working aiohttp) --- stats/parse.py | 179 +++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 160 insertions(+), 19 deletions(-) mode change 100644 => 100755 stats/parse.py diff --git a/stats/parse.py b/stats/parse.py old mode 100644 new mode 100755 index 7fb6fa8..bdaa844 --- a/stats/parse.py +++ b/stats/parse.py @@ -1,33 +1,174 @@ #!/usr/bin/env python +import asyncio +import aiohttp import fastparquet +import math +import os import pandas as pd +import requests +from collections import defaultdict +from aiohttp_retry import RetryClient, ExponentialRetry -cols = [ 'source', 'origin', 'url', 'written_human_readable', 'written'] -valid = [] -df = pd.read_csv('ngff_samples.csv') -urls = [] -for index, row in df.iterrows(): +CONNECTIONS = int(os.environ.get("CONNECTIONS", 80)) +RETRIES = int(os.environ.get("RETRIES", 3)) +TIMEOUT = float(os.environ.get("TIMEOUT", 60.0)) - src = row['source'] - url = row['url'] - urls.append(url) +cols = ('source', 'origin', 'url', 'written_human_readable', 'written') + + + +class Event: + + def __init__(self, depth, url): + self.depth = depth + self.url = url + self.state = {} + + def __str__(self): + return f'{"\t"*self.depth}{self.url}' + + async def load(self, rsp): + raise NotImplementedError() + +class CSVEvent(Event): + + def __init__(self, *args, **kwargs): + Event.__init__(self, *args, **kwargs) + + async def load(self, rsp=None): + self.state = {"type": "csv"} + + +class ZarrEvent(Event): + + def __init__(self, *args, **kwargs): + Event.__init__(self, *args, **kwargs) + + async def load(self, rsp): + data = await rsp.json() + ome = data.get("attributes", {}).get("ome", {}) + # Calcluate + if "multiscales" in ome: + self.state["type"] = "multiscales" + inner = ome["multiscales"][0]["datasets"][0]["path"] + # FIXME + MultiscaleEvent(self.depth+1, f"{self.url}/{inner}") + + # Defer + for x in ("plate", "bioformats2raw.layout"): + if x in ome: + self.state["type"] = x + + if "type" not in self.state: + self.state["type"] = f"unknown: {ome.keys()}" + + +class MultiscaleEvent(Event): + + def __init__(self, *args, **kwargs): + Event.__init__(self, *args, **kwargs) + self.state = {} + + async def load(self, rsp): + data = await rsp.json() + self.state["type"] = "array" + self.state["array"] = data + + +def handle_csv(src, url, depth=1): df2 = pd.read_csv(f'{url}') + if not set(df2.columns).issubset(set(cols)): print(f"invalid csv: {url}") - else: + return + + yield CSVEvent(depth, url) + size = len(df2.index) + + df2["csv"] = [url] * size + df2["who"] = [src] * size + for colname in cols: + if colname not in df2.columns: + df2[colname] = [None] * size + + + for index2, row2 in df2.iterrows(): + url = row2["url"] + if not url or not isinstance(url, str): + # TODO: debug? 
+ continue + try: + if url.endswith(".csv"): + # TODO: check for a sub-source + yield from handle_csv(src, url, depth=depth+1) + else: + leaf = depth + 1 + zarr = f'{row2["url"]}/zarr.json' + yield ZarrEvent(depth+1, zarr) + except Exception as e: + print(f"error: {url} (type={type(url)})") + raise + +def get_events(): + valid = [] + + main_url = 'https://raw.githubusercontent.com/will-moore/ome2024-ngff-challenge/samples_viewer/samples/ngff_samples.csv' + print(main_url) + df = pd.read_csv(main_url) + urls = [] + + events = [] + for index, row in df.iterrows(): + + src = row['source'] + url = row['url'] + urls.append(url) + for event in handle_csv(src, url): + events.append(event) + + return events + + +async def get(event: Event, client: RetryClient): + timeout = aiohttp.ClientTimeout(total=None, sock_connect=TIMEOUT, sock_read=TIMEOUT) + async with client.get(event.url, timeout=timeout) as response: + print(event) + await event.load(response) + + + +async def main(): + + # Setup + connector = aiohttp.TCPConnector(limit=CONNECTIONS) + session = aiohttp.ClientSession(connector=connector) + options = ExponentialRetry(attempts=RETRIES) + client = RetryClient( + client_session=session, + raise_for_status=False, + retry_options=options, + ) + + # Loading + events = get_events() + ret = await asyncio.gather(*(get(event, client) for event in events)) + + # Parsing + try: + tallies = defaultdict(int) + for event in events: + _type = event.state.get("type") + tallies[_type] += 1 - size = len(df2.index) + for k, v in tallies.items(): + print(k, v) - df2["csv"] = [url] * size - df2["who"] = [src] * size - for colname in cols: - if colname not in df2.columns: - df2[colname] = [None] * size + # Cleaning + finally: + await client.close() - df2 = df2.reindex(columns=["who", "csv"] + cols) - filename = f"all.pq/{index}.pq" - df2.to_parquet(filename) - valid.append(filename) +if __name__ == "__main__": + asyncio.run(main()) From e258a31822d606c065689a955547f0d45f08651c Mon Sep 17 00:00:00 2001 From: Josh Moore Date: Wed, 2 Oct 2024 13:31:27 +0200 Subject: [PATCH 5/6] Move to queue implementation --- stats/parse.py | 217 ++++++++++++++++++++++++++++++------------------- 1 file changed, 132 insertions(+), 85 deletions(-) diff --git a/stats/parse.py b/stats/parse.py index bdaa844..fdb4d53 100755 --- a/stats/parse.py +++ b/stats/parse.py @@ -1,174 +1,221 @@ #!/usr/bin/env python +from __future__ import annotations + import asyncio -import aiohttp -import fastparquet -import math import os -import pandas as pd -import requests +import time from collections import defaultdict -from aiohttp_retry import RetryClient, ExponentialRetry +import aiohttp +import pandas as pd +from aiohttp_retry import ExponentialRetry, RetryClient +COLUMNS = ("source", "origin", "url", "written_human_readable", "written") CONNECTIONS = int(os.environ.get("CONNECTIONS", 80)) RETRIES = int(os.environ.get("RETRIES", 3)) TIMEOUT = float(os.environ.get("TIMEOUT", 60.0)) - -cols = ('source', 'origin', 'url', 'written_human_readable', 'written') - - +URL = os.environ.get( + "URL", + "https://raw.githubusercontent.com/will-moore/ome2024-ngff-challenge/samples_viewer/samples/ngff_samples.csv", +) +WORKERS = int(os.environ.get("WORKERS", 40)) class Event: - - def __init__(self, depth, url): + def __init__(self, queue, depth, url): + self.queue = queue self.depth = depth self.url = url self.state = {} + def root(self): + """drops the /zarr.json suffix from urls""" + return self.url[:-10] + def __str__(self): return 
f'{"\t"*self.depth}{self.url}' + def __repr__(self): + return f"{self.__class__.__name__}<{self.url}>" + async def load(self, rsp): raise NotImplementedError() -class CSVEvent(Event): +class CSVEvent(Event): def __init__(self, *args, **kwargs): Event.__init__(self, *args, **kwargs) async def load(self, rsp=None): self.state = {"type": "csv"} + df2 = pd.read_csv(f"{self.url}") + + if not set(df2.columns).issubset(set(COLUMNS)): + print(f"invalid csv: {self.url}") + return + + size = len(df2.index) + + df2["csv"] = [self.url] * size + for colname in COLUMNS: + if colname not in df2.columns: + df2[colname] = [None] * size + + for index2, row2 in df2.iterrows(): + url = row2["url"] + if not url or not isinstance(url, str): + # TODO: debug? + continue + try: + if url.endswith(".csv"): + # TODO: check for a sub-source + await self.queue.put(CSVEvent(self.queue, self.depth + 1, url)) + else: + leaf = self.depth + 1 + zarr = f'{row2["url"]}/zarr.json' + await self.queue.put(ZarrEvent(self.queue, self.depth + 1, zarr)) + except Exception: + print(f"error: {url} (type={type(url)})") + raise -class ZarrEvent(Event): +class ZarrEvent(Event): def __init__(self, *args, **kwargs): Event.__init__(self, *args, **kwargs) async def load(self, rsp): data = await rsp.json() ome = data.get("attributes", {}).get("ome", {}) - # Calcluate + + # Array if "multiscales" in ome: self.state["type"] = "multiscales" inner = ome["multiscales"][0]["datasets"][0]["path"] - # FIXME - MultiscaleEvent(self.depth+1, f"{self.url}/{inner}") - - # Defer - for x in ("plate", "bioformats2raw.layout"): - if x in ome: - self.state["type"] = x + await self.queue.put( + MultiscaleEvent( + self.queue, self.depth + 1, f"{self.root()}/{inner}/zarr.json" + ) + ) + + # Series + elif "plate" in ome: + self.state["type"] = "plate" + # TODO + + # Series + elif "bioformats2raw.layout" in ome: + self.state["type"] = "bioformats2raw.layout" + await self.queue.put( + OMEEvent(self.queue, self.depth + 1, f"{self.root()}/OME/zarr.json") + ) if "type" not in self.state: self.state["type"] = f"unknown: {ome.keys()}" -class MultiscaleEvent(Event): - +class OMEEvent(Event): def __init__(self, *args, **kwargs): Event.__init__(self, *args, **kwargs) - self.state = {} async def load(self, rsp): data = await rsp.json() - self.state["type"] = "array" - self.state["array"] = data - + self.state["type"] = "ome" + series = data["attributes"]["ome"]["series"] + for s in series: + await self.queue.put( + MultiscaleEvent( + self.queue, self.depth + 1, f"{self.root()}/{s}/zarr.json" + ) + ) -def handle_csv(src, url, depth=1): - df2 = pd.read_csv(f'{url}') - if not set(df2.columns).issubset(set(cols)): - print(f"invalid csv: {url}") - return - - yield CSVEvent(depth, url) - size = len(df2.index) - - df2["csv"] = [url] * size - df2["who"] = [src] * size - for colname in cols: - if colname not in df2.columns: - df2[colname] = [None] * size - - - for index2, row2 in df2.iterrows(): - url = row2["url"] - if not url or not isinstance(url, str): - # TODO: debug? 
- continue - try: - if url.endswith(".csv"): - # TODO: check for a sub-source - yield from handle_csv(src, url, depth=depth+1) - else: - leaf = depth + 1 - zarr = f'{row2["url"]}/zarr.json' - yield ZarrEvent(depth+1, zarr) - except Exception as e: - print(f"error: {url} (type={type(url)})") - raise - -def get_events(): - valid = [] +class MultiscaleEvent(Event): + def __init__(self, *args, **kwargs): + Event.__init__(self, *args, **kwargs) - main_url = 'https://raw.githubusercontent.com/will-moore/ome2024-ngff-challenge/samples_viewer/samples/ngff_samples.csv' - print(main_url) - df = pd.read_csv(main_url) - urls = [] + async def load(self, rsp): + data = await rsp.json() + self.state["type"] = "array" + self.state["array"] = data - events = [] - for index, row in df.iterrows(): - src = row['source'] - url = row['url'] - urls.append(url) - for event in handle_csv(src, url): - events.append(event) +class ErrorEvent(Event): + def __init__(self, *args, **kwargs): + Event.__init__(self, *args, **kwargs) - return events + async def load(self, rsp): + self.state["type"] = "error" -async def get(event: Event, client: RetryClient): +async def process(event: Event, client: RetryClient): timeout = aiohttp.ClientTimeout(total=None, sock_connect=TIMEOUT, sock_read=TIMEOUT) async with client.get(event.url, timeout=timeout) as response: print(event) await event.load(response) +async def worker(queue, client, state): + while not queue.empty(): + event = await queue.get() + try: + await process(event, client) + await state.put(event.state) + except Exception as e: + await state.put({"event": event, "error": e, "type": "error"}) + finally: + queue.task_done() + async def main(): + start = time.time() - # Setup + # HTTP Setup connector = aiohttp.TCPConnector(limit=CONNECTIONS) session = aiohttp.ClientSession(connector=connector) options = ExponentialRetry(attempts=RETRIES) client = RetryClient( - client_session=session, - raise_for_status=False, - retry_options=options, + client_session=session, + raise_for_status=False, + retry_options=options, ) + queue = asyncio.Queue() + state = asyncio.Queue() + csv = CSVEvent(queue, 0, URL) + await queue.put(csv) + await process(csv, client) + # Loading - events = get_events() - ret = await asyncio.gather(*(get(event, client) for event in events)) + consumers = [ + asyncio.create_task(worker(queue, client, state)) for _ in range(WORKERS) + ] + await queue.join() + for c in consumers: + c.cancel() # Parsing try: tallies = defaultdict(int) - for event in events: - _type = event.state.get("type") + errors = [] + while not state.empty(): + v = await state.get() + _type = v.get("type") tallies[_type] += 1 - + if _type == "error": + errors.append(v) for k, v in tallies.items(): print(k, v) + for err in errors: + print(err) # Cleaning finally: await client.close() + stop = time.time() + print(f"in {stop-start:0.2f} seconds") + if __name__ == "__main__": asyncio.run(main()) From f003985a81bfcdec333d9003c1970f59d5b43a7b Mon Sep 17 00:00:00 2001 From: Josh Moore Date: Fri, 11 Oct 2024 13:43:17 +0200 Subject: [PATCH 6/6] Async logging + RO-Crates (needs linting) --- stats/parse.py | 146 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 122 insertions(+), 24 deletions(-) diff --git a/stats/parse.py b/stats/parse.py index fdb4d53..0cb9276 100755 --- a/stats/parse.py +++ b/stats/parse.py @@ -2,24 +2,56 @@ from __future__ import annotations import asyncio +import logging import os import time from collections import defaultdict +from logging import StreamHandler 
+from logging.handlers import QueueHandler, QueueListener import aiohttp import pandas as pd from aiohttp_retry import ExponentialRetry, RetryClient +try: + import Queue as queue +except ImportError: + import queue + + COLUMNS = ("source", "origin", "url", "written_human_readable", "written") CONNECTIONS = int(os.environ.get("CONNECTIONS", 80)) RETRIES = int(os.environ.get("RETRIES", 3)) TIMEOUT = float(os.environ.get("TIMEOUT", 60.0)) URL = os.environ.get( "URL", - "https://raw.githubusercontent.com/will-moore/ome2024-ngff-challenge/samples_viewer/samples/ngff_samples.csv", + "https://raw.githubusercontent.com/ome/ome2024-ngff-challenge/refs/heads/main/samples/ngff_samples.csv", ) WORKERS = int(os.environ.get("WORKERS", 40)) +LOGGER_TASK = None + + +async def safely_start_logger(): + LOGGER_TASK = asyncio.create_task(init_logger()) + await asyncio.sleep(0) + + +async def init_logger(): + # helper coroutine to setup and manage the logger + # https://superfastpython.com/asyncio-log-blocking/#How_to_Log_From_Asyncio_Without_Blocking + log = logging.getLogger() + que = queue.Queue() + log.addHandler(QueueHandler(que)) + log.setLevel(logging.INFO) + listener = QueueListener(que, StreamHandler()) + try: + listener.start() + while True: + await asyncio.sleep(60) + finally: + listener.stop() + class Event: def __init__(self, queue, depth, url): @@ -46,38 +78,34 @@ class CSVEvent(Event): def __init__(self, *args, **kwargs): Event.__init__(self, *args, **kwargs) - async def load(self, rsp=None): + async def load(self, *args): self.state = {"type": "csv"} - df2 = pd.read_csv(f"{self.url}") + df2 = pd.read_csv(f"{self}") if not set(df2.columns).issubset(set(COLUMNS)): - print(f"invalid csv: {self.url}") + logging.warning(f"invalid csv: {self.url}") return size = len(df2.index) + logging.info(f"{self}") df2["csv"] = [self.url] * size for colname in COLUMNS: if colname not in df2.columns: df2[colname] = [None] * size - for index2, row2 in df2.iterrows(): + for _, row2 in df2.iterrows(): url = row2["url"] if not url or not isinstance(url, str): # TODO: debug? 
continue - try: - if url.endswith(".csv"): - # TODO: check for a sub-source - await self.queue.put(CSVEvent(self.queue, self.depth + 1, url)) - else: - leaf = self.depth + 1 - zarr = f'{row2["url"]}/zarr.json' - await self.queue.put(ZarrEvent(self.queue, self.depth + 1, zarr)) - except Exception: - print(f"error: {url} (type={type(url)})") - raise + if url.endswith(".csv"): + # TODO: check for a sub-source + await self.queue.put(CSVEvent(self.queue, self.depth + 1, url)) + else: + zarr = f'{row2["url"]}/zarr.json' + await self.queue.put(ZarrEvent(self.queue, self.depth + 1, zarr)) class ZarrEvent(Event): @@ -87,6 +115,23 @@ def __init__(self, *args, **kwargs): async def load(self, rsp): data = await rsp.json() ome = data.get("attributes", {}).get("ome", {}) + logging.info(f"{self}") + + # TODO: could check for RO-Crates in subdirectories + await self.queue.put( + ROCrateEvent( + self.queue, self.depth + 1, f"{self.root()}/ro-crate-metadata.json" + ) + ) + + if "multiscales" in ome: + self.state["type"] = "multiscales" + inner = ome["multiscales"][0]["datasets"][0]["path"] + await self.queue.put( + MultiscaleEvent( + self.queue, self.depth + 1, f"{self.root()}/{inner}/zarr.json" + ) + ) # Array if "multiscales" in ome: @@ -107,25 +152,68 @@ async def load(self, rsp): elif "bioformats2raw.layout" in ome: self.state["type"] = "bioformats2raw.layout" await self.queue.put( - OMEEvent(self.queue, self.depth + 1, f"{self.root()}/OME/zarr.json") + OMESeriesEvent( + self.queue, self.depth + 1, f"{self.root()}/OME/zarr.json" + ) ) if "type" not in self.state: self.state["type"] = f"unknown: {ome.keys()}" -class OMEEvent(Event): +class ROCrateEvent(Event): def __init__(self, *args, **kwargs): Event.__init__(self, *args, **kwargs) async def load(self, rsp): + if rsp.status >= 400: + raise MissingException(f"status:{rsp.status}") + else: + data = await rsp.json() + logging.info(f"{self}") + + +class OMESeriesEvent(Event): + def __init__(self, *args, **kwargs): + Event.__init__(self, *args, **kwargs) + + async def load(self, rsp): + if rsp.status >= 400: + await self.queue.put( + OMEMetadataEvent( + self.queue, self.depth, f"{self.root()}/METADATA.ome.xml" + ) + ) + else: + data = await rsp.json() + logging.info(f"{self}") + self.state["type"] = "ome" + series = data["attributes"]["ome"]["series"] + for s in series: + await self.queue.put( + MultiscaleEvent( + self.queue, self.depth + 1, f"{self.root()[:-4]}/{s}/zarr.json" + ) + ) + + +class OMEMetadataEvent(Event): + def __init__(self, *args, **kwargs): + Event.__init__(self, *args, **kwargs) + + async def load(self, rsp): + if rsp.status >= 400: + raise MissingException(f"status:{rsp.status}") + # TODO: just check for the individual series + data = await rsp.json() + logging.info(f"{self}") self.state["type"] = "ome" series = data["attributes"]["ome"]["series"] for s in series: await self.queue.put( MultiscaleEvent( - self.queue, self.depth + 1, f"{self.root()}/{s}/zarr.json" + self.queue, self.depth + 1, f"{self.root()[:-4]}/{s}/zarr.json" ) ) @@ -136,6 +224,7 @@ def __init__(self, *args, **kwargs): async def load(self, rsp): data = await rsp.json() + logging.info(f"{self}") self.state["type"] = "array" self.state["array"] = data @@ -144,14 +233,19 @@ class ErrorEvent(Event): def __init__(self, *args, **kwargs): Event.__init__(self, *args, **kwargs) - async def load(self, rsp): + async def load(self, *ignore): self.state["type"] = "error" +class MissingException(Exception): + def __init__(self, code): + Exception.__init__(self) + self.code = code + + 
async def process(event: Event, client: RetryClient): timeout = aiohttp.ClientTimeout(total=None, sock_connect=TIMEOUT, sock_read=TIMEOUT) async with client.get(event.url, timeout=timeout) as response: - print(event) await event.load(response) @@ -161,6 +255,8 @@ async def worker(queue, client, state): try: await process(event, client) await state.put(event.state) + except MissingException as me: + await state.put({"event": event, "code": me.code}) except Exception as e: await state.put({"event": event, "error": e, "type": "error"}) finally: @@ -180,6 +276,8 @@ async def main(): retry_options=options, ) + await safely_start_logger() + queue = asyncio.Queue() state = asyncio.Queue() csv = CSVEvent(queue, 0, URL) @@ -205,16 +303,16 @@ async def main(): if _type == "error": errors.append(v) for k, v in tallies.items(): - print(k, v) + logging.info(f"{k}={v}") for err in errors: - print(err) + logging.error(f"{err}") # Cleaning finally: await client.close() stop = time.time() - print(f"in {stop-start:0.2f} seconds") + logging.info(f"in {stop-start:0.2f} seconds") if __name__ == "__main__":
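
The parquet handling in the first three patches settles on a simple pattern: each source CSV becomes its own parquet file under an all.pq/ directory, fastparquet.writer.merge() builds the shared _metadata, and pandas then reads the whole directory back as a single frame. A minimal sketch of that pattern, using two made-up frames in place of the downloaded CSVs (the paths and example URLs are illustrative only):

    import os

    import fastparquet
    import pandas as pd

    os.makedirs("all.pq", exist_ok=True)

    # Stand-ins for the per-source CSVs that parse.py downloads.
    frames = [
        pd.DataFrame({"source": ["a"], "url": ["https://example.org/a.zarr"]}),
        pd.DataFrame({"source": ["b"], "url": ["https://example.org/b.zarr"]}),
    ]

    parts = []
    for index, df in enumerate(frames):
        filename = f"all.pq/{index}.pq"  # one file per source, as in patch 3
        df.to_parquet(filename, engine="fastparquet")
        parts.append(filename)

    # Write the combined _metadata so the directory reads back as one dataset.
    fastparquet.writer.merge(parts)
    print(pd.read_parquet("all.pq", engine="fastparquet"))

merge() expects the parts to share a schema, which is why patch 2 reindexes every frame to the same ["who", "csv"] + cols column order before writing it out.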
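
Patches 4 and 5 push all HTTP through a single aiohttp session wrapped in aiohttp_retry's RetryClient, with the connection limit, retry attempts and socket timeouts taken from the CONNECTIONS, RETRIES and TIMEOUT environment variables. A minimal sketch of that client setup, with the limits hard-coded and a placeholder URL (fetch_json() is not a function from parse.py, only an illustration):

    import asyncio

    import aiohttp
    from aiohttp_retry import ExponentialRetry, RetryClient

    async def fetch_json(url: str) -> dict:
        connector = aiohttp.TCPConnector(limit=80)        # CONNECTIONS
        session = aiohttp.ClientSession(connector=connector)
        client = RetryClient(
            client_session=session,
            raise_for_status=False,
            retry_options=ExponentialRetry(attempts=3),   # RETRIES
        )
        # Socket-level timeouts per request rather than one total deadline.
        timeout = aiohttp.ClientTimeout(total=None, sock_connect=60.0, sock_read=60.0)
        try:
            async with client.get(url, timeout=timeout) as rsp:
                return await rsp.json()
        finally:
            await client.close()                          # also closes the session

    if __name__ == "__main__":
        print(asyncio.run(fetch_json("https://example.org/data.zarr/zarr.json")))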
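
Patch 5 swaps the one-shot asyncio.gather() for a self-feeding asyncio.Queue: loading one event may enqueue follow-up events, WORKERS consumer tasks drain the queue, and queue.join() returns once every item has been marked task_done(). The essential shape of that pattern is sketched below; process() is a trivial stand-in for the HTTP handling, and the while True / cancel() form follows the asyncio docs, whereas parse.py's workers loop while the queue is non-empty:

    import asyncio

    async def process(item: int, queue: asyncio.Queue) -> None:
        # Stand-in for Event.load(): handling one item may enqueue follow-ups.
        if item < 3:
            await queue.put(item + 1)

    async def worker(queue: asyncio.Queue) -> None:
        while True:
            item = await queue.get()
            try:
                await process(item, queue)
            finally:
                queue.task_done()

    async def main() -> None:
        queue: asyncio.Queue = asyncio.Queue()
        await queue.put(0)  # seed the queue, like the root CSVEvent
        workers = [asyncio.create_task(worker(queue)) for _ in range(4)]
        await queue.join()  # all items, including follow-ups, processed
        for w in workers:
            w.cancel()

    asyncio.run(main())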
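
Patch 6 replaces print() with logging routed through a QueueHandler/QueueListener pair so that emitting a record never blocks the event loop: the handler only puts records on a thread-safe queue.Queue, and a listener thread does the actual writing (the approach from the superfastpython article linked in init_logger). The try/except around "import Queue as queue" is a Python 2 fallback that the f-strings elsewhere already rule out, so a plain import is enough. A compact version of the same setup:

    import asyncio
    import logging
    import queue
    from logging import StreamHandler
    from logging.handlers import QueueHandler, QueueListener

    def setup_nonblocking_logging() -> QueueListener:
        que: queue.Queue = queue.Queue()
        root = logging.getLogger()
        root.addHandler(QueueHandler(que))  # coroutines only enqueue records
        root.setLevel(logging.INFO)
        listener = QueueListener(que, StreamHandler())
        listener.start()                    # a background thread writes to stderr
        return listener

    async def main() -> None:
        listener = setup_nonblocking_logging()
        try:
            logging.info("hello from the event loop")
            await asyncio.sleep(0)
        finally:
            listener.stop()

    asyncio.run(main())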
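
As of the final patch the script is configured entirely through environment variables read at import time: URL (the root ngff_samples.csv, defaulting to the copy in the ome/ome2024-ngff-challenge repository), WORKERS (40), CONNECTIONS (80), RETRIES (3) and TIMEOUT (60.0 seconds). The file keeps its python shebang and executable mode, so a crawl is a plain ./stats/parse.py run with any of those variables overridden in the environment; it needs pandas, aiohttp and aiohttp_retry installed, and it finishes by logging a tally per event type, any collected errors, and the elapsed time.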