Skip to content

Commit

Permalink
Rechannel: rename block channel to chunk channel
Browse files Browse the repository at this point in the history
This makes clear what the channels does, message is sliced in multiple chunks so it can be sent in multiple frames, also it is not confused with "blocking" thread/logic.
  • Loading branch information
lucaspoffo committed Oct 22, 2022
1 parent a415a5d commit 30411c9
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 83 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ Provides the following features:
- Client/Server connection management
- Authentication and encryption, checkout [renetcode](https://github.com/lucaspoffo/renet/tree/master/renetcode)
- Multiple types of channels:
- Reliable Ordered: garantee ordering and delivery of all messages
- Unreliable Unordered: messages that don't require any garantee of delivery or ordering
- Block Reliable: for bigger messages, such as level initialization
- Reliable: garantee delivery of all messages
- Unreliable: messages that don't require any garantee of delivery or ordering
- Chunk Reliable: slice big messages to be sent in multiple frames (e.g. level initialization)
- Packet fragmention and reassembly

Sections:
Expand Down
130 changes: 66 additions & 64 deletions rechannel/src/channel/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ struct PacketSent {
slice_ids: Vec<u32>,
}

/// Configuration for a block channel, used for sending big and reliable messages,
/// that are not so frequent, level initialization as an example. Messages are sent one at a time.
/// Configuration for a chunk channel, used for sending big and reliable messages,
/// that are not so frequent, level initialization as an example.
/// The message is sliced in multiple chunks so it can be sent in multiple frames,
/// instead of sending all of it in one packet. One message in flight at a time.
#[derive(Debug, Clone)]
pub struct BlockChannelConfig {
pub struct ChunkChannelConfig {
/// Channel identifier, unique between all channels.
pub channel_id: u8,
/// Data is sliced up into fragments of this size (bytes).
Expand All @@ -44,10 +46,10 @@ pub struct BlockChannelConfig {
pub sent_packet_buffer_size: usize,
/// Maximum nuber of bytes that this channel is allowed to write per packet.
pub packet_budget: u64,
/// Maximum size that a message can have in this channel, for the block channel this value
/// Maximum size that a message can have in this channel, for the chunk channel this value
/// can be above the packet budget.
pub max_message_size: u64,
/// Queue size for the block channel.
/// Queue size for the chunk channel.
pub message_send_queue_size: usize,
}

Expand All @@ -65,7 +67,7 @@ enum Sending {
}

#[derive(Debug)]
pub struct SendBlockChannel {
pub struct SendChunkChannel {
channel_id: u8,
chunk_id: u16,
sending: Sending,
Expand All @@ -92,7 +94,7 @@ enum Receiving {
}

#[derive(Debug)]
pub struct ReceiveBlockChannel {
pub struct ReceiveChunkChannel {
channel_id: u8,
receiving: Receiving,
messages_received: VecDeque<Payload>,
Expand All @@ -102,7 +104,7 @@ pub struct ReceiveBlockChannel {
error: Option<ChannelError>,
}

impl Default for BlockChannelConfig {
impl Default for ChunkChannelConfig {
fn default() -> Self {
Self {
channel_id: 2,
Expand All @@ -126,8 +128,8 @@ impl PacketSent {
}
}

impl SendBlockChannel {
pub fn new(config: BlockChannelConfig) -> Self {
impl SendChunkChannel {
pub fn new(config: ChunkChannelConfig) -> Self {
assert!((config.slice_size as u64) <= config.packet_budget);

Self {
Expand Down Expand Up @@ -210,7 +212,7 @@ impl SendBlockChannel {
}
}

impl SendChannel for SendBlockChannel {
impl SendChannel for SendChunkChannel {
fn get_messages_to_send(&mut self, available_bytes: u64, sequence: u16, current_time: Duration) -> Option<ChannelPacketData> {
if let Sending::No = self.sending {
if let Some(message) = self.messages_to_send.pop_front() {
Expand All @@ -221,7 +223,7 @@ impl SendChannel for SendBlockChannel {
let slice_messages: Vec<SliceMessage> = match self.generate_slice_packets(available_bytes, current_time) {
Ok(messages) => messages,
Err(e) => {
log::error!("Failed serialize message in block channel {}: {}", self.channel_id, e);
log::error!("Failed serialize message in chunk channel {}: {}", self.channel_id, e);
self.error = Some(ChannelError::FailedToSerialize);
return None;
}
Expand All @@ -241,7 +243,7 @@ impl SendChannel for SendBlockChannel {
messages.push(message);
}
Err(e) => {
error!("Failed to serialize message in block message {}: {}", self.channel_id, e);
error!("Failed to serialize message in chunk message {}: {}", self.channel_id, e);
self.error = Some(ChannelError::FailedToSerialize);
return None;
}
Expand Down Expand Up @@ -285,7 +287,7 @@ impl SendChannel for SendBlockChannel {

if num_acked_slices == num_slices {
self.sending = Sending::No;
info!("Finished sending block message {}.", self.chunk_id);
info!("Finished sending chunk message {}.", self.chunk_id);
self.chunk_id = self.chunk_id.wrapping_add(1);
}
}
Expand All @@ -299,14 +301,14 @@ impl SendChannel for SendBlockChannel {
}

if payload.is_empty() {
log::error!("Tried to send empty block message");
log::error!("Tried to send empty chunk message");
self.error = Some(ChannelError::SentEmptyMessage);
return;
}

if payload.len() as u64 > self.max_message_size {
log::error!(
"Tried to send block message with size above the limit, got {} bytes, expected less than {}",
"Tried to send chunk message with size above the limit, got {} bytes, expected less than {}",
payload.len(),
self.max_message_size
);
Expand All @@ -317,7 +319,7 @@ impl SendChannel for SendBlockChannel {
if matches!(self.sending, Sending::Yes { .. }) {
if self.messages_to_send.len() >= self.message_send_queue_size {
log::error!(
"Tried to send block message but the message queue is full, the limit is {} messages.",
"Tried to send chunk message but the message queue is full, the limit is {} messages.",
self.message_send_queue_size
);
self.error = Some(ChannelError::SendQueueFull);
Expand Down Expand Up @@ -352,8 +354,8 @@ impl SendChannel for SendBlockChannel {
}
}

impl ReceiveBlockChannel {
pub fn new(config: BlockChannelConfig) -> Self {
impl ReceiveChunkChannel {
pub fn new(config: ChunkChannelConfig) -> Self {
assert!((config.slice_size as u64) <= config.packet_budget);

Self {
Expand All @@ -370,7 +372,7 @@ impl ReceiveBlockChannel {
fn process_slice_message(&mut self, message: &SliceMessage) -> Result<Option<Payload>, ChannelError> {
if matches!(self.receiving, Receiving::No) {
if message.num_slices == 0 {
error!("Cannot initialize block message with zero slices.");
error!("Cannot initialize chunk message with zero slices.");
return Err(ChannelError::InvalidSliceMessage);
}

Expand All @@ -384,7 +386,7 @@ impl ReceiveBlockChannel {
let total_size = message.num_slices as u64 * self.slice_size as u64;
if total_size > self.max_message_size {
error!(
"Cannot initialize block message above the channel limit size, got {}, expected less than {}",
"Cannot initialize chunk message above the channel limit size, got {}, expected less than {}",
total_size, self.max_message_size
);
return Err(ChannelError::ReceivedMessageAboveMaxSize);
Expand All @@ -400,7 +402,7 @@ impl ReceiveBlockChannel {
chunk_data: vec![0; num_slices * self.slice_size],
};
info!(
"Receiving Block message with id {} with {} slices.",
"Receiving chunk message with id {} with {} slices.",
message.chunk_id, message.num_slices
);
}
Expand Down Expand Up @@ -476,11 +478,11 @@ impl ReceiveBlockChannel {

if *num_received_slices == *num_slices {
info!("Received all slices for chunk {}.", chunk_id);
let block = mem::take(chunk_data);
let message = mem::take(chunk_data);
let last_chunk_id = *chunk_id;
self.last_chunk_id = Some(last_chunk_id);
self.receiving = Receiving::No;
return Ok(Some(block));
return Ok(Some(message));
}

Ok(None)
Expand All @@ -489,7 +491,7 @@ impl ReceiveBlockChannel {
}
}

impl ReceiveChannel for ReceiveBlockChannel {
impl ReceiveChannel for ReceiveChunkChannel {
fn process_messages(&mut self, messages: Vec<Payload>) {
if self.error.is_some() {
return;
Expand Down Expand Up @@ -531,13 +533,13 @@ mod tests {
fn split_chunk() {
let current_time = Duration::ZERO;
const SLICE_SIZE: usize = 10;
let config = BlockChannelConfig {
let config = ChunkChannelConfig {
slice_size: SLICE_SIZE,
packet_budget: 30,
..Default::default()
};
let mut send_channel = SendBlockChannel::new(config.clone());
let mut receive_channel = ReceiveBlockChannel::new(config);
let mut send_channel = SendChunkChannel::new(config.clone());
let mut receive_channel = ReceiveChunkChannel::new(config);
let message = Bytes::from(vec![255u8; 30]);
send_channel.send_message(message.clone(), Duration::ZERO);

Expand All @@ -556,10 +558,10 @@ mod tests {
}

#[test]
fn block_chunk() {
let config = BlockChannelConfig::default();
let mut send_channel = SendBlockChannel::new(config.clone());
let mut receive_channel = ReceiveBlockChannel::new(config);
fn big_chungus() {
let config = ChunkChannelConfig::default();
let mut send_channel = SendChunkChannel::new(config.clone());
let mut receive_channel = ReceiveChunkChannel::new(config);

let payload = Bytes::from(vec![7u8; 102400]);

Expand All @@ -583,32 +585,32 @@ mod tests {
}

#[test]
fn block_channel_queue() {
fn chunk_channel_queue() {
let current_time = Duration::ZERO;
let config = BlockChannelConfig {
let config = ChunkChannelConfig {
resend_time: Duration::ZERO,
..Default::default()
};
let mut send_channel = SendBlockChannel::new(config.clone());
let mut receive_channel = ReceiveBlockChannel::new(config);
let mut send_channel = SendChunkChannel::new(config.clone());
let mut receive_channel = ReceiveChunkChannel::new(config);

let first_message = Bytes::from(vec![3; 2000]);
let second_message = Bytes::from(vec![5; 2000]);
send_channel.send_message(first_message.clone(), current_time);
send_channel.send_message(second_message.clone(), current_time);

// First message
let block_channel_data = send_channel.get_messages_to_send(u64::MAX, 0, current_time).unwrap();
assert!(!block_channel_data.messages.is_empty());
receive_channel.process_messages(block_channel_data.messages);
let chunk_channel_data = send_channel.get_messages_to_send(u64::MAX, 0, current_time).unwrap();
assert!(!chunk_channel_data.messages.is_empty());
receive_channel.process_messages(chunk_channel_data.messages);
let received_first_message = receive_channel.receive_message().unwrap();
assert_eq!(first_message, received_first_message);
send_channel.process_ack(0);

// Second message
let block_channel_data = send_channel.get_messages_to_send(u64::MAX, 1, current_time).unwrap();
assert!(!block_channel_data.messages.is_empty());
receive_channel.process_messages(block_channel_data.messages);
let chunk_channel_data = send_channel.get_messages_to_send(u64::MAX, 1, current_time).unwrap();
assert!(!chunk_channel_data.messages.is_empty());
receive_channel.process_messages(chunk_channel_data.messages);
let received_second_message = receive_channel.receive_message().unwrap();
assert_eq!(second_message, received_second_message);
send_channel.process_ack(1);
Expand All @@ -620,11 +622,11 @@ mod tests {
#[test]
fn acking_packet_with_old_chunk_id() {
let current_time = Duration::ZERO;
let config = BlockChannelConfig {
let config = ChunkChannelConfig {
resend_time: Duration::ZERO,
..Default::default()
};
let mut send_channel = SendBlockChannel::new(config);
let mut send_channel = SendChunkChannel::new(config);
let first_message = Bytes::from(vec![5; 400 * 3]);
let second_message = Bytes::from(vec![3; 400]);
send_channel.send_message(first_message, current_time);
Expand All @@ -644,8 +646,8 @@ mod tests {
}

#[test]
fn initialize_block_with_zero_slices() {
let mut receive_channel = ReceiveBlockChannel::new(Default::default());
fn initialize_message_with_zero_slices() {
let mut receive_channel = ReceiveChunkChannel::new(Default::default());
let slice_message = SliceMessage {
chunk_id: 0,
slice_id: 0,
Expand All @@ -659,36 +661,36 @@ mod tests {
#[test]
fn ignore_already_received_messages() {
let current_time = Duration::ZERO;
let config = BlockChannelConfig {
let config = ChunkChannelConfig {
slice_size: 10,
..Default::default()
};
let mut send_channel = SendBlockChannel::new(config.clone());
let mut receive_channel = ReceiveBlockChannel::new(config);
let mut send_channel = SendChunkChannel::new(config.clone());
let mut receive_channel = ReceiveChunkChannel::new(config);

send_channel.send_message(Bytes::from(vec![3; 20]), current_time);
send_channel.send_message(Bytes::from(vec![3; 20]), current_time);

// First message
let first_block_channel_data = send_channel.get_messages_to_send(15, 0, current_time).unwrap();
receive_channel.process_messages(first_block_channel_data.messages);
let first_chunk_channel_data = send_channel.get_messages_to_send(15, 0, current_time).unwrap();
receive_channel.process_messages(first_chunk_channel_data.messages);
send_channel.process_ack(0);

assert!(receive_channel.receive_message().is_none());

let first_block_channel_data = send_channel.get_messages_to_send(u64::MAX, 1, current_time).unwrap();
receive_channel.process_messages(first_block_channel_data.messages.clone());
let first_chunk_channel_data = send_channel.get_messages_to_send(u64::MAX, 1, current_time).unwrap();
receive_channel.process_messages(first_chunk_channel_data.messages.clone());
send_channel.process_ack(1);

assert!(receive_channel.receive_message().is_some());

// Second message
let second_block_channel_data = send_channel.get_messages_to_send(20, 2, current_time).unwrap();
receive_channel.process_messages(second_block_channel_data.messages);
let second_chunk_channel_data = send_channel.get_messages_to_send(20, 2, current_time).unwrap();
receive_channel.process_messages(second_chunk_channel_data.messages);
send_channel.process_ack(2);

let second_block_channel_data = send_channel.get_messages_to_send(u64::MAX, 3, current_time).unwrap();
receive_channel.process_messages(second_block_channel_data.messages.clone());
let second_chunk_channel_data = send_channel.get_messages_to_send(u64::MAX, 3, current_time).unwrap();
receive_channel.process_messages(second_chunk_channel_data.messages.clone());
send_channel.process_ack(3);

receive_channel.receive_message().unwrap();
Expand All @@ -697,26 +699,26 @@ mod tests {
assert!(matches!(receive_channel.receiving, Receiving::No));

// Try receiving old messages
receive_channel.process_messages(first_block_channel_data.messages);
receive_channel.process_messages(first_chunk_channel_data.messages);
assert!(matches!(receive_channel.receiving, Receiving::No));

receive_channel.process_messages(second_block_channel_data.messages);
receive_channel.process_messages(second_chunk_channel_data.messages);
assert!(matches!(receive_channel.receiving, Receiving::No));
}

#[test]
fn test_wrapping_chunk_id() {
let current_time = Duration::ZERO;
let config = BlockChannelConfig::default();
let mut send_channel = SendBlockChannel::new(config.clone());
let mut receive_channel = ReceiveBlockChannel::new(config);
let config = ChunkChannelConfig::default();
let mut send_channel = SendChunkChannel::new(config.clone());
let mut receive_channel = ReceiveChunkChannel::new(config);
let message = Bytes::from(vec![3; 20]);

for i in 0..100000 {
let sequence = (i % u16::MAX as usize) as u16;
send_channel.send_message(message.clone(), current_time);
let block_channel_data = send_channel.get_messages_to_send(u64::MAX, sequence, current_time).unwrap();
receive_channel.process_messages(block_channel_data.messages);
let chunk_channel_data = send_channel.get_messages_to_send(u64::MAX, sequence, current_time).unwrap();
receive_channel.process_messages(chunk_channel_data.messages);
assert!(receive_channel.receive_message().is_some());
send_channel.process_ack(sequence);
}
Expand Down
Loading

0 comments on commit 30411c9

Please sign in to comment.