Skip to content

Commit

Permalink
full transition of result native-js-native
Browse files Browse the repository at this point in the history
  • Loading branch information
KSDaemon committed Jan 9, 2025
1 parent f681961 commit b428e5e
Show file tree
Hide file tree
Showing 13 changed files with 523 additions and 156 deletions.
1 change: 1 addition & 0 deletions packages/cubejs-backend-native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/cubejs-backend-native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ crate-type = ["cdylib", "lib"]
cubesqlplanner = { path = "../../rust/cubesqlplanner/cubesqlplanner" }
cubeorchestrator = { path = "../../rust/cubeorchestrator" }
cubenativeutils = { path = "../../rust/cubenativeutils" }
cubesql = { path = "../../rust/cubesql/cubesql" }
anyhow = "1.0"
async-channel = { version = "2" }
async-trait = "0.1.36"
convert_case = "0.6.0"
pin-project = "1.1.5"
cubesql = { path = "../../rust/cubesql/cubesql" }
findshlibs = "0.10.2"
futures = "0.3.30"
http-body-util = "0.1"
Expand Down
6 changes: 5 additions & 1 deletion packages/cubejs-backend-native/js/ResultWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ export class ResultMultiWrapper extends BaseWrapper implements DataResult {
}
}

// This is consumed by native side via Transport Bridge
export class ResultArrayWrapper extends BaseWrapper implements DataResult {
public constructor(private readonly results: ResultWrapper[]) {
super();
Expand All @@ -177,6 +178,9 @@ export class ResultArrayWrapper extends BaseWrapper implements DataResult {
[[], [], []]
);

return getFinalQueryResultArray(transformDataJson, rawData, resultDataJson);
// It seems this is not needed anymore
// return getFinalQueryResultArray(transformDataJson, rawData, resultDataJson);

return [transformDataJson, rawData, resultDataJson];
}
}
4 changes: 1 addition & 3 deletions packages/cubejs-backend-native/js/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,7 @@ function wrapNativeFunctionWithStream(
} else if (response.error) {
writerOrChannel.reject(errorString(response));
} else if (response.isWrapper) { // Native wrapped result
const resArBuf = await response.getFinalResult();
const resStr = new TextDecoder().decode(resArBuf);
writerOrChannel.resolve(resStr);
writerOrChannel.resolve(await response.getFinalResult());
} else {
writerOrChannel.resolve(JSON.stringify(response));
}
Expand Down
10 changes: 8 additions & 2 deletions packages/cubejs-backend-native/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use crate::transport::MapCubeErrExt;
use crate::utils::bind_method;
use async_trait::async_trait;
use cubeorchestrator::query_result_transform::RequestResultArray;
use cubesql::transport::{SqlGenerator, SqlTemplates};
use cubesql::CubeError;
#[cfg(debug_assertions)]
use log::trace;
use neon::prelude::*;
use tokio::sync::oneshot;

use crate::utils::bind_method;

type JsAsyncStringChannelCallback =
Box<dyn FnOnce(Result<String, CubeError>) -> Result<(), CubeError> + Send>;
type JsAsyncChannelCallback = Box<
Expand Down Expand Up @@ -194,6 +194,12 @@ where
rx.await?
}

#[derive(Debug)]
pub enum ValueFromJs {
String(String),
RequestResultArray(RequestResultArray),
}

