Skip to content

Commit

Permalink
added hole punching to lunabot
Browse files Browse the repository at this point in the history
  • Loading branch information
manglemix committed Feb 4, 2025
1 parent 696dd83 commit 6f3b1c5
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 70 deletions.
4 changes: 2 additions & 2 deletions lunabotics/lunabase-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ common = { path = "../common", features = ["godot_urdf"] }
cakap2 = { workspace = true }
bitcode = { workspace = true }
tasker.workspace = true
crossbeam.workspace = true

openh264 = { workspace = true, optional = true }
webrtc = { version = "0.12.0", optional = true}
axum = { workspace = true, optional = true}
crossbeam = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
opus = { workspace = true, optional = true }

[features]
extended = ["streaming_server", "audio_streaming"]
streaming_server = ["webrtc", "axum", "production", "serde_json", "crossbeam"]
streaming_server = ["webrtc", "axum", "production", "serde_json"]
production = ["openh264"]
audio_streaming = ["opus"]
6 changes: 3 additions & 3 deletions lunabotics/lunabot/src/apps/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ mod production;
#[cfg(not(feature = "production"))]
mod sim;

use std::{fs::File, net::SocketAddr, sync::Arc, time::Duration};
use std::{fs::File, net::{IpAddr, SocketAddr}, sync::Arc, time::Duration};

use common::{FromLunabase, FromLunabot, LunabotStage};
use crossbeam::atomic::AtomicCell;
#[cfg(feature = "production")]
pub use production::{dataviz, Apriltag, CameraInfo, DepthCameraInfo, LunabotApp, IMUInfo};
pub use production::{Apriltag, CameraInfo, DepthCameraInfo, LunabotApp, IMUInfo};
#[cfg(not(feature = "production"))]
pub use sim::{LunasimStdin, LunasimbotApp};
use tasker::tokio::sync::{mpsc, watch};
Expand Down Expand Up @@ -51,7 +51,7 @@ impl LunabotConnected {
}

