From e8c97afc288545e9f32fc31a52afa7988c148cb8 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 6 Aug 2024 12:37:40 +0200 Subject: [PATCH] chore: drop `pin-project-lite` Actually we don't need this, all our nested types fall roughly into three categories: - **no perf critical:** Use `Box::pin` to create a new allocation `Pin>` - **already alloated:** They where already wrapped in a `Box` (e.g. for enums with vastly different variants), so `Pin>` is the same overhead. - **`Unpin`:** Some upstream types implement `Unpin`, so you can just use `Pin::new(&mut x)`. --- Cargo.toml | 1 - benches/throughput.rs | 21 +++---- fuzz/fuzz_targets/protocol_reader.rs | 59 ++++++++++---------- src/client/consumer.rs | 82 +++++++++++++--------------- src/connection/transport.rs | 71 ++++++++++-------------- 5 files changed, 108 insertions(+), 126 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d02d44c..926ea76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,6 @@ futures = "0.3" integer-encoding = "4" lz4 = { version = "1.23", optional = true } parking_lot = "0.12" -pin-project-lite = "0.2" rand = "0.8" rustls = { version = "0.23", optional = true, default-features = false, features = ["logging", "ring", "std", "tls12"] } snap = { version = "1", optional = true } diff --git a/benches/throughput.rs b/benches/throughput.rs index 36f4054..7bf5c4f 100644 --- a/benches/throughput.rs +++ b/benches/throughput.rs @@ -11,9 +11,8 @@ use chrono::{TimeZone, Utc}; use criterion::{ criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, Criterion, SamplingMode, }; -use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt}; +use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt}; use parking_lot::Once; -use pin_project_lite::pin_project; use rdkafka::{ consumer::{Consumer, StreamConsumer as RdStreamConsumer}, producer::{FutureProducer, FutureRecord}, @@ -256,17 +255,14 @@ where fn time_it(self) -> Self::TimeItFut { TimeIt { t_start: Instant::now(), - inner: self, + inner: Box::pin(self), } } } -pin_project! { - struct TimeIt { - t_start: Instant, - #[pin] - inner: F, - } +struct TimeIt { + t_start: Instant, + inner: Pin>, } impl Future for TimeIt @@ -275,10 +271,9 @@ where { type Output = Duration; - fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - let this = self.project(); - match this.inner.poll(cx) { - Poll::Ready(_) => Poll::Ready(this.t_start.elapsed()), + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + match self.inner.poll_unpin(cx) { + Poll::Ready(_) => Poll::Ready(self.t_start.elapsed()), Poll::Pending => Poll::Pending, } } diff --git a/fuzz/fuzz_targets/protocol_reader.rs b/fuzz/fuzz_targets/protocol_reader.rs index 6635242..1bb384c 100644 --- a/fuzz/fuzz_targets/protocol_reader.rs +++ b/fuzz/fuzz_targets/protocol_reader.rs @@ -1,8 +1,15 @@ #![no_main] -use std::{collections::HashMap, io::Cursor, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + io::Cursor, + ops::DerefMut, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; use libfuzzer_sys::fuzz_target; -use pin_project_lite::pin_project; use rskafka::{ build_info::DEFAULT_CLIENT_ID, messenger::Messenger, @@ -151,16 +158,12 @@ where }) } -pin_project! { - /// One-way mock transport with limited data. - /// - /// Can only be read. Writes go to `/dev/null`. - struct MockTransport { - #[pin] - data: Cursor>, - #[pin] - sink: Sink, - } +/// One-way mock transport with limited data. +/// +/// Can only be read. Writes go to `/dev/null`. +struct MockTransport { + data: Cursor>, + sink: Sink, } impl MockTransport { @@ -174,34 +177,34 @@ impl MockTransport { impl AsyncWrite for MockTransport { fn poll_write( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, buf: &[u8], - ) -> std::task::Poll> { - self.project().sink.poll_write(cx, buf) + ) -> Poll> { + Pin::new(&mut self.deref_mut().sink).poll_write(cx, buf) } fn poll_flush( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.project().sink.poll_flush(cx) + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.deref_mut().sink).poll_flush(cx) } fn poll_shutdown( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.project().sink.poll_shutdown(cx) + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.deref_mut().sink).poll_shutdown(cx) } } impl AsyncRead for MockTransport { fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { - self.project().data.poll_read(cx, buf) + ) -> Poll> { + Pin::new(&mut self.deref_mut().data).poll_read(cx, buf) } } diff --git a/src/client/consumer.rs b/src/client/consumer.rs index 60a33bb..f5510c1 100644 --- a/src/client/consumer.rs +++ b/src/client/consumer.rs @@ -50,7 +50,6 @@ use std::time::Duration; use futures::future::{BoxFuture, Fuse, FusedFuture, FutureExt}; use futures::Stream; -use pin_project_lite::pin_project; use tracing::{debug, trace, warn}; use crate::{ @@ -196,61 +195,58 @@ impl FetchClient for PartitionClient { } } -pin_project! { - /// Stream consuming data from start offset. - /// - /// # Error Handling - /// If an error is returned by [`fetch_records`](`FetchClient::fetch_records`) then the stream will emit this error - /// once and will terminate afterwards. - pub struct StreamConsumer { - client: Arc, +/// Stream consuming data from start offset. +/// +/// # Error Handling +/// If an error is returned by [`fetch_records`](`PartitionClient::fetch_records`) then the stream will emit this error +/// once and will terminate afterwards. +pub struct StreamConsumer { + client: Arc, - min_batch_size: i32, + min_batch_size: i32, - max_batch_size: i32, + max_batch_size: i32, - max_wait_ms: i32, + max_wait_ms: i32, - start_offset: StartOffset, + start_offset: StartOffset, - next_offset: Option, + next_offset: Option, - next_backoff: Option, + next_backoff: Option, - terminated: bool, + terminated: bool, - last_high_watermark: i64, + last_high_watermark: i64, - buffer: VecDeque, + buffer: VecDeque, - fetch_fut: Fuse>, - } + fetch_fut: Fuse>, } impl Stream for StreamConsumer { type Item = Result<(RecordAndOffset, i64)>; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - if *this.terminated { + if self.terminated { return Poll::Ready(None); } - if let Some(x) = this.buffer.pop_front() { - return Poll::Ready(Some(Ok((x, *this.last_high_watermark)))); + if let Some(x) = self.buffer.pop_front() { + return Poll::Ready(Some(Ok((x, self.last_high_watermark)))); } - if this.fetch_fut.is_terminated() { - let next_offset = *this.next_offset; - let start_offset = *this.start_offset; - let bytes = (*this.min_batch_size)..(*this.max_batch_size); - let max_wait_ms = *this.max_wait_ms; - let next_backoff = std::mem::take(this.next_backoff); - let client = Arc::clone(this.client); + if self.fetch_fut.is_terminated() { + let next_offset = self.next_offset; + let start_offset = self.start_offset; + let bytes = (self.min_batch_size)..(self.max_batch_size); + let max_wait_ms = self.max_wait_ms; + let next_backoff = std::mem::take(&mut self.next_backoff); + let client = Arc::clone(&self.client); trace!(?start_offset, ?next_offset, "Fetching records at offset"); - *this.fetch_fut = FutureExt::fuse(Box::pin(async move { + self.fetch_fut = FutureExt::fuse(Box::pin(async move { if let Some(backoff) = next_backoff { tokio::time::sleep(backoff).await; } @@ -282,9 +278,9 @@ impl Stream for StreamConsumer { })); } - let data: FetchResult = futures::ready!(this.fetch_fut.poll_unpin(cx)); + let data: FetchResult = futures::ready!(self.fetch_fut.poll_unpin(cx)); - match (data, *this.start_offset) { + match (data, self.start_offset) { (Ok(inner), _) => { let FetchResultOk { mut records_and_offsets, @@ -300,14 +296,14 @@ impl Stream for StreamConsumer { // Remember used offset (might be overwritten if there was any data) so we don't refetch the // earliest / latest offset for every try. Also fetching the latest offset might be racy otherwise, // since we'll never be in a position where the latest one can actually be fetched. - *this.next_offset = Some(used_offset); + self.next_offset = Some(used_offset); // Sort records by offset in case they aren't in order records_and_offsets.sort_by_key(|x| x.offset); - *this.last_high_watermark = watermark; + self.last_high_watermark = watermark; if let Some(x) = records_and_offsets.last() { - *this.next_offset = Some(x.offset + 1); - this.buffer.extend(records_and_offsets) + self.next_offset = Some(x.offset + 1); + self.buffer.extend(records_and_offsets) } continue; } @@ -320,7 +316,7 @@ impl Stream for StreamConsumer { StartOffset::Earliest | StartOffset::Latest, ) => { // wipe offset and try again - *this.next_offset = None; + self.next_offset = None; // This will only happen if retention / deletions happen after we've asked for the earliest/latest // offset and our "fetch" request. This should be a rather rare event, but if something is horrible @@ -328,17 +324,17 @@ impl Stream for StreamConsumer { // a bit. let backoff_secs = 1; warn!( - start_offset=?this.start_offset, + start_offset=?self.start_offset, backoff_secs, "Records are gone between ListOffsets and Fetch, backoff a bit", ); - *this.next_backoff = Some(Duration::from_secs(backoff_secs)); + self.next_backoff = Some(Duration::from_secs(backoff_secs)); continue; } // if we have an offset, terminate the stream (Err(e), _) => { - *this.terminated = true; + self.terminated = true; // report error once return Poll::Ready(Some(Err(e))); diff --git a/src/connection/transport.rs b/src/connection/transport.rs index aac6475..4fa4c4a 100644 --- a/src/connection/transport.rs +++ b/src/connection/transport.rs @@ -1,4 +1,4 @@ -use pin_project_lite::pin_project; +use std::ops::DerefMut; use std::pin::Pin; #[cfg(feature = "transport-tls")] use std::sync::Arc; @@ -45,78 +45,67 @@ pub enum Error { pub type Result = std::result::Result; #[cfg(feature = "transport-tls")] -pin_project! { - #[project = TransportProj] - #[derive(Debug)] - pub enum Transport { - Plain{ - #[pin] - inner: TcpStream, - }, - - Tls{ - #[pin] - inner: Box>, - }, - } +#[derive(Debug)] +pub enum Transport { + Plain { + inner: TcpStream, + }, + + Tls { + inner: Pin>>, + }, } #[cfg(not(feature = "transport-tls"))] -pin_project! { - #[project = TransportProj] - #[derive(Debug)] - pub enum Transport { - Plain{ - #[pin] - inner: TcpStream, - }, - } +#[derive(Debug)] +pub enum Transport { + Plain { inner: TcpStream }, } impl AsyncRead for Transport { fn poll_read( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - match self.project() { - TransportProj::Plain { inner } => inner.poll_read(cx, buf), + match self.deref_mut() { + Self::Plain { inner } => Pin::new(inner).poll_read(cx, buf), #[cfg(feature = "transport-tls")] - TransportProj::Tls { inner } => inner.poll_read(cx, buf), + Self::Tls { inner } => inner.as_mut().poll_read(cx, buf), } } } impl AsyncWrite for Transport { fn poll_write( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - match self.project() { - TransportProj::Plain { inner } => inner.poll_write(cx, buf), + match self.deref_mut() { + Self::Plain { inner } => Pin::new(inner).poll_write(cx, buf), #[cfg(feature = "transport-tls")] - TransportProj::Tls { inner } => inner.poll_write(cx, buf), + Self::Tls { inner } => inner.as_mut().poll_write(cx, buf), } } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.project() { - TransportProj::Plain { inner } => inner.poll_flush(cx), + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.deref_mut() { + Self::Plain { inner } => Pin::new(inner).poll_flush(cx), #[cfg(feature = "transport-tls")] - TransportProj::Tls { inner } => inner.poll_flush(cx), + Self::Tls { inner } => inner.as_mut().poll_flush(cx), } } - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.project() { - TransportProj::Plain { inner } => inner.poll_shutdown(cx), + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.deref_mut() { + Self::Plain { inner } => Pin::new(inner).poll_shutdown(cx), #[cfg(feature = "transport-tls")] - TransportProj::Tls { inner } => inner.poll_shutdown(cx), + Self::Tls { inner } => inner.as_mut().poll_shutdown(cx), } } } @@ -176,7 +165,7 @@ impl Transport { let connector = TlsConnector::from(config); let tls_stream = connector.connect(server_name, tcp_stream).await?; Ok(Self::Tls { - inner: Box::new(tls_stream), + inner: Box::pin(tls_stream), }) } None => Ok(Self::Plain { inner: tcp_stream }),