diff --git a/Cargo.lock b/Cargo.lock index 5ef2690b9..cd7716cfa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4568,7 +4568,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.200" +version = "0.4.201" dependencies = [ "ahash 0.8.11", "anyhow", diff --git a/server/Cargo.toml b/server/Cargo.toml index 65c8f295f..4b1ac777f 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.200" +version = "0.4.201" edition = "2021" build = "src/build.rs" license = "Apache-2.0" diff --git a/server/src/streaming/segments/logs/log_writer.rs b/server/src/streaming/segments/logs/log_writer.rs index fa9bfdf89..8567a562a 100644 --- a/server/src/streaming/segments/logs/log_writer.rs +++ b/server/src/streaming/segments/logs/log_writer.rs @@ -14,7 +14,7 @@ use std::{ }; use tokio::{ fs::{File, OpenOptions}, - io::{AsyncSeekExt, AsyncWriteExt}, + io::AsyncWriteExt, sync::RwLock, }; use tracing::{error, trace}; @@ -101,8 +101,9 @@ impl SegmentLogWriter { let batch_size = batch.get_size_bytes(); match confirmation { Confirmation::Wait => { - let position = self.write_batch(batch).await?; - self.log_size_bytes.store(position, Ordering::Release); + self.write_batch(batch).await?; + self.log_size_bytes + .fetch_add(batch_size.as_bytes_u64(), Ordering::AcqRel); trace!( "Written batch of size {batch_size} bytes to log file: {}", self.file_path @@ -127,7 +128,7 @@ impl SegmentLogWriter { } /// Write a batch of bytes to the log file and return the new file position. - async fn write_batch(&self, batch_to_write: RetainedMessageBatch) -> Result { + async fn write_batch(&self, batch_to_write: RetainedMessageBatch) -> Result<(), IggyError> { let mut file_guard = self.file.write().await; if let Some(ref mut file) = *file_guard { let header = batch_to_write.header_as_bytes(); @@ -141,18 +142,7 @@ impl SegmentLogWriter { }) .map_err(|_| IggyError::CannotWriteToFile)?; - let new_position = file - .stream_position() - .await - .with_error_context(|e| { - format!( - "Failed to get position of file: {}, error: {e}", - self.file_path - ) - }) - .map_err(|_| IggyError::CannotReadFileMetadata)?; - - Ok(new_position) + Ok(()) } else { error!("File handle is not available for synchronous write."); Err(IggyError::CannotWriteToFile) diff --git a/server/src/streaming/segments/logs/persister_task.rs b/server/src/streaming/segments/logs/persister_task.rs index bc5a3a279..5fd831d15 100644 --- a/server/src/streaming/segments/logs/persister_task.rs +++ b/server/src/streaming/segments/logs/persister_task.rs @@ -1,17 +1,15 @@ -use crate::streaming::batching::message_batch::RetainedMessageBatch; +use crate::streaming::batching::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD}; use flume::{unbounded, Receiver}; use iggy::{error::IggyError, utils::duration::IggyDuration}; use std::{ io::IoSlice, - sync::{atomic::AtomicU64, Arc}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, time::Duration, }; -use tokio::{ - fs::File, - io::{AsyncSeekExt, AsyncWriteExt}, - select, - time::sleep, -}; +use tokio::{fs::File, io::AsyncWriteExt, select, time::sleep}; use tracing::{error, trace, warn}; #[derive(Debug)] @@ -35,12 +33,12 @@ impl PersisterTask { file: File, file_path: String, fsync: bool, - file_size: Arc, + log_file_size: Arc, max_retries: u32, retry_delay: IggyDuration, ) -> Self { let (sender, receiver) = unbounded(); - let file_size_clone = file_size.clone(); + let log_file_size_clone = log_file_size.clone(); let file_path_clone = file_path.clone(); let handle = tokio::spawn(async move { Self::run( @@ -50,7 +48,7 @@ impl PersisterTask { fsync, max_retries, retry_delay, - file_size_clone, + log_file_size_clone, ) .await; }); @@ -171,12 +169,12 @@ impl PersisterTask { fsync: bool, max_retries: u32, retry_delay: IggyDuration, - file_size: Arc, + log_file_size: Arc, ) { while let Ok(request) = receiver.recv_async().await { match request { PersisterTaskCommand::WriteRequest(batch_to_write) => { - if let Err(e) = Self::write_with_retries( + match Self::write_with_retries( &mut file, &file_path, batch_to_write, @@ -186,21 +184,14 @@ impl PersisterTask { ) .await { - error!( + Ok(bytes_written) => { + log_file_size.fetch_add(bytes_written, Ordering::AcqRel); + } + Err(e) => { + error!( "Failed to persist data in LogPersisterTask for file {file_path}: {:?}", e - ); - } else { - match file.stream_position().await { - Ok(pos) => { - file_size.store(pos, std::sync::atomic::Ordering::Release); - } - Err(e) => { - error!( - "Failed to get file stream position in LogPersisterTask for file {file_path}: {:?}", - e - ); - } + ) } } } @@ -227,10 +218,11 @@ impl PersisterTask { fsync: bool, max_retries: u32, retry_delay: IggyDuration, - ) -> Result<(), IggyError> { + ) -> Result { let header = batch_to_write.header_as_bytes(); let batch_bytes = batch_to_write.bytes; let slices = [IoSlice::new(&header), IoSlice::new(&batch_bytes)]; + let bytes_written = RETAINED_BATCH_OVERHEAD + batch_bytes.len() as u64; let mut attempts = 0; loop { @@ -238,7 +230,7 @@ impl PersisterTask { Ok(_) => { if fsync { match file.sync_all().await { - Ok(_) => return Ok(()), + Ok(_) => return Ok(bytes_written), Err(e) => { attempts += 1; error!( @@ -248,7 +240,7 @@ impl PersisterTask { } } } else { - return Ok(()); + return Ok(bytes_written); } } Err(e) => { @@ -265,7 +257,7 @@ impl PersisterTask { ); return Err(IggyError::CannotWriteToFile); } - tokio::time::sleep(retry_delay.get_duration()).await; + sleep(retry_delay.get_duration()).await; } } }