Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ethexe): impl proper Streams on services #4480

Merged
merged 3 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions ethexe/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub mod export {
use anyhow::{anyhow, Context};
use ethexe_db::Database;
use ethexe_signer::{PublicKey, Signer};
use futures::{future::Either, stream::FusedStream, Stream};
use futures::{future::Either, ready, stream::FusedStream, Stream};
use libp2p::{
connection_limits,
core::{muxing::StreamMuxerBox, upgrade},
Expand Down Expand Up @@ -121,6 +121,31 @@ pub struct NetworkService {
swarm: Swarm<Behaviour>,
}

impl Stream for NetworkService {
type Item = NetworkEvent;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
loop {
let Some(event) = ready!(self.swarm.poll_next_unpin(cx)) else {
return Poll::Ready(None);
};

if let Some(event) = self.handle_swarm_event(event) {
return Poll::Ready(Some(event));
}
}
}
}

impl FusedStream for NetworkService {
fn is_terminated(&self) -> bool {
self.swarm.is_terminated()
}
}

impl NetworkService {
pub fn new(
config: NetworkConfig,
Expand Down Expand Up @@ -407,31 +432,6 @@ impl NetworkService {
}
}

impl Stream for NetworkService {
type Item = NetworkEvent;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some(event)) = self.swarm.poll_next_unpin(cx) {
if let Some(event) = self.get_mut().handle_swarm_event(event) {
return Poll::Ready(Some(event));
} else {
cx.waker().wake_by_ref();
}
}

Poll::Pending
}
}

impl FusedStream for NetworkService {
fn is_terminated(&self) -> bool {
self.swarm.is_terminated()
}
}

