[WIP] Protect sequential read alternative #896

Closed
24 changes: 17 additions & 7 deletions fiona/collection.py
@@ -11,7 +11,8 @@
from fiona.ogrext import Iterator, ItemsIterator, KeysIterator
from fiona.ogrext import Session, WritingSession
from fiona.ogrext import buffer_to_virtual_file, remove_virtual_file, GEOMETRY_TYPES
from fiona.errors import (DriverError, SchemaError, CRSError, UnsupportedGeometryTypeError, DriverSupportError)
from fiona.errors import (DriverError, SchemaError, CRSError, UnsupportedGeometryTypeError, DriverSupportError,
IteratorAlreadyExistsError)
from fiona.logutils import FieldSkipLogFilter
from fiona._crs import crs_to_wkt
from fiona._env import get_gdal_release_name, get_gdal_version_tuple
@@ -177,6 +178,7 @@ def __init__(self, path, mode='r', driver=None, schema=None, crs=None,

self.field_skip_log_filter = FieldSkipLogFilter()


def __repr__(self):
return "<%s Collection '%s', mode '%s' at %s>" % (
self.closed and "closed" or "open",
@@ -261,8 +263,9 @@ def filter(self, *args, **kwds):
mask = kwds.get('mask')
if bbox and mask:
raise ValueError("mask and bbox can not be set together")
self.iterator = Iterator(
self, start, stop, step, bbox, mask)
if self.iterator is not None:
raise IteratorAlreadyExistsError("Only one Iterator per Session is allowed.")
self.iterator = Iterator(self, start, stop, step, bbox, mask)
return self.iterator

def items(self, *args, **kwds):
@@ -293,8 +296,9 @@ def items(self, *args, **kwds):
mask = kwds.get('mask')
if bbox and mask:
raise ValueError("mask and bbox can not be set together")
self.iterator = ItemsIterator(
self, start, stop, step, bbox, mask)
if self.iterator is not None:
raise IteratorAlreadyExistsError("Only one Iterator per Session is allowed.")
self.iterator = ItemsIterator(self, start, stop, step, bbox, mask)
return self.iterator

def keys(self, *args, **kwds):
@@ -324,8 +328,9 @@ def keys(self, *args, **kwds):
mask = kwds.get('mask')
if bbox and mask:
raise ValueError("mask and bbox can not be set together")
self.iterator = KeysIterator(
self, start, stop, step, bbox, mask)
if self.iterator is not None:
raise IteratorAlreadyExistsError("Only one Iterator per Session is allowed.")
self.iterator = KeysIterator(self, start, stop, step, bbox, mask)
return self.iterator

def __contains__(self, fid):
@@ -485,6 +490,11 @@ def __del__(self):
# or use the context manager protocol ("with").
self.close()

def _interrupt_sequential_read(self):
""" Notify iterator that sequential read is interrupted"""
if self.iterator is not None:
self.iterator.interrupt_sequential_read()


ALL_GEOMETRY_TYPES = set([
geom_type for geom_type in GEOMETRY_TYPES.values()
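A minimal usage sketch of the behaviour the collection.py change above introduces (the dataset path is a placeholder, not part of the PR): once one iterator is active on a collection, requesting another now raises IteratorAlreadyExistsError instead of silently resetting the shared OGR read cursor.

import fiona
from fiona.errors import IteratorAlreadyExistsError

# "example.shp" is a hypothetical path used only for illustration.
with fiona.open("example.shp") as collection:
    features = collection.filter()       # first iterator on this collection
    next(features)                        # sequential read is now in progress
    try:
        collection.items()                # attempt to create a second iterator
    except IteratorAlreadyExistsError:
        print("Only one iterator per collection is allowed on this branch")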
4 changes: 4 additions & 0 deletions fiona/errors.py
@@ -63,5 +63,9 @@ class GDALVersionError(FionaError):
"""


class IteratorAlreadyExistsError(FionaError):
"""Raised if multiple Iterators per Session are attempted to be created."""


class FionaDeprecationWarning(UserWarning):
"""A warning about deprecation of Fiona features"""
73 changes: 52 additions & 21 deletions fiona/ogrext.pyx
@@ -482,6 +482,19 @@ def featureRT(feature, collection):

# Collection-related extension classes and functions

cdef inline void * wrap_get_feature(object collection, void *cogr_layer, int index) except *:
"""Helper method to ensure that collection is notified of a sequential read interrupt
if OGR_L_GetFeature is called"""
collection._interrupt_sequential_read()
return OGR_L_GetFeature(cogr_layer, index)


cdef inline int wrap_get_feature_count(object collection, void *cogr_layer, int force) except *:
"""Helper method to ensure that collection is notified of a sequential read interrupt
if OGR_L_GetFeatureCount is called"""
collection._interrupt_sequential_read()
return OGR_L_GetFeatureCount(cogr_layer, force)

cdef class Session:

cdef void *cogr_ds
@@ -614,7 +627,7 @@ cdef class Session:
def get_length(self):
if self.cogr_layer == NULL:
raise ValueError("Null layer")
return OGR_L_GetFeatureCount(self.cogr_layer, 0)
return wrap_get_feature_count(self.collection, self.cogr_layer, 0)

def get_driver(self):
cdef void *cogr_driver = GDALGetDatasetDriver(self.cogr_ds)
@@ -846,7 +859,7 @@ cdef class Session:
"""
cdef void * cogr_feature
fid = int(fid)
cogr_feature = OGR_L_GetFeature(self.cogr_layer, fid)
cogr_feature = wrap_get_feature(self.collection, self.cogr_layer, fid)
if cogr_feature != NULL:
_deleteOgrFeature(cogr_feature)
return True
@@ -860,7 +873,7 @@
"""
cdef void * cogr_feature
fid = int(fid)
cogr_feature = OGR_L_GetFeature(self.cogr_layer, fid)
cogr_feature = wrap_get_feature(self.collection, self.cogr_layer, fid)
if cogr_feature != NULL:
feature = FeatureBuilder().build(
cogr_feature,
@@ -889,12 +902,12 @@ cdef class Session:
index = item
# from the back
if index < 0:
ftcount = OGR_L_GetFeatureCount(self.cogr_layer, 0)
ftcount = wrap_get_feature_count(self.collection, self.cogr_layer, 0)
if ftcount == -1:
raise IndexError(
"collection's dataset does not support negative indexes")
index += ftcount
cogr_feature = OGR_L_GetFeature(self.cogr_layer, index)
cogr_feature = wrap_get_feature(self.collection, self.cogr_layer, index)
if cogr_feature == NULL:
return None
feature = FeatureBuilder().build(
@@ -1310,6 +1323,8 @@ cdef class Iterator:
cdef fastcount
cdef ftcount
cdef stepsign
cdef _is_interrupted


def __cinit__(self, collection, start=None, stop=None, step=None,
bbox=None, mask=None):
@@ -1358,9 +1373,10 @@
warnings.warn("Layer does not support" \
" OLC_FASTFEATURECOUNT, negative slices or start values other than zero" \
" may be slow.", RuntimeWarning)
self.ftcount = OGR_L_GetFeatureCount(session.cogr_layer, 1)
self.ftcount = wrap_get_feature_count(self.collection, session.cogr_layer, 1)
else:
self.ftcount = OGR_L_GetFeatureCount(session.cogr_layer, 0)
self.ftcount = wrap_get_feature_count(self.collection, session.cogr_layer, 0)


if self.ftcount == -1 and ((start is not None and start < 0) or
(stop is not None and stop < 0)):
@@ -1403,6 +1419,15 @@
self.next_index = start
log.debug("Next index: %d", self.next_index)
OGR_L_SetNextByIndex(session.cogr_layer, self.next_index)
self._is_interrupted = False

def interrupt_sequential_read(self):
""" Notify iterator that sequential read is interrupted"""
self._is_interrupted = True

def is_interrupted(self):
""" Returns True if iterator is interrupted."""
return self._is_interrupted

def __iter__(self):
return self
@@ -1429,21 +1454,27 @@
if self.next_index > self.start or (self.stop is not None and self.next_index <= self.stop):
raise StopIteration

# Set read cursor to next_item position
if self.step > 1 and self.fastindex:
OGR_L_SetNextByIndex(session.cogr_layer, self.next_index)
elif self.step > 1 and not self.fastindex and not self.next_index == self.start:
# GDALs default implementation of SetNextByIndex is calling ResetReading() and then
# calling GetNextFeature n times. We can shortcut that if we know the previous index.
# OGR_L_GetNextFeature increments cursor by 1, therefore self.step - 1 as one increment was performed when feature is read
for _ in range(self.step - 1):
cogr_feature = OGR_L_GetNextFeature(session.cogr_layer)
if cogr_feature == NULL:
raise StopIteration
elif self.step > 1 and not self.fastindex and self.next_index == self.start:
OGR_L_SetNextByIndex(session.cogr_layer, self.next_index)
elif self.step < 0:
if self._is_interrupted:
if not self.fastindex:
log.warning("Sequential read of iterator was interrupted. Resetting iterator. This can negatively impact the performance.")
OGR_L_SetNextByIndex(session.cogr_layer, self.next_index)
self._is_interrupted = False
else:
# Set read cursor to next_item position
if self.step > 1 and self.fastindex:
OGR_L_SetNextByIndex(session.cogr_layer, self.next_index)
elif self.step > 1 and not self.fastindex and not self.next_index == self.start:
# GDAL's default implementation of SetNextByIndex calls ResetReading() and then
# calls GetNextFeature n times. We can shortcut that if we know the previous index.
# OGR_L_GetNextFeature increments the cursor by 1, so only self.step - 1 further increments are needed, as one increment was already performed when the feature was read.
for _ in range(self.step - 1):
cogr_feature = OGR_L_GetNextFeature(session.cogr_layer)
if cogr_feature == NULL:
raise StopIteration
elif self.step > 1 and not self.fastindex and self.next_index == self.start:
OGR_L_SetNextByIndex(session.cogr_layer, self.next_index)
elif self.step < 0:
OGR_L_SetNextByIndex(session.cogr_layer, self.next_index)

# set the next index
self.next_index += self.step
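Seen from the Python level, the interrupt bookkeeping in ogrext.pyx above amounts to roughly the following (a sketch against a hypothetical dataset; on drivers without OLCFastSetNextByIndex the repositioning triggers the warning and can be slow):

import fiona

# "points.geojson" is a placeholder path for illustration only.
with fiona.open("points.geojson") as collection:
    items = collection.items(0, None, 2)   # sequential read, every other feature
    next(items)
    _ = collection[3]                      # random access calls OGR_L_GetFeature and moves the OGR cursor
    _ = len(collection)                    # OGR_L_GetFeatureCount may also move it
    # Both calls notify the iterator; before the next read it repositions the
    # cursor with OGR_L_SetNextByIndex, so iteration resumes at the expected
    # feature instead of wherever the cursor was left.
    next(items)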
30 changes: 29 additions & 1 deletion tests/conftest.py
@@ -5,12 +5,15 @@
import os.path
import shutil
import tarfile
import tempfile
import zipfile
from collections import OrderedDict
from click.testing import CliRunner
import pytest

import fiona
from fiona import supported_drivers
from fiona.drvsupport import driver_mode_mingdal
from fiona.crs import from_epsg
from fiona.env import GDALVersion

@@ -320,6 +323,31 @@ def unittest_path_coutwildrnp_shp(path_coutwildrnp_shp, request):
request.cls.path_coutwildrnp_shp = path_coutwildrnp_shp


@pytest.fixture(scope="session", params=[driver for driver, raw in supported_drivers.items() if 'w' in raw
and (driver not in driver_mode_mingdal['w'] or
gdal_version >= GDALVersion(*driver_mode_mingdal['w'][driver][:2]))
and driver not in {'DGN', 'MapInfo File', 'GPSTrackMaker', 'GPX', 'BNA', 'DXF',
'GML'}])
def slice_dataset(request):
""" Create temporary datasets to test slices"""

driver = request.param
min_id = 0
max_id = 9
schema = {'geometry': 'Point', 'properties': [('position', 'int')]}
records = [{'geometry': {'type': 'Point', 'coordinates': (0.0, float(i))}, 'properties': {'position': i}} for i
in range(min_id, max_id + 1)]

tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, get_temp_filename(driver))

with fiona.open(path, 'w',
driver=driver,
schema=schema) as c:
c.writerecords(records)
yield path, min_id, max_id
shutil.rmtree(tmpdir)

@pytest.fixture()
def testdata_generator():
""" Helper function to create test data sets for ideally all supported drivers
@@ -445,4 +473,4 @@ def _testdata_generator(driver, range1, range2):
@pytest.fixture(scope='session')
def path_test_tz_geojson(data_dir):
"""Path to ```test_tz.geojson``"""
return os.path.join(data_dir, 'test_tz.geojson')
return os.path.join(data_dir, 'test_tz.geojson')
41 changes: 40 additions & 1 deletion tests/test_collection.py
@@ -12,7 +12,7 @@
import fiona
from fiona.collection import Collection
from fiona.env import getenv, GDALVersion
from fiona.errors import FionaValueError, DriverError, FionaDeprecationWarning
from fiona.errors import FionaValueError, DriverError, FionaDeprecationWarning, IteratorAlreadyExistsError
from .conftest import WGS84PATTERN, get_temp_filename
from fiona.drvsupport import supported_drivers, driver_mode_mingdal

@@ -915,6 +915,45 @@ def test_collection_env(path_coutwildrnp_shp):
with fiona.open(path_coutwildrnp_shp):
assert 'FIONA_ENV' in getenv()

@pytest.mark.parametrize("args", [(0, None, None, 0),
(0, None, None, 1),
(0, None, None, 4),
(0, None, 2, 3),
])
@pytest.mark.filterwarnings('ignore:.*OLC_FASTFEATURECOUNT*')
@pytest.mark.filterwarnings('ignore:.*OLCFastSetNextByIndex*')
def test_iterator_sequential_read_interrupted(slice_dataset, args):
""" Test if iterator resumes at correct position after sequential read is interrupted
"""

start, stop, step, interrupted_index = args
slice_dataset_path, min_id, max_id = slice_dataset

positions = list(range(min_id, max_id + 1))[start:stop:step]

with fiona.open(slice_dataset_path, 'r') as c:

iterator = c.items(start, stop, step)
for _ in range(interrupted_index):
_ = next(iterator)
# Interrupt sequential read
c[-1]
assert iterator.is_interrupted()
item = next(iterator)
assert int(item[1]['properties']['position']) == positions[interrupted_index]


def test_multiple_iterators(path_coutwildrnp_shp):
"""Test that only one iterator at any time can be active"""

with fiona.open(path_coutwildrnp_shp, 'r') as c:

item_iterator = c.items()
item = next(item_iterator)
item = next(item_iterator)
with pytest.raises(IteratorAlreadyExistsError):
filter_iterator = c.filter()


@pytest.mark.parametrize('driver,filename', [('ESRI Shapefile', 'test.shp'),
('GeoJSON', 'test.json'),
9 changes: 3 additions & 6 deletions tests/test_slice.py
@@ -1,8 +1,6 @@
"""Note well: collection slicing is deprecated!
"""
import tempfile
import shutil
import os

import pytest
from fiona.env import GDALVersion
import fiona
@@ -109,13 +107,12 @@ def slice_dataset_path(request):
])
@pytest.mark.filterwarnings('ignore:.*OLC_FASTFEATURECOUNT*')
@pytest.mark.filterwarnings('ignore:.*OLCFastSetNextByIndex*')
def test_collection_iterator_items_slice(slice_dataset_path, args):
def test_collection_iterator_items_slice(slice_dataset, args):
""" Test if c.items(start, stop, step) returns the correct features.
"""

start, stop, step = args
min_id = 0
max_id = 9
slice_dataset_path, min_id, max_id = slice_dataset

positions = list(range(min_id, max_id + 1))[start:stop:step]
