
Commit

refactor: remove eyre from daft-connect (#3719)
universalmind303 authored Jan 23, 2025
1 parent 98a5589 commit b31ad6b
Showing 15 changed files with 508 additions and 333 deletions.
18 changes: 1 addition & 17 deletions Cargo.lock


24 changes: 17 additions & 7 deletions src/daft-connect/Cargo.toml
@@ -2,20 +2,30 @@
arrow2 = {workspace = true, features = ["io_json_integration"]}
async-stream = "0.3.6"
common-error = {workspace = true, optional = true, features = ["python"]}
-common-file-formats = {workspace = true, optional = true, features = ["python"]}
-daft-catalog = {path = "../daft-catalog", optional = true, features = ["python"]}
+common-file-formats = {workspace = true, optional = true, features = [
+"python"
+]}
+common-runtime = {workspace = true}
+daft-catalog = {path = "../daft-catalog", optional = true, features = [
+"python"
+]}
daft-core = {workspace = true, optional = true, features = ["python"]}
daft-dsl = {workspace = true, optional = true, features = ["python"]}
-daft-local-execution = {workspace = true, optional = true, features = ["python"]}
+daft-local-execution = {workspace = true, optional = true, features = [
+"python"
+]}
daft-logical-plan = {workspace = true, optional = true, features = ["python"]}
-daft-micropartition = {workspace = true, optional = true, features = ["python"]}
-daft-ray-execution = {workspace = true, optional = true, features = ["python"]}
+daft-micropartition = {workspace = true, optional = true, features = [
+"python"
+]}
+daft-ray-execution = {workspace = true, optional = true, features = [
+"python"
+]}
daft-scan = {workspace = true, optional = true, features = ["python"]}
daft-schema = {workspace = true, optional = true, features = ["python"]}
daft-sql = {workspace = true, optional = true, features = ["python"]}
daft-table = {workspace = true, optional = true, features = ["python"]}
dashmap = "6.1.0"
-eyre = "0.6.12"
futures = "0.3.31"
itertools = {workspace = true}
once_cell = {workspace = true}
@@ -26,7 +36,7 @@ tokio = {version = "1.40.0", features = ["full"]}
tonic = "0.12.3"
tracing = {workspace = true}
uuid = {version = "1.10.0", features = ["v4"]}
-common-runtime.workspace = true
+snafu.workspace = true

[features]
default = ["python"]
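
The manifest change anchors the refactor: eyre = "0.6.12" goes away, snafu.workspace = true comes in, common-runtime moves up into the alphabetized dependency block, and several feature lists are rewrapped onto multiple lines. The error module that backs this change (src/daft-connect/src/error.rs, which the rest of the diff imports as crate::error::ConnectResult) is not shown in this excerpt. As a rough, assumed sketch only (variant names and shapes are guesses, not taken from the PR), a snafu-based replacement could look like:

use snafu::Snafu;

// Hypothetical src/daft-connect/src/error.rs -- illustrative only.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum ConnectError {
    // Client sent a plan or argument we cannot handle.
    #[snafu(display("Invalid argument: {msg}"))]
    InvalidArgument { msg: String },

    // Errors bubbling up from the engine; context(false) derives
    // From<common_error::DaftError>, so `?` keeps working at call sites
    // such as Schema::new(fields)? in the display.rs tests below.
    #[snafu(context(false))]
    Daft { source: common_error::DaftError },
}

pub type ConnectResult<T, E = ConnectError> = Result<T, E>;
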
10 changes: 4 additions & 6 deletions src/daft-connect/src/connect_service.rs
@@ -163,25 +163,23 @@ impl SparkConnectService for DaftSparkConnectService {
let Plan { op_type } = plan.required("plan")?;

let OpType::Root(relation) = op_type.required("op_type")? else {
-return invalid_argument_err!("op_type must be Root");
+invalid_argument_err!("op_type must be Root");
};

let translator = SparkAnalyzer::new(&session);

let result = match translator.relation_to_spark_schema(relation).await {
Ok(schema) => schema,
Err(e) => {
-return invalid_argument_err!(
-"Failed to translate relation to schema: {e:?}"
-);
+invalid_argument_err!("Failed to translate relation to schema: {e:?}");
}
};
Ok(Response::new(rb.schema_response(result)))
}
Analyze::DdlParse(DdlParse { ddl_string }) => {
let daft_schema = match daft_sql::sql_schema(&ddl_string) {
Ok(daft_schema) => daft_schema,
-Err(e) => return invalid_argument_err!("{e}"),
+Err(e) => invalid_argument_err!("{e}"),
};

let daft_schema = daft_schema.to_struct();
@@ -198,7 +196,7 @@ impl SparkConnectService for DaftSparkConnectService {
};

let OpType::Root(input) = plan.op_type.required("op_type")? else {
-return invalid_argument_err!("op_type must be Root");
+invalid_argument_err!("op_type must be Root");
};

if let Some(common) = &input.common {
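
Every call site in this file loses its leading return: the let ... else branches and the Err(e) match arms now invoke invalid_argument_err!(...) bare, so the macro itself must diverge, that is, build the error and early-return it. The macro's new definition is not part of this excerpt; one plausible shape, assuming the handlers return Result<_, tonic::Status> as tonic service methods do:

// Assumed sketch of invalid_argument_err! after the refactor -- not taken from the PR.
macro_rules! invalid_argument_err {
    ($($arg:tt)*) => {{
        let msg = format!($($arg)*);
        // Early-return a gRPC InvalidArgument status; because this expands to
        // `return`, it also satisfies the diverging `let ... else` blocks above.
        return Err(::tonic::Status::invalid_argument(msg));
    }};
}
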
99 changes: 51 additions & 48 deletions src/daft-connect/src/display.rs
@@ -2,6 +2,8 @@ use std::fmt::Write;

use daft_core::prelude::*;

+use crate::error::ConnectResult;

// note: right now this is only implemented for Schema, but we'll want to extend this for our dataframe output, and the plan repr.
pub trait SparkDisplay {
fn repr_spark_string(&self) -> String;
@@ -28,14 +30,14 @@ fn write_field(
field_name: &str,
dtype: &DataType,
level: usize,
-) -> eyre::Result<()> {
+) -> ConnectResult<()> {
fn write_field_inner(
w: &mut String,
field_name: &str,
dtype: &DataType,
level: usize,
is_list: bool,
-) -> eyre::Result<()> {
+) -> ConnectResult<()> {
let indent = make_indent(level);

let dtype_str = type_to_string(dtype);
@@ -85,50 +87,51 @@ fn make_indent(level: usize) -> String {

fn type_to_string(dtype: &DataType) -> String {
match dtype {
-DataType::Null => "null".to_string(),
-DataType::Boolean => "boolean".to_string(),
-DataType::Int8 => "byte".to_string(),
-DataType::Int16 => "short".to_string(),
-DataType::Int32 => "integer".to_string(),
-DataType::Int64 => "long".to_string(),
-DataType::Float32 => "float".to_string(),
-DataType::Float64 => "double".to_string(),
-DataType::Decimal128(precision, scale) => format!("decimal({precision},{scale})"),
-DataType::Timestamp(_, _) => "timestamp".to_string(),
-DataType::Date => "date".to_string(),
-DataType::Time(_) => "time".to_string(),
-DataType::Duration(_) => "duration".to_string(),
-DataType::Interval => "interval".to_string(),
-DataType::Binary => "binary".to_string(),
-DataType::FixedSizeBinary(_) => "arrow.fixed_size_binary".to_string(),
-DataType::Utf8 => "string".to_string(),
-DataType::FixedSizeList(_, _) => "arrow.fixed_size_list".to_string(),
-DataType::List(_) => "array".to_string(),
-DataType::Struct(_) => "struct".to_string(),
-DataType::Map { .. } => "map".to_string(),
-DataType::Extension(_, _, _) => "daft.extension".to_string(),
-DataType::Embedding(_, _) => "daft.embedding".to_string(),
-DataType::Image(_) => "daft.image".to_string(),
-DataType::FixedShapeImage(_, _, _) => "daft.fixed_shape_image".to_string(),
-DataType::Tensor(_) => "daft.tensor".to_string(),
-DataType::FixedShapeTensor(_, _) => "daft.fixed_shape_tensor".to_string(),
-DataType::SparseTensor(_) => "daft.sparse_tensor".to_string(),
-DataType::FixedShapeSparseTensor(_, _) => "daft.fixed_shape_sparse_tensor".to_string(),
-DataType::Python => "daft.python".to_string(),
-DataType::Unknown => "unknown".to_string(),
-DataType::UInt8 => "arrow.uint8".to_string(),
-DataType::UInt16 => "arrow.uint16".to_string(),
-DataType::UInt32 => "arrow.uint32".to_string(),
-DataType::UInt64 => "arrow.uint64".to_string(),
+DataType::Null => "null",
+DataType::Boolean => "boolean",
+DataType::Int8 => "byte",
+DataType::Int16 => "short",
+DataType::Int32 => "integer",
+DataType::Int64 => "long",
+DataType::Float32 => "float",
+DataType::Float64 => "double",
+DataType::Decimal128(precision, scale) => return format!("decimal({precision},{scale})"),
+DataType::Timestamp(_, _) => "timestamp",
+DataType::Date => "date",
+DataType::Time(_) => "time",
+DataType::Duration(_) => "duration",
+DataType::Interval => "interval",
+DataType::Binary => "binary",
+DataType::FixedSizeBinary(_) => "arrow.fixed_size_binary",
+DataType::Utf8 => "string",
+DataType::FixedSizeList(_, _) => "arrow.fixed_size_list",
+DataType::List(_) => "array",
+DataType::Struct(_) => "struct",
+DataType::Map { .. } => "map",
+DataType::Extension(_, _, _) => "daft.extension",
+DataType::Embedding(_, _) => "daft.embedding",
+DataType::Image(_) => "daft.image",
+DataType::FixedShapeImage(_, _, _) => "daft.fixed_shape_image",
+DataType::Tensor(_) => "daft.tensor",
+DataType::FixedShapeTensor(_, _) => "daft.fixed_shape_tensor",
+DataType::SparseTensor(_) => "daft.sparse_tensor",
+DataType::FixedShapeSparseTensor(_, _) => "daft.fixed_shape_sparse_tensor",
+DataType::Python => "daft.python",
+DataType::Unknown => "unknown",
+DataType::UInt8 => "arrow.uint8",
+DataType::UInt16 => "arrow.uint16",
+DataType::UInt32 => "arrow.uint32",
+DataType::UInt64 => "arrow.uint64",
}
+.to_string()
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
-fn test_empty_schema() -> eyre::Result<()> {
+fn test_empty_schema() -> ConnectResult<()> {
let schema = Schema::empty();
let output = schema.repr_spark_string();
let expected = "root\n";
@@ -137,7 +140,7 @@ mod tests {
}

#[test]
-fn test_single_field_schema() -> eyre::Result<()> {
+fn test_single_field_schema() -> ConnectResult<()> {
let mut fields = Vec::new();
fields.push(Field::new("step", DataType::Int32));
let schema = Schema::new(fields)?;
@@ -148,7 +151,7 @@ mod tests {
}

#[test]
-fn test_multiple_simple_fields() -> eyre::Result<()> {
+fn test_multiple_simple_fields() -> ConnectResult<()> {
let mut fields = Vec::new();
fields.push(Field::new("step", DataType::Int32));
fields.push(Field::new("type", DataType::Utf8));
@@ -166,7 +169,7 @@ root
}

#[test]
-fn test_struct_field() -> eyre::Result<()> {
+fn test_struct_field() -> ConnectResult<()> {
// Create a schema with a struct field
let inner_fields = vec![
Field::new("inner1", DataType::Utf8),
@@ -192,7 +195,7 @@ root
}

#[test]
-fn test_nested_struct_in_struct() -> eyre::Result<()> {
+fn test_nested_struct_in_struct() -> ConnectResult<()> {
let inner_struct = DataType::Struct(vec![
Field::new("deep", DataType::Boolean),
Field::new("deeper", DataType::Utf8),
@@ -220,7 +223,7 @@ root
}

#[test]
-fn test_list_fields() -> eyre::Result<()> {
+fn test_list_fields() -> ConnectResult<()> {
let list_of_int = DataType::List(Box::new(DataType::Int16));
let fixed_list_of_floats = DataType::FixedSizeList(Box::new(DataType::Float32), 3);

@@ -242,7 +245,7 @@ root
}

#[test]
-fn test_map_field() -> eyre::Result<()> {
+fn test_map_field() -> ConnectResult<()> {
let map_type = DataType::Map {
key: Box::new(DataType::Utf8),
value: Box::new(DataType::Int32),
@@ -267,7 +270,7 @@ root
}

#[test]
-fn test_extension_type() -> eyre::Result<()> {
+fn test_extension_type() -> ConnectResult<()> {
let extension_type =
DataType::Extension("some_ext_type".to_string(), Box::new(DataType::Int32), None);

@@ -285,7 +288,7 @@ root
}

#[test]
-fn test_complex_nested_schema() -> eyre::Result<()> {
+fn test_complex_nested_schema() -> ConnectResult<()> {
// A very nested schema to test indentation and various types together
let struct_inner = DataType::Struct(vec![
Field::new("sub_list", DataType::List(Box::new(DataType::Utf8))),
@@ -327,7 +330,7 @@ root
}

#[test]
-fn test_field_name_special_chars() -> eyre::Result<()> {
+fn test_field_name_special_chars() -> ConnectResult<()> {
// Field with spaces and special characters
let mut fields = Vec::new();
fields.push(Field::new("weird field@!#", DataType::Utf8));
@@ -342,7 +345,7 @@ root
}

#[test]
-fn test_zero_sized_fixed_list() -> eyre::Result<()> {
+fn test_zero_sized_fixed_list() -> ConnectResult<()> {
// Although unusual, test a fixed size list with size=0
let zero_sized_list = DataType::FixedSizeList(Box::new(DataType::Int8), 0);
let mut fields = Vec::new();
26 changes: 0 additions & 26 deletions src/daft-connect/src/err.rs

This file was deleted.
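
The deleted err.rs presumably carried the eyre-oriented error plumbing this PR replaces. The gRPC layer still needs a bridge from the new crate error to tonic::Status so that handlers can use ? and satisfy tonic's signatures; that glue is not visible in this excerpt. A speculative sketch, reusing the assumed ConnectError from the earlier sketch:

// Assumed glue code -- illustrative only, not taken from the PR.
impl From<ConnectError> for tonic::Status {
    fn from(err: ConnectError) -> Self {
        match err {
            // Bad client input maps onto the InvalidArgument status code.
            ConnectError::InvalidArgument { msg } => tonic::Status::invalid_argument(msg),
            // Anything from the engine is surfaced as an internal error;
            // Display comes from the Snafu derive.
            other => tonic::Status::internal(other.to_string()),
        }
    }
}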

