[FEAT] connect: createDataFrame (WIP) help needed #3376

Closed (wants to merge 1 commit)
src/daft-connect/src/translation/logical_plan.rs (6 changes: 5 additions, 1 deletion)
@@ -3,11 +3,13 @@ use eyre::{bail, Context};
 use spark_connect::{relation::RelType, Limit, Relation};
 use tracing::warn;
 
-use crate::translation::logical_plan::{aggregate::aggregate, project::project, range::range};
+use crate::translation::logical_plan::{aggregate::aggregate, project::project, range::range, to_df::to_df};
 
 mod aggregate;
 mod project;
 mod range;
+mod to_df;
+mod local_relation;
 
 pub fn to_logical_plan(relation: Relation) -> eyre::Result<LogicalPlanBuilder> {
     if let Some(common) = relation.common {
@@ -25,6 +27,8 @@ pub fn to_logical_plan(relation: Relation) -> eyre::Result<LogicalPlanBuilder> {
         RelType::Aggregate(a) => {
             aggregate(*a).wrap_err("Failed to apply aggregate to logical plan")
         }
+        RelType::ToDf(t) => to_df(*t).wrap_err("Failed to apply to_df to logical plan"),
+        RelType::LocalRelation(l) => local_relation(*l).wrap_err("Failed to apply local_relation to logical plan"),
         plan => bail!("Unsupported relation type: {plan:?}"),
     }
 }
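One thing to flag in this hunk: the new `RelType::LocalRelation` arm calls `local_relation(*l)`, but the updated `use` line only brings `to_df` into scope. Assuming the function is exposed as `local_relation::local_relation` (matching the new module added below), the import would likely need to grow too; a sketch of the adjusted line:

```rust
use crate::translation::logical_plan::{
    aggregate::aggregate, local_relation::local_relation, project::project, range::range,
    to_df::to_df,
};
```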
src/daft-connect/src/translation/logical_plan/local_relation.rs (new file: 10 additions)
@@ -0,0 +1,10 @@
+use daft_logical_plan::LogicalPlanBuilder;
+
+pub fn local_relation(
+    local_relation: spark_connect::LocalRelation,
+) -> eyre::Result<LogicalPlanBuilder> {
+    let spark_connect::LocalRelation {
+        data,
+        schema,
+    } = local_relation;
+}
Comment on lines +3 to +10
This function needs to be completed - it currently lacks a return value and implementation. The function should process the data and schema parameters to construct and return a LogicalPlanBuilder. Consider adding error handling for invalid data or schema formats.

Spotted by Graphite Reviewer

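Until decoding of the `LocalRelation` payload is worked out, one option is a stub that keeps the module compiling and fails loudly at runtime. This is only a sketch and deliberately avoids assuming anything about Daft's in-memory scan API; it just acknowledges the fields and bails:

```rust
use daft_logical_plan::LogicalPlanBuilder;
use eyre::bail;

pub fn local_relation(
    local_relation: spark_connect::LocalRelation,
) -> eyre::Result<LogicalPlanBuilder> {
    let spark_connect::LocalRelation { data, schema } = local_relation;

    // `data` carries the client-serialized rows and `schema` the schema string;
    // turning them into a Daft logical plan is the part still to be implemented.
    let _ = (data, schema);
    bail!("LocalRelation (createDataFrame) is not implemented yet")
}
```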

src/daft-connect/src/translation/logical_plan/to_df.rs (new file: 26 additions)
@@ -0,0 +1,26 @@
+use eyre::{bail, WrapErr};
+use daft_logical_plan::LogicalPlanBuilder;
+use crate::translation::to_logical_plan;
Comment on lines +1 to +3
The daft_dsl crate needs to be imported to support the col() function call on line 20. Consider adding use daft_dsl; to the imports section.

Spotted by Graphite Reviewer

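A small caveat on the note above: because the call site is written fully path-qualified as `daft_dsl::col(name)`, a `use daft_dsl;` line is not strictly required by the compiler; what matters is that `daft-connect` has the `daft_dsl` crate available as a dependency so the path resolves.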


+pub fn to_df(to_df: spark_connect::ToDf) -> eyre::Result<LogicalPlanBuilder> {
+    let spark_connect::ToDf {
+        input,
+        column_names,
+    } = to_df;
+
+    let Some(input) = input else {
+        bail!("Input is required")
+    };
+
+    let plan = to_logical_plan(*input)
+        .wrap_err_with(|| format!("Failed to translate relation to logical plan: {input:?}"))?;
+
+    let column_names: Vec<_> = column_names
+        .iter()
+        .map(|name| daft_dsl::col(name))
+        .collect();
+
+    let plan = plan.with_columns(column_names)?
Missing semicolon at the end of plan.with_columns(column_names)?. This will cause a compilation error.

Spotted by Graphite Reviewer



+    Ok(plan)
+}
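Putting the review notes together, a variant of `to_df` that adds the missing semicolon, and also sidesteps a likely borrow-after-move (the closure formats `input` after `*input` has already been moved into `to_logical_plan`), might look like the sketch below. It keeps the `with_columns` call from the diff and assumes `daft_dsl::col` accepts a `&str`; both are assumptions about Daft's internal API rather than verified signatures.

```rust
use daft_logical_plan::LogicalPlanBuilder;
use eyre::{bail, WrapErr};

use crate::translation::to_logical_plan;

pub fn to_df(to_df: spark_connect::ToDf) -> eyre::Result<LogicalPlanBuilder> {
    let spark_connect::ToDf {
        input,
        column_names,
    } = to_df;

    let Some(input) = input else {
        bail!("Input is required");
    };

    // `*input` moves the boxed relation, so formatting `input` afterwards would
    // not borrow-check; a static error message avoids that.
    let plan = to_logical_plan(*input)
        .wrap_err("Failed to translate ToDf input to logical plan")?;

    // Build one column expression per requested name
    // (assumes `daft_dsl::col` takes a `&str`).
    let columns: Vec<_> = column_names
        .iter()
        .map(|name| daft_dsl::col(name.as_str()))
        .collect();

    // Trailing semicolon added, as flagged in review.
    let plan = plan.with_columns(columns)?;

    Ok(plan)
}
```

Whether `with_columns` actually implements Spark's `toDF` rename semantics is left open here; that is a question for the Daft side of the API, not something this sketch settles.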
tests/connect/test_create_df.py (new file: 13 additions)
@@ -0,0 +1,13 @@
+from __future__ import annotations
+
+
+def test_create_df(spark_session):
+    # Create a DataFrame with duplicate values
+    data = [(1,), (2,), (2,), (3,), (3,), (3,)]
+    df = spark_session.createDataFrame(data, ["value"])
+
+    # Collect and verify results
+    result = df.collect()
+
+    # Verify the DataFrame has the expected number of rows and values
+    assert sorted([row.value for row in result]) == [1, 2, 2, 3, 3, 3]