Skip to content

Commit

Permalink
feat: rest_api support fn query_row_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Jan 4, 2025
1 parent 987298f commit 5079797
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 7 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.vscode
.idea
target/
**/target
Cargo.lock
venv/

Expand All @@ -9,3 +9,4 @@ venv/

/dist

**/.DS_Store
4 changes: 2 additions & 2 deletions core/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use crate::error_code::ErrorCode;
use crate::session::SessionState;
use serde::Deserialize;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Debug)]
pub struct QueryStats {
Expand Down Expand Up @@ -53,7 +53,7 @@ pub struct ProgressValues {
pub bytes: usize,
}

#[derive(Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SchemaField {
pub name: String,
#[serde(rename = "type")]
Expand Down
20 changes: 16 additions & 4 deletions driver/src/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_stream::Stream;

use databend_client::APIClient;
use databend_client::PresignedResponse;
use databend_client::QueryResponse;
use databend_client::{APIClient, SchemaField};
use databend_driver_core::error::{Error, Result};
use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, RowWithStats, ServerStats};
use databend_driver_core::schema::{Schema, SchemaRef};
Expand Down Expand Up @@ -221,8 +221,14 @@ impl<'o> RestAPIConnection {
})
}

async fn wait_for_schema(&self, resp: QueryResponse) -> Result<QueryResponse> {
if !resp.data.is_empty() || !resp.schema.is_empty() || resp.stats.progresses.has_progress()
async fn wait_for_schema(
&self,
resp: QueryResponse,
return_on_progress: bool,
) -> Result<QueryResponse> {
if !resp.data.is_empty()
|| !resp.schema.is_empty()
|| (return_on_progress && resp.stats.progresses.has_progress())
{
return Ok(resp);
}
Expand All @@ -240,7 +246,7 @@ impl<'o> RestAPIConnection {

if !result.data.is_empty()
|| !result.schema.is_empty()
|| result.stats.progresses.has_progress()
|| (return_on_progress && result.stats.progresses.has_progress())
{
break;
}
Expand All @@ -262,6 +268,12 @@ impl<'o> RestAPIConnection {
fn default_copy_options() -> BTreeMap<&'o str, &'o str> {
vec![("purge", "true")].into_iter().collect()
}

pub async fn query_row_batch(&self, sql: &str) -> Result<RowBatch> {
let resp = self.client.start_query(sql).await?;
let resp = self.wait_for_schema(resp, false).await?;
RowBatch::from_response(self.client.clone(), resp)
}
}

type PageFut = Pin<Box<dyn Future<Output = Result<QueryResponse>> + Send>>;
Expand Down

0 comments on commit 5079797

Please sign in to comment.