Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
dr-bonez committed Nov 27, 2024
1 parent 1079ca2 commit fb76877
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 89 deletions.
28 changes: 4 additions & 24 deletions core/startos/src/db/model/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::net::{Ipv4Addr, Ipv6Addr};
use chrono::{DateTime, Utc};
use exver::{Version, VersionRange};
use imbl_value::InternedString;
use ipnet::{Ipv4Net, Ipv6Net};
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use isocountry::CountryCode;
use itertools::Itertools;
use models::PackageId;
Expand Down Expand Up @@ -161,30 +161,10 @@ pub struct NetworkInterfaceInfo {
pub ip_info: IpInfo,
}

#[derive(Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize, HasModel, TS)]
#[serde(rename_all = "camelCase")]
#[model = "Model<Self>"]
#[derive(Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize, TS)]
#[ts(export)]
pub struct IpInfo {
#[ts(type = "string | null")]
pub ipv4_range: Option<Ipv4Net>,
pub ipv4: Option<Ipv4Addr>,
#[ts(type = "string | null")]
pub ipv6_range: Option<Ipv6Net>,
pub ipv6: Option<Ipv6Addr>,
}
impl IpInfo {
pub async fn for_interface(iface: &str) -> Result<Self, Error> {
let (ipv4, ipv4_range) = get_iface_ipv4_addr(iface).await?.unzip();
let (ipv6, ipv6_range) = get_iface_ipv6_addr(iface).await?.unzip();
Ok(Self {
ipv4_range,
ipv4,
ipv6_range,
ipv6,
})
}
}
#[ts(type = "string[]")]
pub struct IpInfo(pub BTreeSet<IpNet>);

#[derive(Debug, Deserialize, Serialize, HasModel, TS)]
#[serde(rename_all = "camelCase")]
Expand Down
47 changes: 26 additions & 21 deletions core/startos/src/net/net_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::{Arc, Weak};
use color_eyre::eyre::eyre;
use imbl::OrdMap;
use imbl_value::InternedString;
use ipnet::IpNet;
use models::{HostId, OptionExt, PackageId};
use torut::onion::{OnionAddressV3, TorSecretKeyV3};
use tracing::instrument;
Expand Down Expand Up @@ -424,27 +425,31 @@ impl NetService {
}
}
if !iface_info.public || new_lan_bind.0.public {
if let Some(ipv4) = iface_info.ip_info.ipv4 {
bind_hostname_info.push(HostnameInfo::Ip {
network_interface_id: interface.clone(),
public: iface_info.public,
hostname: IpHostname::Ipv4 {
value: ipv4,
port: new_lan_bind.0.assigned_port,
ssl_port: new_lan_bind.0.assigned_ssl_port,
},
});
}
if let Some(ipv6) = iface_info.ip_info.ipv6 {
bind_hostname_info.push(HostnameInfo::Ip {
network_interface_id: interface.clone(),
public: iface_info.public,
hostname: IpHostname::Ipv6 {
value: ipv6,
port: new_lan_bind.0.assigned_port,
ssl_port: new_lan_bind.0.assigned_ssl_port,
},
});
for ipnet in &iface_info.ip_info.0 {
match ipnet {
IpNet::V4(net) => {
bind_hostname_info.push(HostnameInfo::Ip {
network_interface_id: interface.clone(),
public: iface_info.public,
hostname: IpHostname::Ipv4 {
value: net.addr(),
port: new_lan_bind.0.assigned_port,
ssl_port: new_lan_bind.0.assigned_ssl_port,
},
});
}
IpNet::V6(net) => {
bind_hostname_info.push(HostnameInfo::Ip {
network_interface_id: interface.clone(),
public: iface_info.public,
hostname: IpHostname::Ipv6 {
value: net.addr(),
port: new_lan_bind.0.assigned_port,
ssl_port: new_lan_bind.0.assigned_ssl_port,
},
});
}
}
}
}
}
Expand Down
69 changes: 25 additions & 44 deletions core/startos/src/net/network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,24 @@ use std::sync::{Arc, Weak};
use std::task::Poll;

use clap::Parser;
use futures::future::{pending, BoxFuture};
use futures::future::pending;
use futures::{FutureExt, TryFutureExt, TryStreamExt};
use imbl_value::InternedString;
use rpc_toolkit::{from_fn_async, Context, HandlerExt, ParentHandler};
use ipnet::IpNet;
use serde::{Deserialize, Serialize};
use tokio::net::TcpStream;
use tokio::sync::{watch, RwLock};
use tokio::sync::watch;
use tokio_stream::StreamExt;
use ts_rs::TS;
use zbus::fdo::PropertiesChangedStream;
use zbus::proxy::PropertyStream;
use zbus::zvariant::{
DeserializeDict, DynamicDeserialize, OwnedObjectPath, OwnedValue, SerializeDict, Type as ZType,
Value as ZValue,
DeserializeDict, OwnedObjectPath, OwnedValue, Type as ZType, Value as ZValue,
};
use zbus::{proxy, Connection};

