Skip to content

Commit

Permalink
feat(rust): run the manager probing in background
Browse files Browse the repository at this point in the history
  • Loading branch information
imobachgs committed Nov 21, 2024
1 parent b3e630b commit 6c629c1
Show file tree
Hide file tree
Showing 9 changed files with 382 additions and 41 deletions.
80 changes: 80 additions & 0 deletions rust/agama-lib/src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ pub struct Progress {
}

impl Progress {
pub fn finished() -> Self {
Self {
current_step: 0,
max_steps: 0,
current_title: "".to_string(),
finished: true,
}
}

pub async fn from_proxy(proxy: &crate::proxies::ProgressProxy<'_>) -> zbus::Result<Progress> {
let (current_step, max_steps, finished) =
tokio::join!(proxy.current_step(), proxy.total_steps(), proxy.finished());
Expand Down Expand Up @@ -218,3 +227,74 @@ pub trait ProgressPresenter {
/// Finishes the progress reporting.
async fn finish(&mut self);
}

#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ProgressSummary {
pub steps: Vec<String>,
pub current_step: u32,
pub max_steps: u32,
pub current_title: String,
pub finished: bool,
}

impl ProgressSummary {
pub fn finished() -> Self {
Self {
steps: vec![],
current_step: 0,
max_steps: 0,
current_title: "".to_string(),
finished: true,
}
}
}

/// A sequence of progress steps.
/// FIXME: find a better name to distinguish from agama-server::web::common::ProgressSequence.
#[derive(Debug)]
pub struct ProgressSequence {
pub steps: Vec<String>,
current: usize,
}

impl ProgressSequence {
/// Create a new progress sequence with the given steps.
///
/// * `steps`: The steps to create the sequence from.
pub fn new(steps: &[&str]) -> Self {
let steps = steps.iter().map(|s| s.to_string()).collect();
Self { steps, current: 0 }
}

/// Move to the next step in the sequence and return the progress for it.
///
/// It returns `None` if the sequence is finished.
pub fn next(&mut self) -> Option<Progress> {
if self.is_finished() {
return None;
}
self.current += 1;
self.step()
}

/// The progres has finished.
pub fn is_finished(&self) -> bool {
self.current == self.steps.len()
}

/// Return the progress for the current step.
pub fn step(&self) -> Option<Progress> {
if self.is_finished() {
return None;
}

let current_title = self.steps.get(self.current).unwrap().clone();
Some(Progress {
current_step: (self.current + 1) as u32,
max_steps: self.steps.len() as u32,
current_title,
finished: (self.current + 1) == self.steps.len(),
})
}
}
1 change: 1 addition & 0 deletions rust/agama-lib/src/software/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::base_http_client::{BaseHTTPClient, BaseHTTPClientError};
use crate::software::model::SoftwareConfig;
use std::collections::HashMap;

#[derive(Clone)]
pub struct SoftwareHTTPClient {
client: BaseHTTPClient,
}
Expand Down
1 change: 1 addition & 0 deletions rust/agama-lib/src/storage/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use crate::base_http_client::{BaseHTTPClient, BaseHTTPClientError};
use crate::storage::StorageSettings;

#[derive(Clone)]
pub struct StorageHTTPClient {
client: BaseHTTPClient,
}
Expand Down
1 change: 1 addition & 0 deletions rust/agama-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub mod network;
pub mod products;
pub mod questions;
pub mod scripts;
pub mod services;
pub mod software;
pub mod storage;
pub mod users;
Expand Down
2 changes: 1 addition & 1 deletion rust/agama-server/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use agama_lib::{base_http_client::BaseHTTPClient, error::ServiceError};

pub mod backend;
pub mod error;
pub mod web;

Expand All @@ -28,7 +29,6 @@ pub use error::ManagerError;
use web::manager_router;

use crate::{products::ProductsRegistry, web::EventsSender};
pub mod backend;

// TODO: the `service` function should receive the information that might be needed to set up the
// service. What about creating an `Application` struct that holdes the HTTP client, the D-Bus
Expand Down
128 changes: 88 additions & 40 deletions rust/agama-server/src/manager/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,25 @@
//! Implements the logic for the manager service.
use crate::products::ProductsRegistry;
use crate::services::{InstallerService, ServiceStatusManager};
use crate::web::{Event, EventsSender};
use agama_lib::progress::ProgressSummary;
use agama_lib::{
base_http_client::BaseHTTPClient,
manager::{InstallationPhase, InstallerStatus},
software::SoftwareHTTPClient,
storage::http_client::StorageHTTPClient,
};
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, oneshot};

use super::ManagerError;

pub type ManagerActionSender = mpsc::UnboundedSender<ManagerAction>;
pub type ManagerActionReceiver = mpsc::UnboundedReceiver<ManagerAction>;

const SERVICE_NAME: &str = "org.opensuse.Agama.Manager1";

