Skip to content

Commit

Permalink
fix: support logical concatenation at axis=0 (dask-contrib#425)
Browse files Browse the repository at this point in the history
  • Loading branch information
agoose77 authored and douglasdavis committed Dec 7, 2023
1 parent 0915586 commit 5497da9
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ classifiers = [
"Topic :: Software Development",
]
dependencies = [
"awkward >=2.5.0",
"awkward >=2.5.1rc1",
"dask >=2023.04.0",
"typing_extensions >=4.8.0",
]
Expand Down
111 changes: 87 additions & 24 deletions src/dask_awkward/lib/operations.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from __future__ import annotations

from collections.abc import Mapping
from typing import Any
from typing import TYPE_CHECKING, Any

import awkward as ak
from awkward.operations.ak_concatenate import (
enforce_concatenated_form as enforce_layout_to_concatenated_form,
)
from awkward.typetracer import typetracer_from_form
from dask.base import tokenize
from dask.highlevelgraph import HighLevelGraph

Expand All @@ -17,6 +21,10 @@
)
from dask_awkward.utils import DaskAwkwardNotImplemented, IncompatiblePartitions

if TYPE_CHECKING:
from awkward.forms import Form
from awkward.highlevel import Array as AwkwardArray


class _ConcatenateFnAxisGT0:
def __init__(self, **kwargs):
Expand All @@ -26,8 +34,17 @@ def __call__(self, *args):
return ak.concatenate(list(args), **self.kwargs)


def _concatenate_axis0_multiarg(*args):
return ak.concatenate(list(args), axis=0)
def _enforce_concatenated_form(array: AwkwardArray, form: Form) -> AwkwardArray:
layout = ak.to_layout(array)
# TODO: should this check whether the form agrees first, or assume that the
# operation is harmless if not required?
result = enforce_layout_to_concatenated_form(layout, form)
return ak.Array(result, behavior=array._behavior, attrs=array._attrs)


def _concatenate_axis_0_meta(*arrays: AwkwardArray) -> AwkwardArray:
# At this stage, the metas have all been enforced to the same type
return arrays[0]


