Skip to content

Commit

Permalink
misc fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dr-bonez committed Jan 4, 2025
1 parent 7546c36 commit b249269
Show file tree
Hide file tree
Showing 20 changed files with 123 additions and 221 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ GZIP_BIN := $(shell which pigz || which gzip)
TAR_BIN := $(shell which gtar || which tar)
COMPILED_TARGETS := core/target/$(ARCH)-unknown-linux-musl/release/startbox core/target/$(ARCH)-unknown-linux-musl/release/containerbox system-images/compat/docker-images/$(ARCH).tar system-images/utils/docker-images/$(ARCH).tar system-images/binfmt/docker-images/$(ARCH).tar container-runtime/rootfs.$(ARCH).squashfs
ALL_TARGETS := $(STARTD_SRC) $(ENVIRONMENT_FILE) $(GIT_HASH_FILE) $(VERSION_FILE) $(COMPILED_TARGETS) cargo-deps/$(ARCH)-unknown-linux-musl/release/startos-backup-fs $(shell if [ "$(PLATFORM)" = "raspberrypi" ]; then echo cargo-deps/aarch64-unknown-linux-musl/release/pi-beep; fi) $(shell /bin/bash -c 'if [[ "${ENVIRONMENT}" =~ (^|-)unstable($$|-) ]]; then echo cargo-deps/$(ARCH)-unknown-linux-musl/release/tokio-console; fi') $(PLATFORM_FILE)
REBUILD_TYPES = 1

ifeq ($(REMOTE),)
mkdir = mkdir -p $1
Expand Down Expand Up @@ -226,7 +227,7 @@ container-runtime/node_modules/.package-lock.json: container-runtime/package.jso
npm --prefix container-runtime ci
touch container-runtime/node_modules/.package-lock.json

sdk/base/lib/osBindings/index.ts: core/startos/bindings/index.ts
sdk/base/lib/osBindings/index.ts: $(shell if [ "$(REBUILD_TYPES)" -ne 0 ]; then echo core/startos/bindings/index.ts; fi)
mkdir -p sdk/base/lib/osBindings
rsync -ac --delete core/startos/bindings/ sdk/base/lib/osBindings/
touch sdk/base/lib/osBindings/index.ts
Expand Down
1 change: 0 additions & 1 deletion core/startos/src/action.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::BTreeMap;
use std::fmt;

use clap::{CommandFactory, FromArgMatches, Parser};
Expand Down
1 change: 0 additions & 1 deletion core/startos/src/backup/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::disk::mount::backup::BackupMountGuard;
use crate::disk::mount::filesystem::ReadWrite;
use crate::disk::mount::guard::{GenericMountGuard, TmpMountGuard};
use crate::init::{init, InitResult};
use crate::net::web_server::WebServer;
use crate::prelude::*;
use crate::s9pk::S9pk;
use crate::service::service_map::DownloadInstallFuture;
Expand Down
2 changes: 1 addition & 1 deletion core/startos/src/context/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::init::check_time_is_synchronized;
use crate::lxc::{ContainerId, LxcContainer, LxcManager};
use crate::net::net_controller::{NetController, PreInitNetController};
use crate::net::utils::{find_eth_iface, find_wifi_iface};
use crate::net::web_server::{UpgradableListener, WebServer, WebServerAcceptorSetter};
use crate::net::web_server::{UpgradableListener, WebServerAcceptorSetter};
use crate::net::wifi::WpaCli;
use crate::prelude::*;
use crate::progress::{FullProgressTracker, PhaseProgressTrackerHandle};
Expand Down
1 change: 0 additions & 1 deletion core/startos/src/diagnostic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::path::Path;
use std::sync::Arc;

use rpc_toolkit::yajrc::RpcError;
Expand Down
5 changes: 2 additions & 3 deletions core/startos/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::time::{Duration, SystemTime};