#[allow(clippy::type_complexity)]
pub async fn call_raw_js_with_channel_as_callback<T, R>(
channel: Arc<Channel>,
Expand Down
81 changes: 54 additions & 27 deletions packages/cubejs-backend-native/src/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,51 +144,78 @@ pub fn final_query_result(mut cx: FunctionContext) -> JsResult<JsPromise> {
Ok(promise)
}

pub fn final_query_result_array(mut cx: FunctionContext) -> JsResult<JsPromise> {
let transform_data_array = cx.argument::<JsValue>(0)?;
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_array);
pub type JsResultDataVectors = (
Vec<TransformDataRequest>,
Vec<Arc<QueryResult>>,
Vec<RequestResultData>,
);

pub fn convert_final_query_result_array_from_js(
cx: &mut FunctionContext<'_>,
transform_data_array: Handle<JsValue>,
data_array: Handle<JsArray>,
results_data_array: Handle<JsValue>,
) -> NeonResult<JsResultDataVectors> {
let deserializer = JsValueDeserializer::new(cx, transform_data_array);
let transform_requests: Vec<TransformDataRequest> = match Deserialize::deserialize(deserializer)
{
Ok(data) => data,
Err(err) => return cx.throw_error(err.to_string()),
};

let data_array = cx.argument::<JsArray>(1)?;
let mut cube_store_results: Vec<Arc<QueryResult>> = vec![];
for data_arg in data_array.to_vec(&mut cx)? {
match extract_query_result(&mut cx, data_arg) {
for data_arg in data_array.to_vec(cx)? {
match extract_query_result(cx, data_arg) {
Ok(query_result) => cube_store_results.push(query_result),
Err(err) => return cx.throw_error(err.to_string()),
};
}

let results_data_array = cx.argument::<JsValue>(2)?;
let deserializer = JsValueDeserializer::new(&mut cx, results_data_array);
let mut request_results: Vec<RequestResultData> = match Deserialize::deserialize(deserializer) {
let deserializer = JsValueDeserializer::new(cx, results_data_array);
let request_results: Vec<RequestResultData> = match Deserialize::deserialize(deserializer) {
Ok(data) => data,
Err(err) => return cx.throw_error(err.to_string()),
};

let promise = cx
.task(move || {
get_final_cubestore_result_array(
&transform_requests,
&cube_store_results,
&mut request_results,
)?;

let final_obj = RequestResultArray {
results: request_results,
};
Ok((transform_requests, cube_store_results, request_results))
}

match serde_json::to_string(&final_obj) {
Ok(json) => Ok(json),
Err(err) => Err(anyhow::Error::from(err)),
}
})
.promise(move |cx, json_data| json_to_array_buffer(cx, json_data));
pub fn final_query_result_array(mut cx: FunctionContext) -> JsResult<JsPromise> {
let transform_data_array = cx.argument::<JsValue>(0)?;
let data_array = cx.argument::<JsArray>(1)?;
let results_data_array = cx.argument::<JsValue>(2)?;

Ok(promise)
let convert_res = convert_final_query_result_array_from_js(
&mut cx,
transform_data_array,
data_array,
results_data_array,
);
match convert_res {
Ok((transform_requests, cube_store_results, mut request_results)) => {
let promise = cx
.task(move || {
get_final_cubestore_result_array(
&transform_requests,
&cube_store_results,
&mut request_results,
)?;

let final_obj = RequestResultArray {
results: request_results,
};

match serde_json::to_string(&final_obj) {
Ok(json) => Ok(json),
Err(err) => Err(anyhow::Error::from(err)),
}
})
.promise(move |cx, json_data| json_to_array_buffer(cx, json_data));

Ok(promise)
}
Err(err) => cx.throw_error(err.to_string()),
}
}

pub fn final_query_result_multi(mut cx: FunctionContext) -> JsResult<JsPromise> {
Expand Down
135 changes: 100 additions & 35 deletions packages/cubejs-backend-native/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,18 @@ use neon::prelude::*;
use std::collections::HashMap;
use std::fmt::Display;

use crate::auth::NativeAuthContext;
use crate::channel::{call_raw_js_with_channel_as_callback, NodeSqlGenerator, ValueFromJs};
use crate::node_obj_serializer::NodeObjSerializer;
use crate::orchestrator::convert_final_query_result_array_from_js;
use crate::{
auth::TransportRequest, channel::call_js_with_channel_as_callback,
stream::call_js_with_stream_as_callback,
};
use async_trait::async_trait;
use cubeorchestrator::query_result_transform::{
get_final_cubestore_result_array, RequestResultArray,
};
use cubesql::compile::engine::df::scan::{MemberField, SchemaRef};
use cubesql::compile::engine::df::wrapper::SqlQuery;
use cubesql::transport::{
Expand All @@ -20,14 +31,6 @@ use serde::Serialize;
use std::sync::Arc;
use uuid::Uuid;

use crate::auth::NativeAuthContext;
use crate::channel::{call_raw_js_with_channel_as_callback, NodeSqlGenerator};
use crate::node_obj_serializer::NodeObjSerializer;
use crate::{
auth::TransportRequest, channel::call_js_with_channel_as_callback,
stream::call_js_with_stream_as_callback,
};

#[derive(Debug)]
pub struct NodeBridgeTransport {
channel: Arc<Channel>,
Expand Down Expand Up @@ -369,54 +372,116 @@ impl TransportService for NodeBridgeTransport {
streaming: false,
})?;

let result = call_js_with_channel_as_callback(
let result = call_raw_js_with_channel_as_callback(
self.channel.clone(),
self.on_sql_api_load.clone(),
Some(extra),
extra,
Box::new(|cx, v| Ok(cx.string(v).as_value(cx))),
Box::new(move |cx, v| {
// It's too heavy/slow to get instance of ResultArrayWrapper from JS
// and then call/await the .getFinalResult() method which needs be
// executed again on JS side to get the actual needed date,
// instead we pass it directly from JS side.
// In case of wrapped result it's actually a tuple of
// (transformDataJson[], rawData[], resultDataJson[])
if let Ok(result_wrapped) = v.downcast::<JsArray, _>(cx) {
let res_wrapped_vec = result_wrapped.to_vec(cx).map_cube_err("Can't convert JS result to array")?;

if res_wrapped_vec.len() != 3 {
return Err(CubeError::internal("Expected a tuple with 3 elements: transformDataJson[], rawData[], resultDataJson[]".to_string()));
}

let transform_data_array = res_wrapped_vec.first().unwrap();
let data_array = res_wrapped_vec.get(1).unwrap()
.downcast_or_throw::<JsArray, _>(cx).map_cube_err("Can't downcast js data to array")?;
let results_data_array = res_wrapped_vec.get(2).unwrap();

match convert_final_query_result_array_from_js(
cx,
*transform_data_array,
data_array,
*results_data_array,
) {
Ok((transform_requests, cube_store_results, mut request_results)) => {
get_final_cubestore_result_array(
&transform_requests,
&cube_store_results,
&mut request_results,
).map_cube_err("Can't build result array")?;

Ok(ValueFromJs::RequestResultArray(RequestResultArray {
results: request_results,
}))
}
Err(err) => {
Err(CubeError::internal(format!("Error converting result data: {:?}", err.to_string())))
}
}

} else if let Ok(str) = v.downcast::<JsString, _>(cx) {
Ok(ValueFromJs::String(str.value(cx)))
} else {
Err(CubeError::internal("Can't downcast callback argument to string or resultWrapper object".to_string()))
}
})
)
.await;

if let Err(e) = &result {
if e.message.to_lowercase().contains("continue wait") {
continue;
}
}

let response: serde_json::Value = result?;
match result? {
ValueFromJs::String(result) => {
let response: serde_json::Value = serde_json::Value::String(result);

#[cfg(debug_assertions)]
trace!("[transport] Request <- {:?}", response);
#[cfg(not(debug_assertions))]
trace!("[transport] Request <- <hidden>");
#[cfg(debug_assertions)]
trace!("[transport] Request <- {:?}", response);
#[cfg(not(debug_assertions))]
trace!("[transport] Request <- <hidden>");

if let Some(error_value) = response.get("error") {
match error_value {
serde_json::Value::String(error) => {
if error.to_lowercase() == *"continue wait" {
debug!(
if let Some(error_value) = response.get("error") {
match error_value {
serde_json::Value::String(error) => {
if error.to_lowercase() == *"continue wait" {
debug!(
"[transport] load - retrying request (continue wait) requestId: {}",
request_id
);

continue;
} else {
return Err(CubeError::user(error.clone()));
}
}
other => {
error!(
continue;
} else {
return Err(CubeError::user(error.clone()));
}
}
other => {
error!(
"[transport] load - strange response, success which contains error: {:?}",
other
);

return Err(CubeError::internal(
"Error response with broken data inside".to_string(),
));
}
}
};
return Err(CubeError::internal(
"Error response with broken data inside".to_string(),
));
}
}
};

break serde_json::from_value::<TransportLoadResponse>(response)
.map_err(|err| CubeError::user(err.to_string()));
break serde_json::from_value::<TransportLoadResponse>(response)
.map_err(|err| CubeError::user(err.to_string()));
}
ValueFromJs::RequestResultArray(result) => {
let response = TransportLoadResponse {
pivot_query: None,
slow_query: None,
query_type: None,
results: result.results.into_iter().map(|v| v.into()).collect(),
};
break Ok(response);
}
}
}
}

Expand Down
Loading

0 comments on commit b428e5e

Please sign in to comment.