Commit 6b83113

Merge pull request #90 from martindurant/implementations

Implementations: pyspark and ray

martindurant authored Feb 6, 2025
2 parents 3223604 + 22794bc commit 6b83113

Showing 27 changed files with 1,155 additions and 74 deletions.
20 changes: 20 additions & 0 deletions .github/envs/environment.yml
@@ -0,0 +1,20 @@
name: test-environment
channels:
- conda-forge
dependencies:
- dask >=2025
- pandas
- polars
- pyspark
- pyarrow >=15
- numpy
- pytest
- pytest-cov
- numba
- awkward
- distributed
- openjdk ==20
- pip
- pip:
- ray[data]
- git+https://github.com/dask-contrib/dask-awkward
14 changes: 9 additions & 5 deletions .github/workflows/pypi.yml
@@ -18,20 +18,24 @@ jobs:
fail-fast: false
matrix:
platform: [ubuntu-latest, macos-latest, windows-latest]
python-version: ["3.9", "3.10", "3.11", "3.12"]
python-version: ["3.10", "3.11", "3.12"]
runs-on: ${{matrix.platform}}
steps:
- name: Checkout
uses: actions/checkout@v3
- name: setup Python ${{matrix.python-version}}
uses: actions/setup-python@v4
- name: Setup Conda Environment
uses: conda-incubator/setup-miniconda@v3
with:
python-version: ${{matrix.python-version}}
python-version: ${{ matrix.python-version }}
environment-file: .github/envs/environment.yml
activate-environment: test-environment
- name: install
shell: bash -l {0}
run: |
pip install pip wheel -U
pip install -q --no-cache-dir .[test]
pip install -q --no-cache-dir -e .[test]
pip list
- name: test
shell: bash -l {0}
run: |
python -m pytest -v --cov-config=.coveragerc --cov akimbo
10 changes: 10 additions & 0 deletions docs/demo/akimbo-demo.ipynb → docs/akimbo-demo.ipynb
@@ -1,5 +1,15 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "8b1be0e8",
"metadata": {},
"source": [
"# HEP Demo\n",
"\n",
"Here we show a plausible small workflow on a real excerpt of particle data."
]
},
{
"cell_type": "code",
"execution_count": 1,
36 changes: 21 additions & 15 deletions docs/api.rst
@@ -1,21 +1,6 @@
akimbo
==============

.. currentmodule:: akimbo

Top Level Functions
~~~~~~~~~~~~~~~~~~~

.. autosummary::
:toctree: generated/

read_parquet
read_json
read_avro
get_parquet_schema
get_json_schema
get_avro_schema

Accessor
~~~~~~~~

@@ -38,6 +23,8 @@ Backends
akimbo.dask.DaskAwkwardAccessor
akimbo.polars.PolarsAwkwardAccessor
akimbo.cudf.CudfAwkwardAccessor
akimbo.ray.RayAccessor
akimbo.spark.SparkAccessor

.. autoclass:: akimbo.pandas.PandasAwkwardAccessor

@@ -47,6 +34,25 @@

.. autoclass:: akimbo.cudf.CudfAwkwardAccessor

.. autoclass:: akimbo.ray.RayAccessor

.. autoclass:: akimbo.spark.SparkAccessor

Top Level Functions
~~~~~~~~~~~~~~~~~~~
.. currentmodule:: akimbo


.. autosummary::
:toctree: generated/

read_parquet
read_json
read_avro
get_parquet_schema
get_json_schema
get_avro_schema


Extensions
~~~~~~~~~~
8 changes: 1 addition & 7 deletions docs/conf.py
@@ -24,13 +24,7 @@
]

templates_path = ["_templates"]
exclude_patterns = [
"_build",
"Thumbs.db",
".DS_Store",
"**.ipynb_checkpoints",
"**akimbo-demo.ipynb",
]
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store", "**.ipynb_checkpoints"]


# -- Options for HTML output -------------------------------------------------
1 change: 1 addition & 0 deletions docs/cudf-ak.ipynb
1 change: 0 additions & 1 deletion docs/demo/.gitignore

This file was deleted.

File renamed without changes.
16 changes: 15 additions & 1 deletion example/cudf-ak.ipynb → docs/example/cudf-ak.ipynb
@@ -1,10 +1,19 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "ee00a3e2",
"metadata": {},
"source": [
"# GPU backend"
]
},
{
"cell_type": "markdown",
"id": "58d18a3a-45b1-425a-b822-e8be0a6c0bc0",
"metadata": {},
"source": [
"This example depends on data in a file that can be made in the following way.\n",
"\n",
"```python\n",
"import awkward as ak\n",
@@ -14,6 +23,11 @@
" [[6, 7]]] * N\n",
" arr = ak.Array({\"a\": part})\n",
" ak.to_parquet(arr, fn, extensionarray=False)\n",
"```\n",
"\n",
"The file cuda-env.yaml can be used to create a functional environment using conda:\n",
"```bash\n",
"$ conda env create -f example/cuda-env.yaml\n",
"```"
]
},
@@ -617,7 +631,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.0"
"version": "3.10.9"
}
},
"nbformat": 4,
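Editor's note: the data-generation snippet quoted in the notebook above is truncated in this diff. A complete sketch of the same recipe, with hypothetical values for the elided `N` and `fn`:

```python
import awkward as ak

N = 1_000_000        # hypothetical multiplier; the real value is elided above
fn = "data.parquet"  # hypothetical output path

part = [[[1, 2, 3], [], [4, 5]],
        [[6, 7]]] * N
arr = ak.Array({"a": part})
# extensionarray=False writes plain arrow list types, as in the notebook snippet
ak.to_parquet(arr, fn, extensionarray=False)
```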
9 changes: 9 additions & 0 deletions docs/index.rst
@@ -24,6 +24,8 @@ identical syntax:
- dask.dataframe
- polars
- cuDF
- ray dataset
- pyspark


numpy-like API
@@ -111,6 +113,13 @@ the ``akimbo`` system, you can apply these methods to ragged/nested dataframes.
install.rst
quickstart.ipynb

.. toctree::
:maxdepth: 1
:caption: Demos

akimbo-demo.ipynb
cudf-ak.ipynb

.. toctree::
:maxdepth: 1
:caption: API Reference
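Editor's note: the "identical syntax" promise behind the backend list above is the accessor pattern: importing a backend module registers a ``.ak`` namespace on that library's objects. A minimal sketch with the pandas backend; the ``.ak.num`` call assumes awkward functions are forwarded through the accessor, as the docs describe:

```python
import pandas as pd
import akimbo.pandas  # importing registers the .ak accessor on pandas objects

s = pd.Series([[1, 2, 3], [], [4, 5]])
# forwarded awkward call: length of each inner list -> [3, 0, 2]
print(s.ak.num())
```

Under this PR, the same pattern extends to ray datasets and pyspark dataframes.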
6 changes: 5 additions & 1 deletion docs/install.rst
@@ -5,7 +5,11 @@ Requirements
~~~~~~~~~~~~

To install ``akimbo`` you will need ``awkward`` and
one of the backend libraries: ``pandas``, ``dask`` or ``polars``.
one of the backend libraries: ``pandas``, ``dask``, ``cuDF``, ``ray.data``,
``pyspark`` or ``polars``. Each of these has various installation options;
please see their respective documentation.

``akimbo`` depends on ``pyarrow`` and ``awkward``.


From PyPI
File renamed without changes
File renamed without changes
6 changes: 5 additions & 1 deletion src/akimbo/apply_tree.py
@@ -80,7 +80,11 @@ def dec(
match: function to determine if a part of the data structure matches the type we want to
operate on
outtype: postprocessing function after transform
inmode: how ``func`` expects its inputs: as awkward arrays (ak), numpy or arrow
inmode: how ``func`` expects its inputs:
- ak: awkward arrays
- numpy: numpy arrays
- arrow: arrow arrays
- other: anything that can be cast to ak arrays, e.g., number literals
"""

@functools.wraps(func)
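Editor's note: the decorator documented here applies ``func`` at matching nodes of the awkward layout tree. The underlying technique can be sketched independently of akimbo with ``ak.transform``, which visits every layout node; this is an illustration, not akimbo's actual implementation:

```python
import awkward as ak
import numpy as np

def apply_to_leaves(arr: ak.Array, func) -> ak.Array:
    """Apply ``func`` to every numeric leaf, keeping list/record structure."""
    def action(layout, **kwargs):
        if layout.is_numpy:                      # a leaf node of the layout tree
            return ak.contents.NumpyArray(func(layout.data))
        return None                              # recurse into non-leaf nodes
    return ak.transform(action, arr)

a = ak.Array([{"x": [1.0, 4.0]}, {"x": [9.0]}])
print(apply_to_leaves(a, np.sqrt).tolist())
# [{'x': [1.0, 2.0]}, {'x': [3.0]}]
```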
18 changes: 16 additions & 2 deletions src/akimbo/cudf.py
@@ -107,6 +107,12 @@ def f(lay, method=meth, **kwargs):


class CudfAwkwardAccessor(Accessor):
"""Operations on cuDF dataframes on the GPU.
Data are kept in GPU memory and use views rather than copies where
possible.
"""

series_type = Series
dataframe_type = DataFrame

@@ -145,9 +151,17 @@ def str(self):
try:
cast = dec_cu(libcudf.unary.cast, match=leaf)
except AttributeError:

def cast_inner(col, dtype):
return cudf.core.column.ColumnBase(col.data, size=len(col), dtype=np.dtype(dtype),
mask=None, offset=0, children=())
return cudf.core.column.ColumnBase(
col.data,
size=len(col),
dtype=np.dtype(dtype),
mask=None,
offset=0,
children=(),
)

cast = dec_cu(cast_inner, match=leaf)

@property
2 changes: 1 addition & 1 deletion src/akimbo/dask.py
@@ -69,7 +69,7 @@ def run(self, *args, **kwargs):
ar = [self._to_tt(ar) if hasattr(ar, "ak") else ar for ar in ar]
out = op(tt, *ar, **kwargs)
meta = PandasAwkwardAccessor._to_output(
ak.typetracer.length_zero_if_typetracer(out)
ak.typetracer.length_one_if_typetracer(out)
)
except (ValueError, TypeError):
meta = None
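Editor's note: the changed line computes dask's ``meta`` from a typetracer result. ``length_zero_if_typetracer`` produced an empty stand-in, while ``length_one_if_typetracer`` yields a length-1 concrete array, which survives more pandas operations. A minimal illustration of that conversion, assuming awkward v2's typetracer module (not akimbo's code):

```python
import awkward as ak

arr = ak.Array([[1.0, 2.0], [], [3.0]])
tt = ak.Array(arr.layout.to_typetracer(forget_length=True))  # type info, no data
meta = ak.typetracer.length_one_if_typetracer(tt)
print(len(meta), meta.type)  # a concrete, length-1 stand-in of the same type
```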
2 changes: 1 addition & 1 deletion src/akimbo/datetimes.py
@@ -24,7 +24,7 @@ def __init__(self, accessor) -> None:
floor_temporal = dec_t(pc.floor_temporal)
round_temporal = dec_t(pc.round_temporal)
strftime = dec_t(pc.strftime)
strptime = dec_t(pc.strptime)
# strptime = dec_t(pc.strptime) # this is in .str instead
day = dec_t(pc.day)
day_of_week = dec_t(pc.day_of_week)
day_of_year = dec_t(pc.day_of_year)
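Editor's note: the commented-out line reflects that ``strptime`` consumes strings (and produces timestamps), so it is exposed under the string accessor instead. The underlying pyarrow compute call, for reference:

```python
import pyarrow as pa
import pyarrow.compute as pc

arr = pa.array(["2025-02-06 12:00:00"])
ts = pc.strptime(arr, format="%Y-%m-%d %H:%M:%S", unit="s")
print(ts.type)  # timestamp[s]
```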
14 changes: 12 additions & 2 deletions src/akimbo/io.py
@@ -6,6 +6,7 @@


def ak_to_series(ds, backend="pandas", extract=True):
"""Make backend-specific series from data"""
if backend == "pandas":
import akimbo.pandas

@@ -23,13 +24,18 @@ def ak_to_series(ds, backend="pandas", extract=True):
import akimbo.cudf

s = akimbo.cudf.CudfAwkwardAccessor._to_output(ds)
elif backend in ["ray", "spark"]:
raise ValueError("Backend only supports dataframes, not series")

else:
raise ValueError("Backend must be in {'pandas', 'polars', 'dask'}")
if extract and ds.fields:
return s.ak.unpack()
return s


# TODO: read_parquet should use native versions rather than convert. This version
# is OK for pandas
def read_parquet(
url: str,
storage_options: dict | None = None,
@@ -60,6 +66,8 @@ def read_parquet(
return ak_to_series(ds, backend, extract=extract)


# TODO: should be a map over input files, maybe with newline byte blocks
# as in dask
def read_json(
url: str,
storage_options: dict | None = None,
@@ -124,6 +132,8 @@ def get_json_schema(
return layout_to_jsonschema(arr.layout)


# TODO: should be a map over input files, maybe with newline byte blocks
# as in dask
def read_avro(
url: str,
storage_options: dict | None = None,
@@ -205,9 +215,9 @@ def join(
merge = _merge

counts = np.empty(len(table1), dtype="uint64")
# TODO: the line below over-allocates, can switch to somehing growable
# TODO: the line below over-allocates, can switch to something growable
matches = np.empty(len(table2), dtype="uint64")
# TODO: to_numpy(allow_missong) makes this a bit faster, but is not
# TODO: to_numpy(allow_missing) makes this a bit faster, but is not
# not GPU general
counts, matches, ind = merge(table1[key], table2[key], counts, matches)
matches.resize(int(ind), refcheck=False)
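Editor's note: ``ak_to_series`` above is the common exit point of these readers: it wraps an awkward array in the chosen backend's series type, then (per the diff) calls ``.ak.unpack()`` when ``extract`` is set and the array has record fields. A small sketch using the signature shown in the diff; the exact return types are assumed to follow the backend library:

```python
import awkward as ak
from akimbo.io import ak_to_series

data = ak.Array({"a": [[1, 2], [], [3]], "b": [1.5, 2.5, 3.5]})

# extract=True (the default) unpacks record fields "a" and "b" into columns
df = ak_to_series(data, backend="pandas")

# extract=False keeps everything in a single ragged series
s = ak_to_series(data, backend="pandas", extract=False)
```

Per this PR, requesting ``backend="ray"`` or ``backend="spark"`` here raises a ``ValueError``, since those backends only support dataframes, not series.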