use axum::extract::ws::{self};
use color_eyre::eyre::eyre;
use futures::future::Either;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use models::ResultExt;
Expand All @@ -26,7 +25,7 @@ use crate::db::model::Database;
use crate::disk::mount::util::unmount;
use crate::middleware::auth::LOCAL_AUTH_COOKIE_PATH;
use crate::net::net_controller::PreInitNetController;
use crate::net::web_server::{UpgradableListener, WebServer, WebServerAcceptorSetter};
use crate::net::web_server::{UpgradableListener, WebServerAcceptorSetter};
use crate::prelude::*;
use crate::progress::{
FullProgress, FullProgressTracker, PhaseProgressTrackerHandle, PhasedProgressBar,
Expand Down Expand Up @@ -359,7 +358,7 @@ pub async fn init(
account.tor_key,
)
.await?;
webserver.try_upgrade(|a| net_ctrl.net_iface.upgrade_listener(a));
webserver.try_upgrade(|a| net_ctrl.net_iface.upgrade_listener(a))?;
start_net.complete();

mount_logs.start();
Expand Down
1 change: 0 additions & 1 deletion core/startos/src/net/net_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use color_eyre::eyre::eyre;
use imbl::OrdMap;
use imbl_value::InternedString;
use ipnet::IpNet;
use itertools::Itertools;
use models::{HostId, OptionExt, PackageId};
use torut::onion::{OnionAddressV3, TorSecretKeyV3};
use tracing::instrument;
Expand Down
123 changes: 75 additions & 48 deletions core/startos/src/net/network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use tokio::process::Command;
use tokio::sync::watch;
use ts_rs::TS;
use zbus::proxy::{PropertyChanged, PropertyStream, SignalStream};
use zbus::zvariant::{
Expand All @@ -34,7 +35,7 @@ use crate::prelude::*;
use crate::util::future::Until;
use crate::util::io::open_file;
use crate::util::serde::{display_serializable, HandlerExtSerde};
use crate::util::sync::{SyncMutex, Watch};
use crate::util::sync::SyncMutex;
use crate::util::Invoke;

pub fn network_interface_api<C: Context>() -> ParentHandler<C> {
Expand Down Expand Up @@ -108,7 +109,7 @@ pub fn network_interface_api<C: Context>() -> ParentHandler<C> {
async fn list_interfaces(
ctx: RpcContext,
) -> Result<BTreeMap<InternedString, NetworkInterfaceInfo>, Error> {
Ok(ctx.net_controller.net_iface.ip_info.read())
Ok(ctx.net_controller.net_iface.ip_info.borrow().clone())
}

#[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)]
Expand Down Expand Up @@ -167,13 +168,16 @@ async fn forget_iface(
)]
trait NetworkManager {
#[zbus(property)]
fn devices(&self) -> Result<Vec<OwnedObjectPath>, Error>;
fn all_devices(&self) -> Result<Vec<OwnedObjectPath>, Error>;

#[zbus(signal)]
fn device_added(&self) -> Result<(), Error>;

#[zbus(signal)]
fn device_removed(&self) -> Result<(), Error>;

#[zbus(signal)]
fn state_changed(&self) -> Result<(), Error>;
}

mod active_connection {
Expand Down Expand Up @@ -256,31 +260,38 @@ impl TryFrom<OwnedValue> for Dhcp4Options {
}
}

#[proxy(
interface = "org.freedesktop.NetworkManager.Device",
default_service = "org.freedesktop.NetworkManager"
)]
trait Device {
#[zbus(property)]
fn ip_interface(&self) -> Result<String, Error>;
mod device {
use zbus::proxy;
use zbus::zvariant::OwnedObjectPath;

#[zbus(property)]
fn managed(&self) -> Result<bool, Error>;
use crate::prelude::*;

#[zbus(property)]
fn active_connection(&self) -> Result<OwnedObjectPath, Error>;
#[proxy(
interface = "org.freedesktop.NetworkManager.Device",
default_service = "org.freedesktop.NetworkManager"
)]
pub trait Device {
#[zbus(property)]
fn ip_interface(&self) -> Result<String, Error>;

#[zbus(property)]
fn ip4_config(&self) -> Result<OwnedObjectPath, Error>;
#[zbus(property)]
fn managed(&self) -> Result<bool, Error>;

#[zbus(property)]
fn ip6_config(&self) -> Result<OwnedObjectPath, Error>;
#[zbus(property)]
fn active_connection(&self) -> Result<OwnedObjectPath, Error>;

#[zbus(property, name = "State")]
fn _state(&self) -> Result<u32, Error>;
#[zbus(property)]
fn ip4_config(&self) -> Result<OwnedObjectPath, Error>;

#[zbus(signal)]
fn state_changed(&self) -> Result<(), Error>;
#[zbus(property)]
fn ip6_config(&self) -> Result<OwnedObjectPath, Error>;

#[zbus(property, name = "State")]
fn _state(&self) -> Result<u32, Error>;

#[zbus(signal)]
fn state_changed(&self) -> Result<(), Error>;
}
}

trait StubStream<'a> {
Expand All @@ -305,15 +316,15 @@ impl<'a> StubStream<'a> for SignalStream<'a> {
}