/// Actions that the manager service can perform.
#[derive(Debug)]
pub enum ManagerAction {
Expand All @@ -46,41 +51,46 @@ pub enum ManagerAction {
Finish,
/// Returns the installation status.
GetState(oneshot::Sender<InstallerStatus>),
/// Returns the current progress.
GetProgress(oneshot::Sender<Option<ProgressSummary>>),
}

// TODO: somehow duplicated from agama-server/web/common.rs
#[derive(Clone, Copy, PartialEq)]
pub enum ServiceStatus {
Idle = 0,
Busy = 1,
#[derive(Clone)]
struct ManagerState {
services: Services,
phase: InstallationPhase,
}

/// Main service for the installation process.
///
/// It is responsible for orchestrating the installation process. It may execute YaST2 clients or
/// call to other services through D-Bus or HTTP.
pub struct ManagerService {
pub phase: InstallationPhase,
pub status: ServiceStatus,
progress: Arc<Mutex<ServiceStatusManager>>,
recv: mpsc::UnboundedReceiver<ManagerAction>,
sender: mpsc::UnboundedSender<ManagerAction>,
events: EventsSender,
services: Services,
products: ProductsRegistry,
manager_state: ManagerState,
}

impl ManagerService {
pub fn new(products: ProductsRegistry, http: BaseHTTPClient, events: EventsSender) -> Self {
let (tx, rx) = mpsc::unbounded_channel();
let progress = ServiceStatusManager::new(SERVICE_NAME, events.clone());

let state = ManagerState {
phase: InstallationPhase::Startup,
services: Services::new(http),
};

Self {
recv: rx,
sender: tx,
products,
events,
services: Services::new(http),
phase: InstallationPhase::Startup,
status: ServiceStatus::Idle,
progress: Arc::new(Mutex::new(progress)),
manager_state: state,
}
}

Expand All @@ -90,23 +100,40 @@ impl ManagerService {

pub async fn startup(&mut self) -> Result<(), ManagerError> {
tracing::info!("Starting the manager service");
self.change_status(ServiceStatus::Busy);
self.phase = InstallationPhase::Startup;

if !self.products.is_multiproduct() {
// TODO: autoselect the product
self.probe().await?;
}
self.change_status(ServiceStatus::Idle);

Ok(())
}

pub async fn probe(&mut self) -> Result<(), ManagerError> {
tracing::info!("Probing the system");
self.change_status(ServiceStatus::Busy);
let steps = vec!["Analyze disks", "Configure software"];
// TODO: change the phase in the call to "run_in_background", when we are sure the
// service is not blocked.
self.change_phase(InstallationPhase::Config);
self.services.software.probe().await?;
self.services.storage.probe().await?;
self.change_status(ServiceStatus::Idle);

self.run_in_background(|_events, progress, state| async move {
{
// TODO: implement a macro for this
let mut progress_manager = progress.lock().unwrap();
progress_manager.start(steps.as_slice());
}
state.services.software.probe().await?;
{
let mut progress_manager = progress.lock().unwrap();
progress_manager.next();
}
state.services.storage.probe().await?;
{
progress.lock().unwrap().finish();
}
Ok(())
});

Ok(())
}

Expand All @@ -122,34 +149,17 @@ impl ManagerService {
&self,
tx: oneshot::Sender<InstallerStatus>,
) -> Result<(), ManagerError> {
let progress = self.progress.lock().unwrap();
let status = InstallerStatus {
is_busy: self.status == ServiceStatus::Busy,
use_iguana: false,
is_busy: progress.is_busy(),
// TODO: implement use_iguana and can_install
can_install: false,
phase: self.phase,
use_iguana: false,
phase: self.manager_state.phase,
};
tx.send(status).map_err(|_| ManagerError::SendResult)
}

fn change_status(&mut self, status: ServiceStatus) {
let event = Event::ServiceStatusChanged {
service: "manager".to_string(),
status: (status as u32),
};
if let Err(error) = self.events.send(event) {
tracing::error!("Could not send the event: {error}");
}
self.status = status;
}

fn change_phase(&mut self, phase: InstallationPhase) {
let event = Event::InstallationPhaseChanged { phase };
if let Err(error) = self.events.send(event) {
tracing::error!("Could not send the event: {error}");
}
self.phase = phase;
}

/// Starts the manager loop and returns a client.
///
/// The manager receives actions requests from the client using a channel.
Expand All @@ -176,6 +186,7 @@ impl ManagerService {

if let Err(error) = self.dispatch(action).await {
tracing::error!("Manager dispatch error: {error}");
// Send the message back.
}
}
}
Expand All @@ -185,18 +196,48 @@ impl ManagerService {
ManagerAction::Probe => {
self.probe().await?;
}

ManagerAction::Commit => {
self.commit().await?;
}

ManagerAction::Finish => {
self.finish().await?;
}

ManagerAction::GetState(tx) => {
self.get_state(tx).await?;
}

ManagerAction::GetProgress(tx) => {
let progress = self.progress.lock().unwrap();
let _ = tx.send(progress.get_progress());
}
}
Ok(())
}

fn change_phase(&mut self, phase: InstallationPhase) {
let event = Event::InstallationPhaseChanged { phase };
let _ = self.events.send(event);
self.manager_state.phase = phase;
}
}

impl InstallerService<ManagerState> for ManagerService {
type Error = ManagerError;

fn state(&self) -> ManagerState {
self.manager_state.clone()
}

fn progress(&self) -> Arc<Mutex<ServiceStatusManager>> {
Arc::clone(&self.progress)
}

fn events(&self) -> EventsSender {
self.events.clone()
}
}

/// Client to interact with the manager service.
Expand Down Expand Up @@ -233,9 +274,16 @@ impl ManagerServiceClient {
self.actions.send(ManagerAction::GetState(tx))?;
Ok(rx.await?)
}

pub async fn get_progress(&self) -> Result<Option<ProgressSummary>, ManagerError> {
let (tx, rx) = oneshot::channel();
self.actions.send(ManagerAction::GetProgress(tx))?;
Ok(rx.await?)
}
}

/// Services used by the manager service.
#[derive(Clone)]
pub struct Services {
software: SoftwareHTTPClient,
storage: StorageHTTPClient,
Expand Down
4 changes: 4 additions & 0 deletions rust/agama-server/src/manager/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,8 @@ pub enum ManagerError {
RecvResult(#[from] oneshot::error::RecvError),
#[error("Could not send the result")]
SendResult,
#[error("Could not join the background task: {0}")]
Join(#[from] tokio::task::JoinError),
#[error("The service task is busy")]
Busy,
}
Loading

0 comments on commit 6c629c1

Please sign in to comment.