Skip to content

Commit

Permalink
Add decode as json or proto to otel http collector
Browse files Browse the repository at this point in the history
  • Loading branch information
hatchan committed Jul 15, 2024
1 parent 7ff3d15 commit 5520bb8
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fpx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ tracing = { version = "0.1" }
tracing-opentelemetry = { version = "0.24" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = { version = "2.5" }
prost = { version = "0.12" }


[dev-dependencies]
Expand Down
68 changes: 64 additions & 4 deletions fpx/src/api/handlers/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,27 @@ use crate::data::models::Span;
use crate::data::Store;
use crate::events::ServerEvents;
use crate::models::SpanAdded;
use async_trait::async_trait;
use axum::extract::State;
use axum::response::IntoResponse;
use axum::Json;
use axum::extract::{FromRequest, Request};
use axum::response::{IntoResponse, Response};
use axum::{Json, RequestExt};
use bytes::Bytes;
use http::header::CONTENT_TYPE;
use http::HeaderMap;
use http::StatusCode;
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use prost::Message;
use tracing::error;

#[tracing::instrument(skip_all)]
pub async fn trace_collector_handler(
State(store): State<Store>,
State(events): State<ServerEvents>,
Json(payload): Json<ExportTraceServiceRequest>,
headers: HeaderMap,
JsonOrProtobuf(payload): JsonOrProtobuf<ExportTraceServiceRequest>,
) -> impl IntoResponse {
let trace_ids = extract_trace_ids(&payload);

Expand All @@ -33,5 +42,56 @@ pub async fn trace_collector_handler(
partial_success: None,
};

Json(message)
let content_type = headers
.get(CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.unwrap_or_else(|| "");

match content_type {
"application/json" => Json(message).into_response(),
"application/x-protobuf" => {
let mut buf = bytes::BytesMut::new();
message.encode(&mut buf).expect("TODO");
buf.into_response()
}
content_type => {
error!("unsupported content type: {}", content_type);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}

pub struct JsonOrProtobuf<T>(T);

#[async_trait]
impl<T, S> FromRequest<S> for JsonOrProtobuf<T>
where
S: Send + Sync,
Json<T>: FromRequest<()>,
T: Message + Default,
T: 'static,
{
type Rejection = Response;

async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
let content_type_header = req.headers().get(CONTENT_TYPE);
let content_type = content_type_header.and_then(|value| value.to_str().ok());

if let Some(content_type) = content_type {
if content_type.starts_with("application/json") {
let Json(payload) = req.extract().await.map_err(IntoResponse::into_response)?;
return Ok(Self(payload));
}

if content_type.starts_with("application/x-protobuf") {
let bytes = Bytes::from_request(req, state)
.await
.map_err(IntoResponse::into_response)?;
let payload: T = T::decode(bytes).expect("TODO");
return Ok(Self(payload));
}
}

Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())
}
}
2 changes: 1 addition & 1 deletion scripts/otel_collector/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ service:
pipelines:
traces:
receivers: [otlp]
exporters: [debug, otlphttp, otlp]
exporters: [debug, otlphttp]

telemetry:
logs:
Expand Down

0 comments on commit 5520bb8

Please sign in to comment.