From e48fd1160b2b81de4039b57b128c754720bdfa9d Mon Sep 17 00:00:00 2001 From: Alexsander Falcucci Date: Mon, 30 Oct 2023 13:10:42 +0100 Subject: [PATCH 01/13] implement GetEpoch local state query w examples test fixture is also included. --- examples/n2c-miniprotocols/src/main.rs | 59 ++++--- .../src/miniprotocols/localstate/client.rs | 5 +- .../src/miniprotocols/localstate/queries.rs | 55 +++--- pallas-network/tests/protocols.rs | 163 +++++++++++++++++- 4 files changed, 225 insertions(+), 57 deletions(-) diff --git a/examples/n2c-miniprotocols/src/main.rs b/examples/n2c-miniprotocols/src/main.rs index 6ca04582..0f1b177b 100644 --- a/examples/n2c-miniprotocols/src/main.rs +++ b/examples/n2c-miniprotocols/src/main.rs @@ -4,16 +4,20 @@ use pallas::network::{ }; use tracing::info; -async fn do_localstate_query(client: &mut NodeClient) { - client.statequery().acquire(None).await.unwrap(); +async fn do_localstate_query(client: &mut NodeClient, query: localstate::queries::Request) { + do_localstate_query_acquisition(client).await; - let result = client - .statequery() - .query(localstate::queries::Request::GetSystemStart) - .await - .unwrap(); + let result = client.statequery().query(query).await.unwrap(); + + info!("result: {:?}", result); + + client.statequery().send_release().await.unwrap(); +} - info!("system start result: {:?}", result); +async fn do_localstate_query_acquisition(client: &mut NodeClient) { + if let localstate::State::Idle = client.statequery().state() { + client.statequery().acquire(None).await.unwrap(); + } } async fn do_chainsync(client: &mut NodeClient) { @@ -43,16 +47,7 @@ async fn do_chainsync(client: &mut NodeClient) { } } -#[cfg(target_family = "unix")] -#[tokio::main] -async fn main() { - tracing::subscriber::set_global_default( - tracing_subscriber::FmtSubscriber::builder() - .with_max_level(tracing::Level::TRACE) - .finish(), - ) - .unwrap(); - +async fn setup_client() -> NodeClient { // we connect to the unix socket of the local node. Make sure you have the right // path for your environment let socket_path = "/tmp/node.socket"; @@ -61,14 +56,36 @@ async fn main() { let version_table = NodeClient::handshake_query(socket_path, MAINNET_MAGIC) .await .unwrap(); + info!("handshake query result: {:?}", version_table); - let mut client = NodeClient::connect(socket_path, MAINNET_MAGIC) + NodeClient::connect(socket_path, MAINNET_MAGIC) .await - .unwrap(); + .unwrap() +} + +#[cfg(target_family = "unix")] +#[tokio::main] +async fn main() { + tracing::subscriber::set_global_default( + tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::TRACE) + .finish(), + ) + .unwrap(); + + let mut client = setup_client().await; + + // specify the query we want to execute + let get_system_start_query = localstate::queries::Request::GetSystemStart; + let get_epoch_query = + localstate::queries::Request::BlockQuery(localstate::queries::BlockQuery::GetEpochNo); // execute an arbitrary "Local State" query against the node - do_localstate_query(&mut client).await; + do_localstate_query(&mut client, get_system_start_query).await; + do_localstate_query(&mut client, get_epoch_query).await; + + client.statequery().send_done().await.unwrap(); // execute the chainsync flow from an arbitrary point in the chain do_chainsync(&mut client).await; diff --git a/pallas-network/src/miniprotocols/localstate/client.rs b/pallas-network/src/miniprotocols/localstate/client.rs index f14d79cb..f692054e 100644 --- a/pallas-network/src/miniprotocols/localstate/client.rs +++ b/pallas-network/src/miniprotocols/localstate/client.rs @@ -110,7 +110,10 @@ where pub async fn send_message(&mut self, msg: &Message) -> Result<(), ClientError> { self.assert_agency_is_ours()?; self.assert_outbound_state(msg)?; - self.1.send_msg_chunks(msg).await.map_err(ClientError::Plexer)?; + self.1 + .send_msg_chunks(msg) + .await + .map_err(ClientError::Plexer)?; Ok(()) } diff --git a/pallas-network/src/miniprotocols/localstate/queries.rs b/pallas-network/src/miniprotocols/localstate/queries.rs index 7cae029d..233a81c8 100644 --- a/pallas-network/src/miniprotocols/localstate/queries.rs +++ b/pallas-network/src/miniprotocols/localstate/queries.rs @@ -122,27 +122,26 @@ impl Encode<()> for BlockQuery { BlockQuery::GetRewardInfoPools => { e.array(1)?; e.u16(18)?; - } - // BlockQuery::GetPoolState(()) => { - // e.array(X)?; - // e.u16(19)?; - // } - // BlockQuery::GetStakeSnapshots(()) => { - // e.array(X)?; - // e.u16(20)?; - // } - // BlockQuery::GetPoolDistr(()) => { - // e.array(X)?; - // e.u16(21)?; - // } - // BlockQuery::GetStakeDelegDeposits(()) => { - // e.array(X)?; - // e.u16(22)?; - // } - // BlockQuery::GetConstitutionHash => { - // e.array(1)?; - // e.u16(23)?; - // } + } // BlockQuery::GetPoolState(()) => { + // e.array(X)?; + // e.u16(19)?; + // } + // BlockQuery::GetStakeSnapshots(()) => { + // e.array(X)?; + // e.u16(20)?; + // } + // BlockQuery::GetPoolDistr(()) => { + // e.array(X)?; + // e.u16(21)?; + // } + // BlockQuery::GetStakeDelegDeposits(()) => { + // e.array(X)?; + // e.u16(22)?; + // } + // BlockQuery::GetConstitutionHash => { + // e.array(1)?; + // e.u16(23)?; + // } } Ok(()) } @@ -241,11 +240,9 @@ impl<'b> Decode<'b, ()> for Request { (1, 1) => Ok(Self::GetSystemStart), (1, 2) => Ok(Self::GetChainBlockNo), (1, 3) => Ok(Self::GetChainPoint), - _ => { - return Err(decode::Error::message( - "invalid (size, tag) for lsq request", - )) - } + _ => Err(decode::Error::message( + "invalid (size, tag) for lsq request", + )), } } } @@ -258,6 +255,10 @@ impl GenericResponse { pub fn new(bytes: Vec) -> Self { Self(bytes) } + + pub fn bytes(&self) -> &[u8] { + &self.0 + } } impl Encode<()> for GenericResponse { @@ -268,7 +269,7 @@ impl Encode<()> for GenericResponse { ) -> Result<(), encode::Error> { e.writer_mut() .write_all(&self.0) - .map_err(|e| encode::Error::write(e)) + .map_err(encode::Error::write) } } diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index ad5cb341..ee427799 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -4,17 +4,19 @@ use std::time::Duration; use pallas_network::facades::{NodeClient, PeerClient, PeerServer}; use pallas_network::miniprotocols::blockfetch::BlockRequest; +use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip}; use pallas_network::miniprotocols::handshake::n2c; use pallas_network::miniprotocols::handshake::n2n::VersionData; use pallas_network::miniprotocols::localstate::queries::{GenericResponse, Request}; use pallas_network::miniprotocols::localstate::{ClientAcquireRequest, ClientQueryRequest}; -use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip}; use pallas_network::miniprotocols::{ blockfetch, chainsync::{self, NextResponse}, Point, }; -use pallas_network::miniprotocols::{handshake, localstate}; +use pallas_network::miniprotocols::{ + handshake, localstate, PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, +}; use pallas_network::multiplexer::{Bearer, Plexer}; use std::path::Path; use tokio::net::{TcpListener, UnixListener}; @@ -256,15 +258,14 @@ pub async fn blockfetch_server_and_client_happy_path() { } #[tokio::test] -#[ignore] -pub async fn local_state_query_server_and_client_happy_path() { +pub async fn local_state_query_server_and_block_query_stake_pools_client_happy_path() { let server = tokio::spawn({ async move { // server setup let socket_path = Path::new("node.socket"); if socket_path.exists() { - fs::remove_file(&socket_path).unwrap(); + fs::remove_file(socket_path).unwrap(); } let unix_listener = UnixListener::bind(socket_path).unwrap(); @@ -274,10 +275,10 @@ pub async fn local_state_query_server_and_client_happy_path() { let mut server_plexer = Plexer::new(bearer); let mut server_hs: handshake::Server = - handshake::Server::new(server_plexer.subscribe_server(0)); + handshake::Server::new(server_plexer.subscribe_server(PROTOCOL_N2C_HANDSHAKE)); let mut server_sq: localstate::Server = - localstate::Server::new(server_plexer.subscribe_server(7)); + localstate::Server::new(server_plexer.subscribe_server(PROTOCOL_N2C_STATE_QUERY)); tokio::spawn(async move { server_plexer.run().await }); @@ -376,6 +377,7 @@ pub async fn local_state_query_server_and_client_happy_path() { .unwrap(); let resp = client_sq.recv_while_querying().await.unwrap(); + println!("resp: {:?}", resp); assert_eq!( resp, @@ -399,6 +401,151 @@ pub async fn local_state_query_server_and_client_happy_path() { _ = tokio::join!(client, server); } +#[tokio::test] +pub async fn local_state_query_server_and_block_query_get_epoch_client_happy_path() { + let server = tokio::spawn({ + // TODO: create a common function to setup the server + async move { + // server setup + let socket_path = Path::new("node.socket"); + + if socket_path.exists() { + fs::remove_file(socket_path).unwrap(); + } + + let unix_listener = UnixListener::bind(socket_path).unwrap(); + + let (bearer, _) = Bearer::accept_unix(&unix_listener).await.unwrap(); + + let mut server_plexer = Plexer::new(bearer); + + let mut server_hs: handshake::Server = + handshake::Server::new(server_plexer.subscribe_server(PROTOCOL_N2C_HANDSHAKE)); + + let mut server_sq: localstate::Server = + localstate::Server::new(server_plexer.subscribe_server(PROTOCOL_N2C_STATE_QUERY)); + + tokio::spawn(async move { server_plexer.run().await }); + + server_hs.receive_proposed_versions().await.unwrap(); + server_hs + .accept_version(10, n2c::VersionData::new(0, Some(false))) + .await + .unwrap(); + + // server receives range from client, sends blocks + + let ClientAcquireRequest(maybe_point) = + server_sq.recv_while_idle().await.unwrap().unwrap(); + + assert_eq!(maybe_point, Some(Point::Origin)); + assert_eq!(*server_sq.state(), localstate::State::Acquiring); + + // server_bf.send_block_range(bodies).await.unwrap(); + + server_sq.send_acquired().await.unwrap(); + + assert_eq!(*server_sq.state(), localstate::State::Acquired); + + // server receives query from client + + let query = match server_sq.recv_while_acquired().await.unwrap() { + ClientQueryRequest::Query(q) => q, + x => panic!("unexpected message from client: {x:?}"), + }; + + assert_eq!( + query, + Request::BlockQuery(localstate::queries::BlockQuery::GetEpochNo) + ); + + assert_eq!(*server_sq.state(), localstate::State::Querying); + + server_sq + .send_result(GenericResponse::new(hex::decode("83188118181867").unwrap())) + .await + .unwrap(); + + assert_eq!(*server_sq.state(), localstate::State::Acquired); + + // server receives reaquire from the client + + let maybe_point = match server_sq.recv_while_acquired().await.unwrap() { + ClientQueryRequest::ReAcquire(p) => p, + x => panic!("unexpected message from client: {x:?}"), + }; + + assert_eq!(maybe_point, Some(Point::Specific(1337, vec![1, 2, 3]))); + assert_eq!(*server_sq.state(), localstate::State::Acquiring); + + server_sq.send_acquired().await.unwrap(); + + // server receives release from the client + + match server_sq.recv_while_acquired().await.unwrap() { + ClientQueryRequest::Release => (), + x => panic!("unexpected message from client: {x:?}"), + }; + + assert!(server_sq.recv_while_idle().await.unwrap().is_none()); + + assert_eq!(*server_sq.state(), localstate::State::Done); + } + }); + + let client = tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(1)).await; + + // client setup + + let socket_path = "node.socket"; + + let mut client_to_server_conn = NodeClient::connect(socket_path, 0).await.unwrap(); + + let client_sq = client_to_server_conn.statequery(); + + // client sends acquire + + client_sq.send_acquire(Some(Point::Origin)).await.unwrap(); + + client_sq.recv_while_acquiring().await.unwrap(); + + assert_eq!(*client_sq.state(), localstate::State::Acquired); + + // client sends a BlockQuery + + client_sq + .send_query(Request::BlockQuery( + localstate::queries::BlockQuery::GetEpochNo, + )) + .await + .unwrap(); + + let resp = client_sq.recv_while_querying().await.unwrap(); + println!("resp: {:?}", resp); + + assert_eq!( + resp, + GenericResponse::new(hex::decode("83188118181867").unwrap()) + ); + + // client sends a ReAquire + + client_sq + .send_reacquire(Some(Point::Specific(1337, vec![1, 2, 3]))) + .await + .unwrap(); + + client_sq.recv_while_acquiring().await.unwrap(); + + client_sq.send_release().await.unwrap(); + + client_sq.send_done().await.unwrap(); + }); + + _ = tokio::join!(client, server); +} + #[tokio::test] #[ignore] pub async fn chainsync_server_and_client_happy_path_n2n() { @@ -596,4 +743,4 @@ pub async fn chainsync_server_and_client_happy_path_n2n() { }); _ = tokio::join!(client, server); -} \ No newline at end of file +} From f68e8735539621eb803cc9938df831709c841db4 Mon Sep 17 00:00:00 2001 From: Alexsander Falcucci Date: Mon, 30 Oct 2023 13:18:50 +0100 Subject: [PATCH 02/13] remove unnecessary println! --- pallas-network/tests/protocols.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index ee427799..e003fa86 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -522,7 +522,6 @@ pub async fn local_state_query_server_and_block_query_get_epoch_client_happy_pat .unwrap(); let resp = client_sq.recv_while_querying().await.unwrap(); - println!("resp: {:?}", resp); assert_eq!( resp, From b7e434a5f829e131e904f8456262be499fd8b3ef Mon Sep 17 00:00:00 2001 From: Alexsander Falcucci Date: Tue, 31 Oct 2023 16:07:59 +0100 Subject: [PATCH 03/13] unify local state query fixtures --- pallas-network/tests/protocols.rs | 141 ++++++------------------------ 1 file changed, 26 insertions(+), 115 deletions(-) diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index e003fa86..5812f888 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -258,7 +258,8 @@ pub async fn blockfetch_server_and_client_happy_path() { } #[tokio::test] -pub async fn local_state_query_server_and_block_query_stake_pools_client_happy_path() { +#[ignore] +pub async fn local_state_query_server_and_client_happy_path() { let server = tokio::spawn({ async move { // server setup @@ -303,7 +304,6 @@ pub async fn local_state_query_server_and_block_query_stake_pools_client_happy_p assert_eq!(*server_sq.state(), localstate::State::Acquired); // server receives query from client - let query = match server_sq.recv_while_acquired().await.unwrap() { ClientQueryRequest::Query(q) => q, x => panic!("unexpected message from client: {x:?}"), @@ -321,119 +321,13 @@ pub async fn local_state_query_server_and_block_query_stake_pools_client_happy_p .await .unwrap(); - assert_eq!(*server_sq.state(), localstate::State::Acquired); - - // server receives reaquire from the client - - let maybe_point = match server_sq.recv_while_acquired().await.unwrap() { - ClientQueryRequest::ReAcquire(p) => p, - x => panic!("unexpected message from client: {x:?}"), - }; - - assert_eq!(maybe_point, Some(Point::Specific(1337, vec![1, 2, 3]))); - assert_eq!(*server_sq.state(), localstate::State::Acquiring); - - server_sq.send_acquired().await.unwrap(); - // server receives release from the client match server_sq.recv_while_acquired().await.unwrap() { ClientQueryRequest::Release => (), x => panic!("unexpected message from client: {x:?}"), }; - - assert!(server_sq.recv_while_idle().await.unwrap().is_none()); - - assert_eq!(*server_sq.state(), localstate::State::Done); - } - }); - - let client = tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(1)).await; - - // client setup - - let socket_path = "node.socket"; - - let mut client_to_server_conn = NodeClient::connect(socket_path, 0).await.unwrap(); - - let client_sq = client_to_server_conn.statequery(); - - // client sends acquire - - client_sq.send_acquire(Some(Point::Origin)).await.unwrap(); - - client_sq.recv_while_acquiring().await.unwrap(); - - assert_eq!(*client_sq.state(), localstate::State::Acquired); - - // client sends a BlockQuery - - client_sq - .send_query(Request::BlockQuery( - localstate::queries::BlockQuery::GetStakePools, - )) - .await - .unwrap(); - - let resp = client_sq.recv_while_querying().await.unwrap(); - println!("resp: {:?}", resp); - - assert_eq!( - resp, - GenericResponse::new(hex::decode("82011A008BD423").unwrap()) - ); - - // client sends a ReAquire - - client_sq - .send_reacquire(Some(Point::Specific(1337, vec![1, 2, 3]))) - .await - .unwrap(); - - client_sq.recv_while_acquiring().await.unwrap(); - - client_sq.send_release().await.unwrap(); - - client_sq.send_done().await.unwrap(); - }); - - _ = tokio::join!(client, server); -} - -#[tokio::test] -pub async fn local_state_query_server_and_block_query_get_epoch_client_happy_path() { - let server = tokio::spawn({ - // TODO: create a common function to setup the server - async move { - // server setup - let socket_path = Path::new("node.socket"); - - if socket_path.exists() { - fs::remove_file(socket_path).unwrap(); - } - - let unix_listener = UnixListener::bind(socket_path).unwrap(); - - let (bearer, _) = Bearer::accept_unix(&unix_listener).await.unwrap(); - - let mut server_plexer = Plexer::new(bearer); - - let mut server_hs: handshake::Server = - handshake::Server::new(server_plexer.subscribe_server(PROTOCOL_N2C_HANDSHAKE)); - - let mut server_sq: localstate::Server = - localstate::Server::new(server_plexer.subscribe_server(PROTOCOL_N2C_STATE_QUERY)); - - tokio::spawn(async move { server_plexer.run().await }); - - server_hs.receive_proposed_versions().await.unwrap(); - server_hs - .accept_version(10, n2c::VersionData::new(0, Some(false))) - .await - .unwrap(); - - // server receives range from client, sends blocks + assert_eq!(*server_sq.state(), localstate::State::Idle); let ClientAcquireRequest(maybe_point) = server_sq.recv_while_idle().await.unwrap().unwrap(); @@ -441,21 +335,18 @@ pub async fn local_state_query_server_and_block_query_get_epoch_client_happy_pat assert_eq!(maybe_point, Some(Point::Origin)); assert_eq!(*server_sq.state(), localstate::State::Acquiring); - // server_bf.send_block_range(bodies).await.unwrap(); - server_sq.send_acquired().await.unwrap(); assert_eq!(*server_sq.state(), localstate::State::Acquired); // server receives query from client - - let query = match server_sq.recv_while_acquired().await.unwrap() { + let epoch_query = match server_sq.recv_while_acquired().await.unwrap() { ClientQueryRequest::Query(q) => q, x => panic!("unexpected message from client: {x:?}"), }; assert_eq!( - query, + epoch_query, Request::BlockQuery(localstate::queries::BlockQuery::GetEpochNo) ); @@ -516,7 +407,7 @@ pub async fn local_state_query_server_and_block_query_get_epoch_client_happy_pat client_sq .send_query(Request::BlockQuery( - localstate::queries::BlockQuery::GetEpochNo, + localstate::queries::BlockQuery::GetStakePools, )) .await .unwrap(); @@ -525,6 +416,26 @@ pub async fn local_state_query_server_and_block_query_get_epoch_client_happy_pat assert_eq!( resp, + GenericResponse::new(hex::decode("82011A008BD423").unwrap()) + ); + + client_sq.send_release().await.unwrap(); + + client_sq.send_acquire(Some(Point::Origin)).await.unwrap(); + + client_sq.recv_while_acquiring().await.unwrap(); + + client_sq + .send_query(Request::BlockQuery( + localstate::queries::BlockQuery::GetEpochNo, + )) + .await + .unwrap(); + + let resp_get_epoch_no = client_sq.recv_while_querying().await.unwrap(); + + assert_eq!( + resp_get_epoch_no, GenericResponse::new(hex::decode("83188118181867").unwrap()) ); From 86aa076a8726cf566f9f5f2d03a311ff0efdc3d8 Mon Sep 17 00:00:00 2001 From: Alexsander Falcucci Date: Tue, 31 Oct 2023 16:10:38 +0100 Subject: [PATCH 04/13] minor changes ```diff diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index 5812f88838ed..b625632f0f15 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -290,7 +290,6 @@ pub async fn local_state_query_server_and_client_happy_path() { .unwrap(); // server receives range from client, sends blocks - let clientacquirerequest(maybe_point) = server_sq.recv_while_idle().await.unwrap().unwrap(); @@ -322,7 +321,6 @@ pub async fn local_state_query_server_and_client_happy_path() { .unwrap(); // server receives release from the client - match server_sq.recv_while_acquired().await.unwrap() { clientqueryrequest::release => (), x => panic!("unexpected message from client: {x:?}"), @@ -360,7 +358,6 @@ pub async fn local_state_query_server_and_client_happy_path() { assert_eq!(*server_sq.state(), localstate::state::acquired); // server receives reaquire from the client - let maybe_point = match server_sq.recv_while_acquired().await.unwrap() { clientqueryrequest::reacquire(p) => p, x => panic!("unexpected message from client: {x:?}"), @@ -372,7 +369,6 @@ pub async fn local_state_query_server_and_client_happy_path() { server_sq.send_acquired().await.unwrap(); // server receives release from the client - match server_sq.recv_while_acquired().await.unwrap() { clientqueryrequest::release => (), x => panic!("unexpected message from client: {x:?}"), ``` --- pallas-network/tests/protocols.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index 5812f888..b625632f 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -290,7 +290,6 @@ pub async fn local_state_query_server_and_client_happy_path() { .unwrap(); // server receives range from client, sends blocks - let ClientAcquireRequest(maybe_point) = server_sq.recv_while_idle().await.unwrap().unwrap(); @@ -322,7 +321,6 @@ pub async fn local_state_query_server_and_client_happy_path() { .unwrap(); // server receives release from the client - match server_sq.recv_while_acquired().await.unwrap() { ClientQueryRequest::Release => (), x => panic!("unexpected message from client: {x:?}"), @@ -360,7 +358,6 @@ pub async fn local_state_query_server_and_client_happy_path() { assert_eq!(*server_sq.state(), localstate::State::Acquired); // server receives reaquire from the client - let maybe_point = match server_sq.recv_while_acquired().await.unwrap() { ClientQueryRequest::ReAcquire(p) => p, x => panic!("unexpected message from client: {x:?}"), @@ -372,7 +369,6 @@ pub async fn local_state_query_server_and_client_happy_path() { server_sq.send_acquired().await.unwrap(); // server receives release from the client - match server_sq.recv_while_acquired().await.unwrap() { ClientQueryRequest::Release => (), x => panic!("unexpected message from client: {x:?}"), From 02edf49203ac992e7d2df2129bef9ccdfe443b02 Mon Sep 17 00:00:00 2001 From: Alexsander Falcucci Date: Wed, 1 Nov 2023 20:15:28 +0100 Subject: [PATCH 05/13] add response primitives currently supporting only non params local state queries --- .../src/miniprotocols/localstate/queries.rs | 186 ++++++++++++++++-- pallas-network/tests/protocols.rs | 26 ++- 2 files changed, 183 insertions(+), 29 deletions(-) diff --git a/pallas-network/src/miniprotocols/localstate/queries.rs b/pallas-network/src/miniprotocols/localstate/queries.rs index 233a81c8..dabc73e6 100644 --- a/pallas-network/src/miniprotocols/localstate/queries.rs +++ b/pallas-network/src/miniprotocols/localstate/queries.rs @@ -42,7 +42,7 @@ impl Encode<()> for BlockQuery { e.u16(0)?; e.array(2)?; /* - TODO: Think this is era or something? First fetch era with + TODO: I think this is era or something? First fetch era with [3, [0, [2, [1]]]], then use it here? */ e.u16(5)?; @@ -248,39 +248,185 @@ impl<'b> Decode<'b, ()> for Request { } #[derive(Debug, Clone, PartialEq)] -pub struct GenericResponse(Vec); +pub enum BlockQueryResponse { + GetLedgerTip(Vec), + EpochNo(Vec), + StakePools(Vec), + GetCurrentPParams(Vec), + GetProposedPParamsUpdates(Vec), + GetStakeDistribution(Vec), + GetGenesisConfig(Vec), + DebugChainDepState(Vec), + GetRewardProvenance(Vec), + GetStakePools(Vec), + GetRewardInfoPools(Vec), +} -impl GenericResponse { - /// "bytes" must be valid CBOR - pub fn new(bytes: Vec) -> Self { - Self(bytes) +impl Encode<()> for BlockQueryResponse { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + match self { + Self::GetLedgerTip(bytes) => { + e.array(2)?; + e.u16(0)?; + e.bytes(bytes)?; + Ok(()) + } + Self::EpochNo(bytes) => { + e.array(2)?; + e.u16(1)?; + e.bytes(bytes)?; + Ok(()) + } + Self::StakePools(bytes) => { + e.array(2)?; + e.u16(2)?; + e.bytes(bytes)?; + Ok(()) + } + Self::GetCurrentPParams(bytes) => { + e.array(2)?; + e.u16(3)?; + e.bytes(bytes)?; + Ok(()) + } + Self::GetProposedPParamsUpdates(bytes) => { + e.array(2)?; + e.u16(4)?; + e.bytes(bytes)?; + Ok(()) + } + Self::GetStakeDistribution(bytes) => { + e.array(2)?; + e.u16(5)?; + e.bytes(bytes)?; + Ok(()) + } + Self::GetGenesisConfig(bytes) => { + e.array(2)?; + e.u16(11)?; + e.bytes(bytes)?; + Ok(()) + } + Self::DebugChainDepState(bytes) => { + e.array(2)?; + e.u16(13)?; + e.bytes(bytes)?; + Ok(()) + } + Self::GetRewardProvenance(bytes) => { + e.array(2)?; + e.u16(14)?; + e.bytes(bytes)?; + Ok(()) + } + Self::GetStakePools(bytes) => { + e.array(2)?; + e.u16(16)?; + e.bytes(bytes)?; + Ok(()) + } + Self::GetRewardInfoPools(bytes) => { + e.array(2)?; + e.u16(17)?; + e.bytes(bytes)?; + Ok(()) + } + } } +} - pub fn bytes(&self) -> &[u8] { - &self.0 +impl<'b> Decode<'b, ()> for BlockQueryResponse { + fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { + let size = d + .array()? + .ok_or_else(|| decode::Error::message("unexpected indefinite len list"))?; + + let tag = d.u16()?; + + match (size, tag) { + (2, 0) => Ok(Self::GetLedgerTip(d.bytes()?.to_vec())), + (2, 1) => Ok(Self::EpochNo(d.bytes()?.to_vec())), + (2, 2) => Ok(Self::StakePools(d.bytes()?.to_vec())), + (2, 3) => Ok(Self::GetCurrentPParams(d.bytes()?.to_vec())), + (2, 4) => Ok(Self::GetProposedPParamsUpdates(d.bytes()?.to_vec())), + (2, 5) => Ok(Self::GetStakeDistribution(d.bytes()?.to_vec())), + (2, 11) => Ok(Self::GetGenesisConfig(d.bytes()?.to_vec())), + (2, 13) => Ok(Self::DebugChainDepState(d.bytes()?.to_vec())), + (2, 14) => Ok(Self::GetRewardProvenance(d.bytes()?.to_vec())), + (2, 16) => Ok(Self::GetStakePools(d.bytes()?.to_vec())), + (2, 17) => Ok(Self::GetRewardInfoPools(d.bytes()?.to_vec())), + _ => Err(decode::Error::message( + "invalid (size, tag) for lsq response", + )), + } } } -impl Encode<()> for GenericResponse { +#[derive(Debug, Clone, PartialEq)] +pub enum Response { + BlockQuery(BlockQueryResponse), + SystemStart(Vec), + ChainBlockNo(Vec), + ChainPoint(Vec), +} + +impl Encode<()> for Response { fn encode( &self, e: &mut Encoder, _ctx: &mut (), ) -> Result<(), encode::Error> { - e.writer_mut() - .write_all(&self.0) - .map_err(encode::Error::write) + match self { + Self::BlockQuery(q) => { + e.array(2)?; + e.u16(0)?; + e.encode(q)?; + + Ok(()) + } + Self::SystemStart(bytes) => { + e.array(1)?; + e.u16(1)?; + e.bytes(bytes)?; + Ok(()) + } + Self::ChainBlockNo(bytes) => { + e.array(1)?; + e.u16(2)?; + e.bytes(bytes)?; + Ok(()) + } + Self::ChainPoint(bytes) => { + e.array(1)?; + e.u16(3)?; + e.bytes(bytes)?; + Ok(()) + } + } } } -impl<'b> Decode<'b, ()> for GenericResponse { +impl<'b> Decode<'b, ()> for Response { fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { - let start = d.position(); - d.skip()?; - let end = d.position(); - let slice = &d.input()[start..end]; - let vec = slice.to_vec(); - Ok(GenericResponse(vec)) + let size = d + .array()? + .ok_or_else(|| decode::Error::message("unexpected indefinite len list"))?; + + let tag = d.u16()?; + + match (size, tag) { + (2, 0) => Ok(Self::BlockQuery(d.decode()?)), + (1, 1) => Ok(Self::SystemStart(d.bytes()?.to_vec())), + (1, 2) => Ok(Self::ChainBlockNo(d.bytes()?.to_vec())), + (1, 3) => Ok(Self::ChainPoint(d.bytes()?.to_vec())), + _ => Err(decode::Error::message( + "invalid (size, tag) for lsq response", + )), + } } } @@ -290,5 +436,5 @@ pub struct QueryV16 {} impl Query for QueryV16 { type Request = Request; - type Response = GenericResponse; + type Response = Response; } diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index b625632f..84fe835e 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -7,7 +7,7 @@ use pallas_network::miniprotocols::blockfetch::BlockRequest; use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip}; use pallas_network::miniprotocols::handshake::n2c; use pallas_network::miniprotocols::handshake::n2n::VersionData; -use pallas_network::miniprotocols::localstate::queries::{GenericResponse, Request}; +use pallas_network::miniprotocols::localstate::queries::{BlockQueryResponse, Request, Response}; use pallas_network::miniprotocols::localstate::{ClientAcquireRequest, ClientQueryRequest}; use pallas_network::miniprotocols::{ blockfetch, @@ -258,7 +258,6 @@ pub async fn blockfetch_server_and_client_happy_path() { } #[tokio::test] -#[ignore] pub async fn local_state_query_server_and_client_happy_path() { let server = tokio::spawn({ async move { @@ -315,8 +314,12 @@ pub async fn local_state_query_server_and_client_happy_path() { assert_eq!(*server_sq.state(), localstate::State::Querying); + let get_stake_pools_response = Response::BlockQuery(BlockQueryResponse::StakePools( + hex::decode("82011A008BD423").unwrap(), + )); + server_sq - .send_result(GenericResponse::new(hex::decode("82011A008BD423").unwrap())) + .send_result(get_stake_pools_response) .await .unwrap(); @@ -350,10 +353,11 @@ pub async fn local_state_query_server_and_client_happy_path() { assert_eq!(*server_sq.state(), localstate::State::Querying); - server_sq - .send_result(GenericResponse::new(hex::decode("83188118181867").unwrap())) - .await - .unwrap(); + let get_epoch_no_response = Response::BlockQuery(BlockQueryResponse::EpochNo( + hex::decode("83188118181867").unwrap(), + )); + + server_sq.send_result(get_epoch_no_response).await.unwrap(); assert_eq!(*server_sq.state(), localstate::State::Acquired); @@ -412,7 +416,9 @@ pub async fn local_state_query_server_and_client_happy_path() { assert_eq!( resp, - GenericResponse::new(hex::decode("82011A008BD423").unwrap()) + Response::BlockQuery(BlockQueryResponse::StakePools( + hex::decode("82011A008BD423").unwrap() + )) ); client_sq.send_release().await.unwrap(); @@ -432,7 +438,9 @@ pub async fn local_state_query_server_and_client_happy_path() { assert_eq!( resp_get_epoch_no, - GenericResponse::new(hex::decode("83188118181867").unwrap()) + Response::BlockQuery(BlockQueryResponse::EpochNo( + hex::decode("83188118181867").unwrap() + )) ); // client sends a ReAquire From f5c986ac8788350600e6db77b920de1e905a5351 Mon Sep 17 00:00:00 2001 From: Alexsander Falcucci Date: Wed, 1 Nov 2023 20:17:41 +0100 Subject: [PATCH 06/13] minor changes. ```diff diff --git a/pallas-network/src/miniprotocols/localstate/queries.rs b/pallas-network/src/miniprotocols/localstate/queries.rs index dabc73e646d0..71921c1672c7 100644 --- a/pallas-network/src/miniprotocols/localstate/queries.rs +++ b/pallas-network/src/miniprotocols/localstate/queries.rs @@ -42,7 +42,7 @@ impl encode<()> for blockquery { e.u16(0)?; e.array(2)?; /* - todo: i think this is era or something? first fetch era with + todo: think this is era or something? first fetch era with [3, [0, [2, [1]]]], then use it here? */ e.u16(5)?; ``` --- pallas-network/src/miniprotocols/localstate/queries.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pallas-network/src/miniprotocols/localstate/queries.rs b/pallas-network/src/miniprotocols/localstate/queries.rs index dabc73e6..71921c16 100644 --- a/pallas-network/src/miniprotocols/localstate/queries.rs +++ b/pallas-network/src/miniprotocols/localstate/queries.rs @@ -42,7 +42,7 @@ impl Encode<()> for BlockQuery { e.u16(0)?; e.array(2)?; /* - TODO: I think this is era or something? First fetch era with + TODO: Think this is era or something? First fetch era with [3, [0, [2, [1]]]], then use it here? */ e.u16(5)?; From 9b8316b32a87edb00146aec482ea2b294d7b1901 Mon Sep 17 00:00:00 2001 From: Alexsander Falcucci Date: Wed, 1 Nov 2023 20:23:31 +0100 Subject: [PATCH 07/13] renaming blockqueryresponse variants renames the variants of the `blockqueryresponse` enum in the `queries.rs` file of the `localstate` miniprotocol. the following changes were made: - `getledgertip` is now `ledgertip` - `stakepools` is now `currentpparams` - `getcurrentpparams` is now `proposedpparamsupdates` - `getproposedpparamsupdates` is now `stakedistribution` - `getstakedistribution` is now `genesisconfig` - `getgenesisconfig` is now `debugchaindepstate` - `getrewardprovenance` is now `rewardprovenance` - `getstakepools` is now `stakepools` - `getrewardinfopools` is now `rewardinfopools` --- .../src/miniprotocols/localstate/queries.rs | 56 ++++++++----------- 1 file changed, 24 insertions(+), 32 deletions(-) diff --git a/pallas-network/src/miniprotocols/localstate/queries.rs b/pallas-network/src/miniprotocols/localstate/queries.rs index 71921c16..a0cd10d1 100644 --- a/pallas-network/src/miniprotocols/localstate/queries.rs +++ b/pallas-network/src/miniprotocols/localstate/queries.rs @@ -249,17 +249,16 @@ impl<'b> Decode<'b, ()> for Request { #[derive(Debug, Clone, PartialEq)] pub enum BlockQueryResponse { - GetLedgerTip(Vec), + LedgerTip(Vec), EpochNo(Vec), - StakePools(Vec), - GetCurrentPParams(Vec), - GetProposedPParamsUpdates(Vec), - GetStakeDistribution(Vec), - GetGenesisConfig(Vec), + CurrentPParams(Vec), + ProposedPParamsUpdates(Vec), + StakeDistribution(Vec), + GenesisConfig(Vec), DebugChainDepState(Vec), - GetRewardProvenance(Vec), - GetStakePools(Vec), - GetRewardInfoPools(Vec), + RewardProvenance(Vec), + StakePools(Vec), + RewardInfoPools(Vec), } impl Encode<()> for BlockQueryResponse { @@ -269,7 +268,7 @@ impl Encode<()> for BlockQueryResponse { _ctx: &mut (), ) -> Result<(), encode::Error> { match self { - Self::GetLedgerTip(bytes) => { + Self::LedgerTip(bytes) => { e.array(2)?; e.u16(0)?; e.bytes(bytes)?; @@ -281,31 +280,25 @@ impl Encode<()> for BlockQueryResponse { e.bytes(bytes)?; Ok(()) } - Self::StakePools(bytes) => { - e.array(2)?; - e.u16(2)?; - e.bytes(bytes)?; - Ok(()) - } - Self::GetCurrentPParams(bytes) => { + Self::CurrentPParams(bytes) => { e.array(2)?; e.u16(3)?; e.bytes(bytes)?; Ok(()) } - Self::GetProposedPParamsUpdates(bytes) => { + Self::ProposedPParamsUpdates(bytes) => { e.array(2)?; e.u16(4)?; e.bytes(bytes)?; Ok(()) } - Self::GetStakeDistribution(bytes) => { + Self::StakeDistribution(bytes) => { e.array(2)?; e.u16(5)?; e.bytes(bytes)?; Ok(()) } - Self::GetGenesisConfig(bytes) => { + Self::GenesisConfig(bytes) => { e.array(2)?; e.u16(11)?; e.bytes(bytes)?; @@ -317,19 +310,19 @@ impl Encode<()> for BlockQueryResponse { e.bytes(bytes)?; Ok(()) } - Self::GetRewardProvenance(bytes) => { + Self::RewardProvenance(bytes) => { e.array(2)?; e.u16(14)?; e.bytes(bytes)?; Ok(()) } - Self::GetStakePools(bytes) => { + Self::StakePools(bytes) => { e.array(2)?; e.u16(16)?; e.bytes(bytes)?; Ok(()) } - Self::GetRewardInfoPools(bytes) => { + Self::RewardInfoPools(bytes) => { e.array(2)?; e.u16(17)?; e.bytes(bytes)?; @@ -348,17 +341,16 @@ impl<'b> Decode<'b, ()> for BlockQueryResponse { let tag = d.u16()?; match (size, tag) { - (2, 0) => Ok(Self::GetLedgerTip(d.bytes()?.to_vec())), + (2, 0) => Ok(Self::LedgerTip(d.bytes()?.to_vec())), (2, 1) => Ok(Self::EpochNo(d.bytes()?.to_vec())), - (2, 2) => Ok(Self::StakePools(d.bytes()?.to_vec())), - (2, 3) => Ok(Self::GetCurrentPParams(d.bytes()?.to_vec())), - (2, 4) => Ok(Self::GetProposedPParamsUpdates(d.bytes()?.to_vec())), - (2, 5) => Ok(Self::GetStakeDistribution(d.bytes()?.to_vec())), - (2, 11) => Ok(Self::GetGenesisConfig(d.bytes()?.to_vec())), + (2, 3) => Ok(Self::CurrentPParams(d.bytes()?.to_vec())), + (2, 4) => Ok(Self::ProposedPParamsUpdates(d.bytes()?.to_vec())), + (2, 5) => Ok(Self::StakeDistribution(d.bytes()?.to_vec())), + (2, 11) => Ok(Self::GenesisConfig(d.bytes()?.to_vec())), (2, 13) => Ok(Self::DebugChainDepState(d.bytes()?.to_vec())), - (2, 14) => Ok(Self::GetRewardProvenance(d.bytes()?.to_vec())), - (2, 16) => Ok(Self::GetStakePools(d.bytes()?.to_vec())), - (2, 17) => Ok(Self::GetRewardInfoPools(d.bytes()?.to_vec())), + (2, 14) => Ok(Self::RewardProvenance(d.bytes()?.to_vec())), + (2, 16) => Ok(Self::StakePools(d.bytes()?.to_vec())), + (2, 17) => Ok(Self::RewardInfoPools(d.bytes()?.to_vec())), _ => Err(decode::Error::message( "invalid (size, tag) for lsq response", )), From f73f8fce0bbb7492d0ae746bfae1ae3ac69e3b2a Mon Sep 17 00:00:00 2001 From: Alexsander Falcucci Date: Mon, 6 Nov 2023 00:40:19 +0100 Subject: [PATCH 08/13] mapping response primitives with local request code signals this has been an attempt to map the server requests into our responses. tests must be updated. --- examples/n2c-miniprotocols/Cargo.toml | 1 + examples/n2c-miniprotocols/src/main.rs | 13 +- pallas-network/src/facades.rs | 2 +- .../src/miniprotocols/localstate/client.rs | 20 +-- .../src/miniprotocols/localstate/codec.rs | 4 +- .../src/miniprotocols/localstate/protocol.rs | 13 +- .../src/miniprotocols/localstate/queries.rs | 117 ++++++++++++++++-- .../src/miniprotocols/localstate/server.rs | 4 +- pallas-network/tests/protocols.rs | 8 +- 9 files changed, 150 insertions(+), 32 deletions(-) diff --git a/examples/n2c-miniprotocols/Cargo.toml b/examples/n2c-miniprotocols/Cargo.toml index 4761a112..ab5d282a 100644 --- a/examples/n2c-miniprotocols/Cargo.toml +++ b/examples/n2c-miniprotocols/Cargo.toml @@ -8,6 +8,7 @@ publish = false [dependencies] pallas = { path = "../../pallas" } +pallas-codec = { path = "../../pallas-codec" } net2 = "0.2.37" hex = "0.4.3" log = "0.4.16" diff --git a/examples/n2c-miniprotocols/src/main.rs b/examples/n2c-miniprotocols/src/main.rs index 0f1b177b..af1ee81f 100644 --- a/examples/n2c-miniprotocols/src/main.rs +++ b/examples/n2c-miniprotocols/src/main.rs @@ -1,6 +1,10 @@ use pallas::network::{ facades::NodeClient, - miniprotocols::{chainsync, localstate, Point, MAINNET_MAGIC}, + miniprotocols::{ + chainsync, + localstate::{self, queries::EpochNo}, + Point, MAINNET_MAGIC, PRE_PRODUCTION_MAGIC, + }, }; use tracing::info; @@ -8,7 +12,6 @@ async fn do_localstate_query(client: &mut NodeClient, query: localstate::queries do_localstate_query_acquisition(client).await; let result = client.statequery().query(query).await.unwrap(); - info!("result: {:?}", result); client.statequery().send_release().await.unwrap(); @@ -50,16 +53,16 @@ async fn do_chainsync(client: &mut NodeClient) { async fn setup_client() -> NodeClient { // we connect to the unix socket of the local node. Make sure you have the right // path for your environment - let socket_path = "/tmp/node.socket"; + let socket_path = "/Users/falcucci/Downloads/cardano-node-8.1.2-macos/node.socket"; // we connect to the unix socket of the local node and perform a handshake query - let version_table = NodeClient::handshake_query(socket_path, MAINNET_MAGIC) + let version_table = NodeClient::handshake_query(socket_path, PRE_PRODUCTION_MAGIC) .await .unwrap(); info!("handshake query result: {:?}", version_table); - NodeClient::connect(socket_path, MAINNET_MAGIC) + NodeClient::connect(socket_path, PRE_PRODUCTION_MAGIC) .await .unwrap() } diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index 2cd8b5dd..491fdbd8 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -282,7 +282,7 @@ impl NodeServer { plexer_handle, version: ver, chainsync: server_cs, - statequery: server_sq + statequery: server_sq, }) } else { plexer_handle.abort(); diff --git a/pallas-network/src/miniprotocols/localstate/client.rs b/pallas-network/src/miniprotocols/localstate/client.rs index f692054e..7468eabd 100644 --- a/pallas-network/src/miniprotocols/localstate/client.rs +++ b/pallas-network/src/miniprotocols/localstate/client.rs @@ -5,7 +5,9 @@ use pallas_codec::Fragment; use std::marker::PhantomData; use thiserror::*; +use super::queries::QueryV16; use super::{AcquireFailure, Message, Query, State}; +use crate::miniprotocols::localstate::queries::Response; use crate::miniprotocols::Point; use crate::multiplexer; @@ -102,7 +104,7 @@ where match (&self.0, msg) { (State::Acquiring, Message::Acquired) => Ok(()), (State::Acquiring, Message::Failure(_)) => Ok(()), - (State::Querying, Message::Result(_)) => Ok(()), + (State::Querying, Message::Response(_)) => Ok(()), _ => Err(ClientError::InvalidInbound), } } @@ -177,27 +179,31 @@ where self.recv_while_acquiring().await } - pub async fn send_query(&mut self, request: Q::Request) -> Result<(), ClientError> { + pub async fn send_query(&mut self, request: Q::Request) -> Result, ClientError> { let msg = Message::::Query(request); self.send_message(&msg).await?; self.0 = State::Querying; - Ok(()) + Ok(msg) } pub async fn recv_while_querying(&mut self) -> Result { match self.recv_message().await? { - Message::Result(x) => { + Message::Response(result) => { self.0 = State::Acquired; - Ok(x) + Ok(result) } _ => Err(ClientError::InvalidInbound), } } - pub async fn query(&mut self, request: Q::Request) -> Result { + pub async fn query(&mut self, request: Q::Request) -> Result { + let code: u16 = QueryV16::request_signal(request.clone().into()); self.send_query(request).await?; - self.recv_while_querying().await + let response = self.recv_while_querying().await?; + let vec = QueryV16::to_vec(response.clone().into()); + let result = QueryV16::map_response(code, vec); + Ok(result) } } diff --git a/pallas-network/src/miniprotocols/localstate/codec.rs b/pallas-network/src/miniprotocols/localstate/codec.rs index 7b747eb0..7ead60dd 100644 --- a/pallas-network/src/miniprotocols/localstate/codec.rs +++ b/pallas-network/src/miniprotocols/localstate/codec.rs @@ -71,7 +71,7 @@ where e.encode(query)?; Ok(()) } - Message::Result(result) => { + Message::Response(result) => { e.array(2)?.u16(4)?; e.encode(result)?; Ok(()) @@ -127,7 +127,7 @@ where } 4 => { let response = d.decode()?; - Ok(Message::Result(response)) + Ok(Message::Response(response)) } 5 => Ok(Message::Release), 6 => { diff --git a/pallas-network/src/miniprotocols/localstate/protocol.rs b/pallas-network/src/miniprotocols/localstate/protocol.rs index 1c82106d..f03e3c12 100644 --- a/pallas-network/src/miniprotocols/localstate/protocol.rs +++ b/pallas-network/src/miniprotocols/localstate/protocol.rs @@ -1,7 +1,10 @@ +use std::convert::Into; use std::fmt::Debug; use crate::miniprotocols::Point; +use super::queries::{Request, Response}; + #[derive(Debug, PartialEq, Eq, Clone)] pub enum State { Idle, @@ -18,8 +21,12 @@ pub enum AcquireFailure { } pub trait Query: Debug { - type Request: Clone + Debug; - type Response: Clone + Debug; + type Request: Clone + Debug + Into; + type Response: Clone + Debug + Into; + + fn to_vec(response: Self::Response) -> Vec; + fn map_response(signal: u16, response: Vec) -> Self::Response; + fn request_signal(request: Self::Request) -> u16; } #[derive(Debug)] @@ -28,7 +35,7 @@ pub enum Message { Failure(AcquireFailure), Acquired, Query(Q::Request), - Result(Q::Response), + Response(Q::Response), ReAcquire(Option), Release, Done, diff --git a/pallas-network/src/miniprotocols/localstate/queries.rs b/pallas-network/src/miniprotocols/localstate/queries.rs index a0cd10d1..7fec8cd5 100644 --- a/pallas-network/src/miniprotocols/localstate/queries.rs +++ b/pallas-network/src/miniprotocols/localstate/queries.rs @@ -261,6 +261,23 @@ pub enum BlockQueryResponse { RewardInfoPools(Vec), } +impl BlockQueryResponse { + fn to_vec(&self) -> Vec { + match self { + BlockQueryResponse::LedgerTip(data) => data.clone(), + BlockQueryResponse::EpochNo(data) => data.clone(), + BlockQueryResponse::CurrentPParams(data) => data.clone(), + BlockQueryResponse::ProposedPParamsUpdates(data) => data.clone(), + BlockQueryResponse::StakeDistribution(data) => data.clone(), + BlockQueryResponse::GenesisConfig(data) => data.clone(), + BlockQueryResponse::DebugChainDepState(data) => data.clone(), + BlockQueryResponse::RewardProvenance(data) => data.clone(), + BlockQueryResponse::StakePools(data) => data.clone(), + BlockQueryResponse::RewardInfoPools(data) => data.clone(), + } + } +} + impl Encode<()> for BlockQueryResponse { fn encode( &self, @@ -358,12 +375,52 @@ impl<'b> Decode<'b, ()> for BlockQueryResponse { } } +#[derive(Debug, Clone, PartialEq)] +pub struct EpochNo(Vec); + +impl Encode<()> for EpochNo { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + e.array(2)?; + e.u16(1)?; + e.bytes(&self.0)?; + Ok(()) + } +} + +impl<'b> Decode<'b, ()> for EpochNo { + fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { + let start = d.position(); + d.skip()?; + let end = d.position(); + let slice = &d.input()[start..end]; + let vec = slice.to_vec(); + Ok(EpochNo(vec)) + } +} + #[derive(Debug, Clone, PartialEq)] pub enum Response { BlockQuery(BlockQueryResponse), SystemStart(Vec), ChainBlockNo(Vec), ChainPoint(Vec), + Generic(Vec), +} + +impl Response { + pub fn to_vec(&self) -> Vec { + match self { + Response::BlockQuery(block_query_response) => block_query_response.to_vec(), + Response::SystemStart(data) => data.clone(), + Response::ChainBlockNo(data) => data.clone(), + Response::ChainPoint(data) => data.clone(), + Response::Generic(data) => data.clone(), + } + } } impl Encode<()> for Response { @@ -377,7 +434,6 @@ impl Encode<()> for Response { e.array(2)?; e.u16(0)?; e.encode(q)?; - Ok(()) } Self::SystemStart(bytes) => { @@ -398,23 +454,27 @@ impl Encode<()> for Response { e.bytes(bytes)?; Ok(()) } + Response::Generic(_) => todo!(), } } } impl<'b> Decode<'b, ()> for Response { fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { - let size = d - .array()? - .ok_or_else(|| decode::Error::message("unexpected indefinite len list"))?; + d.set_position(d.position() - 1); + let label = d.u16()?; + let start = d.position(); + d.skip()?; + let end = d.position(); + let slice = &d.input()[start..end]; + let vec = slice.to_vec(); - let tag = d.u16()?; - - match (size, tag) { - (2, 0) => Ok(Self::BlockQuery(d.decode()?)), - (1, 1) => Ok(Self::SystemStart(d.bytes()?.to_vec())), - (1, 2) => Ok(Self::ChainBlockNo(d.bytes()?.to_vec())), - (1, 3) => Ok(Self::ChainPoint(d.bytes()?.to_vec())), + match label { + 0 => Ok(Self::BlockQuery(d.decode()?)), + 1 => Ok(Self::SystemStart(vec)), + 2 => Ok(Self::ChainBlockNo(vec)), + 3 => Ok(Self::ChainPoint(vec)), + 4 => Ok(Self::Generic(vec)), _ => Err(decode::Error::message( "invalid (size, tag) for lsq response", )), @@ -429,4 +489,39 @@ pub struct QueryV16 {} impl Query for QueryV16 { type Request = Request; type Response = Response; + + fn request_signal(request: Self::Request) -> u16 { + match request { + Request::BlockQuery(BlockQuery::GetEpochNo) => 0, + Request::BlockQuery(BlockQuery::GetLedgerTip) => 1, + Request::BlockQuery(BlockQuery::GetCurrentPParams) => 2, + Request::BlockQuery(BlockQuery::GetProposedPParamsUpdates) => 3, + Request::BlockQuery(BlockQuery::GetStakeDistribution) => 4, + Request::BlockQuery(BlockQuery::DebugChainDepState) => 5, + Request::BlockQuery(BlockQuery::GetGenesisConfig) => 6, + Request::BlockQuery(BlockQuery::GetRewardProvenance) => 7, + Request::BlockQuery(BlockQuery::GetStakePools) => 8, + Request::BlockQuery(BlockQuery::GetRewardInfoPools) => 9, + Request::GetSystemStart => 10, + Request::GetChainBlockNo => 11, + Request::GetChainPoint => 12, + } + } + + fn map_response(signal: u16, response: Vec) -> Self::Response { + match signal { + 0 => Response::BlockQuery(BlockQueryResponse::EpochNo(response)), + _ => Response::Generic(response), + } + } + + fn to_vec(response: Self::Response) -> Vec { + match response { + Response::BlockQuery(block_query_response) => block_query_response.to_vec(), + Response::SystemStart(data) => data.clone(), + Response::ChainBlockNo(data) => data.clone(), + Response::ChainPoint(data) => data.clone(), + Response::Generic(data) => data.clone(), + } + } } diff --git a/pallas-network/src/miniprotocols/localstate/server.rs b/pallas-network/src/miniprotocols/localstate/server.rs index d3ceba2e..1807e968 100644 --- a/pallas-network/src/miniprotocols/localstate/server.rs +++ b/pallas-network/src/miniprotocols/localstate/server.rs @@ -88,7 +88,7 @@ where match (&self.0, msg) { (State::Acquiring, Message::Acquired) => Ok(()), (State::Acquiring, Message::Failure(_)) => Ok(()), - (State::Querying, Message::Result(_)) => Ok(()), + (State::Querying, Message::Response(_)) => Ok(()), _ => Err(Error::InvalidOutbound), } } @@ -137,7 +137,7 @@ where } pub async fn send_result(&mut self, response: Q::Response) -> Result<(), Error> { - let msg = Message::::Result(response); + let msg = Message::::Response(response); self.send_message(&msg).await?; self.0 = State::Acquired; diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index 84fe835e..87d8f8b3 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -7,7 +7,9 @@ use pallas_network::miniprotocols::blockfetch::BlockRequest; use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip}; use pallas_network::miniprotocols::handshake::n2c; use pallas_network::miniprotocols::handshake::n2n::VersionData; -use pallas_network::miniprotocols::localstate::queries::{BlockQueryResponse, Request, Response}; +use pallas_network::miniprotocols::localstate::queries::{ + BlockQueryResponse, QueryResponse, Request, Response, +}; use pallas_network::miniprotocols::localstate::{ClientAcquireRequest, ClientQueryRequest}; use pallas_network::miniprotocols::{ blockfetch, @@ -353,6 +355,10 @@ pub async fn local_state_query_server_and_client_happy_path() { assert_eq!(*server_sq.state(), localstate::State::Querying); + // let get_epoch_no_response = Response::BlockQuery(BlockQueryResponse::EpochNo( + // hex::decode("83188118181867").unwrap(), + // )); + let get_epoch_no_response = Response::BlockQuery(BlockQueryResponse::EpochNo( hex::decode("83188118181867").unwrap(), )); From b58e6aae95a299409409cebfe50f262a9eca7ecd Mon Sep 17 00:00:00 2001 From: Alexsander Falcucci Date: Mon, 6 Nov 2023 11:16:26 +0100 Subject: [PATCH 09/13] adding epoch_no struct we added the struct. maybe we should adapt to use it as turbofish --- examples/n2c-miniprotocols/src/main.rs | 6 +- .../src/miniprotocols/localstate/queries.rs | 58 ++++++++++--------- pallas-network/tests/protocols.rs | 6 +- 3 files changed, 38 insertions(+), 32 deletions(-) diff --git a/examples/n2c-miniprotocols/src/main.rs b/examples/n2c-miniprotocols/src/main.rs index af1ee81f..fc26f970 100644 --- a/examples/n2c-miniprotocols/src/main.rs +++ b/examples/n2c-miniprotocols/src/main.rs @@ -2,7 +2,7 @@ use pallas::network::{ facades::NodeClient, miniprotocols::{ chainsync, - localstate::{self, queries::EpochNo}, + localstate::{self}, Point, MAINNET_MAGIC, PRE_PRODUCTION_MAGIC, }, }; @@ -85,13 +85,13 @@ async fn main() { localstate::queries::Request::BlockQuery(localstate::queries::BlockQuery::GetEpochNo); // execute an arbitrary "Local State" query against the node - do_localstate_query(&mut client, get_system_start_query).await; + // do_localstate_query(&mut client, get_system_start_query).await; do_localstate_query(&mut client, get_epoch_query).await; client.statequery().send_done().await.unwrap(); // execute the chainsync flow from an arbitrary point in the chain - do_chainsync(&mut client).await; + // do_chainsync(&mut client).await; } #[cfg(not(target_family = "unix"))] diff --git a/pallas-network/src/miniprotocols/localstate/queries.rs b/pallas-network/src/miniprotocols/localstate/queries.rs index 7fec8cd5..d0a64778 100644 --- a/pallas-network/src/miniprotocols/localstate/queries.rs +++ b/pallas-network/src/miniprotocols/localstate/queries.rs @@ -250,7 +250,7 @@ impl<'b> Decode<'b, ()> for Request { #[derive(Debug, Clone, PartialEq)] pub enum BlockQueryResponse { LedgerTip(Vec), - EpochNo(Vec), + EpochNo(EpochNo), CurrentPParams(Vec), ProposedPParamsUpdates(Vec), StakeDistribution(Vec), @@ -265,7 +265,7 @@ impl BlockQueryResponse { fn to_vec(&self) -> Vec { match self { BlockQueryResponse::LedgerTip(data) => data.clone(), - BlockQueryResponse::EpochNo(data) => data.clone(), + BlockQueryResponse::EpochNo(data) => data.clone().to_vec(), BlockQueryResponse::CurrentPParams(data) => data.clone(), BlockQueryResponse::ProposedPParamsUpdates(data) => data.clone(), BlockQueryResponse::StakeDistribution(data) => data.clone(), @@ -294,7 +294,7 @@ impl Encode<()> for BlockQueryResponse { Self::EpochNo(bytes) => { e.array(2)?; e.u16(1)?; - e.bytes(bytes)?; + // e.bytes(bytes)?; Ok(()) } Self::CurrentPParams(bytes) => { @@ -359,7 +359,7 @@ impl<'b> Decode<'b, ()> for BlockQueryResponse { match (size, tag) { (2, 0) => Ok(Self::LedgerTip(d.bytes()?.to_vec())), - (2, 1) => Ok(Self::EpochNo(d.bytes()?.to_vec())), + (2, 1) => Ok(Self::EpochNo(EpochNo(d.bytes()?.to_vec()))), (2, 3) => Ok(Self::CurrentPParams(d.bytes()?.to_vec())), (2, 4) => Ok(Self::ProposedPParamsUpdates(d.bytes()?.to_vec())), (2, 5) => Ok(Self::StakeDistribution(d.bytes()?.to_vec())), @@ -376,31 +376,37 @@ impl<'b> Decode<'b, ()> for BlockQueryResponse { } #[derive(Debug, Clone, PartialEq)] -pub struct EpochNo(Vec); +pub struct EpochNo(pub Vec); -impl Encode<()> for EpochNo { - fn encode( - &self, - e: &mut Encoder, - _ctx: &mut (), - ) -> Result<(), encode::Error> { - e.array(2)?; - e.u16(1)?; - e.bytes(&self.0)?; - Ok(()) +impl EpochNo { + pub fn to_vec(&self) -> Vec { + self.0.clone() } } -impl<'b> Decode<'b, ()> for EpochNo { - fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { - let start = d.position(); - d.skip()?; - let end = d.position(); - let slice = &d.input()[start..end]; - let vec = slice.to_vec(); - Ok(EpochNo(vec)) - } -} +// impl Encode<()> for EpochNo { +// fn encode( +// &self, +// e: &mut Encoder, +// _ctx: &mut (), +// ) -> Result<(), encode::Error> { +// e.array(2)?; +// e.u16(1)?; +// e.bytes(&self.0)?; +// Ok(()) +// } +// } +// +// impl<'b> Decode<'b, ()> for EpochNo { +// fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { +// let start = d.position(); +// d.skip()?; +// let end = d.position(); +// let slice = &d.input()[start..end]; +// let vec = slice.to_vec(); +// Ok(EpochNo(vec)) +// } +// } #[derive(Debug, Clone, PartialEq)] pub enum Response { @@ -510,7 +516,7 @@ impl Query for QueryV16 { fn map_response(signal: u16, response: Vec) -> Self::Response { match signal { - 0 => Response::BlockQuery(BlockQueryResponse::EpochNo(response)), + 0 => Response::BlockQuery(BlockQueryResponse::EpochNo(EpochNo(response))), _ => Response::Generic(response), } } diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index 87d8f8b3..39652893 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -8,7 +8,7 @@ use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip use pallas_network::miniprotocols::handshake::n2c; use pallas_network::miniprotocols::handshake::n2n::VersionData; use pallas_network::miniprotocols::localstate::queries::{ - BlockQueryResponse, QueryResponse, Request, Response, + BlockQueryResponse, EpochNo, QueryResponse, Request, Response, }; use pallas_network::miniprotocols::localstate::{ClientAcquireRequest, ClientQueryRequest}; use pallas_network::miniprotocols::{ @@ -359,9 +359,9 @@ pub async fn local_state_query_server_and_client_happy_path() { // hex::decode("83188118181867").unwrap(), // )); - let get_epoch_no_response = Response::BlockQuery(BlockQueryResponse::EpochNo( + let get_epoch_no_response = Response::BlockQuery(BlockQueryResponse::EpochNo(EpochNo( hex::decode("83188118181867").unwrap(), - )); + ))); server_sq.send_result(get_epoch_no_response).await.unwrap(); From 72d3d7a976e1c367d9ad00da302f18b9602261a1 Mon Sep 17 00:00:00 2001 From: Alexsander Falcucci Date: Mon, 6 Nov 2023 11:31:57 +0100 Subject: [PATCH 10/13] fix some imports - changed the argument of the `do_localstate_query` function from `localstate::queries::request` to `request` - updated the function call in `main` to use `request` instead of `localstate::queries::request` - removed unnecessary impls for `epochno` --- examples/n2c-miniprotocols/src/main.rs | 6 ++--- .../src/miniprotocols/localstate/queries.rs | 24 ------------------- pallas-network/tests/protocols.rs | 6 ++--- 3 files changed, 6 insertions(+), 30 deletions(-) diff --git a/examples/n2c-miniprotocols/src/main.rs b/examples/n2c-miniprotocols/src/main.rs index fc26f970..606d2454 100644 --- a/examples/n2c-miniprotocols/src/main.rs +++ b/examples/n2c-miniprotocols/src/main.rs @@ -2,13 +2,13 @@ use pallas::network::{ facades::NodeClient, miniprotocols::{ chainsync, - localstate::{self}, + localstate::{self, queries::Request}, Point, MAINNET_MAGIC, PRE_PRODUCTION_MAGIC, }, }; use tracing::info; -async fn do_localstate_query(client: &mut NodeClient, query: localstate::queries::Request) { +async fn do_localstate_query(client: &mut NodeClient, query: Request) { do_localstate_query_acquisition(client).await; let result = client.statequery().query(query).await.unwrap(); @@ -85,7 +85,7 @@ async fn main() { localstate::queries::Request::BlockQuery(localstate::queries::BlockQuery::GetEpochNo); // execute an arbitrary "Local State" query against the node - // do_localstate_query(&mut client, get_system_start_query).await; + do_localstate_query(&mut client, get_system_start_query).await; do_localstate_query(&mut client, get_epoch_query).await; client.statequery().send_done().await.unwrap(); diff --git a/pallas-network/src/miniprotocols/localstate/queries.rs b/pallas-network/src/miniprotocols/localstate/queries.rs index d0a64778..66e4eca2 100644 --- a/pallas-network/src/miniprotocols/localstate/queries.rs +++ b/pallas-network/src/miniprotocols/localstate/queries.rs @@ -384,30 +384,6 @@ impl EpochNo { } } -// impl Encode<()> for EpochNo { -// fn encode( -// &self, -// e: &mut Encoder, -// _ctx: &mut (), -// ) -> Result<(), encode::Error> { -// e.array(2)?; -// e.u16(1)?; -// e.bytes(&self.0)?; -// Ok(()) -// } -// } -// -// impl<'b> Decode<'b, ()> for EpochNo { -// fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { -// let start = d.position(); -// d.skip()?; -// let end = d.position(); -// let slice = &d.input()[start..end]; -// let vec = slice.to_vec(); -// Ok(EpochNo(vec)) -// } -// } - #[derive(Debug, Clone, PartialEq)] pub enum Response { BlockQuery(BlockQueryResponse), diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index 39652893..8bb4b23b 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -8,7 +8,7 @@ use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip use pallas_network::miniprotocols::handshake::n2c; use pallas_network::miniprotocols::handshake::n2n::VersionData; use pallas_network::miniprotocols::localstate::queries::{ - BlockQueryResponse, EpochNo, QueryResponse, Request, Response, + BlockQueryResponse, EpochNo, Request, Response, }; use pallas_network::miniprotocols::localstate::{ClientAcquireRequest, ClientQueryRequest}; use pallas_network::miniprotocols::{ @@ -444,9 +444,9 @@ pub async fn local_state_query_server_and_client_happy_path() { assert_eq!( resp_get_epoch_no, - Response::BlockQuery(BlockQueryResponse::EpochNo( + Response::BlockQuery(BlockQueryResponse::EpochNo(EpochNo( hex::decode("83188118181867").unwrap() - )) + ))), ); // client sends a ReAquire From a0e022548d408d5d4e434fb99dc4e7a4b87fa774 Mon Sep 17 00:00:00 2001 From: Alexsander Falcucci Date: Tue, 7 Nov 2023 00:33:16 +0100 Subject: [PATCH 11/13] fix tests and types --- examples/n2c-miniprotocols/src/main.rs | 11 +- .../src/miniprotocols/localstate/client.rs | 7 +- .../src/miniprotocols/localstate/queries.rs | 118 ++++++++++-------- pallas-network/tests/protocols.rs | 68 +++++----- 4 files changed, 113 insertions(+), 91 deletions(-) diff --git a/examples/n2c-miniprotocols/src/main.rs b/examples/n2c-miniprotocols/src/main.rs index 606d2454..600d568a 100644 --- a/examples/n2c-miniprotocols/src/main.rs +++ b/examples/n2c-miniprotocols/src/main.rs @@ -53,16 +53,16 @@ async fn do_chainsync(client: &mut NodeClient) { async fn setup_client() -> NodeClient { // we connect to the unix socket of the local node. Make sure you have the right // path for your environment - let socket_path = "/Users/falcucci/Downloads/cardano-node-8.1.2-macos/node.socket"; + let socket_path = "/tmp/node.socket"; // we connect to the unix socket of the local node and perform a handshake query - let version_table = NodeClient::handshake_query(socket_path, PRE_PRODUCTION_MAGIC) + let version_table = NodeClient::handshake_query(socket_path, MAINNET_MAGIC) .await .unwrap(); info!("handshake query result: {:?}", version_table); - NodeClient::connect(socket_path, PRE_PRODUCTION_MAGIC) + NodeClient::connect(socket_path, MAINNET_MAGIC) .await .unwrap() } @@ -83,15 +83,18 @@ async fn main() { let get_system_start_query = localstate::queries::Request::GetSystemStart; let get_epoch_query = localstate::queries::Request::BlockQuery(localstate::queries::BlockQuery::GetEpochNo); + let get_stake_pools_query = + localstate::queries::Request::BlockQuery(localstate::queries::BlockQuery::GetStakePools); // execute an arbitrary "Local State" query against the node do_localstate_query(&mut client, get_system_start_query).await; do_localstate_query(&mut client, get_epoch_query).await; + do_localstate_query(&mut client, get_stake_pools_query).await; client.statequery().send_done().await.unwrap(); // execute the chainsync flow from an arbitrary point in the chain - // do_chainsync(&mut client).await; + do_chainsync(&mut client).await; } #[cfg(not(target_family = "unix"))] diff --git a/pallas-network/src/miniprotocols/localstate/client.rs b/pallas-network/src/miniprotocols/localstate/client.rs index 7468eabd..02331712 100644 --- a/pallas-network/src/miniprotocols/localstate/client.rs +++ b/pallas-network/src/miniprotocols/localstate/client.rs @@ -198,12 +198,11 @@ where } pub async fn query(&mut self, request: Q::Request) -> Result { + self.send_query(request.clone()).await?; let code: u16 = QueryV16::request_signal(request.clone().into()); - self.send_query(request).await?; - let response = self.recv_while_querying().await?; + let response: Q::Response = self.recv_while_querying().await?; let vec = QueryV16::to_vec(response.clone().into()); - let result = QueryV16::map_response(code, vec); - Ok(result) + Ok(QueryV16::map_response(code, vec)) } } diff --git a/pallas-network/src/miniprotocols/localstate/queries.rs b/pallas-network/src/miniprotocols/localstate/queries.rs index 66e4eca2..5735c4a8 100644 --- a/pallas-network/src/miniprotocols/localstate/queries.rs +++ b/pallas-network/src/miniprotocols/localstate/queries.rs @@ -248,7 +248,7 @@ impl<'b> Decode<'b, ()> for Request { } #[derive(Debug, Clone, PartialEq)] -pub enum BlockQueryResponse { +pub enum QueryResponse { LedgerTip(Vec), EpochNo(EpochNo), CurrentPParams(Vec), @@ -259,26 +259,28 @@ pub enum BlockQueryResponse { RewardProvenance(Vec), StakePools(Vec), RewardInfoPools(Vec), + Generic(Vec), // Server response to unknown query } -impl BlockQueryResponse { +impl QueryResponse { fn to_vec(&self) -> Vec { match self { - BlockQueryResponse::LedgerTip(data) => data.clone(), - BlockQueryResponse::EpochNo(data) => data.clone().to_vec(), - BlockQueryResponse::CurrentPParams(data) => data.clone(), - BlockQueryResponse::ProposedPParamsUpdates(data) => data.clone(), - BlockQueryResponse::StakeDistribution(data) => data.clone(), - BlockQueryResponse::GenesisConfig(data) => data.clone(), - BlockQueryResponse::DebugChainDepState(data) => data.clone(), - BlockQueryResponse::RewardProvenance(data) => data.clone(), - BlockQueryResponse::StakePools(data) => data.clone(), - BlockQueryResponse::RewardInfoPools(data) => data.clone(), + QueryResponse::LedgerTip(data) => data.clone(), + QueryResponse::EpochNo(data) => data.clone().to_vec(), + QueryResponse::CurrentPParams(data) => data.clone(), + QueryResponse::ProposedPParamsUpdates(data) => data.clone(), + QueryResponse::StakeDistribution(data) => data.clone(), + QueryResponse::GenesisConfig(data) => data.clone(), + QueryResponse::DebugChainDepState(data) => data.clone(), + QueryResponse::RewardProvenance(data) => data.clone(), + QueryResponse::StakePools(data) => data.clone(), + QueryResponse::RewardInfoPools(data) => data.clone(), + QueryResponse::Generic(data) => data.clone(), } } } -impl Encode<()> for BlockQueryResponse { +impl Encode<()> for QueryResponse { fn encode( &self, e: &mut Encoder, @@ -294,7 +296,7 @@ impl Encode<()> for BlockQueryResponse { Self::EpochNo(bytes) => { e.array(2)?; e.u16(1)?; - // e.bytes(bytes)?; + e.bytes(&bytes.to_vec())?; Ok(()) } Self::CurrentPParams(bytes) => { @@ -345,11 +347,17 @@ impl Encode<()> for BlockQueryResponse { e.bytes(bytes)?; Ok(()) } + Self::Generic(bytes) => { + e.array(2)?; + e.u16(18)?; + e.bytes(bytes)?; + Ok(()) + } } } } -impl<'b> Decode<'b, ()> for BlockQueryResponse { +impl<'b> Decode<'b, ()> for QueryResponse { fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { let size = d .array()? @@ -368,6 +376,7 @@ impl<'b> Decode<'b, ()> for BlockQueryResponse { (2, 14) => Ok(Self::RewardProvenance(d.bytes()?.to_vec())), (2, 16) => Ok(Self::StakePools(d.bytes()?.to_vec())), (2, 17) => Ok(Self::RewardInfoPools(d.bytes()?.to_vec())), + (2, 18) => Ok(Self::Generic(d.input().to_vec())), _ => Err(decode::Error::message( "invalid (size, tag) for lsq response", )), @@ -379,6 +388,32 @@ impl<'b> Decode<'b, ()> for BlockQueryResponse { pub struct EpochNo(pub Vec); impl EpochNo { + pub fn to_vec(&self) -> Vec { + self.0.clone().to_vec() + } +} + +impl Encode<()> for EpochNo { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + e.bytes(&self.0)?; + Ok(()) + } +} + +impl<'b> Decode<'b, ()> for EpochNo { + fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { + Ok(Self(d.bytes()?.to_vec())) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct Generic(pub Vec); + +impl Generic { pub fn to_vec(&self) -> Vec { self.0.clone() } @@ -386,21 +421,20 @@ impl EpochNo { #[derive(Debug, Clone, PartialEq)] pub enum Response { - BlockQuery(BlockQueryResponse), SystemStart(Vec), ChainBlockNo(Vec), ChainPoint(Vec), - Generic(Vec), + Query(QueryResponse), } impl Response { pub fn to_vec(&self) -> Vec { match self { - Response::BlockQuery(block_query_response) => block_query_response.to_vec(), + // Response::BlockQuery(block_query_response) => block_query_response.to_vec(), Response::SystemStart(data) => data.clone(), Response::ChainBlockNo(data) => data.clone(), Response::ChainPoint(data) => data.clone(), - Response::Generic(data) => data.clone(), + Response::Query(query) => query.clone().to_vec(), } } } @@ -412,31 +446,11 @@ impl Encode<()> for Response { _ctx: &mut (), ) -> Result<(), encode::Error> { match self { - Self::BlockQuery(q) => { - e.array(2)?; - e.u16(0)?; + Self::Query(q) => { e.encode(q)?; Ok(()) } - Self::SystemStart(bytes) => { - e.array(1)?; - e.u16(1)?; - e.bytes(bytes)?; - Ok(()) - } - Self::ChainBlockNo(bytes) => { - e.array(1)?; - e.u16(2)?; - e.bytes(bytes)?; - Ok(()) - } - Self::ChainPoint(bytes) => { - e.array(1)?; - e.u16(3)?; - e.bytes(bytes)?; - Ok(()) - } - Response::Generic(_) => todo!(), + _ => Ok(()), } } } @@ -445,18 +459,11 @@ impl<'b> Decode<'b, ()> for Response { fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { d.set_position(d.position() - 1); let label = d.u16()?; - let start = d.position(); d.skip()?; - let end = d.position(); - let slice = &d.input()[start..end]; - let vec = slice.to_vec(); + let vec = d.input().to_vec(); match label { - 0 => Ok(Self::BlockQuery(d.decode()?)), - 1 => Ok(Self::SystemStart(vec)), - 2 => Ok(Self::ChainBlockNo(vec)), - 3 => Ok(Self::ChainPoint(vec)), - 4 => Ok(Self::Generic(vec)), + 4 => Ok(Self::Query(QueryResponse::Generic(vec))), _ => Err(decode::Error::message( "invalid (size, tag) for lsq response", )), @@ -492,18 +499,21 @@ impl Query for QueryV16 { fn map_response(signal: u16, response: Vec) -> Self::Response { match signal { - 0 => Response::BlockQuery(BlockQueryResponse::EpochNo(EpochNo(response))), - _ => Response::Generic(response), + 0 => Self::Response::Query(QueryResponse::EpochNo(EpochNo(response.clone().to_vec()))), + 8 => Self::Response::Query(QueryResponse::StakePools(response)), + _ => Self::Response::Query(QueryResponse::Generic(response.clone().to_vec())), } } fn to_vec(response: Self::Response) -> Vec { match response { - Response::BlockQuery(block_query_response) => block_query_response.to_vec(), + // Response::BlockQuery(block_query_response) => block_query_response.to_vec(), Response::SystemStart(data) => data.clone(), Response::ChainBlockNo(data) => data.clone(), Response::ChainPoint(data) => data.clone(), - Response::Generic(data) => data.clone(), + Response::Query(QueryResponse::EpochNo(epoch_no)) => epoch_no.to_vec(), + Response::Query(QueryResponse::Generic(data)) => data.clone(), + _ => panic!("unimplemented"), } } } diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index 8bb4b23b..d9c7f78b 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -8,9 +8,9 @@ use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip use pallas_network::miniprotocols::handshake::n2c; use pallas_network::miniprotocols::handshake::n2n::VersionData; use pallas_network::miniprotocols::localstate::queries::{ - BlockQueryResponse, EpochNo, Request, Response, + EpochNo, QueryResponse, QueryV16, Request, Response, }; -use pallas_network::miniprotocols::localstate::{ClientAcquireRequest, ClientQueryRequest}; +use pallas_network::miniprotocols::localstate::{ClientAcquireRequest, ClientQueryRequest, Query}; use pallas_network::miniprotocols::{ blockfetch, chainsync::{self, NextResponse}, @@ -316,8 +316,8 @@ pub async fn local_state_query_server_and_client_happy_path() { assert_eq!(*server_sq.state(), localstate::State::Querying); - let get_stake_pools_response = Response::BlockQuery(BlockQueryResponse::StakePools( - hex::decode("82011A008BD423").unwrap(), + let get_stake_pools_response = Response::Query(QueryResponse::Generic( + hex::decode("8C188204188118D9010218990118441858181C03").unwrap(), )); server_sq @@ -355,12 +355,8 @@ pub async fn local_state_query_server_and_client_happy_path() { assert_eq!(*server_sq.state(), localstate::State::Querying); - // let get_epoch_no_response = Response::BlockQuery(BlockQueryResponse::EpochNo( - // hex::decode("83188118181867").unwrap(), - // )); - - let get_epoch_no_response = Response::BlockQuery(BlockQueryResponse::EpochNo(EpochNo( - hex::decode("83188118181867").unwrap(), + let get_epoch_no_response = Response::Query(QueryResponse::EpochNo(EpochNo( + hex::decode("85188204188118181868").unwrap(), ))); server_sq.send_result(get_epoch_no_response).await.unwrap(); @@ -410,21 +406,31 @@ pub async fn local_state_query_server_and_client_happy_path() { assert_eq!(*client_sq.state(), localstate::State::Acquired); // client sends a BlockQuery + let get_stake_pools_query = + Request::BlockQuery(localstate::queries::BlockQuery::GetStakePools); client_sq - .send_query(Request::BlockQuery( - localstate::queries::BlockQuery::GetStakePools, - )) + .send_query(get_stake_pools_query.clone()) .await .unwrap(); let resp = client_sq.recv_while_querying().await.unwrap(); + let vec = QueryV16::to_vec(resp.clone()); + + let stake_pools_vec: Vec = vec![ + 130, 4, 130, 18, 84, 140, 24, 130, 4, 24, 129, 24, 217, 1, 2, 24, 153, 1, 24, 68, 24, + 88, 24, 28, 3, + ]; + + assert_eq!(vec, stake_pools_vec); + let code = QueryV16::request_signal(get_stake_pools_query.clone()); + assert_eq!(code, 8); + + let result = QueryV16::map_response(code, vec); assert_eq!( - resp, - Response::BlockQuery(BlockQueryResponse::StakePools( - hex::decode("82011A008BD423").unwrap() - )) + result, + Response::Query(QueryResponse::StakePools(stake_pools_vec)) ); client_sq.send_release().await.unwrap(); @@ -433,24 +439,28 @@ pub async fn local_state_query_server_and_client_happy_path() { client_sq.recv_while_acquiring().await.unwrap(); - client_sq - .send_query(Request::BlockQuery( - localstate::queries::BlockQuery::GetEpochNo, - )) - .await - .unwrap(); + let get_epoch_query = Request::BlockQuery(localstate::queries::BlockQuery::GetEpochNo); + + client_sq.send_query(get_epoch_query.clone()).await.unwrap(); + + let resp = client_sq.recv_while_querying().await.unwrap(); + + let vec = QueryV16::to_vec(resp.clone()); + let epoch_no_vec: Vec = vec![ + 130, 4, 130, 1, 74, 133, 24, 130, 4, 24, 129, 24, 24, 24, 104, + ]; + assert_eq!(vec, epoch_no_vec); - let resp_get_epoch_no = client_sq.recv_while_querying().await.unwrap(); + let code = QueryV16::request_signal(get_epoch_query.clone()); + assert_eq!(code, 0); + let result = QueryV16::map_response(code, vec); assert_eq!( - resp_get_epoch_no, - Response::BlockQuery(BlockQueryResponse::EpochNo(EpochNo( - hex::decode("83188118181867").unwrap() - ))), + result, + Response::Query(QueryResponse::EpochNo(EpochNo(epoch_no_vec))) ); // client sends a ReAquire - client_sq .send_reacquire(Some(Point::Specific(1337, vec![1, 2, 3]))) .await From 1b2a8a1b4502c83ceba670f33c0af483bfa4ecfa Mon Sep 17 00:00:00 2001 From: Alexsander Falcucci Date: Tue, 7 Nov 2023 00:37:36 +0100 Subject: [PATCH 12/13] remove usuless dependency --- examples/n2c-miniprotocols/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/n2c-miniprotocols/Cargo.toml b/examples/n2c-miniprotocols/Cargo.toml index ab5d282a..4761a112 100644 --- a/examples/n2c-miniprotocols/Cargo.toml +++ b/examples/n2c-miniprotocols/Cargo.toml @@ -8,7 +8,6 @@ publish = false [dependencies] pallas = { path = "../../pallas" } -pallas-codec = { path = "../../pallas-codec" } net2 = "0.2.37" hex = "0.4.3" log = "0.4.16" From 478a96b2447b4f858caf5f40efbb346b89cd53b9 Mon Sep 17 00:00:00 2001 From: Alexsander Falcucci Date: Tue, 7 Nov 2023 00:42:35 +0100 Subject: [PATCH 13/13] remove blockquery related match --- pallas-network/src/miniprotocols/localstate/queries.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/pallas-network/src/miniprotocols/localstate/queries.rs b/pallas-network/src/miniprotocols/localstate/queries.rs index 5735c4a8..c360a2c0 100644 --- a/pallas-network/src/miniprotocols/localstate/queries.rs +++ b/pallas-network/src/miniprotocols/localstate/queries.rs @@ -507,7 +507,6 @@ impl Query for QueryV16 { fn to_vec(response: Self::Response) -> Vec { match response { - // Response::BlockQuery(block_query_response) => block_query_response.to_vec(), Response::SystemStart(data) => data.clone(), Response::ChainBlockNo(data) => data.clone(), Response::ChainPoint(data) => data.clone(),