Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(server): optimize writing performance #1530

Merged
merged 3 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions integration/tests/streaming/common/test_setup.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use server::configs::system::SystemConfig;
use server::streaming::persistence::persister::{FilePersister, PersisterKind};
use server::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use server::streaming::storage::SystemStorage;
use std::sync::Arc;
use tokio::fs;
Expand All @@ -20,7 +20,7 @@ impl TestSetup {

let config = Arc::new(config);
fs::create_dir(config.get_system_path()).await.unwrap();
let persister = PersisterKind::File(FilePersister {});
let persister = PersisterKind::FileWithSync(FileWithSyncPersister {});
let storage = Arc::new(SystemStorage::new(config.clone(), Arc::new(persister)));
TestSetup { config, storage }
}
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.202"
version = "0.4.203"
edition = "2021"
build = "src/build.rs"
license = "Apache-2.0"
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/partitions/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ mod tests {
use super::*;
use crate::configs::system::{MessageDeduplicationConfig, SystemConfig};
use crate::streaming::partitions::create_messages;
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use crate::streaming::storage::SystemStorage;

#[tokio::test]
Expand Down Expand Up @@ -648,7 +648,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));

(
Expand Down
8 changes: 4 additions & 4 deletions server/src/streaming/partitions/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl fmt::Display for Partition {
mod tests {
use crate::configs::system::{CacheConfig, SystemConfig};
use crate::streaming::partitions::partition::Partition;
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use crate::streaming::storage::SystemStorage;
use iggy::utils::duration::IggyDuration;
use iggy::utils::expiry::IggyExpiry;
Expand All @@ -229,7 +229,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));

let stream_id = 1;
Expand Down Expand Up @@ -279,7 +279,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));

let partition = Partition::create(
Expand Down Expand Up @@ -316,7 +316,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));

