Converting from HLG to from_map (#60)
alex-rakowski authored Jul 5, 2024
1 parent 59c83b4 commit ef0c965
Showing 4 changed files with 6 additions and 29 deletions.
.github/workflows/tests.yml (1 change: 0 additions & 1 deletion)

@@ -19,7 +19,6 @@ jobs:
           # xref https://github.com/coiled/dask-snowflake/pull/56
           - os: macos-latest
             python-version: "3.11"
-
     steps:
       - name: Checkout source
         uses: actions/checkout@v2
dask_snowflake/core.py (30 changes: 4 additions & 26 deletions)

@@ -13,11 +13,7 @@

 import dask
 import dask.dataframe as dd
-from dask.base import tokenize
-from dask.dataframe.core import new_dd_object
 from dask.delayed import delayed
-from dask.highlevelgraph import HighLevelGraph
-from dask.layers import DataFrameIOLayer
 from dask.utils import parse_bytes

@@ -263,13 +259,6 @@ def read_snowflake(
     if partition_size is None and npartitions is None:
         partition_size = "100MiB"

-    label = "read-snowflake-"
-    output_name = label + tokenize(
-        query,
-        connection_kwargs,
-        arrow_options,
-    )
-
     # Disable `log_imported_packages_in_telemetry` as a temporary workaround for
     # https://github.com/snowflakedb/snowflake-connector-python/issues/1648.
     # Also xref https://github.com/coiled/dask-snowflake/issues/51.
@@ -286,11 +275,7 @@
     # right partner application ID.
     batches = _fetch_query_batches(query, connection_kwargs, execute_params).compute()
     if not batches:
-        # Empty results set -> return an empty DataFrame
-        meta = dd.utils.make_meta({})
-        graph = {(output_name, 0): meta}
-        divisions = (None, None)
-        return new_dd_object(graph, output_name, meta, divisions)
+        return dd.from_pandas(pd.DataFrame(), npartitions=1)

     batch_types = set(type(b) for b in batches)
     if len(batch_types) > 1 or next(iter(batch_types)) is not ArrowResultBatch:
@@ -310,15 +295,8 @@
         batches, meta, npartitions=npartitions, partition_size=partition_size
     )

-    # Create Blockwise layer
-    layer = DataFrameIOLayer(
-        output_name,
-        meta.columns,
-        batches_partitioned,
-        # TODO: Implement wrapper to only convert columns requested
+    return dd.from_map(
         partial(_fetch_batches, arrow_options=arrow_options),
-        label=label,
+        batches_partitioned,
+        meta=meta,
     )
-    divisions = tuple([None] * (len(batches_partitioned) + 1))
-    graph = HighLevelGraph({output_name: layer}, {output_name: set()})
-    return new_dd_object(graph, output_name, meta, divisions)
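
The change above swaps the hand-assembled DataFrameIOLayer/HighLevelGraph graph for dd.from_map, which builds one task per input element and derives the output name, meta, and divisions itself; that is why the tokenize, new_dd_object, HighLevelGraph, and DataFrameIOLayer imports disappear. A minimal runnable sketch of the same pattern follows; fetch_batches and batches_partitioned are illustrative stand-ins, not the actual dask-snowflake helpers.

    # Sketch of the pattern adopted above: dd.from_map builds the same
    # one-task-per-partition graph that DataFrameIOLayer/HighLevelGraph/
    # new_dd_object previously assembled by hand.
    from functools import partial

    import pandas as pd
    import dask.dataframe as dd


    def fetch_batches(batch_ids, arrow_options=None):
        # Stand-in for _fetch_batches: turn one partition's worth of
        # result batches into a single pandas DataFrame.
        return pd.DataFrame({"x": list(batch_ids)})


    # One list of batches per output partition (stand-in for
    # batches_partitioned in core.py).
    batches_partitioned = [[0, 1], [2, 3], [4, 5]]

    if not batches_partitioned:
        # Mirrors the new early return for an empty result set.
        ddf = dd.from_pandas(pd.DataFrame(), npartitions=1)
    else:
        # Empty frame describing the output schema.
        meta = pd.DataFrame({"x": pd.Series(dtype="int64")})
        # One task per element; dask derives the layer name and
        # divisions, so no tokenize() bookkeeping remains.
        ddf = dd.from_map(
            partial(fetch_batches, arrow_options=None),
            batches_partitioned,
            meta=meta,
        )

    print(ddf.npartitions)  # 3
    print(ddf.compute())

Because no divisions are passed to from_map, the result has unknown divisions, matching the tuple([None] * (len(batches_partitioned) + 1)) the old code computed explicitly.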
requirements.txt (2 changes: 1 addition & 1 deletion)

@@ -1,4 +1,4 @@
-dask>=2021.05.0
+dask>=2024.3.0
 distributed
 snowflake-connector-python[pandas]>=2.6.0
 snowflake-sqlalchemy
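
The dask floor moves from 2021.05.0 to 2024.3.0, a version whose dask.dataframe exposes the from_map API the rewritten reader relies on. An illustrative environment check under those assumptions (not part of this repo):

    # Illustrative check, not part of dask-snowflake: confirm the
    # installed dask satisfies the new floor and exposes dd.from_map.
    from importlib.metadata import version

    import dask.dataframe as dd
    from packaging.version import Version

    assert hasattr(dd, "from_map"), "installed dask predates dd.from_map"
    assert Version(version("dask")) >= Version("2024.3.0"), "dask too old"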
setup.py (2 changes: 1 addition & 1 deletion)

@@ -13,7 +13,7 @@
     packages=["dask_snowflake"],
     long_description=open("README.md").read(),
     long_description_content_type="text/markdown",
-    python_requires=">=3.7",
+    python_requires=">=3.9",
     install_requires=open("requirements.txt").read().strip().split("\n"),
     include_package_data=True,
     zip_safe=False,
