Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connect): sql #3696

Merged
merged 5 commits into from
Jan 17, 2025
Merged

Conversation

universalmind303
Copy link
Contributor

Description

adds spark's df.createOrReplaceTempView and spark.sql to enable sql workflows.

ex:

my_table = spark.read.parquet('my_table.parquet')
my_tablecreateOrReplaceTempView('my_table')

spark.sql('select * from my_table')

Note for reviewers:

in order for it to be easier to interop with connect, I removed the SQLCatalog and refactored the SQLPlanner to take in a DaftMetaCatalog instead. This partially closes #3586. I think we still need to do some work on the python side of things though.

Copy link

codspeed-hq bot commented Jan 16, 2025

CodSpeed Performance Report

Merging #3696 will degrade performances by 16.83%

Comparing universalmind303:sql-catalog (20704df) with main (e7abec3)

Summary

❌ 2 regressions
✅ 25 untouched benchmarks

⚠️ Please fix the performance issues or acknowledge them on CodSpeed.

Benchmarks breakdown

Benchmark main universalmind303:sql-catalog Change
test_count[1 Small File] 3.3 ms 4 ms -16.83%
test_iter_rows_first_row[100 Small Files] 164.2 ms 188.7 ms -12.94%

Copy link

codecov bot commented Jan 16, 2025

Codecov Report

Attention: Patch coverage is 83.83838% with 32 lines in your changes missing coverage. Please review.

Project coverage is 77.06%. Comparing base (beae462) to head (20704df).
Report is 6 commits behind head on main.

Files with missing lines Patch % Lines
src/daft-connect/src/execute.rs 83.19% 20 Missing ⚠️
src/daft-catalog/src/lib.rs 78.26% 5 Missing ⚠️
src/daft-connect/src/spark_analyzer.rs 87.50% 4 Missing ⚠️
src/daft-connect/src/connect_service.rs 77.77% 2 Missing ⚠️
src/daft-catalog/src/python.rs 0.00% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #3696      +/-   ##
==========================================
- Coverage   77.87%   77.06%   -0.82%     
==========================================
  Files         721      720       -1     
  Lines       90505    91795    +1290     
==========================================
+ Hits        70479    70738     +259     
- Misses      20026    21057    +1031     
Files with missing lines Coverage Δ
daft/catalog/__init__.py 30.55% <ø> (ø)
src/daft-catalog/python-catalog/src/python.rs 4.44% <ø> (ø)
src/daft-connect/src/session.rs 100.00% <100.00%> (ø)
src/daft-sql/src/lib.rs 100.00% <ø> (ø)
src/daft-sql/src/planner.rs 75.10% <100.00%> (-0.08%) ⬇️
src/daft-sql/src/python.rs 50.74% <100.00%> (+3.97%) ⬆️
src/daft-catalog/src/python.rs 25.80% <0.00%> (ø)
src/daft-connect/src/connect_service.rs 58.65% <77.77%> (+2.33%) ⬆️
src/daft-connect/src/spark_analyzer.rs 72.28% <87.50%> (+0.83%) ⬆️
src/daft-catalog/src/lib.rs 69.29% <78.26%> (-1.00%) ⬇️
... and 1 more

... and 54 files with indirect coverage changes

src/daft-sql/src/lib.rs Outdated Show resolved Hide resolved
src/daft-sql/src/lib.rs Outdated Show resolved Hide resolved
src/daft-sql/src/catalog.rs Outdated Show resolved Hide resolved
src/daft-connect/src/connect_service.rs Outdated Show resolved Hide resolved

let plan = LogicalPlanBuilder::from(plan);

// TODO: code duplication
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this reminder for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's a pretty big block of code duplicated in a few spots that I want to clean up.

        // TODO: code duplication
        let result_complete = res.result_complete_response();

        let (tx, rx) = tokio::sync::mpsc::channel::<eyre::Result<ExecutePlanResponse>>(1);

        let this = self.clone();

        tokio::spawn(async move {
            let execution_fut = async {
                let mut result_stream = this.run_query(plan).await?;
                while let Some(result) = result_stream.next().await {
                    let result = result?;
                    let tables = result.get_tables()?;
                    for table in tables.as_slice() {
                        let response = res.arrow_batch_response(table)?;
                        if tx.send(Ok(response)).await.is_err() {
                            return Ok(());
                        }
                    }
                }
                Ok(())
            };
            if let Err(e) = execution_fut.await {
                let _ = tx.send(Err(e)).await;
            }
        });

        let stream = ReceiverStream::new(rx);

        let stream = stream
            .map_err(|e| {
                Status::internal(
                    textwrap::wrap(&format!("Error in Daft server: {e}"), 120).join("\n"),
                )
            })
            .chain(stream::once(ready(Ok(result_complete))));

        Ok(Box::pin(stream))

@raunakab
Copy link
Contributor

@universalmind303 You mentioned that there is still some work remaining on the python side of things. What other things are left?

@universalmind303
Copy link
Contributor Author

@universalmind303 You mentioned that there is still some work remaining on the python side of things. What other things are left?

right now there is a daft catalog and a sql catalog on the python side. we should consolidate these into a single catalog.

@universalmind303 universalmind303 merged commit 6b302af into Eventual-Inc:main Jan 17, 2025
40 of 41 checks passed
@universalmind303 universalmind303 deleted the sql-catalog branch January 23, 2025 06:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

sql: unify daft.catalog and daft.sql.catalog
3 participants