diff --git a/.gitignore b/.gitignore index 3b2b9fab..3d4251d4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ .vscode .idea -target/ +**/target Cargo.lock venv/ @@ -9,3 +9,4 @@ venv/ /dist +**/.DS_Store diff --git a/core/src/response.rs b/core/src/response.rs index 048964f5..e7268ea8 100644 --- a/core/src/response.rs +++ b/core/src/response.rs @@ -14,7 +14,7 @@ use crate::error_code::ErrorCode; use crate::session::SessionState; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; #[derive(Deserialize, Debug)] pub struct QueryStats { @@ -53,7 +53,7 @@ pub struct ProgressValues { pub bytes: usize, } -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct SchemaField { pub name: String, #[serde(rename = "type")] diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index 140393de..32e89225 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -29,9 +29,9 @@ use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio_stream::Stream; -use databend_client::APIClient; use databend_client::PresignedResponse; use databend_client::QueryResponse; +use databend_client::{APIClient, SchemaField}; use databend_driver_core::error::{Error, Result}; use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, RowWithStats, ServerStats}; use databend_driver_core::schema::{Schema, SchemaRef}; @@ -221,8 +221,14 @@ impl<'o> RestAPIConnection { }) } - async fn wait_for_schema(&self, resp: QueryResponse) -> Result { - if !resp.data.is_empty() || !resp.schema.is_empty() || resp.stats.progresses.has_progress() + async fn wait_for_schema( + &self, + resp: QueryResponse, + return_on_progress: bool, + ) -> Result { + if !resp.data.is_empty() + || !resp.schema.is_empty() + || (return_on_progress && resp.stats.progresses.has_progress()) { return Ok(resp); } @@ -240,7 +246,7 @@ impl<'o> RestAPIConnection { if !result.data.is_empty() || !result.schema.is_empty() - || result.stats.progresses.has_progress() + || (return_on_progress && result.stats.progresses.has_progress()) { break; } @@ -262,6 +268,12 @@ impl<'o> RestAPIConnection { fn default_copy_options() -> BTreeMap<&'o str, &'o str> { vec![("purge", "true")].into_iter().collect() } + + pub async fn query_row_batch(&self, sql: &str) -> Result { + let resp = self.client.start_query(sql).await?; + let resp = self.wait_for_schema(resp, false).await?; + RowBatch::from_response(self.client.clone(), resp) + } } type PageFut = Pin> + Send>>;