feat(connect): Rust ray exec (#3666)
## Description 

You can now specify which runner to use via native Spark config:

```py
from daft.daft import connect_start
from pyspark.sql import SparkSession

# Start the Daft Connect server and point a Spark Connect session at it.
server = connect_start()
url = f"sc://localhost:{server.port()}"

daft_spark = SparkSession.builder.appName("DaftConnectExample").remote(url).getOrCreate()

# Select the execution runner via Spark config.
daft_spark.conf.set("daft.runner", "ray")
# or use the native runner
# daft_spark.conf.set("daft.runner", "native")

df1 = daft_spark.read.parquet("~/datasets/tpcds/sf10/customer.parquet")
df1.limit(10).show()
```
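
For intuition, here is a minimal Rust sketch of the kind of dispatch this config enables. The `Runner` enum and `from_config` helper below are hypothetical names used only for illustration; they are not the actual implementation in `daft-connect`.

```rust
/// Hypothetical sketch: map the client-supplied `daft.runner` config value
/// onto an execution backend.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Runner {
    Native,
    Ray,
}

impl Runner {
    /// Parse the value a client set via `spark.conf.set("daft.runner", ...)`,
    /// defaulting to the native runner when the config is absent.
    fn from_config(value: Option<&str>) -> Result<Self, String> {
        match value {
            None | Some("native") => Ok(Self::Native),
            Some("ray") => Ok(Self::Ray),
            Some(other) => Err(format!("unsupported runner: {other}")),
        }
    }
}

fn main() {
    assert_eq!(Runner::from_config(Some("ray")), Ok(Runner::Ray));
    assert_eq!(Runner::from_config(None), Ok(Runner::Native));
}
```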

## Note for reviewers

I had to do a bit of refactoring to get this to work, mostly in how the
show string is produced. The actual Ray implementation is isolated within
the new `daft-ray-execution` crate, and it is just a wrapper around our
existing Python code. Putting it in its own crate creates a better
abstraction, and if we later want to port more of that code to Rust,
it will be a lot easier.
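
As a rough sketch of that wrapping pattern (assuming a pyo3 0.21-style API; the module path `daft.runners.ray_runner`, the `RayRunner` class, and its `run` method are illustrative names rather than the crate's actual surface), the crate essentially holds a Python runner object and forwards calls to it:

```rust
use pyo3::prelude::*;
use pyo3::types::PyDict;

/// Thin wrapper that delegates execution to an existing Python runner object.
pub struct RayRunnerShim {
    runner: Py<PyAny>,
}

impl RayRunnerShim {
    /// Instantiate the Python-side runner (module and class names are illustrative).
    pub fn try_new(address: Option<&str>) -> PyResult<Self> {
        Python::with_gil(|py| {
            let module = py.import_bound("daft.runners.ray_runner")?;
            let cls = module.getattr("RayRunner")?;
            let kwargs = PyDict::new_bound(py);
            kwargs.set_item("address", address)?;
            let runner = cls.call((), Some(&kwargs))?;
            Ok(Self {
                runner: runner.unbind(),
            })
        })
    }

    /// Forward a plan to the Python runner and hand back whatever it returns.
    pub fn run(&self, plan: Py<PyAny>) -> PyResult<Py<PyAny>> {
        Python::with_gil(|py| {
            self.runner
                .bind(py)
                .call_method1("run", (plan,))
                .map(|obj| obj.unbind())
        })
    }
}
```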


There are also a few small driveby fixes for things that were bugging me while working on this:

- changed `warn!`s to `debug!`s, since they were cluttering the output on
  every command.
- renamed `PlanIds` to `ResponseBuilder` so the name reflects what it
  actually does (a rough sketch follows this list).
- the error output for unsupported relations was nasty, so I simplified it
[here](https://github.com/Eventual-Inc/Daft/pull/3666/files#diff-0f6aee05ac5693372752b1eab7454e80142119479e41093d0b975bb777d83ffdR169)
and
[here](https://github.com/Eventual-Inc/Daft/pull/3666/files#diff-0f6aee05ac5693372752b1eab7454e80142119479e41093d0b975bb777d83ffdR169)
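
For context on the `PlanIds` → `ResponseBuilder` rename, here is a rough sketch of the builder shape it implies. The struct and field names below are assumptions for illustration; the real code works with the generated `spark-connect` response types.

```rust
/// Illustrative stand-in for the generated Spark Connect response type;
/// the real message carries more fields than shown here.
#[derive(Debug)]
struct ExecutePlanResponseSketch {
    session_id: String,
    operation_id: String,
    response_id: String,
    arrow_batch: Vec<u8>,
}

/// Holds the per-request IDs once and stamps them onto every response
/// produced for that request (requires the `uuid` crate, `v4` feature).
struct ResponseBuilder {
    session_id: String,
    operation_id: String,
}

impl ResponseBuilder {
    fn new(session_id: impl Into<String>, operation_id: impl Into<String>) -> Self {
        Self {
            session_id: session_id.into(),
            operation_id: operation_id.into(),
        }
    }

    fn arrow_batch_response(&self, arrow_batch: Vec<u8>) -> ExecutePlanResponseSketch {
        ExecutePlanResponseSketch {
            session_id: self.session_id.clone(),
            operation_id: self.operation_id.clone(),
            response_id: uuid::Uuid::new_v4().to_string(),
            arrow_batch,
        }
    }
}
```

A handler would construct one `ResponseBuilder` per incoming request and reuse it for every message in that request's response stream.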
universalmind303 authored Jan 14, 2025
1 parent c932ec9 commit 0e03303
Showing 25 changed files with 769 additions and 423 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 4 additions & 2 deletions Cargo.toml
@@ -31,6 +31,7 @@ daft-micropartition = {path = "src/daft-micropartition", default-features = fals
daft-minhash = {path = "src/daft-minhash", default-features = false}
daft-parquet = {path = "src/daft-parquet", default-features = false}
daft-physical-plan = {path = "src/daft-physical-plan", default-features = false}
daft-ray-execution = {path = "src/daft-ray-execution", default-features = false}
daft-scan = {path = "src/daft-scan", default-features = false}
daft-scheduler = {path = "src/daft-scheduler", default-features = false}
daft-sql = {path = "src/daft-sql", default-features = false}
@@ -56,7 +57,6 @@ python = [
"common-system-info/python",
"daft-catalog-python-catalog/python",
"daft-catalog/python",
"daft-connect/python",
"daft-core/python",
"daft-csv/python",
"daft-dsl/python",
@@ -172,7 +172,8 @@ members = [
"src/parquet2",
# "src/spark-connect-script",
"src/generated/spark-connect",
"src/common/partitioning"
"src/common/partitioning",
"src/daft-ray-execution"
]

[workspace.dependencies]
@@ -200,6 +201,7 @@ daft-hash = {path = "src/daft-hash"}
daft-local-execution = {path = "src/daft-local-execution"}
daft-logical-plan = {path = "src/daft-logical-plan"}
daft-micropartition = {path = "src/daft-micropartition"}
daft-ray-execution = {path = "src/daft-ray-execution"}
daft-scan = {path = "src/daft-scan"}
daft-schema = {path = "src/daft-schema"}
daft-sql = {path = "src/daft-sql"}
43 changes: 31 additions & 12 deletions src/daft-connect/Cargo.toml
@@ -1,30 +1,49 @@
[dependencies]
arrow2 = {workspace = true, features = ["io_json_integration"]}
async-stream = "0.3.6"
common-daft-config = {workspace = true}
common-file-formats = {workspace = true}
daft-core = {workspace = true}
daft-dsl = {workspace = true}
daft-local-execution = {workspace = true}
daft-logical-plan = {workspace = true}
daft-micropartition = {workspace = true}
daft-scan = {workspace = true}
daft-schema = {workspace = true}
daft-sql = {workspace = true}
daft-table = {workspace = true}
common-daft-config = {workspace = true, optional = true, features = ["python"]}
common-error = {workspace = true, optional = true, features = ["python"]}
common-file-formats = {workspace = true, optional = true, features = ["python"]}
daft-core = {workspace = true, optional = true, features = ["python"]}
daft-dsl = {workspace = true, optional = true, features = ["python"]}
daft-local-execution = {workspace = true, optional = true, features = ["python"]}
daft-logical-plan = {workspace = true, optional = true, features = ["python"]}
daft-micropartition = {workspace = true, optional = true, features = ["python"]}
daft-ray-execution = {workspace = true, optional = true, features = ["python"]}
daft-scan = {workspace = true, optional = true, features = ["python"]}
daft-schema = {workspace = true, optional = true, features = ["python"]}
daft-sql = {workspace = true, optional = true, features = ["python"]}
daft-table = {workspace = true, optional = true, features = ["python"]}
dashmap = "6.1.0"
eyre = "0.6.12"
futures = "0.3.31"
itertools = {workspace = true}
pyo3 = {workspace = true, optional = true}
spark-connect = {workspace = true}
textwrap = "0.16.1"
tokio = {version = "1.40.0", features = ["full"]}
tonic = "0.12.3"
tracing = {workspace = true}
uuid = {version = "1.10.0", features = ["v4"]}

[features]
python = ["dep:pyo3", "common-daft-config/python", "daft-local-execution/python", "daft-logical-plan/python", "daft-scan/python", "daft-table/python", "daft-dsl/python", "daft-schema/python", "daft-core/python", "daft-micropartition/python"]
default = ["python"]
python = [
"dep:pyo3",
"dep:common-daft-config",
"dep:common-error",
"dep:common-file-formats",
"dep:daft-core",
"dep:daft-dsl",
"dep:daft-local-execution",
"dep:daft-logical-plan",
"dep:daft-micropartition",
"dep:daft-ray-execution",
"dep:daft-scan",
"dep:daft-schema",
"dep:daft-sql",
"dep:daft-table"
]

[lints]
workspace = true
1 change: 0 additions & 1 deletion src/daft-connect/src/display.rs
@@ -114,7 +114,6 @@ fn type_to_string(dtype: &DataType) -> String {
DataType::FixedShapeTensor(_, _) => "daft.fixed_shape_tensor".to_string(),
DataType::SparseTensor(_) => "daft.sparse_tensor".to_string(),
DataType::FixedShapeSparseTensor(_, _) => "daft.fixed_shape_sparse_tensor".to_string(),
#[cfg(feature = "python")]
DataType::Python => "daft.python".to_string(),
DataType::Unknown => "unknown".to_string(),
DataType::UInt8 => "arrow.uint8".to_string(),