#[cfg(test)]
impl NetworkService {
async fn connect(&mut self, service: &mut Self) {
Expand Down
100 changes: 55 additions & 45 deletions ethexe/observer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@ use alloy::{
use anyhow::{Context as _, Result};
use ethexe_common::events::{BlockEvent, BlockRequestEvent, RouterEvent};
use ethexe_db::BlockHeader;
use ethexe_service_utils::AsyncFnStream;
use ethexe_signer::Address;
use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
use futures::{
future::BoxFuture,
stream::{FusedStream, FuturesUnordered},
Stream, StreamExt,
};
use gprimitives::{CodeId, H256};
use parity_scale_codec::{Decode, Encode};
use std::{pin::Pin, sync::Arc, time::Duration};
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

pub(crate) type Provider = RootProvider<BoxTransport>;

Expand Down Expand Up @@ -77,29 +85,31 @@ pub struct ObserverService {
codes_futures: FuturesUnordered<BlobDownloadFuture>,
}

impl AsyncFnStream for ObserverService {
impl Stream for ObserverService {
type Item = Result<ObserverEvent>;

async fn like_next(&mut self) -> Option<Self::Item> {
Some(self.next().await)
}
}
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some((hash, header, events))) = self.stream.poll_next_unpin(cx) {
let event = Ok(self.handle_stream_next(hash, header, events));

// TODO: fix it by some wrapper. It's not possible to implement Stream for SequencerService like this.
// impl Stream for ObserverService {
// type Item = Result<ObserverEvent>;
return Poll::Ready(Some(event));
};

// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// let e = ready!(pin!(self.next_event()).poll(cx));
// Poll::Ready(Some(e))
// }
// }
if let Poll::Ready(Some(res)) = self.codes_futures.poll_next_unpin(cx) {
let event = res.map(|(code_id, code)| ObserverEvent::Blob { code_id, code });

// impl FusedStream for ObserverService {
// fn is_terminated(&self) -> bool {
// false
// }
// }
return Poll::Ready(Some(event));
}

Poll::Pending
}
}

impl FusedStream for ObserverService {
fn is_terminated(&self) -> bool {
false
}
}

impl ObserverService {
pub async fn new(config: &EthereumConfig) -> Result<Self> {
Expand Down Expand Up @@ -171,7 +181,7 @@ impl ObserverService {
router: Address,
) -> impl Stream<Item = (H256, BlockHeader, Vec<BlockEvent>)> {
async_stream::stream! {
while let Some(header) = stream.like_next().await {
while let Some(header) = stream.next().await {
let hash = (*header.hash).into();
let parent_hash = (*header.parent_hash).into();
let block_number = header.number as u32;
Expand All @@ -190,31 +200,31 @@ impl ObserverService {
}
}

pub async fn next(&mut self) -> Result<ObserverEvent> {
tokio::select! {
Some((hash, header, events)) = self.stream.next() => {
// TODO (breathx): set in db?
log::trace!("Received block: {hash:?}");

self.last_block_number = header.height;

// TODO: replace me with proper processing of all events, including commitments.
for event in &events {
if let BlockEvent::Router(RouterEvent::CodeValidationRequested { code_id, tx_hash }) = event {
self.lookup_code(*code_id, *tx_hash);
}
}

Ok(ObserverEvent::Block(BlockData {
hash,
header,
events,
}))
},
Some(res) = self.codes_futures.next() => {
res.map(|(code_id, code)| ObserverEvent::Blob { code_id, code })
fn handle_stream_next(
&mut self,
hash: H256,
header: BlockHeader,
events: Vec<BlockEvent>,
) -> ObserverEvent {
// TODO (breathx): set in db?
log::trace!("Received block: {hash:?}");

self.last_block_number = header.height;

// TODO: replace me with proper processing of all events, including commitments.
for event in &events {
if let BlockEvent::Router(RouterEvent::CodeValidationRequested { code_id, tx_hash }) =
event
{
self.lookup_code(*code_id, *tx_hash);
}
}

ObserverEvent::Block(BlockData {
hash,
header,
events,
})
}
}

Expand Down
6 changes: 4 additions & 2 deletions ethexe/observer/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,16 @@ async fn test_deployment() -> Result<()> {
let event = observer
.next()
.await
.expect("observer did not receive event");
.expect("observer did not receive event")
.expect("received error instead of event");

assert!(matches!(event, ObserverEvent::Block(..)));

let event = observer
.next()
.await
.expect("observer did not receive event");
.expect("observer did not receive event")
.expect("received error instead of event");

assert_eq!(
event,
Expand Down
49 changes: 17 additions & 32 deletions ethexe/prometheus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use anyhow::{Context as _, Result};
use ethexe_service_utils::AsyncFnStream;
use futures::FutureExt;
use futures::{ready, stream::FusedStream, FutureExt, Stream};
use hyper::{
http::StatusCode,
server::conn::AddrIncoming,
Expand All @@ -35,7 +34,9 @@ use prometheus::{
};
use std::{
net::SocketAddr,
pin::Pin,
sync::LazyLock,
task::{Context, Poll},
time::{Duration, Instant, SystemTime},
};
use tokio::{
Expand Down Expand Up @@ -102,36 +103,28 @@ pub struct PrometheusService {
metrics: PrometheusMetrics,
updated: Instant,

#[allow(unused)]
// to be used in stream impl.
server: JoinHandle<()>,

interval: Interval,
interval: Pin<Box<Interval>>,
}

impl AsyncFnStream for PrometheusService {
impl Stream for PrometheusService {
type Item = PrometheusEvent;

async fn like_next(&mut self) -> Option<Self::Item> {
Some(self.next().await)
}
}
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let instant = ready!(self.interval.poll_tick(cx));

// TODO: fix it by some wrapper. It's not possible to implement Stream for SequencerService like this.
// impl Stream for PrometheusService {
// type Item = PrometheusEvent;
self.updated = instant.into();

// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// let e = ready!(pin!(self.next_event()).poll(cx));
// Poll::Ready(Some(e))
// }
// }
Poll::Ready(Some(PrometheusEvent::CollectMetrics))
}
}

// impl FusedStream for PrometheusService {
// fn is_terminated(&self) -> bool {
// self.server.is_finished()
// }
// }
impl FusedStream for PrometheusService {
fn is_terminated(&self) -> bool {
self.server.is_finished()
}
}

impl PrometheusService {
pub fn new(config: PrometheusConfig) -> Result<Self> {
Expand All @@ -140,7 +133,7 @@ impl PrometheusService {

let server = tokio::spawn(init_prometheus(config.addr, config.registry).map(drop));

let interval = time::interval(Duration::from_secs(6));
let interval = Box::pin(time::interval(Duration::from_secs(6)));

Ok(Self {
metrics,
Expand Down Expand Up @@ -168,14 +161,6 @@ impl PrometheusService {
.submitted_block_commitments
.set(submitted_block_commitments as u64);
}

pub async fn next(&mut self) -> PrometheusEvent {
let instant = self.interval.tick().await;

self.updated = instant.into();

PrometheusEvent::CollectMetrics
}
}

struct PrometheusMetrics {
Expand Down
Loading
Loading