Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: rust 1.80 #245

Merged
merged 2 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "rskafka"
version = "0.5.0"
edition = "2021"
rust-version = "1.72"
rust-version = "1.80"
license = "MIT OR Apache-2.0"
readme = "README.md"
keywords = [
Expand All @@ -20,7 +20,6 @@ documentation = "https://docs.rs/rskafka/"

[dependencies]
async-socks5 = { version = "0.6", optional = true }
async-trait = "0.1"
bytes = "1.1"
chrono = { version = "0.4", default-features = false }
crc32c = "0.6.5"
Expand All @@ -45,7 +44,6 @@ criterion = { version = "0.5", features = ["async_tokio"] }
dotenvy = "0.15.1"
futures = "0.3"
j4rs = "0.20.0"
once_cell = "1.9"
proptest = "1"
proptest-derive = "0.5"
rustls-pemfile = "2.0"
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.78"
channel = "1.80"
components = [ "rustfmt", "clippy" ]
2 changes: 0 additions & 2 deletions src/client/controller.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use async_trait::async_trait;
use std::ops::ControlFlow;
use std::sync::Arc;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -172,7 +171,6 @@ impl ControllerClient {
}

/// Caches the cluster controller broker.
#[async_trait]
impl BrokerCache for &ControllerClient {
type R = MessengerTransport;
type E = Error;
Expand Down
2 changes: 0 additions & 2 deletions src/client/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::{
throttle::maybe_throttle,
validation::ExactlyOne,
};
use async_trait::async_trait;
use chrono::{LocalResult, TimeZone, Utc};
use std::{
ops::{ControlFlow, Deref, Range},
Expand Down Expand Up @@ -412,7 +411,6 @@ impl PartitionClient {
}

/// Caches the partition leader broker.
#[async_trait]
impl BrokerCache for &PartitionClient {
type R = MessengerTransport;
type E = Error;
Expand Down
30 changes: 13 additions & 17 deletions src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;
use rand::prelude::*;
use std::fmt::Display;
use std::future::Future;
use std::ops::ControlFlow;
use std::sync::Arc;
use thiserror::Error;
Expand Down Expand Up @@ -75,18 +75,17 @@ impl Display for MultiError {
}

/// How to connect to a `Transport`
#[async_trait]
trait ConnectionHandler {
type R: RequestHandler + Send + Sync;

async fn connect(
fn connect(
&self,
client_id: Arc<str>,
tls_config: TlsConfig,
socks5_proxy: Option<String>,
sasl_config: Option<SaslConfig>,
max_message_size: usize,
) -> Result<Arc<Self::R>>;
) -> impl Future<Output = Result<Arc<Self::R>>> + Send;
}

/// Defines the possible request modes of metadata retrieval.
Expand Down Expand Up @@ -138,7 +137,6 @@ impl BrokerRepresentation {
}
}

#[async_trait]
impl ConnectionHandler for BrokerRepresentation {
type R = MessengerTransport;

Expand Down Expand Up @@ -373,15 +371,13 @@ impl std::fmt::Debug for BrokerConnector {
}
}

#[async_trait]
trait RequestHandler {
async fn metadata_request(
fn metadata_request(
&self,
request_params: &MetadataRequest,
) -> Result<MetadataResponse, RequestError>;
) -> impl Future<Output = Result<MetadataResponse, RequestError>> + Send;
}

#[async_trait]
impl RequestHandler for MessengerTransport {
async fn metadata_request(
&self,
Expand Down Expand Up @@ -415,18 +411,22 @@ impl BrokerCacheGeneration {
}
}

#[async_trait]
pub trait BrokerCache: Send + Sync {
type R: Send + Sync;
type E: std::error::Error + Send + Sync;

async fn get(&self) -> Result<(Arc<Self::R>, BrokerCacheGeneration), Self::E>;
fn get(
&self,
) -> impl Future<Output = Result<(Arc<Self::R>, BrokerCacheGeneration), Self::E>> + Send;

async fn invalidate(&self, reason: &'static str, gen: BrokerCacheGeneration);
fn invalidate(
&self,
reason: &'static str,
gen: BrokerCacheGeneration,
) -> impl Future<Output = ()> + Send;
}

/// BrokerConnector caches an arbitrary broker that can successfully connect.
#[async_trait]
impl BrokerCache for &BrokerConnector {
type R = MessengerTransport;
type E = Error;
Expand Down Expand Up @@ -602,7 +602,6 @@ mod tests {
}
}

#[async_trait]
impl RequestHandler for FakeBroker {
async fn metadata_request(
&self,
Expand All @@ -617,7 +616,6 @@ mod tests {
invalidate: Box<dyn Fn() + Send + Sync>,
}

#[async_trait]
impl BrokerCache for FakeBrokerCache {
type R = FakeBroker;
type E = Error;
Expand Down Expand Up @@ -794,7 +792,6 @@ mod tests {
#[derive(Debug, PartialEq)]
struct FakeConn;

#[async_trait]
impl RequestHandler for FakeConn {
async fn metadata_request(
&self,
Expand All @@ -804,7 +801,6 @@ mod tests {
}
}

#[async_trait]
impl ConnectionHandler for FakeBrokerRepresentation {
type R = FakeConn;

Expand Down
14 changes: 6 additions & 8 deletions src/protocol/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
//! # References
//! - <https://kafka.apache.org/protocol#protocol_common>

use std::io::Cursor;
use std::{future::Future, io::Cursor};

use async_trait::async_trait;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

Expand All @@ -27,12 +26,13 @@ pub enum ReadError {
MessageTooLarge { limit: usize, actual: usize },
}

#[async_trait]
pub trait AsyncMessageRead {
async fn read_message(&mut self, max_message_size: usize) -> Result<Vec<u8>, ReadError>;
fn read_message(
&mut self,
max_message_size: usize,
) -> impl Future<Output = Result<Vec<u8>, ReadError>> + Send;
}

#[async_trait]
impl<R> AsyncMessageRead for R
where
R: AsyncRead + Send + Unpin,
Expand Down Expand Up @@ -85,12 +85,10 @@ pub enum WriteError {
TooLarge { size: usize },
}

#[async_trait]
pub trait AsyncMessageWrite {
async fn write_message(&mut self, msg: &[u8]) -> Result<(), WriteError>;
fn write_message(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), WriteError>> + Send;
}

#[async_trait]
impl<W> AsyncMessageWrite for W
where
W: AsyncWrite + Send + Unpin,
Expand Down
7 changes: 3 additions & 4 deletions tests/java_helper.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::collections::BTreeMap;
use std::{collections::BTreeMap, sync::LazyLock};

use chrono::{TimeZone, Utc};
use j4rs::{Instance, InvocationArg, Jvm, JvmBuilder, MavenArtifact};
use once_cell::sync::Lazy;
use rskafka::{
client::partition::Compression,
record::{Record, RecordAndOffset},
Expand Down Expand Up @@ -311,7 +310,7 @@ pub async fn consume(
}

/// Lazy static that tracks if we already installed all JVM dependencies.
static JVM_SETUP: Lazy<()> = Lazy::new(|| {
static JVM_SETUP: LazyLock<()> = LazyLock::new(|| {
let jvm_installation = JvmBuilder::new().build().expect("setup JVM");

for artifact_name in [
Expand All @@ -338,7 +337,7 @@ static JVM_SETUP: Lazy<()> = Lazy::new(|| {
});

fn setup_jvm() -> Jvm {
Lazy::force(&JVM_SETUP);
LazyLock::force(&JVM_SETUP);

let jvm = JvmBuilder::new().build().expect("setup JVM");
jvm
Expand Down
2 changes: 2 additions & 0 deletions tests/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ impl BrokerImpl {
#[derive(Debug)]
pub struct TestConfig {
pub bootstrap_brokers: Vec<String>,
#[allow(dead_code)]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not actually dead code, just not used by all end2end tests and I didn't wanna add yet another top-level crate just for the test infra.

pub broker_impl: BrokerImpl,
#[allow(dead_code)]
pub socks5_proxy: Option<String>,
}

Expand Down