From b249269286418be226572eb8f159c1234aac19b6 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Fri, 3 Jan 2025 17:19:11 -0700 Subject: [PATCH] misc fixes --- Makefile | 3 +- core/startos/src/action.rs | 1 - core/startos/src/backup/restore.rs | 1 - core/startos/src/context/rpc.rs | 2 +- core/startos/src/diagnostic.rs | 1 - core/startos/src/init.rs | 5 +- core/startos/src/net/net_controller.rs | 1 - core/startos/src/net/network_interface.rs | 123 ++++++++++------- core/startos/src/net/web_server.rs | 62 +++++---- core/startos/src/notifications.rs | 2 +- core/startos/src/registry/mod.rs | 2 +- core/startos/src/s9pk/v2/manifest.rs | 1 - core/startos/src/service/effects/net/host.rs | 1 - core/startos/src/service/transition/backup.rs | 3 +- .../startos/src/service/transition/restore.rs | 3 +- core/startos/src/update/mod.rs | 2 +- core/startos/src/util/future.rs | 4 +- core/startos/src/util/io.rs | 1 - core/startos/src/util/rpc.rs | 1 - core/startos/src/util/sync.rs | 125 ------------------ 20 files changed, 123 insertions(+), 221 deletions(-) diff --git a/Makefile b/Makefile index e0f89f3f9..a00a5c1c9 100644 --- a/Makefile +++ b/Makefile @@ -26,6 +26,7 @@ GZIP_BIN := $(shell which pigz || which gzip) TAR_BIN := $(shell which gtar || which tar) COMPILED_TARGETS := core/target/$(ARCH)-unknown-linux-musl/release/startbox core/target/$(ARCH)-unknown-linux-musl/release/containerbox system-images/compat/docker-images/$(ARCH).tar system-images/utils/docker-images/$(ARCH).tar system-images/binfmt/docker-images/$(ARCH).tar container-runtime/rootfs.$(ARCH).squashfs ALL_TARGETS := $(STARTD_SRC) $(ENVIRONMENT_FILE) $(GIT_HASH_FILE) $(VERSION_FILE) $(COMPILED_TARGETS) cargo-deps/$(ARCH)-unknown-linux-musl/release/startos-backup-fs $(shell if [ "$(PLATFORM)" = "raspberrypi" ]; then echo cargo-deps/aarch64-unknown-linux-musl/release/pi-beep; fi) $(shell /bin/bash -c 'if [[ "${ENVIRONMENT}" =~ (^|-)unstable($$|-) ]]; then echo cargo-deps/$(ARCH)-unknown-linux-musl/release/tokio-console; fi') $(PLATFORM_FILE) +REBUILD_TYPES = 1 ifeq ($(REMOTE),) mkdir = mkdir -p $1 @@ -226,7 +227,7 @@ container-runtime/node_modules/.package-lock.json: container-runtime/package.jso npm --prefix container-runtime ci touch container-runtime/node_modules/.package-lock.json -sdk/base/lib/osBindings/index.ts: core/startos/bindings/index.ts +sdk/base/lib/osBindings/index.ts: $(shell if [ "$(REBUILD_TYPES)" -ne 0 ]; then echo core/startos/bindings/index.ts; fi) mkdir -p sdk/base/lib/osBindings rsync -ac --delete core/startos/bindings/ sdk/base/lib/osBindings/ touch sdk/base/lib/osBindings/index.ts diff --git a/core/startos/src/action.rs b/core/startos/src/action.rs index 801360a44..b29768d73 100644 --- a/core/startos/src/action.rs +++ b/core/startos/src/action.rs @@ -1,4 +1,3 @@ -use std::collections::BTreeMap; use std::fmt; use clap::{CommandFactory, FromArgMatches, Parser}; diff --git a/core/startos/src/backup/restore.rs b/core/startos/src/backup/restore.rs index b22d9027d..e2e9e2158 100644 --- a/core/startos/src/backup/restore.rs +++ b/core/startos/src/backup/restore.rs @@ -19,7 +19,6 @@ use crate::disk::mount::backup::BackupMountGuard; use crate::disk::mount::filesystem::ReadWrite; use crate::disk::mount::guard::{GenericMountGuard, TmpMountGuard}; use crate::init::{init, InitResult}; -use crate::net::web_server::WebServer; use crate::prelude::*; use crate::s9pk::S9pk; use crate::service::service_map::DownloadInstallFuture; diff --git a/core/startos/src/context/rpc.rs b/core/startos/src/context/rpc.rs index 12de59e05..87245b4fa 100644 --- a/core/startos/src/context/rpc.rs +++ b/core/startos/src/context/rpc.rs @@ -31,7 +31,7 @@ use crate::init::check_time_is_synchronized; use crate::lxc::{ContainerId, LxcContainer, LxcManager}; use crate::net::net_controller::{NetController, PreInitNetController}; use crate::net::utils::{find_eth_iface, find_wifi_iface}; -use crate::net::web_server::{UpgradableListener, WebServer, WebServerAcceptorSetter}; +use crate::net::web_server::{UpgradableListener, WebServerAcceptorSetter}; use crate::net::wifi::WpaCli; use crate::prelude::*; use crate::progress::{FullProgressTracker, PhaseProgressTrackerHandle}; diff --git a/core/startos/src/diagnostic.rs b/core/startos/src/diagnostic.rs index f0c142706..71f76c379 100644 --- a/core/startos/src/diagnostic.rs +++ b/core/startos/src/diagnostic.rs @@ -1,4 +1,3 @@ -use std::path::Path; use std::sync::Arc; use rpc_toolkit::yajrc::RpcError; diff --git a/core/startos/src/init.rs b/core/startos/src/init.rs index 92715683b..3652336dc 100644 --- a/core/startos/src/init.rs +++ b/core/startos/src/init.rs @@ -7,7 +7,6 @@ use std::time::{Duration, SystemTime}; use axum::extract::ws::{self}; use color_eyre::eyre::eyre; -use futures::future::Either; use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use models::ResultExt; @@ -26,7 +25,7 @@ use crate::db::model::Database; use crate::disk::mount::util::unmount; use crate::middleware::auth::LOCAL_AUTH_COOKIE_PATH; use crate::net::net_controller::PreInitNetController; -use crate::net::web_server::{UpgradableListener, WebServer, WebServerAcceptorSetter}; +use crate::net::web_server::{UpgradableListener, WebServerAcceptorSetter}; use crate::prelude::*; use crate::progress::{ FullProgress, FullProgressTracker, PhaseProgressTrackerHandle, PhasedProgressBar, @@ -359,7 +358,7 @@ pub async fn init( account.tor_key, ) .await?; - webserver.try_upgrade(|a| net_ctrl.net_iface.upgrade_listener(a)); + webserver.try_upgrade(|a| net_ctrl.net_iface.upgrade_listener(a))?; start_net.complete(); mount_logs.start(); diff --git a/core/startos/src/net/net_controller.rs b/core/startos/src/net/net_controller.rs index cdf2607b7..75c37eb4c 100644 --- a/core/startos/src/net/net_controller.rs +++ b/core/startos/src/net/net_controller.rs @@ -6,7 +6,6 @@ use color_eyre::eyre::eyre; use imbl::OrdMap; use imbl_value::InternedString; use ipnet::IpNet; -use itertools::Itertools; use models::{HostId, OptionExt, PackageId}; use torut::onion::{OnionAddressV3, TorSecretKeyV3}; use tracing::instrument; diff --git a/core/startos/src/net/network_interface.rs b/core/startos/src/net/network_interface.rs index e732c1ae7..b964af273 100644 --- a/core/startos/src/net/network_interface.rs +++ b/core/startos/src/net/network_interface.rs @@ -19,6 +19,7 @@ use serde::{Deserialize, Serialize}; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::net::{TcpListener, TcpStream}; use tokio::process::Command; +use tokio::sync::watch; use ts_rs::TS; use zbus::proxy::{PropertyChanged, PropertyStream, SignalStream}; use zbus::zvariant::{ @@ -34,7 +35,7 @@ use crate::prelude::*; use crate::util::future::Until; use crate::util::io::open_file; use crate::util::serde::{display_serializable, HandlerExtSerde}; -use crate::util::sync::{SyncMutex, Watch}; +use crate::util::sync::SyncMutex; use crate::util::Invoke; pub fn network_interface_api() -> ParentHandler { @@ -108,7 +109,7 @@ pub fn network_interface_api() -> ParentHandler { async fn list_interfaces( ctx: RpcContext, ) -> Result, Error> { - Ok(ctx.net_controller.net_iface.ip_info.read()) + Ok(ctx.net_controller.net_iface.ip_info.borrow().clone()) } #[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)] @@ -167,13 +168,16 @@ async fn forget_iface( )] trait NetworkManager { #[zbus(property)] - fn devices(&self) -> Result, Error>; + fn all_devices(&self) -> Result, Error>; #[zbus(signal)] fn device_added(&self) -> Result<(), Error>; #[zbus(signal)] fn device_removed(&self) -> Result<(), Error>; + + #[zbus(signal)] + fn state_changed(&self) -> Result<(), Error>; } mod active_connection { @@ -256,31 +260,38 @@ impl TryFrom for Dhcp4Options { } } -#[proxy( - interface = "org.freedesktop.NetworkManager.Device", - default_service = "org.freedesktop.NetworkManager" -)] -trait Device { - #[zbus(property)] - fn ip_interface(&self) -> Result; +mod device { + use zbus::proxy; + use zbus::zvariant::OwnedObjectPath; - #[zbus(property)] - fn managed(&self) -> Result; + use crate::prelude::*; - #[zbus(property)] - fn active_connection(&self) -> Result; + #[proxy( + interface = "org.freedesktop.NetworkManager.Device", + default_service = "org.freedesktop.NetworkManager" + )] + pub trait Device { + #[zbus(property)] + fn ip_interface(&self) -> Result; - #[zbus(property)] - fn ip4_config(&self) -> Result; + #[zbus(property)] + fn managed(&self) -> Result; - #[zbus(property)] - fn ip6_config(&self) -> Result; + #[zbus(property)] + fn active_connection(&self) -> Result; - #[zbus(property, name = "State")] - fn _state(&self) -> Result; + #[zbus(property)] + fn ip4_config(&self) -> Result; - #[zbus(signal)] - fn state_changed(&self) -> Result<(), Error>; + #[zbus(property)] + fn ip6_config(&self) -> Result; + + #[zbus(property, name = "State")] + fn _state(&self) -> Result; + + #[zbus(signal)] + fn state_changed(&self) -> Result<(), Error>; + } } trait StubStream<'a> { @@ -305,7 +316,7 @@ impl<'a> StubStream<'a> for SignalStream<'a> { } #[instrument(skip_all)] -async fn watcher(write_to: Watch>) { +async fn watcher(write_to: watch::Sender>) { loop { let res: Result<(), Error> = async { let connection = Connection::system().await?; @@ -313,7 +324,7 @@ async fn watcher(write_to: Watch> let netman_proxy = NetworkManagerProxy::new(&connection).await?; let mut until = Until::new() - .with_stream(netman_proxy.receive_devices_changed().await.stub()) + .with_stream(netman_proxy.receive_all_devices_changed().await.stub()) .with_stream( netman_proxy .receive_device_added() @@ -327,17 +338,24 @@ async fn watcher(write_to: Watch> .await? .into_inner() .stub(), + ) + .with_stream( + netman_proxy + .receive_state_changed() + .await? + .into_inner() + .stub(), ); loop { until .run(async { - let devices = netman_proxy.devices().await?; + let devices = netman_proxy.all_devices().await?; let mut ifaces = BTreeSet::new(); let mut jobs = Vec::new(); for device in devices { let device_proxy = - DeviceProxy::new(&connection, device.clone()).await?; + device::DeviceProxy::new(&connection, device.clone()).await?; let iface = InternedString::intern(device_proxy.ip_interface().await?); if iface.is_empty() { continue; @@ -399,9 +417,9 @@ async fn get_wan_ipv4(iface: &str) -> Result, Error> { #[instrument(skip(connection, device_proxy, write_to))] async fn watch_ip( connection: &Connection, - device_proxy: DeviceProxy<'_>, + device_proxy: device::DeviceProxy<'_>, iface: InternedString, - write_to: &Watch>, + write_to: &watch::Sender>, ) -> Result<(), Error> { let mut until = Until::new() .with_stream( @@ -554,7 +572,7 @@ async fn watch_ip( pub struct NetworkInterfaceController { db: TypedPatchDb, - ip_info: Watch>, + ip_info: watch::Sender>, _watcher: NonDetachingJoinHandle<()>, listeners: SyncMutex>>, } @@ -624,7 +642,7 @@ impl NetworkInterfaceController { Ok(()) } pub fn new(db: TypedPatchDb) -> Self { - let mut ip_info = Watch::new(BTreeMap::new()); + let (ip_info, mut recv) = watch::channel(BTreeMap::new()); Self { db: db.clone(), ip_info: ip_info.clone(), @@ -652,7 +670,7 @@ impl NetworkInterfaceController { let res: Result<(), Error> = async { loop { if let Err(e) = async { - let ip_info = ip_info.read(); + let ip_info = { recv.borrow().clone() }; Self::sync(&db, &ip_info).boxed().await?; Ok::<_, Error>(()) @@ -663,7 +681,7 @@ impl NetworkInterfaceController { tracing::debug!("{e:?}"); } - ip_info.changed().await; + let _ = recv.changed().await; } } .await; @@ -692,7 +710,8 @@ impl NetworkInterfaceController { })?; Ok(NetworkInterfaceListener { _arc: arc, - ip_info: self.ip_info.clone(), + ip_info: self.ip_info.subscribe(), + changed: None, listeners: ListenerMap::new(port), }) } @@ -716,7 +735,8 @@ impl NetworkInterfaceController { })?; Ok(NetworkInterfaceListener { _arc: arc, - ip_info: self.ip_info.clone(), + ip_info: self.ip_info.subscribe(), + changed: None, listeners, }) } @@ -935,8 +955,9 @@ impl ListenerMap { } pub struct NetworkInterfaceListener { - ip_info: Watch>, + ip_info: watch::Receiver>, listeners: ListenerMap, + changed: Option + Send + Sync + 'static>>>, _arc: Arc<()>, } impl NetworkInterfaceListener { @@ -944,29 +965,35 @@ impl NetworkInterfaceListener { self.listeners.port } + fn poll_ip_info_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> { + let mut changed = if let Some(changed) = self.changed.take() { + changed + } else { + let mut ip_info = self.ip_info.clone(); + Box::pin(async move { + let _ = ip_info.changed().await; + }) + }; + let res = changed.poll_unpin(cx); + if res.is_pending() { + self.changed = Some(changed); + } + res + } + pub fn poll_accept( &mut self, cx: &mut std::task::Context<'_>, public: bool, ) -> Poll> { - if self.ip_info.poll_changed(cx).is_ready() || public != self.listeners.prev_public { - self.ip_info - .peek(|ip_info| self.listeners.update(ip_info, public))?; + if self.poll_ip_info_changed(cx).is_ready() || public != self.listeners.prev_public { + self.listeners.update(&*self.ip_info.borrow(), public)?; } self.listeners.poll_accept(cx) } pub async fn accept(&mut self, public: bool) -> Result { - #[pin_project::pin_project] - struct Accept<'a>(&'a mut NetworkInterfaceListener, bool); - impl<'a> Future for Accept<'a> { - type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - let this = self.project(); - this.0.poll_accept(cx, *this.1) - } - } - Accept(self, public).await + futures::future::poll_fn(|cx| self.poll_accept(cx, public)).await } } diff --git a/core/startos/src/net/web_server.rs b/core/startos/src/net/web_server.rs index 113273b04..b38a7ee56 100644 --- a/core/startos/src/net/web_server.rs +++ b/core/startos/src/net/web_server.rs @@ -1,20 +1,19 @@ use std::future::Future; use std::net::SocketAddr; use std::ops::Deref; -use std::pin::Pin; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; use std::task::Poll; use std::time::Duration; use axum::Router; -use futures::future::Either; +use futures::future::{BoxFuture, Either}; use futures::FutureExt; use helpers::NonDetachingJoinHandle; use hyper_util::rt::{TokioIo, TokioTimer}; use hyper_util::service::TowerToHyperService; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, watch}; use crate::context::{DiagnosticContext, InitContext, InstallContext, RpcContext, SetupContext}; use crate::net::network_interface::NetworkInterfaceListener; @@ -24,7 +23,6 @@ use crate::net::static_server::{ }; use crate::prelude::*; use crate::util::actor::background::BackgroundJobQueue; -use crate::util::sync::Watch; pub struct Accepted { pub https_redirect: bool, @@ -78,19 +76,38 @@ impl Accept for Option { #[pin_project::pin_project] pub struct Acceptor { - acceptor: Watch, + acceptor: (watch::Sender, watch::Receiver), + changed: Option>, } -impl Acceptor { +impl Acceptor { pub fn new(acceptor: A) -> Self { Self { - acceptor: Watch::new(acceptor), + acceptor: watch::channel(acceptor), + changed: None, } } + fn poll_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> { + let mut changed = if let Some(changed) = self.changed.take() { + changed + } else { + let mut recv = self.acceptor.1.clone(); + async move { + let _ = recv.changed().await; + } + .boxed() + }; + let res = changed.poll_unpin(cx); + if res.is_pending() { + self.changed = Some(changed); + } + res + } + fn poll_accept(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { - let _ = self.acceptor.poll_changed(cx); + let _ = self.poll_changed(cx); let mut res = Poll::Pending; - self.acceptor.send_if_modified(|a| { + self.acceptor.0.send_if_modified(|a| { res = a.poll_accept(cx); false }); @@ -98,15 +115,7 @@ impl Acceptor { } async fn accept(&mut self) -> Result { - #[pin_project::pin_project] - struct AcceptFut<'a, A: Accept>(&'a mut Acceptor); - impl<'a, A: Accept> Future for AcceptFut<'a, A> { - type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - self.project().0.poll_accept(cx) - } - } - AcceptFut(self).await + std::future::poll_fn(|cx| self.poll_accept(cx)).await } } impl Acceptor> { @@ -130,7 +139,7 @@ impl Acceptor { } pub struct WebServerAcceptorSetter { - acceptor: Watch, + acceptor: watch::Sender, } impl WebServerAcceptorSetter>> { pub fn try_upgrade Result>(&self, f: F) -> Result<(), Error> { @@ -151,7 +160,7 @@ impl WebServerAcceptorSetter>> { } } impl Deref for WebServerAcceptorSetter { - type Target = Watch; + type Target = watch::Sender; fn deref(&self) -> &Self::Target { &self.acceptor } @@ -159,11 +168,11 @@ impl Deref for WebServerAcceptorSetter { pub struct WebServer { shutdown: oneshot::Sender<()>, - router: Watch>, - acceptor: Watch, + router: watch::Sender>, + acceptor: watch::Sender, thread: NonDetachingJoinHandle<()>, } -impl WebServer { +impl WebServer { pub fn acceptor_setter(&self) -> WebServerAcceptorSetter { WebServerAcceptorSetter { acceptor: self.acceptor.clone(), @@ -171,9 +180,8 @@ impl WebServer { } pub fn new(mut acceptor: Acceptor) -> Self { - let acceptor_send = acceptor.acceptor.clone(); - let router = Watch::>::new(None); - let service = router.clone(); + let acceptor_send = acceptor.acceptor.0.clone(); + let (router, service) = watch::channel::>(None); let (shutdown, shutdown_recv) = oneshot::channel(); let thread = NonDetachingJoinHandle::from(tokio::spawn(async move { #[derive(Clone)] @@ -228,7 +236,7 @@ impl WebServer { ), ); } else { - let service = service.read(); + let service = { service.borrow().clone() }; if let Some(service) = service { queue.add_job( graceful.watch( diff --git a/core/startos/src/notifications.rs b/core/startos/src/notifications.rs index 4b45531a4..3ac09de0f 100644 --- a/core/startos/src/notifications.rs +++ b/core/startos/src/notifications.rs @@ -13,11 +13,11 @@ use serde::{Deserialize, Serialize}; use tracing::instrument; use ts_rs::TS; +use crate::backup::BackupReport; use crate::context::{CliContext, RpcContext}; use crate::db::model::DatabaseModel; use crate::prelude::*; use crate::util::serde::HandlerExtSerde; -use crate::{backup::BackupReport, db::model::Database}; // #[command(subcommands(list, delete, delete_before, create))] pub fn notification() -> ParentHandler { diff --git a/core/startos/src/registry/mod.rs b/core/startos/src/registry/mod.rs index b56cedce6..4e1411ea9 100644 --- a/core/startos/src/registry/mod.rs +++ b/core/startos/src/registry/mod.rs @@ -143,7 +143,7 @@ pub fn registry_router(ctx: RegistryContext) -> Router { ) } -impl WebServer { +impl WebServer { pub fn serve_registry(&mut self, ctx: RegistryContext) { self.serve_router(registry_router(ctx)) } diff --git a/core/startos/src/s9pk/v2/manifest.rs b/core/startos/src/s9pk/v2/manifest.rs index 187b2dede..11ea0d9af 100644 --- a/core/startos/src/s9pk/v2/manifest.rs +++ b/core/startos/src/s9pk/v2/manifest.rs @@ -3,7 +3,6 @@ use std::path::Path; use color_eyre::eyre::eyre; use exver::{Version, VersionRange}; -use helpers::const_true; use imbl_value::InternedString; pub use models::PackageId; use models::{mime, ImageId, VolumeId}; diff --git a/core/startos/src/service/effects/net/host.rs b/core/startos/src/service/effects/net/host.rs index 51f9eceec..570d5033d 100644 --- a/core/startos/src/service/effects/net/host.rs +++ b/core/startos/src/service/effects/net/host.rs @@ -1,6 +1,5 @@ use models::{HostId, PackageId}; -use crate::net::host::address::HostAddress; use crate::net::host::Host; use crate::service::effects::callbacks::CallbackHandler; use crate::service::effects::prelude::*; diff --git a/core/startos/src/service/transition/backup.rs b/core/startos/src/service/transition/backup.rs index 0d4116078..6205cdd61 100644 --- a/core/startos/src/service/transition/backup.rs +++ b/core/startos/src/service/transition/backup.rs @@ -15,6 +15,7 @@ use crate::service::ServiceActor; use crate::util::actor::background::BackgroundJobQueue; use crate::util::actor::{ConflictBuilder, Handler}; use crate::util::future::RemoteCancellable; +use crate::util::serde::NoOutput; pub(in crate::service) struct Backup { pub path: PathBuf, @@ -48,7 +49,7 @@ impl Handler for ServiceActor { .mount_backup(path, ReadWrite) .await?; seed.persistent_container - .execute(id, ProcedureName::CreateBackup, Value::Null, None) + .execute::(id, ProcedureName::CreateBackup, Value::Null, None) .await?; backup_guard.unmount(true).await?; diff --git a/core/startos/src/service/transition/restore.rs b/core/startos/src/service/transition/restore.rs index 08f3be942..7061b0c1e 100644 --- a/core/startos/src/service/transition/restore.rs +++ b/core/startos/src/service/transition/restore.rs @@ -11,6 +11,7 @@ use crate::service::ServiceActor; use crate::util::actor::background::BackgroundJobQueue; use crate::util::actor::{ConflictBuilder, Handler}; use crate::util::future::RemoteCancellable; +use crate::util::serde::NoOutput; pub(in crate::service) struct Restore { pub path: PathBuf, @@ -38,7 +39,7 @@ impl Handler for ServiceActor { .mount_backup(path, ReadOnly) .await?; seed.persistent_container - .execute(id, ProcedureName::RestoreBackup, Value::Null, None) + .execute::(id, ProcedureName::RestoreBackup, Value::Null, None) .await?; backup_guard.unmount(true).await?; diff --git a/core/startos/src/update/mod.rs b/core/startos/src/update/mod.rs index 51d8d77ae..d88838d4a 100644 --- a/core/startos/src/update/mod.rs +++ b/core/startos/src/update/mod.rs @@ -20,7 +20,7 @@ use ts_rs::TS; use crate::context::{CliContext, RpcContext}; use crate::disk::mount::filesystem::bind::Bind; use crate::disk::mount::filesystem::block_dev::BlockDev; -use crate::disk::mount::filesystem::efivarfs::{self, EfiVarFs}; +use crate::disk::mount::filesystem::efivarfs::{ EfiVarFs}; use crate::disk::mount::filesystem::overlayfs::OverlayGuard; use crate::disk::mount::filesystem::MountType; use crate::disk::mount::guard::{GenericMountGuard, MountGuard, TmpMountGuard}; diff --git a/core/startos/src/util/future.rs b/core/startos/src/util/future.rs index 2ef053fed..c690f9754 100644 --- a/core/startos/src/util/future.rs +++ b/core/startos/src/util/future.rs @@ -1,6 +1,5 @@ use std::pin::Pin; -use std::sync::atomic::AtomicUsize; -use std::task::{Context, Poll, Waker}; +use std::task::{Context, Poll}; use futures::future::{abortable, pending, BoxFuture, FusedFuture}; use futures::stream::{AbortHandle, Abortable, BoxStream}; @@ -8,7 +7,6 @@ use futures::{Future, FutureExt, Stream, StreamExt}; use tokio::sync::watch; use crate::prelude::*; -use crate::util::sync::SyncMutex; #[pin_project::pin_project(PinnedDrop)] pub struct DropSignaling { diff --git a/core/startos/src/util/io.rs b/core/startos/src/util/io.rs index 9018b3344..f0bae7a0a 100644 --- a/core/startos/src/util/io.rs +++ b/core/startos/src/util/io.rs @@ -541,7 +541,6 @@ impl std::io::Read for BackTrackingIO { } BTBuffer::NotBuffering => self.io.read(buf), BTBuffer::Rewound { read } => { - let mut ready = false; if (read.position() as usize) < read.get_ref().len() { let n = std::io::Read::read(read, buf)?; if n != 0 { diff --git a/core/startos/src/util/rpc.rs b/core/startos/src/util/rpc.rs index b2dea340e..f7c91eb82 100644 --- a/core/startos/src/util/rpc.rs +++ b/core/startos/src/util/rpc.rs @@ -3,7 +3,6 @@ use std::path::Path; use clap::Parser; use rpc_toolkit::{from_fn_async, Context, HandlerExt, ParentHandler}; use serde::{Deserialize, Serialize}; -use url::Url; use crate::context::CliContext; use crate::prelude::*; diff --git a/core/startos/src/util/sync.rs b/core/startos/src/util/sync.rs index 2bf1e767d..2630858a9 100644 --- a/core/startos/src/util/sync.rs +++ b/core/startos/src/util/sync.rs @@ -1,9 +1,3 @@ -use std::future::Future; -use std::pin::Pin; -use std::sync::atomic::AtomicUsize; -use std::sync::Arc; -use std::task::{Poll, Waker}; - #[derive(Debug, Default)] pub struct SyncMutex(std::sync::Mutex); impl SyncMutex { @@ -17,122 +11,3 @@ impl SyncMutex { f(&*self.0.lock().unwrap()) } } - -struct WatchData { - data: T, - wakers: Vec, -} - -struct Shared { - data: SyncMutex>, - version: AtomicUsize, -} - -pub struct Watch { - data: Arc>, - seen_version: usize, -} -impl Clone for Watch { - fn clone(&self) -> Self { - Self { - data: self.data.clone(), - seen_version: self.seen_version, - } - } -} -impl Watch { - pub fn new(init: T) -> Self { - Self { - data: Arc::new(Shared { - data: SyncMutex::new(WatchData { - data: init, - wakers: Vec::new(), - }), - version: AtomicUsize::new(1), - }), - seen_version: 1, - } - } - - pub fn poll_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> { - let mut guard = self.data.data.0.lock().unwrap(); - let version = self.data.version.load(std::sync::atomic::Ordering::SeqCst); - if version > self.seen_version { - self.seen_version = version; - return Poll::Ready(()); - } - let waker = cx.waker(); - if !guard.wakers.iter().any(|w| w.will_wake(waker)) { - guard.wakers.push(waker.clone()); - } - Poll::Pending - } - - pub async fn changed(&mut self) { - #[pin_project::pin_project] - struct Changed<'a, T>(&'a mut Watch); - impl<'a, T> Future for Changed<'a, T> { - type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - let this = self.project(); - this.0.poll_changed(cx) - } - } - Changed(self).await - } - - pub fn mark_changed(&mut self) { - self.seen_version = 0; - } - - pub fn peek(&self, peek: F) -> U - where - F: FnOnce(&T) -> U, - { - self.data.data.peek(|d| peek(&d.data)) - } - - pub fn send_if_modified(&self, modify: F) -> bool - where - F: FnOnce(&mut T) -> bool, - { - let mut guard = self.data.data.0.lock().unwrap(); - let changed = modify(&mut guard.data); - if changed { - self.data - .version - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); - for waker in guard.wakers.drain(..) { - waker.wake(); - } - } - changed - } - - pub fn send_modify(&self, modify: F) - where - F: FnOnce(&mut T), - { - self.send_if_modified(|x| { - modify(x); - true - }); - } - - pub fn send_replace(&self, mut value: T) -> T { - self.send_modify(|x| { - std::mem::swap(x, &mut value); - }); - value - } - - pub fn send(&self, value: T) { - self.send_replace(value); - } -} - -impl Watch { - pub fn read(&self) -> T { - self.data.data.0.lock().unwrap().data.clone() - } -}