Feat/sql pagination 2 (#157)
The original combined pagination PR (#141) was becoming hard for me to reason about because FlightSQL pagination has to be handled differently than SQL tab pagination: on the SQL tab we can control the batch size directly on the context, but we can't do that with FlightSQL. So I would like to split pagination for each of those tabs into its own PR to keep them more focused and easier to review and reason about.
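
To make the SQL-tab mechanism concrete, here is a standalone sketch (not part of this commit; the table and query are purely illustrative) of the DataFusion pattern it relies on: the batch size configured on the context caps how many rows a scan emits per poll, and each poll of the physical-plan stream yields at most one RecordBatch, so "next page" is just polling the stored stream again.

use datafusion::physical_plan::execute_stream;
use datafusion::prelude::{SessionConfig, SessionContext};
use futures::StreamExt;

// Standalone sketch of the pattern the SQL-tab pagination relies on: the batch
// size configured on the context caps how many rows a scan produces per poll,
// and each poll of the physical-plan stream yields at most one RecordBatch.
// The tiny VALUES table here fits in a single batch; a real file scan would
// keep yielding batches of up to `batch_size` rows, one per poll.
async fn stream_pages() -> datafusion::error::Result<()> {
    let config = SessionConfig::new().with_batch_size(100);
    let ctx = SessionContext::new_with_config(config);
    ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)").await?; // illustrative table

    let df = ctx.sql("SELECT * FROM t").await?;
    let plan = df.create_physical_plan().await?;
    let mut stream = execute_stream(plan, ctx.task_ctx())?;

    // First poll = first page. Keeping the stream around (as the new
    // `result_stream` field does) lets a later poll fetch the next page.
    if let Some(batch) = stream.next().await {
        println!("page rows: {}", batch?.num_rows());
    }
    Ok(())
}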

So this PR makes pagination work on the SQL tab and adds integration tests covering it. There is still room for improvement in the end-to-end testing (right now we test some of the implementation details), but at least we have some coverage.
matthewmturner authored Sep 23, 2024
1 parent 9ff201c commit c431b3a
Showing 17 changed files with 844 additions and 328 deletions.
112 changes: 86 additions & 26 deletions src/app/app_execution.rs
@@ -18,25 +18,42 @@
//! [`AppExecution`]: Handles executing queries for the TUI application.
use crate::app::state::tabs::sql::Query;
use crate::app::AppEvent;
use crate::app::{AppEvent, ExecutionError, ExecutionResultsBatch};
use crate::execution::ExecutionContext;
use color_eyre::eyre::Result;
use datafusion::execution::context::SessionContext;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::execute_stream;
use futures::StreamExt;
use log::{error, info};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::Mutex;

/// Handles executing queries for the TUI application, formatting results
/// and sending them to the UI.
pub(crate) struct AppExecution {
pub struct AppExecution {
inner: Arc<ExecutionContext>,
result_stream: Arc<Mutex<Option<SendableRecordBatchStream>>>,
}

impl AppExecution {
/// Create a new instance of [`AppExecution`].
pub fn new(inner: Arc<ExecutionContext>) -> Self {
Self { inner }
Self {
inner,
result_stream: Arc::new(Mutex::new(None)),
}
}

pub fn session_ctx(&self) -> &SessionContext {
self.inner.session_ctx()
}

pub async fn set_result_stream(&self, stream: SendableRecordBatchStream) {
let mut s = self.result_stream.lock().await;
*s = Some(stream)
}

/// Run the sequence of SQL queries, sending the results as [`AppEvent::QueryResult`] via the sender.
@@ -60,33 +77,53 @@ impl AppExecution {
let start = std::time::Instant::now();
if i == statement_count - 1 {
info!("Executing last query and display results");
match self.inner.execute_sql(sql).await {
Ok(mut stream) => {
let mut batches = Vec::new();
while let Some(maybe_batch) = stream.next().await {
match maybe_batch {
Ok(batch) => {
batches.push(batch);
}
Err(e) => {
let elapsed = start.elapsed();
query.set_error(Some(e.to_string()));
query.set_execution_time(elapsed);
break;
sender.send(AppEvent::NewExecution)?;
match self.inner.create_physical_plan(sql).await {
Ok(plan) => match execute_stream(plan, self.inner.session_ctx().task_ctx()) {
Ok(stream) => {
self.set_result_stream(stream).await;
let mut stream = self.result_stream.lock().await;
if let Some(s) = stream.as_mut() {
if let Some(b) = s.next().await {
match b {
Ok(b) => {
let duration = start.elapsed();
let results = ExecutionResultsBatch {
query: sql.to_string(),
batch: b,
duration,
};
sender.send(AppEvent::ExecutionResultsNextPage(
results,
))?;
}
Err(e) => {
error!("Error getting RecordBatch: {:?}", e);
}
}
}
}
}
Err(stream_err) => {
error!("Error creating physical plan: {:?}", stream_err);
let elapsed = start.elapsed();
let e = ExecutionError {
query: sql.to_string(),
error: stream_err.to_string(),
duration: elapsed,
};
sender.send(AppEvent::ExecutionResultsError(e))?;
}
},
Err(plan_err) => {
error!("Error creating physical plan: {:?}", plan_err);
let elapsed = start.elapsed();
let rows: usize = batches.iter().map(|r| r.num_rows()).sum();
query.set_results(Some(batches));
query.set_num_rows(Some(rows));
query.set_execution_time(elapsed);
}
Err(e) => {
error!("Error creating dataframe: {:?}", e);
let elapsed = start.elapsed();
query.set_error(Some(e.to_string()));
query.set_execution_time(elapsed);
let e = ExecutionError {
query: sql.to_string(),
error: plan_err.to_string(),
duration: elapsed,
};
sender.send(AppEvent::ExecutionResultsError(e))?;
}
}
} else {
@@ -107,4 +144,27 @@ impl AppExecution {
}
Ok(())
}

pub async fn next_batch(&self, sql: String, sender: UnboundedSender<AppEvent>) {
let mut stream = self.result_stream.lock().await;
if let Some(s) = stream.as_mut() {
let start = std::time::Instant::now();
if let Some(b) = s.next().await {
match b {
Ok(b) => {
let duration = start.elapsed();
let results = ExecutionResultsBatch {
query: sql,
batch: b,
duration,
};
let _ = sender.send(AppEvent::ExecutionResultsNextPage(results));
}
Err(e) => {
error!("Error getting RecordBatch: {:?}", e);
}
}
}
}
}
}
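
For orientation, here is a hypothetical caller of the new paging API (not part of this diff): it shows how a UI-side task could request one more page and receive it back as an event. It assumes a prior execution has already seeded `result_stream`; crate-internal imports for `AppExecution`, `AppEvent`, and `ExecutionResultsBatch` are omitted.

use std::sync::Arc;
use tokio::sync::mpsc;

// Hypothetical driver, not part of this diff. Assumes a prior execution has
// stored a stream in `result_stream`.
async fn fetch_next_page(app_execution: Arc<AppExecution>, sql: String) {
    let (tx, mut rx) = mpsc::unbounded_channel::<AppEvent>();
    // Polls the stored stream once and, on success, sends
    // AppEvent::ExecutionResultsNextPage through `tx`.
    app_execution.next_batch(sql, tx).await;
    if let Some(AppEvent::ExecutionResultsNextPage(results)) = rx.recv().await {
        // The TUI would append or swap in `results.batch` here.
        let _ = results;
    }
}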
136 changes: 68 additions & 68 deletions src/app/handlers/flightsql.rs
@@ -66,74 +66,74 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) {
}
}

KeyCode::Enter => {
info!("Run FS query");
let sql = app.state.flightsql_tab.editor().lines().join("");
info!("SQL: {}", sql);
let execution = Arc::clone(&app.execution);
let _event_tx = app.event_tx();
tokio::spawn(async move {
let client = execution.flightsql_client();
let mut query =
FlightSQLQuery::new(sql.clone(), None, None, None, Duration::default(), None);
let start = Instant::now();
if let Some(ref mut c) = *client.lock().await {
info!("Sending query");
match c.execute(sql, None).await {
Ok(flight_info) => {
for endpoint in flight_info.endpoint {
if let Some(ticket) = endpoint.ticket {
match c.do_get(ticket.into_request()).await {
Ok(mut stream) => {
let mut batches: Vec<RecordBatch> = Vec::new();
// temporarily only show the first batch to avoid
// buffering massive result sets. Eventually there should
// be some sort of paging logic
// see https://github.com/datafusion-contrib/datafusion-tui/pull/133#discussion_r1756680874
// while let Some(maybe_batch) = stream.next().await {
if let Some(maybe_batch) = stream.next().await {
match maybe_batch {
Ok(batch) => {
info!("Batch rows: {}", batch.num_rows());
batches.push(batch);
}
Err(e) => {
error!("Error getting batch: {:?}", e);
let elapsed = start.elapsed();
query.set_error(Some(e.to_string()));
query.set_execution_time(elapsed);
}
}
}
let elapsed = start.elapsed();
let rows: usize =
batches.iter().map(|r| r.num_rows()).sum();
query.set_results(Some(batches));
query.set_num_rows(Some(rows));
query.set_execution_time(elapsed);
}
Err(e) => {
error!("Error getting response: {:?}", e);
let elapsed = start.elapsed();
query.set_error(Some(e.to_string()));
query.set_execution_time(elapsed);
}
}
}
}
}
Err(e) => {
error!("Error getting response: {:?}", e);
let elapsed = start.elapsed();
query.set_error(Some(e.to_string()));
query.set_execution_time(elapsed);
}
}
}

let _ = _event_tx.send(AppEvent::FlightSQLQueryResult(query));
});
}
// KeyCode::Enter => {
// info!("Run FS query");
// let sql = app.state.flightsql_tab.editor().lines().join("");
// info!("SQL: {}", sql);
// let execution = Arc::clone(&app.execution);
// let _event_tx = app.event_tx();
// tokio::spawn(async move {
// let client = execution.flightsql_client();
// let mut query =
// FlightSQLQuery::new(sql.clone(), None, None, None, Duration::default(), None);
// let start = Instant::now();
// if let Some(ref mut c) = *client.lock().await {
// info!("Sending query");
// match c.execute(sql, None).await {
// Ok(flight_info) => {
// for endpoint in flight_info.endpoint {
// if let Some(ticket) = endpoint.ticket {
// match c.do_get(ticket.into_request()).await {
// Ok(mut stream) => {
// let mut batches: Vec<RecordBatch> = Vec::new();
// // temporarily only show the first batch to avoid
// // buffering massive result sets. Eventually there should
// // be some sort of paging logic
// // see https://github.com/datafusion-contrib/datafusion-tui/pull/133#discussion_r1756680874
// // while let Some(maybe_batch) = stream.next().await {
// if let Some(maybe_batch) = stream.next().await {
// match maybe_batch {
// Ok(batch) => {
// info!("Batch rows: {}", batch.num_rows());
// batches.push(batch);
// }
// Err(e) => {
// error!("Error getting batch: {:?}", e);
// let elapsed = start.elapsed();
// query.set_error(Some(e.to_string()));
// query.set_execution_time(elapsed);
// }
// }
// }
// let elapsed = start.elapsed();
// let rows: usize =
// batches.iter().map(|r| r.num_rows()).sum();
// query.set_results(Some(batches));
// query.set_num_rows(Some(rows));
// query.set_execution_time(elapsed);
// }
// Err(e) => {
// error!("Error getting response: {:?}", e);
// let elapsed = start.elapsed();
// query.set_error(Some(e.to_string()));
// query.set_execution_time(elapsed);
// }
// }
// }
// }
// }
// Err(e) => {
// error!("Error getting response: {:?}", e);
// let elapsed = start.elapsed();
// query.set_error(Some(e.to_string()));
// query.set_execution_time(elapsed);
// }
// }
// }
//
// let _ = _event_tx.send(AppEvent::FlightSQLQueryResult(query));
// });
// }
_ => {}
}
}
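
For context on why the FlightSQL side is deferred: there is no context-level batch-size knob for a FlightSQL client, so a follow-up PR would presumably have to hold on to the do_get stream itself (analogous to `result_stream` on `AppExecution`) and poll it one batch per page. A rough sketch under that assumption, reusing only the client calls already shown in the commented-out handler above; the `flightsql_result_stream` field is invented for illustration:

// Rough sketch only, not part of this commit. `flightsql_result_stream` is an
// assumed Arc<Mutex<Option<...>>> field analogous to `result_stream`.
if let Some(ref mut c) = *client.lock().await {
    if let Ok(flight_info) = c.execute(sql.clone(), None).await {
        for endpoint in flight_info.endpoint {
            if let Some(ticket) = endpoint.ticket {
                if let Ok(stream) = c.do_get(ticket.into_request()).await {
                    // Store the stream; a later "next page" event polls it with
                    // stream.next().await, mirroring AppExecution::next_batch.
                    *flightsql_result_stream.lock().await = Some(stream);
                    break;
                }
            }
        }
    }
}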
(Diffs for the remaining changed files are not shown.)
