From e905f4bb615618cd736645dd3ef8cdfe5513f1b6 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 5 Aug 2024 18:22:15 +0200 Subject: [PATCH 1/2] chore: upgrade to rust `1.80` --- Cargo.toml | 3 +-- rust-toolchain.toml | 2 +- tests/java_helper.rs | 7 +++---- tests/test_helpers.rs | 2 ++ 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 94db0f9..1c416c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "rskafka" version = "0.5.0" edition = "2021" -rust-version = "1.72" +rust-version = "1.80" license = "MIT OR Apache-2.0" readme = "README.md" keywords = [ @@ -45,7 +45,6 @@ criterion = { version = "0.5", features = ["async_tokio"] } dotenvy = "0.15.1" futures = "0.3" j4rs = "0.20.0" -once_cell = "1.9" proptest = "1" proptest-derive = "0.5" rustls-pemfile = "2.0" diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 8768560..55a49f1 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.78" +channel = "1.80" components = [ "rustfmt", "clippy" ] diff --git a/tests/java_helper.rs b/tests/java_helper.rs index abf1f37..bf90825 100644 --- a/tests/java_helper.rs +++ b/tests/java_helper.rs @@ -1,8 +1,7 @@ -use std::collections::BTreeMap; +use std::{collections::BTreeMap, sync::LazyLock}; use chrono::{TimeZone, Utc}; use j4rs::{Instance, InvocationArg, Jvm, JvmBuilder, MavenArtifact}; -use once_cell::sync::Lazy; use rskafka::{ client::partition::Compression, record::{Record, RecordAndOffset}, @@ -311,7 +310,7 @@ pub async fn consume( } /// Lazy static that tracks if we already installed all JVM dependencies. -static JVM_SETUP: Lazy<()> = Lazy::new(|| { +static JVM_SETUP: LazyLock<()> = LazyLock::new(|| { let jvm_installation = JvmBuilder::new().build().expect("setup JVM"); for artifact_name in [ @@ -338,7 +337,7 @@ static JVM_SETUP: Lazy<()> = Lazy::new(|| { }); fn setup_jvm() -> Jvm { - Lazy::force(&JVM_SETUP); + LazyLock::force(&JVM_SETUP); let jvm = JvmBuilder::new().build().expect("setup JVM"); jvm diff --git a/tests/test_helpers.rs b/tests/test_helpers.rs index 97559a1..cc1ea8f 100644 --- a/tests/test_helpers.rs +++ b/tests/test_helpers.rs @@ -43,7 +43,9 @@ impl BrokerImpl { #[derive(Debug)] pub struct TestConfig { pub bootstrap_brokers: Vec, + #[allow(dead_code)] pub broker_impl: BrokerImpl, + #[allow(dead_code)] pub socks5_proxy: Option, } From 51877d614594b527c36770efb73e42e90dd4b601 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 5 Aug 2024 18:30:04 +0200 Subject: [PATCH 2/2] chore: drop `async_trait` --- Cargo.toml | 1 - src/client/controller.rs | 2 -- src/client/partition.rs | 2 -- src/connection.rs | 30 +++++++++++++----------------- src/protocol/frame.rs | 14 ++++++-------- 5 files changed, 19 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1c416c8..d02d44c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,6 @@ documentation = "https://docs.rs/rskafka/" [dependencies] async-socks5 = { version = "0.6", optional = true } -async-trait = "0.1" bytes = "1.1" chrono = { version = "0.4", default-features = false } crc32c = "0.6.5" diff --git a/src/client/controller.rs b/src/client/controller.rs index 4d87790..dcd8e1b 100644 --- a/src/client/controller.rs +++ b/src/client/controller.rs @@ -1,4 +1,3 @@ -use async_trait::async_trait; use std::ops::ControlFlow; use std::sync::Arc; use tokio::sync::Mutex; @@ -172,7 +171,6 @@ impl ControllerClient { } /// Caches the cluster controller broker. -#[async_trait] impl BrokerCache for &ControllerClient { type R = MessengerTransport; type E = Error; diff --git a/src/client/partition.rs b/src/client/partition.rs index 45ad4b4..69ce255 100644 --- a/src/client/partition.rs +++ b/src/client/partition.rs @@ -23,7 +23,6 @@ use crate::{ throttle::maybe_throttle, validation::ExactlyOne, }; -use async_trait::async_trait; use chrono::{LocalResult, TimeZone, Utc}; use std::{ ops::{ControlFlow, Deref, Range}, @@ -412,7 +411,6 @@ impl PartitionClient { } /// Caches the partition leader broker. -#[async_trait] impl BrokerCache for &PartitionClient { type R = MessengerTransport; type E = Error; diff --git a/src/connection.rs b/src/connection.rs index beba5cf..0fef385 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,6 +1,6 @@ -use async_trait::async_trait; use rand::prelude::*; use std::fmt::Display; +use std::future::Future; use std::ops::ControlFlow; use std::sync::Arc; use thiserror::Error; @@ -75,18 +75,17 @@ impl Display for MultiError { } /// How to connect to a `Transport` -#[async_trait] trait ConnectionHandler { type R: RequestHandler + Send + Sync; - async fn connect( + fn connect( &self, client_id: Arc, tls_config: TlsConfig, socks5_proxy: Option, sasl_config: Option, max_message_size: usize, - ) -> Result>; + ) -> impl Future>> + Send; } /// Defines the possible request modes of metadata retrieval. @@ -138,7 +137,6 @@ impl BrokerRepresentation { } } -#[async_trait] impl ConnectionHandler for BrokerRepresentation { type R = MessengerTransport; @@ -373,15 +371,13 @@ impl std::fmt::Debug for BrokerConnector { } } -#[async_trait] trait RequestHandler { - async fn metadata_request( + fn metadata_request( &self, request_params: &MetadataRequest, - ) -> Result; + ) -> impl Future> + Send; } -#[async_trait] impl RequestHandler for MessengerTransport { async fn metadata_request( &self, @@ -415,18 +411,22 @@ impl BrokerCacheGeneration { } } -#[async_trait] pub trait BrokerCache: Send + Sync { type R: Send + Sync; type E: std::error::Error + Send + Sync; - async fn get(&self) -> Result<(Arc, BrokerCacheGeneration), Self::E>; + fn get( + &self, + ) -> impl Future, BrokerCacheGeneration), Self::E>> + Send; - async fn invalidate(&self, reason: &'static str, gen: BrokerCacheGeneration); + fn invalidate( + &self, + reason: &'static str, + gen: BrokerCacheGeneration, + ) -> impl Future + Send; } /// BrokerConnector caches an arbitrary broker that can successfully connect. -#[async_trait] impl BrokerCache for &BrokerConnector { type R = MessengerTransport; type E = Error; @@ -602,7 +602,6 @@ mod tests { } } - #[async_trait] impl RequestHandler for FakeBroker { async fn metadata_request( &self, @@ -617,7 +616,6 @@ mod tests { invalidate: Box, } - #[async_trait] impl BrokerCache for FakeBrokerCache { type R = FakeBroker; type E = Error; @@ -794,7 +792,6 @@ mod tests { #[derive(Debug, PartialEq)] struct FakeConn; - #[async_trait] impl RequestHandler for FakeConn { async fn metadata_request( &self, @@ -804,7 +801,6 @@ mod tests { } } - #[async_trait] impl ConnectionHandler for FakeBrokerRepresentation { type R = FakeConn; diff --git a/src/protocol/frame.rs b/src/protocol/frame.rs index 297db75..479b3c4 100644 --- a/src/protocol/frame.rs +++ b/src/protocol/frame.rs @@ -3,9 +3,8 @@ //! # References //! - -use std::io::Cursor; +use std::{future::Future, io::Cursor}; -use async_trait::async_trait; use thiserror::Error; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; @@ -27,12 +26,13 @@ pub enum ReadError { MessageTooLarge { limit: usize, actual: usize }, } -#[async_trait] pub trait AsyncMessageRead { - async fn read_message(&mut self, max_message_size: usize) -> Result, ReadError>; + fn read_message( + &mut self, + max_message_size: usize, + ) -> impl Future, ReadError>> + Send; } -#[async_trait] impl AsyncMessageRead for R where R: AsyncRead + Send + Unpin, @@ -85,12 +85,10 @@ pub enum WriteError { TooLarge { size: usize }, } -#[async_trait] pub trait AsyncMessageWrite { - async fn write_message(&mut self, msg: &[u8]) -> Result<(), WriteError>; + fn write_message(&mut self, msg: &[u8]) -> impl Future> + Send; } -#[async_trait] impl AsyncMessageWrite for W where W: AsyncWrite + Send + Unpin,