Skip to content

Commit

Permalink
refactor: factor out run_daemon and move to library (#829)
Browse files Browse the repository at this point in the history
Factor out
```
pub fn run_daemon(config: ConfigRoot) -> Result<Daemon, Error>
```

from

```
pub fn run(args: &Args) -> Result<(), Error>
```

and move it to the library such that we can call it from tests.
  • Loading branch information
Anviking authored Feb 13, 2025
1 parent d514f44 commit 8d13420
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 97 deletions.
6 changes: 3 additions & 3 deletions src/bin/oura/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ use clap::Parser;
use std::process;

mod console;
mod daemon;
mod run_daemon;

#[derive(Parser)]
#[clap(name = "Oura")]
#[clap(bin_name = "oura")]
#[clap(author, version, about, long_about = None)]
enum Oura {
Daemon(daemon::Args),
Daemon(run_daemon::Args),
}

fn main() {
let args = Oura::parse();

let result = match args {
Oura::Daemon(x) => daemon::run(&x),
Oura::Daemon(x) => run_daemon::run(&x),
};

if let Err(err) = &result {
Expand Down
84 changes: 84 additions & 0 deletions src/bin/oura/run_daemon.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use gasket::daemon::Daemon;
use oura::framework::*;
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::info;
use oura::daemon::{run_daemon, ConfigRoot, MetricsConfig};

use crate::console;

fn setup_tracing() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::DEBUG)
.finish(),
)
.unwrap();
}

async fn serve_prometheus(
daemon: Arc<Daemon>,
metrics: Option<MetricsConfig>,
) -> Result<(), Error> {
if let Some(metrics) = metrics {
info!("starting metrics exporter");
let runtime = daemon.clone();

let addr: SocketAddr = metrics
.address
.as_deref()
.unwrap_or("0.0.0.0:9186")
.parse()
.map_err(Error::parse)?;

gasket_prometheus::serve(addr, runtime).await;
}

Ok(())
}

pub fn run(args: &Args) -> Result<(), Error> {
if !args.tui {
setup_tracing();
}

let config = ConfigRoot::new(&args.config).map_err(Error::config)?;
let metrics = config.metrics.clone();

let daemon = run_daemon(config)?;

info!("oura is running");

let daemon = Arc::new(daemon);

let tokio_rt = tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.build()
.unwrap();

let prometheus = tokio_rt.spawn(serve_prometheus(daemon.clone(), metrics));
let tui = tokio_rt.spawn(console::render(daemon.clone(), args.tui));

daemon.block();

info!("oura is stopping");

daemon.teardown();
prometheus.abort();
tui.abort();

Ok(())
}

#[derive(clap::Args)]
#[clap(author, version, about, long_about = None)]
pub struct Args {
/// config file to load by the daemon
#[clap(long, value_parser)]
config: Option<std::path::PathBuf>,

/// display the terminal UI
#[clap(long, action)]
tui: bool,
}
109 changes: 15 additions & 94 deletions src/bin/oura/daemon.rs → src/daemon/mod.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@

use gasket::daemon::Daemon;
use oura::{cursor, filters, framework::*, sinks, sources};
use crate::{cursor, filters, framework::*, sinks, sources};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::{sync::Arc, time::Duration};
use tracing::info;

use crate::console;
use std::time::Duration;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsConfig {
pub address: Option<String>,
}

#[derive(Deserialize)]
struct ConfigRoot {
source: sources::Config,
filters: Option<Vec<filters::Config>>,
sink: sinks::Config,
intersect: IntersectConfig,
finalize: Option<FinalizeConfig>,
chain: Option<ChainConfig>,
retries: Option<gasket::retries::Policy>,
cursor: Option<cursor::Config>,
metrics: Option<MetricsConfig>,
pub struct ConfigRoot {
pub source: sources::Config,
pub filters: Option<Vec<filters::Config>>,
pub sink: sinks::Config,
pub intersect: IntersectConfig,
pub finalize: Option<FinalizeConfig>,
pub chain: Option<ChainConfig>,
pub retries: Option<gasket::retries::Policy>,
pub cursor: Option<cursor::Config>,
pub metrics: Option<MetricsConfig>,
}

impl ConfigRoot {
Expand Down Expand Up @@ -94,107 +91,31 @@ fn connect_stages(
Ok(runtime)
}

fn setup_tracing() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::DEBUG)
.finish(),
)
.unwrap();
}

async fn serve_prometheus(
daemon: Arc<Daemon>,
metrics: Option<MetricsConfig>,
) -> Result<(), Error> {
if let Some(metrics) = metrics {
info!("starting metrics exporter");
let runtime = daemon.clone();

let addr: SocketAddr = metrics
.address
.as_deref()
.unwrap_or("0.0.0.0:9186")
.parse()
.map_err(Error::parse)?;

gasket_prometheus::serve(addr, runtime).await;
}

Ok(())
}

pub fn run(args: &Args) -> Result<(), Error> {
if !args.tui {
setup_tracing();
}

let config = ConfigRoot::new(&args.config).map_err(Error::config)?;

pub fn run_daemon(config: ConfigRoot) -> Result<Daemon, Error> {
let chain = config.chain.unwrap_or_default();
let intersect = config.intersect;
let finalize = config.finalize;
let current_dir = std::env::current_dir().unwrap();
let cursor = config.cursor.unwrap_or_default();
let breadcrumbs = cursor.initial_load()?;

let ctx = Context {
chain,
intersect,
finalize,
current_dir,
breadcrumbs,
};

let source = config.source.bootstrapper(&ctx)?;

let filters = config
.filters
.into_iter()
.flatten()
.map(|x| x.bootstrapper(&ctx))
.collect::<Result<_, _>>()?;

let sink = config.sink.bootstrapper(&ctx)?;

let cursor = cursor.bootstrapper(&ctx)?;

let retries = define_gasket_policy(config.retries.as_ref());

let daemon = connect_stages(source, filters, sink, cursor, retries)?;

info!("oura is running");

let daemon = Arc::new(daemon);

let tokio_rt = tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.build()
.unwrap();

let prometheus = tokio_rt.spawn(serve_prometheus(daemon.clone(), config.metrics));
let tui = tokio_rt.spawn(console::render(daemon.clone(), args.tui));

daemon.block();

info!("oura is stopping");

daemon.teardown();
prometheus.abort();
tui.abort();

Ok(())
Ok(daemon)
}

#[derive(clap::Args)]
#[clap(author, version, about, long_about = None)]
pub struct Args {
/// config file to load by the daemon
#[clap(long, value_parser)]
config: Option<std::path::PathBuf>,

/// display the terminal UI
#[clap(long, action)]
tui: bool,
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pub mod filters;
pub mod framework;
pub mod sinks;
pub mod sources;
pub mod daemon;

0 comments on commit 8d13420

Please sign in to comment.