use crate::context::{CliContext, RpcContext};
use crate::db::model::public::IpInfo;
use crate::db::model::Database;
use crate::net::utils::{iface_is_physical, list_interfaces};
use crate::prelude::*;
use crate::util::actor::background::{BackgroundJobQueue, BackgroundJobRunner};
use crate::util::actor::background::BackgroundJobQueue;
use crate::util::sync::SyncMutex;

#[proxy(
Expand Down Expand Up @@ -67,7 +62,16 @@ struct AddressData {
prefix: u32,
}
impl TryFrom<Vec<AddressData>> for IpInfo {
fn try_from(value: Vec<AddressData>) -> Result<Self, Self::Error> {}
type Error = Error;
fn try_from(value: Vec<AddressData>) -> Result<Self, Self::Error> {
value
.into_iter()
.map(|a| {
IpNet::new(a.address.parse()?, a.prefix as u8).with_kind(ErrorKind::ParseNetAddress)
})
.collect::<Result<_, _>>()
.map(Self)
}
}

#[proxy(
Expand All @@ -81,32 +85,11 @@ trait Device {

#[tokio::test]
async fn test() -> Result<(), Error> {
let connection = Connection::system().await?;

let proxy = NetworkManagerProxy::new(&connection).await?;
let active = proxy
.receive_active_connections_changed()
.await
.next()
.await
.unwrap()
.get()
.await?;
eprintln!("{active:?}");
for active in active {
let proxy = ActiveConnectionProxy::new(&connection, active).await?;
let ip4 = proxy.ip4_config().await?;
eprintln!("{ip4:?}");
let ip_proxy = Ip4ConfigProxy::new(&connection, ip4).await?;
let addresses = ip_proxy.address_data().await?;
eprintln!("{addresses:?}");
let devices = proxy.devices().await?;
eprintln!("{devices:?}");
for device in devices {
let proxy = DeviceProxy::new(&connection, device).await?;
let ifaces = proxy.ip_interface().await?;
eprintln!("{ifaces:?}");
}
let (write_to, mut read_from) = watch::channel(BTreeMap::new());
tokio::task::spawn(watcher(write_to));
loop {
eprintln!("{:?}", &*read_from.borrow());
read_from.changed().await;
}

Ok(())
Expand Down Expand Up @@ -134,7 +117,7 @@ where
&mut self,
fut: Fut,
) -> Result<(), Error> {
let mut next = self.stream.next();
let next = self.stream.next();
tokio::select! {
changed = next => {
self.last = changed.ok_or_else(|| Error::new(eyre!("stream is empty"), ErrorKind::DBus))?.get().await?;
Expand All @@ -148,10 +131,8 @@ where
}

async fn watcher(write_to: watch::Sender<BTreeMap<InternedString, IpInfo>>) {
let (q, run) = BackgroundJobQueue::new();
loop {
if let Err(e) = async {
let mut jobs = BackgroundJobQueue::new();
let connection = Connection::system().await?;
let netman_proxy = NetworkManagerProxy::new(&connection).await?;

Expand All @@ -175,6 +156,7 @@ async fn watcher(write_to: watch::Sender<BTreeMap<InternedString, IpInfo>>) {
ifaces.insert(iface.clone());
jobs.push(async {
let ac_proxy = ac_proxy;
let iface = iface;
let mut ip_config_sub = WatchPropertyStream::new(
ac_proxy.receive_ip4_config_changed().await,
)
Expand All @@ -200,7 +182,7 @@ async fn watcher(write_to: watch::Sender<BTreeMap<InternedString, IpInfo>>) {
addresses.try_into()?;

write_to.send_if_modified(|m| {
m.insert(iface, ip_info.clone())
m.insert(iface.clone(), ip_info.clone())
.filter(|old| old == &ip_info)
.is_none()
});
Expand All @@ -209,14 +191,14 @@ async fn watcher(write_to: watch::Sender<BTreeMap<InternedString, IpInfo>>) {
})
.await?;
}

Ok::<_, Error>(())
})
.await?;
}

Ok::<_, Error>(())
});
} else {
tracing::warn!("devices.len ({}) is not exactly 1. We're not sure what this means, but it shouldn't happen", devices.len());
}
}
write_to.send_if_modified(|m| {
Expand Down Expand Up @@ -246,7 +228,6 @@ async fn watcher(write_to: watch::Sender<BTreeMap<InternedString, IpInfo>>) {
tracing::debug!("{e:?}");
}
}
run.await;
}

pub struct NetworkInterfaceController {
Expand Down

0 comments on commit fb76877

Please sign in to comment.