diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index bbc888ac0..ef4277174 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -67,9 +67,7 @@ jobs: - name: Run pytest run: | - python -m pytest -vv tests \ - --reruns 3 --reruns-delay 30 \ - --only-rerun requests.exceptions.HTTPError + python -m pytest -vv tests --reruns 3 --reruns-delay 30 --only-rerun "(?i)http|timeout" vanilla-build: strategy: @@ -93,4 +91,4 @@ jobs: - name: Run pytest run: | - python -m pytest -vv tests --reruns 3 --reruns-delay 30 --only-rerun requests.exceptions.HTTPError + python -m pytest -vv tests --reruns 3 --reruns-delay 30 --only-rerun "(?i)http|timeout" diff --git a/src/uproot/source/chunk.py b/src/uproot/source/chunk.py index 9a268f63a..4d22a94ae 100644 --- a/src/uproot/source/chunk.py +++ b/src/uproot/source/chunk.py @@ -30,6 +30,9 @@ class Resource: :doc:`uproot.source.futures.ResourceFuture`. """ + def __init__(self): + self._file_path = None + @property def file_path(self) -> str: """ @@ -49,6 +52,14 @@ class Source: the file. """ + def __init__(self): + self._num_requested_bytes = 0 + self._num_requests = 0 + self._num_requested_chunks = 0 + self._file_path = None + self._num_bytes = None + self._executor = None + def chunk(self, start: int, stop: int) -> Chunk: """ Args: @@ -139,6 +150,9 @@ def closed(self) -> bool: True if the associated file/connection/thread pool is closed; False otherwise. """ + if self._executor is None: + return True + return self._executor.closed diff --git a/src/uproot/source/fsspec.py b/src/uproot/source/fsspec.py index 2b9ba1d49..2cb874575 100644 --- a/src/uproot/source/fsspec.py +++ b/src/uproot/source/fsspec.py @@ -26,10 +26,12 @@ class FSSpecSource(uproot.source.chunk.Source): def __init__(self, file_path: str, **options): import fsspec.core - default_options = uproot.reading.open.defaults - - exclude_keys = set(default_options.keys()) - storage_options = {k: v for k, v in options.items() if k not in exclude_keys} + options = dict(uproot.reading.open.defaults, **options) + storage_options = { + k: v + for k, v in options.items() + if k not in uproot.reading.open.defaults.keys() + } self._executor = FSSpecLoopExecutor() diff --git a/src/uproot/source/futures.py b/src/uproot/source/futures.py index dfed35200..20673a293 100644 --- a/src/uproot/source/futures.py +++ b/src/uproot/source/futures.py @@ -453,9 +453,7 @@ def submit(self, future: ResourceFuture) -> ResourceFuture: """ assert isinstance(future, ResourceFuture) if self.closed: - raise OSError( - f"resource is closed for file {self._workers[0].resource.file_path}" - ) + raise OSError(f"resource is closed for file {self._resource.file_path}") future._run(self._resource) return future diff --git a/src/uproot/source/http.py b/src/uproot/source/http.py index 02e42ee95..b433f4efd 100644 --- a/src/uproot/source/http.py +++ b/src/uproot/source/http.py @@ -106,7 +106,7 @@ def get_num_bytes(file_path: str, parsed_url: urllib.parse.ParseResult, timeout) response = connection.getresponse() break else: - raise OSError( + raise http.client.HTTPException( """remote server responded with status {} (redirect) without a 'location' for URL {}""".format( response.status, file_path @@ -119,7 +119,7 @@ def get_num_bytes(file_path: str, parsed_url: urllib.parse.ParseResult, timeout) if response.status != 200: connection.close() - raise OSError( + raise http.client.HTTPException( """HTTP response was {}, rather than 200, in attempt to get file size in file {}""".format( response.status, file_path @@ -132,7 +132,7 @@ def get_num_bytes(file_path: str, parsed_url: urllib.parse.ParseResult, timeout) return int(x) else: connection.close() - raise OSError( + raise http.client.HTTPException( """response headers did not include content-length: {} in file {}""".format( dict(response.getheaders()), file_path @@ -216,7 +216,7 @@ def get(self, connection, start: int, stop: int) -> bytes: ) return self.get(redirect, start, stop) - raise OSError( + raise http.client.HTTPException( """remote server responded with status {} (redirect) without a 'location' for URL {}""".format( response.status, self._file_path @@ -225,7 +225,7 @@ def get(self, connection, start: int, stop: int) -> bytes: if response.status != 206: connection.close() - raise OSError( + raise http.client.HTTPException( """remote server responded with status {}, rather than 206 (range requests) for URL {}""".format( response.status, self._file_path @@ -322,7 +322,7 @@ def task(resource): task(resource) return - raise OSError( + raise http.client.HTTPException( """remote server responded with status {} (redirect) without a 'location' for URL {}""".format( response.status, source.file_path @@ -428,7 +428,7 @@ def handle_multipart( data = response_buffer.read(length) if len(data) != length: - raise OSError( + raise http.client.HTTPException( """wrong chunk length {} (expected {}) for byte range {} " "in HTTP multipart for URL {}""".format( @@ -454,7 +454,7 @@ def handle_multipart( else: range_string = range_string.decode("utf-8", "surrogateescape") expecting = ", ".join(f"{a}-{b - 1}" for a, b in futures) - raise OSError( + raise http.client.HTTPException( """unrecognized byte range in headers of HTTP multipart: {} expecting: {} diff --git a/src/uproot/source/xrootd.py b/src/uproot/source/xrootd.py index 0571b6713..987fada94 100644 --- a/src/uproot/source/xrootd.py +++ b/src/uproot/source/xrootd.py @@ -63,7 +63,10 @@ def get_server_config(file): XRootD_client.flags.QueryCode.CONFIG, "readv_iov_max readv_ior_max" ) if status.error: - raise OSError(status.message) + if status.code in (206,): + raise TimeoutError(status.message) + else: + raise OSError(status.message) # Result is something like b'178956968\n2097136\n' readv_iov_max, readv_ior_max = map(int, result.split(b"\n", 1)) @@ -121,12 +124,12 @@ def _xrd_error(self, status): # https://github.com/xrootd/xrootd/blob/250eced4d3787c2ac5be2c8c922134153bbf7f08/src/XrdCl/XrdClStatus.cc#L34-L74 if status.code in (101, 304, 400): raise uproot._util._file_not_found(self._file_path, status.message) - - else: - raise OSError( - f"""XRootD error: {status.message} -in file {self._file_path}""" + elif status.code in (206,): + raise TimeoutError( + f"XRootD error: {status.message} in file {self._file_path}" ) + else: + raise OSError(f"XRootD error: {status.message} in file {self._file_path}") @property def timeout(self) -> float | None: @@ -266,6 +269,8 @@ class XRootDSource(uproot.source.chunk.Source): ResourceClass = XRootDResource def __init__(self, file_path: str, **options): + options = dict(uproot.reading.open.defaults, **options) + self._timeout = options["timeout"] self._desired_max_num_elements = options["max_num_elements"] self._use_threads = options["use_threads"] @@ -285,7 +290,7 @@ def _open(self): # futures that wait for chunks that have been split to merge them. if self._use_threads: self._executor = uproot.source.futures.ResourceThreadPoolExecutor( - [trivial_resource() for x in range(self._num_workers)] + [trivial_resource() for _ in range(self._num_workers)] ) else: self._executor = uproot.source.futures.ResourceTrivialExecutor( @@ -458,6 +463,8 @@ class MultithreadedXRootDSource(uproot.source.chunk.MultithreadedSource): ResourceClass = XRootDResource def __init__(self, file_path: str, **options): + options = dict(uproot.reading.open.defaults, **options) + self._num_workers = options["num_workers"] self._timeout = options["timeout"] self._use_threads = options["use_threads"] @@ -501,5 +508,9 @@ def timeout(self) -> float | None: @property def num_bytes(self) -> int: if self._num_bytes is None: - self._num_bytes = self._executor.workers[0].resource.num_bytes + if hasattr(self._executor, "workers"): + self._num_bytes = self._executor.workers[0].resource.num_bytes + else: + self._num_bytes = self._executor._resource.num_bytes + return self._num_bytes diff --git a/tests/test_0001_source_class.py b/tests/test_0001_source_class.py index 4c97d95bd..2ca6d98a7 100644 --- a/tests/test_0001_source_class.py +++ b/tests/test_0001_source_class.py @@ -299,9 +299,6 @@ def test_fallback(server, use_threads, num_workers): assert one[:4] == b"root" -@pytest.mark.skip( - reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now" -) @pytest.mark.network @pytest.mark.xrootd @pytest.mark.parametrize("use_threads", [True, False], indirect=True) @@ -322,22 +319,6 @@ def test_xrootd(use_threads): assert one[:4] == b"root" -@pytest.mark.skip( - reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now" -) -@pytest.mark.network -@pytest.mark.xrootd -@pytest.mark.parametrize("use_threads", [True, False], indirect=True) -def test_xrootd_deadlock(use_threads): - pytest.importorskip("XRootD") - # Attach this file to the "test_xrootd_deadlock" function, so it leaks - pytest.uproot_test_xrootd_deadlock_f = uproot.source.xrootd.XRootDResource( - "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root", - timeout=20, - use_threads=use_threads, - ) - - @pytest.mark.network @pytest.mark.xrootd @pytest.mark.parametrize("use_threads", [True, False], indirect=True) @@ -352,9 +333,6 @@ def test_xrootd_fail(use_threads): ) -@pytest.mark.skip( - reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now" -) @pytest.mark.network @pytest.mark.xrootd @pytest.mark.parametrize("use_threads", [True, False], indirect=True) @@ -376,9 +354,6 @@ def test_xrootd_vectorread(use_threads): assert one[:4] == b"root" -@pytest.mark.skip( - reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now" -) @pytest.mark.network @pytest.mark.xrootd @pytest.mark.parametrize("use_threads", [True, False], indirect=True) @@ -398,9 +373,6 @@ def test_xrootd_vectorread_max_element_split(use_threads): assert len(one) == max_element_size + 1 -@pytest.mark.skip( - reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now" -) @pytest.mark.network @pytest.mark.xrootd @pytest.mark.parametrize("use_threads", [True, False], indirect=True) @@ -408,8 +380,8 @@ def test_xrootd_vectorread_max_element_split_consistency(use_threads): pytest.importorskip("XRootD") filename = "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root" - def get_chunk(Source, **kwargs): - with Source(filename, **kwargs) as source: + def get_chunk(source_cls, **kwargs): + with source_cls(filename, **kwargs) as source: notifications = queue.Queue() max_element_size = 2097136 chunks = source.chunks([(0, max_element_size + 1)], notifications) @@ -444,9 +416,6 @@ def test_xrootd_vectorread_fail(use_threads): ) -@pytest.mark.skip( - reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now" -) @pytest.mark.network @pytest.mark.xrootd @pytest.mark.parametrize("use_threads", [True, False], indirect=True) @@ -461,7 +430,6 @@ def test_xrootd_size(use_threads): ) as source: size1 = source.num_bytes - pytest.importorskip("XRootD") with uproot.source.xrootd.MultithreadedXRootDSource( "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root", timeout=10, @@ -474,9 +442,6 @@ def test_xrootd_size(use_threads): assert size1 == 3469136394 -@pytest.mark.skip( - reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now" -) @pytest.mark.network @pytest.mark.xrootd @pytest.mark.parametrize("use_threads", [True, False], indirect=True) diff --git a/tests/test_0006_notify_when_downloaded.py b/tests/test_0006_notify_when_downloaded.py index aea1a96da..b79b80e2f 100644 --- a/tests/test_0006_notify_when_downloaded.py +++ b/tests/test_0006_notify_when_downloaded.py @@ -145,9 +145,6 @@ def test_http_fallback_workers(server): expected.pop((chunk.start, chunk.stop)) -@pytest.mark.skip( - reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now" -) @pytest.mark.network @pytest.mark.xrootd def test_xrootd(): @@ -168,9 +165,6 @@ def test_xrootd(): expected.pop((chunk.start, chunk.stop)) -@pytest.mark.skip( - reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now" -) @pytest.mark.network @pytest.mark.xrootd def test_xrootd_workers(): @@ -191,9 +185,6 @@ def test_xrootd_workers(): expected.pop((chunk.start, chunk.stop)) -@pytest.mark.skip( - reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now" -) @pytest.mark.network @pytest.mark.xrootd def test_xrootd_vectorread(): diff --git a/tests/test_0007_single_chunk_interface.py b/tests/test_0007_single_chunk_interface.py index 5f3223a7a..dd20d134c 100644 --- a/tests/test_0007_single_chunk_interface.py +++ b/tests/test_0007_single_chunk_interface.py @@ -125,9 +125,6 @@ def test_http_multipart_fail(): tobytes(source.chunk(0, 100).raw_data) -@pytest.mark.skip( - reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now" -) @pytest.mark.network @pytest.mark.xrootd def test_xrootd(): @@ -147,9 +144,6 @@ def test_xrootd(): assert one[:4] == b"root" -@pytest.mark.skip( - reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now" -) @pytest.mark.network @pytest.mark.xrootd def test_xrootd_worker(): @@ -169,9 +163,6 @@ def test_xrootd_worker(): assert one[:4] == b"root" -@pytest.mark.skip( - reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now" -) @pytest.mark.network @pytest.mark.xrootd def test_xrootd_vectorread(): diff --git a/tests/test_0302-pickle.py b/tests/test_0302_pickle.py similarity index 94% rename from tests/test_0302-pickle.py rename to tests/test_0302_pickle.py index b16603538..1177c7088 100644 --- a/tests/test_0302-pickle.py +++ b/tests/test_0302_pickle.py @@ -53,9 +53,6 @@ def test_pickle_roundtrip_http(): ] -@pytest.mark.skip( - reason="RECHECK: Run2012B_DoubleMuParked.root is super-flaky right now" -) @pytest.mark.network @pytest.mark.xrootd def test_pickle_roundtrip_xrootd(): diff --git a/tests/test_0303-empty-jagged-array.py b/tests/test_0303_empty_jagged_array.py similarity index 100% rename from tests/test_0303-empty-jagged-array.py rename to tests/test_0303_empty_jagged_array.py