Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add GetEpoch local state query #320

Closed
wants to merge 13 commits into from
67 changes: 45 additions & 22 deletions examples/n2c-miniprotocols/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
use pallas::network::{
facades::NodeClient,
miniprotocols::{chainsync, localstate, Point, MAINNET_MAGIC},
miniprotocols::{
chainsync,
localstate::{self, queries::Request},
Point, MAINNET_MAGIC, PRE_PRODUCTION_MAGIC,
},
};
use tracing::info;

async fn do_localstate_query(client: &mut NodeClient) {
client.statequery().acquire(None).await.unwrap();
async fn do_localstate_query(client: &mut NodeClient, query: Request) {
do_localstate_query_acquisition(client).await;

let result = client
.statequery()
.query(localstate::queries::Request::GetSystemStart)
.await
.unwrap();
let result = client.statequery().query(query).await.unwrap();
info!("result: {:?}", result);

client.statequery().send_release().await.unwrap();
}

info!("system start result: {:?}", result);
async fn do_localstate_query_acquisition(client: &mut NodeClient) {
if let localstate::State::Idle = client.statequery().state() {
client.statequery().acquire(None).await.unwrap();
}
}

async fn do_chainsync(client: &mut NodeClient) {
Expand Down Expand Up @@ -43,16 +50,7 @@ async fn do_chainsync(client: &mut NodeClient) {
}
}

#[cfg(target_family = "unix")]
#[tokio::main]
async fn main() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish(),
)
.unwrap();

async fn setup_client() -> NodeClient {
// we connect to the unix socket of the local node. Make sure you have the right
// path for your environment
let socket_path = "/tmp/node.socket";
Expand All @@ -61,14 +59,39 @@ async fn main() {
let version_table = NodeClient::handshake_query(socket_path, MAINNET_MAGIC)
.await
.unwrap();

info!("handshake query result: {:?}", version_table);

let mut client = NodeClient::connect(socket_path, MAINNET_MAGIC)
NodeClient::connect(socket_path, MAINNET_MAGIC)
.await
.unwrap();
.unwrap()
}

#[cfg(target_family = "unix")]
#[tokio::main]
async fn main() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish(),
)
.unwrap();

let mut client = setup_client().await;

// specify the query we want to execute
let get_system_start_query = localstate::queries::Request::GetSystemStart;
let get_epoch_query =
localstate::queries::Request::BlockQuery(localstate::queries::BlockQuery::GetEpochNo);
let get_stake_pools_query =
localstate::queries::Request::BlockQuery(localstate::queries::BlockQuery::GetStakePools);

// execute an arbitrary "Local State" query against the node
do_localstate_query(&mut client).await;
do_localstate_query(&mut client, get_system_start_query).await;
do_localstate_query(&mut client, get_epoch_query).await;
do_localstate_query(&mut client, get_stake_pools_query).await;

client.statequery().send_done().await.unwrap();

// execute the chainsync flow from an arbitrary point in the chain
do_chainsync(&mut client).await;
Expand Down
2 changes: 1 addition & 1 deletion pallas-network/src/facades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl NodeServer {
plexer_handle,
version: ver,
chainsync: server_cs,
statequery: server_sq
statequery: server_sq,
})
} else {
plexer_handle.abort();
Expand Down
26 changes: 17 additions & 9 deletions pallas-network/src/miniprotocols/localstate/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use pallas_codec::Fragment;
use std::marker::PhantomData;
use thiserror::*;

use super::queries::QueryV16;
use super::{AcquireFailure, Message, Query, State};
use crate::miniprotocols::localstate::queries::Response;
use crate::miniprotocols::Point;
use crate::multiplexer;

Expand Down Expand Up @@ -102,15 +104,18 @@ where
match (&self.0, msg) {
(State::Acquiring, Message::Acquired) => Ok(()),
(State::Acquiring, Message::Failure(_)) => Ok(()),
(State::Querying, Message::Result(_)) => Ok(()),
(State::Querying, Message::Response(_)) => Ok(()),
_ => Err(ClientError::InvalidInbound),
}
}

pub async fn send_message(&mut self, msg: &Message<Q>) -> Result<(), ClientError> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1.send_msg_chunks(msg).await.map_err(ClientError::Plexer)?;
self.1
.send_msg_chunks(msg)
.await
.map_err(ClientError::Plexer)?;

Ok(())
}
Expand Down Expand Up @@ -174,27 +179,30 @@ where
self.recv_while_acquiring().await
}

pub async fn send_query(&mut self, request: Q::Request) -> Result<(), ClientError> {
pub async fn send_query(&mut self, request: Q::Request) -> Result<Message<Q>, ClientError> {
let msg = Message::<Q>::Query(request);
self.send_message(&msg).await?;
self.0 = State::Querying;

Ok(())
Ok(msg)
}

pub async fn recv_while_querying(&mut self) -> Result<Q::Response, ClientError> {
match self.recv_message().await? {
Message::Result(x) => {
Message::Response(result) => {
self.0 = State::Acquired;
Ok(x)
Ok(result)
}
_ => Err(ClientError::InvalidInbound),
}
}

pub async fn query(&mut self, request: Q::Request) -> Result<Q::Response, ClientError> {
self.send_query(request).await?;
self.recv_while_querying().await
pub async fn query(&mut self, request: Q::Request) -> Result<Response, ClientError> {
self.send_query(request.clone()).await?;
let code: u16 = QueryV16::request_signal(request.clone().into());
let response: Q::Response = self.recv_while_querying().await?;
let vec = QueryV16::to_vec(response.clone().into());
Ok(QueryV16::map_response(code, vec))
}
}

Expand Down
4 changes: 2 additions & 2 deletions pallas-network/src/miniprotocols/localstate/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
e.encode(query)?;
Ok(())
}
Message::Result(result) => {
Message::Response(result) => {
e.array(2)?.u16(4)?;
e.encode(result)?;
Ok(())
Expand Down Expand Up @@ -127,7 +127,7 @@ where
}
4 => {
let response = d.decode()?;
Ok(Message::Result(response))
Ok(Message::Response(response))
}
5 => Ok(Message::Release),
6 => {
Expand Down
13 changes: 10 additions & 3 deletions pallas-network/src/miniprotocols/localstate/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::convert::Into;
use std::fmt::Debug;

use crate::miniprotocols::Point;

use super::queries::{Request, Response};

#[derive(Debug, PartialEq, Eq, Clone)]
pub enum State {
Idle,
Expand All @@ -18,8 +21,12 @@ pub enum AcquireFailure {
}

pub trait Query: Debug {
type Request: Clone + Debug;
type Response: Clone + Debug;
type Request: Clone + Debug + Into<Request>;
type Response: Clone + Debug + Into<Response>;

fn to_vec(response: Self::Response) -> Vec<u8>;
fn map_response(signal: u16, response: Vec<u8>) -> Self::Response;
fn request_signal(request: Self::Request) -> u16;
}

#[derive(Debug)]
Expand All @@ -28,7 +35,7 @@ pub enum Message<Q: Query> {
Failure(AcquireFailure),
Acquired,
Query(Q::Request),
Result(Q::Response),
Response(Q::Response),
ReAcquire(Option<Point>),
Release,
Done,
Expand Down
Loading