Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: review skipped tests (networking timeouts) #1027

Merged
merged 15 commits into from
Nov 16, 2023
6 changes: 2 additions & 4 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ jobs:

- name: Run pytest
run: |
python -m pytest -vv tests \
--reruns 3 --reruns-delay 30 \
--only-rerun requests.exceptions.HTTPError
python -m pytest -vv tests --reruns 3 --reruns-delay 30 --only-rerun "http.client.HTTPException|TimeoutError"

vanilla-build:
strategy:
Expand All @@ -93,4 +91,4 @@ jobs:

- name: Run pytest
run: |
python -m pytest -vv tests --reruns 3 --reruns-delay 30 --only-rerun requests.exceptions.HTTPError
python -m pytest -vv tests --reruns 3 --reruns-delay 30 --only-rerun "http.client.HTTPException|TimeoutError"
14 changes: 14 additions & 0 deletions src/uproot/source/chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class Resource:
:doc:`uproot.source.futures.ResourceFuture`.
"""

def __init__(self):
self._file_path = None

@property
def file_path(self) -> str:
"""
Expand All @@ -49,6 +52,14 @@ class Source:
the file.
"""

def __init__(self):
self._num_requested_bytes = 0
self._num_requests = 0
self._num_requested_chunks = 0
self._file_path = None
self._num_bytes = None
self._executor = None

def chunk(self, start: int, stop: int) -> Chunk:
"""
Args:
Expand Down Expand Up @@ -139,6 +150,9 @@ def closed(self) -> bool:
True if the associated file/connection/thread pool is closed; False
otherwise.
"""
if self._executor is None:
return True

return self._executor.closed


Expand Down
10 changes: 6 additions & 4 deletions src/uproot/source/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ class FSSpecSource(uproot.source.chunk.Source):
def __init__(self, file_path: str, **options):
import fsspec.core

default_options = uproot.reading.open.defaults

exclude_keys = set(default_options.keys())
storage_options = {k: v for k, v in options.items() if k not in exclude_keys}
options = dict(uproot.reading.open.defaults, **options)
storage_options = {
k: v
for k, v in options.items()
if k not in uproot.reading.open.defaults.keys()
}

self._executor = FSSpecLoopExecutor()

Expand Down
4 changes: 1 addition & 3 deletions src/uproot/source/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,9 +453,7 @@ def submit(self, future: ResourceFuture) -> ResourceFuture:
"""
assert isinstance(future, ResourceFuture)
if self.closed:
raise OSError(
f"resource is closed for file {self._workers[0].resource.file_path}"
)
raise OSError(f"resource is closed for file {self._resource.file_path}")
future._run(self._resource)
return future