fn create_packet_builder(
lunabase_address: SocketAddr,
lunabase_address: Option<IpAddr>,
lunabot_stage: Arc<AtomicCell<LunabotStage>>,
max_pong_delay_ms: u64,
) -> (
Expand Down
14 changes: 4 additions & 10 deletions lunabotics/lunabot/src/apps/production.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{net::SocketAddr, sync::Arc};
use std::{net::{IpAddr, SocketAddr}, sync::Arc};

use anyhow::Context;
use camera::enumerate_cameras;
Expand Down Expand Up @@ -36,7 +36,7 @@ mod motors;
mod streaming;
mod rp2040;

pub mod dataviz;
// pub mod dataviz;
// mod audio_streaming;

pub use apriltag::Apriltag;
Expand Down Expand Up @@ -72,10 +72,7 @@ fn subaddress_of(mut addr: SocketAddr, port_offset: u16) -> SocketAddr {
}

pub struct LunabotApp {
pub lunabase_address: SocketAddr,
pub lunabase_streaming_address: Option<SocketAddr>,
#[cfg(feature = "experimental")]
pub lunabase_audio_streaming_address: Option<SocketAddr>,
pub lunabase_address: Option<IpAddr>,
pub max_pong_delay_ms: u64,
pub cameras: FxHashMap<String, CameraInfo>,
pub depth_cameras: FxHashMap<String, DepthCameraInfo>,
Expand Down Expand Up @@ -131,11 +128,8 @@ impl LunabotApp {
let localizer = Localizer::new(robot_chain.clone());
let localizer_ref = localizer.get_ref();
std::thread::spawn(|| localizer.run());
let camera_streaming_address = self
.lunabase_streaming_address
.unwrap_or_else(|| subaddress_of(self.lunabase_address, 1));

camera_streaming(camera_streaming_address);
camera_streaming(self.lunabase_address);

#[cfg(feature = "experimental")]
if let Err(e) = audio_streaming::audio_streaming(
Expand Down
4 changes: 2 additions & 2 deletions lunabotics/lunabot/src/apps/production/dataviz.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{net::SocketAddr, sync::Arc};
use std::{net::{IpAddr, SocketAddr}, sync::Arc};

use super::{depth::enumerate_depth_cameras, subaddress_of};
use anyhow::Context;
Expand All @@ -19,7 +19,7 @@ use crate::{
use super::{create_packet_builder, DepthCameraInfo};

pub struct DatavizApp {
pub lunabase_address: SocketAddr,
pub lunabase_address: Option<IpAddr>,
pub lunabase_data_address: Option<SocketAddr>,
pub max_pong_delay_ms: u64,
pub depth_cameras: FxHashMap<String, DepthCameraInfo>,
Expand Down
44 changes: 30 additions & 14 deletions lunabotics/lunabot/src/apps/production/streaming.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
io::{ErrorKind, Read},
net::{SocketAddr, UdpSocket},
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
OnceLock,
Expand All @@ -16,7 +16,7 @@ use openh264::{
};
use spin_sleep::SpinSleeper;
use tasker::parking_lot::RwLock;
use tracing::{error, info};
use tracing::{error, info, warn};

const CAMERA_COL_COUNT: usize = 3;
const CAMERA_ROW_COUNT: usize = 2;
Expand Down Expand Up @@ -132,7 +132,7 @@ impl<'a> Read for DownscaleRgbImageReader<'a> {
}
}

pub fn camera_streaming(lunabase_streaming_address: SocketAddr) {
pub fn camera_streaming(mut lunabase_address: Option<IpAddr>) {
let camera_frame_buffer = vec![
0u8;
CAMERA_RESOLUTION.x as usize
Expand Down Expand Up @@ -162,20 +162,24 @@ pub fn camera_streaming(lunabase_streaming_address: SocketAddr) {
return;
}
};
let udp = match UdpSocket::bind("0.0.0.0:0") {
let udp = match UdpSocket::bind(SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), common::ports::CAMERAS)) {
Ok(x) => x,
Err(e) => {
error!("Failed to bind to UDP socket: {e}");
return;
}
};
match udp.connect(lunabase_streaming_address) {
Ok(_) => {}
Err(e) => {
error!("Failed to connect to lunabase streaming address: {e}");
return;
}
if let Err(e) = udp.set_nonblocking(true) {
error!("Failed to set UDP socket to non-blocking: {e}");
return;
}
// match udp.connect(lunabase_streaming_address) {
// Ok(_) => {}
// Err(e) => {
// error!("Failed to connect to lunabase streaming address: {e}");
// return;
// }
// }

std::thread::spawn(move || {
let mut yuv_buffer = YUVBuffer::new(
Expand All @@ -193,6 +197,16 @@ pub fn camera_streaming(lunabase_streaming_address: SocketAddr) {
);

loop {
let Some(ip) = lunabase_address else {
if let Ok((_, addr)) = udp.recv_from(&mut [0u8; 1]) {
if addr.port() == common::ports::CAMERAS {
lunabase_address = Some(addr.ip());
} else {
warn!("Received data from unknown address: {addr}");
}
}
continue;
};
now += now.elapsed();
keyframe = (keyframe + 1) % KEYFRAME_INTERVAL;
if keyframe == 0 {
Expand Down Expand Up @@ -223,13 +237,15 @@ pub fn camera_streaming(lunabase_streaming_address: SocketAddr) {
let nal = layer.nal_unit(nal_i).unwrap();

nal.chunks(1400).for_each(|chunk| {
// println!("{nal_i} {} {:?}", nal.len(), &chunk[1..8]);
if let Err(e) = udp.send(chunk) {
if let Err(e) = udp.send_to(chunk, SocketAddr::new(ip, common::ports::CAMERAS)) {
if e.kind() == ErrorKind::ConnectionRefused {
if lunabase_streaming_address.ip().is_loopback() {
return;
if let Some(ip) = lunabase_address {
if ip.is_loopback() {
return;
}
}
}
lunabase_address = None;
error!("Failed to send stream data to lunabase: {e}");
}
});
Expand Down
29 changes: 11 additions & 18 deletions lunabotics/lunabot/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![feature(result_flattening, array_chunks, iterator_try_collect)]

use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};

use apps::default_max_pong_delay_ms;
use lumpur::LumpurBuilder;
Expand All @@ -17,10 +17,8 @@ mod utils;
lumpur::define_configuration! {
pub enum Commands {
Main {
lunabase_address: SocketAddr,
lunabase_address: Option<IpAddr>,
max_pong_delay_ms: Option<u64>,
lunabase_streaming_address: Option<SocketAddr>,
lunabase_audio_streaming_address: Option<SocketAddr>,
#[serde(default)]
cameras: fxhash::FxHashMap<String, apps::CameraInfo>,
#[serde(default)]
Expand Down Expand Up @@ -91,8 +89,6 @@ fn main() {
Commands::Main {
lunabase_address,
max_pong_delay_ms,
lunabase_streaming_address,
lunabase_audio_streaming_address,
cameras,
depth_cameras,
apriltags,
Expand All @@ -101,7 +97,6 @@ fn main() {
} => {
apps::LunabotApp {
lunabase_address,
lunabase_streaming_address,
max_pong_delay_ms: max_pong_delay_ms.unwrap_or_else(default_max_pong_delay_ms),
#[cfg(feature = "experimental")]
lunabase_audio_streaming_address,
Expand All @@ -113,8 +108,6 @@ fn main() {
.unwrap_or_else(|| "robot-layout/lunabot.json".to_string()),
}
.run();
#[cfg(not(feature = "experimental"))]
let _ = lunabase_audio_streaming_address;
}
#[cfg(feature = "production")]
Commands::Dataviz {
Expand All @@ -124,15 +117,15 @@ fn main() {
depth_cameras,
robot_layout,
} => {
apps::dataviz::DatavizApp {
lunabase_address,
lunabase_data_address,
max_pong_delay_ms: max_pong_delay_ms.unwrap_or_else(default_max_pong_delay_ms),
depth_cameras,
robot_layout: robot_layout
.unwrap_or_else(|| "robot-layout/lunabot.json".to_string()),
}
.run();
// apps::dataviz::DatavizApp {
// lunabase_address,
// lunabase_data_address,
// max_pong_delay_ms: max_pong_delay_ms.unwrap_or_else(default_max_pong_delay_ms),
// depth_cameras,
// robot_layout: robot_layout
// .unwrap_or_else(|| "robot-layout/lunabot.json".to_string()),
// }
// .run();
}
}
}
52 changes: 31 additions & 21 deletions lunabotics/lunabot/src/teleop.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4},
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
ops::Deref,
sync::Arc,
time::{Duration, Instant},
Expand Down Expand Up @@ -33,7 +33,7 @@ impl PacketBuilder {
}

pub struct LunabaseConn<F> {
pub lunabase_address: SocketAddr,
pub lunabase_address: Option<IpAddr>,
pub on_msg: F,
pub lunabot_stage: Arc<AtomicCell<LunabotStage>>,
}
Expand All @@ -50,19 +50,19 @@ impl<F: FnMut(&[u8]) -> bool + Send + 'static> LunabaseConn<F> {

get_tokio_handle().spawn(async move {
let udp = loop {
let udp = match UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await {
let udp = match UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, common::ports::TELEOP)).await {
Ok(x) => x,
Err(e) => {
error!("Failed to bind to lunabase address: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
if let Err(e) = udp.connect(self.lunabase_address).await {
error!("Failed to connect to lunabase: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
// if let Err(e) = udp.connect(self.lunabase_address).await {
// error!("Failed to connect to lunabase: {e}");
// tokio::time::sleep(Duration::from_secs(1)).await;
// continue;
// }
break udp;
};

Expand All @@ -72,14 +72,19 @@ impl<F: FnMut(&[u8]) -> bool + Send + 'static> LunabaseConn<F> {
macro_rules! send {
($data: expr) => {{
loop {
if let Err(e) = udp.send($data).await {
if e.kind() == std::io::ErrorKind::ConnectionRefused {
if self.lunabase_address.ip().is_loopback() {
continue;
if let Some(ip) = self.lunabase_address {
let addr = SocketAddr::new(ip, common::ports::TELEOP);
if let Err(e) = udp.send_to($data, addr).await {
if e.kind() == std::io::ErrorKind::ConnectionRefused {
if let Some(ip) = self.lunabase_address {
if ip.is_loopback() {
continue;
}
}
}
error!("Failed to send data to lunabase: {e}");
continue;
}
error!("Failed to send data to lunabase: {e}");
continue;
}
action = cakap_sm.poll(Event::NoEvent, Instant::now());
break;
Expand Down Expand Up @@ -151,21 +156,26 @@ impl<F: FnMut(&[u8]) -> bool + Send + 'static> LunabaseConn<F> {
} => {
action = cakap_sm.poll(Event::Action(packet), Instant::now());
}
result = udp.recv(&mut buf) => {
let n = match result {
Ok(n) => n,
result = udp.recv_from(&mut buf) => {
let (n, addr) = match result {
Ok(x) => x,
Err(e) => {
if e.kind() == std::io::ErrorKind::ConnectionRefused {
match self.lunabase_address.ip() {
IpAddr::V4(Ipv4Addr::LOCALHOST) | IpAddr::V6(Ipv6Addr::LOCALHOST) => continue,
_ => {}
if let Some(ip) = self.lunabase_address {
if ip.is_loopback() {
continue;
}
}
}
error!("Failed to receive data from lunabase: {e}");
continue;
}
};
// println!("{:?}", &buf[..n]);
if addr.port() != common::ports::TELEOP {
error!("Received data from unexpected address on teleop: {addr}");
continue;
}
self.lunabase_address = Some(addr.ip());
action = cakap_sm.poll(Event::IncomingData(&buf[..n]), Instant::now());
}
}
Expand Down

0 comments on commit 6f3b1c5

Please sign in to comment.