#[instrument(skip_all)]
async fn watcher(write_to: Watch<BTreeMap<InternedString, NetworkInterfaceInfo>>) {
async fn watcher(write_to: watch::Sender<BTreeMap<InternedString, NetworkInterfaceInfo>>) {
loop {
let res: Result<(), Error> = async {
let connection = Connection::system().await?;

let netman_proxy = NetworkManagerProxy::new(&connection).await?;

let mut until = Until::new()
.with_stream(netman_proxy.receive_devices_changed().await.stub())
.with_stream(netman_proxy.receive_all_devices_changed().await.stub())
.with_stream(
netman_proxy
.receive_device_added()
Expand All @@ -327,17 +338,24 @@ async fn watcher(write_to: Watch<BTreeMap<InternedString, NetworkInterfaceInfo>>
.await?
.into_inner()
.stub(),
)
.with_stream(
netman_proxy
.receive_state_changed()
.await?
.into_inner()
.stub(),
);

loop {
until
.run(async {
let devices = netman_proxy.devices().await?;
let devices = netman_proxy.all_devices().await?;
let mut ifaces = BTreeSet::new();
let mut jobs = Vec::new();
for device in devices {
let device_proxy =
DeviceProxy::new(&connection, device.clone()).await?;
device::DeviceProxy::new(&connection, device.clone()).await?;
let iface = InternedString::intern(device_proxy.ip_interface().await?);
if iface.is_empty() {
continue;
Expand Down Expand Up @@ -399,9 +417,9 @@ async fn get_wan_ipv4(iface: &str) -> Result<Option<Ipv4Addr>, Error> {
#[instrument(skip(connection, device_proxy, write_to))]
async fn watch_ip(
connection: &Connection,
device_proxy: DeviceProxy<'_>,
device_proxy: device::DeviceProxy<'_>,
iface: InternedString,
write_to: &Watch<BTreeMap<InternedString, NetworkInterfaceInfo>>,
write_to: &watch::Sender<BTreeMap<InternedString, NetworkInterfaceInfo>>,
) -> Result<(), Error> {
let mut until = Until::new()
.with_stream(
Expand Down Expand Up @@ -554,7 +572,7 @@ async fn watch_ip(

pub struct NetworkInterfaceController {
db: TypedPatchDb<Database>,
ip_info: Watch<BTreeMap<InternedString, NetworkInterfaceInfo>>,
ip_info: watch::Sender<BTreeMap<InternedString, NetworkInterfaceInfo>>,
_watcher: NonDetachingJoinHandle<()>,
listeners: SyncMutex<BTreeMap<u16, Weak<()>>>,
}
Expand Down Expand Up @@ -624,7 +642,7 @@ impl NetworkInterfaceController {
Ok(())
}
pub fn new(db: TypedPatchDb<Database>) -> Self {
let mut ip_info = Watch::new(BTreeMap::new());
let (ip_info, mut recv) = watch::channel(BTreeMap::new());
Self {
db: db.clone(),
ip_info: ip_info.clone(),
Expand Down Expand Up @@ -652,7 +670,7 @@ impl NetworkInterfaceController {
let res: Result<(), Error> = async {
loop {
if let Err(e) = async {
let ip_info = ip_info.read();
let ip_info = { recv.borrow().clone() };
Self::sync(&db, &ip_info).boxed().await?;

Ok::<_, Error>(())
Expand All @@ -663,7 +681,7 @@ impl NetworkInterfaceController {
tracing::debug!("{e:?}");
}

ip_info.changed().await;
let _ = recv.changed().await;
}
}
.await;
Expand Down Expand Up @@ -692,7 +710,8 @@ impl NetworkInterfaceController {
})?;
Ok(NetworkInterfaceListener {
_arc: arc,
ip_info: self.ip_info.clone(),
ip_info: self.ip_info.subscribe(),
changed: None,
listeners: ListenerMap::new(port),
})
}
Expand All @@ -716,7 +735,8 @@ impl NetworkInterfaceController {
})?;
Ok(NetworkInterfaceListener {
_arc: arc,
ip_info: self.ip_info.clone(),
ip_info: self.ip_info.subscribe(),
changed: None,
listeners,
})
}
Expand Down Expand Up @@ -935,38 +955,45 @@ impl ListenerMap {
}

pub struct NetworkInterfaceListener {
ip_info: Watch<BTreeMap<InternedString, NetworkInterfaceInfo>>,
ip_info: watch::Receiver<BTreeMap<InternedString, NetworkInterfaceInfo>>,
listeners: ListenerMap,
changed: Option<Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>,
_arc: Arc<()>,
}
impl NetworkInterfaceListener {
pub fn port(&self) -> u16 {
self.listeners.port
}

fn poll_ip_info_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> {
let mut changed = if let Some(changed) = self.changed.take() {
changed
} else {
let mut ip_info = self.ip_info.clone();
Box::pin(async move {
let _ = ip_info.changed().await;
})
};
let res = changed.poll_unpin(cx);
if res.is_pending() {
self.changed = Some(changed);
}
res
}

pub fn poll_accept(
&mut self,
cx: &mut std::task::Context<'_>,
public: bool,
) -> Poll<Result<Accepted, Error>> {
if self.ip_info.poll_changed(cx).is_ready() || public != self.listeners.prev_public {
self.ip_info
.peek(|ip_info| self.listeners.update(ip_info, public))?;
if self.poll_ip_info_changed(cx).is_ready() || public != self.listeners.prev_public {
self.listeners.update(&*self.ip_info.borrow(), public)?;
}
self.listeners.poll_accept(cx)
}

pub async fn accept(&mut self, public: bool) -> Result<Accepted, Error> {
#[pin_project::pin_project]
struct Accept<'a>(&'a mut NetworkInterfaceListener, bool);
impl<'a> Future for Accept<'a> {
type Output = Result<Accepted, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.0.poll_accept(cx, *this.1)
}
}
Accept(self, public).await
futures::future::poll_fn(|cx| self.poll_accept(cx, public)).await
}
}

Expand Down
Loading

0 comments on commit b249269

Please sign in to comment.