Skip to content

Commit

Permalink
implement unix domain sockets as transport layer
Browse files Browse the repository at this point in the history
  • Loading branch information
lilioid committed Mar 25, 2024
1 parent 721ac62 commit a269480
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 5 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The following features are implemented:
- TCP Transport
- UDP Transport
- WebSocket Transport
- Unix socket Transport
- Live-Streaming of the servers canvas via RTMP/RTSP
- Live-Display of the servers canvas via a window or linux framebuffer device
- Drawing of images (and colored rectangles) on a remote servers canvas
Expand Down
11 changes: 10 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use image::imageops::FilterType;
use rand::prelude::*;
use std::net::SocketAddr;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
Expand All @@ -22,7 +24,7 @@ use image::io::Reader as ImageReader;
use itertools::Itertools;
use pixeldike::net::clients::{GenClient, TcpClient};
use pixeldike::net::protocol::{Request, Response};
use pixeldike::net::servers::{GenServer, TcpServer, TcpServerOptions};
use pixeldike::net::servers::{GenServer, TcpServer, TcpServerOptions, UnixSocketOptions, UnixSocketServer};
#[cfg(feature = "udp")]
use pixeldike::net::servers::{UdpServer, UdpServerOptions};
#[cfg(feature = "ws")]
Expand Down Expand Up @@ -204,6 +206,13 @@ async fn start_server(opts: &cli::ServerOpts) {
.expect(&format!("Could not start tcp server on {}", url));
}
}
"unix" => {
let path = PathBuf::from_str(url.path()).expect("Could not turn url path into system path");
UnixSocketServer::new(UnixSocketOptions { path })
.start(pixmap.clone(), &mut join_set)
.await
.expect(&format!("Could not start unix socket listener on {}", url));
}
#[cfg(feature = "udp")]
"udp" => {
if !url.username().is_empty() {
Expand Down
6 changes: 4 additions & 2 deletions src/net/clients/gen_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::net::protocol::{Request, Response};
use async_trait::async_trait;
use std::net::SocketAddr;

/// A trait to unify the different transport protocol clients
///
Expand All @@ -9,8 +8,11 @@ use std::net::SocketAddr;
/// it is recommended to implement your own client.
#[async_trait]
pub trait GenClient: Sized {
/// The parameter given to `connect()` that specifies where to connect to
type ConnectionParam;

/// Create a new client by connecting to a server
async fn connect(addr: SocketAddr) -> std::io::Result<Self>;
async fn connect(addr: Self::ConnectionParam) -> std::io::Result<Self>;

/// Send a request to the connected server
async fn send_request(&mut self, request: Request) -> std::io::Result<()>;
Expand Down
2 changes: 2 additions & 0 deletions src/net/clients/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ mod gen_client;
mod tcp_client;
#[cfg(feature = "udp")]
mod udp_client;
mod unix_socket_client;

pub use gen_client::GenClient;
#[cfg(feature = "tcp")]
pub use tcp_client::TcpClient;
#[cfg(feature = "udp")]
pub use udp_client::UdpClient;
pub use unix_socket_client::UnixSocketClient;
4 changes: 3 additions & 1 deletion src/net/clients/tcp_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ impl TcpClient {

#[async_trait]
impl GenClient for TcpClient {
async fn connect(addr: SocketAddr) -> std::io::Result<Self> {
type ConnectionParam = SocketAddr;

async fn connect(addr: Self::ConnectionParam) -> std::io::Result<Self> {
let (reader, writer) = TcpStream::connect(addr).await?.into_split();
Ok(Self {
reader: BufReader::new(reader),
Expand Down
4 changes: 3 additions & 1 deletion src/net/clients/udp_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ pub struct UdpClient {

#[async_trait]
impl GenClient for UdpClient {
async fn connect(addr: SocketAddr) -> std::io::Result<Self> {
type ConnectionParam = SocketAddr;

async fn connect(addr: Self::ConnectionParam) -> std::io::Result<Self> {
let socket = if addr.is_ipv4() {
UdpSocket::bind(SocketAddr::from_str("0.0.0.0:0").unwrap()).await?
} else {
Expand Down
57 changes: 57 additions & 0 deletions src/net/clients/unix_socket_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use crate::net::clients::GenClient;
use crate::net::protocol::{parse_response_str, Request, Response};
use async_trait::async_trait;
use std::path::PathBuf;
use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter};
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::UnixStream;

/// A pixelflut client that connects to a unix domain socket and uses buffered read/write for communication with a pixelflut server
#[derive(Debug)]
pub struct UnixSocketClient {
reader: BufReader<OwnedReadHalf>,
writer: BufWriter<OwnedWriteHalf>,
}

impl UnixSocketClient {
/// Flush the write buffer to immediately send all enqueued requests to the server.
async fn flush(&mut self) -> std::io::Result<()> {
self.writer.flush().await
}

/// Get the raw writer that is connected to the pixelflut server.
pub fn get_writer(&mut self) -> &mut BufWriter<impl AsyncWrite> {
&mut self.writer
}
}

#[async_trait]
impl GenClient for UnixSocketClient {
type ConnectionParam = PathBuf;

async fn connect(addr: Self::ConnectionParam) -> std::io::Result<Self> {
let (reader, writer) = UnixStream::connect(addr).await?.into_split();
Ok(Self {
reader: BufReader::new(reader),
writer: BufWriter::new(writer),
})
}

async fn send_request(&mut self, request: Request) -> std::io::Result<()> {
request.write_async(&mut self.writer).await
}

async fn await_response(&mut self) -> anyhow::Result<Response> {
let mut buf = String::with_capacity(32);
self.reader.read_line(&mut buf).await?;
let response = parse_response_str(&buf)?;
Ok(response)
}

async fn exchange(&mut self, request: Request) -> anyhow::Result<Response> {
self.send_request(request).await?;
self.flush().await?;
let response = self.await_response().await?;
Ok(response)
}
}
2 changes: 2 additions & 0 deletions src/net/servers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub use gen_server::GenServer;
mod tcp_server;
#[cfg(feature = "udp")]
mod udp_server;
mod unix_sock_server;
#[cfg(feature = "ws")]
mod ws_server;

Expand All @@ -21,6 +22,7 @@ use crate::pixmap::SharedPixmap;
pub use tcp_server::{TcpServer, TcpServerOptions};
#[cfg(feature = "udp")]
pub use udp_server::{UdpServer, UdpServerOptions};
pub use unix_sock_server::{UnixSocketOptions, UnixSocketServer};
#[cfg(feature = "ws")]
pub use ws_server::{WsServer, WsServerOptions};

Expand Down
113 changes: 113 additions & 0 deletions src/net/servers/unix_sock_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use crate::net::servers::GenServer;
use crate::pixmap::SharedPixmap;
use crate::DaemonResult;
use anyhow::anyhow;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use std::io::Write;
use std::path::PathBuf;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{UnixListener, UnixStream};
use tokio::task::{AbortHandle, JoinSet};

/// Options with which the `UnixSocketServer` is configured
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct UnixSocketOptions {
/// The path at which a socket should be created
pub path: PathBuf,
}

/// A server implementation using unix domain sockets to transport pixelflut messages.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct UnixSocketServer {
options: UnixSocketOptions,
}

impl UnixSocketServer {
#[tracing::instrument(skip_all)]
async fn handle_listener(listener: UnixListener, pixmap: SharedPixmap) -> anyhow::Result<!> {
loop {
let (stream, _) = listener.accept().await?;
let pixmap = pixmap.clone();
tokio::spawn(async move {
if let Err(e) = UnixSocketServer::handle_connection(stream, pixmap).await {
tracing::warn!("Got error while handling unix socket stream: {e}");
}
});
}
}

#[tracing::instrument(skip_all)]
async fn handle_connection(mut stream: UnixStream, pixmap: SharedPixmap) -> anyhow::Result<()> {
const MAX_LINE_LEN: usize = 32;
tracing::debug!("Client connected");

let mut req_buf = BytesMut::with_capacity(16 * 1024);
let mut resp_buf = BytesMut::with_capacity(2 * 1024).writer();
loop {
// fill the line buffer from the socket
let n = stream.read_buf(&mut req_buf).await?;
if n == 0 {
return Err(anyhow!("client stream exhausted"));
}
tracing::trace!("Received {}KiB stream data: {:?}", n / 1024, req_buf);

// handle all lines contained in the buffer
while let Some((i, _)) = req_buf.iter().enumerate().find(|(_, &b)| b == b'\n') {
let line = req_buf.split_to(i + 1);
let result = super::handle_request(&line, &pixmap);
match result {
Err(e) => {
resp_buf.write_fmt(format_args!("{}\n", e)).unwrap();
}
Ok(Some(response)) => response.write(&mut resp_buf).unwrap(),
Ok(None) => {}
}
}

// clear the buffer if someone is deliberately not sending a newline
if req_buf.len() > MAX_LINE_LEN {
tracing::warn!(
"Request buffer has {}B but no lines left in it. Client is probably misbehaving.",
req_buf.len()
);
req_buf.clear();
resp_buf.write_all("line too long\n".as_bytes()).unwrap();
}

// write accumulated responses back to the sender
if !resp_buf.get_ref().is_empty() {
tracing::trace!(
"Sending back {}KiB response: {:?}",
resp_buf.get_ref().len() / 1024,
resp_buf.get_ref()
);
stream.write_all_buf(resp_buf.get_mut()).await?;
}
}
}
}

#[async_trait]
impl GenServer for UnixSocketServer {
type Options = UnixSocketOptions;

fn new(options: Self::Options) -> Self {
Self { options }
}

async fn start(
self,
pixmap: SharedPixmap,
join_set: &mut JoinSet<DaemonResult>,
) -> anyhow::Result<AbortHandle> {
let listener = UnixListener::bind(&self.options.path)?;
tracing::info!("Started unix listener on {}", self.options.path.display());

let handle = join_set
.build_task()
.name("unix_listener")
.spawn(async move { UnixSocketServer::handle_listener(listener, pixmap).await })?;
Ok(handle)
}
}

0 comments on commit a269480

Please sign in to comment.