let topic_id = 1;
Expand Down
22 changes: 6 additions & 16 deletions server/src/streaming/segments/logs/log_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
};
use tokio::{
fs::{File, OpenOptions},
io::{AsyncSeekExt, AsyncWriteExt},
io::AsyncWriteExt,
sync::RwLock,
};
use tracing::{error, trace};
Expand Down Expand Up @@ -101,8 +101,9 @@ impl SegmentLogWriter {
let batch_size = batch.get_size_bytes();
match confirmation {
Confirmation::Wait => {
let position = self.write_batch(batch).await?;
self.log_size_bytes.store(position, Ordering::Release);
self.write_batch(batch).await?;
self.log_size_bytes
.fetch_add(batch_size.as_bytes_u64(), Ordering::AcqRel);
trace!(
"Written batch of size {batch_size} bytes to log file: {}",
self.file_path
Expand All @@ -127,7 +128,7 @@ impl SegmentLogWriter {
}

/// Write a batch of bytes to the log file and return the new file position.
async fn write_batch(&self, batch_to_write: RetainedMessageBatch) -> Result<u64, IggyError> {
async fn write_batch(&self, batch_to_write: RetainedMessageBatch) -> Result<(), IggyError> {
let mut file_guard = self.file.write().await;
if let Some(ref mut file) = *file_guard {
let header = batch_to_write.header_as_bytes();
Expand All @@ -141,18 +142,7 @@ impl SegmentLogWriter {
})
.map_err(|_| IggyError::CannotWriteToFile)?;

let new_position = file
.stream_position()
.await
.with_error_context(|e| {
format!(
"Failed to get position of file: {}, error: {e}",
self.file_path
)
})
.map_err(|_| IggyError::CannotReadFileMetadata)?;

Ok(new_position)
Ok(())
} else {
error!("File handle is not available for synchronous write.");
Err(IggyError::CannotWriteToFile)
Expand Down
52 changes: 22 additions & 30 deletions server/src/streaming/segments/logs/persister_task.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use crate::streaming::batching::message_batch::RetainedMessageBatch;
use crate::streaming::batching::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD};
use flume::{unbounded, Receiver};
use iggy::{error::IggyError, utils::duration::IggyDuration};
use std::{
io::IoSlice,
sync::{atomic::AtomicU64, Arc},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use tokio::{
fs::File,
io::{AsyncSeekExt, AsyncWriteExt},
select,
time::sleep,
};
use tokio::{fs::File, io::AsyncWriteExt, select, time::sleep};
use tracing::{error, trace, warn};

#[derive(Debug)]
Expand All @@ -35,12 +33,12 @@ impl PersisterTask {
file: File,
file_path: String,
fsync: bool,
file_size: Arc<AtomicU64>,
log_file_size: Arc<AtomicU64>,
max_retries: u32,
retry_delay: IggyDuration,
) -> Self {
let (sender, receiver) = unbounded();
let file_size_clone = file_size.clone();
let log_file_size_clone = log_file_size.clone();
let file_path_clone = file_path.clone();
let handle = tokio::spawn(async move {
Self::run(
Expand All @@ -50,7 +48,7 @@ impl PersisterTask {
fsync,
max_retries,
retry_delay,
file_size_clone,
log_file_size_clone,
)
.await;
});
Expand Down Expand Up @@ -171,12 +169,12 @@ impl PersisterTask {
fsync: bool,
max_retries: u32,
retry_delay: IggyDuration,
file_size: Arc<AtomicU64>,
log_file_size: Arc<AtomicU64>,
) {
while let Ok(request) = receiver.recv_async().await {
match request {
PersisterTaskCommand::WriteRequest(batch_to_write) => {
if let Err(e) = Self::write_with_retries(
match Self::write_with_retries(
&mut file,
&file_path,
batch_to_write,
Expand All @@ -186,21 +184,14 @@ impl PersisterTask {
)
.await
{
error!(
Ok(bytes_written) => {
log_file_size.fetch_add(bytes_written, Ordering::AcqRel);
}
Err(e) => {
error!(
"Failed to persist data in LogPersisterTask for file {file_path}: {:?}",
e
);
} else {
match file.stream_position().await {
Ok(pos) => {
file_size.store(pos, std::sync::atomic::Ordering::Release);
}
Err(e) => {
error!(
"Failed to get file stream position in LogPersisterTask for file {file_path}: {:?}",
e
);
}
)
}
}
}
Expand All @@ -227,18 +218,19 @@ impl PersisterTask {
fsync: bool,
max_retries: u32,
retry_delay: IggyDuration,
) -> Result<(), IggyError> {
) -> Result<u64, IggyError> {
let header = batch_to_write.header_as_bytes();
let batch_bytes = batch_to_write.bytes;
let slices = [IoSlice::new(&header), IoSlice::new(&batch_bytes)];
let bytes_written = RETAINED_BATCH_OVERHEAD + batch_bytes.len() as u64;

let mut attempts = 0;
loop {
match file.write_vectored(&slices).await {
Ok(_) => {
if fsync {
match file.sync_all().await {
Ok(_) => return Ok(()),
Ok(_) => return Ok(bytes_written),
Err(e) => {
attempts += 1;
error!(
Expand All @@ -248,7 +240,7 @@ impl PersisterTask {
}
}
} else {
return Ok(());
return Ok(bytes_written);
}
}
Err(e) => {
Expand All @@ -265,7 +257,7 @@ impl PersisterTask {
);
return Err(IggyError::CannotWriteToFile);
}
tokio::time::sleep(retry_delay.get_duration()).await;
sleep(retry_delay.get_duration()).await;
}
}
}
4 changes: 2 additions & 2 deletions server/src/streaming/streams/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Display for Stream {
#[cfg(test)]
mod tests {
use super::*;
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};

#[test]
fn should_be_created_given_valid_parameters() {
Expand All @@ -90,7 +90,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));
let id = 1;
let name = "test";
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/streams/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ mod tests {
use crate::{
configs::system::SystemConfig,
streaming::{
persistence::persister::{FilePersister, PersisterKind},
persistence::persister::{FileWithSyncPersister, PersisterKind},
storage::SystemStorage,
},
};
Expand All @@ -262,7 +262,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));
let stream_id = 1;
let stream_name = "test_stream";
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/systems/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ mod tests {
use crate::configs::server::{DataMaintenanceConfig, PersonalAccessTokenConfig};
use crate::configs::system::SystemConfig;
use crate::state::{MockState, StateKind};
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use crate::streaming::storage::SystemStorage;
use crate::streaming::users::user::User;
use iggy::users::defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
Expand All @@ -418,7 +418,7 @@ mod tests {
});
let storage = SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
);

let stream_id = 1;
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/topics/consumer_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl Topic {
mod tests {
use super::*;
use crate::configs::system::SystemConfig;
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use crate::streaming::storage::SystemStorage;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::utils::expiry::IggyExpiry;
Expand Down Expand Up @@ -358,7 +358,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));
let stream_id = 1;
let id = 2;
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/topics/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl Topic {
mod tests {
use super::*;
use crate::configs::system::SystemConfig;
use crate::streaming::persistence::persister::FilePersister;
use crate::streaming::persistence::persister::FileWithSyncPersister;
use crate::streaming::persistence::persister::PersisterKind;
use crate::streaming::storage::SystemStorage;
use bytes::Bytes;
Expand Down Expand Up @@ -434,7 +434,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));
let stream_id = 1;
let id = 2;
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/topics/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ impl fmt::Display for Topic {
#[cfg(test)]
mod tests {
use super::*;
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use iggy::locking::IggySharedMutFn;
use std::str::FromStr;

Expand All @@ -277,7 +277,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));

let stream_id = 1;
Expand Down
Loading