From 1991e7be40adbb8b70c7adb78d6a88939c1f7284 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Buffat?= Date: Thu, 7 May 2020 22:00:21 +0200 Subject: [PATCH 1/4] alternative iterator interrupt implementation --- fiona/collection.py | 45 +++++++++++----------- fiona/ogrext.pyx | 70 +++++++++++++++++++++++------------ tests/test_collection.py | 80 ++++++++++++++++++++++++++++++++++++++-- 3 files changed, 148 insertions(+), 47 deletions(-) diff --git a/fiona/collection.py b/fiona/collection.py index ddbbcd01f..9a24d0860 100644 --- a/fiona/collection.py +++ b/fiona/collection.py @@ -89,7 +89,6 @@ def __init__(self, path, mode='r', driver=None, schema=None, crs=None, gdal=get_gdal_release_name())) self.session = None - self.iterator = None self._len = 0 self._bounds = None self._driver = None @@ -100,6 +99,7 @@ def __init__(self, path, mode='r', driver=None, schema=None, crs=None, self.enabled_drivers = enabled_drivers self.ignore_fields = ignore_fields self.ignore_geometry = bool(ignore_geometry) + self.iterators = [] if vsi: self.path = vfs.vsi_path(path, vsi, archive) @@ -254,9 +254,8 @@ def filter(self, *args, **kwds): mask = kwds.get('mask') if bbox and mask: raise ValueError("mask and bbox can not be set together") - self.iterator = Iterator( - self, start, stop, step, bbox, mask) - return self.iterator + iterator = Iterator(self, start, stop, step, bbox, mask) + return iterator def items(self, *args, **kwds): """Returns an iterator over FID, record pairs, optionally @@ -282,9 +281,8 @@ def items(self, *args, **kwds): mask = kwds.get('mask') if bbox and mask: raise ValueError("mask and bbox can not be set together") - self.iterator = ItemsIterator( - self, start, stop, step, bbox, mask) - return self.iterator + iterator = ItemsIterator(self, start, stop, step, bbox, mask) + return iterator def keys(self, *args, **kwds): """Returns an iterator over FIDs, optionally @@ -310,9 +308,8 @@ def keys(self, *args, **kwds): mask = kwds.get('mask') if bbox and mask: raise ValueError("mask and bbox can not be set together") - self.iterator = KeysIterator( - self, start, stop, step, bbox, mask) - return self.iterator + iterator = KeysIterator(self, start, stop, step, bbox, mask) + return iterator def __contains__(self, fid): return self.session.has_feature(fid) @@ -323,16 +320,16 @@ def __iter__(self): """Returns an iterator over records.""" return self.filter() - def __next__(self): - """Returns next record from iterator.""" - warnings.warn("Collection.__next__() is buggy and will be removed in " - "Fiona 2.0. Switch to `next(iter(collection))`.", - FionaDeprecationWarning, stacklevel=2) - if not self.iterator: - iter(self) - return next(self.iterator) - - next = __next__ + # def __next__(self): + # """Returns next record from iterator.""" + # warnings.warn("Collection.__next__() is buggy and will be removed in " + # "Fiona 2.0. Switch to `next(iter(collection))`.", + # FionaDeprecationWarning, stacklevel=2) + # if not self.iterator: + # iter(self) + # return next(self.iterator) + # + # next = __next__ def __getitem__(self, item): return self.session.__getitem__(item) @@ -448,7 +445,7 @@ def close(self): self.session.stop() log.debug("Stopped session") self.session = None - self.iterator = None + # todo del self.iterators if self.env: self.env.__exit__() @@ -473,6 +470,12 @@ def __del__(self): # or use the context manager protocol ("with"). self.close() + def _interrupt_sequential_read(self): + for ref in self.iterators: + iterator = ref() + if iterator is not None: + iterator.interrupt_sequential_read() + ALL_GEOMETRY_TYPES = set([ geom_type for geom_type in GEOMETRY_TYPES.values() diff --git a/fiona/ogrext.pyx b/fiona/ogrext.pyx index 974b3a0a1..860e559e8 100644 --- a/fiona/ogrext.pyx +++ b/fiona/ogrext.pyx @@ -11,6 +11,7 @@ import warnings import math import uuid from collections import namedtuple +import weakref from six import integer_types, string_types, text_type @@ -439,6 +440,17 @@ def featureRT(feature, collection): # Collection-related extension classes and functions +cdef inline void * wrap_get_feature(object collection, void *cogr_layer, int index) except *: + """Check if it is safe to execute OGR_L_GetFeature""" + collection._interrupt_sequential_read() + return OGR_L_GetFeature(cogr_layer, index) + + +cdef inline int wrap_get_feature_count(object collection, void *cogr_layer, int force) except *: + """Check if it is safe to execute OGR_L_GetFeatureCount""" + collection._interrupt_sequential_read() + return OGR_L_GetFeatureCount(cogr_layer, force) + cdef class Session: cdef void *cogr_ds @@ -571,7 +583,7 @@ cdef class Session: def get_length(self): if self.cogr_layer == NULL: raise ValueError("Null layer") - return OGR_L_GetFeatureCount(self.cogr_layer, 0) + return wrap_get_feature_count(self.collection, self.cogr_layer, 0) def get_driver(self): cdef void *cogr_driver = GDALGetDatasetDriver(self.cogr_ds) @@ -803,7 +815,7 @@ cdef class Session: """ cdef void * cogr_feature fid = int(fid) - cogr_feature = OGR_L_GetFeature(self.cogr_layer, fid) + cogr_feature = wrap_get_feature(self.collection, self.cogr_layer, fid) if cogr_feature != NULL: _deleteOgrFeature(cogr_feature) return True @@ -817,7 +829,7 @@ cdef class Session: """ cdef void * cogr_feature fid = int(fid) - cogr_feature = OGR_L_GetFeature(self.cogr_layer, fid) + cogr_feature = wrap_get_feature(self.collection, self.cogr_layer, fid) if cogr_feature != NULL: feature = FeatureBuilder().build( cogr_feature, @@ -846,12 +858,12 @@ cdef class Session: index = item # from the back if index < 0: - ftcount = OGR_L_GetFeatureCount(self.cogr_layer, 0) + ftcount = wrap_get_feature_count(self.collection, self.cogr_layer, 0) if ftcount == -1: raise IndexError( "collection's dataset does not support negative indexes") index += ftcount - cogr_feature = OGR_L_GetFeature(self.cogr_layer, index) + cogr_feature = wrap_get_feature(self.collection, self.cogr_layer, index) if cogr_feature == NULL: return None feature = FeatureBuilder().build( @@ -1247,12 +1259,15 @@ cdef class Iterator: cdef step cdef fastindex cdef stepsign + cdef is_interrupted + cdef object __weakref__ def __cinit__(self, collection, start=None, stop=None, step=None, bbox=None, mask=None): if collection.session is None: raise ValueError("I/O operation on closed collection") self.collection = collection + self.collection.iterators.append(weakref.ref(self)) cdef Session session cdef void *cogr_geometry session = self.collection.session @@ -1280,7 +1295,7 @@ cdef class Iterator: self.fastindex = OGR_L_TestCapability( session.cogr_layer, OLC_FASTSETNEXTBYINDEX) - ftcount = OGR_L_GetFeatureCount(session.cogr_layer, 0) + ftcount = wrap_get_feature_count(self.collection, session.cogr_layer, 0) if ftcount == -1 and ((start is not None and start < 0) or (stop is not None and stop < 0)): raise IndexError( @@ -1311,6 +1326,10 @@ cdef class Iterator: self.next_index = start log.debug("Index: %d", self.next_index) OGR_L_SetNextByIndex(session.cogr_layer, self.next_index) + self.is_interrupted = False + + def interrupt_sequential_read(self): + self.is_interrupted = True def __iter__(self): return self @@ -1332,24 +1351,29 @@ cdef class Iterator: if self.next_index > self.start or (self.stop is not None and self.next_index <= self.stop): raise StopIteration - # Set read cursor to next_item position - if self.step > 1 and self.fastindex: - OGR_L_SetNextByIndex(session.cogr_layer, self.next_index) - - elif self.step > 1 and not self.fastindex and not self.next_index == self.start: - for _ in range(self.step - 1): - # TODO rbuffat add test -> OGR_L_GetNextFeature increments cursor by 1, therefore self.step - 1 as one increment was performed when feature is read - cogr_feature = OGR_L_GetNextFeature(session.cogr_layer) - if cogr_feature == NULL: - raise StopIteration - elif self.step > 1 and not self.fastindex and self.next_index == self.start: - OGR_L_SetNextByIndex(session.cogr_layer, self.next_index) - - elif self.step == 0: - # OGR_L_GetNextFeature increments read cursor by one - pass - elif self.step < 0: + if self.is_interrupted: + # TODO WARNING potential slow operation OGR_L_SetNextByIndex(session.cogr_layer, self.next_index) + self.is_interrupted = False + else: + # Set read cursor to next_item position + if self.step > 1 and self.fastindex: + OGR_L_SetNextByIndex(session.cogr_layer, self.next_index) + + elif self.step > 1 and not self.fastindex and not self.next_index == self.start: + for _ in range(self.step - 1): + # TODO rbuffat add test -> OGR_L_GetNextFeature increments cursor by 1, therefore self.step - 1 as one increment was performed when feature is read + cogr_feature = OGR_L_GetNextFeature(session.cogr_layer) + if cogr_feature == NULL: + raise StopIteration + elif self.step > 1 and not self.fastindex and self.next_index == self.start: + OGR_L_SetNextByIndex(session.cogr_layer, self.next_index) + + elif self.step == 0: + # OGR_L_GetNextFeature increments read cursor by one + pass + elif self.step < 0: + OGR_L_SetNextByIndex(session.cogr_layer, self.next_index) # set the next index self.next_index += self.step diff --git a/tests/test_collection.py b/tests/test_collection.py index 02f12abe6..c02117912 100644 --- a/tests/test_collection.py +++ b/tests/test_collection.py @@ -8,10 +8,16 @@ import fiona from fiona.collection import Collection -from fiona.drvsupport import supported_drivers -from fiona.env import getenv +from fiona.env import getenv, GDALVersion from fiona.errors import FionaValueError, DriverError, FionaDeprecationWarning -from .conftest import WGS84PATTERN +from .conftest import WGS84PATTERN, get_temp_filename +from fiona.drvsupport import supported_drivers, driver_mode_mingdal +import tempfile +import os +import shutil + + +gdal_version = GDALVersion.runtime() class TestSupportedDrivers(object): @@ -900,3 +906,71 @@ def test_collection_env(path_coutwildrnp_shp): with fiona.open(path_coutwildrnp_shp): assert 'FIONA_ENV' in getenv() + +def test_interrupted_sequential_read(path_coutwildrnp_json): + with fiona.open(path_coutwildrnp_json) as c: + for key in c.keys(): + c[key] + + +def test_membership_releases_lock(path_coutwildrnp_json): + with fiona.open(path_coutwildrnp_json) as c: + 5 in c.keys() + assert (0 in c) + + +@pytest.fixture(scope="module", params=[driver for driver, raw in supported_drivers.items() if 'w' in raw + and (driver not in driver_mode_mingdal['w'] or + gdal_version >= GDALVersion(*driver_mode_mingdal['w'][driver][:2])) + and driver not in {'DGN', 'MapInfo File', 'GPSTrackMaker', 'GPX', 'BNA', 'DXF', + 'GML'}]) +def slice_dataset_path(request): + """ Create temporary datasets for test_collection_iterator_items_slice()""" + + driver = request.param + min_id = 0 + max_id = 9 + schema = {'geometry': 'Point', 'properties': [('position', 'int')]} + records = [{'geometry': {'type': 'Point', 'coordinates': (0.0, float(i))}, 'properties': {'position': i}} for i + in range(min_id, max_id + 1)] + + tmpdir = tempfile.mkdtemp() + path = os.path.join(tmpdir, get_temp_filename(driver)) + + with fiona.open(path, 'w', + driver=driver, + schema=schema) as c: + c.writerecords(records) + yield path + shutil.rmtree(tmpdir) + + +@pytest.mark.parametrize("args", [(0, None, None, 4), + ]) +@pytest.mark.filterwarnings('ignore:.*OLC_FASTFEATURECOUNT*') +@pytest.mark.filterwarnings('ignore:.*OLCFastSetNextByIndex*') +def test_collection_iterator_items_slice(slice_dataset_path, args): + """ Test if c.items(start, stop, step) returns the correct features. + """ + + start, stop, step, interrupted_index = args + min_id = 0 + max_id = 9 + + positions = list(range(min_id, max_id + 1))[start:stop:step] + + with fiona.open(slice_dataset_path, 'r') as c: + + iterator = c.items(start, stop, step) + for _ in range(interrupted_index): + _ = next(iterator) + iterator.interrupt_sequential_read() + item = next(iterator) + + assert int(item[1]['properties']['position']) == positions[interrupted_index] + + +def test_collection_iterator_keys_next(path_coutwildrnp_shp): + with fiona.open(path_coutwildrnp_shp) as src: + k = next(src.keys(5, None)) + assert k == 5 \ No newline at end of file From 8facdfa48273960b10523360f02c1789958bd084 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Buffat?= Date: Fri, 8 May 2020 16:10:02 +0200 Subject: [PATCH 2/4] allow only one active iterator --- fiona/collection.py | 42 +++++++++++++++++++++++++--------------- fiona/errors.py | 4 ++++ fiona/ogrext.pyx | 18 ++++++++++++----- tests/test_collection.py | 24 +++++++++++++++++++---- 4 files changed, 63 insertions(+), 25 deletions(-) diff --git a/fiona/collection.py b/fiona/collection.py index 9a24d0860..9ef89fe0f 100644 --- a/fiona/collection.py +++ b/fiona/collection.py @@ -99,7 +99,7 @@ def __init__(self, path, mode='r', driver=None, schema=None, crs=None, self.enabled_drivers = enabled_drivers self.ignore_fields = ignore_fields self.ignore_geometry = bool(ignore_geometry) - self.iterators = [] + self.iterator = None if vsi: self.path = vfs.vsi_path(path, vsi, archive) @@ -174,6 +174,12 @@ def __init__(self, path, mode='r', driver=None, schema=None, crs=None, self.field_skip_log_filter = FieldSkipLogFilter() + def _add_iterator(self, iterator): + """Internal method to handle new iterator""" + if self.iterator is not None: + self.iterator.stop_iterator() + self.iterator = iterator + def __repr__(self): return "<%s Collection '%s', mode '%s' at %s>" % ( self.closed and "closed" or "open", @@ -255,6 +261,7 @@ def filter(self, *args, **kwds): if bbox and mask: raise ValueError("mask and bbox can not be set together") iterator = Iterator(self, start, stop, step, bbox, mask) + self._add_iterator(iterator) return iterator def items(self, *args, **kwds): @@ -282,6 +289,7 @@ def items(self, *args, **kwds): if bbox and mask: raise ValueError("mask and bbox can not be set together") iterator = ItemsIterator(self, start, stop, step, bbox, mask) + self._add_iterator(iterator) return iterator def keys(self, *args, **kwds): @@ -309,6 +317,7 @@ def keys(self, *args, **kwds): if bbox and mask: raise ValueError("mask and bbox can not be set together") iterator = KeysIterator(self, start, stop, step, bbox, mask) + self._add_iterator(iterator) return iterator def __contains__(self, fid): @@ -320,16 +329,16 @@ def __iter__(self): """Returns an iterator over records.""" return self.filter() - # def __next__(self): - # """Returns next record from iterator.""" - # warnings.warn("Collection.__next__() is buggy and will be removed in " - # "Fiona 2.0. Switch to `next(iter(collection))`.", - # FionaDeprecationWarning, stacklevel=2) - # if not self.iterator: - # iter(self) - # return next(self.iterator) - # - # next = __next__ + def __next__(self): + """Returns next record from iterator.""" + warnings.warn("Collection.__next__() is buggy and will be removed in " + "Fiona 2.0. Switch to `next(iter(collection))`.", + FionaDeprecationWarning, stacklevel=2) + if not self.iterator: + iter(self) + return next(self.iterator) + + next = __next__ def __getitem__(self, item): return self.session.__getitem__(item) @@ -445,7 +454,9 @@ def close(self): self.session.stop() log.debug("Stopped session") self.session = None - # todo del self.iterators + if self.iterator is not None: + self.iterator.stop_iterator() + self.iterator = None if self.env: self.env.__exit__() @@ -471,10 +482,9 @@ def __del__(self): self.close() def _interrupt_sequential_read(self): - for ref in self.iterators: - iterator = ref() - if iterator is not None: - iterator.interrupt_sequential_read() + """ Notify iterator that sequential read is interrupted""" + if self.iterator is not None: + self.iterator.interrupt_sequential_read() ALL_GEOMETRY_TYPES = set([ diff --git a/fiona/errors.py b/fiona/errors.py index 9dd1cfe21..643942732 100644 --- a/fiona/errors.py +++ b/fiona/errors.py @@ -63,5 +63,9 @@ class GDALVersionError(FionaError): """ +class IteratorStoppedError(FionaError): + """Raised if iterator is accessed after it was stopped.""" + + class FionaDeprecationWarning(UserWarning): """A warning about deprecation of Fiona features""" diff --git a/fiona/ogrext.pyx b/fiona/ogrext.pyx index 860e559e8..d928ccdca 100644 --- a/fiona/ogrext.pyx +++ b/fiona/ogrext.pyx @@ -11,7 +11,6 @@ import warnings import math import uuid from collections import namedtuple -import weakref from six import integer_types, string_types, text_type @@ -30,7 +29,7 @@ from fiona import compat from fiona.errors import ( DriverError, DriverIOError, SchemaError, CRSError, FionaValueError, TransactionError, GeometryTypeValidationError, DatasetDeleteError, - FionaDeprecationWarning) + FionaDeprecationWarning, IteratorStoppedError) from fiona.compat import OrderedDict from fiona.rfc3339 import parse_date, parse_datetime, parse_time from fiona.rfc3339 import FionaDateType, FionaDateTimeType, FionaTimeType @@ -1260,14 +1259,13 @@ cdef class Iterator: cdef fastindex cdef stepsign cdef is_interrupted - cdef object __weakref__ + cdef is_stopped def __cinit__(self, collection, start=None, stop=None, step=None, bbox=None, mask=None): if collection.session is None: raise ValueError("I/O operation on closed collection") self.collection = collection - self.collection.iterators.append(weakref.ref(self)) cdef Session session cdef void *cogr_geometry session = self.collection.session @@ -1327,10 +1325,14 @@ cdef class Iterator: log.debug("Index: %d", self.next_index) OGR_L_SetNextByIndex(session.cogr_layer, self.next_index) self.is_interrupted = False + self.is_stopped = False def interrupt_sequential_read(self): self.is_interrupted = True + def stop_iterator(self): + self.is_stopped = True + def __iter__(self): return self @@ -1351,8 +1353,14 @@ cdef class Iterator: if self.next_index > self.start or (self.stop is not None and self.next_index <= self.stop): raise StopIteration + # Check if iterator is stopped + if self.is_stopped: + log.warning("Iterator was stopped, possible due to the creation of another iterator for the same collection.") + raise IteratorStoppedError + if self.is_interrupted: - # TODO WARNING potential slow operation + if not self.fastindex: + log.warning("Sequential read of iterator was interrupted. Resetting iterator. This can negatively impact the performance.") OGR_L_SetNextByIndex(session.cogr_layer, self.next_index) self.is_interrupted = False else: diff --git a/tests/test_collection.py b/tests/test_collection.py index c02117912..f9fd42ff8 100644 --- a/tests/test_collection.py +++ b/tests/test_collection.py @@ -9,7 +9,7 @@ import fiona from fiona.collection import Collection from fiona.env import getenv, GDALVersion -from fiona.errors import FionaValueError, DriverError, FionaDeprecationWarning +from fiona.errors import FionaValueError, DriverError, FionaDeprecationWarning, IteratorStoppedError from .conftest import WGS84PATTERN, get_temp_filename from fiona.drvsupport import supported_drivers, driver_mode_mingdal import tempfile @@ -946,11 +946,12 @@ def slice_dataset_path(request): @pytest.mark.parametrize("args", [(0, None, None, 4), + (0, None, 2, 3), ]) @pytest.mark.filterwarnings('ignore:.*OLC_FASTFEATURECOUNT*') @pytest.mark.filterwarnings('ignore:.*OLCFastSetNextByIndex*') -def test_collection_iterator_items_slice(slice_dataset_path, args): - """ Test if c.items(start, stop, step) returns the correct features. +def test_iterator_sequential_read_interrupted(slice_dataset_path, args): + """ Test if iterator resumes at correct position after sequential read is interrupted """ start, stop, step, interrupted_index = args @@ -970,7 +971,22 @@ def test_collection_iterator_items_slice(slice_dataset_path, args): assert int(item[1]['properties']['position']) == positions[interrupted_index] +def test_multiple_iterators(slice_dataset_path): + """Test that only one iterator at any time can be active""" + + with fiona.open(slice_dataset_path, 'r') as c: + + item_iterator = c.items() + item = next(item_iterator) + item = next(item_iterator) + filter_iterator = c.filter() + with pytest.raises(IteratorStoppedError): + item = next(item_iterator) + + def test_collection_iterator_keys_next(path_coutwildrnp_shp): with fiona.open(path_coutwildrnp_shp) as src: k = next(src.keys(5, None)) - assert k == 5 \ No newline at end of file + assert k == 5 + + From 363654a2cc33a22665f618b95f787fcf60dba208 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Buffat?= Date: Sun, 24 May 2020 13:39:18 +0200 Subject: [PATCH 3/4] allow only creation of one iterator per session --- fiona/collection.py | 33 +++++++++--------- fiona/errors.py | 4 +-- fiona/ogrext.pyx | 16 ++++----- tests/conftest.py | 28 +++++++++++++++ tests/test_collection.py | 73 ++++++---------------------------------- tests/test_slice.py | 37 ++------------------ 6 files changed, 66 insertions(+), 125 deletions(-) diff --git a/fiona/collection.py b/fiona/collection.py index a46cbd1d7..83a90b746 100644 --- a/fiona/collection.py +++ b/fiona/collection.py @@ -11,7 +11,8 @@ from fiona.ogrext import Iterator, ItemsIterator, KeysIterator from fiona.ogrext import Session, WritingSession from fiona.ogrext import buffer_to_virtual_file, remove_virtual_file, GEOMETRY_TYPES - from fiona.errors import (DriverError, SchemaError, CRSError, UnsupportedGeometryTypeError, DriverSupportError) + from fiona.errors import (DriverError, SchemaError, CRSError, UnsupportedGeometryTypeError, DriverSupportError, + IteratorAlreadyExistsError) from fiona.logutils import FieldSkipLogFilter from fiona._crs import crs_to_wkt from fiona._env import get_gdal_release_name, get_gdal_version_tuple @@ -91,6 +92,7 @@ def __init__(self, path, mode='r', driver=None, schema=None, crs=None, gdal=get_gdal_release_name())) self.session = None + self.iterator = None self._len = 0 self._bounds = None self._driver = None @@ -101,7 +103,6 @@ def __init__(self, path, mode='r', driver=None, schema=None, crs=None, self.enabled_drivers = enabled_drivers self.ignore_fields = ignore_fields self.ignore_geometry = bool(ignore_geometry) - self.iterator = None if vsi: self.path = vfs.vsi_path(path, vsi, archive) @@ -176,11 +177,6 @@ def __init__(self, path, mode='r', driver=None, schema=None, crs=None, self.field_skip_log_filter = FieldSkipLogFilter() - def _add_iterator(self, iterator): - """Internal method to handle new iterator""" - if self.iterator is not None: - self.iterator.stop_iterator() - self.iterator = iterator def __repr__(self): return "<%s Collection '%s', mode '%s' at %s>" % ( @@ -262,9 +258,10 @@ def filter(self, *args, **kwds): mask = kwds.get('mask') if bbox and mask: raise ValueError("mask and bbox can not be set together") - iterator = Iterator(self, start, stop, step, bbox, mask) - self._add_iterator(iterator) - return iterator + if self.iterator is not None: + raise IteratorAlreadyExistsError("Only one Iterator per Session is allowed.") + self.iterator = Iterator(self, start, stop, step, bbox, mask) + return self.iterator def items(self, *args, **kwds): """Returns an iterator over FID, record pairs, optionally @@ -290,9 +287,10 @@ def items(self, *args, **kwds): mask = kwds.get('mask') if bbox and mask: raise ValueError("mask and bbox can not be set together") - iterator = ItemsIterator(self, start, stop, step, bbox, mask) - self._add_iterator(iterator) - return iterator + if self.iterator is not None: + raise IteratorAlreadyExistsError("Only one Iterator per Session is allowed.") + self.iterator = ItemsIterator(self, start, stop, step, bbox, mask) + return self.iterator def keys(self, *args, **kwds): """Returns an iterator over FIDs, optionally @@ -318,9 +316,10 @@ def keys(self, *args, **kwds): mask = kwds.get('mask') if bbox and mask: raise ValueError("mask and bbox can not be set together") - iterator = KeysIterator(self, start, stop, step, bbox, mask) - self._add_iterator(iterator) - return iterator + if self.iterator is not None: + raise IteratorAlreadyExistsError("Only one Iterator per Session is allowed.") + self.iterator = KeysIterator(self, start, stop, step, bbox, mask) + return self.iterator def __contains__(self, fid): return self.session.has_feature(fid) @@ -456,8 +455,6 @@ def close(self): self.session.stop() log.debug("Stopped session") self.session = None - if self.iterator is not None: - self.iterator.stop_iterator() self.iterator = None if self.env: self.env.__exit__() diff --git a/fiona/errors.py b/fiona/errors.py index 643942732..3e2464f7a 100644 --- a/fiona/errors.py +++ b/fiona/errors.py @@ -63,8 +63,8 @@ class GDALVersionError(FionaError): """ -class IteratorStoppedError(FionaError): - """Raised if iterator is accessed after it was stopped.""" +class IteratorAlreadyExistsError(FionaError): + """Raised if multiple Iterators per Session are attempted to be created.""" class FionaDeprecationWarning(UserWarning): diff --git a/fiona/ogrext.pyx b/fiona/ogrext.pyx index 7dcfdcfa8..8143cdf76 100644 --- a/fiona/ogrext.pyx +++ b/fiona/ogrext.pyx @@ -29,7 +29,7 @@ from fiona import compat from fiona.errors import ( DriverError, DriverIOError, SchemaError, CRSError, FionaValueError, TransactionError, GeometryTypeValidationError, DatasetDeleteError, - FionaDeprecationWarning, IteratorStoppedError) + FionaDeprecationWarning) from fiona.compat import OrderedDict from fiona.rfc3339 import parse_date, parse_datetime, parse_time from fiona.rfc3339 import FionaDateType, FionaDateTimeType, FionaTimeType @@ -1261,7 +1261,7 @@ cdef class Iterator: cdef ftcount cdef stepsign cdef is_interrupted - cdef is_stopped + def __cinit__(self, collection, start=None, stop=None, step=None, bbox=None, mask=None): @@ -1310,16 +1310,16 @@ cdef class Iterator: warnings.warn("Layer does not support" \ " OLC_FASTFEATURECOUNT, negative slices or start values other than zero" \ " may be slow.", RuntimeWarning) - self.ftcount = OGR_L_GetFeatureCount(session.cogr_layer, 1) + self.ftcount = wrap_get_feature_count(self.collection, session.cogr_layer, 1) else: - self.ftcount = OGR_L_GetFeatureCount(session.cogr_layer, 0) + self.ftcount = wrap_get_feature_count(self.collection, session.cogr_layer, 0) + if self.ftcount == -1 and ((start is not None and start < 0) or (stop is not None and stop < 0)): raise IndexError( "collection's dataset does not support negative slice indexes") - if stop is not None and stop < 0: stop += self.ftcount @@ -1357,14 +1357,12 @@ cdef class Iterator: log.debug("Next index: %d", self.next_index) OGR_L_SetNextByIndex(session.cogr_layer, self.next_index) self.is_interrupted = False - self.is_stopped = False + def interrupt_sequential_read(self): + """ Notify iterator that sequential read is interrupted""" self.is_interrupted = True - def stop_iterator(self): - self.is_stopped = True - def __iter__(self): return self diff --git a/tests/conftest.py b/tests/conftest.py index 5b4513097..2d9bc5e89 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,12 +5,15 @@ import os.path import shutil import tarfile +import tempfile import zipfile from click.testing import CliRunner import pytest import fiona +from fiona import supported_drivers +from fiona.drvsupport import driver_mode_mingdal from fiona.env import GDALVersion driver_extensions = {'DXF': 'dxf', @@ -316,3 +319,28 @@ def unittest_path_coutwildrnp_shp(path_coutwildrnp_shp, request): """Makes shapefile path available to unittest tests""" request.cls.path_coutwildrnp_shp = path_coutwildrnp_shp + +@pytest.fixture(scope="session", params=[driver for driver, raw in supported_drivers.items() if 'w' in raw + and (driver not in driver_mode_mingdal['w'] or + gdal_version >= GDALVersion(*driver_mode_mingdal['w'][driver][:2])) + and driver not in {'DGN', 'MapInfo File', 'GPSTrackMaker', 'GPX', 'BNA', 'DXF', + 'GML'}]) +def slice_dataset(request): + """ Create temporary datasets to test slices""" + + driver = request.param + min_id = 0 + max_id = 9 + schema = {'geometry': 'Point', 'properties': [('position', 'int')]} + records = [{'geometry': {'type': 'Point', 'coordinates': (0.0, float(i))}, 'properties': {'position': i}} for i + in range(min_id, max_id + 1)] + + tmpdir = tempfile.mkdtemp() + path = os.path.join(tmpdir, get_temp_filename(driver)) + + with fiona.open(path, 'w', + driver=driver, + schema=schema) as c: + c.writerecords(records) + yield path, min_id, max_id + shutil.rmtree(tmpdir) diff --git a/tests/test_collection.py b/tests/test_collection.py index b7eff21e9..900e8b37d 100644 --- a/tests/test_collection.py +++ b/tests/test_collection.py @@ -9,16 +9,9 @@ import fiona from fiona.collection import Collection from fiona.env import getenv, GDALVersion -from fiona.errors import FionaValueError, DriverError, FionaDeprecationWarning, IteratorStoppedError +from fiona.errors import FionaValueError, DriverError, FionaDeprecationWarning, IteratorAlreadyExistsError from .conftest import WGS84PATTERN, get_temp_filename from fiona.drvsupport import supported_drivers, driver_mode_mingdal -import tempfile -import os -import shutil - - -gdal_version = GDALVersion.runtime() - class TestSupportedDrivers(object): @@ -884,7 +877,7 @@ def test_collection_zip_http(): ) assert ( ds.path - == "/vsizip/vsicurl/https://raw.githubusercontent.com/Toblerity/Fiona/master/tests/data/coutwildrnp.zip", + == "/vsizip/vsicurl/https://raw.githubusercontent.com/Toblerity/Fiona/master/tests/data/coutwildrnp.zip" ) assert len(ds) == 67 @@ -920,56 +913,19 @@ def test_collection_env(path_coutwildrnp_shp): assert 'FIONA_ENV' in getenv() -def test_interrupted_sequential_read(path_coutwildrnp_json): - with fiona.open(path_coutwildrnp_json) as c: - for key in c.keys(): - c[key] - - -def test_membership_releases_lock(path_coutwildrnp_json): - with fiona.open(path_coutwildrnp_json) as c: - 5 in c.keys() - assert (0 in c) - - -@pytest.fixture(scope="module", params=[driver for driver, raw in supported_drivers.items() if 'w' in raw - and (driver not in driver_mode_mingdal['w'] or - gdal_version >= GDALVersion(*driver_mode_mingdal['w'][driver][:2])) - and driver not in {'DGN', 'MapInfo File', 'GPSTrackMaker', 'GPX', 'BNA', 'DXF', - 'GML'}]) -def slice_dataset_path(request): - """ Create temporary datasets for test_collection_iterator_items_slice()""" - - driver = request.param - min_id = 0 - max_id = 9 - schema = {'geometry': 'Point', 'properties': [('position', 'int')]} - records = [{'geometry': {'type': 'Point', 'coordinates': (0.0, float(i))}, 'properties': {'position': i}} for i - in range(min_id, max_id + 1)] - - tmpdir = tempfile.mkdtemp() - path = os.path.join(tmpdir, get_temp_filename(driver)) - - with fiona.open(path, 'w', - driver=driver, - schema=schema) as c: - c.writerecords(records) - yield path - shutil.rmtree(tmpdir) - - -@pytest.mark.parametrize("args", [(0, None, None, 4), +@pytest.mark.parametrize("args", [(0, None, None, 0), + (0, None, None, 1), + (0, None, None, 4), (0, None, 2, 3), ]) @pytest.mark.filterwarnings('ignore:.*OLC_FASTFEATURECOUNT*') @pytest.mark.filterwarnings('ignore:.*OLCFastSetNextByIndex*') -def test_iterator_sequential_read_interrupted(slice_dataset_path, args): +def test_iterator_sequential_read_interrupted(slice_dataset, args): """ Test if iterator resumes at correct position after sequential read is interrupted """ start, stop, step, interrupted_index = args - min_id = 0 - max_id = 9 + slice_dataset_path, min_id, max_id = slice_dataset positions = list(range(min_id, max_id + 1))[start:stop:step] @@ -984,20 +940,13 @@ def test_iterator_sequential_read_interrupted(slice_dataset_path, args): assert int(item[1]['properties']['position']) == positions[interrupted_index] -def test_multiple_iterators(slice_dataset_path): +def test_multiple_iterators(path_coutwildrnp_shp): """Test that only one iterator at any time can be active""" - with fiona.open(slice_dataset_path, 'r') as c: + with fiona.open(path_coutwildrnp_shp, 'r') as c: item_iterator = c.items() item = next(item_iterator) item = next(item_iterator) - filter_iterator = c.filter() - with pytest.raises(IteratorStoppedError): - item = next(item_iterator) - - -def test_collection_iterator_keys_next(path_coutwildrnp_shp): - with fiona.open(path_coutwildrnp_shp) as src: - k = next(src.keys(5, None)) - assert k == 5 + with pytest.raises(IteratorAlreadyExistsError): + filter_iterator = c.filter() diff --git a/tests/test_slice.py b/tests/test_slice.py index 3ac8241c1..5304d114a 100644 --- a/tests/test_slice.py +++ b/tests/test_slice.py @@ -1,14 +1,10 @@ """Note well: collection slicing is deprecated! """ -import tempfile -import shutil -import os + import pytest from fiona.env import GDALVersion import fiona from fiona.errors import FionaDeprecationWarning -from .conftest import get_temp_filename -from fiona.drvsupport import supported_drivers, driver_mode_mingdal gdal_version = GDALVersion.runtime() @@ -43,32 +39,6 @@ def test_collection_iterator_next(path_coutwildrnp_shp): assert v['id'] == '5' -@pytest.fixture(scope="module", params=[driver for driver, raw in supported_drivers.items() if 'w' in raw - and (driver not in driver_mode_mingdal['w'] or - gdal_version >= GDALVersion(*driver_mode_mingdal['w'][driver][:2])) - and driver not in {'DGN', 'MapInfo File', 'GPSTrackMaker', 'GPX', 'BNA', 'DXF', - 'GML'}]) -def slice_dataset_path(request): - """ Create temporary datasets for test_collection_iterator_items_slice()""" - - driver = request.param - min_id = 0 - max_id = 9 - schema = {'geometry': 'Point', 'properties': [('position', 'int')]} - records = [{'geometry': {'type': 'Point', 'coordinates': (0.0, float(i))}, 'properties': {'position': i}} for i - in range(min_id, max_id + 1)] - - tmpdir = tempfile.mkdtemp() - path = os.path.join(tmpdir, get_temp_filename(driver)) - - with fiona.open(path, 'w', - driver=driver, - schema=schema) as c: - c.writerecords(records) - yield path - shutil.rmtree(tmpdir) - - @pytest.mark.parametrize("args", [(0, 5, None), (1, 5, None), (-5, None, None), @@ -110,13 +80,12 @@ def slice_dataset_path(request): ]) @pytest.mark.filterwarnings('ignore:.*OLC_FASTFEATURECOUNT*') @pytest.mark.filterwarnings('ignore:.*OLCFastSetNextByIndex*') -def test_collection_iterator_items_slice(slice_dataset_path, args): +def test_collection_iterator_items_slice(slice_dataset, args): """ Test if c.items(start, stop, step) returns the correct features. """ start, stop, step = args - min_id = 0 - max_id = 9 + slice_dataset_path, min_id, max_id = slice_dataset positions = list(range(min_id, max_id + 1))[start:stop:step] From 8a35d036817c2bd045878f735f53d32e95c066c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Buffat?= Date: Sun, 24 May 2020 14:09:33 +0200 Subject: [PATCH 4/4] refactor test_iterator_sequential_read_interrupted to force situation where sequential read is interrupted --- fiona/ogrext.pyx | 21 +++++++++++++-------- tests/test_collection.py | 5 +++-- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/fiona/ogrext.pyx b/fiona/ogrext.pyx index 8143cdf76..d8426f419 100644 --- a/fiona/ogrext.pyx +++ b/fiona/ogrext.pyx @@ -440,13 +440,15 @@ def featureRT(feature, collection): # Collection-related extension classes and functions cdef inline void * wrap_get_feature(object collection, void *cogr_layer, int index) except *: - """Check if it is safe to execute OGR_L_GetFeature""" + """Helper method to ensure that collection is notified of a sequential read interrupt + if OGR_L_GetFeature is called""" collection._interrupt_sequential_read() return OGR_L_GetFeature(cogr_layer, index) cdef inline int wrap_get_feature_count(object collection, void *cogr_layer, int force) except *: - """Check if it is safe to execute OGR_L_GetFeatureCount""" + """Helper method to ensure that collection is notified of a sequential read interrupt + if OGR_L_GetFeatureCount is called""" collection._interrupt_sequential_read() return OGR_L_GetFeatureCount(cogr_layer, force) @@ -1260,7 +1262,7 @@ cdef class Iterator: cdef fastcount cdef ftcount cdef stepsign - cdef is_interrupted + cdef _is_interrupted def __cinit__(self, collection, start=None, stop=None, step=None, @@ -1356,12 +1358,15 @@ cdef class Iterator: self.next_index = start log.debug("Next index: %d", self.next_index) OGR_L_SetNextByIndex(session.cogr_layer, self.next_index) - self.is_interrupted = False - + self._is_interrupted = False def interrupt_sequential_read(self): """ Notify iterator that sequential read is interrupted""" - self.is_interrupted = True + self._is_interrupted = True + + def is_interrupted(self): + """ Returns True if iterator is interrupted.""" + return self._is_interrupted def __iter__(self): return self @@ -1388,11 +1393,11 @@ cdef class Iterator: if self.next_index > self.start or (self.stop is not None and self.next_index <= self.stop): raise StopIteration - if self.is_interrupted: + if self._is_interrupted: if not self.fastindex: log.warning("Sequential read of iterator was interrupted. Resetting iterator. This can negatively impact the performance.") OGR_L_SetNextByIndex(session.cogr_layer, self.next_index) - self.is_interrupted = False + self._is_interrupted = False else: # Set read cursor to next_item position if self.step > 1 and self.fastindex: diff --git a/tests/test_collection.py b/tests/test_collection.py index 900e8b37d..9b4e1ff82 100644 --- a/tests/test_collection.py +++ b/tests/test_collection.py @@ -934,9 +934,10 @@ def test_iterator_sequential_read_interrupted(slice_dataset, args): iterator = c.items(start, stop, step) for _ in range(interrupted_index): _ = next(iterator) - iterator.interrupt_sequential_read() + # Interrupt sequential read + c[-1] + assert iterator.is_interrupted() item = next(iterator) - assert int(item[1]['properties']['position']) == positions[interrupted_index]