diff --git a/.cargo/config.toml b/.cargo/config.toml index dd0c8bc38..5426eea6f 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,3 +1,3 @@ [alias] -fpx = "run --package fpx --" +fpx = "run --package fpx-cli --" xtask = "run --package xtask --" diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 94eaad420..9b2feb629 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -57,12 +57,12 @@ jobs: run: cargo test - name: cargo build - run: cargo build --release + run: cargo build --release --package fpx-cli - name: Upload artifact uses: actions/upload-artifact@v4 with: name: fpx_${{ matrix.target }} - path: target/release/fpx + path: target/release/fpx-cli if-no-files-found: error retention-days: 7 diff --git a/Cargo.lock b/Cargo.lock index f6eaa5f69..bae6110c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -820,73 +820,73 @@ dependencies = [ "async-trait", "axum 0.7.9", "bytes", - "clap", - "fpx-lib", "fpx-macros", "futures-util", "hex", "http 1.2.0", "http-body-util", - "include_dir", "libsql", - "once_cell", "opentelemetry", - "opentelemetry-otlp", "opentelemetry-proto", "opentelemetry_sdk", "prost 0.13.4", - "rand", "reqwest", "schemars", "serde", "serde_json", - "serde_with", "strum", - "test-log", "thiserror 2.0.11", "time", "tokio", - "tokio-tungstenite 0.21.0", - "toml", - "tonic 0.12.3", "tower 0.4.13", + "tower-http 0.5.2", "tracing", "tracing-opentelemetry", - "tracing-subscriber", "url", + "wasm-bindgen", ] [[package]] -name = "fpx-lib" +name = "fpx-cli" version = "0.1.0" dependencies = [ "anyhow", "async-trait", "axum 0.7.9", "bytes", + "clap", + "fpx", "fpx-macros", "futures-util", "hex", "http 1.2.0", "http-body-util", + "include_dir", "libsql", + "once_cell", "opentelemetry", + "opentelemetry-otlp", "opentelemetry-proto", "opentelemetry_sdk", "prost 0.13.4", + "rand", "schemars", "serde", "serde_json", + "serde_with", "strum", + "test-log", "thiserror 2.0.11", "time", "tokio", + 
"tokio-tungstenite 0.21.0", + "toml", + "tonic 0.12.3", "tower 0.4.13", - "tower-http 0.5.2", "tracing", "tracing-opentelemetry", + "tracing-subscriber", "url", - "wasm-bindgen", ] [[package]] @@ -905,7 +905,7 @@ version = "0.1.0" dependencies = [ "axum 0.7.9", "console_error_panic_hook", - "fpx-lib", + "fpx", "getrandom", "serde", "serde_json", @@ -3899,7 +3899,7 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", - "fpx-lib", + "fpx", "schemars", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 51c79ec76..26cec6e0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,6 @@ [workspace] resolver = "2" -members = ["fpx", "fpx-lib", "fpx-workers", "fpx-macros", "xtask"] -default-members = ["fpx"] +members = ["fpx-cli", "fpx", "fpx-workers", "fpx-macros", "xtask"] [workspace.package] authors = ["Fiberplane "] diff --git a/fpx-cli/Cargo.toml b/fpx-cli/Cargo.toml new file mode 100644 index 000000000..661097bd0 --- /dev/null +++ b/fpx-cli/Cargo.toml @@ -0,0 +1,80 @@ +[package] +name = "fpx-cli" +version = { workspace = true } +edition = "2021" +authors = { workspace = true } +license = { workspace = true } +repository = { workspace = true } + +[features] +embed-studio = [] # When enabled it will embed Studio from frontend/dist + +[dependencies] +anyhow = { workspace = true } +async-trait = { version = "0.1" } +axum = { workspace = true, default-features = false, features = [ + "http1", + "query", + "tokio", + "tracing", + "ws", +] } +bytes = { version = "1.6" } +clap = { workspace = true, features = ["derive", "env"] } +fpx = { version = "0.1.0", path = "../fpx", features = [ + "libsql", + "fpx_client", +] } +fpx-macros = { version = "0.1.0", path = "../fpx-macros" } +futures-util = { version = "0.3" } +hex = { version = "0.4" } +http = { version = "1.1" } +http-body-util = { version = "0.1" } +include_dir = { version = "0.7.3" } +libsql = { version = "0.6", default-features = false, features = [ + "core", + "serde", +] } +once_cell = { version = 
"1.19" } +opentelemetry = { version = "0.27" } +opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] } +opentelemetry-otlp = { version = "0.27", features = [ + "http-json", + "reqwest-client", + "reqwest-rustls-webpki-roots", +] } +opentelemetry-proto = { version = "0.27", features = [ + "gen-tonic-messages", + "with-serde", + "with-schemars", +] } +prost = { version = "0.13" } +rand = { version = "0.8.5" } +schemars = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +serde_with = { version = "3.8.1" } +strum = { version = "0.26", features = ["derive"] } +thiserror = { version = "2.0" } +time = { version = "0.3.17", features = ["serde-human-readable"] } +tokio = { version = "1.40", features = ["rt-multi-thread", "signal", "fs"] } +tokio-tungstenite = { version = "0.21", features = [ + "rustls-tls-webpki-roots", +] } # This should be kept the same as whatever Axum has +toml = { version = "0.8" } +tonic = { version = "0.12" } +tower = { version = "0.4" } +tracing = { version = "0.1" } +tracing-opentelemetry = { version = "0.28" } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +url = { version = "2.5" } + +[target.'cfg(windows)'.dependencies] +libsql = { version = "0.6", default-features = false, features = [ + "core", + "serde", + "replication", +] } + +[dev-dependencies] +test-log = { version = "0.2", default-features = false, features = ["trace"] } diff --git a/fpx-cli/README.md b/fpx-cli/README.md new file mode 100644 index 000000000..a4bf9c203 --- /dev/null +++ b/fpx-cli/README.md @@ -0,0 +1,55 @@ +# fpx + +The fpx tool is a command line tool to launch a local HTTP or gRPC OTEL +ingestion endpoint. It also includes a CLI client to interact with some of the +Rest and web-socket endpoints. + +NOTE: Currently only a in-memory storage is supported. + +## Usage + +First, make sure you have Rust installed on your machine. 
You can install Rust +using [rustup](https://rustup.rs/) or take a look at the +[official instructions](https://www.rust-lang.org/tools/install). + +Then run the following command to execute the local dev server: + +``` +cargo run -- dev +``` + +See `Commands` for more information. + +## Commands + +The fpx binary is primarily used to start a local dev server, but it is also +possible to run some other commands. + +For ease of use, the `fpx` cargo alias has been added, meaning you can run +`cargo fpx` in any directory in this repository and compile and then invoke +`fpx`. + +### `fpx dev` + +Starts the local dev server. + +Use `-e` or `--enable-tracing` to send otlp payloads to `--otlp-endpoint`. Note +that this should not be used to send traces to itself, as that will create an +infinite loop. + +### `fpx client` + +Invokes endpoints on a fpx server. + +This command can also send traces to a otel endpoint. NOTE: there are some known +issues where it doesn't seem to work properly. + +Examples: + +``` +# Fetch all traces +fpx client traces list + +# Fetch a specific span +fpx client spans get aa aa +``` diff --git a/fpx/src/commands.rs b/fpx-cli/src/commands.rs similarity index 100% rename from fpx/src/commands.rs rename to fpx-cli/src/commands.rs diff --git a/fpx/src/commands/client.rs b/fpx-cli/src/commands/client.rs similarity index 100% rename from fpx/src/commands/client.rs rename to fpx-cli/src/commands/client.rs diff --git a/fpx/src/commands/client/spans.rs b/fpx-cli/src/commands/client/spans.rs similarity index 98% rename from fpx/src/commands/client/spans.rs rename to fpx-cli/src/commands/client/spans.rs index f1018fa0d..2fb2f4197 100644 --- a/fpx/src/commands/client/spans.rs +++ b/fpx-cli/src/commands/client/spans.rs @@ -1,6 +1,6 @@ -use crate::api::client::ApiClient; use anyhow::Result; use clap::Subcommand; +use fpx::api::client::ApiClient; use std::io::stdout; use url::Url; diff --git a/fpx/src/commands/client/traces.rs 
b/fpx-cli/src/commands/client/traces.rs similarity index 98% rename from fpx/src/commands/client/traces.rs rename to fpx-cli/src/commands/client/traces.rs index 61222d77b..51ff88617 100644 --- a/fpx/src/commands/client/traces.rs +++ b/fpx-cli/src/commands/client/traces.rs @@ -1,6 +1,6 @@ -use crate::api::client::ApiClient; use anyhow::Result; use clap::Subcommand; +use fpx::api::client::ApiClient; use std::io::stdout; use url::Url; diff --git a/fpx/src/commands/debug.rs b/fpx-cli/src/commands/debug.rs similarity index 100% rename from fpx/src/commands/debug.rs rename to fpx-cli/src/commands/debug.rs diff --git a/fpx/src/commands/debug/ws.rs b/fpx-cli/src/commands/debug/ws.rs similarity index 97% rename from fpx/src/commands/debug/ws.rs rename to fpx-cli/src/commands/debug/ws.rs index a3cf3b985..8a1198622 100644 --- a/fpx/src/commands/debug/ws.rs +++ b/fpx-cli/src/commands/debug/ws.rs @@ -1,5 +1,5 @@ use anyhow::{Context, Result}; -use fpx_lib::api::models::FPX_WEBSOCKET_ID_HEADER; +use fpx::api::models::FPX_WEBSOCKET_ID_HEADER; use futures_util::{SinkExt, StreamExt}; use std::sync::Arc; use tokio::sync::Mutex; diff --git a/fpx/src/commands/dev.rs b/fpx-cli/src/commands/dev.rs similarity index 99% rename from fpx/src/commands/dev.rs rename to fpx-cli/src/commands/dev.rs index 2ea63c4e8..bdba55d22 100644 --- a/fpx/src/commands/dev.rs +++ b/fpx-cli/src/commands/dev.rs @@ -3,7 +3,7 @@ use crate::events::InMemoryEvents; use crate::grpc::GrpcService; use crate::initialize_fpx_dir; use anyhow::{Context, Result}; -use fpx_lib::{api, service}; +use fpx::{api, service}; use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer; use std::future::IntoFuture; use std::path::PathBuf; diff --git a/fpx/src/commands/system.rs b/fpx-cli/src/commands/system.rs similarity index 100% rename from fpx/src/commands/system.rs rename to fpx-cli/src/commands/system.rs diff --git a/fpx/src/commands/system/database.rs 
b/fpx-cli/src/commands/system/database.rs similarity index 100% rename from fpx/src/commands/system/database.rs rename to fpx-cli/src/commands/system/database.rs diff --git a/fpx-cli/src/data.rs b/fpx-cli/src/data.rs new file mode 100644 index 000000000..3316bf60f --- /dev/null +++ b/fpx-cli/src/data.rs @@ -0,0 +1,221 @@ +use anyhow::Context; +use async_trait::async_trait; +use fpx::data::models::{HexEncodedId, Span}; +use fpx::data::sql::SqlBuilder; +use fpx::data::{DbError, Result, Store, Transaction}; +use libsql::{params, Builder, Connection}; +use std::fmt::Display; +use std::path::Path; +use std::sync::Arc; +use tracing::trace; +use util::RowsExt; + +mod migrations; +mod util; + +#[cfg(test)] +mod tests; + +pub enum DataPath<'a> { + InMemory, + File(&'a Path), +} + +impl<'a> DataPath<'a> { + pub fn as_path(&self) -> &'a Path { + match self { + DataPath::InMemory => Path::new(":memory:"), + DataPath::File(path) => path, + } + } +} + +impl Display for DataPath<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DataPath::InMemory => write!(f, ":memory:"), + DataPath::File(path) => f.write_fmt(format_args!("{}", path.display())), + } + } +} + +#[derive(Clone)] +pub struct LibsqlStore { + connection: Connection, + sql_builder: Arc, +} + +impl LibsqlStore { + pub async fn open(path: DataPath<'_>) -> Result { + trace!(%path, "Opening Libsql database"); + + // Not sure if we need this database object, but for now we just drop + // it. 
+ let database = Builder::new_local(path.as_path()) + .build() + .await + .context("failed to build libSQL database object")?; + + let mut connection = database + .connect() + .context("failed to connect to libSQL database")?; + + Self::initialize_connection(&mut connection).await?; + + let sql_builder = Arc::new(SqlBuilder::new()); + + Ok(LibsqlStore { + connection, + sql_builder, + }) + } + + pub async fn in_memory() -> Result { + Self::open(DataPath::InMemory).await + } + + pub async fn file(db_path: &Path) -> Result { + Self::open(DataPath::File(db_path)).await + } + + /// This function will execute a few PRAGMA statements to set the database + /// connection. This should run before any other queries are executed. + async fn initialize_connection(connection: &mut Connection) -> Result<()> { + connection + .query( + "PRAGMA journal_mode = WAL; + PRAGMA busy_timeout = 5000; + PRAGMA cache_size = 2000; + PRAGMA foreign_keys = ON; + PRAGMA journal_size_limit = 27103364; + PRAGMA mmap_size = 134217728; + PRAGMA synchronous = NORMAL; + PRAGMA temp_store = memory;", + (), + ) + .await + .map_err(|err| DbError::InternalError(err.to_string()))?; + + Ok(()) + } +} + +#[async_trait] +impl Store for LibsqlStore { + async fn start_readonly_transaction(&self) -> Result { + Ok(Transaction {}) + } + async fn start_readwrite_transaction(&self) -> Result { + Ok(Transaction {}) + } + + async fn commit_transaction(&self, _tx: Transaction) -> Result<(), DbError> { + Ok(()) + } + async fn rollback_transaction(&self, _tx: Transaction) -> Result<(), DbError> { + Ok(()) + } + + async fn span_get( + &self, + _tx: &Transaction, + trace_id: &HexEncodedId, + span_id: &HexEncodedId, + ) -> Result { + let span = self + .connection + .query(&self.sql_builder.span_get(), (trace_id, span_id)) + .await? 
+ .fetch_one() + .await?; + + Ok(span) + } + + async fn span_list_by_trace( + &self, + _tx: &Transaction, + trace_id: &HexEncodedId, + ) -> Result> { + let spans = self + .connection + .query(&self.sql_builder.span_list_by_trace(), params!(trace_id)) + .await? + .fetch_all() + .await?; + + Ok(spans) + } + + async fn span_create(&self, _tx: &Transaction, span: Span) -> Result { + let span = self + .connection + .query( + &self.sql_builder.span_create(), + params!( + span.trace_id, + span.span_id, + span.parent_span_id, + span.name, + span.kind, + span.start_time, + span.end_time, + span.inner, + ), + ) + .await? + .fetch_one() + .await?; + + Ok(span) + } + + /// Get a list of all the traces. (currently limited to 20, sorted by most + /// recent [`end_time`]) + /// + /// Note that a trace is a computed value, so not all properties are + /// present. To get all the data, use the [`Self::span_list_by_trace`] fn. + async fn traces_list( + &self, + _tx: &Transaction, + // Future improvement could hold sort fields, limits, etc + ) -> Result> { + let traces = self + .connection + .query(&self.sql_builder.traces_list(None), ()) + .await? + .fetch_all() + .await?; + + Ok(traces) + } + + /// Delete all spans with a specific trace_id. + async fn span_delete_by_trace( + &self, + _tx: &Transaction, + trace_id: &HexEncodedId, + ) -> Result> { + let rows_affected = self + .connection + .execute(&self.sql_builder.span_delete_by_trace(), params!(trace_id)) + .await?; + + Ok(Some(rows_affected)) + } + + /// Delete a single span. 
+ async fn span_delete( + &self, + _tx: &Transaction, + trace_id: &HexEncodedId, + span_id: &HexEncodedId, + ) -> Result> { + let rows_affected = self + .connection + .execute(&self.sql_builder.span_delete(), params!(trace_id, span_id)) + .await?; + + Ok(Some(rows_affected)) + } +} diff --git a/fpx/src/data/migrations.rs b/fpx-cli/src/data/migrations.rs similarity index 100% rename from fpx/src/data/migrations.rs rename to fpx-cli/src/data/migrations.rs diff --git a/fpx/src/data/migrations/20240708_create_spans.sql b/fpx-cli/src/data/migrations/20240708_create_spans.sql similarity index 100% rename from fpx/src/data/migrations/20240708_create_spans.sql rename to fpx-cli/src/data/migrations/20240708_create_spans.sql diff --git a/fpx/src/data/migrations/20240723_create_responses.sql b/fpx-cli/src/data/migrations/20240723_create_responses.sql similarity index 100% rename from fpx/src/data/migrations/20240723_create_responses.sql rename to fpx-cli/src/data/migrations/20240723_create_responses.sql diff --git a/fpx/src/data/tests.rs b/fpx-cli/src/data/tests.rs similarity index 92% rename from fpx/src/data/tests.rs rename to fpx-cli/src/data/tests.rs index 32f98a4e4..91e897d12 100644 --- a/fpx/src/data/tests.rs +++ b/fpx-cli/src/data/tests.rs @@ -1,7 +1,7 @@ use crate::data::LibsqlStore; -use fpx_lib::api::models::{AttributeMap, SpanKind}; -use fpx_lib::data::models::{HexEncodedId, Span}; -use fpx_lib::data::Store; +use fpx::api::models::{AttributeMap, SpanKind}; +use fpx::data::models::{HexEncodedId, Span}; +use fpx::data::Store; use test_log::test; /// Tests creating a span and then retrieving it using the various methods. 
@@ -25,7 +25,7 @@ async fn span_successful() { let span_id = HexEncodedId::new("a6c0ed7c2f81e7c8").unwrap(); let now = time::OffsetDateTime::now_utc(); - let inner_span: fpx_lib::api::models::Span = fpx_lib::api::models::Span { + let inner_span: fpx::api::models::Span = fpx::api::models::Span { trace_id: trace_id.clone(), span_id: span_id.clone(), parent_span_id: None, diff --git a/fpx-cli/src/data/util.rs b/fpx-cli/src/data/util.rs new file mode 100644 index 000000000..6edb159f3 --- /dev/null +++ b/fpx-cli/src/data/util.rs @@ -0,0 +1,38 @@ +use fpx::data::{DbError, Result}; +use libsql::{de, Rows}; +use serde::de::DeserializeOwned; + +#[allow(dead_code)] +pub(crate) trait RowsExt { + /// `T` must be a `struct` + async fn fetch_one(&mut self) -> Result; + + /// `T` must be a `struct` + async fn fetch_optional(&mut self) -> Result, DbError>; + + /// `T` must be a `struct` + async fn fetch_all(&mut self) -> Result, DbError>; +} + +impl RowsExt for Rows { + async fn fetch_one(&mut self) -> Result { + self.fetch_optional().await?.ok_or(DbError::NotFound) + } + + async fn fetch_optional(&mut self) -> Result, DbError> { + match self.next().await? { + Some(row) => Ok(Some(de::from_row(&row)?)), + None => Ok(None), + } + } + + async fn fetch_all(&mut self) -> Result, DbError> { + let mut results = Vec::new(); + + while let Some(row) = self.next().await? 
{ + results.push(de::from_row(&row)?); + } + + Ok(results) + } +} diff --git a/fpx-cli/src/events.rs b/fpx-cli/src/events.rs new file mode 100644 index 000000000..8dfde060c --- /dev/null +++ b/fpx-cli/src/events.rs @@ -0,0 +1,39 @@ +use async_trait::async_trait; +use fpx::api::models::ServerMessage; +use fpx::events::ServerEvents; +use tokio::sync::broadcast; +use tracing::trace; + +#[derive(Clone)] +pub struct InMemoryEvents { + sender: broadcast::Sender, +} + +impl InMemoryEvents { + pub fn new() -> Self { + let (sender, _) = broadcast::channel(100); + Self { sender } + } + + pub async fn subscribe(&self) -> broadcast::Receiver { + self.sender.subscribe() + } +} + +impl Default for InMemoryEvents { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl ServerEvents for InMemoryEvents { + async fn broadcast(&self, message: ServerMessage) { + if let Err(err) = self.sender.send(message) { + // Note: this only happens when the channel is closed. Which also + // happens when there a no subscribers. So there is not need to log + // this as an warn or error. 
+ trace!(%err, "failed to broadcast message"); + }; + } +} diff --git a/fpx/src/grpc.rs b/fpx-cli/src/grpc.rs similarity index 97% rename from fpx/src/grpc.rs rename to fpx-cli/src/grpc.rs index 753407009..4d4a16269 100644 --- a/fpx/src/grpc.rs +++ b/fpx-cli/src/grpc.rs @@ -1,4 +1,4 @@ -use fpx_lib::service::Service; +use fpx::service::Service; use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceService; use opentelemetry_proto::tonic::collector::trace::v1::{ ExportTraceServiceRequest, ExportTraceServiceResponse, diff --git a/fpx/src/main.rs b/fpx-cli/src/main.rs similarity index 99% rename from fpx/src/main.rs rename to fpx-cli/src/main.rs index 57fb823a3..15e2fd721 100644 --- a/fpx/src/main.rs +++ b/fpx-cli/src/main.rs @@ -12,7 +12,6 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{EnvFilter, Registry}; -mod api; mod commands; pub mod data; pub mod events; diff --git a/fpx-lib/Cargo.toml b/fpx-lib/Cargo.toml deleted file mode 100644 index 6760b837a..000000000 --- a/fpx-lib/Cargo.toml +++ /dev/null @@ -1,61 +0,0 @@ -[package] -name = "fpx-lib" -version = { workspace = true } -edition = "2021" -authors = { workspace = true } -license = { workspace = true } -repository = { workspace = true } - -[lib] - -[dependencies] -anyhow = { version = "1.0", default-features = false } -async-trait = { version = "0.1", default-features = false } -axum = { workspace = true, default-features = false, features = [ - "json", - "matched-path", -] } -bytes = { version = "1.0", default-features = false } -fpx-macros = { version = "0.1.0", path = "../fpx-macros" } -futures-util = { version = "0.3", default-features = false } -hex = { version = "0.4", default-features = false, features = ["alloc"] } -http = { version = "1.1", default-features = false } -libsql = { version = "0.6", default-features = false, optional = true } -opentelemetry = { version = "0.27", default-features = false } 
-opentelemetry_sdk = { version = "0.27", default-features = false } -opentelemetry-proto = { version = "0.27", default-features = false, features = [ - "trace", - "gen-tonic-messages", - "with-serde", - "with-schemars", -] } -prost = { version = "0.13", default-features = false } -schemars = { workspace = true, default-features = false, features = ["derive"] } -serde = { version = "1.0", default-features = false, features = ["derive"] } -serde_json = { version = "1.0", default-features = false } -strum = { version = "0.26", default-features = false, features = ["derive"] } -thiserror = { version = "2.0", default-features = false } -time = { version = "0.3", default-features = false, features = [ - "serde-human-readable", -] } -tokio = { version = "1.40", default-features = false } -tower = { version = "0.4", default-features = false } -tower-http = { version = "0.5", default-features = false, features = [ - "compression-br", - "compression-gzip", - "decompression-br", - "decompression-gzip", -] } -tracing = { version = "0.1", default-features = false, features = [ - "attributes", -] } -tracing-opentelemetry = { version = "0.28", default-features = false } -url = { version = "2.5", default-features = false } -wasm-bindgen = { version = "0.2", default-features = false, optional = true } - -[dev-dependencies] -http-body-util = { version = "0.1", default-features = false } -tokio = { version = "1.40", default-features = false, features = [ - "macros", - "test-util", -] } diff --git a/fpx-lib/README.md b/fpx-lib/README.md deleted file mode 100644 index 22655b254..000000000 --- a/fpx-lib/README.md +++ /dev/null @@ -1,6 +0,0 @@ -# fpx-lib - -This crate contains the shared types and logic that is used in both the normal -fpx binary and the fpx-workers crates. This crate contains most of the logic -that is shared between them and it also includes shared traits. The -implementations of these traits might not be included in this crate. 
diff --git a/fpx-lib/src/api.rs b/fpx-lib/src/api.rs deleted file mode 100644 index f17930ba5..000000000 --- a/fpx-lib/src/api.rs +++ /dev/null @@ -1,82 +0,0 @@ -use crate::data::BoxedStore; -use crate::otel::OtelTraceLayer; -use crate::service::Service; -use axum::extract::FromRef; -use axum::routing::{get, post}; -use http::StatusCode; -use tower_http::compression::CompressionLayer; -use tower_http::decompression::RequestDecompressionLayer; - -pub mod errors; -pub mod handlers; -pub mod models; - -#[derive(Clone)] -pub struct ApiState { - service: Service, - store: BoxedStore, -} - -impl FromRef for BoxedStore { - fn from_ref(state: &ApiState) -> Self { - state.store.clone() - } -} - -impl FromRef for Service { - fn from_ref(state: &ApiState) -> Self { - state.service.clone() - } -} - -#[derive(Default)] -pub struct Builder { - enable_compression: bool, -} - -impl Builder { - pub fn new() -> Self { - Self::default() - } - - pub fn set_compression(mut self, compression: bool) -> Self { - self.enable_compression = compression; - self - } - - pub fn enable_compression(self) -> Self { - self.set_compression(true) - } - - /// Create a API and expose it through a axum router. 
- pub fn build(self, service: Service, store: BoxedStore) -> axum::Router { - let api_state = ApiState { service, store }; - - let router = axum::Router::new() - .route("/v1/traces", post(handlers::otel::trace_collector_handler)) - .route("/v1/traces", get(handlers::traces::traces_list_handler)) - .route( - "/v1/traces/:trace_id", - get(handlers::traces::traces_get_handler) - .delete(handlers::traces::traces_delete_handler), - ) - .route( - "/v1/traces/:trace_id/spans", - get(handlers::spans::span_list_handler), - ) - .route( - "/v1/traces/:trace_id/spans/:span_id", - get(handlers::spans::span_get_handler).delete(handlers::spans::span_delete_handler), - ) - .with_state(api_state) - .fallback(StatusCode::NOT_FOUND) - .layer(OtelTraceLayer::default()) - .layer(RequestDecompressionLayer::new()); - - if self.enable_compression { - router.layer(CompressionLayer::new()) - } else { - router - } - } -} diff --git a/fpx-lib/src/api/errors.rs b/fpx-lib/src/api/errors.rs deleted file mode 100644 index d84911662..000000000 --- a/fpx-lib/src/api/errors.rs +++ /dev/null @@ -1,212 +0,0 @@ -use axum::response::IntoResponse; -use bytes::Bytes; -use fpx_macros::ApiError; -use http::StatusCode; -use serde::{Deserialize, Serialize}; -use thiserror::Error; -use tracing::{error, warn}; - -pub trait ApiError { - fn status_code(&self) -> StatusCode; -} - -#[derive(Debug, Error)] -pub enum ApiServerError { - /// An error occurred in the service. These errors are specific to the - /// endpoint that was called. - #[error(transparent)] - ServiceError(E), - - #[error(transparent)] - CommonError(CommonError), -} - -/// An Implementation for `()` which always returns a 500 status code. This is -/// useful if an endpoint does not have any errors, but we still require it for -/// our blanket IntoResponse impl for ApiServerError. 
-impl ApiError for () { - fn status_code(&self) -> StatusCode { - StatusCode::INTERNAL_SERVER_ERROR - } -} - -/// Blanket implementation for all types that implement `ApiError` and -/// `Serialize`. This should be the only implementation for `IntoResponse` that -/// we will use, since it adheres to our error handling strategy and only -/// requires implementing the `ApiError` trait (the `Serialize` trait is a -/// noop). -impl IntoResponse for ApiServerError -where - E: ApiError, - E: Serialize, -{ - fn into_response(self) -> axum::response::Response { - let result = match &self { - ApiServerError::ServiceError(err) => ( - err.status_code(), - serde_json::to_vec(err).expect("Failed to serialize ServiceError"), - ), - ApiServerError::CommonError(err) => ( - err.status_code(), - serde_json::to_vec(err).expect("Failed to serialize CommonError"), - ), - }; - - result.into_response() - } -} - -/// Implementation for any anyhow::Error to be converted to a -/// `CommonError::InternalServerError`. -impl From for ApiServerError { - fn from(err: anyhow::Error) -> Self { - warn!(?err, "An anyhow error was converted to a ApiServerError"); - ApiServerError::CommonError(CommonError::InternalServerError) - } -} - -/// Implementation for any ApiError to be converted to a -/// `ApiServerError::ServiceError`. This does not apply to _all_ types since -/// that would conflict with the impl for `anyhow::Error`. -impl From for ApiServerError -where - E: ApiError, -{ - fn from(value: E) -> Self { - ApiServerError::ServiceError(value) - } -} - -#[allow(dead_code)] -#[derive(Debug, Error)] -pub enum ApiClientError { - /// This can only occur when a invalid base URL was provided. - #[error("An invalid URL was provided: {0}")] - ParseError(#[from] url::ParseError), - - // /// An error occurred in reqwest. - // #[error("An error occurred while making the request: {0}")] - // ClientError(#[from] reqwest::Error), - /// An error returned from the service. 
These errors are specific to the - /// endpoint that was called. - #[error(transparent)] - ServiceError(E), - - #[error(transparent)] - CommonError(#[from] CommonError), - - /// A response was received, but we were unable to deserialize it. The - /// status code and the receive body are returned. - #[error("API returned an unknown response: Status: {0}, Body: {1:?}")] - InvalidResponse(StatusCode, Bytes), -} - -impl ApiClientError -where - E: serde::de::DeserializeOwned, -{ - /// Try to parse the result as a ServiceError or a CommonError. If both - /// fail, return the status_code and body. - pub fn from_response(status_code: StatusCode, body: Bytes) -> Self { - // Try to parse the result as a ServiceError. - if let Ok(result) = serde_json::from_slice::(&body) { - return ApiClientError::ServiceError(result); - } - - // Try to parse the result as CommonError. - if let Ok(result) = serde_json::from_slice::(&body) { - return ApiClientError::CommonError(result); - } - - // If both failed, return the status_code and the body for the user to - // debug. - ApiClientError::InvalidResponse(status_code, body) - } -} - -#[derive(Debug, Error, Serialize, Deserialize, ApiError)] -#[serde(tag = "error", content = "details", rename_all = "camelCase")] -pub enum CommonError { - #[api_error(status_code = StatusCode::INTERNAL_SERVER_ERROR)] - #[error("Internal server error")] - InternalServerError, -} - -#[cfg(test)] -mod tests { - use super::*; - use http_body_util::BodyExt; - - #[derive(Debug, Serialize, Deserialize, Error, ApiError)] - #[serde(tag = "error", content = "details", rename_all = "camelCase")] - #[non_exhaustive] - pub enum RequestGetError { - #[api_error(status_code = StatusCode::NOT_FOUND)] - #[error("Request not found")] - RequestNotFound, - - #[api_error(status_code = StatusCode::BAD_REQUEST)] - #[error("Provided ID is invalid")] - InvalidId, - } - - /// Test to convert Service Error in a ApiServerError to a ApiClientError. 
- #[tokio::test] - async fn api_server_error_to_api_client_error_service_error() { - let response = - ApiServerError::ServiceError(RequestGetError::RequestNotFound).into_response(); - - let (parts, body) = response.into_parts(); - let body = body - .collect() - .await - .expect("Should be able to read body") - .to_bytes(); - - let api_client_error = ApiClientError::from_response(parts.status, body); - - match api_client_error { - ApiClientError::ServiceError(err) => match err { - RequestGetError::RequestNotFound => (), - err => panic!("Unexpected service error: {:?}", err), - }, - err => panic!("Unexpected error: {:?}", err), - } - } - - /// Test to convert Common Error in a ApiServerError to a ApiClientError. - #[tokio::test] - async fn api_server_error_to_api_client_error_common_error() { - let response = - ApiServerError::CommonError::(CommonError::InternalServerError) - .into_response(); - - let (parts, body) = response.into_parts(); - let body = body - .collect() - .await - .expect("Should be able to read body") - .to_bytes(); - - let api_client_error: ApiClientError = - ApiClientError::from_response(parts.status, body); - - match api_client_error { - ApiClientError::CommonError(CommonError::InternalServerError) => (), - err => panic!("Unexpected error: {:?}", err), - } - } - - /// Test to confirm that a anyhow::Error can be converted into a - /// ApiServerError. 
- #[tokio::test] - async fn anyhow_error_into_api_server_error() { - let anyhow_error = anyhow::Error::msg("some random anyhow error"); - let api_server_error: ApiServerError = anyhow_error.into(); - - match api_server_error { - ApiServerError::CommonError(CommonError::InternalServerError) => (), - err => panic!("Unexpected error: {:?}", err), - }; - } -} diff --git a/fpx-lib/src/data.rs b/fpx-lib/src/data.rs deleted file mode 100644 index a399ade7e..000000000 --- a/fpx-lib/src/data.rs +++ /dev/null @@ -1,88 +0,0 @@ -use crate::data::models::HexEncodedId; -use crate::events::ServerEvents; -use async_trait::async_trait; -use std::sync::Arc; -use thiserror::Error; - -pub mod models; -pub mod sql; -pub mod util; - -pub type Result = anyhow::Result; - -pub type BoxedEvents = Arc; -pub type BoxedStore = Arc; - -#[derive(Clone, Default, Debug)] -pub struct Transaction {} - -impl Transaction { - pub fn new() -> Self { - Self {} - } -} - -#[derive(Debug, Error)] -pub enum DbError { - #[error("No rows were returned")] - NotFound, - - #[error("failed to deserialize into `T`: {0}")] - FailedDeserialize(#[from] serde::de::value::Error), - - #[error("Internal error: {0}")] - InternalError(String), - - #[cfg(feature = "libsql")] - #[error("Internal database error occurred: {0}")] - LibsqlError(#[from] libsql::Error), -} - -#[async_trait] -pub trait Store: Send + Sync { - async fn start_readonly_transaction(&self) -> Result; - async fn start_readwrite_transaction(&self) -> Result; - - async fn commit_transaction(&self, tx: Transaction) -> Result<(), DbError>; - async fn rollback_transaction(&self, tx: Transaction) -> Result<(), DbError>; - - async fn span_get( - &self, - tx: &Transaction, - trace_id: &HexEncodedId, - span_id: &HexEncodedId, - ) -> Result; - - async fn span_list_by_trace( - &self, - tx: &Transaction, - trace_id: &HexEncodedId, - ) -> Result>; - - async fn span_create(&self, tx: &Transaction, span: models::Span) -> Result; - - /// Get a list of all the traces. 
- /// - /// Note that a trace is a computed value, so not all properties are - /// present. To get all the data, use the [`Self::span_list_by_trace`] fn. - async fn traces_list( - &self, - tx: &Transaction, - // Future improvement could hold sort fields, limits, etc - ) -> Result>; - - /// Delete all spans with a specific trace_id. - async fn span_delete_by_trace( - &self, - tx: &Transaction, - trace_id: &HexEncodedId, - ) -> Result>; - - /// Delete a single span. - async fn span_delete( - &self, - tx: &Transaction, - trace_id: &HexEncodedId, - span_id: &HexEncodedId, - ) -> Result>; -} diff --git a/fpx-lib/src/data/util.rs b/fpx-lib/src/data/util.rs deleted file mode 100644 index aa4d24c34..000000000 --- a/fpx-lib/src/data/util.rs +++ /dev/null @@ -1,299 +0,0 @@ -use anyhow::Result; -use serde::de::DeserializeOwned; -use serde::{Deserialize, Deserializer, Serialize}; -use std::ops::{Deref, DerefMut}; -use time::OffsetDateTime; - -/// This is a wrapper around `T` that will deserialize from JSON. -/// -/// This is intended to be used with properties of a struct that will be -/// deserialized from a libsql [`Row`]. Since there is a limited number of -/// values available we need to serialize the data in a Database column as JSON -/// so that we can still use complicated data structures, such as arrays and -/// maps. 
-#[derive(Debug)] -pub struct Json(pub T); - -impl Clone for Json -where - T: Clone, -{ - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -impl PartialEq for Json -where - T: PartialEq, -{ - fn eq(&self, other: &Self) -> bool { - self.0 == other.0 - } -} - -impl Json { - pub fn into_inner(self) -> T { - self.0 - } -} - -impl Deref for Json { - type Target = T; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for Json { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl AsRef for Json { - fn as_ref(&self) -> &T { - &self.0 - } -} - -impl AsMut for Json { - fn as_mut(&mut self) -> &mut T { - &mut self.0 - } -} - -impl Default for Json -where - T: Default, -{ - fn default() -> Self { - Self(T::default()) - } -} - -impl Serialize for Json -where - T: Serialize, -{ - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - let value = serde_json::to_string(&self.0).map_err(serde::ser::Error::custom)?; - - value.serialize(serializer) - } -} - -impl<'de, T> Deserialize<'de> for Json -where - T: DeserializeOwned, -{ - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let string: String = Deserialize::deserialize(deserializer)?; - let json: T = serde_json::from_str(&string).map_err(serde::de::Error::custom)?; - - Ok(Json(json)) - } -} - -#[cfg(feature = "wasm-bindgen")] -impl From> for wasm_bindgen::JsValue -where - T: Serialize, -{ - fn from(value: Json) -> Self { - let value = serde_json::to_string(&value.0).expect("failed to serialize Json"); - wasm_bindgen::JsValue::from_str(&value) - } -} - -#[cfg(feature = "libsql")] -impl From> for libsql::Value -where - T: Serialize, -{ - fn from(value: Json) -> Self { - let value = serde_json::to_string(&value.0).expect("failed to serialize Json"); - libsql::Value::Text(value) - } -} - -/// This is a wrapper that makes it a bit easier to work with a timestamp that -/// is serialized as a `f64`. 
This should only be used in the database layer. -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub struct Timestamp(pub time::OffsetDateTime); - -impl Timestamp { - pub fn now() -> Self { - Self(OffsetDateTime::now_utc()) - } - - pub fn unix_nanos(&self) -> i128 { - self.0.unix_timestamp_nanos() - } - - pub fn fractional(&self) -> f64 { - Self::nanos_to_fractional(self.unix_nanos()) - } - - fn nanos_to_fractional(t: i128) -> f64 { - let t = t as f64; - t / 1_000_000_000_f64 - } - - fn fractional_to_nanos(t: f64) -> i128 { - (t * 1_000_000_000_f64) as i128 - } -} - -impl TryFrom for Timestamp { - type Error = anyhow::Error; - - fn try_from(timestamp: f64) -> std::result::Result { - let nanos = Timestamp::fractional_to_nanos(timestamp); - // Note: This will fail if a really big date is passed in. This won't - // happen for a while, though it could be used maliciously. - let datetime = - OffsetDateTime::from_unix_timestamp_nanos(nanos).map_err(|err| anyhow::anyhow!(err))?; - - Ok(Self(datetime)) - } -} - -impl From for time::OffsetDateTime { - fn from(timestamp: Timestamp) -> Self { - timestamp.0 - } -} - -impl From for Timestamp { - fn from(timestamp: time::OffsetDateTime) -> Self { - Self(timestamp) - } -} - -#[cfg(feature = "wasm-bindgen")] -impl From for wasm_bindgen::JsValue { - fn from(value: Timestamp) -> Self { - wasm_bindgen::JsValue::from_f64(value.fractional()) - } -} - -#[cfg(feature = "libsql")] -impl From for libsql::Value { - fn from(timestamp: Timestamp) -> Self { - libsql::Value::Real(timestamp.fractional()) - } -} - -impl Serialize for Timestamp { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - let timestamp = self.fractional(); - timestamp.serialize(serializer) - } -} - -impl<'de> Deserialize<'de> for Timestamp { - fn deserialize(deserializer: D) -> std::result::Result - where - D: Deserializer<'de>, - { - let timestamp: f64 = Deserialize::deserialize(deserializer)?; - 
Timestamp::try_from(timestamp).map_err(serde::de::Error::custom) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use time::format_description::well_known::Rfc3339; - - /// Simple struct that has all the necessary traits to be serialized and - /// deserialize. - #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] - struct Sample { - name: String, - age: u32, - } - - #[test] - fn json_serialize_deserialize() { - let sample_1 = Sample { - name: "John Doe".to_string(), - age: 42, - }; - - let json = Json(sample_1.clone()); - - // Serialize and deserialize the JSON wrapper. - let serialized = serde_json::to_string(&json).expect("Unable to serialize"); - let deserialized: Json = - serde_json::from_str(&serialized).expect("Unable to deserialize"); - - // Get the new sample from the deserialized value. - let sample_2 = deserialized.into_inner(); - - // Verify that the deserialized value is the same as the original. - assert_eq!(sample_1, sample_2); - } - - #[test] - fn timestamp_serialize_deserialize() { - let timestamp_1 = - time::OffsetDateTime::parse("2024-08-07T08:39:51+00:00", &Rfc3339).unwrap(); - let timestamp_1 = Timestamp(timestamp_1); - - // Serialize and deserialize the JSON wrapper. - let serialized = serde_json::to_string(×tamp_1).expect("Unable to serialize"); - let timestamp_2: Timestamp = - serde_json::from_str(&serialized).expect("Unable to deserialize"); - - // Note: Given that we're working with floats, we can't really compare - // the two timestamps, so we will just ignore the sub-sec precision. 
- assert_eq!( - timestamp_1.0.replace_nanosecond(0).unwrap(), - timestamp_2.0.replace_nanosecond(0).unwrap() - ) - } - - #[cfg(target_family = "wasm")] - #[cfg(feature = "wasm-bindgen")] - #[test] - fn timestamp_serialize_wasm_bindgen() { - let timestamp_1 = - time::OffsetDateTime::parse("2024-08-07T08:39:51+00:00", &Rfc3339).unwrap(); - let timestamp_1 = Timestamp(timestamp_1); - let timestamp_1_fractional = timestamp_1.fractional(); - - let js_value: wasm_bindgen::JsValue = timestamp_1.into(); - let js_value_fractional = js_value.as_f64().unwrap(); - - assert_eq!(timestamp_1_fractional, js_value_fractional) - } - - #[cfg(feature = "libsql")] - #[test] - fn timestamp_serialize_libsql() { - let timestamp_1 = - time::OffsetDateTime::parse("2024-08-07T08:39:51+00:00", &Rfc3339).unwrap(); - let timestamp_1 = Timestamp(timestamp_1); - let timestamp_1_fractional = timestamp_1.fractional(); - - let libsql_value: libsql::Value = timestamp_1.into(); - match libsql_value { - libsql::Value::Real(value) => { - assert_eq!(timestamp_1_fractional, value) - } - _ => panic!("Expected libsql::Value::Real"), - } - } -} diff --git a/fpx-lib/src/events.rs b/fpx-lib/src/events.rs deleted file mode 100644 index 2904b0ed0..000000000 --- a/fpx-lib/src/events.rs +++ /dev/null @@ -1,7 +0,0 @@ -use crate::api::models::ServerMessage; -use axum::async_trait; - -#[async_trait] -pub trait ServerEvents: Sync + Send { - async fn broadcast(&self, msg: ServerMessage); -} diff --git a/fpx-workers/Cargo.toml b/fpx-workers/Cargo.toml index 5dd0b9908..c9891486b 100644 --- a/fpx-workers/Cargo.toml +++ b/fpx-workers/Cargo.toml @@ -19,7 +19,7 @@ crate-type = ["cdylib"] [dependencies] axum = { workspace = true, default-features = false } console_error_panic_hook = { version = "0.1" } -fpx-lib = { version = "0.1.0", path = "../fpx-lib", features = [ +fpx = { version = "0.1.0", path = "../fpx", features = [ "wasm-bindgen", ] } getrandom = { version = "0.2", features = ["js"] } # Required for its feature diff 
--git a/fpx-workers/src/data.rs b/fpx-workers/src/data.rs index 33949d6c4..afc897ecc 100644 --- a/fpx-workers/src/data.rs +++ b/fpx-workers/src/data.rs @@ -1,7 +1,7 @@ use axum::async_trait; -use fpx_lib::data::models::HexEncodedId; -use fpx_lib::data::sql::SqlBuilder; -use fpx_lib::data::{models, DbError, Result, Store, Transaction}; +use fpx::data::models::HexEncodedId; +use fpx::data::sql::SqlBuilder; +use fpx::data::{models, DbError, Result, Store, Transaction}; use serde::Deserialize; use std::sync::Arc; use wasm_bindgen::JsValue; diff --git a/fpx-workers/src/lib.rs b/fpx-workers/src/lib.rs index 73e50c886..d94847f6d 100644 --- a/fpx-workers/src/lib.rs +++ b/fpx-workers/src/lib.rs @@ -1,9 +1,9 @@ use axum::async_trait; use axum::routing::get; use data::D1Store; -use fpx_lib::api::models::ServerMessage; -use fpx_lib::events::ServerEvents; -use fpx_lib::{api, service}; +use fpx::api::models::ServerMessage; +use fpx::events::ServerEvents; +use fpx::{api, service}; use middleware::auth::auth_middleware; use std::sync::Arc; use tower_service::Service; diff --git a/fpx-workers/src/ws/client.rs b/fpx-workers/src/ws/client.rs index 9c8987660..a63d2a828 100644 --- a/fpx-workers/src/ws/client.rs +++ b/fpx-workers/src/ws/client.rs @@ -1,4 +1,4 @@ -use fpx_lib::api::models::ServerMessage; +use fpx::api::models::ServerMessage; use std::sync::Arc; use wasm_bindgen_futures::wasm_bindgen::JsValue; use worker::*; diff --git a/fpx-workers/src/ws/worker.rs b/fpx-workers/src/ws/worker.rs index 4a5fd0f62..bc4fb1b81 100644 --- a/fpx-workers/src/ws/worker.rs +++ b/fpx-workers/src/ws/worker.rs @@ -1,4 +1,4 @@ -use fpx_lib::api::models::ServerMessage; +use fpx::api::models::ServerMessage; use tracing::error; use worker::*; diff --git a/fpx-workers/wrangler.toml b/fpx-workers/wrangler.toml index 5923a97fa..e2a2ee024 100644 --- a/fpx-workers/wrangler.toml +++ b/fpx-workers/wrangler.toml @@ -17,7 +17,7 @@ watch_dir = [ "src", "migrations", "Cargo.toml", - "../fpx-lib", + "../fpx", 
"../fpx-macros", "../Cargo.toml", "../Cargo.lock", diff --git a/fpx-lib/.gitignore b/fpx/.gitignore similarity index 100% rename from fpx-lib/.gitignore rename to fpx/.gitignore diff --git a/fpx/Cargo.toml b/fpx/Cargo.toml index 016d47706..5b9e37948 100644 --- a/fpx/Cargo.toml +++ b/fpx/Cargo.toml @@ -6,78 +6,65 @@ authors = { workspace = true } license = { workspace = true } repository = { workspace = true } +[lib] + [features] -embed-studio = [] # When enabled it will embed Studio from frontend/dist +fpx_client = ["dep:reqwest"] [dependencies] -anyhow = { workspace = true } -async-trait = { version = "0.1" } +anyhow = { version = "1.0", default-features = false } +async-trait = { version = "0.1", default-features = false } axum = { workspace = true, default-features = false, features = [ - "http1", - "query", - "tokio", - "tracing", - "ws", + "json", + "matched-path", ] } -bytes = { version = "1.6" } -clap = { workspace = true, features = ["derive", "env"] } -fpx-lib = { version = "0.1.0", path = "../fpx-lib", features = ["libsql"] } +bytes = { version = "1.0", default-features = false } fpx-macros = { version = "0.1.0", path = "../fpx-macros" } -futures-util = { version = "0.3" } -hex = { version = "0.4" } -http = { version = "1.1" } -http-body-util = { version = "0.1" } -include_dir = { version = "0.7.3" } -libsql = { version = "0.6", default-features = false, features = [ - "core", - "serde", -] } -once_cell = { version = "1.19" } -opentelemetry = { version = "0.27" } -opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] } -opentelemetry-otlp = { version = "0.27", features = [ - "http-json", - "reqwest-client", - "reqwest-rustls-webpki-roots", -] } -opentelemetry-proto = { version = "0.27", features = [ +futures-util = { version = "0.3", default-features = false } +hex = { version = "0.4", default-features = false, features = ["alloc"] } +http = { version = "1.1", default-features = false } +libsql = { version = "0.6", default-features = false, 
optional = true } +opentelemetry = { version = "0.27", default-features = false } +opentelemetry_sdk = { version = "0.27", default-features = false } +opentelemetry-proto = { version = "0.27", default-features = false, features = [ + "trace", "gen-tonic-messages", "with-serde", "with-schemars", ] } -prost = { version = "0.13" } -rand = { version = "0.8.5" } -reqwest = { version = "0.12", default-features = false, features = [ +prost = { version = "0.13", default-features = false } +reqwest = { version = "0.12", optional = true, default-features = false, features = [ "charset", "http2", "rustls-tls", "json", ] } -schemars = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -serde_with = { version = "3.8.1" } -strum = { version = "0.26", features = ["derive"] } -thiserror = { version = "2.0" } -time = { version = "0.3.17", features = ["serde-human-readable"] } -tokio = { version = "1.40", features = ["rt-multi-thread", "signal", "fs"] } -tokio-tungstenite = { version = "0.21", features = [ - "rustls-tls-webpki-roots", -] } # This should be kept the same as whatever Axum has -toml = { version = "0.8" } -tonic = { version = "0.12" } -tower = { version = "0.4" } -tracing = { version = "0.1" } -tracing-opentelemetry = { version = "0.28" } -tracing-subscriber = { version = "0.3", features = ["env-filter"] } -url = { version = "2.5" } - -[target.'cfg(windows)'.dependencies] -libsql = { version = "0.6", default-features = false, features = [ - "core", - "serde", - "replication", +schemars = { workspace = true, default-features = false, features = ["derive"] } +serde = { version = "1.0", default-features = false, features = ["derive"] } +serde_json = { version = "1.0", default-features = false } +strum = { version = "0.26", default-features = false, features = ["derive"] } +thiserror = { version = "2.0", default-features = false } +time = { version = "0.3", default-features = false, features = [ + "serde-human-readable", +] } +tokio = { 
version = "1.40", default-features = false } +tower = { version = "0.4", default-features = false } +tower-http = { version = "0.5", default-features = false, features = [ + "compression-br", + "compression-gzip", + "decompression-br", + "decompression-gzip", ] } +tracing = { version = "0.1", default-features = false, features = [ + "attributes", +] } +tracing-opentelemetry = { version = "0.28", default-features = false } +url = { version = "2.5", default-features = false } +wasm-bindgen = { version = "0.2", default-features = false, optional = true } [dev-dependencies] -test-log = { version = "0.2", default-features = false, features = ["trace"] } +http-body-util = { version = "0.1", default-features = false } +tokio = { version = "1.40", default-features = false, features = [ + "macros", + "test-util", +] } diff --git a/fpx/README.md b/fpx/README.md index a4bf9c203..22655b254 100644 --- a/fpx/README.md +++ b/fpx/README.md @@ -1,55 +1,6 @@ -# fpx +# fpx-lib -The fpx tool is a command line tool to launch a local HTTP or gRPC OTEL -ingestion endpoint. It also includes a CLI client to interact with some of the -Rest and web-socket endpoints. - -NOTE: Currently only a in-memory storage is supported. - -## Usage - -First, make sure you have Rust installed on your machine. You can install Rust -using [rustup](https://rustup.rs/) or take a look at the -[official instructions](https://www.rust-lang.org/tools/install). - -Then run the following command to execute the local dev server: - -``` -cargo run -- dev -``` - -See `Commands` for more information. - -## Commands - -The fpx binary is primarily used to start a local dev server, but it is also -possible to run some other commands. - -For ease of use, the `fpx` cargo alias has been added, meaning you can run -`cargo fpx` in any directory in this repository and compile and then invoke -`fpx`. - -### `fpx dev` - -Starts the local dev server. - -Use `-e` or `--enable-tracing` to send otlp payloads to `--otlp-endpoint`. 
Note -that this should not be used to send traces to itself, as that will create an -infinite loop. - -### `fpx client` - -Invokes endpoints on a fpx server. - -This command can also send traces to a otel endpoint. NOTE: there are some known -issues where it doesn't seem to work properly. - -Examples: - -``` -# Fetch all traces -fpx client traces list - -# Fetch a specific span -fpx client spans get aa aa -``` +This crate contains the shared types and logic that is used in both the normal +fpx binary and the fpx-workers crates. This crate contains most of the logic +that is shared between them and it also includes shared traits. The +implementations of these traits might not be included in this crate. diff --git a/fpx/src/api.rs b/fpx/src/api.rs index 0d389810a..003ef7e98 100644 --- a/fpx/src/api.rs +++ b/fpx/src/api.rs @@ -1,2 +1,84 @@ +use crate::data::BoxedStore; +use crate::otel::OtelTraceLayer; +use crate::service::Service; +use axum::extract::FromRef; +use axum::routing::{get, post}; +use http::StatusCode; +use tower_http::compression::CompressionLayer; +use tower_http::decompression::RequestDecompressionLayer; + +#[cfg(feature = "fpx_client")] pub mod client; pub mod errors; +pub mod handlers; +pub mod models; + +#[derive(Clone)] +pub struct ApiState { + service: Service, + store: BoxedStore, +} + +impl FromRef for BoxedStore { + fn from_ref(state: &ApiState) -> Self { + state.store.clone() + } +} + +impl FromRef for Service { + fn from_ref(state: &ApiState) -> Self { + state.service.clone() + } +} + +#[derive(Default)] +pub struct Builder { + enable_compression: bool, +} + +impl Builder { + pub fn new() -> Self { + Self::default() + } + + pub fn set_compression(mut self, compression: bool) -> Self { + self.enable_compression = compression; + self + } + + pub fn enable_compression(self) -> Self { + self.set_compression(true) + } + + /// Create a API and expose it through a axum router. 
+ pub fn build(self, service: Service, store: BoxedStore) -> axum::Router { + let api_state = ApiState { service, store }; + + let router = axum::Router::new() + .route("/v1/traces", post(handlers::otel::trace_collector_handler)) + .route("/v1/traces", get(handlers::traces::traces_list_handler)) + .route( + "/v1/traces/:trace_id", + get(handlers::traces::traces_get_handler) + .delete(handlers::traces::traces_delete_handler), + ) + .route( + "/v1/traces/:trace_id/spans", + get(handlers::spans::span_list_handler), + ) + .route( + "/v1/traces/:trace_id/spans/:span_id", + get(handlers::spans::span_get_handler).delete(handlers::spans::span_delete_handler), + ) + .with_state(api_state) + .fallback(StatusCode::NOT_FOUND) + .layer(OtelTraceLayer::default()) + .layer(RequestDecompressionLayer::new()); + + if self.enable_compression { + router.layer(CompressionLayer::new()) + } else { + router + } + } +} diff --git a/fpx/src/api/client.rs b/fpx/src/api/client.rs index 842993c78..4dc2bb86c 100644 --- a/fpx/src/api/client.rs +++ b/fpx/src/api/client.rs @@ -4,17 +4,20 @@ //! api module. But for now this is only used within our own code, so it is //! fine. 
-use super::errors::ApiClientError; +use super::errors::CommonError; +use super::handlers::spans::SpanGetError; +use super::handlers::traces::TraceGetError; +use super::models; +use crate::otel::HeaderMapInjector; use anyhow::Result; -use fpx_lib::api::handlers::spans::SpanGetError; -use fpx_lib::api::handlers::traces::TraceGetError; -use fpx_lib::api::models; -use fpx_lib::otel::HeaderMapInjector; +use bytes::Bytes; use http::{HeaderMap, Method, StatusCode}; use opentelemetry::propagation::TextMapPropagator; use opentelemetry_sdk::propagation::TraceContextPropagator; use std::error::Error; use std::future::Future; +use thiserror::Error; +use tracing::error; use tracing::trace; use tracing_opentelemetry::OpenTelemetrySpanExt; use url::Url; @@ -171,6 +174,54 @@ impl ApiClient { } } +#[allow(dead_code)] +#[derive(Debug, Error)] +pub enum ApiClientError { + /// This can only occur when a invalid base URL was provided. + #[error("An invalid URL was provided: {0}")] + ParseError(#[from] url::ParseError), + + /// An error occurred in reqwest. + #[error("An error occurred while making the request: {0}")] + ClientError(#[from] reqwest::Error), + + /// An error returned from the service. These errors are specific to the + /// endpoint that was called. + #[error(transparent)] + ServiceError(E), + + #[error(transparent)] + CommonError(#[from] CommonError), + + /// A response was received, but we were unable to deserialize it. The + /// status code and the receive body are returned. + #[error("API returned an unknown response: Status: {0}, Body: {1:?}")] + InvalidResponse(StatusCode, Bytes), +} + +impl ApiClientError +where + E: serde::de::DeserializeOwned, +{ + /// Try to parse the result as a ServiceError or a CommonError. If both + /// fail, return the status_code and body. + pub fn from_response(status_code: StatusCode, body: Bytes) -> Self { + // Try to parse the result as a ServiceError. 
+ if let Ok(result) = serde_json::from_slice::(&body) { + return ApiClientError::ServiceError(result); + } + + // Try to parse the result as CommonError. + if let Ok(result) = serde_json::from_slice::(&body) { + return ApiClientError::CommonError(result); + } + + // If both failed, return the status_code and the body for the user to + // debug. + ApiClientError::InvalidResponse(status_code, body) + } +} + /// Check whether the response is a 204 No Content response, if it is return /// Ok(()). Otherwise try to parse the response as a ApiError. async fn no_body(response: reqwest::Response) -> Result<(), ApiClientError> @@ -214,3 +265,79 @@ where } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::api::errors::ApiServerError; + use axum::response::IntoResponse; + use fpx_macros::ApiError; + use http::StatusCode; + use http_body_util::BodyExt; + use serde::{Deserialize, Serialize}; + use thiserror::Error; + use tracing::error; + + #[derive(Debug, Serialize, Deserialize, Error, ApiError)] + #[serde(tag = "error", content = "details", rename_all = "camelCase")] + #[non_exhaustive] + pub enum TestError { + #[api_error(status_code = StatusCode::NOT_FOUND)] + #[error("Request not found")] + RequestNotFound, + + #[api_error(status_code = StatusCode::BAD_REQUEST)] + #[error("Provided ID is invalid")] + InvalidId, + } + + /// Test to convert Service Error in a ApiServerError to a ApiClientError. 
+ #[tokio::test] + async fn api_server_error_to_api_client_error_service_error() { + let response = ApiServerError::ServiceError(TestError::RequestNotFound).into_response(); + + let (parts, body) = response.into_parts(); + let body = body + .collect() + .await + .expect("Should be able to read body") + .to_bytes(); + + let api_client_error = ApiClientError::from_response(parts.status, body); + + assert!( + matches!( + api_client_error, + ApiClientError::ServiceError(TestError::RequestNotFound) + ), + "returned error does not match expected error; got: {:?}", + api_client_error + ); + } + + /// Test to convert Common Error in a ApiServerError to a ApiClientError. + #[tokio::test] + async fn api_server_error_to_api_client_error_common_error() { + let response = ApiServerError::CommonError::(CommonError::InternalServerError) + .into_response(); + + let (parts, body) = response.into_parts(); + let body = body + .collect() + .await + .expect("Should be able to read body") + .to_bytes(); + + let api_client_error: ApiClientError = + ApiClientError::from_response(parts.status, body); + + assert!( + matches!( + api_client_error, + ApiClientError::CommonError(CommonError::InternalServerError), + ), + "returned error does not match expected error; got: {:?}", + api_client_error + ) + } +} diff --git a/fpx/src/api/errors.rs b/fpx/src/api/errors.rs index 64d3b26c4..aa7e4ef16 100644 --- a/fpx/src/api/errors.rs +++ b/fpx/src/api/errors.rs @@ -1,5 +1,4 @@ use axum::response::IntoResponse; -use bytes::Bytes; use fpx_macros::ApiError; use http::StatusCode; use serde::{Deserialize, Serialize}; @@ -77,54 +76,6 @@ where } } -#[allow(dead_code)] -#[derive(Debug, Error)] -pub enum ApiClientError { - /// This can only occur when a invalid base URL was provided. - #[error("An invalid URL was provided: {0}")] - ParseError(#[from] url::ParseError), - - /// An error occurred in reqwest. 
- #[error("An error occurred while making the request: {0}")] - ClientError(#[from] reqwest::Error), - - /// An error returned from the service. These errors are specific to the - /// endpoint that was called. - #[error(transparent)] - ServiceError(E), - - #[error(transparent)] - CommonError(#[from] CommonError), - - /// A response was received, but we were unable to deserialize it. The - /// status code and the receive body are returned. - #[error("API returned an unknown response: Status: {0}, Body: {1:?}")] - InvalidResponse(StatusCode, Bytes), -} - -impl ApiClientError -where - E: serde::de::DeserializeOwned, -{ - /// Try to parse the result as a ServiceError or a CommonError. If both - /// fail, return the status_code and body. - pub fn from_response(status_code: StatusCode, body: Bytes) -> Self { - // Try to parse the result as a ServiceError. - if let Ok(result) = serde_json::from_slice::(&body) { - return ApiClientError::ServiceError(result); - } - - // Try to parse the result as CommonError. - if let Ok(result) = serde_json::from_slice::(&body) { - return ApiClientError::CommonError(result); - } - - // If both failed, return the status_code and the body for the user to - // debug. - ApiClientError::InvalidResponse(status_code, body) - } -} - #[derive(Debug, Error, Serialize, Deserialize, ApiError)] #[serde(tag = "error", content = "details", rename_all = "camelCase")] pub enum CommonError { @@ -136,12 +87,11 @@ pub enum CommonError { #[cfg(test)] mod tests { use super::*; - use http_body_util::BodyExt; #[derive(Debug, Serialize, Deserialize, Error, ApiError)] #[serde(tag = "error", content = "details", rename_all = "camelCase")] #[non_exhaustive] - pub enum RequestGetError { + pub enum TestError { #[api_error(status_code = StatusCode::NOT_FOUND)] #[error("Request not found")] RequestNotFound, @@ -151,63 +101,20 @@ mod tests { InvalidId, } - /// Test to convert Service Error in a ApiServerError to a ApiClientError. 
- #[tokio::test] - async fn api_server_error_to_api_client_error_service_error() { - let response = - ApiServerError::ServiceError(RequestGetError::RequestNotFound).into_response(); - - let (parts, body) = response.into_parts(); - let body = body - .collect() - .await - .expect("Should be able to read body") - .to_bytes(); - - let api_client_error = ApiClientError::from_response(parts.status, body); - - match api_client_error { - ApiClientError::ServiceError(err) => match err { - RequestGetError::RequestNotFound => (), - err => panic!("Unexpected service error: {:?}", err), - }, - err => panic!("Unexpected error: {:?}", err), - } - } - - /// Test to convert Common Error in a ApiServerError to a ApiClientError. - #[tokio::test] - async fn api_server_error_to_api_client_error_common_error() { - let response = - ApiServerError::CommonError::(CommonError::InternalServerError) - .into_response(); - - let (parts, body) = response.into_parts(); - let body = body - .collect() - .await - .expect("Should be able to read body") - .to_bytes(); - - let api_client_error: ApiClientError = - ApiClientError::from_response(parts.status, body); - - match api_client_error { - ApiClientError::CommonError(CommonError::InternalServerError) => (), - err => panic!("Unexpected error: {:?}", err), - } - } - /// Test to confirm that a anyhow::Error can be converted into a /// ApiServerError. 
#[tokio::test] async fn anyhow_error_into_api_server_error() { let anyhow_error = anyhow::Error::msg("some random anyhow error"); - let api_server_error: ApiServerError = anyhow_error.into(); + let api_server_error: ApiServerError = anyhow_error.into(); - match api_server_error { - ApiServerError::CommonError(CommonError::InternalServerError) => (), - err => panic!("Unexpected error: {:?}", err), - }; + assert!( + matches!( + api_server_error, + ApiServerError::CommonError(CommonError::InternalServerError) + ), + "returned error does not match expected error; got: {:?}", + api_server_error + ); } } diff --git a/fpx-lib/src/api/handlers.rs b/fpx/src/api/handlers.rs similarity index 100% rename from fpx-lib/src/api/handlers.rs rename to fpx/src/api/handlers.rs diff --git a/fpx-lib/src/api/handlers/otel.rs b/fpx/src/api/handlers/otel.rs similarity index 100% rename from fpx-lib/src/api/handlers/otel.rs rename to fpx/src/api/handlers/otel.rs diff --git a/fpx-lib/src/api/handlers/spans.rs b/fpx/src/api/handlers/spans.rs similarity index 100% rename from fpx-lib/src/api/handlers/spans.rs rename to fpx/src/api/handlers/spans.rs diff --git a/fpx-lib/src/api/handlers/traces.rs b/fpx/src/api/handlers/traces.rs similarity index 100% rename from fpx-lib/src/api/handlers/traces.rs rename to fpx/src/api/handlers/traces.rs diff --git a/fpx-lib/src/api/models.rs b/fpx/src/api/models.rs similarity index 100% rename from fpx-lib/src/api/models.rs rename to fpx/src/api/models.rs diff --git a/fpx-lib/src/api/models/otel.rs b/fpx/src/api/models/otel.rs similarity index 100% rename from fpx-lib/src/api/models/otel.rs rename to fpx/src/api/models/otel.rs diff --git a/fpx/src/data.rs b/fpx/src/data.rs index bdc7a4518..a399ade7e 100644 --- a/fpx/src/data.rs +++ b/fpx/src/data.rs @@ -1,221 +1,88 @@ -use anyhow::Context; +use crate::data::models::HexEncodedId; +use crate::events::ServerEvents; use async_trait::async_trait; -use fpx_lib::data::models::{HexEncodedId, Span}; -use 
fpx_lib::data::sql::SqlBuilder; -use fpx_lib::data::{DbError, Result, Store, Transaction}; -use libsql::{params, Builder, Connection}; -use std::fmt::Display; -use std::path::Path; use std::sync::Arc; -use tracing::trace; -use util::RowsExt; +use thiserror::Error; -mod migrations; -mod util; +pub mod models; +pub mod sql; +pub mod util; -#[cfg(test)] -mod tests; +pub type Result = anyhow::Result; -pub enum DataPath<'a> { - InMemory, - File(&'a Path), -} +pub type BoxedEvents = Arc; +pub type BoxedStore = Arc; -impl<'a> DataPath<'a> { - pub fn as_path(&self) -> &'a Path { - match self { - DataPath::InMemory => Path::new(":memory:"), - DataPath::File(path) => path, - } - } -} +#[derive(Clone, Default, Debug)] +pub struct Transaction {} -impl Display for DataPath<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - DataPath::InMemory => write!(f, ":memory:"), - DataPath::File(path) => f.write_fmt(format_args!("{}", path.display())), - } +impl Transaction { + pub fn new() -> Self { + Self {} } } -#[derive(Clone)] -pub struct LibsqlStore { - connection: Connection, - sql_builder: Arc, -} - -impl LibsqlStore { - pub async fn open(path: DataPath<'_>) -> Result { - trace!(%path, "Opening Libsql database"); - - // Not sure if we need this database object, but for now we just drop - // it. 
- let database = Builder::new_local(path.as_path()) - .build() - .await - .context("failed to build libSQL database object")?; - - let mut connection = database - .connect() - .context("failed to connect to libSQL database")?; +#[derive(Debug, Error)] +pub enum DbError { + #[error("No rows were returned")] + NotFound, - Self::initialize_connection(&mut connection).await?; + #[error("failed to deserialize into `T`: {0}")] + FailedDeserialize(#[from] serde::de::value::Error), - let sql_builder = Arc::new(SqlBuilder::new()); - - Ok(LibsqlStore { - connection, - sql_builder, - }) - } - - pub async fn in_memory() -> Result { - Self::open(DataPath::InMemory).await - } - - pub async fn file(db_path: &Path) -> Result { - Self::open(DataPath::File(db_path)).await - } + #[error("Internal error: {0}")] + InternalError(String), - /// This function will execute a few PRAGMA statements to set the database - /// connection. This should run before any other queries are executed. - async fn initialize_connection(connection: &mut Connection) -> Result<()> { - connection - .query( - "PRAGMA journal_mode = WAL; - PRAGMA busy_timeout = 5000; - PRAGMA cache_size = 2000; - PRAGMA foreign_keys = ON; - PRAGMA journal_size_limit = 27103364; - PRAGMA mmap_size = 134217728; - PRAGMA synchronous = NORMAL; - PRAGMA temp_store = memory;", - (), - ) - .await - .map_err(|err| DbError::InternalError(err.to_string()))?; - - Ok(()) - } + #[cfg(feature = "libsql")] + #[error("Internal database error occurred: {0}")] + LibsqlError(#[from] libsql::Error), } #[async_trait] -impl Store for LibsqlStore { - async fn start_readonly_transaction(&self) -> Result { - Ok(Transaction {}) - } - async fn start_readwrite_transaction(&self) -> Result { - Ok(Transaction {}) - } +pub trait Store: Send + Sync { + async fn start_readonly_transaction(&self) -> Result; + async fn start_readwrite_transaction(&self) -> Result; - async fn commit_transaction(&self, _tx: Transaction) -> Result<(), DbError> { - Ok(()) - } - 
async fn rollback_transaction(&self, _tx: Transaction) -> Result<(), DbError> { - Ok(()) - } + async fn commit_transaction(&self, tx: Transaction) -> Result<(), DbError>; + async fn rollback_transaction(&self, tx: Transaction) -> Result<(), DbError>; async fn span_get( &self, - _tx: &Transaction, + tx: &Transaction, trace_id: &HexEncodedId, span_id: &HexEncodedId, - ) -> Result { - let span = self - .connection - .query(&self.sql_builder.span_get(), (trace_id, span_id)) - .await? - .fetch_one() - .await?; - - Ok(span) - } + ) -> Result; async fn span_list_by_trace( &self, - _tx: &Transaction, + tx: &Transaction, trace_id: &HexEncodedId, - ) -> Result> { - let spans = self - .connection - .query(&self.sql_builder.span_list_by_trace(), params!(trace_id)) - .await? - .fetch_all() - .await?; - - Ok(spans) - } + ) -> Result>; - async fn span_create(&self, _tx: &Transaction, span: Span) -> Result { - let span = self - .connection - .query( - &self.sql_builder.span_create(), - params!( - span.trace_id, - span.span_id, - span.parent_span_id, - span.name, - span.kind, - span.start_time, - span.end_time, - span.inner, - ), - ) - .await? - .fetch_one() - .await?; - - Ok(span) - } + async fn span_create(&self, tx: &Transaction, span: models::Span) -> Result; - /// Get a list of all the traces. (currently limited to 20, sorted by most - /// recent [`end_time`]) + /// Get a list of all the traces. /// /// Note that a trace is a computed value, so not all properties are /// present. To get all the data, use the [`Self::span_list_by_trace`] fn. async fn traces_list( &self, - _tx: &Transaction, + tx: &Transaction, // Future improvement could hold sort fields, limits, etc - ) -> Result> { - let traces = self - .connection - .query(&self.sql_builder.traces_list(None), ()) - .await? - .fetch_all() - .await?; - - Ok(traces) - } + ) -> Result>; /// Delete all spans with a specific trace_id. 
async fn span_delete_by_trace( &self, - _tx: &Transaction, + tx: &Transaction, trace_id: &HexEncodedId, - ) -> Result> { - let rows_affected = self - .connection - .execute(&self.sql_builder.span_delete_by_trace(), params!(trace_id)) - .await?; - - Ok(Some(rows_affected)) - } + ) -> Result>; /// Delete a single span. async fn span_delete( &self, - _tx: &Transaction, + tx: &Transaction, trace_id: &HexEncodedId, span_id: &HexEncodedId, - ) -> Result> { - let rows_affected = self - .connection - .execute(&self.sql_builder.span_delete(), params!(trace_id, span_id)) - .await?; - - Ok(Some(rows_affected)) - } + ) -> Result>; } diff --git a/fpx-lib/src/data/models.rs b/fpx/src/data/models.rs similarity index 100% rename from fpx-lib/src/data/models.rs rename to fpx/src/data/models.rs diff --git a/fpx-lib/src/data/sql.rs b/fpx/src/data/sql.rs similarity index 100% rename from fpx-lib/src/data/sql.rs rename to fpx/src/data/sql.rs diff --git a/fpx/src/data/util.rs b/fpx/src/data/util.rs index 34b5ab641..aa4d24c34 100644 --- a/fpx/src/data/util.rs +++ b/fpx/src/data/util.rs @@ -1,38 +1,299 @@ -use fpx_lib::data::{DbError, Result}; -use libsql::{de, Rows}; +use anyhow::Result; use serde::de::DeserializeOwned; +use serde::{Deserialize, Deserializer, Serialize}; +use std::ops::{Deref, DerefMut}; +use time::OffsetDateTime; -#[allow(dead_code)] -pub(crate) trait RowsExt { - /// `T` must be a `struct` - async fn fetch_one(&mut self) -> Result; +/// This is a wrapper around `T` that will deserialize from JSON. +/// +/// This is intended to be used with properties of a struct that will be +/// deserialized from a libsql [`Row`]. Since there is a limited number of +/// values available we need to serialize the data in a Database column as JSON +/// so that we can still use complicated data structures, such as arrays and +/// maps. 
+#[derive(Debug)] +pub struct Json(pub T); - /// `T` must be a `struct` - async fn fetch_optional(&mut self) -> Result, DbError>; +impl Clone for Json +where + T: Clone, +{ + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} - /// `T` must be a `struct` - async fn fetch_all(&mut self) -> Result, DbError>; +impl PartialEq for Json +where + T: PartialEq, +{ + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } } -impl RowsExt for Rows { - async fn fetch_one(&mut self) -> Result { - self.fetch_optional().await?.ok_or(DbError::NotFound) +impl Json { + pub fn into_inner(self) -> T { + self.0 } +} - async fn fetch_optional(&mut self) -> Result, DbError> { - match self.next().await? { - Some(row) => Ok(Some(de::from_row(&row)?)), - None => Ok(None), - } +impl Deref for Json { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 } +} - async fn fetch_all(&mut self) -> Result, DbError> { - let mut results = Vec::new(); +impl DerefMut for Json { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} - while let Some(row) = self.next().await? 
{ - results.push(de::from_row(&row)?); - } +impl AsRef for Json { + fn as_ref(&self) -> &T { + &self.0 + } +} + +impl AsMut for Json { + fn as_mut(&mut self) -> &mut T { + &mut self.0 + } +} + +impl Default for Json +where + T: Default, +{ + fn default() -> Self { + Self(T::default()) + } +} + +impl Serialize for Json +where + T: Serialize, +{ + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let value = serde_json::to_string(&self.0).map_err(serde::ser::Error::custom)?; + + value.serialize(serializer) + } +} + +impl<'de, T> Deserialize<'de> for Json +where + T: DeserializeOwned, +{ + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let string: String = Deserialize::deserialize(deserializer)?; + let json: T = serde_json::from_str(&string).map_err(serde::de::Error::custom)?; + + Ok(Json(json)) + } +} + +#[cfg(feature = "wasm-bindgen")] +impl From> for wasm_bindgen::JsValue +where + T: Serialize, +{ + fn from(value: Json) -> Self { + let value = serde_json::to_string(&value.0).expect("failed to serialize Json"); + wasm_bindgen::JsValue::from_str(&value) + } +} + +#[cfg(feature = "libsql")] +impl From> for libsql::Value +where + T: Serialize, +{ + fn from(value: Json) -> Self { + let value = serde_json::to_string(&value.0).expect("failed to serialize Json"); + libsql::Value::Text(value) + } +} + +/// This is a wrapper that makes it a bit easier to work with a timestamp that +/// is serialized as a `f64`. This should only be used in the database layer. 
+#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct Timestamp(pub time::OffsetDateTime); + +impl Timestamp { + pub fn now() -> Self { + Self(OffsetDateTime::now_utc()) + } + + pub fn unix_nanos(&self) -> i128 { + self.0.unix_timestamp_nanos() + } + + pub fn fractional(&self) -> f64 { + Self::nanos_to_fractional(self.unix_nanos()) + } + + fn nanos_to_fractional(t: i128) -> f64 { + let t = t as f64; + t / 1_000_000_000_f64 + } + + fn fractional_to_nanos(t: f64) -> i128 { + (t * 1_000_000_000_f64) as i128 + } +} + +impl TryFrom for Timestamp { + type Error = anyhow::Error; + + fn try_from(timestamp: f64) -> std::result::Result { + let nanos = Timestamp::fractional_to_nanos(timestamp); + // Note: This will fail if a really big date is passed in. This won't + // happen for a while, though it could be used maliciously. + let datetime = + OffsetDateTime::from_unix_timestamp_nanos(nanos).map_err(|err| anyhow::anyhow!(err))?; + + Ok(Self(datetime)) + } +} + +impl From for time::OffsetDateTime { + fn from(timestamp: Timestamp) -> Self { + timestamp.0 + } +} + +impl From for Timestamp { + fn from(timestamp: time::OffsetDateTime) -> Self { + Self(timestamp) + } +} + +#[cfg(feature = "wasm-bindgen")] +impl From for wasm_bindgen::JsValue { + fn from(value: Timestamp) -> Self { + wasm_bindgen::JsValue::from_f64(value.fractional()) + } +} - Ok(results) +#[cfg(feature = "libsql")] +impl From for libsql::Value { + fn from(timestamp: Timestamp) -> Self { + libsql::Value::Real(timestamp.fractional()) + } +} + +impl Serialize for Timestamp { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let timestamp = self.fractional(); + timestamp.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for Timestamp { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + let timestamp: f64 = Deserialize::deserialize(deserializer)?; + 
 Timestamp::try_from(timestamp).map_err(serde::de::Error::custom) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use time::format_description::well_known::Rfc3339; + + /// Simple struct that has all the necessary traits to be serialized and + /// deserialize. + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] + struct Sample { + name: String, + age: u32, + } + + #[test] + fn json_serialize_deserialize() { + let sample_1 = Sample { + name: "John Doe".to_string(), + age: 42, + }; + + let json = Json(sample_1.clone()); + + // Serialize and deserialize the JSON wrapper. + let serialized = serde_json::to_string(&json).expect("Unable to serialize"); + let deserialized: Json<Sample> = + serde_json::from_str(&serialized).expect("Unable to deserialize"); + + // Get the new sample from the deserialized value. + let sample_2 = deserialized.into_inner(); + + // Verify that the deserialized value is the same as the original. + assert_eq!(sample_1, sample_2); + } + + #[test] + fn timestamp_serialize_deserialize() { + let timestamp_1 = + time::OffsetDateTime::parse("2024-08-07T08:39:51+00:00", &Rfc3339).unwrap(); + let timestamp_1 = Timestamp(timestamp_1); + + // Serialize and deserialize the JSON wrapper. + let serialized = serde_json::to_string(&timestamp_1).expect("Unable to serialize"); + let timestamp_2: Timestamp = + serde_json::from_str(&serialized).expect("Unable to deserialize"); + + // Note: Given that we're working with floats, we can't really compare + // the two timestamps, so we will just ignore the sub-sec precision. 