def concatenate(
Expand All @@ -42,28 +59,75 @@ def concatenate(
token = tokenize(arrays, axis, mergebool, highlevel, behavior)
name = f"{label}-{token}"

metas = [c._meta for c in arrays]

if len(metas) == 0:
raise ValueError("Need at least one array to concatenate")

# Are we performing a _logical_ concatenation?
if axis == 0:
npartitions = sum([a.npartitions for a in arrays])
g = {}
i = 0
metas = []
for collection in arrays:
metas.append(collection._meta)
for k in collection.__dask_keys__():
g[(name, i)] = k
i += 1

meta = ak.concatenate(metas, behavior=behavior, attrs=attrs)
assert isinstance(meta, ak.Array)

prev_names = [iarr.name for iarr in arrays]
aml = AwkwardMaterializedLayer(
g,
previous_layer_names=prev_names,
fn=_concatenate_axis0_multiarg,
# There are two possible cases here:
# 1. all arrays have identical metas — just grow the Dask collection
# 2. some arrays have different metas — coerce arrays to same form

# Drop reports from metas to avoid later touching any buffers
metas_no_report = [
typetracer_from_form(x.layout.form, behavior=x.behavior, attrs=x._attrs)
for x in metas
]
# Concatenate metas to determine result form
meta_no_report = ak.concatenate(
metas_no_report, axis=0, behavior=behavior, attrs=attrs
)
intended_form = meta_no_report.layout.form

# If any forms aren't equal to this form, we must enforce each form to the same type
if any(
not m.layout.form.is_equal_to(
intended_form, all_parameters=True, form_key=False
)
for m in metas
):
arrays = [
map_partitions(
_enforce_concatenated_form,
c,
label="enforce-concat-form",
form=intended_form,
output_divisions=1,
)
for c in arrays
]

g = {
(name, i): k
for i, k in enumerate(
k for collection in arrays for k in collection.__dask_keys__()
)
}

aml = AwkwardMaterializedLayer(
g,
previous_layer_names=[a.name for a in arrays],
fn=_concatenate_axis_0_meta,
)
else:
g = {
(name, i): k
for i, k in enumerate(
k for collection in arrays for k in collection.__dask_keys__()
)
}

aml = AwkwardMaterializedLayer(g, previous_layer_names=[arrays[0].name])

hlg = HighLevelGraph.from_collections(name, aml, dependencies=arrays)
return new_array_object(hlg, name, meta=meta, npartitions=npartitions)
return new_array_object(
hlg,
name,
meta=meta_no_report,
npartitions=sum(a.npartitions for a in arrays),
)

if axis > 0:
if partition_compatibility(*arrays) == PartitionCompatibility.NO:
Expand All @@ -72,5 +136,4 @@ def concatenate(
fn = _ConcatenateFnAxisGT0(axis=axis, behavior=behavior, attrs=attrs)
return map_partitions(fn, *arrays)

else:
raise DaskAwkwardNotImplemented("TODO")
raise DaskAwkwardNotImplemented("TODO")
23 changes: 23 additions & 0 deletions src/dask_awkward/lib/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import awkward as ak
import numpy as np
from awkward.types.type import Type
from awkward.typetracer import create_unknown_scalar, is_unknown_scalar
from dask.base import is_dask_collection, tokenize
from dask.highlevelgraph import HighLevelGraph
Expand Down Expand Up @@ -1336,3 +1337,25 @@ def repartition_layer(arr: Array, key: str, divisions: tuple[int, ...]) -> dict:
(_repartition_func,) + tuple((arr.name, part) for part in pp) + (ss,)
)
return layer


@borrow_docstring(ak.enforce_type)
def enforce_type(
array: Array,
type: str | dict | Type,
highlevel: bool = True,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
) -> Array:
if not highlevel:
raise ValueError("Only highlevel=True is supported")

return map_partitions(
ak.enforce_type,
array,
label="enforce-type",
type=type,
behavior=behavior,
attrs=attrs,
output_divisions=1,
)
50 changes: 50 additions & 0 deletions tests/test_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,56 @@ def test_concatenate_simple(daa, caa, axis):
)


def test_concatenate_axis_0_logical_same(daa):
result = dak.concatenate([daa, daa], axis=0)
buffers_report = dak.report_necessary_buffers(result.points.x)
assert len(buffers_report) == 1

buffers = next(iter(buffers_report.values()))

assert buffers.data_and_shape == frozenset(
["@.points.content.x-data", "@.points-offsets"]
)
assert buffers.shape_only == frozenset()


def test_concatenate_axis_0_logical_different(daa):
import dask.config

with dask.config.set(
{"awkward.optimization.on-fail": "raise", "awkward.raise-failed-meta": True}
):
empty_form = ak.forms.from_dict(
{
"class": "RecordArray",
"fields": ["points"],
"contents": [
{
"class": "ListOffsetArray",
"offsets": "i64",
"content": {
"class": "RecordArray",
"fields": ["x", "y"],
"contents": ["int64", "float64"],
},
}
],
}
)
empty_array = ak.Array(empty_form.length_zero_array(highlevel=False))
empty_dak_array = dak.from_awkward(empty_array, npartitions=1)
result = dak.concatenate([daa, empty_dak_array], axis=0)

buffers_report = dak.report_necessary_buffers(result.points.x)
assert len(buffers_report) == 1

buffers = next(iter(buffers_report.values()))
assert buffers.data_and_shape == frozenset(
["@.points.content.x-data", "@.points.content.y-data", "@.points-offsets"]
)
assert buffers.shape_only == frozenset()


@pytest.mark.parametrize("axis", [0, 1, 2])
def test_concatenate_more_axes(axis):
a = [[[1, 2, 3], [], [100, 101], [12, 13]], [[1, 2, 3], [], [100, 101], [12, 13]]]
Expand Down

0 comments on commit 5497da9

Please sign in to comment.