From 39b1c103cd0adb85d1d7cb20f06813cf165120e1 Mon Sep 17 00:00:00 2001 From: Benno van den Berg Date: Tue, 27 Aug 2024 10:01:26 +0200 Subject: [PATCH] Allow spans to be deleted (#185) * Allow spans to be deleted Allow deleting a specific span or all spans of a trace * Implement delete fn's on D1Store * Remove irrelevant assignments --- fpx-lib/src/api.rs | 11 +- fpx-lib/src/api/handlers/spans.rs | 37 +++++++ fpx-lib/src/api/handlers/traces.rs | 32 ++++++ fpx-lib/src/data.rs | 11 ++ fpx-lib/src/data/sql.rs | 17 ++++ fpx-workers/src/data.rs | 85 +++++++++++++--- fpx/src/api/client.rs | 156 ++++++++++++++++++++--------- fpx/src/commands/client/spans.rs | 25 +++++ fpx/src/commands/client/traces.rs | 22 ++++ fpx/src/data.rs | 25 +++++ 10 files changed, 354 insertions(+), 67 deletions(-) diff --git a/fpx-lib/src/api.rs b/fpx-lib/src/api.rs index a8e99d1ef..9a635b3fd 100644 --- a/fpx-lib/src/api.rs +++ b/fpx-lib/src/api.rs @@ -54,19 +54,20 @@ impl Builder { let router = axum::Router::new() .route("/v1/traces", post(handlers::otel::trace_collector_handler)) + .route("/api/traces", get(handlers::traces::traces_list_handler)) .route( - "/api/traces/:trace_id/spans/:span_id", - get(handlers::spans::span_get_handler), + "/api/traces/:trace_id", + get(handlers::traces::traces_get_handler) + .delete(handlers::traces::traces_delete_handler), ) .route( "/api/traces/:trace_id/spans", get(handlers::spans::span_list_handler), ) .route( - "/api/traces/:trace_id", - get(handlers::traces::traces_get_handler), + "/api/traces/:trace_id/spans/:span_id", + get(handlers::spans::span_get_handler).delete(handlers::spans::span_delete_handler), ) - .route("/api/traces", get(handlers::traces::traces_list_handler)) .route( "/ts-compat/v1/traces/:trace_id/spans", get(handlers::spans::ts_compat_span_list_handler), diff --git a/fpx-lib/src/api/handlers/spans.rs b/fpx-lib/src/api/handlers/spans.rs index fa1309a69..9a309d2b3 100644 --- a/fpx-lib/src/api/handlers/spans.rs +++ b/fpx-lib/src/api/handlers/spans.rs @@ -101,3 +101,40 @@ impl From for ApiServerError { ApiServerError::CommonError(CommonError::InternalServerError) } } + +#[tracing::instrument(skip_all)] +pub async fn span_delete_handler( + State(store): State, + Path((trace_id, span_id)): Path<(String, String)>, +) -> Result> { + let tx = store.start_readonly_transaction().await?; + + hex::decode(&trace_id) + .map_err(|_| ApiServerError::ServiceError(SpanDeleteError::InvalidTraceId))?; + hex::decode(&span_id) + .map_err(|_| ApiServerError::ServiceError(SpanDeleteError::InvalidSpanId))?; + + store.span_delete(&tx, &trace_id, &span_id).await?; + + Ok(StatusCode::NO_CONTENT) +} + +#[derive(Debug, Serialize, Deserialize, Error, ApiError)] +#[serde(tag = "error", content = "details", rename_all = "camelCase")] +#[non_exhaustive] +pub enum SpanDeleteError { + #[api_error(status_code = StatusCode::BAD_REQUEST)] + #[error("Trace ID is invalid")] + InvalidTraceId, + + #[api_error(status_code = StatusCode::BAD_REQUEST)] + #[error("Trace ID is invalid")] + InvalidSpanId, +} + +impl From for ApiServerError { + fn from(err: DbError) -> Self { + error!(?err, "Failed to list spans from database"); + ApiServerError::CommonError(CommonError::InternalServerError) + } +} diff --git a/fpx-lib/src/api/handlers/traces.rs b/fpx-lib/src/api/handlers/traces.rs index b71b59dba..26832b700 100644 --- a/fpx-lib/src/api/handlers/traces.rs +++ b/fpx-lib/src/api/handlers/traces.rs @@ -100,3 +100,35 @@ impl From for ApiServerError { ApiServerError::CommonError(CommonError::InternalServerError) } } + +#[tracing::instrument(skip_all)] +pub async fn traces_delete_handler( + State(store): State, + Path(trace_id): Path, +) -> Result> { + let tx = store.start_readonly_transaction().await?; + + hex::decode(&trace_id) + .map_err(|_| ApiServerError::ServiceError(TraceDeleteError::InvalidTraceId))?; + + // Retrieve all the spans that are associated with the trace + store.span_delete_by_trace(&tx, &trace_id).await?; + + Ok(StatusCode::NO_CONTENT) +} + +#[derive(Debug, Serialize, Deserialize, Error, ApiError)] +#[serde(tag = "error", content = "details", rename_all = "camelCase")] +#[non_exhaustive] +pub enum TraceDeleteError { + #[api_error(status_code = StatusCode::BAD_REQUEST)] + #[error("Trace ID is invalid")] + InvalidTraceId, +} + +impl From for ApiServerError { + fn from(err: DbError) -> Self { + error!(?err, "Failed to list trace from database"); + ApiServerError::CommonError(CommonError::InternalServerError) + } +} diff --git a/fpx-lib/src/data.rs b/fpx-lib/src/data.rs index 767750395..ff64e16e5 100644 --- a/fpx-lib/src/data.rs +++ b/fpx-lib/src/data.rs @@ -69,4 +69,15 @@ pub trait Store: Send + Sync { tx: &Transaction, // Future improvement could hold sort fields, limits, etc ) -> Result>; + + /// Delete all spans with a specific trace_id. + async fn span_delete_by_trace(&self, tx: &Transaction, trace_id: &str) -> Result>; + + /// Delete a single span. + async fn span_delete( + &self, + tx: &Transaction, + trace_id: &str, + span_id: &str, + ) -> Result>; } diff --git a/fpx-lib/src/data/sql.rs b/fpx-lib/src/data/sql.rs index f10158cbd..f94d70996 100644 --- a/fpx-lib/src/data/sql.rs +++ b/fpx-lib/src/data/sql.rs @@ -86,4 +86,21 @@ impl SqlBuilder { ", ) } + + /// Delete all spans with a specific trace_id. + /// + /// Query parameters: + /// - $1: trace_id + pub fn span_delete_by_trace(&self) -> String { + String::from("DELETE FROM spans WHERE trace_id=$1") + } + + /// Delete a specific span. + /// + /// Query parameters: + /// - $1: trace_id + /// - $2: span_id + pub fn span_delete(&self) -> String { + String::from("DELETE FROM spans WHERE trace_id=$1 AND span_id=$2") + } } diff --git a/fpx-workers/src/data.rs b/fpx-workers/src/data.rs index c2708760e..52dfa289a 100644 --- a/fpx-workers/src/data.rs +++ b/fpx-workers/src/data.rs @@ -5,7 +5,7 @@ use serde::Deserialize; use std::sync::Arc; use wasm_bindgen::JsValue; use worker::send::SendFuture; -use worker::D1Database; +use worker::{D1Database, D1ResultMeta}; pub struct D1Store { database: Arc, @@ -24,17 +24,17 @@ impl D1Store { where T: for<'a> Deserialize<'a>, { - let prepared_statement = self.database.prepare(query); - - let result = prepared_statement + let prepared_statement = self + .database + .prepare(query) .bind(values) - .map_err(|err| DbError::InternalError(err.to_string()))?; // TODO: Correct error + .map_err(|err| DbError::InternalError(err.to_string()))?; // TODO: Correct error; - let result = result + let result = prepared_statement .first(None) .await - .map_err(|err| DbError::InternalError(err.to_string()))? - .ok_or(DbError::NotFound)?; // TODO: Correct error + .map_err(|err| DbError::InternalError(err.to_string()))? // TODO: Correct error; + .ok_or(DbError::NotFound)?; Ok(result) } @@ -43,13 +43,13 @@ impl D1Store { where T: for<'a> Deserialize<'a>, { - let prepared_statement = self.database.prepare(query); - - let result = prepared_statement + let prepared_statement = self + .database + .prepare(query) .bind(values) - .map_err(|err| DbError::InternalError(err.to_string()))?; // TODO: Correct error + .map_err(|err| DbError::InternalError(err.to_string()))?; // TODO: Correct error; - let result = result + let result = prepared_statement .all() .await .map_err(|err| DbError::InternalError(err.to_string()))? // TODO: Correct error @@ -156,4 +156,63 @@ impl Store for D1Store { }) .await } + + /// Delete all spans with a specific trace_id. + async fn span_delete_by_trace(&self, _tx: &Transaction, trace_id: &str) -> Result> { + SendFuture::new(async { + let prepared_statement = self + .database + .prepare(self.sql_builder.span_delete_by_trace()) + .bind(&[trace_id.into()]) + .map_err(|err| DbError::InternalError(err.to_string()))?; + + let results = prepared_statement + .run() + .await + .map_err(|err| DbError::InternalError(err.to_string()))?; + + if let Ok(Some(D1ResultMeta { + rows_written: Some(rows_written), + .. + })) = results.meta() + { + Ok(Some(rows_written as u64)) + } else { + Ok(None) + } + }) + .await + } + + /// Delete a single span. + async fn span_delete( + &self, + _tx: &Transaction, + trace_id: &str, + span_id: &str, + ) -> Result> { + SendFuture::new(async { + let prepared_statement = self + .database + .prepare(self.sql_builder.span_delete()) + .bind(&[trace_id.into(), span_id.into()]) + .map_err(|err| DbError::InternalError(err.to_string()))?; + + let results = prepared_statement + .run() + .await + .map_err(|err| DbError::InternalError(err.to_string()))?; + + if let Ok(Some(D1ResultMeta { + rows_written: Some(rows_written), + .. + })) = results.meta() + { + Ok(Some(rows_written as u64)) + } else { + Ok(None) + } + }) + .await + } } diff --git a/fpx/src/api/client.rs b/fpx/src/api/client.rs index ff82f8707..f91f2d98a 100644 --- a/fpx/src/api/client.rs +++ b/fpx/src/api/client.rs @@ -10,10 +10,11 @@ use fpx_lib::api::handlers::spans::SpanGetError; use fpx_lib::api::handlers::traces::TraceGetError; use fpx_lib::api::models; use fpx_lib::otel::HeaderMapInjector; -use http::{HeaderMap, Method}; +use http::{HeaderMap, Method, StatusCode}; use opentelemetry::propagation::TextMapPropagator; use opentelemetry_sdk::propagation::TraceContextPropagator; use std::error::Error; +use std::future::Future; use tracing::trace; use tracing_opentelemetry::OpenTelemetrySpanExt; use url::Url; @@ -49,27 +50,20 @@ impl ApiClient { /// fails it will consider the call as failed and will try to parse the body /// as [`E`]. Any other error will use the relevant variant in /// [`ApiClientError`]. - #[tracing::instrument(skip_all, fields(otel.kind="Client",otel.status_code, otel.status_message))] - async fn do_req( + #[tracing::instrument(skip_all, fields(otel.kind="Client", otel.status_code, otel.status_message))] + async fn do_req( &self, method: Method, path: impl AsRef, - body: Option, + response_parser: impl FnOnce(reqwest::Response) -> P, ) -> Result> where - T: serde::de::DeserializeOwned, - E: serde::de::DeserializeOwned + Error, - B: serde::ser::Serialize, + E: Error, + P: Future>>, { let u = self.base_url.join(path.as_ref())?; - let mut req = self.client.request(method, u); - - if let Some(body) = body { - let json = serde_json::to_string(&body).unwrap(); - - req = req.header("Content-Type", "application/json").body(json); - } + let req = self.client.request(method, u); // Take the current otel context, and inject those details into the // Request using the TraceContext format. @@ -84,40 +78,36 @@ impl ApiClient { req.headers(headers) }; - // TODO: Create new otel span (SpanKind::Client) and add relevant client - // attributes to it. - - // Make request + // Send the request let response = req.send().await?; - // Copy the status code here in case we are unable to parse the response as - // the Ok or Err variant. - let status_code = response.status(); - - // Read the entire response into a local buffer. - let body = response.bytes().await?; - - // TODO: Mark the span status as Err if we are unable to parse the - // response. - - // Try to parse the result as T. - match serde_json::from_slice::(&body) { - Ok(result) => { - tracing::Span::current().record("otel.status_code", "Ok"); - Ok(result) - } - Err(err) => { - trace!( - ?status_code, - ?err, - "Failed to parse response as expected type" - ); - let err = ApiClientError::from_response(status_code, body); - tracing::Span::current().record("otel.status_code", "Err"); - tracing::Span::current().record("otel.status_message", err.to_string()); - Err(err) - } + // TODO: Retrieve all kinds of interesting details of the response and + // set them in the OTEL trace: + // - http.request.method + // - server.address + // - server.port + // - url.full + // - user_agent.original + // - url.scheme + // - url.template + // - http.response.status_code + // # Not sure if this is possible since we need to specify all fields upfront: + // - http.request.header. + // - http.response.header. + + // Parse the response according to the response_parser. + let result = response_parser(response).await; + + // Set the status_code and status_message in the current OTEL span, + // according to the result of the response_parser. + if let Err(ref err) = &result { + tracing::Span::current().record("otel.status_code", "Err"); + tracing::Span::current().record("otel.status_message", err.to_string()); + } else { + tracing::Span::current().record("otel.status_code", "Ok"); } + + result } /// Retrieve the details of a single span. @@ -132,7 +122,7 @@ impl ApiClient { span_id.as_ref() ); - self.do_req(Method::GET, path, None::<()>).await + self.do_req(Method::GET, path, api_result).await } /// Retrieve all the spans associated with a single trace. @@ -142,7 +132,7 @@ impl ApiClient { ) -> Result, ApiClientError> { let path = format!("api/traces/{}/spans", trace_id.as_ref()); - self.do_req(Method::GET, path, None::<()>).await + self.do_req(Method::GET, path, api_result).await } /// Retrieve a summary of a single trace. @@ -152,7 +142,7 @@ impl ApiClient { ) -> Result> { let path = format!("api/traces/{}", trace_id.as_ref()); - self.do_req(Method::GET, path, None::<()>).await + self.do_req(Method::GET, path, api_result).await } /// List a summary traces @@ -161,6 +151,74 @@ impl ApiClient { ) -> Result, ApiClientError> { let path = "api/traces"; - self.do_req(Method::GET, path, None::<()>).await + self.do_req(Method::GET, path, api_result).await + } + + /// List a summary traces + pub async fn trace_delete( + &self, + trace_id: impl AsRef, + ) -> Result<(), ApiClientError> { + let path = format!("api/traces/{}", trace_id.as_ref()); + + self.do_req(Method::DELETE, path, no_body).await + } + + pub async fn span_delete( + &self, + trace_id: impl AsRef, + span_id: impl AsRef, + ) -> Result<(), ApiClientError> { + let path = format!( + "api/traces/{}/spans/{}", + trace_id.as_ref(), + span_id.as_ref() + ); + + self.do_req(Method::DELETE, path, no_body).await + } +} + +/// Check whether the response is a 204 No Content response, if it is return +/// Ok(()). Otherwise try to parse the response as a ApiError. +async fn no_body(response: reqwest::Response) -> Result<(), ApiClientError> +where + E: serde::de::DeserializeOwned + Error, +{ + if response.status() == StatusCode::NO_CONTENT { + return Ok(()); + } + + Err(ApiClientError::from_response( + response.status(), + response.bytes().await?, + )) +} + +/// Try to parse the result as T. If that fails, try to parse the result as a +/// ApiError. +async fn api_result(response: reqwest::Response) -> Result> +where + T: serde::de::DeserializeOwned, + E: serde::de::DeserializeOwned + Error, +{ + // Copy the status code here in case we are unable to parse the response as + // the Ok or Err variant. + let status_code = response.status(); + + // Read the entire response into a local buffer. + let body = response.bytes().await?; + + // Try to parse the result as T. + match serde_json::from_slice::(&body) { + Ok(result) => Ok(result), + Err(err) => { + trace!( + ?status_code, + ?err, + "Failed to parse response as expected type" + ); + Err(ApiClientError::from_response(status_code, body)) + } } } diff --git a/fpx/src/commands/client/spans.rs b/fpx/src/commands/client/spans.rs index fa0f6828d..f1018fa0d 100644 --- a/fpx/src/commands/client/spans.rs +++ b/fpx/src/commands/client/spans.rs @@ -17,12 +17,16 @@ pub enum Command { /// List all spans for a single trace List(ListArgs), + + /// Delete a single span + Delete(DeleteArgs), } pub async fn handle_command(args: Args) -> Result<()> { match args.command { Command::Get(args) => handle_get(args).await, Command::List(args) => handle_list(args).await, + Command::Delete(args) => handle_delete(args).await, } } @@ -68,3 +72,24 @@ async fn handle_list(args: ListArgs) -> Result<()> { Ok(()) } + +#[derive(clap::Args, Debug)] +pub struct DeleteArgs { + /// TraceID - hex encoded + pub trace_id: String, + + /// SpanID - hex encoded + pub span_id: String, + + /// Base url of the fpx dev server. + #[arg(from_global)] + pub base_url: Url, +} + +async fn handle_delete(args: DeleteArgs) -> Result<()> { + let api_client = ApiClient::new(args.base_url.clone()); + + api_client.span_delete(args.trace_id, args.span_id).await?; + + Ok(()) +} diff --git a/fpx/src/commands/client/traces.rs b/fpx/src/commands/client/traces.rs index ef3b6d493..61222d77b 100644 --- a/fpx/src/commands/client/traces.rs +++ b/fpx/src/commands/client/traces.rs @@ -17,12 +17,16 @@ pub enum Command { /// List all traces List(ListArgs), + + /// Delete all spans for a single trace + Delete(DeleteArgs), } pub async fn handle_command(args: Args) -> Result<()> { match args.command { Command::Get(args) => handle_get(args).await, Command::List(args) => handle_list(args).await, + Command::Delete(args) => handle_delete(args).await, } } @@ -62,3 +66,21 @@ async fn handle_list(args: ListArgs) -> Result<()> { Ok(()) } + +#[derive(clap::Args, Debug)] +pub struct DeleteArgs { + /// TraceID - hex encoded + pub trace_id: String, + + /// Base url of the fpx dev server. + #[arg(from_global)] + pub base_url: Url, +} + +async fn handle_delete(args: DeleteArgs) -> Result<()> { + let api_client = ApiClient::new(args.base_url.clone()); + + api_client.trace_delete(args.trace_id).await?; + + Ok(()) +} diff --git a/fpx/src/data.rs b/fpx/src/data.rs index bae6c8749..d114abc69 100644 --- a/fpx/src/data.rs +++ b/fpx/src/data.rs @@ -180,4 +180,29 @@ impl Store for LibsqlStore { Ok(traces) } + + /// Delete all spans with a specific trace_id. + async fn span_delete_by_trace(&self, _tx: &Transaction, trace_id: &str) -> Result> { + let rows_affected = self + .connection + .execute(&self.sql_builder.span_delete_by_trace(), params!(trace_id)) + .await?; + + Ok(Some(rows_affected)) + } + + /// Delete a single span. + async fn span_delete( + &self, + _tx: &Transaction, + trace_id: &str, + span_id: &str, + ) -> Result> { + let rows_affected = self + .connection + .execute(&self.sql_builder.span_delete(), params!(trace_id, span_id)) + .await?; + + Ok(Some(rows_affected)) + } }