Skip to content

Commit

Permalink
Refactor: Replace dyn Trait with Enums (#1462)
Browse files Browse the repository at this point in the history
Addresses part of #667

This PR refactors the codebase to replace the use of dyn Trait with
enums for Sender, Archiver, State, Storage, and ConfigProvider
  • Loading branch information
haze518 authored Jan 26, 2025
1 parent 7d23bd2 commit 6b2695f
Show file tree
Hide file tree
Showing 89 changed files with 780 additions and 582 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ anyhow = "1.0.95"
clap = { version = "4.5.23", features = ["derive"] }
clap_complete = "4.5.40"
figlet-rs = "0.1.5"
iggy = { path = "../sdk", features = ["iggy-cli"], version = "0.6.81" }
iggy = { path = "../sdk", features = ["iggy-cli"], version = "0.6.90" }
keyring = { version = "3.6.1", features = ["sync-secret-service", "vendored"], optional = true }
passterm = "=2.0.1"
thiserror = "2.0.9"
Expand Down
6 changes: 3 additions & 3 deletions integration/tests/streaming/consumer_offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::streaming::common::test_setup::TestSetup;
use iggy::consumer::ConsumerKind;
use server::configs::system::SystemConfig;
use server::streaming::partitions::partition::ConsumerOffset;
use server::streaming::storage::PartitionStorage;
use server::streaming::storage::PartitionStorageKind;
use std::sync::Arc;
use tokio::fs;

Expand All @@ -16,7 +16,7 @@ async fn should_persist_consumer_offsets_and_then_load_them_from_disk() {

async fn assert_persisted_offsets(
config: &Arc<SystemConfig>,
storage: &dyn PartitionStorage,
storage: &PartitionStorageKind,
kind: ConsumerKind,
) {
let consumer_ids_count = 3;
Expand All @@ -38,7 +38,7 @@ async fn assert_persisted_offsets(

async fn assert_persisted_offset(
path: &str,
storage: &dyn PartitionStorage,
storage: &PartitionStorageKind,
consumer_offset: &ConsumerOffset,
expected_offsets_count: u32,
) {
Expand Down
2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.82"
version = "0.6.90"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "Apache-2.0"
Expand Down
62 changes: 45 additions & 17 deletions sdk/src/tcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,60 @@ const NAME: &str = "Iggy";
/// It requires a valid server address.
#[derive(Debug)]
pub struct TcpClient {
pub(crate) stream: Mutex<Option<Box<dyn ConnectionStream>>>,
pub(crate) stream: Mutex<Option<ConnectionStreamKind>>,
pub(crate) config: Arc<TcpClientConfig>,
pub(crate) state: Mutex<ClientState>,
client_address: Mutex<Option<SocketAddr>>,
events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>),
connected_at: Mutex<Option<IggyTimestamp>>,
}

unsafe impl Send for TcpClient {}
unsafe impl Sync for TcpClient {}

#[async_trait]
pub(crate) trait ConnectionStream: Debug + Sync + Send {
pub(crate) trait ConnectionStream {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, IggyError>;
async fn write(&mut self, buf: &[u8]) -> Result<(), IggyError>;
async fn flush(&mut self) -> Result<(), IggyError>;
async fn shutdown(&mut self) -> Result<(), IggyError>;
}

#[derive(Debug)]
struct TcpConnectionStream {
pub(crate) enum ConnectionStreamKind {
Tcp(TcpConnectionStream),
TcpTls(TcpTlsConnectionStream),
}

impl ConnectionStreamKind {
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, IggyError> {
match self {
Self::Tcp(c) => c.read(buf).await,
Self::TcpTls(c) => c.read(buf).await,
}
}

pub async fn write(&mut self, buf: &[u8]) -> Result<(), IggyError> {
match self {
Self::Tcp(c) => c.write(buf).await,
Self::TcpTls(c) => c.write(buf).await,
}
}

pub async fn flush(&mut self) -> Result<(), IggyError> {
match self {
Self::Tcp(c) => c.flush().await,
Self::TcpTls(c) => c.flush().await,
}
}

pub async fn shutdown(&mut self) -> Result<(), IggyError> {
match self {
Self::Tcp(c) => c.shutdown().await,
Self::TcpTls(c) => c.shutdown().await,
}
}
}

#[derive(Debug)]
pub(crate) struct TcpConnectionStream {
client_address: SocketAddr,
reader: BufReader<OwnedReadHalf>,
writer: BufWriter<OwnedWriteHalf>,
Expand Down Expand Up @@ -85,12 +118,6 @@ impl TcpTlsConnectionStream {
}
}

unsafe impl Send for TcpConnectionStream {}
unsafe impl Sync for TcpConnectionStream {}

unsafe impl Send for TcpTlsConnectionStream {}
unsafe impl Sync for TcpTlsConnectionStream {}

#[async_trait]
impl ConnectionStream for TcpConnectionStream {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, IggyError> {
Expand Down Expand Up @@ -320,7 +347,7 @@ impl TcpClient {
&self,
status: u32,
length: u32,
stream: &mut dyn ConnectionStream,
stream: &mut ConnectionStreamKind,
) -> Result<Bytes, IggyError> {
if status != 0 {
// TEMP: See https://github.com/iggy-rs/iggy/pull/604 for context.
Expand Down Expand Up @@ -396,7 +423,7 @@ impl TcpClient {

let tls_enabled = self.config.tls_enabled;
let mut retry_count = 0;
let connection_stream: Box<dyn ConnectionStream>;
let connection_stream: ConnectionStreamKind;
let remote_address;
let client_address;
loop {
Expand Down Expand Up @@ -456,7 +483,8 @@ impl TcpClient {
self.client_address.lock().await.replace(client_address);

if !tls_enabled {
connection_stream = Box::new(TcpConnectionStream::new(client_address, stream));
connection_stream =
ConnectionStreamKind::Tcp(TcpConnectionStream::new(client_address, stream));
break;
}

Expand Down Expand Up @@ -500,7 +528,7 @@ impl TcpClient {
error!("Failed to establish a TLS connection to the server: {error}",);
IggyError::CannotEstablishConnection
})?;
connection_stream = Box::new(TcpTlsConnectionStream::new(
connection_stream = ConnectionStreamKind::TcpTls(TcpTlsConnectionStream::new(
client_address,
TlsStream::Client(stream),
));
Expand Down Expand Up @@ -623,7 +651,7 @@ impl TcpClient {
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
return self.handle_response(status, length, stream.as_mut()).await;
return self.handle_response(status, length, stream).await;
}

error!("Cannot send data. Client is not connected.");
Expand Down
4 changes: 2 additions & 2 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.121"
version = "0.4.130"
edition = "2021"
build = "src/build.rs"
license = "Apache-2.0"
Expand All @@ -13,7 +13,7 @@ tokio-console = ["dep:console-subscriber", "tokio/tracing"]
[dependencies]
ahash = { version = "0.8.11" }
anyhow = "1.0.95"
async-trait = "0.1.85"
# async-trait = "0.1.85"
atone = "0.3.7"
axum = "0.8.1"
axum-server = { version = "0.7.1", features = ["tls-rustls"] }
Expand Down
2 changes: 0 additions & 2 deletions server/src/archiver/disk.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::archiver::{Archiver, COMPONENT};
use crate::configs::server::DiskArchiverConfig;
use crate::server_error::ArchiverError;
use async_trait::async_trait;
use error_set::ErrContext;
use std::path::Path;
use tokio::fs;
Expand All @@ -18,7 +17,6 @@ impl DiskArchiver {
}
}

#[async_trait]
impl Archiver for DiskArchiver {
async fn init(&self) -> Result<(), ArchiverError> {
if !Path::new(&self.config.path).exists() {
Expand Down
75 changes: 59 additions & 16 deletions server/src/archiver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,96 @@
pub mod disk;
pub mod s3;

use crate::configs::server::{DiskArchiverConfig, S3ArchiverConfig};
use crate::server_error::ArchiverError;
use async_trait::async_trait;
use derive_more::Display;
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Formatter};
use std::fmt::Debug;
use std::future::Future;
use std::str::FromStr;

use crate::archiver::disk::DiskArchiver;
use crate::archiver::s3::S3Archiver;

pub const COMPONENT: &str = "ARCHIVER";

#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Display, Copy, Clone)]
#[serde(rename_all = "lowercase")]
pub enum ArchiverKind {
pub enum ArchiverKindType {
#[default]
#[display("disk")]
Disk,
#[display("s3")]
S3,
}

impl FromStr for ArchiverKind {
impl FromStr for ArchiverKindType {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"disk" => Ok(ArchiverKind::Disk),
"s3" => Ok(ArchiverKind::S3),
"disk" => Ok(ArchiverKindType::Disk),
"s3" => Ok(ArchiverKindType::S3),
_ => Err(format!("Unknown archiver kind: {}", s)),
}
}
}

#[async_trait]
pub trait Archiver: Sync + Send {
async fn init(&self) -> Result<(), ArchiverError>;
async fn is_archived(
pub trait Archiver: Send {
fn init(&self) -> impl Future<Output = Result<(), ArchiverError>> + Send;
fn is_archived(
&self,
file: &str,
base_directory: Option<String>,
) -> Result<bool, ArchiverError>;
async fn archive(
) -> impl Future<Output = Result<bool, ArchiverError>> + Send;
fn archive(
&self,
files: &[&str],
base_directory: Option<String>,
) -> Result<(), ArchiverError>;
) -> impl Future<Output = Result<(), ArchiverError>> + Send;
}

#[derive(Debug)]
pub enum ArchiverKind {
Disk(DiskArchiver),
S3(S3Archiver),
}

impl Debug for dyn Archiver {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Archiver")
impl ArchiverKind {
pub fn get_disk_arhiver(config: DiskArchiverConfig) -> Self {
Self::Disk(DiskArchiver::new(config))
}

pub fn get_s3_archiver(config: S3ArchiverConfig) -> Result<Self, ArchiverError> {
let archiver = S3Archiver::new(config)?;
Ok(Self::S3(archiver))
}

pub async fn init(&self) -> Result<(), ArchiverError> {
match self {
Self::Disk(a) => a.init().await,
Self::S3(a) => a.init().await,
}
}

pub async fn is_archived(
&self,
file: &str,
base_directory: Option<String>,
) -> Result<bool, ArchiverError> {
match self {
Self::Disk(d) => d.is_archived(file, base_directory).await,
Self::S3(d) => d.is_archived(file, base_directory).await,
}
}

pub async fn archive(
&self,
files: &[&str],
base_directory: Option<String>,
) -> Result<(), ArchiverError> {
match self {
Self::Disk(d) => d.archive(files, base_directory).await,
Self::S3(d) => d.archive(files, base_directory).await,
}
}
}
2 changes: 0 additions & 2 deletions server/src/archiver/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::archiver::{Archiver, COMPONENT};
use crate::configs::server::S3ArchiverConfig;
use crate::server_error::ArchiverError;
use crate::streaming::utils::file;
use async_trait::async_trait;
use error_set::ErrContext;
use s3::creds::Credentials;
use s3::{Bucket, Region};
Expand Down Expand Up @@ -75,7 +74,6 @@ impl S3Archiver {
}
}

#[async_trait]
impl Archiver for S3Archiver {
async fn init(&self) -> Result<(), ArchiverError> {
let response = self.bucket.list("/".to_string(), None).await;
Expand Down
6 changes: 3 additions & 3 deletions server/src/binary/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::binary::handlers::users::{
get_users_handler, login_user_handler, logout_user_handler, update_permissions_handler,
update_user_handler,
};
use crate::binary::sender::Sender;
use crate::binary::sender::SenderKind;
use crate::binary::COMPONENT;
use crate::command::ServerCommand;
use crate::streaming::session::Session;
Expand All @@ -28,7 +28,7 @@ use tracing::{debug, error};

pub async fn handle(
command: ServerCommand,
sender: &mut dyn Sender,
sender: &mut SenderKind,
session: &Session,
system: SharedSystem,
) -> Result<(), IggyError> {
Expand Down Expand Up @@ -65,7 +65,7 @@ pub async fn handle(

async fn try_handle(
command: ServerCommand,
sender: &mut dyn Sender,
sender: &mut SenderKind,
session: &Session,
system: &SharedSystem,
) -> Result<(), IggyError> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::binary::handlers::consumer_groups::COMPONENT;
use crate::binary::mapper;
use crate::binary::sender::Sender;
use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
Expand All @@ -13,7 +12,7 @@ use tracing::{debug, instrument};
#[instrument(skip_all, name = "trace_create_consumer_group", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = command.stream_id.as_string(), iggy_topic_id = command.topic_id.as_string()))]
pub async fn handle(
command: CreateConsumerGroup,
sender: &mut dyn Sender,
sender: &mut SenderKind,
session: &Session,
system: &SharedSystem,
) -> Result<(), IggyError> {
Expand Down
Loading

0 comments on commit 6b2695f

Please sign in to comment.