Skip to content

Commit

Permalink
feat(cubesql): Additional trace event logging for SQL API (#7524)
Browse files Browse the repository at this point in the history
* feat(cubesql): Additional trace event logging for SQL API

* Fix dataframe error test

* Fix clippy
  • Loading branch information
paveltiunov authored Dec 13, 2023
1 parent 9729173 commit 6b700cd
Show file tree
Hide file tree
Showing 16 changed files with 685 additions and 136 deletions.
29 changes: 0 additions & 29 deletions packages/cubejs-api-gateway/src/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1687,12 +1687,6 @@ class ApiGateway {
resType = query.responseFormat;
}

this.log({
type: 'Load Request',
query,
streaming: request.streaming,
}, context);

const [queryType, normalizedQueries] =
await this.getNormalizedQueries(query, context);

Expand Down Expand Up @@ -1792,29 +1786,6 @@ class ApiGateway {
);
}

this.log(
{
type: 'Load Request Success',
query,
duration: this.duration(requestStarted),
apiType,
isPlayground: Boolean(
context.signedWithPlaygroundAuthSecret
),
queries: results.length,
queriesWithPreAggregations:
results.filter(
(r: any) => Object.keys(
r.usedPreAggregations || {}
).length
).length,
queriesWithData:
results.filter((r: any) => r.data?.length).length,
dbType: results.map(r => r.dbType),
},
context,
);

