Skip to content

Commit

Permalink
test: review skipped tests (networking timeouts) (#1027)
Browse files Browse the repository at this point in the history
* raise an http.client.HTTPException instead of OSError

* rerun tests on http.client.HTTPException

* remove test skip for xrootd

* rename test

* rename test

* do not capitalize variables

* correctly load default options

* timeout error

* update test (TODO: review)

* add TimeoutError to retry exceptions

* correctly initialize num_bytes

* correctly access resource

* rerun on timeout

* update retry regex

* remove outdated test
  • Loading branch information
lobis authored Nov 16, 2023
1 parent 6b6fa94 commit 96fc80d
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 85 deletions.
6 changes: 2 additions & 4 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ jobs:

- name: Run pytest
run: |
python -m pytest -vv tests \
--reruns 3 --reruns-delay 30 \
--only-rerun requests.exceptions.HTTPError
python -m pytest -vv tests --reruns 3 --reruns-delay 30 --only-rerun "(?i)http|timeout"
vanilla-build:
strategy:
Expand All @@ -93,4 +91,4 @@ jobs:

- name: Run pytest
run: |
python -m pytest -vv tests --reruns 3 --reruns-delay 30 --only-rerun requests.exceptions.HTTPError
python -m pytest -vv tests --reruns 3 --reruns-delay 30 --only-rerun "(?i)http|timeout"
14 changes: 14 additions & 0 deletions src/uproot/source/chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class Resource:
:doc:`uproot.source.futures.ResourceFuture`.
"""

def __init__(self):
self._file_path = None

@property
def file_path(self) -> str:
"""
Expand All @@ -49,6 +52,14 @@ class Source:
the file.
"""

def __init__(self):
self._num_requested_bytes = 0
self._num_requests = 0
self._num_requested_chunks = 0
self._file_path = None
self._num_bytes = None
self._executor = None

def chunk(self, start: int, stop: int) -> Chunk:
"""
Args:
Expand Down Expand Up @@ -139,6 +150,9 @@ def closed(self) -> bool:
True if the associated file/connection/thread pool is closed; False
otherwise.
"""
if self._executor is None:
return True

return self._executor.closed


Expand Down
10 changes: 6 additions & 4 deletions src/uproot/source/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ class FSSpecSource(uproot.source.chunk.Source):
def __init__(self, file_path: str, **options):
import fsspec.core

default_options = uproot.reading.open.defaults

exclude_keys = set(default_options.keys())
storage_options = {k: v for k, v in options.items() if k not in exclude_keys}
options = dict(uproot.reading.open.defaults, **options)
storage_options = {
k: v
for k, v in options.items()
if k not in uproot.reading.open.defaults.keys()
}

self._executor = FSSpecLoopExecutor()

Expand Down
4 changes: 1 addition & 3 deletions src/uproot/source/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,9 +453,7 @@ def submit(self, future: ResourceFuture) -> ResourceFuture:
"""
assert isinstance(future, ResourceFuture)
if self.closed:
raise OSError(
f"resource is closed for file {self._workers[0].resource.file_path}"
)
raise OSError(f"resource is closed for file {self._resource.file_path}")
future._run(self._resource)
return future

Expand Down
16 changes: 8 additions & 8 deletions src/uproot/source/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def get_num_bytes(file_path: str, parsed_url: urllib.parse.ParseResult, timeout)
response = connection.getresponse()
break
else:
raise OSError(
raise http.client.HTTPException(
"""remote server responded with status {} (redirect) without a 'location'
for URL {}""".format(
response.status, file_path
Expand All @@ -119,7 +119,7 @@ def get_num_bytes(file_path: str, parsed_url: urllib.parse.ParseResult, timeout)

if response.status != 200:
connection.close()
raise OSError(
raise http.client.HTTPException(
"""HTTP response was {}, rather than 200, in attempt to get file size
in file {}""".format(
response.status, file_path
Expand All @@ -132,7 +132,7 @@ def get_num_bytes(file_path: str, parsed_url: urllib.parse.ParseResult, timeout)
return int(x)
else:
connection.close()
raise OSError(
raise http.client.HTTPException(
"""response headers did not include content-length: {}
in file {}""".format(
dict(response.getheaders()), file_path
Expand Down Expand Up @@ -216,7 +216,7 @@ def get(self, connection, start: int, stop: int) -> bytes:
)
return self.get(redirect, start, stop)

raise OSError(
raise http.client.HTTPException(
"""remote server responded with status {} (redirect) without a 'location'
for URL {}""".format(
response.status, self._file_path
Expand All @@ -225,7 +225,7 @@ def get(self, connection, start: int, stop: int) -> bytes:

if response.status != 206:
connection.close()
raise OSError(
raise http.client.HTTPException(
"""remote server responded with status {}, rather than 206 (range requests)
for URL {}""".format(
response.status, self._file_path
Expand Down Expand Up @@ -322,7 +322,7 @@ def task(resource):
task(resource)
return

raise OSError(
raise http.client.HTTPException(
"""remote server responded with status {} (redirect) without a 'location'
for URL {}""".format(
response.status, source.file_path
Expand Down Expand Up @@ -428,7 +428,7 @@ def handle_multipart(
data = response_buffer.read(length)

if len(data) != length:
raise OSError(
raise http.client.HTTPException(
"""wrong chunk length {} (expected {}) for byte range {} "
"in HTTP multipart
for URL {}""".format(
Expand All @@ -454,7 +454,7 @@ def handle_multipart(
else:
range_string = range_string.decode("utf-8", "surrogateescape")
expecting = ", ".join(f"{a}-{b - 1}" for a, b in futures)
raise OSError(
raise http.client.HTTPException(
"""unrecognized byte range in headers of HTTP multipart: {}
expecting: {}
Expand Down
27 changes: 19 additions & 8 deletions src/uproot/source/xrootd.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ def get_server_config(file):
XRootD_client.flags.QueryCode.CONFIG, "readv_iov_max readv_ior_max"
)
if status.error:
raise OSError(status.message)
if status.code in (206,):
raise TimeoutError(status.message)
else:
raise OSError(status.message)

# Result is something like b'178956968\n2097136\n'
readv_iov_max, readv_ior_max = map(int, result.split(b"\n", 1))
Expand Down Expand Up @@ -121,12 +124,12 @@ def _xrd_error(self, status):
# https://github.com/xrootd/xrootd/blob/250eced4d3787c2ac5be2c8c922134153bbf7f08/src/XrdCl/XrdClStatus.cc#L34-L74
if status.code in (101, 304, 400):
raise uproot._util._file_not_found(self._file_path, status.message)

else:
raise OSError(
f"""XRootD error: {status.message}
in file {self._file_path}"""
elif status.code in (206,):
raise TimeoutError(
f"XRootD error: {status.message} in file {self._file_path}"
)
else:
raise OSError(f"XRootD error: {status.message} in file {self._file_path}")

@property
def timeout(self) -> float | None:
Expand Down Expand Up @@ -266,6 +269,8 @@ class XRootDSource(uproot.source.chunk.Source):
ResourceClass = XRootDResource

def __init__(self, file_path: str, **options):
options = dict(uproot.reading.open.defaults, **options)

self._timeout = options["timeout"]
self._desired_max_num_elements = options["max_num_elements"]
self._use_threads = options["use_threads"]
Expand All @@ -285,7 +290,7 @@ def _open(self):
# futures that wait for chunks that have been split to merge them.
if self._use_threads:
self._executor = uproot.source.futures.ResourceThreadPoolExecutor(
[trivial_resource() for x in range(self._num_workers)]
[trivial_resource() for _ in range(self._num_workers)]
)
else:
self._executor = uproot.source.futures.ResourceTrivialExecutor(
Expand Down Expand Up @@ -458,6 +463,8 @@ class MultithreadedXRootDSource(uproot.source.chunk.MultithreadedSource):
ResourceClass = XRootDResource

def __init__(self, file_path: str, **options):
options = dict(uproot.reading.open.defaults, **options)

self._num_workers = options["num_workers"]
self._timeout = options["timeout"]
self._use_threads = options["use_threads"]
Expand Down Expand Up @@ -501,5 +508,9 @@ def timeout(self) -> float | None:
@property
def num_bytes(self) -> int:
if self._num_bytes is None:
self._num_bytes = self._executor.workers[0].resource.num_bytes
if hasattr(self._executor, "workers"):
self._num_bytes = self._executor.workers[0].resource.num_bytes
else:
self._num_bytes = self._executor._resource.num_bytes

return self._num_bytes
39 changes: 2 additions & 37 deletions tests/test_0001_source_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,6 @@ def test_fallback(server, use_threads, num_workers):
assert one[:4] == b"root"


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
Expand All @@ -322,22 +319,6 @@ def test_xrootd(use_threads):
assert one[:4] == b"root"


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
def test_xrootd_deadlock(use_threads):
pytest.importorskip("XRootD")
# Attach this file to the "test_xrootd_deadlock" function, so it leaks
pytest.uproot_test_xrootd_deadlock_f = uproot.source.xrootd.XRootDResource(
"root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root",
timeout=20,
use_threads=use_threads,
)


@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
Expand All @@ -352,9 +333,6 @@ def test_xrootd_fail(use_threads):
)


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
Expand All @@ -376,9 +354,6 @@ def test_xrootd_vectorread(use_threads):
assert one[:4] == b"root"


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
Expand All @@ -398,18 +373,15 @@ def test_xrootd_vectorread_max_element_split(use_threads):
assert len(one) == max_element_size + 1


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
def test_xrootd_vectorread_max_element_split_consistency(use_threads):
pytest.importorskip("XRootD")
filename = "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root"

def get_chunk(Source, **kwargs):
with Source(filename, **kwargs) as source:
def get_chunk(source_cls, **kwargs):
with source_cls(filename, **kwargs) as source:
notifications = queue.Queue()
max_element_size = 2097136
chunks = source.chunks([(0, max_element_size + 1)], notifications)
Expand Down Expand Up @@ -444,9 +416,6 @@ def test_xrootd_vectorread_fail(use_threads):
)


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
Expand All @@ -461,7 +430,6 @@ def test_xrootd_size(use_threads):
) as source:
size1 = source.num_bytes

pytest.importorskip("XRootD")
with uproot.source.xrootd.MultithreadedXRootDSource(
"root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root",
timeout=10,
Expand All @@ -474,9 +442,6 @@ def test_xrootd_size(use_threads):
assert size1 == 3469136394


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
Expand Down
9 changes: 0 additions & 9 deletions tests/test_0006_notify_when_downloaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,6 @@ def test_http_fallback_workers(server):
expected.pop((chunk.start, chunk.stop))


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
def test_xrootd():
Expand All @@ -168,9 +165,6 @@ def test_xrootd():
expected.pop((chunk.start, chunk.stop))


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
def test_xrootd_workers():
Expand All @@ -191,9 +185,6 @@ def test_xrootd_workers():
expected.pop((chunk.start, chunk.stop))


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
def test_xrootd_vectorread():
Expand Down
9 changes: 0 additions & 9 deletions tests/test_0007_single_chunk_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ def test_http_multipart_fail():
tobytes(source.chunk(0, 100).raw_data)


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
def test_xrootd():
Expand All @@ -147,9 +144,6 @@ def test_xrootd():
assert one[:4] == b"root"


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
def test_xrootd_worker():
Expand All @@ -169,9 +163,6 @@ def test_xrootd_worker():
assert one[:4] == b"root"


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
def test_xrootd_vectorread():
Expand Down
3 changes: 0 additions & 3 deletions tests/test_0302-pickle.py → tests/test_0302_pickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ def test_pickle_roundtrip_http():
]


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
def test_pickle_roundtrip_xrootd():
Expand Down
File renamed without changes.

0 comments on commit 96fc80d

Please sign in to comment.