From 8d13420e6b785c7aa5465d04c7c0e635fb5b88c3 Mon Sep 17 00:00:00 2001 From: Johannes Lund Date: Fri, 14 Feb 2025 00:49:14 +0100 Subject: [PATCH] refactor: factor out `run_daemon` and move to library (#829) Factor out ``` pub fn run_daemon(config: ConfigRoot) -> Result ``` from ``` pub fn run(args: &Args) -> Result<(), Error> ``` and move it to the library such that we can call it from tests. --- src/bin/oura/main.rs | 6 +- src/bin/oura/run_daemon.rs | 84 +++++++++++++++++ src/{bin/oura/daemon.rs => daemon/mod.rs} | 109 +++------------------- src/lib.rs | 1 + 4 files changed, 103 insertions(+), 97 deletions(-) create mode 100644 src/bin/oura/run_daemon.rs rename src/{bin/oura/daemon.rs => daemon/mod.rs} (61%) diff --git a/src/bin/oura/main.rs b/src/bin/oura/main.rs index 24420043..c253a8cc 100644 --- a/src/bin/oura/main.rs +++ b/src/bin/oura/main.rs @@ -2,21 +2,21 @@ use clap::Parser; use std::process; mod console; -mod daemon; +mod run_daemon; #[derive(Parser)] #[clap(name = "Oura")] #[clap(bin_name = "oura")] #[clap(author, version, about, long_about = None)] enum Oura { - Daemon(daemon::Args), + Daemon(run_daemon::Args), } fn main() { let args = Oura::parse(); let result = match args { - Oura::Daemon(x) => daemon::run(&x), + Oura::Daemon(x) => run_daemon::run(&x), }; if let Err(err) = &result { diff --git a/src/bin/oura/run_daemon.rs b/src/bin/oura/run_daemon.rs new file mode 100644 index 00000000..d0c67b0c --- /dev/null +++ b/src/bin/oura/run_daemon.rs @@ -0,0 +1,84 @@ +use gasket::daemon::Daemon; +use oura::framework::*; +use std::net::SocketAddr; +use std::sync::Arc; +use tracing::info; +use oura::daemon::{run_daemon, ConfigRoot, MetricsConfig}; + +use crate::console; + +fn setup_tracing() { + tracing::subscriber::set_global_default( + tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::DEBUG) + .finish(), + ) + .unwrap(); +} + +async fn serve_prometheus( + daemon: Arc, + metrics: Option, +) -> Result<(), Error> { + if let Some(metrics) = metrics { + info!("starting metrics exporter"); + let runtime = daemon.clone(); + + let addr: SocketAddr = metrics + .address + .as_deref() + .unwrap_or("0.0.0.0:9186") + .parse() + .map_err(Error::parse)?; + + gasket_prometheus::serve(addr, runtime).await; + } + + Ok(()) +} + +pub fn run(args: &Args) -> Result<(), Error> { + if !args.tui { + setup_tracing(); + } + + let config = ConfigRoot::new(&args.config).map_err(Error::config)?; + let metrics = config.metrics.clone(); + + let daemon = run_daemon(config)?; + + info!("oura is running"); + + let daemon = Arc::new(daemon); + + let tokio_rt = tokio::runtime::Builder::new_multi_thread() + .enable_io() + .enable_time() + .build() + .unwrap(); + + let prometheus = tokio_rt.spawn(serve_prometheus(daemon.clone(), metrics)); + let tui = tokio_rt.spawn(console::render(daemon.clone(), args.tui)); + + daemon.block(); + + info!("oura is stopping"); + + daemon.teardown(); + prometheus.abort(); + tui.abort(); + + Ok(()) +} + +#[derive(clap::Args)] +#[clap(author, version, about, long_about = None)] +pub struct Args { + /// config file to load by the daemon + #[clap(long, value_parser)] + config: Option, + + /// display the terminal UI + #[clap(long, action)] + tui: bool, +} diff --git a/src/bin/oura/daemon.rs b/src/daemon/mod.rs similarity index 61% rename from src/bin/oura/daemon.rs rename to src/daemon/mod.rs index 3da8d9db..3634c115 100644 --- a/src/bin/oura/daemon.rs +++ b/src/daemon/mod.rs @@ -1,11 +1,8 @@ + use gasket::daemon::Daemon; -use oura::{cursor, filters, framework::*, sinks, sources}; +use crate::{cursor, filters, framework::*, sinks, sources}; use serde::{Deserialize, Serialize}; -use std::net::SocketAddr; -use std::{sync::Arc, time::Duration}; -use tracing::info; - -use crate::console; +use std::time::Duration; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MetricsConfig { @@ -13,16 +10,16 @@ pub struct MetricsConfig { } #[derive(Deserialize)] -struct ConfigRoot { - source: sources::Config, - filters: Option>, - sink: sinks::Config, - intersect: IntersectConfig, - finalize: Option, - chain: Option, - retries: Option, - cursor: Option, - metrics: Option, +pub struct ConfigRoot { + pub source: sources::Config, + pub filters: Option>, + pub sink: sinks::Config, + pub intersect: IntersectConfig, + pub finalize: Option, + pub chain: Option, + pub retries: Option, + pub cursor: Option, + pub metrics: Option, } impl ConfigRoot { @@ -94,50 +91,13 @@ fn connect_stages( Ok(runtime) } -fn setup_tracing() { - tracing::subscriber::set_global_default( - tracing_subscriber::FmtSubscriber::builder() - .with_max_level(tracing::Level::DEBUG) - .finish(), - ) - .unwrap(); -} - -async fn serve_prometheus( - daemon: Arc, - metrics: Option, -) -> Result<(), Error> { - if let Some(metrics) = metrics { - info!("starting metrics exporter"); - let runtime = daemon.clone(); - - let addr: SocketAddr = metrics - .address - .as_deref() - .unwrap_or("0.0.0.0:9186") - .parse() - .map_err(Error::parse)?; - - gasket_prometheus::serve(addr, runtime).await; - } - - Ok(()) -} - -pub fn run(args: &Args) -> Result<(), Error> { - if !args.tui { - setup_tracing(); - } - - let config = ConfigRoot::new(&args.config).map_err(Error::config)?; - +pub fn run_daemon(config: ConfigRoot) -> Result { let chain = config.chain.unwrap_or_default(); let intersect = config.intersect; let finalize = config.finalize; let current_dir = std::env::current_dir().unwrap(); let cursor = config.cursor.unwrap_or_default(); let breadcrumbs = cursor.initial_load()?; - let ctx = Context { chain, intersect, @@ -145,56 +105,17 @@ pub fn run(args: &Args) -> Result<(), Error> { current_dir, breadcrumbs, }; - let source = config.source.bootstrapper(&ctx)?; - let filters = config .filters .into_iter() .flatten() .map(|x| x.bootstrapper(&ctx)) .collect::>()?; - let sink = config.sink.bootstrapper(&ctx)?; - let cursor = cursor.bootstrapper(&ctx)?; - let retries = define_gasket_policy(config.retries.as_ref()); - let daemon = connect_stages(source, filters, sink, cursor, retries)?; - - info!("oura is running"); - - let daemon = Arc::new(daemon); - - let tokio_rt = tokio::runtime::Builder::new_multi_thread() - .enable_io() - .enable_time() - .build() - .unwrap(); - - let prometheus = tokio_rt.spawn(serve_prometheus(daemon.clone(), config.metrics)); - let tui = tokio_rt.spawn(console::render(daemon.clone(), args.tui)); - - daemon.block(); - - info!("oura is stopping"); - - daemon.teardown(); - prometheus.abort(); - tui.abort(); - - Ok(()) + Ok(daemon) } -#[derive(clap::Args)] -#[clap(author, version, about, long_about = None)] -pub struct Args { - /// config file to load by the daemon - #[clap(long, value_parser)] - config: Option, - - /// display the terminal UI - #[clap(long, action)] - tui: bool, -} diff --git a/src/lib.rs b/src/lib.rs index 36bfe792..23f77b3d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,3 +3,4 @@ pub mod filters; pub mod framework; pub mod sinks; pub mod sources; +pub mod daemon;