Skip to content

Commit

Permalink
Fix toggler by using fsync
Browse files Browse the repository at this point in the history
  • Loading branch information
hubcio committed Feb 14, 2025
1 parent da61c7b commit c60a55d
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 20 deletions.
4 changes: 2 additions & 2 deletions integration/tests/streaming/common/test_setup.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use server::configs::system::SystemConfig;
use server::streaming::persistence::persister::{FilePersister, PersisterKind};
use server::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use server::streaming::storage::SystemStorage;
use std::sync::Arc;
use tokio::fs;
Expand All @@ -20,7 +20,7 @@ impl TestSetup {

let config = Arc::new(config);
fs::create_dir(config.get_system_path()).await.unwrap();
let persister = PersisterKind::File(FilePersister {});
let persister = PersisterKind::FileWithSync(FileWithSyncPersister {});
let storage = Arc::new(SystemStorage::new(config.clone(), Arc::new(persister)));
TestSetup { config, storage }
}
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/partitions/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ mod tests {
use super::*;
use crate::configs::system::{MessageDeduplicationConfig, SystemConfig};
use crate::streaming::partitions::create_messages;
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use crate::streaming::storage::SystemStorage;

#[tokio::test]
Expand Down Expand Up @@ -648,7 +648,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));

(
Expand Down
8 changes: 4 additions & 4 deletions server/src/streaming/partitions/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl fmt::Display for Partition {
mod tests {
use crate::configs::system::{CacheConfig, SystemConfig};
use crate::streaming::partitions::partition::Partition;
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use crate::streaming::storage::SystemStorage;
use iggy::utils::duration::IggyDuration;
use iggy::utils::expiry::IggyExpiry;
Expand All @@ -229,7 +229,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));

let stream_id = 1;
Expand Down Expand Up @@ -279,7 +279,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));

let partition = Partition::create(
Expand Down Expand Up @@ -316,7 +316,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));

let topic_id = 1;
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/streams/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Display for Stream {
#[cfg(test)]
mod tests {
use super::*;
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};

#[test]
fn should_be_created_given_valid_parameters() {
Expand All @@ -90,7 +90,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));
let id = 1;
let name = "test";
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/streams/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ mod tests {
use crate::{
configs::system::SystemConfig,
streaming::{
persistence::persister::{FilePersister, PersisterKind},
persistence::persister::{FileWithSyncPersister, PersisterKind},
storage::SystemStorage,
},
};
Expand All @@ -262,7 +262,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));
let stream_id = 1;
let stream_name = "test_stream";
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/systems/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ mod tests {
use crate::configs::server::{DataMaintenanceConfig, PersonalAccessTokenConfig};
use crate::configs::system::SystemConfig;
use crate::state::{MockState, StateKind};
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use crate::streaming::storage::SystemStorage;
use crate::streaming::users::user::User;
use iggy::users::defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
Expand All @@ -418,7 +418,7 @@ mod tests {
});
let storage = SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
);

let stream_id = 1;
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/topics/consumer_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl Topic {
mod tests {
use super::*;
use crate::configs::system::SystemConfig;
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use crate::streaming::storage::SystemStorage;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::utils::expiry::IggyExpiry;
Expand Down Expand Up @@ -358,7 +358,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));
let stream_id = 1;
let id = 2;
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/topics/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl Topic {
mod tests {
use super::*;
use crate::configs::system::SystemConfig;
use crate::streaming::persistence::persister::FilePersister;
use crate::streaming::persistence::persister::FileWithSyncPersister;
use crate::streaming::persistence::persister::PersisterKind;
use crate::streaming::storage::SystemStorage;
use bytes::Bytes;
Expand Down Expand Up @@ -434,7 +434,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));
let stream_id = 1;
let id = 2;
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/topics/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ impl fmt::Display for Topic {
#[cfg(test)]
mod tests {
use super::*;
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use iggy::locking::IggySharedMutFn;
use std::str::FromStr;

Expand All @@ -277,7 +277,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));

let stream_id = 1;
Expand Down

0 comments on commit c60a55d

Please sign in to comment.