+ assert_eq!( + timestamp_1.0.replace_nanosecond(0).unwrap(), + timestamp_2.0.replace_nanosecond(0).unwrap() + ) + } + + #[cfg(target_family = "wasm")] + #[cfg(feature = "wasm-bindgen")] + #[test] + fn timestamp_serialize_wasm_bindgen() { + let timestamp_1 = + time::OffsetDateTime::parse("2024-08-07T08:39:51+00:00", &Rfc3339).unwrap(); + let timestamp_1 = Timestamp(timestamp_1); + let timestamp_1_fractional = timestamp_1.fractional(); + + let js_value: wasm_bindgen::JsValue = timestamp_1.into(); + let js_value_fractional = js_value.as_f64().unwrap(); + + assert_eq!(timestamp_1_fractional, js_value_fractional) + } + + #[cfg(feature = "libsql")] + #[test] + fn timestamp_serialize_libsql() { + let timestamp_1 = + time::OffsetDateTime::parse("2024-08-07T08:39:51+00:00", &Rfc3339).unwrap(); + let timestamp_1 = Timestamp(timestamp_1); + let timestamp_1_fractional = timestamp_1.fractional(); + + let libsql_value: libsql::Value = timestamp_1.into(); + match libsql_value { + libsql::Value::Real(value) => { + assert_eq!(timestamp_1_fractional, value) + } + _ => panic!("Expected libsql::Value::Real"), + } } } diff --git a/fpx/src/events.rs b/fpx/src/events.rs index 67e8ab49c..2904b0ed0 100644 --- a/fpx/src/events.rs +++ b/fpx/src/events.rs @@ -1,39 +1,7 @@ -use async_trait::async_trait; -use fpx_lib::api::models::ServerMessage; -use fpx_lib::events::ServerEvents; -use tokio::sync::broadcast; -use tracing::trace; - -#[derive(Clone)] -pub struct InMemoryEvents { - sender: broadcast::Sender, -} - -impl InMemoryEvents { - pub fn new() -> Self { - let (sender, _) = broadcast::channel(100); - Self { sender } - } - - pub async fn subscribe(&self) -> broadcast::Receiver { - self.sender.subscribe() - } -} - -impl Default for InMemoryEvents { - fn default() -> Self { - Self::new() - } -} +use crate::api::models::ServerMessage; +use axum::async_trait; #[async_trait] -impl ServerEvents for InMemoryEvents { - async fn broadcast(&self, message: ServerMessage) { - if let Err(err) = 
self.sender.send(message) { - // Note: this only happens when the channel is closed. Which also - // happens when there a no subscribers. So there is not need to log - // this as an warn or error. - trace!(%err, "failed to broadcast message"); - }; - } +pub trait ServerEvents: Sync + Send { + async fn broadcast(&self, msg: ServerMessage); } diff --git a/fpx-lib/src/lib.rs b/fpx/src/lib.rs similarity index 100% rename from fpx-lib/src/lib.rs rename to fpx/src/lib.rs diff --git a/fpx-lib/src/otel.rs b/fpx/src/otel.rs similarity index 100% rename from fpx-lib/src/otel.rs rename to fpx/src/otel.rs diff --git a/fpx-lib/src/service.rs b/fpx/src/service.rs similarity index 100% rename from fpx-lib/src/service.rs rename to fpx/src/service.rs diff --git a/xtask/Cargo.toml b/xtask/Cargo.toml index b4b2e3e0c..6dfce11bb 100644 --- a/xtask/Cargo.toml +++ b/xtask/Cargo.toml @@ -9,7 +9,7 @@ repository = { workspace = true } [dependencies] anyhow = { workspace = true } clap = { workspace = true, features = ["derive", "env"] } -fpx-lib = { version = "0.1.0", path = "../fpx-lib" } +fpx = { version = "0.1.0", path = "../fpx" } schemars = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/xtask/src/commands/schemas.rs b/xtask/src/commands/schemas.rs index 07e53c2de..99cc02493 100644 --- a/xtask/src/commands/schemas.rs +++ b/xtask/src/commands/schemas.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use fpx_lib::api::models::{ClientMessage, ServerMessage}; +use fpx::api::models::{ClientMessage, ServerMessage}; use schemars::schema::RootSchema; use schemars::schema_for; use serde_json::Value; @@ -20,7 +20,7 @@ pub async fn handle_command(args: Args) -> Result<()> { let schemas = Vec::from([ schema_for!(ClientMessage), schema_for!(ServerMessage), - // schema_for!(fpx_lib::api::models::Span), + // schema_for!(fpx::api::models::Span), ]); let zod_schema = generate_zod_schemas(&args.project_directory, &schemas)?;