diff --git a/README.md b/README.md index c0a1a6e8..639f3c5f 100644 --- a/README.md +++ b/README.md @@ -11,9 +11,9 @@ Provides the following features: - Client/Server connection management - Authentication and encryption, checkout [renetcode](https://github.com/lucaspoffo/renet/tree/master/renetcode) - Multiple types of channels: - - Reliable Ordered: garantee ordering and delivery of all messages - - Unreliable Unordered: messages that don't require any garantee of delivery or ordering - - Block Reliable: for bigger messages, such as level initialization + - Reliable: garantee delivery of all messages + - Unreliable: messages that don't require any garantee of delivery or ordering + - Chunk Reliable: slice big messages to be sent in multiple frames (e.g. level initialization) - Packet fragmention and reassembly Sections: diff --git a/rechannel/src/channel/block.rs b/rechannel/src/channel/block.rs index 33bf92c1..5009c647 100644 --- a/rechannel/src/channel/block.rs +++ b/rechannel/src/channel/block.rs @@ -29,10 +29,12 @@ struct PacketSent { slice_ids: Vec, } -/// Configuration for a block channel, used for sending big and reliable messages, -/// that are not so frequent, level initialization as an example. Messages are sent one at a time. +/// Configuration for a chunk channel, used for sending big and reliable messages, +/// that are not so frequent, level initialization as an example. +/// The message is sliced in multiple chunks so it can be sent in multiple frames, +/// instead of sending all of it in one packet. One message in flight at a time. #[derive(Debug, Clone)] -pub struct BlockChannelConfig { +pub struct ChunkChannelConfig { /// Channel identifier, unique between all channels. pub channel_id: u8, /// Data is sliced up into fragments of this size (bytes). @@ -44,10 +46,10 @@ pub struct BlockChannelConfig { pub sent_packet_buffer_size: usize, /// Maximum nuber of bytes that this channel is allowed to write per packet. pub packet_budget: u64, - /// Maximum size that a message can have in this channel, for the block channel this value + /// Maximum size that a message can have in this channel, for the chunk channel this value /// can be above the packet budget. pub max_message_size: u64, - /// Queue size for the block channel. + /// Queue size for the chunk channel. pub message_send_queue_size: usize, } @@ -65,7 +67,7 @@ enum Sending { } #[derive(Debug)] -pub struct SendBlockChannel { +pub struct SendChunkChannel { channel_id: u8, chunk_id: u16, sending: Sending, @@ -92,7 +94,7 @@ enum Receiving { } #[derive(Debug)] -pub struct ReceiveBlockChannel { +pub struct ReceiveChunkChannel { channel_id: u8, receiving: Receiving, messages_received: VecDeque, @@ -102,7 +104,7 @@ pub struct ReceiveBlockChannel { error: Option, } -impl Default for BlockChannelConfig { +impl Default for ChunkChannelConfig { fn default() -> Self { Self { channel_id: 2, @@ -126,8 +128,8 @@ impl PacketSent { } } -impl SendBlockChannel { - pub fn new(config: BlockChannelConfig) -> Self { +impl SendChunkChannel { + pub fn new(config: ChunkChannelConfig) -> Self { assert!((config.slice_size as u64) <= config.packet_budget); Self { @@ -210,7 +212,7 @@ impl SendBlockChannel { } } -impl SendChannel for SendBlockChannel { +impl SendChannel for SendChunkChannel { fn get_messages_to_send(&mut self, available_bytes: u64, sequence: u16, current_time: Duration) -> Option { if let Sending::No = self.sending { if let Some(message) = self.messages_to_send.pop_front() { @@ -221,7 +223,7 @@ impl SendChannel for SendBlockChannel { let slice_messages: Vec = match self.generate_slice_packets(available_bytes, current_time) { Ok(messages) => messages, Err(e) => { - log::error!("Failed serialize message in block channel {}: {}", self.channel_id, e); + log::error!("Failed serialize message in chunk channel {}: {}", self.channel_id, e); self.error = Some(ChannelError::FailedToSerialize); return None; } @@ -241,7 +243,7 @@ impl SendChannel for SendBlockChannel { messages.push(message); } Err(e) => { - error!("Failed to serialize message in block message {}: {}", self.channel_id, e); + error!("Failed to serialize message in chunk message {}: {}", self.channel_id, e); self.error = Some(ChannelError::FailedToSerialize); return None; } @@ -285,7 +287,7 @@ impl SendChannel for SendBlockChannel { if num_acked_slices == num_slices { self.sending = Sending::No; - info!("Finished sending block message {}.", self.chunk_id); + info!("Finished sending chunk message {}.", self.chunk_id); self.chunk_id = self.chunk_id.wrapping_add(1); } } @@ -299,14 +301,14 @@ impl SendChannel for SendBlockChannel { } if payload.is_empty() { - log::error!("Tried to send empty block message"); + log::error!("Tried to send empty chunk message"); self.error = Some(ChannelError::SentEmptyMessage); return; } if payload.len() as u64 > self.max_message_size { log::error!( - "Tried to send block message with size above the limit, got {} bytes, expected less than {}", + "Tried to send chunk message with size above the limit, got {} bytes, expected less than {}", payload.len(), self.max_message_size ); @@ -317,7 +319,7 @@ impl SendChannel for SendBlockChannel { if matches!(self.sending, Sending::Yes { .. }) { if self.messages_to_send.len() >= self.message_send_queue_size { log::error!( - "Tried to send block message but the message queue is full, the limit is {} messages.", + "Tried to send chunk message but the message queue is full, the limit is {} messages.", self.message_send_queue_size ); self.error = Some(ChannelError::SendQueueFull); @@ -352,8 +354,8 @@ impl SendChannel for SendBlockChannel { } } -impl ReceiveBlockChannel { - pub fn new(config: BlockChannelConfig) -> Self { +impl ReceiveChunkChannel { + pub fn new(config: ChunkChannelConfig) -> Self { assert!((config.slice_size as u64) <= config.packet_budget); Self { @@ -370,7 +372,7 @@ impl ReceiveBlockChannel { fn process_slice_message(&mut self, message: &SliceMessage) -> Result, ChannelError> { if matches!(self.receiving, Receiving::No) { if message.num_slices == 0 { - error!("Cannot initialize block message with zero slices."); + error!("Cannot initialize chunk message with zero slices."); return Err(ChannelError::InvalidSliceMessage); } @@ -384,7 +386,7 @@ impl ReceiveBlockChannel { let total_size = message.num_slices as u64 * self.slice_size as u64; if total_size > self.max_message_size { error!( - "Cannot initialize block message above the channel limit size, got {}, expected less than {}", + "Cannot initialize chunk message above the channel limit size, got {}, expected less than {}", total_size, self.max_message_size ); return Err(ChannelError::ReceivedMessageAboveMaxSize); @@ -400,7 +402,7 @@ impl ReceiveBlockChannel { chunk_data: vec![0; num_slices * self.slice_size], }; info!( - "Receiving Block message with id {} with {} slices.", + "Receiving chunk message with id {} with {} slices.", message.chunk_id, message.num_slices ); } @@ -476,11 +478,11 @@ impl ReceiveBlockChannel { if *num_received_slices == *num_slices { info!("Received all slices for chunk {}.", chunk_id); - let block = mem::take(chunk_data); + let message = mem::take(chunk_data); let last_chunk_id = *chunk_id; self.last_chunk_id = Some(last_chunk_id); self.receiving = Receiving::No; - return Ok(Some(block)); + return Ok(Some(message)); } Ok(None) @@ -489,7 +491,7 @@ impl ReceiveBlockChannel { } } -impl ReceiveChannel for ReceiveBlockChannel { +impl ReceiveChannel for ReceiveChunkChannel { fn process_messages(&mut self, messages: Vec) { if self.error.is_some() { return; @@ -531,13 +533,13 @@ mod tests { fn split_chunk() { let current_time = Duration::ZERO; const SLICE_SIZE: usize = 10; - let config = BlockChannelConfig { + let config = ChunkChannelConfig { slice_size: SLICE_SIZE, packet_budget: 30, ..Default::default() }; - let mut send_channel = SendBlockChannel::new(config.clone()); - let mut receive_channel = ReceiveBlockChannel::new(config); + let mut send_channel = SendChunkChannel::new(config.clone()); + let mut receive_channel = ReceiveChunkChannel::new(config); let message = Bytes::from(vec![255u8; 30]); send_channel.send_message(message.clone(), Duration::ZERO); @@ -556,10 +558,10 @@ mod tests { } #[test] - fn block_chunk() { - let config = BlockChannelConfig::default(); - let mut send_channel = SendBlockChannel::new(config.clone()); - let mut receive_channel = ReceiveBlockChannel::new(config); + fn big_chungus() { + let config = ChunkChannelConfig::default(); + let mut send_channel = SendChunkChannel::new(config.clone()); + let mut receive_channel = ReceiveChunkChannel::new(config); let payload = Bytes::from(vec![7u8; 102400]); @@ -583,14 +585,14 @@ mod tests { } #[test] - fn block_channel_queue() { + fn chunk_channel_queue() { let current_time = Duration::ZERO; - let config = BlockChannelConfig { + let config = ChunkChannelConfig { resend_time: Duration::ZERO, ..Default::default() }; - let mut send_channel = SendBlockChannel::new(config.clone()); - let mut receive_channel = ReceiveBlockChannel::new(config); + let mut send_channel = SendChunkChannel::new(config.clone()); + let mut receive_channel = ReceiveChunkChannel::new(config); let first_message = Bytes::from(vec![3; 2000]); let second_message = Bytes::from(vec![5; 2000]); @@ -598,17 +600,17 @@ mod tests { send_channel.send_message(second_message.clone(), current_time); // First message - let block_channel_data = send_channel.get_messages_to_send(u64::MAX, 0, current_time).unwrap(); - assert!(!block_channel_data.messages.is_empty()); - receive_channel.process_messages(block_channel_data.messages); + let chunk_channel_data = send_channel.get_messages_to_send(u64::MAX, 0, current_time).unwrap(); + assert!(!chunk_channel_data.messages.is_empty()); + receive_channel.process_messages(chunk_channel_data.messages); let received_first_message = receive_channel.receive_message().unwrap(); assert_eq!(first_message, received_first_message); send_channel.process_ack(0); // Second message - let block_channel_data = send_channel.get_messages_to_send(u64::MAX, 1, current_time).unwrap(); - assert!(!block_channel_data.messages.is_empty()); - receive_channel.process_messages(block_channel_data.messages); + let chunk_channel_data = send_channel.get_messages_to_send(u64::MAX, 1, current_time).unwrap(); + assert!(!chunk_channel_data.messages.is_empty()); + receive_channel.process_messages(chunk_channel_data.messages); let received_second_message = receive_channel.receive_message().unwrap(); assert_eq!(second_message, received_second_message); send_channel.process_ack(1); @@ -620,11 +622,11 @@ mod tests { #[test] fn acking_packet_with_old_chunk_id() { let current_time = Duration::ZERO; - let config = BlockChannelConfig { + let config = ChunkChannelConfig { resend_time: Duration::ZERO, ..Default::default() }; - let mut send_channel = SendBlockChannel::new(config); + let mut send_channel = SendChunkChannel::new(config); let first_message = Bytes::from(vec![5; 400 * 3]); let second_message = Bytes::from(vec![3; 400]); send_channel.send_message(first_message, current_time); @@ -644,8 +646,8 @@ mod tests { } #[test] - fn initialize_block_with_zero_slices() { - let mut receive_channel = ReceiveBlockChannel::new(Default::default()); + fn initialize_message_with_zero_slices() { + let mut receive_channel = ReceiveChunkChannel::new(Default::default()); let slice_message = SliceMessage { chunk_id: 0, slice_id: 0, @@ -659,36 +661,36 @@ mod tests { #[test] fn ignore_already_received_messages() { let current_time = Duration::ZERO; - let config = BlockChannelConfig { + let config = ChunkChannelConfig { slice_size: 10, ..Default::default() }; - let mut send_channel = SendBlockChannel::new(config.clone()); - let mut receive_channel = ReceiveBlockChannel::new(config); + let mut send_channel = SendChunkChannel::new(config.clone()); + let mut receive_channel = ReceiveChunkChannel::new(config); send_channel.send_message(Bytes::from(vec![3; 20]), current_time); send_channel.send_message(Bytes::from(vec![3; 20]), current_time); // First message - let first_block_channel_data = send_channel.get_messages_to_send(15, 0, current_time).unwrap(); - receive_channel.process_messages(first_block_channel_data.messages); + let first_chunk_channel_data = send_channel.get_messages_to_send(15, 0, current_time).unwrap(); + receive_channel.process_messages(first_chunk_channel_data.messages); send_channel.process_ack(0); assert!(receive_channel.receive_message().is_none()); - let first_block_channel_data = send_channel.get_messages_to_send(u64::MAX, 1, current_time).unwrap(); - receive_channel.process_messages(first_block_channel_data.messages.clone()); + let first_chunk_channel_data = send_channel.get_messages_to_send(u64::MAX, 1, current_time).unwrap(); + receive_channel.process_messages(first_chunk_channel_data.messages.clone()); send_channel.process_ack(1); assert!(receive_channel.receive_message().is_some()); // Second message - let second_block_channel_data = send_channel.get_messages_to_send(20, 2, current_time).unwrap(); - receive_channel.process_messages(second_block_channel_data.messages); + let second_chunk_channel_data = send_channel.get_messages_to_send(20, 2, current_time).unwrap(); + receive_channel.process_messages(second_chunk_channel_data.messages); send_channel.process_ack(2); - let second_block_channel_data = send_channel.get_messages_to_send(u64::MAX, 3, current_time).unwrap(); - receive_channel.process_messages(second_block_channel_data.messages.clone()); + let second_chunk_channel_data = send_channel.get_messages_to_send(u64::MAX, 3, current_time).unwrap(); + receive_channel.process_messages(second_chunk_channel_data.messages.clone()); send_channel.process_ack(3); receive_channel.receive_message().unwrap(); @@ -697,26 +699,26 @@ mod tests { assert!(matches!(receive_channel.receiving, Receiving::No)); // Try receiving old messages - receive_channel.process_messages(first_block_channel_data.messages); + receive_channel.process_messages(first_chunk_channel_data.messages); assert!(matches!(receive_channel.receiving, Receiving::No)); - receive_channel.process_messages(second_block_channel_data.messages); + receive_channel.process_messages(second_chunk_channel_data.messages); assert!(matches!(receive_channel.receiving, Receiving::No)); } #[test] fn test_wrapping_chunk_id() { let current_time = Duration::ZERO; - let config = BlockChannelConfig::default(); - let mut send_channel = SendBlockChannel::new(config.clone()); - let mut receive_channel = ReceiveBlockChannel::new(config); + let config = ChunkChannelConfig::default(); + let mut send_channel = SendChunkChannel::new(config.clone()); + let mut receive_channel = ReceiveChunkChannel::new(config); let message = Bytes::from(vec![3; 20]); for i in 0..100000 { let sequence = (i % u16::MAX as usize) as u16; send_channel.send_message(message.clone(), current_time); - let block_channel_data = send_channel.get_messages_to_send(u64::MAX, sequence, current_time).unwrap(); - receive_channel.process_messages(block_channel_data.messages); + let chunk_channel_data = send_channel.get_messages_to_send(u64::MAX, sequence, current_time).unwrap(); + receive_channel.process_messages(chunk_channel_data.messages); assert!(receive_channel.receive_message().is_some()); send_channel.process_ack(sequence); } diff --git a/rechannel/src/channel/mod.rs b/rechannel/src/channel/mod.rs index 58a2dbad..25df6cdb 100644 --- a/rechannel/src/channel/mod.rs +++ b/rechannel/src/channel/mod.rs @@ -4,7 +4,7 @@ pub(crate) mod unreliable; use std::time::Duration; -pub use block::BlockChannelConfig; +pub use block::ChunkChannelConfig; pub use reliable::ReliableChannelConfig; pub use unreliable::UnreliableChannelConfig; @@ -12,7 +12,7 @@ use bytes::Bytes; use crate::{ channel::{ - block::{ReceiveBlockChannel, SendBlockChannel}, + block::{ReceiveChunkChannel, SendChunkChannel}, reliable::{ReceiveReliableChannel, SendReliableChannel}, unreliable::{ReceiveUnreliableChannel, SendUnreliableChannel}, }, @@ -25,7 +25,7 @@ use crate::{ pub enum ChannelConfig { Reliable(ReliableChannelConfig), Unreliable(UnreliableChannelConfig), - Block(BlockChannelConfig), + Chunk(ChunkChannelConfig), } pub(crate) trait SendChannel: std::fmt::Debug { @@ -47,7 +47,7 @@ pub(crate) trait ReceiveChannel: std::fmt::Debug { pub enum DefaultChannel { Reliable, Unreliable, - Block, + Chunk, } impl From for ChannelConfig { @@ -62,9 +62,9 @@ impl From for ChannelConfig { } } -impl From for ChannelConfig { - fn from(config: BlockChannelConfig) -> Self { - Self::Block(config) +impl From for ChannelConfig { + fn from(config: ChunkChannelConfig) -> Self { + Self::Chunk(config) } } @@ -86,9 +86,9 @@ impl ChannelConfig { Box::new(SendReliableChannel::new(config.clone())), Box::new(ReceiveReliableChannel::new(config.clone())), ), - Block(config) => ( - Box::new(SendBlockChannel::new(config.clone())), - Box::new(ReceiveBlockChannel::new(config.clone())), + Chunk(config) => ( + Box::new(SendChunkChannel::new(config.clone())), + Box::new(ReceiveChunkChannel::new(config.clone())), ), } } @@ -97,7 +97,7 @@ impl ChannelConfig { match self { ChannelConfig::Unreliable(config) => config.channel_id, ChannelConfig::Reliable(config) => config.channel_id, - ChannelConfig::Block(config) => config.channel_id, + ChannelConfig::Chunk(config) => config.channel_id, } } } @@ -107,7 +107,7 @@ impl From for u8 { match channel { DefaultChannel::Reliable => 0, DefaultChannel::Unreliable => 1, - DefaultChannel::Block => 2, + DefaultChannel::Chunk => 2, } } } @@ -117,7 +117,7 @@ impl DefaultChannel { vec![ ChannelConfig::Reliable(Default::default()), ChannelConfig::Unreliable(Default::default()), - ChannelConfig::Block(Default::default()), + ChannelConfig::Chunk(Default::default()), ] } } diff --git a/rechannel/src/remote_connection.rs b/rechannel/src/remote_connection.rs index ce088da7..5d3e88d8 100644 --- a/rechannel/src/remote_connection.rs +++ b/rechannel/src/remote_connection.rs @@ -278,7 +278,7 @@ impl RemoteConnection { for send_channel in self.send_channels.values_mut() { if let Some(channel_packet_data) = send_channel.get_messages_to_send(available_bytes, sequence, self.current_time) { available_bytes -= bincode::options().serialized_size(&channel_packet_data)?; - channels_packet_data.push(channel_packet_data) + channels_packet_data.push(channel_packet_data); } } diff --git a/renet/src/config.rs b/renet/src/config.rs index e7c200bb..7144c3e5 100644 --- a/renet/src/config.rs +++ b/renet/src/config.rs @@ -35,7 +35,7 @@ impl Default for RenetConnectionConfig { let channels_config = vec![ ChannelConfig::Reliable(Default::default()), ChannelConfig::Unreliable(Default::default()), - ChannelConfig::Block(Default::default()), + ChannelConfig::Chunk(Default::default()), ]; Self { diff --git a/renet/src/lib.rs b/renet/src/lib.rs index e3ae4038..c8653bd7 100644 --- a/renet/src/lib.rs +++ b/renet/src/lib.rs @@ -5,7 +5,7 @@ mod error; mod network_info; mod server; -pub use rechannel::channel::{BlockChannelConfig, ChannelConfig, DefaultChannel, ReliableChannelConfig, UnreliableChannelConfig}; +pub use rechannel::channel::{ChunkChannelConfig, ChannelConfig, DefaultChannel, ReliableChannelConfig, UnreliableChannelConfig}; pub use rechannel::error::{ChannelError, DisconnectionReason, RechannelError}; pub use renetcode::{generate_random_bytes, ConnectToken, NetcodeError};