Skip to content

Commit

Permalink
Implement Retries and Timeout for File Download (#275)
Browse files Browse the repository at this point in the history
Previously, files were retrieved with `subprocess` + `wget`. Now, file
downloads use `requests` with a fixed timeout and a fixed number of
retries, with exponential backoff between attempts.

---------

Co-authored-by: wipacdevbot <[email protected]>
  • Loading branch information
ric-evans and wipacdevbot authored Oct 2, 2024
1 parent 0199888 commit c3b0755
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 71 deletions.
7 changes: 2 additions & 5 deletions dependencies-from-Dockerfile.log
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ Pygments==2.17.2
PyJWT==2.9.0
PyMySQL==1.0.2
pyparsing==2.4.7
pypng==0.20220715.0
pyrsistent==0.18.1
pytest==6.2.5
pytest-arraydiff==0.5.0
Expand All @@ -108,7 +107,7 @@ pythran==0.10.0
pytz==2022.1
PyYAML==5.4.1
pyzmq==22.3.0
qrcode==7.4.2
qrcode==8.0
requests==2.25.1
requests-futures==1.0.1
scipy==1.8.0
Expand Down Expand Up @@ -277,9 +276,7 @@ skymap-scanner
└── wipac-rest-tools [required: Any, installed: 1.7.9]
├── cachetools [required: Any, installed: 5.5.0]
├── PyJWT [required: !=2.6.0, installed: 2.9.0]
├── qrcode [required: Any, installed: 7.4.2]
│ ├── pypng [required: Any, installed: 0.20220715.0]
│ └── typing_extensions [required: Any, installed: 4.12.2]
├── qrcode [required: Any, installed: 8.0]
├── requests [required: Any, installed: 2.25.1]
├── requests-futures [required: Any, installed: 1.0.1]
│ └── requests [required: >=1.2.0, installed: 2.25.1]
Expand Down
7 changes: 2 additions & 5 deletions dependencies-from-Dockerfile_no_cvmfs.log
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ Pygments==2.17.2
PyJWT==2.9.0
PyMySQL==1.0.2
pyparsing==2.4.7
pypng==0.20220715.0
pyrsistent==0.18.1
pytest==6.2.5
pytest-arraydiff==0.5.0
Expand All @@ -108,7 +107,7 @@ pythran==0.10.0
pytz==2022.1
PyYAML==5.4.1
pyzmq==22.3.0
qrcode==7.4.2
qrcode==8.0
requests==2.25.1
requests-futures==1.0.1
scipy==1.8.0
Expand Down Expand Up @@ -277,9 +276,7 @@ skymap-scanner
└── wipac-rest-tools [required: Any, installed: 1.7.9]
├── cachetools [required: Any, installed: 5.5.0]
├── PyJWT [required: !=2.6.0, installed: 2.9.0]
├── qrcode [required: Any, installed: 7.4.2]
│ ├── pypng [required: Any, installed: 0.20220715.0]
│ └── typing_extensions [required: Any, installed: 4.12.2]
├── qrcode [required: Any, installed: 8.0]
├── requests [required: Any, installed: 2.25.1]
├── requests-futures [required: Any, installed: 1.0.1]
│ └── requests [required: >=1.2.0, installed: 2.25.1]
Expand Down
3 changes: 3 additions & 0 deletions skymap_scanner/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
# HTTP source to download data from.
REMOTE_DATA_SOURCE: Final[str] = "http://prod-exe.icecube.wisc.edu"

# Number of times a failed remote-file download is retried before giving up.
REMOTE_DATA_DOWNLOAD_RETRIES: Final[int] = 2 # note: attempts = retries + 1
# Value passed as `timeout` to each `requests.get()` call made for a download.
REMOTE_DATA_DOWNLOAD_TIMEOUT: Final[int] = 15 # sec

# Local ephemeral directory to stage files.
LOCAL_DATA_CACHE: Final[Path] = Path("./data-staging-cache")

Expand Down
67 changes: 47 additions & 20 deletions skymap_scanner/utils/data_handling.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
"""data_handling.py."""

import itertools
import logging
import subprocess
import time
from pathlib import Path
from typing import List

import requests

from .. import config as cfg

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -41,12 +44,12 @@ def stage_files(self, file_list: List[str]):
LOGGER.debug("File is available on staging path.")
else:
LOGGER.debug("Staging from HTTP source.")
self.fetch_file(basename)
self.download_file(basename)

else:
LOGGER.debug(f"File {basename} is available at {filepath}.")

def fetch_file(self, basename: str):
def download_file(self, basename: str):
    """Download a file from the HTTP source into the staging directory.

    The request is attempted up to ``cfg.REMOTE_DATA_DOWNLOAD_RETRIES + 1``
    times, sleeping with exponential backoff (2, 4, 8, ... seconds) between
    attempts. The payload is streamed into a temporary ``<basename>.part``
    file that is renamed to its final name only once the whole body has been
    written, so an interrupted transfer never leaves a truncated file at the
    destination path.

    Args:
        basename: name of the file; it is appended to ``self.remote_path``
            to build the URL and to ``self.staging_path`` to build the
            local destination.

    Raises:
        RuntimeError: if the file retrieval fails.
    """
    dest = self.staging_path / basename
    partial = dest.with_name(dest.name + ".part")
    url = f"{self.remote_path}/{basename}"

    def backoff_sleep(retry: int):
        """Sleep with exponential backoff before the given (1-indexed) retry."""
        sleep_duration = 2**retry  # Exponential backoff: 2, 4, 8 seconds...
        LOGGER.info(f"Retrying file download in {sleep_duration} seconds...")
        time.sleep(sleep_duration)

    # Step 1: Download the file
    for attempt in itertools.count(1):
        if attempt > 1:
            # 'attempt' is 1-indexed, so the first retry is attempt 2
            backoff_sleep(attempt - 1)
        response = None
        try:
            response = requests.get(
                url,
                stream=True,
                timeout=cfg.REMOTE_DATA_DOWNLOAD_TIMEOUT,
            )
            response.raise_for_status()  # Check if the request was successful (2xx)
            break
        except requests.exceptions.RequestException as e:
            # With stream=True the connection stays checked out until the
            # response is consumed or closed; release it before retrying.
            if response is not None:
                response.close()
            if attempt > cfg.REMOTE_DATA_DOWNLOAD_RETRIES:  # 'attempt' is 1-indexed
                raise RuntimeError(
                    f"Download failed after {cfg.REMOTE_DATA_DOWNLOAD_RETRIES} retries: {e}"
                ) from e
            LOGGER.warning(f"Download attempt {attempt} failed: {e}")

    # Step 2: Write the file
    # (requests' exceptions derive from IOError, so a connection dropped
    # mid-stream is reported through the same handler as a disk error.)
    try:
        with response, open(partial, "wb") as file:
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
        # Publish the file only after it has been written completely.
        partial.replace(dest)
    except IOError as e:
        partial.unlink(missing_ok=True)  # do not leave a truncated file behind
        raise RuntimeError(f"File download failed during file write: {e}") from e

    # Step 3: Ensure the file was created successfully
    if dest.is_file():
        LOGGER.debug(f"File successfully created at {dest}.")
    else:
        raise RuntimeError(
            f"File download failed during file write (file is invalid):\n-> {dest}."
        )

def get_filepath(self, filename: str) -> str:
Expand Down
96 changes: 55 additions & 41 deletions tests/file_staging.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,74 @@
"""Tests for file-staging logic."""

import logging
import subprocess
from typing import Dict


from skymap_scanner.utils.data_handling import DataStager
from skymap_scanner import config as cfg
from skymap_scanner.utils.data_handling import DataStager

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


# Build list of all local files.
local_file_list = []
for path in cfg.LOCAL_DATA_SOURCES:
subpath = path / cfg.LOCAL_SPLINE_SUBDIR
directory_content = subpath.glob("*")
for path in directory_content:
if path.is_file(): # skip directories
local_file_list.append(path.name) # store filename without path
def test_file_staging() -> None:
    """Exercise `DataStager` staging and path lookup for three kinds of files.

    Covers files found under the configured local data sources, a file that
    is only expected to exist on the remote HTTP source, and a file that
    exists nowhere. Staging and lookup must succeed for the first two and
    fail for the third.

    NOTE(review): the remote cases appear to need network access to
    `cfg.REMOTE_DATA_SOURCE` -- confirm before running in a sandboxed CI.
    """
    # Build list of all local files.
    local_file_list = []
    for source_dir in cfg.LOCAL_DATA_SOURCES:
        subpath = source_dir / cfg.LOCAL_SPLINE_SUBDIR
        for entry in subpath.glob("*"):
            if entry.is_file():  # skip directories
                local_file_list.append(entry.name)  # store filename without path

    # Declare at least one filename only expected to be available remotely.
    remote_file_list = ["README"]

    # Declare at least one filename that does not exist.
    invalid_file_list = ["NONEXISTENT_FILE"]

    datastager = DataStager(
        local_paths=cfg.LOCAL_DATA_SOURCES,
        local_subdir=cfg.LOCAL_SPLINE_SUBDIR,
        remote_path=f"{cfg.REMOTE_DATA_SOURCE}/{cfg.REMOTE_SPLINE_SUBDIR}",
    )

    # test stage_files()
    # -> OK
    for file_list in [local_file_list, remote_file_list]:
        datastager.stage_files(file_list)
    # -> ERROR
    try:
        datastager.stage_files(invalid_file_list)
    except RuntimeError as e:
        assert str(e) == (
            f"Download failed after {cfg.REMOTE_DATA_DOWNLOAD_RETRIES} retries: "
            f"404 Client Error: Not Found for url: {datastager.remote_path}/{invalid_file_list[0]}"
        )
    else:
        # Without this branch the test would pass when nothing is raised.
        raise AssertionError(
            "Staging a nonexistent file did not raise RuntimeError."
        )

    # ensure that filepaths can be retrieved for all local files
    local_filepaths: Dict[str, str] = dict()
    for filename in local_file_list:
        logger.debug(f"Testing local file: {filename}.")
        local_filepaths[filename] = datastager.get_local_filepath(filename)
        assert local_filepaths[filename] == datastager.get_filepath(filename)
        logger.debug(f"File available at {local_filepaths[filename]}.")

    for filename in remote_file_list:
        logger.debug(f"Testing staging of remote file: {filename}")
        filepath: str = datastager.get_filepath(filename)
        logger.debug(f"File available at {filepath}.")

    for filename in invalid_file_list:
        logger.debug(f"Testing staging of remote file: {filename}")
        try:
            filepath = datastager.get_filepath(filename)
        except FileNotFoundError:
            logger.debug("File not available as expected.")
        else:
            # we shouldn't get here! (explicit raise survives `python -O`)
            raise AssertionError(
                f"get_filepath() unexpectedly returned {filepath} for {filename}."
            )
if __name__ == "__main__":
test_file_staging()

0 comments on commit c3b0755

Please sign in to comment.