diff --git a/examples/n2c-miniprotocols/src/main.rs b/examples/n2c-miniprotocols/src/main.rs index 26dca494..2cc06471 100644 --- a/examples/n2c-miniprotocols/src/main.rs +++ b/examples/n2c-miniprotocols/src/main.rs @@ -8,7 +8,7 @@ use pallas::{ facades::NodeClient, miniprotocols::{ chainsync, - localstate::queries_v16::{self, Addr, Addrs, StakeAddr, TransactionInput}, + localstate::queries_v16::{self, Addr, Addrs, Pools, StakeAddr, TransactionInput}, Point, PRE_PRODUCTION_MAGIC, }, }, @@ -66,6 +66,19 @@ async fn do_localstate_query(client: &mut NodeClient) { .unwrap(); info!("result: {:?}", result); + let pool_id1 = "fdb5834ba06eb4baafd50550d2dc9b3742d2c52cc5ee65bf8673823b"; + let pool_id1 = Bytes::from_str(pool_id1).unwrap(); + let pool_id2 = "1e3105f23f2ac91b3fb4c35fa4fe301421028e356e114944e902005b"; + let pool_id2 = Bytes::from_str(pool_id2).unwrap(); + let mut pools: Pools = BTreeSet::new(); + pools.insert(pool_id1); + pools.insert(pool_id2); + + let result = queries_v16::get_stake_pool_params(client, era, pools.into()) + .await + .unwrap(); + info!("result: {:?}", result); + let result = queries_v16::get_block_epoch_number(client, era) .await .unwrap(); diff --git a/pallas-network/src/miniprotocols/localstate/queries_v16/codec.rs b/pallas-network/src/miniprotocols/localstate/queries_v16/codec.rs index 2cf30a9a..64fb811d 100644 --- a/pallas-network/src/miniprotocols/localstate/queries_v16/codec.rs +++ b/pallas-network/src/miniprotocols/localstate/queries_v16/codec.rs @@ -339,6 +339,29 @@ impl minicbor::encode::Encode for RationalNumber { } } +impl<'b, C> minicbor::decode::Decode<'b, C> for PoolIds { + fn decode(d: &mut minicbor::Decoder<'b>, ctx: &mut C) -> Result { + d.tag()?; + + Ok(PoolIds { + hashes: d.decode_with(ctx)?, + }) + } +} + +impl minicbor::encode::Encode for PoolIds { + fn encode( + &self, + e: &mut minicbor::Encoder, + ctx: &mut C, + ) -> Result<(), minicbor::encode::Error> { + e.tag(Tag::new(258))?; + e.encode_with(self.hashes.clone(), ctx)?; + + Ok(()) + } +} + impl<'b, C> minicbor::decode::Decode<'b, C> for TransactionOutput { fn decode(d: &mut minicbor::Decoder<'b>, ctx: &mut C) -> Result { match d.datatype()? { diff --git a/pallas-network/src/miniprotocols/localstate/queries_v16/mod.rs b/pallas-network/src/miniprotocols/localstate/queries_v16/mod.rs index 72097ae6..76173b7b 100644 --- a/pallas-network/src/miniprotocols/localstate/queries_v16/mod.rs +++ b/pallas-network/src/miniprotocols/localstate/queries_v16/mod.rs @@ -1,17 +1,21 @@ // TODO: this should move to pallas::ledger crate at some point use pallas_crypto::hash::Hash; -use std::collections::BTreeSet; +use std::collections::{BTreeMap, BTreeSet}; use std::hash::Hash as StdHash; // required for derive attrs to work use pallas_codec::minicbor::{self}; -use pallas_codec::utils::{AnyUInt, Bytes, KeyValuePairs, TagWrap}; +use pallas_codec::utils::{AnyUInt, Bytes, KeyValuePairs, Nullable, TagWrap}; use pallas_codec::{ minicbor::{Decode, Encode}, utils::AnyCbor, }; +pub mod primitives; + +pub use primitives::{PoolMetadata, Relay}; + use crate::miniprotocols::Point; use super::{Client, ClientError}; @@ -39,7 +43,7 @@ pub enum BlockQuery { GetRewardProvenance, GetUTxOByTxIn(TxIns), GetStakePools, - GetStakePoolParams(AnyCbor), + GetStakePoolParams(PoolIds), GetRewardInfoPools, GetPoolState(AnyCbor), GetStakeSnapshots(Pools), @@ -197,6 +201,19 @@ pub struct StakeDistribution { pub pools: KeyValuePairs, } +/// The use of `BTreeMap`s as per [Pools] definition ensures that the hashes are +/// in order (otherwise, the node will reject some queries). +#[derive(Debug, PartialEq, Clone)] +pub struct PoolIds { + pub hashes: Pools, +} + +impl From for PoolIds { + fn from(hashes: Pools) -> Self { + Self { hashes } + } +} + #[derive(Debug, Encode, Decode, PartialEq, Clone)] pub struct Pool { #[n(0)] @@ -206,6 +223,39 @@ pub struct Pool { pub hashes: Bytes, } +// Essentially the `PoolRegistration` component of `Certificate` at +// `pallas-primitives/src/alonzo/model.rs`, with types modified for the present +// context +#[derive(Debug, Encode, Decode, PartialEq, Clone)] +pub struct PoolParams { + #[n(0)] + pub operator: Bytes, + + #[n(1)] + pub vrf_keyhash: Bytes, + + #[n(2)] + pub pledge: Coin, + + #[n(3)] + pub cost: Coin, + + #[n(4)] + pub margin: UnitInterval, + + #[n(5)] + pub reward_account: Addr, + + #[n(6)] + pub pool_owners: PoolIds, + + #[n(7)] + pub relays: Vec, + + #[n(8)] + pub pool_metadata: Nullable, +} + /// Type used at [GenesisConfig], which is a fraction that is CBOR-encoded /// as an untagged array. #[derive(Debug, Encode, Decode, PartialEq, Clone)] @@ -504,6 +554,20 @@ pub async fn get_cbor( Ok(result) } +/// Get parameters for the given pools. +pub async fn get_stake_pool_params( + client: &mut Client, + era: u16, + pool_ids: PoolIds, +) -> Result, ClientError> { + let query = BlockQuery::GetStakePoolParams(pool_ids); + let query = LedgerQuery::BlockQuery(era, query); + let query = Request::LedgerQuery(query); + let result: (_,) = client.query(query).await?; + + Ok(result.0) +} + /// Get the genesis configuration for the given era. pub async fn get_genesis_config( client: &mut Client, diff --git a/pallas-network/src/miniprotocols/localstate/queries_v16/primitives.rs b/pallas-network/src/miniprotocols/localstate/queries_v16/primitives.rs new file mode 100644 index 00000000..ea87768d --- /dev/null +++ b/pallas-network/src/miniprotocols/localstate/queries_v16/primitives.rs @@ -0,0 +1,91 @@ +// Material brought from `pallas-primitives` +// TODO: Refactor in order to avoid repetition. +pub use pallas_codec::utils::{Bytes, Nullable}; +pub use pallas_crypto::hash::Hash; + +use pallas_codec::minicbor::{self, Decode, Encode}; + +#[derive(Encode, Decode, Debug, PartialEq, Eq, Clone)] +pub struct PoolMetadata { + #[n(0)] + pub url: String, + + #[n(1)] + pub hash: PoolMetadataHash, +} + +pub type PoolMetadataHash = Hash<32>; + +pub type Port = u32; + +pub type IPv4 = Bytes; + +pub type IPv6 = Bytes; + +pub type DnsName = String; + +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum Relay { + SingleHostAddr(Nullable, Nullable, Nullable), + SingleHostName(Nullable, DnsName), + MultiHostName(DnsName), +} + +// Move to `codec.rs`? +impl<'b, C> minicbor::decode::Decode<'b, C> for Relay { + fn decode(d: &mut minicbor::Decoder<'b>, ctx: &mut C) -> Result { + d.array()?; + let variant = d.u16()?; + + match variant { + 0 => Ok(Relay::SingleHostAddr( + d.decode_with(ctx)?, + d.decode_with(ctx)?, + d.decode_with(ctx)?, + )), + 1 => Ok(Relay::SingleHostName( + d.decode_with(ctx)?, + d.decode_with(ctx)?, + )), + 2 => Ok(Relay::MultiHostName(d.decode_with(ctx)?)), + _ => Err(minicbor::decode::Error::message( + "invalid variant id for Relay", + )), + } + } +} + +impl minicbor::encode::Encode for Relay { + fn encode( + &self, + e: &mut minicbor::Encoder, + ctx: &mut C, + ) -> Result<(), minicbor::encode::Error> { + match self { + Relay::SingleHostAddr(a, b, c) => { + e.array(4)?; + e.encode_with(0, ctx)?; + e.encode_with(a, ctx)?; + e.encode_with(b, ctx)?; + e.encode_with(c, ctx)?; + + Ok(()) + } + Relay::SingleHostName(a, b) => { + e.array(3)?; + e.encode_with(1, ctx)?; + e.encode_with(a, ctx)?; + e.encode_with(b, ctx)?; + + Ok(()) + } + Relay::MultiHostName(a) => { + e.array(2)?; + e.encode_with(2, ctx)?; + e.encode_with(a, ctx)?; + + Ok(()) + } + } + } +} diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index 9d35359c..b9e676a4 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -1,9 +1,9 @@ use hex::FromHex; -use pallas_codec::utils::{AnyCbor, AnyUInt, Bytes, KeyValuePairs, TagWrap}; +use pallas_codec::utils::{AnyCbor, AnyUInt, Bytes, KeyValuePairs, Nullable, TagWrap}; use pallas_crypto::hash::Hash; use pallas_network::miniprotocols::localstate::queries_v16::{ - self, Addr, Addrs, ChainBlockNumber, Fraction, GenesisConfig, RationalNumber, Snapshots, - StakeAddr, Stakes, SystemStart, UnitInterval, Value, + self, Addr, Addrs, ChainBlockNumber, Fraction, GenesisConfig, PoolMetadata, PoolParams, + RationalNumber, Relay, Snapshots, StakeAddr, Stakes, SystemStart, UnitInterval, Value, }; use pallas_network::{ facades::{NodeClient, PeerClient, PeerServer}, @@ -23,10 +23,11 @@ use pallas_network::{ multiplexer::{Bearer, Plexer}, }; use std::{ - collections::BTreeSet, + collections::{BTreeMap, BTreeSet}, fs, net::{Ipv4Addr, SocketAddrV4}, path::Path, + str::FromStr, time::Duration, }; @@ -1175,6 +1176,181 @@ pub async fn local_state_query_server_and_client_happy_path() { tokio::try_join!(client, server).unwrap(); } +#[cfg(unix)] +#[tokio::test] +pub async fn local_state_query_server_and_client_happy_path2() { + let server = tokio::spawn({ + async move { + // server setup + let socket_path = Path::new("node2.socket"); + + if socket_path.exists() { + fs::remove_file(socket_path).unwrap(); + } + + let listener = UnixListener::bind(socket_path).unwrap(); + + let mut server = pallas_network::facades::NodeServer::accept(&listener, 0) + .await + .unwrap(); + + // wait for acquire request from client + + let maybe_acquire = server.statequery().recv_while_idle().await.unwrap(); + + assert!(maybe_acquire.is_some()); + assert_eq!(*server.statequery().state(), localstate::State::Acquiring); + + server.statequery().send_acquired().await.unwrap(); + + assert_eq!(*server.statequery().state(), localstate::State::Acquired); + + // server receives query from client + + let query: Vec = match server.statequery().recv_while_acquired().await.unwrap() { + ClientQueryRequest::Query(q) => q.unwrap(), + x => panic!( + "While expecting `GetStakePoolParams`) \ + Unexpected message from client: {x:?}" + ), + }; + + // CBOR got from preprod node. Mind the stripped `82038200`. + let cbor_query = Vec::::from_hex( + "820082068211d9010281581cfdb5834ba06eb4baafd50550d2dc9b3742d2c52cc5ee65bf8673823b", + ) + .unwrap(); + + assert_eq!(query, cbor_query); + + assert_eq!(*server.statequery().state(), localstate::State::Querying); + + let pool_id: Bytes = + Vec::::from_hex("fdb5834ba06eb4baafd50550d2dc9b3742d2c52cc5ee65bf8673823b") + .unwrap() + .into(); + let operator = pool_id.clone(); + let vrf_keyhash = Vec::::from_hex( + "2A6A3D82278A554E9C1777C427BF0397FAF5CD7734900752D698E57679CC523F", + ) + .unwrap() + .into(); + let reward_account = + Vec::::from_hex("E01AEF81CBAB75DB2DE0FE3885332EBE67C34EB1ADBF43BB2408BA3981") + .unwrap() + .into(); + let pool_metadata: Nullable = Some(PoolMetadata { + url: "https://csouza.me/jp-pp.json".to_string(), + hash: Hash::<32>::from_str( + "C9623111188D0BF90E8305E40AA91A040D8036C7813A4ECA44E06FA0A1A893A6", + ) + .unwrap(), + }) + .into(); + let pool_params = PoolParams { + operator, + vrf_keyhash, + pledge: AnyUInt::U64(5_000_000_000), + cost: AnyUInt::U32(340_000_000), + margin: localstate::queries_v16::RationalNumber { + numerator: 3, + denominator: 40, + }, + reward_account, + pool_owners: BTreeSet::from([Bytes::from( + Vec::::from_hex("1AEF81CBAB75DB2DE0FE3885332EBE67C34EB1ADBF43BB2408BA3981") + .unwrap(), + )]) + .into(), + relays: vec![Relay::SingleHostName( + Some(3001).into(), + "preprod.junglestakepool.com".to_string(), + )], + pool_metadata, + }; + // The map is inside a (singleton) array + let result = AnyCbor::from_encode([BTreeMap::from([(pool_id, pool_params)])]); + + server.statequery().send_result(result).await.unwrap(); + + assert_eq!(*server.statequery().state(), localstate::State::Acquired); + + match server.statequery().recv_while_acquired().await.unwrap() { + ClientQueryRequest::Release => (), + x => panic!("unexpected message from client: {x:?}"), + }; + + let next_request = server.statequery().recv_while_idle().await.unwrap(); + + assert!(next_request.is_none()); + assert_eq!(*server.statequery().state(), localstate::State::Done); + } + }); + + let client = tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(1)).await; + + // client setup + let socket_path = "node2.socket"; + + let mut client = NodeClient::connect(socket_path, 0).await.unwrap(); + + // client sends acquire + + client + .statequery() + .send_acquire(Some(Point::Origin)) + .await + .unwrap(); + + client.statequery().recv_while_acquiring().await.unwrap(); + + assert_eq!(*client.statequery().state(), localstate::State::Acquired); + + // client sends a BlockQuery + + let pool_id1 = "fdb5834ba06eb4baafd50550d2dc9b3742d2c52cc5ee65bf8673823b"; + let pool_id1: Bytes = Vec::::from_hex(pool_id1).unwrap().into(); + let mut pools = BTreeSet::::new(); + pools.insert(pool_id1); + + let request = AnyCbor::from_encode(localstate::queries_v16::LedgerQuery::BlockQuery( + 6, + localstate::queries_v16::BlockQuery::GetStakePoolParams(pools.into()), + )); + + client.statequery().send_query(request).await.unwrap(); + + let result: Vec = client + .statequery() + .recv_while_querying() + .await + .unwrap() + .unwrap(); + // CBOR got from preprod node. + let pool_params_cbor = Vec::::from_hex( + "81a1581cfdb5834ba06eb4baafd50550d2dc9b3742d2c52cc5ee65bf8673823b8958\ + 1cfdb5834ba06eb4baafd50550d2dc9b3742d2c52cc5ee65bf8673823b58202a6a3d\ + 82278a554e9c1777c427bf0397faf5cd7734900752d698e57679cc523f1b00000001\ + 2a05f2001a1443fd00d81e82031828581de01aef81cbab75db2de0fe3885332ebe67\ + c34eb1adbf43bb2408ba3981d9010281581c1aef81cbab75db2de0fe3885332ebe67\ + c34eb1adbf43bb2408ba3981818301190bb9781b70726570726f642e6a756e676c65\ + 7374616b65706f6f6c2e636f6d82781c68747470733a2f2f63736f757a612e6d652f\ + 6a702d70702e6a736f6e5820c9623111188d0bf90e8305e40aa91a040d8036c7813a\ + 4eca44e06fa0a1a893a6", + ) + .unwrap(); + + assert_eq!(result, pool_params_cbor); + + client.statequery().send_release().await.unwrap(); + + client.statequery().send_done().await.unwrap(); + }); + + tokio::try_join!(client, server).unwrap(); +} + #[tokio::test] #[ignore] pub async fn txsubmission_server_and_client_happy_path_n2n() {