chore: drop pin-project-lite #246

Merged 1 commit on Aug 6, 2024
Cargo.toml (1 change: 0 additions & 1 deletion)
@@ -28,7 +28,6 @@ futures = "0.3"
integer-encoding = "4"
lz4 = { version = "1.23", optional = true }
parking_lot = "0.12"
pin-project-lite = "0.2"
rand = "0.8"
rustls = { version = "0.23", optional = true, default-features = false, features = ["logging", "ring", "std", "tls12"] }
snap = { version = "1", optional = true }
benches/throughput.rs (21 changes: 8 additions & 13 deletions)
@@ -11,9 +11,8 @@ use chrono::{TimeZone, Utc};
use criterion::{
criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, Criterion, SamplingMode,
};
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt};
use parking_lot::Once;
use pin_project_lite::pin_project;
use rdkafka::{
consumer::{Consumer, StreamConsumer as RdStreamConsumer},
producer::{FutureProducer, FutureRecord},
@@ -256,17 +255,14 @@ where
fn time_it(self) -> Self::TimeItFut {
TimeIt {
t_start: Instant::now(),
inner: self,
inner: Box::pin(self),
}
}
}

pin_project! {
struct TimeIt<F> {
t_start: Instant,
#[pin]
inner: F,
}
struct TimeIt<F> {
t_start: Instant,
inner: Pin<Box<F>>,
}

impl<F> Future for TimeIt<F>
@@ -275,10 +271,9 @@
{
type Output = Duration;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.inner.poll(cx) {
Poll::Ready(_) => Poll::Ready(this.t_start.elapsed()),
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
match self.inner.poll_unpin(cx) {
Poll::Ready(_) => Poll::Ready(self.t_start.elapsed()),
Poll::Pending => Poll::Pending,
}
}
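For readers following along: the `TimeIt` change above swaps structural pinning (a `#[pin]` field inside `pin_project!`) for heap pinning with `Box::pin`, which keeps the wrapper `Unpin` and lets `poll` delegate through `FutureExt::poll_unpin`. The following is a minimal, self-contained sketch of that pattern using only the `futures` crate; it illustrates the technique rather than reproducing the benchmark file.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use futures::FutureExt;

/// Resolves to the wall-clock time the wrapped future took to complete.
struct TimeIt<F> {
    t_start: Instant,
    // Boxing pins the inner future on the heap, so `TimeIt<F>` stays `Unpin`
    // and no pin projection is needed.
    inner: Pin<Box<F>>,
}

impl<F> Future for TimeIt<F>
where
    F: Future,
{
    type Output = Duration;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Pin<Box<F>>` is `Unpin`, so `poll_unpin` can poll it through a plain
        // mutable reference obtained from `Pin<&mut Self>`.
        match self.inner.poll_unpin(cx) {
            Poll::Ready(_) => Poll::Ready(self.t_start.elapsed()),
            Poll::Pending => Poll::Pending,
        }
    }
}

fn main() {
    let timed = TimeIt {
        t_start: Instant::now(),
        inner: Box::pin(futures::future::ready(42)),
    };
    let elapsed = futures::executor::block_on(timed);
    println!("future completed in {elapsed:?}");
}
```

The trade-off is one heap allocation per wrapped future, which is negligible in a benchmark harness compared to carrying the `pin-project-lite` dependency.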
fuzz/fuzz_targets/protocol_reader.rs (59 changes: 31 additions & 28 deletions)
@@ -1,8 +1,15 @@
#![no_main]
use std::{collections::HashMap, io::Cursor, sync::Arc, time::Duration};
use std::{
collections::HashMap,
io::Cursor,
ops::DerefMut,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use libfuzzer_sys::fuzz_target;
use pin_project_lite::pin_project;
use rskafka::{
build_info::DEFAULT_CLIENT_ID,
messenger::Messenger,
@@ -151,16 +158,12 @@
})
}

pin_project! {
/// One-way mock transport with limited data.
///
/// Can only be read. Writes go to `/dev/null`.
struct MockTransport {
#[pin]
data: Cursor<Vec<u8>>,
#[pin]
sink: Sink,
}
/// One-way mock transport with limited data.
///
/// Can only be read. Writes go to `/dev/null`.
struct MockTransport {
data: Cursor<Vec<u8>>,
sink: Sink,
}

impl MockTransport {
@@ -174,34 +177,34 @@ impl MockTransport {

impl AsyncWrite for MockTransport {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
self.project().sink.poll_write(cx, buf)
) -> Poll<Result<usize, std::io::Error>> {
Pin::new(&mut self.deref_mut().sink).poll_write(cx, buf)
}

fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
self.project().sink.poll_flush(cx)
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.deref_mut().sink).poll_flush(cx)
}

fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
self.project().sink.poll_shutdown(cx)
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.deref_mut().sink).poll_shutdown(cx)
}
}

impl AsyncRead for MockTransport {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
self.project().data.poll_read(cx, buf)
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.deref_mut().data).poll_read(cx, buf)
}
}
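The `MockTransport` rewrite works because both of its fields are `Unpin` (`Cursor<Vec<u8>>` and the sink), so `Pin<&mut Self>` can be dereferenced to `&mut Self` and `Pin::new` re-pins a plain mutable reference to the field before delegating. Below is a reduced sketch of the read half only, assuming tokio's `AsyncRead` trait; the fuzz harness, its `Sink` type, and the write half are omitted.

```rust
use std::{
    io::Cursor,
    ops::DerefMut,
    pin::Pin,
    task::{Context, Poll},
};

use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};

/// One-way mock transport: reads come from an in-memory buffer.
struct MockTransport {
    data: Cursor<Vec<u8>>,
}

impl AsyncRead for MockTransport {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // `Cursor<Vec<u8>>` is `Unpin`, so going from `Pin<&mut Self>` to
        // `&mut Self` and re-pinning the field with `Pin::new` is sound.
        Pin::new(&mut self.deref_mut().data).poll_read(cx, buf)
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut transport = MockTransport {
        data: Cursor::new(b"mock bytes".to_vec()),
    };
    let mut out = Vec::new();
    transport.read_to_end(&mut out).await?;
    assert_eq!(out, b"mock bytes".to_vec());
    Ok(())
}
```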
src/client/consumer.rs (82 changes: 39 additions & 43 deletions)
@@ -50,7 +50,6 @@ use std::time::Duration;

use futures::future::{BoxFuture, Fuse, FusedFuture, FutureExt};
use futures::Stream;
use pin_project_lite::pin_project;
use tracing::{debug, trace, warn};

use crate::{
@@ -196,61 +195,58 @@ impl FetchClient for PartitionClient {
}
}

pin_project! {
/// Stream consuming data from start offset.
///
/// # Error Handling
/// If an error is returned by [`fetch_records`](`FetchClient::fetch_records`) then the stream will emit this error
/// once and will terminate afterwards.
pub struct StreamConsumer {
client: Arc<dyn FetchClient>,
/// Stream consuming data from start offset.
///
/// # Error Handling
/// If an error is returned by [`fetch_records`](`PartitionClient::fetch_records`) then the stream will emit this error
/// once and will terminate afterwards.
pub struct StreamConsumer {
client: Arc<dyn FetchClient>,

min_batch_size: i32,
min_batch_size: i32,

max_batch_size: i32,
max_batch_size: i32,

max_wait_ms: i32,
max_wait_ms: i32,

start_offset: StartOffset,
start_offset: StartOffset,

next_offset: Option<i64>,
next_offset: Option<i64>,

next_backoff: Option<Duration>,
next_backoff: Option<Duration>,

terminated: bool,
terminated: bool,

last_high_watermark: i64,
last_high_watermark: i64,

buffer: VecDeque<RecordAndOffset>,
buffer: VecDeque<RecordAndOffset>,

fetch_fut: Fuse<BoxFuture<'static, FetchResult>>,
}
fetch_fut: Fuse<BoxFuture<'static, FetchResult>>,
}

impl Stream for StreamConsumer {
type Item = Result<(RecordAndOffset, i64)>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if *this.terminated {
if self.terminated {
return Poll::Ready(None);
}
if let Some(x) = this.buffer.pop_front() {
return Poll::Ready(Some(Ok((x, *this.last_high_watermark))));
if let Some(x) = self.buffer.pop_front() {
return Poll::Ready(Some(Ok((x, self.last_high_watermark))));
}

if this.fetch_fut.is_terminated() {
let next_offset = *this.next_offset;
let start_offset = *this.start_offset;
let bytes = (*this.min_batch_size)..(*this.max_batch_size);
let max_wait_ms = *this.max_wait_ms;
let next_backoff = std::mem::take(this.next_backoff);
let client = Arc::clone(this.client);
if self.fetch_fut.is_terminated() {
let next_offset = self.next_offset;
let start_offset = self.start_offset;
let bytes = (self.min_batch_size)..(self.max_batch_size);
let max_wait_ms = self.max_wait_ms;
let next_backoff = std::mem::take(&mut self.next_backoff);
let client = Arc::clone(&self.client);

trace!(?start_offset, ?next_offset, "Fetching records at offset");

*this.fetch_fut = FutureExt::fuse(Box::pin(async move {
self.fetch_fut = FutureExt::fuse(Box::pin(async move {
if let Some(backoff) = next_backoff {
tokio::time::sleep(backoff).await;
}
@@ -282,9 +278,9 @@ impl Stream for StreamConsumer {
}));
}

let data: FetchResult = futures::ready!(this.fetch_fut.poll_unpin(cx));
let data: FetchResult = futures::ready!(self.fetch_fut.poll_unpin(cx));

match (data, *this.start_offset) {
match (data, self.start_offset) {
(Ok(inner), _) => {
let FetchResultOk {
mut records_and_offsets,
@@ -300,14 +296,14 @@
// Remember used offset (might be overwritten if there was any data) so we don't refetch the
// earliest / latest offset for every try. Also fetching the latest offset might be racy otherwise,
// since we'll never be in a position where the latest one can actually be fetched.
*this.next_offset = Some(used_offset);
self.next_offset = Some(used_offset);

// Sort records by offset in case they aren't in order
records_and_offsets.sort_by_key(|x| x.offset);
*this.last_high_watermark = watermark;
self.last_high_watermark = watermark;
if let Some(x) = records_and_offsets.last() {
*this.next_offset = Some(x.offset + 1);
this.buffer.extend(records_and_offsets)
self.next_offset = Some(x.offset + 1);
self.buffer.extend(records_and_offsets)
}
continue;
}
@@ -320,25 +316,25 @@
StartOffset::Earliest | StartOffset::Latest,
) => {
// wipe offset and try again
*this.next_offset = None;
self.next_offset = None;

// This will only happen if retention / deletions happen after we've asked for the earliest/latest
// offset and our "fetch" request. This should be a rather rare event, but if something is horrible
// wrong in our cluster (e.g. some actor is spamming "delete" requests) then let's at least backoff
// a bit.
let backoff_secs = 1;
warn!(
start_offset=?this.start_offset,
start_offset=?self.start_offset,
backoff_secs,
"Records are gone between ListOffsets and Fetch, backoff a bit",
);
*this.next_backoff = Some(Duration::from_secs(backoff_secs));
self.next_backoff = Some(Duration::from_secs(backoff_secs));

continue;
}
// if we have an offset, terminate the stream
(Err(e), _) => {
*this.terminated = true;
self.terminated = true;

// report error once
return Poll::Ready(Some(Err(e)));
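Likewise, `StreamConsumer` no longer needs projection because every field is `Unpin` (in particular, `Fuse<BoxFuture<…>>` is `Unpin`), so `poll_next` can take `mut self: Pin<&mut Self>` and read or assign fields through it directly. Below is a stripped-down sketch of that polling shape, with a dummy in-memory fetch future standing in for the Kafka client; the `FetchStream` name and its fields are illustrative, not part of the crate.

```rust
use std::{
    collections::VecDeque,
    pin::Pin,
    task::{Context, Poll},
};

use futures::{
    future::{BoxFuture, Fuse, FusedFuture, FutureExt},
    Stream, StreamExt,
};

/// Endless stream that drives a boxed "fetch" future and drains its batches.
struct FetchStream {
    buffer: VecDeque<u64>,
    next: u64,
    fetch_fut: Fuse<BoxFuture<'static, Vec<u64>>>,
}

impl Stream for FetchStream {
    type Item = u64;

    // `BoxFuture` (and therefore `Fuse<BoxFuture<_>>`) is `Unpin`, so the whole
    // struct is `Unpin` and fields are accessed directly through `Pin<&mut Self>`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Drain buffered items first.
            if let Some(x) = self.buffer.pop_front() {
                return Poll::Ready(Some(x));
            }

            // Start a new "fetch" once the previous one has completed.
            if self.fetch_fut.is_terminated() {
                let start = self.next;
                self.next += 3;
                self.fetch_fut = FutureExt::fuse(Box::pin(async move {
                    (start..start + 3).collect::<Vec<u64>>()
                }));
            }

            let batch = futures::ready!(self.fetch_fut.poll_unpin(cx));
            self.buffer.extend(batch);
        }
    }
}

fn main() {
    futures::executor::block_on(async {
        let mut stream = FetchStream {
            buffer: VecDeque::new(),
            next: 0,
            fetch_fut: Fuse::terminated(),
        };
        let first: Vec<u64> = (&mut stream).take(5).collect().await;
        assert_eq!(first, vec![0, 1, 2, 3, 4]);
    });
}
```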