Skip to content

Commit

Permalink
Generate OTEL files during build
Browse files Browse the repository at this point in the history
  • Loading branch information
hatchan committed Jun 28, 2024
1 parent 0e40b08 commit c810a9f
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 14 deletions.
5 changes: 2 additions & 3 deletions fpx/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ use crate::events::ServerEvents;
use crate::inspector::InspectorService;
use axum::extract::FromRef;
use axum::routing::{any, get};
use grpc::GrpcService;
use http::StatusCode;
use url::Url;

pub mod client;
mod errors;
mod grpc;
pub mod grpc;
pub mod handlers;
mod studio;
mod ws;
Expand Down Expand Up @@ -51,7 +50,7 @@ pub fn create_api(
inspector_service: InspectorService,
) -> axum::Router {
let api_router = api_router(base_url, events.clone(), store.clone(), inspector_service);
let grpc_service = GrpcService::new(store, events);

axum::Router::new()
.nest("/api/", api_router)
.fallback(studio::default_handler)
Expand Down
25 changes: 21 additions & 4 deletions fpx/src/api/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::data::{DbError, Store};
use crate::events::ServerEvents;
use crate::models::TraceAdded;
use protos::opentelemetry::proto::collector::trace::v1::trace_service_server::TraceService;
use protos::opentelemetry::proto::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
Expand All @@ -9,6 +10,7 @@ pub mod protos {
tonic::include_proto!("opentelemetry_proto");
}

#[derive(Clone)]
pub struct GrpcService {
store: Store,
events: ServerEvents,
Expand All @@ -27,24 +29,39 @@ impl TraceService for GrpcService {
request: tonic::Request<ExportTraceServiceRequest>,
) -> Result<tonic::Response<ExportTraceServiceResponse>, tonic::Status> {
let tx = self.store.start_transaction().await?;
let trace = ();

// self.store.trace_create(&tx, trace).await?;
// let trace = ();

self.store.commit_transaction(tx).await?;

let trace_ids = request.get_ref().extract_trace_ids();

self.events.broadcast(TraceAdded::new(trace_ids).into());

let message = ExportTraceServiceResponse {
partial_success: None,
};
Ok(tonic::Response::new(message))
}
}

impl ExportTraceServiceRequest {
pub fn extract_trace_ids(&self) -> Vec<Vec<u8>> {
self.resource_spans
.iter()
.flat_map(|span| {
span.scope_spans.iter().flat_map(|scope_span| {
scope_span.spans.iter().map(|inner| inner.trace_id.clone())
})
})
.collect()
}
}

impl From<DbError> for tonic::Status {
fn from(err: DbError) -> Self {
match err {
DbError::NotFound => todo!(),
DbError::FailedDeserialize { message } => todo!(),
DbError::FailedDeserialize { .. } => todo!(),
DbError::InvalidJson(_) => todo!(),
DbError::InternalError(_) => todo!(),
}
Expand Down
27 changes: 22 additions & 5 deletions fpx/src/commands/dev.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use crate::api;
use crate::api::grpc::protos::opentelemetry::proto::collector::trace::v1::trace_service_server::TraceServiceServer;
use crate::api::grpc::GrpcService;
use crate::api::{self};
use crate::data::migrations::migrate;
use crate::data::{DataPath, Store};
use crate::events::Events;
use crate::initialize_fpx_dir;
use anyhow::{Context, Result};
use std::future::IntoFuture;
use std::{path::PathBuf, process::exit};
use tokio::select;
use tracing::info;
use tracing::warn;

Expand Down Expand Up @@ -45,7 +49,13 @@ pub async fn handle_command(args: Args) -> Result<()> {
)
.await?;

let app = api::create_api(args.base_url.clone(), events, store, inspector_service);
let app = api::create_api(
args.base_url.clone(),
events.clone(),
store.clone(),
inspector_service,
);
let grpc_service = GrpcService::new(store, events);

let listener = tokio::net::TcpListener::bind(&args.listen_address)
.await
Expand Down Expand Up @@ -75,10 +85,17 @@ pub async fn handle_command(args: Args) -> Result<()> {
"Starting server",
);

axum::serve(listener, app)
let task1 = axum::serve(listener, app)
.with_graceful_shutdown(shutdown)
.await
.context("Failed to start the HTTP server")?;
.into_future();
let task2 = tonic::transport::Server::builder()
.add_service(TraceServiceServer::new(grpc_service))
.serve("127.0.0.1:4317".parse()?);

select! {
_ = task1 => {},
_ = task2 => {},
};

info!("Server shutdown gracefully");

Expand Down
24 changes: 22 additions & 2 deletions fpx/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ pub enum ServerMessageDetails {
/// A request has been captured. It contains a reference to the request id
/// and optionally a reference to the inspector id.
RequestAdded(Box<RequestAdded>),

TraceAdded(Box<TraceAdded>),
}

impl From<ServerMessageDetails> for ServerMessage {
Expand Down Expand Up @@ -137,8 +139,26 @@ impl RequestAdded {
}

impl From<RequestAdded> for ServerMessage {
fn from(request_added: RequestAdded) -> Self {
ServerMessageDetails::RequestAdded(Box::new(request_added)).into()
fn from(val: RequestAdded) -> Self {
ServerMessageDetails::RequestAdded(Box::new(val)).into()
}
}

#[derive(JsonSchema, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TraceAdded {
trace_ids: Vec<Vec<u8>>,
}

impl TraceAdded {
pub fn new(trace_ids: Vec<Vec<u8>>) -> Self {
Self { trace_ids }
}
}

impl From<TraceAdded> for ServerMessage {
fn from(val: TraceAdded) -> Self {
ServerMessageDetails::TraceAdded(Box::new(val)).into()
}
}

Expand Down

0 comments on commit c810a9f

Please sign in to comment.