res(request.streaming ? results[0] : {
results,
});
Expand Down
11 changes: 10 additions & 1 deletion packages/cubejs-api-gateway/src/sql-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,14 @@ export class SQLServer {
}
});
},
sqlApiLoad: async ({ request, session, query, sqlQuery, streaming }) => {
sqlApiLoad: async ({ request, session, query, queryKey, sqlQuery, streaming }) => {
const context = await contextByRequest(request, session);

// eslint-disable-next-line no-async-promise-executor
return new Promise(async (resolve, reject) => {
try {
await this.apiGateway.sqlApiLoad({
queryKey,
query,
sqlQuery,
streaming,
Expand Down Expand Up @@ -175,6 +176,14 @@ export class SQLServer {
}
});
},
logLoadEvent: async ({ request, session, event, properties }) => {
const context = await contextByRequest(request, session);

this.apiGateway.log({
type: event,
...properties
}, context);
},
sqlGenerators: async (paramsJson: string) => {
// TODO get rid of it
const { request, session } = JSON.parse(paramsJson);
Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-api-gateway/src/types/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ type SqlApiRequest = BaseRequest & {
query: Record<string, any>;
sqlQuery?: [string, string[]];
apiType?: ApiType;
queryKey: any;
streaming?: boolean;
};

Expand Down
10 changes: 10 additions & 0 deletions packages/cubejs-backend-native/js/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,18 @@ export interface SqlApiLoadPayload {
request: Request<LoadRequestMeta>,
session: SessionContext,
query: any,
queryKey: any,
sqlQuery: any,
streaming: boolean,
}

export interface LogLoadEventPayload {
request: Request<LoadRequestMeta>,
session: SessionContext,
event: string,
properties: any,
}

export interface MetaPayload {
request: Request<undefined>,
session: SessionContext,
Expand All @@ -83,6 +91,7 @@ export type SQLInterfaceOptions = {
meta: (payload: MetaPayload) => unknown | Promise<unknown>,
stream: (payload: LoadPayload) => unknown | Promise<unknown>,
sqlApiLoad: (payload: SqlApiLoadPayload) => unknown | Promise<unknown>,
logLoadEvent: (payload: LogLoadEventPayload) => unknown | Promise<unknown>,
sqlGenerators: (paramsJson: string) => unknown | Promise<unknown>,
canSwitchUserForSession: (payload: CanSwitchUserPayload) => unknown | Promise<unknown>,
};
Expand Down Expand Up @@ -310,6 +319,7 @@ export const registerInterface = async (options: SQLInterfaceOptions): Promise<S
stream: wrapNativeFunctionWithStream(options.stream),
sqlApiLoad: wrapNativeFunctionWithStream(options.sqlApiLoad),
sqlGenerators: wrapRawNativeFunctionWithChannelCallback(options.sqlGenerators),
logLoadEvent: wrapRawNativeFunctionWithChannelCallback(options.logLoadEvent),
canSwitchUserForSession: wrapRawNativeFunctionWithChannelCallback(options.canSwitchUserForSession),
});
};
Expand Down
4 changes: 4 additions & 0 deletions packages/cubejs-backend-native/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ fn register_interface(mut cx: FunctionContext) -> JsResult<JsPromise> {
let transport_meta = options
.get::<JsFunction, _, _>(&mut cx, "meta")?
.root(&mut cx);
let transport_log_load_event = options
.get::<JsFunction, _, _>(&mut cx, "logLoadEvent")?
.root(&mut cx);
let transport_sql_generator = options
.get::<JsFunction, _, _>(&mut cx, "sqlGenerators")?
.root(&mut cx);
Expand Down Expand Up @@ -160,6 +163,7 @@ fn register_interface(mut cx: FunctionContext) -> JsResult<JsPromise> {
transport_sql_api_load,
transport_sql,
transport_meta,
transport_log_load_event,
transport_sql_generator,
transport_can_switch_user_for_session,
);
Expand Down
90 changes: 77 additions & 13 deletions packages/cubejs-backend-native/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use async_trait::async_trait;
use cubeclient::models::{V1Error, V1LoadRequestQuery, V1LoadResponse, V1MetaResponse};
use cubesql::compile::engine::df::scan::{MemberField, SchemaRef};
use cubesql::compile::engine::df::wrapper::SqlQuery;
use cubesql::transport::{SqlGenerator, SqlResponse};
use cubesql::transport::{SpanId, SqlGenerator, SqlResponse};
use cubesql::{
di_service,
sql::AuthContextRef,
Expand All @@ -33,6 +33,7 @@ pub struct NodeBridgeTransport {
on_sql_api_load: Arc<Root<JsFunction>>,
on_sql: Arc<Root<JsFunction>>,
on_meta: Arc<Root<JsFunction>>,
log_load_event: Arc<Root<JsFunction>>,
sql_generators: Arc<Root<JsFunction>>,
can_switch_user_for_session: Arc<Root<JsFunction>>,
}
Expand All @@ -43,6 +44,7 @@ impl NodeBridgeTransport {
on_sql_api_load: Root<JsFunction>,
on_sql: Root<JsFunction>,
on_meta: Root<JsFunction>,
log_load_event: Root<JsFunction>,
sql_generators: Root<JsFunction>,
can_switch_user_for_session: Root<JsFunction>,
) -> Self {
Expand All @@ -51,6 +53,7 @@ impl NodeBridgeTransport {
on_sql_api_load: Arc::new(on_sql_api_load),
on_sql: Arc::new(on_sql),
on_meta: Arc::new(on_meta),
log_load_event: Arc::new(log_load_event),
sql_generators: Arc::new(sql_generators),
can_switch_user_for_session: Arc::new(can_switch_user_for_session),
}
Expand Down Expand Up @@ -83,6 +86,16 @@ struct LoadRequest {
#[serde(rename = "expressionParams", skip_serializing_if = "Option::is_none")]
expression_params: Option<Vec<Option<String>>>,
streaming: bool,
#[serde(rename = "queryKey", skip_serializing_if = "Option::is_none")]
query_key: Option<serde_json::Value>,
}

#[derive(Debug, Serialize)]
struct LogEvent {
request: TransportRequest,
session: SessionContext,
event: String,
properties: serde_json::Value,
}

#[derive(Debug, Serialize)]
Expand Down Expand Up @@ -187,6 +200,7 @@ impl TransportService for NodeBridgeTransport {

async fn sql(
&self,
span_id: Option<Arc<SpanId>>,
query: V1LoadRequestQuery,
ctx: AuthContextRef,
meta: LoadRequestMeta,
Expand All @@ -198,14 +212,18 @@ impl TransportService for NodeBridgeTransport {
.downcast_ref::<NativeAuthContext>()
.expect("Unable to cast AuthContext to NativeAuthContext");

let request_id = Uuid::new_v4().to_string();
let request_id = span_id
.as_ref()
.map(|s| s.span_id.clone())
.unwrap_or_else(|| Uuid::new_v4().to_string());

let extra = serde_json::to_string(&LoadRequest {
request: TransportRequest {
id: format!("{}-span-{}", request_id, 1),
meta: Some(meta.clone()),
},
query: query.clone(),
query_key: span_id.map(|s| s.query_key.clone()),
session: SessionContext {
user: native_auth.user.clone(),
superuser: native_auth.superuser,
Expand Down Expand Up @@ -259,6 +277,7 @@ impl TransportService for NodeBridgeTransport {

async fn load(
&self,
span_id: Option<Arc<SpanId>>,
query: V1LoadRequestQuery,
sql_query: Option<SqlQuery>,
ctx: AuthContextRef,
Expand All @@ -271,16 +290,19 @@ impl TransportService for NodeBridgeTransport {
.downcast_ref::<NativeAuthContext>()
.expect("Unable to cast AuthContext to NativeAuthContext");

let request_id = Uuid::new_v4().to_string();
let mut span_counter: u32 = 1;
let request_id = span_id
.as_ref()
.map(|s| s.span_id.clone())
.unwrap_or_else(|| Uuid::new_v4().to_string());

loop {
let extra = serde_json::to_string(&LoadRequest {
request: TransportRequest {
id: format!("{}-span-{}", request_id, span_counter),
id: format!("{}-span-{}", request_id, 1),
meta: Some(meta.clone()),
},
query: query.clone(),
query_key: span_id.as_ref().map(|s| s.query_key.clone()),
session: SessionContext {
user: native_auth.user.clone(),
superuser: native_auth.superuser,
Expand Down Expand Up @@ -313,12 +335,10 @@ impl TransportService for NodeBridgeTransport {
if let Ok(res) = serde_json::from_value::<V1Error>(response) {
if res.error.to_lowercase() == *"continue wait" {
debug!(
"[transport] load - retrying request (continue wait) requestId: {}, span: {}",
request_id, span_counter
"[transport] load - retrying request (continue wait) requestId: {}",
request_id
);

span_counter += 1;

continue;
} else {
error!(
Expand All @@ -336,6 +356,7 @@ impl TransportService for NodeBridgeTransport {

async fn load_stream(
&self,
span_id: Option<Arc<SpanId>>,
query: V1LoadRequestQuery,
sql_query: Option<SqlQuery>,
ctx: AuthContextRef,
Expand All @@ -345,8 +366,10 @@ impl TransportService for NodeBridgeTransport {
) -> Result<CubeStreamReceiver, CubeError> {
trace!("[transport] Request ->");

let request_id = Uuid::new_v4().to_string();
let mut span_counter: u32 = 1;
let request_id = span_id
.as_ref()
.map(|s| s.span_id.clone())
.unwrap_or_else(|| Uuid::new_v4().to_string());
loop {
let native_auth = ctx
.as_any()
Expand All @@ -355,10 +378,11 @@ impl TransportService for NodeBridgeTransport {

let extra = serde_json::to_string(&LoadRequest {
request: TransportRequest {
id: format!("{}-span-{}", request_id, span_counter),
id: format!("{}-span-{}", request_id, 1),
meta: Some(meta.clone()),
},
query: query.clone(),
query_key: span_id.as_ref().map(|s| s.query_key.clone()),
sql_query: sql_query.clone().map(|q| (q.sql, q.values)),
session: SessionContext {
user: native_auth.user.clone(),
Expand All @@ -381,7 +405,6 @@ impl TransportService for NodeBridgeTransport {

if let Err(e) = &res {
if e.message.to_lowercase().contains("continue wait") {
span_counter += 1;
continue;
}
}
Expand Down Expand Up @@ -425,6 +448,47 @@ impl TransportService for NodeBridgeTransport {
.await?;
Ok(res)
}

async fn log_load_state(
&self,
span_id: Option<Arc<SpanId>>,
ctx: AuthContextRef,
meta_fields: LoadRequestMeta,
event: String,
properties: serde_json::Value,
) -> Result<(), CubeError> {
let native_auth = ctx
.as_any()
.downcast_ref::<NativeAuthContext>()
.expect("Unable to cast AuthContext to NativeAuthContext");

let request_id = span_id
.map(|s| s.span_id.clone())
.unwrap_or_else(|| Uuid::new_v4().to_string());
call_raw_js_with_channel_as_callback(
self.channel.clone(),
self.log_load_event.clone(),
LogEvent {
request: TransportRequest {
id: format!("{}-span-1", request_id),
meta: Some(meta_fields.clone()),
},
session: SessionContext {
user: native_auth.user.clone(),
superuser: native_auth.superuser,
security_context: native_auth.security_context.clone(),
},
event,
properties,
},
Box::new(|cx, v| match NodeObjSerializer::serialize(&v, cx) {
Ok(res) => Ok(res),
Err(e) => cx.throw_error(format!("Can't serialize to node obj: {}", e)),
}),
Box::new(move |_, _| Ok(())),
)
.await
}
}

// method to get keys to values using function from js object
Expand Down
5 changes: 5 additions & 0 deletions packages/cubejs-backend-native/test/sql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ describe('SQLInterface', () => {
throw new Error('Please specify user');
});

const logLoadEvent = ({ event, properties }: { event: string, properties: any }) => {
console.log(`Load event: ${JSON.stringify({ type: event, ...properties })}`);
};

const instance = await native.registerInterface({
// nonce: '12345678910111213141516'.substring(0, 20),
port: 4545,
Expand All @@ -152,6 +156,7 @@ describe('SQLInterface', () => {
sql,
meta,
stream,
logLoadEvent,
sqlGenerators,
canSwitchUserForSession: (_payload) => true,
});
Expand Down
Loading

0 comments on commit 6b700cd

Please sign in to comment.