From 2bd3cc0d86ca09f94c2c40091d24eabae94bd3c8 Mon Sep 17 00:00:00 2001 From: Edinson Jim <4842086+edinsonjim@users.noreply.github.com> Date: Fri, 14 Feb 2025 01:05:27 +0100 Subject: [PATCH] feat: program on layer --- Cargo.lock | 17 ++++------ Cargo.toml | 1 + lumx_axum/Cargo.toml | 1 + lumx_axum/src/lib.rs | 1 + lumx_axum/src/middleware/mod.rs | 1 + lumx_axum/src/middleware/state.rs | 52 +++++++++++++++++++++++++++++++ lumx_axum/src/plugin.rs | 9 ++++-- lumx_core/src/lib.rs | 2 +- lumx_core/src/program.rs | 15 ++++++--- lumx_core/src/tracer.rs | 11 +++++++ lumx_core/src/tracing.rs | 11 ------- 11 files changed, 92 insertions(+), 29 deletions(-) create mode 100644 lumx_axum/src/middleware/mod.rs create mode 100644 lumx_axum/src/middleware/state.rs create mode 100644 lumx_core/src/tracer.rs delete mode 100644 lumx_core/src/tracing.rs diff --git a/Cargo.lock b/Cargo.lock index 0377695..179b964 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -206,7 +206,7 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", - "sync_wrapper 1.0.1", + "sync_wrapper", "tokio", "tower", "tower-layer", @@ -229,7 +229,7 @@ dependencies = [ "mime", "pin-project-lite", "rustversion", - "sync_wrapper 1.0.1", + "sync_wrapper", "tower-layer", "tower-service", "tracing", @@ -1085,6 +1085,7 @@ dependencies = [ "lumx_core", "serde", "tokio", + "tower", "tower-http", "tracing", ] @@ -2320,12 +2321,6 @@ dependencies = [ "syn 2.0.87", ] -[[package]] -name = "sync_wrapper" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" - [[package]] name = "sync_wrapper" version = "1.0.1" @@ -2486,14 +2481,14 @@ dependencies = [ [[package]] name = "tower" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", "pin-project-lite", - "sync_wrapper 0.1.2", + "sync_wrapper", "tokio", "tower-layer", "tower-service", diff --git a/Cargo.toml b/Cargo.toml index e14209d..f00fd7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,5 +25,6 @@ serde = { version = "1.0.215", features = ["derive"] } axum = { version = "0.7" } thiserror = { version = "1.0" } tower-http = { version = "0.6.2", features = ["trace"] } +tower = { version = "0.5.2", features = ["util"] } lumx_core = { path = "./lumx_core" } lumx_axum = { path = "./lumx_axum" } diff --git a/lumx_axum/Cargo.toml b/lumx_axum/Cargo.toml index aab5597..a5f41d0 100644 --- a/lumx_axum/Cargo.toml +++ b/lumx_axum/Cargo.toml @@ -14,3 +14,4 @@ tokio = { workspace = true } tracing = { workspace = true } serde = { workspace = true } tower-http = { workspace = true, features = ["trace"] } +tower = { workspace = true } diff --git a/lumx_axum/src/lib.rs b/lumx_axum/src/lib.rs index 7535eeb..a8b1c8f 100644 --- a/lumx_axum/src/lib.rs +++ b/lumx_axum/src/lib.rs @@ -1,6 +1,7 @@ pub mod router; pub use axum; pub mod extractor; +pub mod middleware; pub mod plugin; pub mod program; pub mod state; diff --git a/lumx_axum/src/middleware/mod.rs b/lumx_axum/src/middleware/mod.rs new file mode 100644 index 0000000..266c62a --- /dev/null +++ b/lumx_axum/src/middleware/mod.rs @@ -0,0 +1 @@ +pub mod state; diff --git a/lumx_axum/src/middleware/state.rs b/lumx_axum/src/middleware/state.rs new file mode 100644 index 0000000..ad5685c --- /dev/null +++ b/lumx_axum/src/middleware/state.rs @@ -0,0 +1,52 @@ +use axum::extract::Request; +use lumx_core::program::Program; +use std::sync::Arc; +use std::task::{Context, Poll}; +use tower::{Layer, Service}; + +#[derive(Clone)] +pub struct StateLayer { + program: Arc, +} + +impl StateLayer { + pub fn new(program: Arc) -> Self { + Self { program } + } +} + +impl Layer for StateLayer { + type Service = StateLayerService; + + fn layer(&self, inner: S) -> Self::Service { + StateLayerService { + inner, + program: self.program.clone(), + } + } +} + +#[derive(Clone)] +pub struct StateLayerService { + inner: S, + program: Arc, +} + +impl Service> for StateLayerService +where + S: Service>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, mut req: Request) -> Self::Future { + req.extensions_mut().insert(self.program.clone()); + + self.inner.call(req) + } +} diff --git a/lumx_axum/src/plugin.rs b/lumx_axum/src/plugin.rs index 0b29a37..0f1e976 100644 --- a/lumx_axum/src/plugin.rs +++ b/lumx_axum/src/plugin.rs @@ -1,4 +1,3 @@ -use crate::{router::RouterRef, state::AppState}; use axum::{async_trait, Extension, Router}; use lumx_core::{ plugable::plugin::Plugin, @@ -10,6 +9,9 @@ use std::{net::SocketAddr, sync::Arc}; use tower_http::trace::TraceLayer; use tracing::debug; +use crate::middleware::state::StateLayer; +use crate::{router::RouterRef, state::AppState}; + pub struct WebPlugin; #[async_trait] @@ -35,7 +37,10 @@ impl WebPlugin { debug!(?router, "registered routes"); let router = router - .layer(Extension(AppState { app })) + .layer(Extension(AppState { + app: Arc::clone(&app), + })) + .layer(StateLayer::new(Arc::clone(&app))) .layer(TraceLayer::new_for_http()); println!("Listening on {}", listener.local_addr().unwrap()); diff --git a/lumx_core/src/lib.rs b/lumx_core/src/lib.rs index a63049e..c7a42d1 100644 --- a/lumx_core/src/lib.rs +++ b/lumx_core/src/lib.rs @@ -1,7 +1,7 @@ pub mod plugable; pub mod program; pub mod scheduler; -pub mod tracing; +pub mod tracer; pub mod types; pub use async_trait; diff --git a/lumx_core/src/program.rs b/lumx_core/src/program.rs index e885e8b..5aee366 100644 --- a/lumx_core/src/program.rs +++ b/lumx_core/src/program.rs @@ -9,6 +9,7 @@ use crate::{ plugin::{Plugin, PluginRef}, }, scheduler::Scheduler, + tracer, types::ProgramFailure, }; @@ -217,10 +218,13 @@ impl ProgramBuilder { // 1. read env variables dotenvy::dotenv().ok(); - // 2. build plugins + // 2. init and subscribe tracer + tracer::init(); + + // 3. build plugins self.build_plugins().await; - // 3. schedule + // 4. schedule self.schedule().await } @@ -230,10 +234,13 @@ impl ProgramBuilder { // 1. read env variables dotenvy::dotenv().ok(); - // 2. build plugins + // 2. init and subscribe tracer + tracer::init(); + + // 3. build plugins self.build_plugins().await; - // 3. build program + // 4. build program self.build_program() } diff --git a/lumx_core/src/tracer.rs b/lumx_core/src/tracer.rs new file mode 100644 index 0000000..c99b9d4 --- /dev/null +++ b/lumx_core/src/tracer.rs @@ -0,0 +1,11 @@ +use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; + +pub fn init() { + tracing_subscriber::registry() + .with( + EnvFilter::try_from_default_env() + .unwrap_or_else(|_| format!("{}=info", env!("CARGO_CRATE_NAME")).into()), + ) + .with(fmt::layer()) + .init(); +} diff --git a/lumx_core/src/tracing.rs b/lumx_core/src/tracing.rs deleted file mode 100644 index 165ab61..0000000 --- a/lumx_core/src/tracing.rs +++ /dev/null @@ -1,11 +0,0 @@ -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - -pub fn init() { - tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "tower_http=info".into()), - ) - .with(tracing_subscriber::fmt::layer()) - .init(); -}