Expand Down
16 changes: 8 additions & 8 deletions src/uproot/source/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def get_num_bytes(file_path: str, parsed_url: urllib.parse.ParseResult, timeout)
response = connection.getresponse()
break
else:
raise OSError(
raise http.client.HTTPException(
"""remote server responded with status {} (redirect) without a 'location'
for URL {}""".format(
response.status, file_path
Expand All @@ -119,7 +119,7 @@ def get_num_bytes(file_path: str, parsed_url: urllib.parse.ParseResult, timeout)

if response.status != 200:
connection.close()
raise OSError(
raise http.client.HTTPException(
"""HTTP response was {}, rather than 200, in attempt to get file size
in file {}""".format(
response.status, file_path
Expand All @@ -132,7 +132,7 @@ def get_num_bytes(file_path: str, parsed_url: urllib.parse.ParseResult, timeout)
return int(x)
else:
connection.close()
raise OSError(
raise http.client.HTTPException(
"""response headers did not include content-length: {}
in file {}""".format(
dict(response.getheaders()), file_path
Expand Down Expand Up @@ -216,7 +216,7 @@ def get(self, connection, start: int, stop: int) -> bytes:
)
return self.get(redirect, start, stop)

raise OSError(
raise http.client.HTTPException(
"""remote server responded with status {} (redirect) without a 'location'
for URL {}""".format(
response.status, self._file_path
Expand All @@ -225,7 +225,7 @@ def get(self, connection, start: int, stop: int) -> bytes:

if response.status != 206:
connection.close()
raise OSError(
raise http.client.HTTPException(
"""remote server responded with status {}, rather than 206 (range requests)
for URL {}""".format(
response.status, self._file_path
Expand Down Expand Up @@ -322,7 +322,7 @@ def task(resource):
task(resource)
return

raise OSError(
raise http.client.HTTPException(
"""remote server responded with status {} (redirect) without a 'location'
for URL {}""".format(
response.status, source.file_path
Expand Down Expand Up @@ -428,7 +428,7 @@ def handle_multipart(
data = response_buffer.read(length)

if len(data) != length:
raise OSError(
raise http.client.HTTPException(
"""wrong chunk length {} (expected {}) for byte range {} "
"in HTTP multipart
for URL {}""".format(
Expand All @@ -454,7 +454,7 @@ def handle_multipart(
else:
range_string = range_string.decode("utf-8", "surrogateescape")
expecting = ", ".join(f"{a}-{b - 1}" for a, b in futures)
raise OSError(
raise http.client.HTTPException(
"""unrecognized byte range in headers of HTTP multipart: {}

expecting: {}
Expand Down
27 changes: 19 additions & 8 deletions src/uproot/source/xrootd.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ def get_server_config(file):
XRootD_client.flags.QueryCode.CONFIG, "readv_iov_max readv_ior_max"
)
if status.error:
raise OSError(status.message)
if status.code in (206,):
raise TimeoutError(status.message)
else:
raise OSError(status.message)

# Result is something like b'178956968\n2097136\n'
readv_iov_max, readv_ior_max = map(int, result.split(b"\n", 1))
Expand Down Expand Up @@ -121,12 +124,12 @@ def _xrd_error(self, status):
# https://github.com/xrootd/xrootd/blob/250eced4d3787c2ac5be2c8c922134153bbf7f08/src/XrdCl/XrdClStatus.cc#L34-L74
if status.code in (101, 304, 400):
raise uproot._util._file_not_found(self._file_path, status.message)

else:
raise OSError(
f"""XRootD error: {status.message}
in file {self._file_path}"""
elif status.code in (206,):
raise TimeoutError(
f"XRootD error: {status.message} in file {self._file_path}"
)
else:
raise OSError(f"XRootD error: {status.message} in file {self._file_path}")

@property
def timeout(self) -> float | None:
Expand Down Expand Up @@ -266,6 +269,8 @@ class XRootDSource(uproot.source.chunk.Source):
ResourceClass = XRootDResource

def __init__(self, file_path: str, **options):
options = dict(uproot.reading.open.defaults, **options)

self._timeout = options["timeout"]
self._desired_max_num_elements = options["max_num_elements"]
self._use_threads = options["use_threads"]
Expand All @@ -285,7 +290,7 @@ def _open(self):
# futures that wait for chunks that have been split to merge them.
if self._use_threads:
self._executor = uproot.source.futures.ResourceThreadPoolExecutor(
[trivial_resource() for x in range(self._num_workers)]
[trivial_resource() for _ in range(self._num_workers)]
)
else:
self._executor = uproot.source.futures.ResourceTrivialExecutor(
Expand Down Expand Up @@ -458,6 +463,8 @@ class MultithreadedXRootDSource(uproot.source.chunk.MultithreadedSource):
ResourceClass = XRootDResource

def __init__(self, file_path: str, **options):
options = dict(uproot.reading.open.defaults, **options)

self._num_workers = options["num_workers"]
self._timeout = options["timeout"]
self._use_threads = options["use_threads"]
Expand Down Expand Up @@ -501,5 +508,9 @@ def timeout(self) -> float | None:
@property
def num_bytes(self) -> int:
if self._num_bytes is None:
self._num_bytes = self._executor.workers[0].resource.num_bytes
if hasattr(self._executor, "workers"):
self._num_bytes = self._executor.workers[0].resource.num_bytes
else:
self._num_bytes = self._executor._resource.num_bytes

return self._num_bytes
30 changes: 3 additions & 27 deletions tests/test_0001_source_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,6 @@ def test_fallback(server, use_threads, num_workers):
assert one[:4] == b"root"


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
Expand All @@ -322,19 +319,14 @@ def test_xrootd(use_threads):
assert one[:4] == b"root"


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
def test_xrootd_deadlock(use_threads):
def test_xrootd_deadlock():
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really sure of what I did here, don't really understand this test. Please review @nsmith- in case I broke something important.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this test is obsolete now that XRootD upstream has truly fixed the issues discussed in #59 and our CI is not testing older xrootd that needs the patch of

if older_xrootd("5.1.0"):
# This is registered after calling "import XRootD.client" so it is ran
# before XRootD.client.finalize.finalize()
@atexit.register
def cleanup_open_files():

In principle we could in https://github.com/scikit-hep/uproot5/blob/6b6fa9458b4ae46894e053dee25c56f678567f9d/.github/workflows/build-test.yml#L62C11-L62C11 specify older xrootd to test but it seems a bit moot.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps @chrisburr has input

pytest.importorskip("XRootD")
# Attach this file to the "test_xrootd_deadlock" function, so it leaks
pytest.uproot_test_xrootd_deadlock_f = uproot.source.xrootd.XRootDResource(
"root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root",
timeout=20,
use_threads=use_threads,
)


Expand All @@ -352,9 +344,6 @@ def test_xrootd_fail(use_threads):
)


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
Expand All @@ -376,9 +365,6 @@ def test_xrootd_vectorread(use_threads):
assert one[:4] == b"root"


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
Expand All @@ -398,18 +384,15 @@ def test_xrootd_vectorread_max_element_split(use_threads):
assert len(one) == max_element_size + 1


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
def test_xrootd_vectorread_max_element_split_consistency(use_threads):
pytest.importorskip("XRootD")
filename = "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root"

def get_chunk(Source, **kwargs):
with Source(filename, **kwargs) as source:
def get_chunk(source_cls, **kwargs):
with source_cls(filename, **kwargs) as source:
notifications = queue.Queue()
max_element_size = 2097136
chunks = source.chunks([(0, max_element_size + 1)], notifications)
Expand Down Expand Up @@ -444,9 +427,6 @@ def test_xrootd_vectorread_fail(use_threads):
)


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
Expand All @@ -461,7 +441,6 @@ def test_xrootd_size(use_threads):
) as source:
size1 = source.num_bytes

pytest.importorskip("XRootD")
with uproot.source.xrootd.MultithreadedXRootDSource(
"root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root",
timeout=10,
Expand All @@ -474,9 +453,6 @@ def test_xrootd_size(use_threads):
assert size1 == 3469136394


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
Expand Down
9 changes: 0 additions & 9 deletions tests/test_0006_notify_when_downloaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,6 @@ def test_http_fallback_workers(server):
expected.pop((chunk.start, chunk.stop))


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
def test_xrootd():
Expand All @@ -168,9 +165,6 @@ def test_xrootd():
expected.pop((chunk.start, chunk.stop))


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
def test_xrootd_workers():
Expand All @@ -191,9 +185,6 @@ def test_xrootd_workers():
expected.pop((chunk.start, chunk.stop))


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
def test_xrootd_vectorread():
Expand Down
9 changes: 0 additions & 9 deletions tests/test_0007_single_chunk_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ def test_http_multipart_fail():
tobytes(source.chunk(0, 100).raw_data)


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
def test_xrootd():
Expand All @@ -147,9 +144,6 @@ def test_xrootd():
assert one[:4] == b"root"


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
def test_xrootd_worker():
Expand All @@ -169,9 +163,6 @@ def test_xrootd_worker():
assert one[:4] == b"root"


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
def test_xrootd_vectorread():
Expand Down
3 changes: 0 additions & 3 deletions tests/test_0302-pickle.py → tests/test_0302_pickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ def test_pickle_roundtrip_http():
]


@pytest.mark.skip(
reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now"
)
@pytest.mark.network
@pytest.mark.xrootd
def test_pickle_roundtrip_xrootd():
Expand Down
Loading