diff --git a/dask_snowflake/core.py b/dask_snowflake/core.py
index ed7716c..88d3891 100644
--- a/dask_snowflake/core.py
+++ b/dask_snowflake/core.py
@@ -13,11 +13,7 @@
 import dask
 import dask.dataframe as dd
-from dask.base import tokenize
-from dask.dataframe.core import new_dd_object
 from dask.delayed import delayed
-from dask.highlevelgraph import HighLevelGraph
-from dask.layers import DataFrameIOLayer
 from dask.utils import parse_bytes
@@ -299,31 +295,8 @@ def read_snowflake(
         batches, meta, npartitions=npartitions, partition_size=partition_size
     )
-    # Legacy check
-    # if new enough dask version we use from_map
-    if hasattr(dd, "from_map"):
-        return dd.from_map(
-            partial(_fetch_batches, arrow_options=arrow_options),
-            batches_partitioned,
-            meta=meta,
-        )
-    # if not revert to exising method
-    else:
-        label = "read-snowflake-"
-        output_name = label + tokenize(
-            query,
-            connection_kwargs,
-            arrow_options,
-        )
-        divisions = tuple([None] * (len(batches_partitioned) + 1))
-        # Create Blockwise layer
-        layer = DataFrameIOLayer(
-            output_name,
-            meta.columns,
-            batches_partitioned,
-            # TODO: Implement wrapper to only convert columns requested
-            partial(_fetch_batches, arrow_options=arrow_options),
-            label=label,
-        )
-        graph = HighLevelGraph({output_name: layer}, {output_name: set()})
-        return new_dd_object(graph, output_name, meta, divisions)
+    return dd.from_map(
+        partial(_fetch_batches, arrow_options=arrow_options),
+        batches_partitioned,
+        meta=meta,
+    )
diff --git a/requirements.txt b/requirements.txt
index f0e9132..1c4a6ee 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-dask>=2021.05.0
+dask>=2024.3.0
 distributed
 snowflake-connector-python[pandas]>=2.6.0
 snowflake-sqlalchemy
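
Note (not part of the patch): a minimal sketch of the dd.from_map pattern the new code relies on, included only for reviewer context. The fetch_partition helper, the scale keyword, and the sample input lists below are hypothetical stand-ins for _fetch_batches, arrow_options, and the partitioned Snowflake result batches; only dd.from_map(func, iterable, meta=...) itself is the real dask API being adopted here.

from functools import partial

import pandas as pd
import dask.dataframe as dd


def fetch_partition(batch, scale=1):
    # One call produces one pandas DataFrame partition from one input element,
    # just as _fetch_batches turns one list of result batches into one partition.
    return pd.DataFrame({"x": [value * scale for value in batch]})


# Empty frame describing the output schema, analogous to `meta` in read_snowflake.
meta = pd.DataFrame({"x": pd.Series(dtype="int64")})

ddf = dd.from_map(
    partial(fetch_partition, scale=10),  # mirrors partial(_fetch_batches, arrow_options=...)
    [[1, 2], [3, 4, 5]],                 # one iterable element per output partition
    meta=meta,
)

assert ddf.npartitions == 2
print(ddf.compute())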