diff --git a/dependencies-from-Dockerfile.log b/dependencies-from-Dockerfile.log index d6a5cbd00..c3a9da1da 100644 --- a/dependencies-from-Dockerfile.log +++ b/dependencies-from-Dockerfile.log @@ -91,7 +91,6 @@ Pygments==2.17.2 PyJWT==2.9.0 PyMySQL==1.0.2 pyparsing==2.4.7 -pypng==0.20220715.0 pyrsistent==0.18.1 pytest==6.2.5 pytest-arraydiff==0.5.0 @@ -108,7 +107,7 @@ pythran==0.10.0 pytz==2022.1 PyYAML==5.4.1 pyzmq==22.3.0 -qrcode==7.4.2 +qrcode==8.0 requests==2.25.1 requests-futures==1.0.1 scipy==1.8.0 @@ -277,9 +276,7 @@ skymap-scanner └── wipac-rest-tools [required: Any, installed: 1.7.9] ├── cachetools [required: Any, installed: 5.5.0] ├── PyJWT [required: !=2.6.0, installed: 2.9.0] - ├── qrcode [required: Any, installed: 7.4.2] - │ ├── pypng [required: Any, installed: 0.20220715.0] - │ └── typing_extensions [required: Any, installed: 4.12.2] + ├── qrcode [required: Any, installed: 8.0] ├── requests [required: Any, installed: 2.25.1] ├── requests-futures [required: Any, installed: 1.0.1] │ └── requests [required: >=1.2.0, installed: 2.25.1] diff --git a/dependencies-from-Dockerfile_no_cvmfs.log b/dependencies-from-Dockerfile_no_cvmfs.log index 69459fe8f..3603da5b6 100644 --- a/dependencies-from-Dockerfile_no_cvmfs.log +++ b/dependencies-from-Dockerfile_no_cvmfs.log @@ -91,7 +91,6 @@ Pygments==2.17.2 PyJWT==2.9.0 PyMySQL==1.0.2 pyparsing==2.4.7 -pypng==0.20220715.0 pyrsistent==0.18.1 pytest==6.2.5 pytest-arraydiff==0.5.0 @@ -108,7 +107,7 @@ pythran==0.10.0 pytz==2022.1 PyYAML==5.4.1 pyzmq==22.3.0 -qrcode==7.4.2 +qrcode==8.0 requests==2.25.1 requests-futures==1.0.1 scipy==1.8.0 @@ -277,9 +276,7 @@ skymap-scanner └── wipac-rest-tools [required: Any, installed: 1.7.9] ├── cachetools [required: Any, installed: 5.5.0] ├── PyJWT [required: !=2.6.0, installed: 2.9.0] - ├── qrcode [required: Any, installed: 7.4.2] - │ ├── pypng [required: Any, installed: 0.20220715.0] - │ └── typing_extensions [required: Any, installed: 4.12.2] + ├── qrcode [required: Any, installed: 8.0] ├── requests [required: Any, installed: 2.25.1] ├── requests-futures [required: Any, installed: 1.0.1] │ └── requests [required: >=1.2.0, installed: 2.25.1] diff --git a/skymap_scanner/config.py b/skymap_scanner/config.py index a1cd22c38..b5dd9486e 100644 --- a/skymap_scanner/config.py +++ b/skymap_scanner/config.py @@ -25,6 +25,9 @@ # HTTP source to download data from. REMOTE_DATA_SOURCE: Final[str] = "http://prod-exe.icecube.wisc.edu" +REMOTE_DATA_DOWNLOAD_RETRIES: Final[int] = 2 # note: attempts = retries + 1 +REMOTE_DATA_DOWNLOAD_TIMEOUT: Final[int] = 15 # sec + # Local ephemeral directory to stage files. LOCAL_DATA_CACHE: Final[Path] = Path("./data-staging-cache") diff --git a/skymap_scanner/utils/data_handling.py b/skymap_scanner/utils/data_handling.py index 1a63faa57..0dbc2b8a3 100644 --- a/skymap_scanner/utils/data_handling.py +++ b/skymap_scanner/utils/data_handling.py @@ -1,10 +1,13 @@ """data_handling.py.""" +import itertools import logging -import subprocess +import time from pathlib import Path from typing import List +import requests + from .. import config as cfg LOGGER = logging.getLogger(__name__) @@ -41,12 +44,12 @@ def stage_files(self, file_list: List[str]): LOGGER.debug("File is available on staging path.") else: LOGGER.debug("Staging from HTTP source.") - self.fetch_file(basename) + self.download_file(basename) else: LOGGER.debug(f"File {basename} is available at {filepath}.") - def fetch_file(self, basename: str): + def download_file(self, basename: str): """Retrieves a file from the HTTP source. Args: @@ -55,24 +58,48 @@ def fetch_file(self, basename: str): Raises: RuntimeError: if the file retrieval fails. """ - local_destination_path = self.staging_path / basename - http_source_path = f"{self.remote_path}/{basename}" - # not sure why we use the -O pattern here - cmd = [ - "wget", - "-nv", - "-t", - "5", - "-O", - str(local_destination_path), - http_source_path, - ] - - subprocess.run(cmd, check=True) - - if not local_destination_path.is_file(): + dest = self.staging_path / basename + url = f"{self.remote_path}/{basename}" + + def backoff_sleep(attempt: int): + """Sleep with exponential backoff.""" + sleep_duration = 2**attempt # Exponential backoff: 2, 4, 8 seconds... + LOGGER.info(f"Retrying file download in {sleep_duration} seconds...") + time.sleep(sleep_duration) + + # Step 1: Download the file + for attempt in itertools.count(1): + if attempt > 1: + backoff_sleep(attempt) + # get + try: + response = requests.get( + url, + stream=True, + timeout=cfg.REMOTE_DATA_DOWNLOAD_TIMEOUT, + ) + response.raise_for_status() # Check if the request was successful (2xx) + break + except requests.exceptions.RequestException as e: + if attempt > cfg.REMOTE_DATA_DOWNLOAD_RETRIES: # 'attempt' is 1-indexed + raise RuntimeError( + f"Download failed after {cfg.REMOTE_DATA_DOWNLOAD_RETRIES} retries: {e}" + ) from e + + # Step 2: Write the file + try: + with open(dest, "wb") as file: + for chunk in response.iter_content(chunk_size=8192): + file.write(chunk) + except IOError as e: + raise RuntimeError(f"File download failed during file write: {e}") from e + + # Step 3: Ensure the file was created successfully + if dest.is_file(): + LOGGER.debug(f"File successfully created at {dest}.") + else: raise RuntimeError( - f"Subprocess `wget` succeeded but the resulting file is invalid:\n-> {cmd}" + f"File download failed during file write (file is invalid):\n-> {dest}." ) def get_filepath(self, filename: str) -> str: diff --git a/tests/file_staging.py b/tests/file_staging.py index 072ebd09a..79903e961 100644 --- a/tests/file_staging.py +++ b/tests/file_staging.py @@ -1,60 +1,74 @@ +"""Tests for file-staging logic.""" + import logging -import subprocess from typing import Dict - -from skymap_scanner.utils.data_handling import DataStager from skymap_scanner import config as cfg +from skymap_scanner.utils.data_handling import DataStager logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) -# Build list of all local files. -local_file_list = [] -for path in cfg.LOCAL_DATA_SOURCES: - subpath = path / cfg.LOCAL_SPLINE_SUBDIR - directory_content = subpath.glob("*") - for path in directory_content: - if path.is_file(): # skip directories - local_file_list.append(path.name) # store filename without path +def test_file_staging() -> None: -# Declare at least one filename only expected to be available remotely. -remote_file_list = ["README"] + # Build list of all local files. + local_file_list = [] + for path in cfg.LOCAL_DATA_SOURCES: + subpath = path / cfg.LOCAL_SPLINE_SUBDIR + directory_content = subpath.glob("*") + for path in directory_content: + if path.is_file(): # skip directories + local_file_list.append(path.name) # store filename without path -# Declare at least one filename that does not exist. -invalid_file_list = ["NONEXISTENT_FILE"] + # Declare at least one filename only expected to be available remotely. + remote_file_list = ["README"] -datastager = DataStager( - local_paths=cfg.LOCAL_DATA_SOURCES, - local_subdir=cfg.LOCAL_SPLINE_SUBDIR, - remote_path=f"{cfg.REMOTE_DATA_SOURCE}/{cfg.REMOTE_SPLINE_SUBDIR}", -) + # Declare at least one filename that does not exist. + invalid_file_list = ["NONEXISTENT_FILE"] -for file_list in [local_file_list, remote_file_list, invalid_file_list]: - try: + datastager = DataStager( + local_paths=cfg.LOCAL_DATA_SOURCES, + local_subdir=cfg.LOCAL_SPLINE_SUBDIR, + remote_path=f"{cfg.REMOTE_DATA_SOURCE}/{cfg.REMOTE_SPLINE_SUBDIR}", + ) + + # test stage_files() + # -> OK + for file_list in [local_file_list, remote_file_list]: datastager.stage_files(file_list) - except subprocess.CalledProcessError: - logger.debug(f"Staging failed as expected for invalid file.") + # -> ERROR + try: + datastager.stage_files(invalid_file_list) + except Exception as e: + assert isinstance(e, RuntimeError) + assert str(e) == ( + f"Download failed after {cfg.REMOTE_DATA_DOWNLOAD_RETRIES} retries: " + f"404 Client Error: Not Found for url: {datastager.remote_path}/{invalid_file_list[0]}" + ) + # ensure that filepaths can be retrieved for all local files + local_filepaths: Dict[str, str] = dict() + for filename in local_file_list: + logger.debug(f"Testing local file: {filename}.") + local_filepaths[filename] = datastager.get_local_filepath(filename) + assert local_filepaths[filename] == datastager.get_filepath(filename) + logger.debug(f"File available at {local_filepaths[filename]}.") -# ensure that filepaths can be retrieved for all local files -local_filepaths: Dict[str, str] = dict() -for filename in local_file_list: - logger.debug(f"Testing local file: {filename}.") - local_filepaths[filename] = datastager.get_local_filepath(filename) - assert local_filepaths[filename] == datastager.get_filepath(filename) - logger.debug(f"File available at {local_filepaths[filename]}.") + for filename in remote_file_list: + logger.debug(f"Testing staging of remote file: {filename}") + filepath: str = datastager.get_filepath(filename) + logger.debug(f"File available at {filepath}.") -for filename in remote_file_list: - logger.debug(f"Testing staging of remote file: {filename}") - filepath: str = datastager.get_filepath(filename) - logger.debug(f"File available at {filepath}.") + for filename in invalid_file_list: + logger.debug(f"Testing staging of remote file: {filename}") + try: + filepath = datastager.get_filepath(filename) + except FileNotFoundError: + logger.debug(f"File not available as expected.") + else: + assert 0 # we shouldn't get here! -for filename in invalid_file_list: - logger.debug(f"Testing staging of remote file: {filename}") - try: - filepath = datastager.get_filepath(filename) - except FileNotFoundError: - logger.debug(f"File not available as expected.") +if __name__ == "__main__": + test_file_staging()