diff --git a/datashader/compiler.py b/datashader/compiler.py index 7d18cab59..99fd242e7 100644 --- a/datashader/compiler.py +++ b/datashader/compiler.py @@ -7,7 +7,7 @@ import xarray as xr from .reductions import by, category_codes, summary, where -from .utils import ngjit +from .utils import isnull, ngjit try: from datashader.transfer_functions._cuda_utils import cuda_mutex_lock, cuda_mutex_unlock @@ -172,6 +172,9 @@ def make_append(bases, cols, calls, glyph, categorical, antialias): names = ('_{0}'.format(i) for i in count()) inputs = list(bases) + list(cols) namespace = {} + need_isnull = any(call[3] for call in calls) + if need_isnull: + namespace["isnull"] = isnull any_uses_cuda_mutex = any(call[6] for call in calls) if any_uses_cuda_mutex: # This adds an argument to the append() function that is the cuda mutex @@ -234,7 +237,7 @@ def make_append(bases, cols, calls, glyph, categorical, antialias): else: var = f"{arg_lk[nan_check_column]}[{subscript}]" prev_body = body[-1] - body[-1] = f'if {var}<=0 or {var}>0:' # Inline CUDA-friendly 'is not nan' test + body[-1] = f'if not isnull({var}):' body.append(f' {prev_body}') whitespace = ' ' @@ -243,7 +246,12 @@ def make_append(bases, cols, calls, glyph, categorical, antialias): else: if uses_cuda_mutex and not prev_cuda_mutex: body.append(f'cuda_mutex_lock({arg_lk["_cuda_mutex"]}, (y, x))') - body.append(f'{func_name}(x, y, {", ".join(args)})') + if nan_check_column: + var = f"{arg_lk[nan_check_column]}[{subscript}]" + body.append(f'if not isnull({var}):') + body.append(f' {func_name}(x, y, {", ".join(args)})') + else: + body.append(f'{func_name}(x, y, {", ".join(args)})') if uses_cuda_mutex: body.append(f'cuda_mutex_unlock({arg_lk["_cuda_mutex"]}, (y, x))') diff --git a/datashader/reductions.py b/datashader/reductions.py index 707369b85..205c51459 100644 --- a/datashader/reductions.py +++ b/datashader/reductions.py @@ -14,7 +14,8 @@ try: from datashader.transfer_functions._cuda_utils import ( cuda_atomic_nanmin, cuda_atomic_nanmax, cuda_args, - cuda_nanmax_n_in_place, cuda_nanmin_n_in_place) + cuda_nanmax_n_in_place, cuda_nanmin_n_in_place, cuda_row_min_in_place, + cuda_row_max_n_in_place, cuda_row_min_n_in_place) except ImportError: (cuda_atomic_nanmin, cuda_atomic_nanmmax, cuda_args, cuda_nanmax_n_in_place, cuda_nanmin_n_in_place) = None, None, None, None, None @@ -27,7 +28,7 @@ from .utils import ( Expr, ngjit, nansum_missing, nanmax_in_place, nansum_in_place, - nanmax_n_in_place, nanmin_n_in_place, row_max_in_place, row_min_in_place, + nanmax_n_in_place, nanmin_n_in_place, row_min_in_place, row_max_n_in_place, row_min_n_in_place, ) @@ -255,16 +256,27 @@ class Reduction(Expr): """Base class for per-bin reductions.""" def __init__(self, column=None): self.column = column + self._nan_check_column = None @property def nan_check_column(self): - return None + if self._nan_check_column is not None: + return extract(self._nan_check_column) + else: + return None def uses_cuda_mutex(self): + """Return ``True`` if this Reduction needs to use a CUDA mutex to + ensure that it is threadsafe across CUDA threads. + + If the CUDA append functions are all atomic (i.e. using functions from + the numba.cuda.atomic module) then this is ``False``, otherwise it is + ``True``. + """ return False def uses_row_index(self, cuda, partitioned): - """Return True if this Reduction uses a row index virtual column. + """Return ``True`` if this Reduction uses a row index virtual column. For some reductions the order of the rows of supplied data is important. 
These include ``first`` and ``last`` reductions as well as @@ -398,7 +410,7 @@ def _create_uint32(shape, array_module): class OptionalFieldReduction(Reduction): """Base class for things like ``count`` or ``any`` for which the field is optional""" def __init__(self, column=None): - self.column = column + super().__init__(column) @property def inputs(self): @@ -593,6 +605,8 @@ class by(Reduction): Per-category reduction function. """ def __init__(self, cat_column, reduction=count()): + super().__init__() + # set basic categorizer if isinstance(cat_column, CategoryPreprocess): self.categorizer = cat_column @@ -1013,7 +1027,7 @@ def _antialias_stage_2(self, self_intersect, array_module): @staticmethod @ngjit def _append(x, y, agg, field): - if isnull(agg[y, x]) or agg[y, x] > field: + if not isnull(field) and (isnull(agg[y, x]) or agg[y, x] > field): agg[y, x] = field return 0 return -1 @@ -1022,7 +1036,7 @@ def _append(x, y, agg, field): @ngjit def _append_antialias(x, y, agg, field, aa_factor): value = field*aa_factor - if isnull(agg[y, x]) or value > agg[y, x]: + if not isnull(value) and (isnull(agg[y, x]) or value > agg[y, x]): agg[y, x] = value return 0 return -1 @@ -1058,7 +1072,7 @@ def _antialias_stage_2(self, self_intersect, array_module): @staticmethod @ngjit def _append(x, y, agg, field): - if isnull(agg[y, x]) or agg[y, x] < field: + if not isnull(field) and (isnull(agg[y, x]) or agg[y, x] < field): agg[y, x] = field return 0 return -1 @@ -1067,7 +1081,7 @@ def _append(x, y, agg, field): @ngjit def _append_antialias(x, y, agg, field, aa_factor): value = field*aa_factor - if isnull(agg[y, x]) or value > agg[y, x]: + if not isnull(value) and (isnull(agg[y, x]) or value > agg[y, x]): agg[y, x] = value return 0 return -1 @@ -1179,18 +1193,11 @@ def out_dshape(self, in_dshape, antialias, cuda, partitioned): return dshape(ct.float64) def uses_row_index(self, cuda, partitioned): - if cuda: - raise ValueError(f"'{type(self).__name__}' reduction is not supported on the GPU") - return partitioned + return cuda or partitioned def _antialias_requires_2_stages(self): return True - def _build_append(self, dshape, schema, cuda, antialias, self_intersect): - if cuda: - raise ValueError(f"'{type(self).__name__}' reduction is not supported on the GPU") - return super()._build_append(dshape, schema, cuda, antialias, self_intersect) - def _build_bases(self, cuda, partitioned): if self.uses_row_index(cuda, partitioned): row_index_selector = self._create_row_index_selector() @@ -1249,7 +1256,7 @@ def _append(x, y, agg, field): @ngjit def _append_antialias(x, y, agg, field, aa_factor): value = field*aa_factor - if isnull(agg[y, x]) or value > agg[y, x]: + if not isnull(value) and (isnull(agg[y, x]) or value > agg[y, x]): agg[y, x] = value return 0 return -1 @@ -1287,7 +1294,7 @@ def _append(x, y, agg, field): @ngjit def _append_antialias(x, y, agg, field, aa_factor): value = field*aa_factor - if isnull(agg[y, x]) or value > agg[y, x]: + if not isnull(value) and (isnull(agg[y, x]) or value > agg[y, x]): agg[y, x] = value return 0 return -1 @@ -1335,18 +1342,11 @@ class _first_n_or_last_n(FloatingNReduction): """Abstract base class of first_n and last_n reductions. 
""" def uses_row_index(self, cuda, partitioned): - if cuda: - raise ValueError(f"'{type(self).__name__}' reduction is not supported on the GPU") - return partitioned + return cuda or partitioned def _antialias_requires_2_stages(self): return True - def _build_append(self, dshape, schema, cuda, antialias, self_intersect): - if cuda: - raise ValueError(f"'{type(self).__name__}' reduction is not supported on the GPU") - return super()._build_append(dshape, schema, cuda, antialias, self_intersect) - def _build_bases(self, cuda, partitioned): if self.uses_row_index(cuda, partitioned): row_index_selector = self._create_row_index_selector() @@ -1622,18 +1622,10 @@ def __init__(self, selector: Reduction, lookup_column: str | None=None): self.selector = selector # List of all column names that this reduction uses. self.columns = (selector.column, lookup_column) - self._nan_check_column = None def __hash__(self): return hash((type(self), self._hashable_inputs(), self.selector)) - @property - def nan_check_column(self): - if self._nan_check_column is not None: - return extract(self._nan_check_column) - else: - return None - def out_dshape(self, input_dshape, antialias, cuda, partitioned): if self.column is None: return dshape(ct.int64) @@ -1644,7 +1636,7 @@ def uses_cuda_mutex(self): return True def uses_row_index(self, cuda, partitioned): - return self.column is None or isinstance(self.selector, (_first_or_last, _first_n_or_last_n)) + return self.column is None or self.selector.uses_row_index(cuda, partitioned) def validate(self, in_dshape): if self.column is not None: @@ -1681,19 +1673,41 @@ def _append(x, y, agg, field, update_index): @staticmethod @ngjit def _append_antialias(x, y, agg, field, aa_factor, update_index): - agg[y, x] = field - return update_index + # Ignore aa_factor. + if agg.ndim > 2: + # Bump previous values along to make room for new value. + n = agg.shape[2] + for i in range(n-1, update_index, -1): + agg[y, x, i] = agg[y, x, i-1] + agg[y, x, update_index] = field + else: + agg[y, x] = field @staticmethod @nb_cuda.jit(device=True) def _append_antialias_cuda(x, y, agg, field, aa_factor, update_index): - agg[y, x] = field + # Ignore aa_factor + if agg.ndim > 2: + # Bump previous values along to make room for new value. + n = agg.shape[2] + for i in range(n-1, update_index, -1): + agg[y, x, i] = agg[y, x, i-1] + agg[y, x, update_index] = field + else: + agg[y, x] = field return update_index @staticmethod @nb_cuda.jit(device=True) def _append_cuda(x, y, agg, field, update_index): - agg[y, x] = field + if agg.ndim > 2: + # Bump previous values along to make room for new value. 
+ n = agg.shape[2] + for i in range(n-1, update_index, -1): + agg[y, x, i] = agg[y, x, i-1] + agg[y, x, update_index] = field + else: + agg[y, x] = field return update_index def _build_append(self, dshape, schema, cuda, antialias, self_intersect): @@ -1713,21 +1727,27 @@ def _build_append(self, dshape, schema, cuda, antialias, self_intersect): def _build_bases(self, cuda, partitioned): selector = self.selector if isinstance(selector, (_first_or_last, _first_n_or_last_n)) and selector.uses_row_index(cuda, partitioned): + # Need to swap out the selector with an equivalent row index selector row_index_selector = selector._create_row_index_selector() - new_where = where(row_index_selector, self.column) - new_where._nan_check_column = self.selector.column - return row_index_selector._build_bases(cuda, partitioned) + new_where._build_bases(cuda, partitioned) + if self.column is None: + # If selector uses a row index and this where returns the same row index, + # can just swap out this where reduction with the row_index_selector. + row_index_selector._nan_check_column = self.selector.column + return row_index_selector._build_bases(cuda, partitioned) + else: + new_where = where(row_index_selector, self.column) + new_where._nan_check_column = self.selector.column + return row_index_selector._build_bases(cuda, partitioned) + new_where._build_bases(cuda, partitioned) else: return selector._build_bases(cuda, partitioned) + super()._build_bases(cuda, partitioned) def _build_combine(self, dshape, antialias, cuda, partitioned): - if cuda and self.selector.uses_cuda_mutex(): - raise NotImplementedError( - "'where' reduction does not support a selector that uses a CUDA mutex such as 'max_n'") - # Does not support categorical reductions. selector = self.selector - append = selector._append + if cuda: + append = selector._append_cuda + else: + append = selector._append # If the selector uses a row_index then selector_aggs will be int64 with -1 # representing missing data. Otherwise missing data is NaN. @@ -1770,11 +1790,31 @@ def combine_cuda_2d(aggs, selector_aggs): if not invalid(value) and append(x, y, selector_aggs[0], value) >= 0: aggs[0][y, x] = aggs[1][y, x] + @nb_cuda.jit + def combine_cuda_3d(aggs, selector_aggs): + ny, nx, n = aggs[0].shape + x, y = nb_cuda.grid(2) + if x < nx and y < ny: + for i in range(n): + value = selector_aggs[1][y, x, i] + if invalid(value): + break + update_index = append(x, y, selector_aggs[0], value) + if update_index < 0: + break + # Bump values along in the same way that append() has done above. 
+ for j in range(n-1, update_index, -1): + aggs[0][y, x, j] = aggs[0][y, x, j-1] + aggs[0][y, x, update_index] = aggs[1][y, x, i] + def wrapped_combine(aggs, selector_aggs): if len(aggs) == 1: pass elif cuda: - combine_cuda_2d[cuda_args(aggs[0].shape)](aggs, selector_aggs) + if aggs[0].ndim == 3: + combine_cuda_3d[cuda_args(aggs[0].shape[:2])](aggs, selector_aggs) + else: + combine_cuda_2d[cuda_args(aggs[0].shape)](aggs, selector_aggs) else: if aggs[0].ndim == 3: combine_cpu_3d(aggs, selector_aggs) @@ -1885,8 +1925,7 @@ def _build_append(self, dshape, schema, cuda, antialias, self_intersect): # self.column is None but row index is passed as field argument to append functions # Doesn't yet support antialiasing if cuda: - raise ValueError(f"'{type(self).__name__}' reduction is not supported on the GPU") - #return self._append_cuda + return self._append_cuda else: return self._append @@ -1902,7 +1941,7 @@ class _max_row_index(_max_or_min_row_index): @ngjit def _append(x, y, agg, field): # field is int64 row index - if field != -1 and field > agg[y, x]: + if field > agg[y, x]: agg[y, x] = field return 0 return -1 @@ -1912,13 +1951,18 @@ def _append(x, y, agg, field): @nb_cuda.jit(device=True) def _append_cuda(x, y, agg, field): # field is int64 row index - return 0 if nb_cuda.atomic.max(agg, (y, x), field) != field else -1 + if field != -1: + old = nb_cuda.atomic.max(agg, (y, x), field) + if old < field: + return 0 + return -1 @staticmethod def _combine(aggs): # Maximum ignoring -1 values if len(aggs) > 1: - row_max_in_place(aggs[0], aggs[1]) + # Works with numpy or cupy arrays + np.maximum(aggs[0], aggs[1], out=aggs[0]) return aggs[0] @@ -1929,6 +1973,9 @@ class _min_row_index(_max_or_min_row_index): user code. It is primarily purpose is to support the use of ``first`` reductions using dask and/or CUDA. """ + def uses_cuda_mutex(self): + return True + # CPU append functions @staticmethod @ngjit @@ -1939,6 +1986,23 @@ def _append(x, y, agg, field): return 0 return -1 + # GPU append functions + @staticmethod + @nb_cuda.jit(device=True) + def _append_cuda(x, y, agg, field): + # field is int64 row index + # Always uses cuda mutex so this does not need to be atomic + if field != -1 and (agg[y, x] == -1 or field < agg[y, x]): + agg[y, x] = field + return 0 + return -1 + + def _build_combine(self, dshape, antialias, cuda, partitioned): + if cuda: + return self._combine_cuda + else: + return self._combine + @staticmethod def _combine(aggs): # Minimum ignoring -1 values @@ -1946,6 +2010,15 @@ def _combine(aggs): row_min_in_place(aggs[0], aggs[1]) return aggs[0] + @staticmethod + def _combine_cuda(aggs): + ret = aggs[0] + if len(aggs) > 1: + kernel_args = cuda_args(ret.shape) + for i in range(1, len(aggs)): + cuda_row_min_in_place[kernel_args](ret, aggs[i]) + return ret + class _max_n_or_min_n_row_index(FloatingNReduction): """Abstract base class of max_n and min_n row_index reductions. 
@@ -1961,6 +2034,9 @@ def inputs(self): def out_dshape(self, in_dshape, antialias, cuda, partitioned): return dshape(ct.int64) + def uses_cuda_mutex(self): + return True + def uses_row_index(self, cuda, partitioned): return True @@ -1968,11 +2044,16 @@ def _build_append(self, dshape, schema, cuda, antialias, self_intersect): # self.column is None but row index is passed as field argument to append functions # Doesn't yet support antialiasing if cuda: - raise ValueError(f"'{type(self).__name__}' reduction is not supported on the GPU") - #return self._append_cuda + return self._append_cuda else: return self._append + def _build_combine(self, dshape, antialias, cuda, partitioned): + if cuda: + return self._combine_cuda + else: + return self._combine + class _max_n_row_index(_max_n_or_min_n_row_index): """Max_n reduction operating on row index. @@ -1998,12 +2079,40 @@ def _append(x, y, agg, field): return i return -1 + # GPU append functions + @staticmethod + @nb_cuda.jit(device=True) + def _append_cuda(x, y, agg, field): + # field is int64 row index + # Always uses cuda mutex so this does not need to be atomic + if field != -1: + # Linear walk along stored values. + # Could do binary search instead but not expecting n to be large. + n = agg.shape[2] + for i in range(n): + if agg[y, x, i] == -1 or field > agg[y, x, i]: + # Bump previous values along to make room for new value. + for j in range(n-1, i, -1): + agg[y, x, j] = agg[y, x, j-1] + agg[y, x, i] = field + return i + return -1 + @staticmethod def _combine(aggs): if len(aggs) > 1: row_max_n_in_place(aggs[0], aggs[1]) return aggs[0] + @staticmethod + def _combine_cuda(aggs): + ret = aggs[0] + if len(aggs) > 1: + kernel_args = cuda_args(ret.shape[:2]) + for i in range(1, len(aggs)): + cuda_row_max_n_in_place[kernel_args](ret, aggs[i]) + return ret + class _min_n_row_index(_max_n_or_min_n_row_index): """Min_n reduction operating on row index. @@ -2029,12 +2138,39 @@ def _append(x, y, agg, field): return i return -1 + @staticmethod + @nb_cuda.jit(device=True) + def _append_cuda(x, y, agg, field): + # field is int64 row index + # Always uses cuda mutex so this does not need to be atomic + if field != -1: + # Linear walk along stored values. + # Could do binary search instead but not expecting n to be large. + n = agg.shape[2] + for i in range(n): + if agg[y, x, i] == -1 or field < agg[y, x, i]: + # Bump previous values along to make room for new value. 
+ for j in range(n-1, i, -1): + agg[y, x, j] = agg[y, x, j-1] + agg[y, x, i] = field + return i + return -1 + @staticmethod def _combine(aggs): if len(aggs) > 1: row_min_n_in_place(aggs[0], aggs[1]) return aggs[0] + @staticmethod + def _combine_cuda(aggs): + ret = aggs[0] + if len(aggs) > 1: + kernel_args = cuda_args(ret.shape[:2]) + for i in range(1, len(aggs)): + cuda_row_min_n_in_place[kernel_args](ret, aggs[i]) + return ret + __all__ = list(set([_k for _k,_v in locals().items() if isinstance(_v,type) and (issubclass(_v,Reduction) or _v is summary) diff --git a/datashader/tests/test_dask.py b/datashader/tests/test_dask.py index b6a25f493..e48308693 100644 --- a/datashader/tests/test_dask.py +++ b/datashader/tests/test_dask.py @@ -167,7 +167,7 @@ def test_sum(ddf, npartitions): assert_eq_xr(c.points(ddf, 'x', 'y', ds.sum('f64')), out) -@pytest.mark.parametrize('ddf', [_ddf]) +@pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_first(ddf, npartitions): ddf = ddf.repartition(npartitions) @@ -179,7 +179,7 @@ def test_first(ddf, npartitions): assert_eq_xr(c.points(ddf, 'x', 'y', ds.first('f64')), out) -@pytest.mark.parametrize('ddf', [_ddf]) +@pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_last(ddf, npartitions): ddf = ddf.repartition(npartitions) @@ -219,7 +219,7 @@ def test_max(ddf, npartitions): assert_eq_xr(c.points(ddf, 'x', 'y', ds.max('f64')), out) -@pytest.mark.parametrize('ddf', [_ddf]) +@pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_min_row_index(ddf, npartitions): ddf = ddf.repartition(npartitions) @@ -228,7 +228,7 @@ def test_min_row_index(ddf, npartitions): assert_eq_xr(c.points(ddf, 'x', 'y', ds._min_row_index()), out) -@pytest.mark.parametrize('ddf', [_ddf]) +@pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_max_row_index(ddf, npartitions): ddf = ddf.repartition(npartitions) @@ -267,7 +267,7 @@ def test_max_n(ddf, npartitions): assert_eq_ndarray(agg[:, :, 0].data, c.points(ddf, 'x', 'y', ds.max('plusminus')).data) -@pytest.mark.parametrize('ddf', [_ddf]) +@pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_min_n_row_index(ddf, npartitions): ddf = ddf.repartition(npartitions) @@ -282,7 +282,7 @@ def test_min_n_row_index(ddf, npartitions): assert_eq_ndarray(agg[:, :, 0].data, c.points(ddf, 'x', 'y', ds._min_row_index()).data) -@pytest.mark.parametrize('ddf', [_ddf]) +@pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_max_n_row_index(ddf, npartitions): ddf = ddf.repartition(npartitions) @@ -297,7 +297,7 @@ def test_max_n_row_index(ddf, npartitions): assert_eq_ndarray(agg[:, :, 0].data, c.points(ddf, 'x', 'y', ds._max_row_index()).data) -@pytest.mark.parametrize('ddf', [_ddf]) +@pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_first_n(ddf, npartitions): ddf = ddf.repartition(npartitions) @@ -312,7 +312,7 @@ def test_first_n(ddf, npartitions): assert_eq_ndarray(agg[:, :, 0].data, c.points(ddf, 'x', 'y', ds.first('plusminus')).data) -@pytest.mark.parametrize('ddf', [_ddf]) +@pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_last_n(ddf, npartitions): ddf = ddf.repartition(npartitions) @@ -369,7 +369,7 @@ def test_where_min(ddf, npartitions): assert_eq_xr(c.points(ddf, 'x', 'y', 
ds.where(ds.min('f32'))), out) -@pytest.mark.parametrize('ddf', [_ddf]) +@pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_where_max_n(ddf, npartitions): # Important to test with npartitions > 2 to have multiple combination stages. @@ -398,7 +398,7 @@ def test_where_max_n(ddf, npartitions): assert_eq_ndarray(agg[:, :, 0].data, c.points(ddf, 'x', 'y', ds.where(ds.max('plusminus'), 'reverse')).data) -@pytest.mark.parametrize('ddf', [_ddf]) +@pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_where_min_n(ddf, npartitions): # Important to test with npartitions > 2 to have multiple combination stages. @@ -427,7 +427,7 @@ def test_where_min_n(ddf, npartitions): assert_eq_ndarray(agg[:, :, 0].data, c.points(ddf, 'x', 'y', ds.where(ds.min('plusminus'), 'reverse')).data) -@pytest.mark.parametrize('ddf', [_ddf]) +@pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_where_first(ddf, npartitions): ddf = ddf.repartition(npartitions) @@ -448,7 +448,7 @@ def test_where_first(ddf, npartitions): assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.first('f32'))), out) -@pytest.mark.parametrize('ddf', [_ddf]) +@pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_where_last(ddf, npartitions): ddf = ddf.repartition(npartitions) @@ -469,7 +469,7 @@ def test_where_last(ddf, npartitions): assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.last('f32'))), out) -@pytest.mark.parametrize('ddf', [_ddf]) +@pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_where_first_n(ddf, npartitions): # Important to test with npartitions > 2 to have multiple combination stages. @@ -498,7 +498,7 @@ def test_where_first_n(ddf, npartitions): assert_eq_ndarray(agg[:, :, 0].data, c.points(ddf, 'x', 'y', ds.where(ds.first('plusminus'), 'reverse')).data) -@pytest.mark.parametrize('ddf', [_ddf]) +@pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_where_last_n(ddf, npartitions): # Important to test with npartitions > 2 to have multiple combination stages. 
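The `ddfs` parametrization above runs these `where` tests against every dask dataframe fixture in the module, presumably including the GPU-backed ones when `DATASHADER_TEST_GPU` is set. As a rough sketch (not part of the patch) of the aggregation these tests exercise, assuming a dask dataframe `ddf` with `x`, `y`, `plusminus` and `reverse` columns like the test fixtures:

```python
import datashader as ds

cvs = ds.Canvas(plot_width=2, plot_height=2)

# 3D aggregate: for each pixel, the 'reverse' values at the rows where
# 'plusminus' takes its n largest values, padded with NaN where fewer rows hit.
agg_n = cvs.points(ddf, 'x', 'y', ds.where(ds.max_n('plusminus', n=3), 'reverse'))

# The first slice agrees with the plain 2D where() of the single maximum.
agg_1 = cvs.points(ddf, 'x', 'y', ds.where(ds.max('plusminus'), 'reverse'))
```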
@@ -2002,17 +2002,3 @@ def test_canvas_size(): for cvs in cvs_list: with pytest.raises(ValueError, match=msg): cvs.points(ddf, "x", "y", ds.mean("z")) - - -@pytest.mark.skipif(not test_gpu, reason="DATASHADER_TEST_GPU not set") -@pytest.mark.parametrize('reduction', [ - ds.where(ds.first('f64')), - ds.where(ds.first_n('f64')), - ds.where(ds.last('f64')), - ds.where(ds.last_n('f64')), - ds.where(ds.max_n('f64', n=3)), - ds.where(ds.min_n('f64', n=3)), -]) -def test_reduction_on_cuda_dask_raises_error(reduction): - with pytest.raises((NotImplementedError, ValueError)): - c.points(cudf_ddf, 'x', 'y', reduction) diff --git a/datashader/tests/test_pandas.py b/datashader/tests/test_pandas.py index 7244b810e..09a05f664 100644 --- a/datashader/tests/test_pandas.py +++ b/datashader/tests/test_pandas.py @@ -30,7 +30,6 @@ df_pd.at[2,'f32'] = nan df_pd.at[2,'f64'] = nan df_pd.at[2,'plusminus'] = nan -dfs_pd = [df_pd] test_gpu = bool(int(os.getenv("DATASHADER_TEST_GPU", 0))) @@ -214,7 +213,7 @@ def test_max(df): assert_eq_xr(c.points(df, 'x', 'y', ds.max('f64')), out) -@pytest.mark.parametrize('df', dfs_pd) +@pytest.mark.parametrize('df', dfs) def test_min_n(df): solution = np.array([[[-3, -1, 0, 4, nan, nan], [-13, -11, 10, 12, 14, nan]], [[-9, -7, -5, 6, 8, nan], [-19, -17, -15, 16, 18, nan]]]) @@ -226,7 +225,7 @@ def test_min_n(df): assert_eq_ndarray(agg[:, :, 0].data, c.points(df, 'x', 'y', ds.min('plusminus')).data) -@pytest.mark.parametrize('df', dfs_pd) +@pytest.mark.parametrize('df', dfs) def test_max_n(df): solution = np.array([[[4, 0, -1, -3, nan, nan], [14, 12, 10, -11, -13, nan]], [[8, 6, -5, -7, -9, nan], [18, 16, -15, -17, -19, nan]]]) @@ -238,7 +237,51 @@ def test_max_n(df): assert_eq_ndarray(agg[:, :, 0].data, c.points(df, 'x', 'y', ds.max('plusminus')).data) -@pytest.mark.parametrize('df', [df_pd]) +@pytest.mark.parametrize('df', dfs) +def test_where_min_row_index(df): + out = xr.DataArray([[0, 10], [-5, -15]], coords=coords, dims=dims) + assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds._min_row_index(), 'plusminus')), out) + + +@pytest.mark.parametrize('df', dfs) +def test_where_max_row_index(df): + out = xr.DataArray([[4, 14], [-9, -19]], coords=coords, dims=dims) + assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds._max_row_index(), 'plusminus')), out) + + +@pytest.mark.parametrize('df', dfs) +def test_where_min_n_row_index(df): + sol = np.array([[[ 0, -1, nan, -3, 4, nan], + [ 10, -11, 12, -13, 14, nan]], + [[ -5, 6, -7, 8, -9, nan], + [-15, 16, -17, 18, -19, nan]]]) + for n in range(1, 7): + agg = c.points(df, 'x', 'y', ds.where(ds._min_n_row_index(n=n), 'plusminus')) + out = sol[:, :, :n] + print(n, agg.data.tolist()) + print(' ', out.tolist()) + assert_eq_ndarray(agg.data, out) + if n == 1: + assert_eq_ndarray(agg[:, :, 0].data, c.points(df, 'x', 'y', ds.where(ds._min_row_index(), 'plusminus')).data) + + +@pytest.mark.parametrize('df', dfs) +def test_where_max_n_row_index(df): + sol = np.array([[[ 4, -3, nan, -1, 0, nan], + [ 14, -13, 12, -11, 10, nan]], + [[ -9, 8, -7, 6, -5, nan], + [-19, 18, -17, 16, -15, nan]]]) + for n in range(1, 7): + agg = c.points(df, 'x', 'y', ds.where(ds._max_n_row_index(n=n), 'plusminus')) + out = sol[:, :, :n] + print(n, agg.data.tolist()) + print(' ', out.tolist()) + assert_eq_ndarray(agg.data, out) + if n == 1: + assert_eq_ndarray(agg[:, :, 0].data, c.points(df, 'x', 'y', ds.where(ds._max_row_index(), 'plusminus')).data) + + +@pytest.mark.parametrize('df', dfs) def test_where_first(df): # Note reductions like ds.where(ds.first('i32'), 'reverse') are 
supported, # but the same results can be achieved using the simpler ds.first('reverse') @@ -256,7 +299,7 @@ def test_where_first(df): assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.first('f32'))), out) -@pytest.mark.parametrize('df', dfs_pd) +@pytest.mark.parametrize('df', dfs) def test_where_last(df): # Note reductions like ds.where(ds.last('i32'), 'reverse') are supported, # but the same results can be achieved using the simpler ds.last('reverse') @@ -306,7 +349,7 @@ def test_where_min(df): assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.min('f32'))), out) -@pytest.mark.parametrize('df', dfs_pd) +@pytest.mark.parametrize('df', dfs) def test_where_first_n(df): sol_rowindex = np.array([[[ 0, 1, 3, 4, -1, -1], [10, 11, 12, 13, 14, -1]], @@ -330,7 +373,7 @@ def test_where_first_n(df): assert_eq_ndarray(agg[:, :, 0].data, c.points(df, 'x', 'y', ds.where(ds.first('plusminus'), 'reverse')).data) -@pytest.mark.parametrize('df', dfs_pd) +@pytest.mark.parametrize('df', dfs) def test_where_last_n(df): sol_rowindex = np.array([[[ 4, 3, 1, 0, -1, -1], [14, 13, 12, 11, 10, -1]], @@ -354,7 +397,7 @@ def test_where_last_n(df): assert_eq_ndarray(agg[:, :, 0].data, c.points(df, 'x', 'y', ds.where(ds.last('plusminus'), 'reverse')).data) -@pytest.mark.parametrize('df', dfs_pd) +@pytest.mark.parametrize('df', dfs) def test_where_max_n(df): sol_rowindex = np.array([[[ 4, 0, 1, 3, -1, -1], [14, 12, 10, 11, 13, -1]], @@ -378,7 +421,7 @@ def test_where_max_n(df): assert_eq_ndarray(agg[:, :, 0].data, c.points(df, 'x', 'y', ds.where(ds.max('plusminus'), 'reverse')).data) -@pytest.mark.parametrize('df', dfs_pd) +@pytest.mark.parametrize('df', dfs) def test_where_min_n(df): sol_rowindex = np.array([[[3, 1, 0, 4, -1, -1], [13, 11, 10, 12, 14, -1]], @@ -402,9 +445,9 @@ def test_where_min_n(df): assert_eq_ndarray(agg[:, :, 0].data, c.points(df, 'x', 'y', ds.where(ds.min('plusminus'), 'reverse')).data) -@pytest.mark.parametrize('df', dfs_pd) +@pytest.mark.parametrize('df', dfs) def test_summary_where_n(df): - sol_min_n_rowindex = np.array([[[3, 1, 0, 4, -1], + sol_min_n_rowindex = np.array([[[ 3, 1, 0, 4, -1], [13, 11, 10, 12, 14]], [[ 9, 7, 5, 6, 8], [19, 17, 15, 16, 18]]]) @@ -434,7 +477,7 @@ def test_summary_where_n(df): assert_eq_ndarray(agg['max_n'].data, sol_max_n_reverse) -@pytest.mark.parametrize('df', dfs_pd) +@pytest.mark.parametrize('df', dfs) def test_summary_different_n(df): msg = 'Using multiple FloatingNReductions with different n values is not supported' with pytest.raises(ValueError, match=msg): @@ -784,7 +827,7 @@ def test_categorical_std(df): assert_eq_xr(agg, out) -@pytest.mark.parametrize('df', [df_pd]) +@pytest.mark.parametrize('df', dfs) def test_first(df): out = xr.DataArray([[0, 10], [5, 15]], coords=coords, dims=dims) assert_eq_xr(c.points(df, 'x', 'y', ds.first('i32')), out) @@ -793,7 +836,7 @@ def test_first(df): assert_eq_xr(c.points(df, 'x', 'y', ds.first('f64')), out) -@pytest.mark.parametrize('df', [df_pd]) +@pytest.mark.parametrize('df', dfs) def test_last(df): out = xr.DataArray([[4, 14], [9, 19]], coords=coords, dims=dims) assert_eq_xr(c.points(df, 'x', 'y', ds.last('i32')), out) @@ -802,7 +845,7 @@ def test_last(df): assert_eq_xr(c.points(df, 'x', 'y', ds.last('f64')), out) -@pytest.mark.parametrize('df', [df_pd]) +@pytest.mark.parametrize('df', dfs) def test_first_n(df): solution = np.array([[[0, -1, -3, 4, nan, nan], [10, -11, 12, -13, 14, nan]], [[-5, 6, -7, 8, -9, nan], [-15, 16, -17, 18, -19, nan]]]) @@ -814,7 +857,7 @@ def test_first_n(df): assert_eq_ndarray(agg[:, :, 0].data, 
c.points(df, 'x', 'y', ds.first('plusminus')).data) -@pytest.mark.parametrize('df', [df_pd]) +@pytest.mark.parametrize('df', dfs) def test_last_n(df): solution = np.array([[[4, -3, -1, 0, nan, nan], [14, -13, 12, -11, 10, nan]], [[-9, 8, -7, 6, -5, nan], [-19, 18, -17, 16, -15, nan]]]) @@ -826,6 +869,48 @@ def test_last_n(df): assert_eq_ndarray(agg[:, :, 0].data, c.points(df, 'x', 'y', ds.last('plusminus')).data) +@pytest.mark.parametrize('df', dfs) +def test_min_row_index(df): + out = xr.DataArray([[0, 10], [5, 15]], coords=coords, dims=dims) + agg = c.points(df, 'x', 'y', ds._min_row_index()) + assert agg.dtype == np.int64 + assert_eq_xr(agg, out) + + +@pytest.mark.parametrize('df', dfs) +def test_max_row_index(df): + out = xr.DataArray([[4, 14], [9, 19]], coords=coords, dims=dims) + agg = c.points(df, 'x', 'y', ds._max_row_index()) + assert agg.dtype == np.int64 + assert_eq_xr(agg, out) + + +@pytest.mark.parametrize('df', dfs) +def test_min_n_row_index(df): + solution = np.array([[[0, 1, 2, 3, 4, -1], [10, 11, 12, 13, 14, -1]], + [[5, 6, 7, 8, 9, -1], [15, 16, 17, 18, 19, -1]]]) + for n in range(1, 7): + agg = c.points(df, 'x', 'y', ds._min_n_row_index(n=n)) + assert agg.dtype == np.int64 + out = solution[:, :, :n] + assert_eq_ndarray(agg.data, out) + if n == 1: + assert_eq_ndarray(agg[:, :, 0].data, c.points(df, 'x', 'y', ds._min_row_index()).data) + + +@pytest.mark.parametrize('df', dfs) +def test_max_n_row_index(df): + solution = np.array([[[4, 3, 2, 1, 0, -1], [14, 13, 12, 11, 10, -1]], + [[9, 8, 7, 6, 5, -1], [19, 18, 17, 16, 15, -1]]]) + for n in range(1, 7): + agg = c.points(df, 'x', 'y', ds._max_n_row_index(n=n)) + assert agg.dtype == np.int64 + out = solution[:, :, :n] + assert_eq_ndarray(agg.data, out) + if n == 1: + assert_eq_ndarray(agg[:, :, 0].data, c.points(df, 'x', 'y', ds._max_row_index()).data) + + @pytest.mark.parametrize('df', dfs) def test_multiple_aggregates(df): agg = c.points(df, 'x', 'y', @@ -1080,7 +1165,7 @@ def test_lines_on_edge(): assert_eq_xr(agg, out) -@pytest.mark.parametrize('df', dfs_pd) +@pytest.mark.parametrize('df', dfs) def test_log_axis_line(df): axis = ds.core.LogAxis() logcoords = axis.compute_index(axis.compute_scale_and_translate((1, 10), 2), 2) @@ -2654,19 +2739,3 @@ def test_canvas_size(): for cvs in cvs_list: with pytest.raises(ValueError, match=msg): cvs.points(df, "x", "y", ds.mean("z")) - - -@pytest.mark.skipif(not test_gpu, reason="DATASHADER_TEST_GPU not set") -@pytest.mark.parametrize('reduction', [ - ds.first('f64'), - ds.first_n('f64', n=3), - ds.last('f64'), - ds.last_n('f64', n=3), - ds.where(ds.first('f64')), - ds.where(ds.first_n('f64', n=3)), - ds.where(ds.last('f64')), - ds.where(ds.last_n('f64', n=3)), -]) -def test_reduction_on_cuda_raises_error(reduction): - with pytest.raises(ValueError, match="not supported on the GPU"): - c.points(df_cuda, 'x', 'y', reduction) diff --git a/datashader/transfer_functions/_cuda_utils.py b/datashader/transfer_functions/_cuda_utils.py index 5078a3008..baa1d79b7 100644 --- a/datashader/transfer_functions/_cuda_utils.py +++ b/datashader/transfer_functions/_cuda_utils.py @@ -252,3 +252,70 @@ def cuda_nanmin_n_in_place(ret, other): ret_pixel[i] = other_value istart = i+1 break + + +@cuda.jit +def cuda_row_min_in_place(ret, other): + """CUDA equivalent of row_min_in_place. 
+    """
+    ny, nx = ret.shape
+    x, y = cuda.grid(2)
+    if x < nx and y < ny:
+        if other[y, x] > -1 and (ret[y, x] == -1 or other[y, x] < ret[y, x]):
+            ret[y, x] = other[y, x]
+
+
+@cuda.jit
+def cuda_row_max_n_in_place(ret, other):
+    """CUDA equivalent of row_max_n_in_place.
+    """
+    ny, nx, n = ret.shape
+    x, y = cuda.grid(2)
+    if x < nx and y < ny:
+        ret_pixel = ret[y, x]  # 1D array of n values for single pixel
+        other_pixel = other[y, x]  # ditto
+        # Walk along other_pixel array a value at a time, find insertion
+        # index in ret_pixel and bump values along to insert. Next
+        # other_pixel value is inserted at a higher index, so this walks
+        # the two pixel arrays just once each.
+        istart = 0
+        for other_value in other_pixel:
+            if other_value == -1:
+                break
+
+            for i in range(istart, n):
+                if ret_pixel[i] == -1 or other_value > ret_pixel[i]:
+                    # Bump values along then insert.
+                    for j in range(n-1, i, -1):
+                        ret_pixel[j] = ret_pixel[j-1]
+                    ret_pixel[i] = other_value
+                    istart = i+1
+                    break
+
+
+@cuda.jit
+def cuda_row_min_n_in_place(ret, other):
+    """CUDA equivalent of row_min_n_in_place.
+    """
+    ny, nx, n = ret.shape
+    x, y = cuda.grid(2)
+    if x < nx and y < ny:
+        ret_pixel = ret[y, x]  # 1D array of n values for single pixel
+        other_pixel = other[y, x]  # ditto
+        # Walk along other_pixel array a value at a time, find insertion
+        # index in ret_pixel and bump values along to insert. Next
+        # other_pixel value is inserted at a higher index, so this walks
+        # the two pixel arrays just once each.
+        istart = 0
+        for other_value in other_pixel:
+            if other_value == -1:
+                break
+
+            for i in range(istart, n):
+                if ret_pixel[i] == -1 or other_value < ret_pixel[i]:
+                    # Bump values along then insert.
+                    for j in range(n-1, i, -1):
+                        ret_pixel[j] = ret_pixel[j-1]
+                    ret_pixel[i] = other_value
+                    istart = i+1
+                    break
diff --git a/datashader/utils.py b/datashader/utils.py
index c30bac6a1..f87f3466b 100644
--- a/datashader/utils.py
+++ b/datashader/utils.py
@@ -734,20 +734,6 @@ def parallel_fill(array, value):
         array[i] = value
 
 
-@ngjit
-def row_max_in_place(ret, other):
-    """Maximum of 2 arrays of row indexes.
-    Row indexes are integers from 0 upwards, missing data is -1,
-    so simple max suffices.
-    Return the first array.
-    """
-    ret = ret.ravel()
-    other = other.ravel()
-    for i in range(len(ret)):
-        if other[i] > ret[i]:
-            ret[i] = other[i]
-
-
 @ngjit
 def row_min_in_place(ret, other):
     """Minimum of 2 arrays of row indexes.
diff --git a/doc/reduction.csv b/doc/reduction.csv
index 73fde6e9e..f1e3467a3 100644
--- a/doc/reduction.csv
+++ b/doc/reduction.csv
@@ -2,10 +2,10 @@
 :class:`~datashader.reductions.any`, yes, yes, yes, yes, yes,
 :class:`~datashader.reductions.by`, yes, yes, yes, yes, yes,
 :class:`~datashader.reductions.count`, yes, yes, yes, yes, yes,
-:class:`~datashader.reductions.first`, yes, yes, , , yes, yes
-:class:`~datashader.reductions.first_n`, yes, yes, , , , yes
-:class:`~datashader.reductions.last`, yes, yes, , , yes, yes
-:class:`~datashader.reductions.last_n`, yes, yes, , , , yes
+:class:`~datashader.reductions.first`, yes, yes, yes, yes, yes, yes
+:class:`~datashader.reductions.first_n`, yes, yes, yes, yes, , yes
+:class:`~datashader.reductions.last`, yes, yes, yes, yes, yes, yes
+:class:`~datashader.reductions.last_n`, yes, yes, yes, yes, , yes
 :class:`~datashader.reductions.max`, yes, yes, yes, yes, yes, yes
 :class:`~datashader.reductions.max_n`, yes, yes, yes, yes, , yes
 :class:`~datashader.reductions.mean`, yes, yes, yes, yes, yes,
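All of the new CUDA append functions and the `cuda_row_max_n_in_place`/`cuda_row_min_n_in_place` kernels share one pattern: walk the per-pixel array of stored row indexes to find an insertion slot, then bump later values along to make room. A minimal NumPy sketch of that pattern, for illustration only (the `insert_sorted` name is hypothetical and not part of the patch):

```python
import numpy as np

def insert_sorted(pixel, value):
    """Insert `value` into `pixel` (ascending row indexes, -1 marks empty
    slots), shifting later entries along and dropping the last one.
    Mirrors the walk-then-bump logic of the min_n kernels above."""
    n = len(pixel)
    for i in range(n):
        if pixel[i] == -1 or value < pixel[i]:
            # Bump previous values along to make room for the new value.
            for j in range(n - 1, i, -1):
                pixel[j] = pixel[j - 1]
            pixel[i] = value
            return i   # insertion index, like the _append functions return
    return -1          # not inserted

pixel = np.array([2, 5, 9, -1, -1], dtype=np.int64)
insert_sorted(pixel, 4)   # pixel -> [2, 4, 5, 9, -1]
insert_sorted(pixel, 0)   # pixel -> [0, 2, 4, 5, 9]
```

The in-place combine kernels apply the same walk per pixel, carrying `istart` forward so that each of the two per-pixel arrays is traversed only once.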