Skip to content

Commit

Permalink
perf(server): optimize writing performance
Browse files Browse the repository at this point in the history
This commit optimizes the log writing process by removing the
stream_position() call, which uses lseek and negatively
impacts performance. Afterwards, the new implementation
directly updates the log file size using fetch_add, resulting
in a 40% performance improvement on writing (Linux).
On MacOS, degradation was not visible.
  • Loading branch information
hubcio committed Feb 14, 2025
1 parent 09bb7bd commit 6356300
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 48 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.202"
version = "0.4.203"
edition = "2021"
build = "src/build.rs"
license = "Apache-2.0"
Expand Down
22 changes: 6 additions & 16 deletions server/src/streaming/segments/logs/log_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
};
use tokio::{
fs::{File, OpenOptions},
io::{AsyncSeekExt, AsyncWriteExt},
io::AsyncWriteExt,
sync::RwLock,
};
use tracing::{error, trace};
Expand Down Expand Up @@ -101,8 +101,9 @@ impl SegmentLogWriter {
let batch_size = batch.get_size_bytes();
match confirmation {
Confirmation::Wait => {
let position = self.write_batch(batch).await?;
self.log_size_bytes.store(position, Ordering::Release);
self.write_batch(batch).await?;
self.log_size_bytes
.fetch_add(batch_size.as_bytes_u64(), Ordering::AcqRel);
trace!(
"Written batch of size {batch_size} bytes to log file: {}",
self.file_path
Expand All @@ -127,7 +128,7 @@ impl SegmentLogWriter {
}

/// Write a batch of bytes to the log file and return the new file position.
async fn write_batch(&self, batch_to_write: RetainedMessageBatch) -> Result<u64, IggyError> {
async fn write_batch(&self, batch_to_write: RetainedMessageBatch) -> Result<(), IggyError> {
let mut file_guard = self.file.write().await;
if let Some(ref mut file) = *file_guard {
let header = batch_to_write.header_as_bytes();
Expand All @@ -141,18 +142,7 @@ impl SegmentLogWriter {
})
.map_err(|_| IggyError::CannotWriteToFile)?;

let new_position = file
.stream_position()
.await
.with_error_context(|e| {
format!(
"Failed to get position of file: {}, error: {e}",
self.file_path
)
})
.map_err(|_| IggyError::CannotReadFileMetadata)?;

Ok(new_position)
Ok(())
} else {
error!("File handle is not available for synchronous write.");
Err(IggyError::CannotWriteToFile)
Expand Down
52 changes: 22 additions & 30 deletions server/src/streaming/segments/logs/persister_task.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use crate::streaming::batching::message_batch::RetainedMessageBatch;
use crate::streaming::batching::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD};
use flume::{unbounded, Receiver};
use iggy::{error::IggyError, utils::duration::IggyDuration};
use std::{
io::IoSlice,
sync::{atomic::AtomicU64, Arc},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use tokio::{
fs::File,
io::{AsyncSeekExt, AsyncWriteExt},
select,
time::sleep,
};
use tokio::{fs::File, io::AsyncWriteExt, select, time::sleep};
use tracing::{error, trace, warn};

#[derive(Debug)]
Expand All @@ -35,12 +33,12 @@ impl PersisterTask {
file: File,
file_path: String,
fsync: bool,
file_size: Arc<AtomicU64>,
log_file_size: Arc<AtomicU64>,
max_retries: u32,
retry_delay: IggyDuration,
) -> Self {
let (sender, receiver) = unbounded();
let file_size_clone = file_size.clone();
let log_file_size_clone = log_file_size.clone();
let file_path_clone = file_path.clone();
let handle = tokio::spawn(async move {
Self::run(
Expand All @@ -50,7 +48,7 @@ impl PersisterTask {
fsync,
max_retries,
retry_delay,
file_size_clone,
log_file_size_clone,
)
.await;
});
Expand Down Expand Up @@ -171,12 +169,12 @@ impl PersisterTask {
fsync: bool,
max_retries: u32,
retry_delay: IggyDuration,
file_size: Arc<AtomicU64>,
log_file_size: Arc<AtomicU64>,
) {
while let Ok(request) = receiver.recv_async().await {
match request {
PersisterTaskCommand::WriteRequest(batch_to_write) => {
if let Err(e) = Self::write_with_retries(
match Self::write_with_retries(
&mut file,
&file_path,
batch_to_write,
Expand All @@ -186,21 +184,14 @@ impl PersisterTask {
)
.await
{
error!(
Ok(bytes_written) => {
log_file_size.fetch_add(bytes_written, Ordering::AcqRel);
}
Err(e) => {
error!(
"Failed to persist data in LogPersisterTask for file {file_path}: {:?}",
e
);
} else {
match file.stream_position().await {
Ok(pos) => {
file_size.store(pos, std::sync::atomic::Ordering::Release);
}
Err(e) => {
error!(
"Failed to get file stream position in LogPersisterTask for file {file_path}: {:?}",
e
);
}
)
}
}
}
Expand All @@ -227,18 +218,19 @@ impl PersisterTask {
fsync: bool,
max_retries: u32,
retry_delay: IggyDuration,
) -> Result<(), IggyError> {
) -> Result<u64, IggyError> {
let header = batch_to_write.header_as_bytes();
let batch_bytes = batch_to_write.bytes;
let slices = [IoSlice::new(&header), IoSlice::new(&batch_bytes)];
let bytes_written = RETAINED_BATCH_OVERHEAD + batch_bytes.len() as u64;

let mut attempts = 0;
loop {
match file.write_vectored(&slices).await {
Ok(_) => {
if fsync {
match file.sync_all().await {
Ok(_) => return Ok(()),
Ok(_) => return Ok(bytes_written),
Err(e) => {
attempts += 1;
error!(
Expand All @@ -248,7 +240,7 @@ impl PersisterTask {
}
}
} else {
return Ok(());
return Ok(bytes_written);
}
}
Err(e) => {
Expand All @@ -265,7 +257,7 @@ impl PersisterTask {
);
return Err(IggyError::CannotWriteToFile);
}
tokio::time::sleep(retry_delay.get_duration()).await;
sleep(retry_delay.get_duration()).await;
}
}
}

0 comments on commit 6356300

Please sign in to comment.