From 06f1ebd9ee5bea25c00c1b146f37800a04b0741e Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Wed, 27 Jul 2022 19:02:04 -0500 Subject: [PATCH] send packets as quickly as possible (#271) * send packets as quickly as possible * Queue packets until we have a connection * Carry through the time we received the packet from the radio Co-authored-by: Marc Nijdam Co-authored-by: macpie Co-authored-by: michaeldjeffrey --- src/error.rs | 94 -------- src/gateway.rs | 11 +- src/router/client.rs | 423 +++-------------------------------- src/router/dispatcher.rs | 50 ++--- src/router/mod.rs | 2 +- src/router/store.rs | 126 +---------- src/service/gateway.rs | 119 +--------- src/service/mod.rs | 6 +- src/service/router.rs | 103 +-------- src/state_channel/channel.rs | 359 ----------------------------- src/state_channel/message.rs | 69 +++--- src/state_channel/mod.rs | 4 - src/traits/msg_verify.rs | 7 +- src/updater/releases.rs | 9 +- 14 files changed, 123 insertions(+), 1259 deletions(-) delete mode 100644 src/state_channel/channel.rs diff --git a/src/error.rs b/src/error.rs index e89746c3..860c94f4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,3 @@ -use crate::state_channel; use std::net; use thiserror::Error; @@ -20,8 +19,6 @@ pub enum Error { Decode(#[from] DecodeError), #[error("service error: {0}")] Service(#[from] ServiceError), - #[error("state channel error")] - StateChannel(#[from] Box), #[error("semtech udp error")] Semtech(#[from] semtech_udp::server_runtime::Error), #[error("time error")] @@ -80,47 +77,6 @@ pub enum ServiceError { LocalClientConnect(helium_proto::services::Error), } -#[allow(clippy::large_enum_variant)] -#[derive(Error, Debug)] -pub enum StateChannelError { - #[error("ignored state channel")] - Ignored { sc: state_channel::StateChannel }, - #[error("inactive state channel")] - Inactive, - #[error("state channel not found")] - NotFound { sc_id: Vec }, - #[error("invalid owner for state channel")] - InvalidOwner, - #[error("state channel summary error")] - Summary(#[from] StateChannelSummaryError), - #[error("new state channel error")] - NewChannel { sc: state_channel::StateChannel }, - #[error("state channel causal conflict")] - CausalConflict { - sc: state_channel::StateChannel, - conflicts_with: state_channel::StateChannel, - }, - #[error("state channel overpaid")] - Overpaid { - sc: state_channel::StateChannel, - original_dc_amount: u64, - }, - #[error("state channel underpaid for a packet")] - Underpaid { sc: state_channel::StateChannel }, - #[error("state channel balance too low")] - LowBalance, -} - -#[derive(Error, Debug)] -pub enum StateChannelSummaryError { - #[error("zero state channel packet summary")] - ZeroPacket, - #[error("zero state channel packet over dc count")] - PacketDCMismatch, - #[error("invalid address")] - InvalidAddress, -} - #[derive(Debug, Error)] pub enum RegionError { #[error("no region params found or active")] @@ -178,56 +134,6 @@ impl DecodeError { } } -// State Channel Errors -impl StateChannelError { - pub fn invalid_owner() -> Error { - Error::StateChannel(Box::new(Self::InvalidOwner)) - } - - pub fn invalid_summary(err: StateChannelSummaryError) -> Error { - Error::StateChannel(Box::new(Self::Summary(err))) - } - - pub fn inactive() -> Error { - Error::StateChannel(Box::new(Self::Inactive)) - } - - pub fn not_found(sc_id: &[u8]) -> Error { - let sc_id = sc_id.to_vec(); - Error::StateChannel(Box::new(Self::NotFound { sc_id })) - } - - pub fn ignored(sc: state_channel::StateChannel) -> Error { - Error::StateChannel(Box::new(Self::Ignored { sc })) - } - - pub fn new_channel(sc: state_channel::StateChannel) -> Error { - Error::StateChannel(Box::new(Self::NewChannel { sc })) - } - - pub fn causal_conflict( - sc: state_channel::StateChannel, - conflicts_with: state_channel::StateChannel, - ) -> Error { - Error::StateChannel(Box::new(Self::CausalConflict { sc, conflicts_with })) - } - - pub fn overpaid(sc: state_channel::StateChannel, original_dc_amount: u64) -> Error { - Error::StateChannel(Box::new(Self::Overpaid { - sc, - original_dc_amount, - })) - } - - pub fn underpaid(sc: state_channel::StateChannel) -> Error { - Error::StateChannel(Box::new(Self::Underpaid { sc })) - } - - pub fn low_balance() -> Error { - Error::StateChannel(Box::new(Self::LowBalance)) - } -} - impl RegionError { pub fn no_region_params() -> Error { Error::Region(RegionError::NoRegionParams) diff --git a/src/gateway.rs b/src/gateway.rs index fbf09d2f..52108b6b 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -5,7 +5,10 @@ use semtech_udp::{ tx_ack, MacAddress, }; use slog::{debug, info, o, warn, Logger}; -use std::{convert::TryFrom, time::Duration}; +use std::{ + convert::TryFrom, + time::{Duration, Instant}, +}; use tokio::sync::mpsc; pub const DOWNLINK_TIMEOUT_SECS: u64 = 5; @@ -112,7 +115,7 @@ impl Gateway { Ok(packet) if packet.is_longfi() => { info!(logger, "ignoring longfi packet"); } - Ok(packet) => self.handle_uplink(logger, packet).await, + Ok(packet) => self.handle_uplink(logger, packet, Instant::now()).await, Err(err) => { warn!(logger, "ignoring push_data: {err:?}"); } @@ -127,9 +130,9 @@ impl Gateway { Ok(()) } - async fn handle_uplink(&mut self, logger: &Logger, packet: Packet) { + async fn handle_uplink(&mut self, logger: &Logger, packet: Packet, received: Instant) { info!(logger, "uplink {} from {}", packet, self.downlink_mac); - match self.uplinks.uplink(packet).await { + match self.uplinks.uplink(packet, received).await { Ok(()) => (), Err(err) => warn!(logger, "ignoring uplink error {:?}", err), } diff --git a/src/router/client.rs b/src/router/client.rs index f43e9311..45eda6cc 100644 --- a/src/router/client.rs +++ b/src/router/client.rs @@ -1,37 +1,26 @@ use crate::{ - error::{Error, StateChannelError}, + error::Error, gateway, - router::{QuePacket, RouterStore, StateChannelEntry}, - service::router::{RouterService, StateChannelService}, - service::{ - self, - gateway::{GatewayService, StateChannelFollowService}, - }, - state_channel::{check_active, check_active_diff, StateChannel, StateChannelMessage}, - Base64, CacheSettings, KeyedUri, Keypair, MsgSign, Packet, Region, Result, TxnFee, - TxnFeeConfig, -}; -use futures::{future::OptionFuture, TryFutureExt}; -use helium_proto::{ - blockchain_state_channel_message_v1::Msg, BlockchainStateChannelDiffV1, - BlockchainStateChannelV1, BlockchainTxnStateChannelCloseV1, CloseState, + router::{QuePacket, RouterStore}, + service::router::RouterService, + state_channel::StateChannelMessage, + Base64, CacheSettings, KeyedUri, Keypair, Packet, Region, Result, }; +use futures::TryFutureExt; use slog::{debug, info, o, warn, Logger}; -use std::sync::Arc; +use std::{sync::Arc, time::Instant}; use tokio::{ sync::mpsc, time::{self, Duration, MissedTickBehavior}, }; -use tokio_stream::StreamExt; pub const STORE_GC_INTERVAL: Duration = Duration::from_secs(60); pub const STATE_CHANNEL_CONNECT_INTERVAL: Duration = Duration::from_secs(60); #[derive(Debug)] pub enum Message { - Uplink(Packet), + Uplink { packet: Packet, received: Instant }, RegionChanged(Region), - GatewayChanged(Option), Stop, } @@ -45,17 +34,13 @@ pub fn message_channel(size: usize) -> (MessageSender, MessageReceiver) { } impl MessageSender { - pub async fn gateway_changed(&self, gateway: Option) { - let _ = self.0.send(Message::GatewayChanged(gateway)).await; - } - pub async fn region_changed(&self, region: Region) { let _ = self.0.send(Message::RegionChanged(region)).await; } - pub async fn uplink(&self, packet: Packet) -> Result { + pub async fn uplink(&self, packet: Packet, received: Instant) -> Result { self.0 - .send(Message::Uplink(packet)) + .send(Message::Uplink { packet, received }) .map_err(|_| Error::channel()) .await } @@ -71,16 +56,7 @@ pub struct RouterClient { region: Region, keypair: Arc, downlinks: gateway::MessageSender, - gateway: Option, - state_channel_follower: StateChannelFollowService, store: RouterStore, - // This allows an attempt to connect on an initial uplink without endlessly - // trying to connect to a failing state channel - first_uplink: bool, - // This is used to request state channel diffs on anything but the first - // offer sent to the state channel - first_offer: bool, - state_channel: StateChannelService, } impl RouterClient { @@ -88,16 +64,12 @@ impl RouterClient { oui: u32, region: Region, uri: KeyedUri, - mut gateway: GatewayService, downlinks: gateway::MessageSender, keypair: Arc, settings: CacheSettings, ) -> Result { - let mut router = RouterService::new(uri)?; - let state_channel = router.state_channel()?; - let state_channel_follower = gateway.follow_sc().await?; + let router = RouterService::new(uri)?; let store = RouterStore::new(&settings); - let gateway = Some(gateway); Ok(Self { router, oui, @@ -105,11 +77,6 @@ impl RouterClient { keypair, downlinks, store, - state_channel, - gateway, - state_channel_follower, - first_uplink: true, - first_offer: true, }) } @@ -130,9 +97,6 @@ impl RouterClient { let mut store_gc_timer = time::interval(STORE_GC_INTERVAL); store_gc_timer.set_missed_tick_behavior(MissedTickBehavior::Delay); - let mut state_channel_connect_timer = time::interval(STATE_CHANNEL_CONNECT_INTERVAL); - state_channel_connect_timer.set_missed_tick_behavior(MissedTickBehavior::Delay); - loop { tokio::select! { _ = shutdown.clone() => { @@ -140,22 +104,11 @@ impl RouterClient { return Ok(()) }, message = messages.recv() => match message { - Some(Message::Uplink(packet)) => { - self.handle_uplink(&logger, packet) + Some(Message::Uplink{packet, received}) => { + self.handle_uplink(&logger, packet, received) .unwrap_or_else(|err| warn!(logger, "ignoring failed uplink {:?}", err)) .await; }, - Some(Message::GatewayChanged(gateway)) => { - info!(logger, "gateway changed"); - self.gateway = gateway; - match self.state_channel_follower.set_gateway(self.gateway.clone()).await { - Ok(()) => (), - Err(err) => { - warn!(logger, "ignoring gateway service setup error: {err:?}"); - let _ = self.state_channel_follower.set_gateway(None).await; - } - } - }, Some(Message::RegionChanged(region)) => { self.region = region; info!(logger, "updated region"; @@ -167,298 +120,24 @@ impl RouterClient { }, None => warn!(logger, "ignoring closed uplinks channel"), }, - gw_message = self.state_channel_follower.next() => match gw_message { - Some(Ok(message)) => { - self.handle_state_channel_close_message(&logger, &message) - .unwrap_or_else(|err| warn!(logger, "ignoring gateway service handling error {:?}", err)) - .await - }, - Some(Err(err)) => { - warn!(logger, "ignoring gateway service error: {:?}", err); - } - // The follower service has closd or errored out. Give up - // since the dispatcher will notice the disconnect/error and - // reconnect a potentially different gateway - None => { - warn!(logger, "gateway service disconnected"); - let _ = self.state_channel_follower.set_gateway(None).await; - }, - }, - sc_message = self.state_channel.message() => match sc_message { - Ok(Some(message)) => { - if let Some(inner_msg) = message.msg { - self.handle_state_channel_message(&logger, inner_msg.into()) - .unwrap_or_else(|err| warn!(logger, "ignoring state channel handling error {:?}", err)) - .await - } - }, - Ok(None) => { - // The state channel connect timer will reconnect the - // state channel on the next cycle - self.first_offer = true; - warn!(logger, "state channel disconnected"); - }, - Err(err) => { - // The state channel connect timer will reconnect the - // state channel on the next cycle - self.first_offer = true; - warn!(logger, "state channel error {:?}", err); - }, - }, _ = store_gc_timer.tick() => { - let removed = self.store.gc_queued_packets(STORE_GC_INTERVAL); + let removed = self.store.gc_waiting_packets(STORE_GC_INTERVAL); if removed > 0 { info!(logger, "discarded {} queued packets", removed); } } - _ = state_channel_connect_timer.tick() => { - self.maybe_connect_state_channel(&logger).await - } - } - } - } - - // Reconects the state channel if there are queued or waiting packets in the - // store for the target router - async fn maybe_connect_state_channel(&mut self, logger: &Logger) { - if self.store.packet_queue_len() + self.store.waiting_packets_len() > 0 - && !self.state_channel.is_connected() - { - match self.state_channel.connect().await { - Ok(()) => info!(logger, "connected state channel"), - Err(err) => warn!(logger, "failed to connect state channel: {:?}", err), } } } - async fn handle_uplink(&mut self, logger: &Logger, uplink: Packet) -> Result { - self.store.store_waiting_packet(uplink)?; - // First uplink is used to get a quicker state channel connect than - // waiting for the state channel connect timer to trigger - if self.first_uplink { - self.first_uplink = false; - self.maybe_connect_state_channel(logger).await; - } - self.send_packet_offers(logger).await - } - - async fn handle_state_channel_close_message( + async fn handle_uplink( &mut self, logger: &Logger, - message: &R, + uplink: Packet, + received: Instant, ) -> Result { - let message = message.state_channel_response()?; - let (txn, remove): (OptionFuture<_>, bool) = if let Some(entry) = - self.store.get_state_channel_entry_mut(&message.sc_id) - { - let keypair = self.keypair.clone(); - match message.close_state() { - // File a dispute as soon as we hit the expiration time - CloseState::Closable => ( - (entry.in_conflict()) - .then(|| mk_close_txn(keypair, entry.clone())) - .into(), - entry.in_conflict(), - ), - // This is after the router had it's time to close at the - // beginning of the grace period. Close non disputed - // state channels - CloseState::Closing => ( - (!entry.in_conflict()) - .then(|| mk_close_txn(keypair, entry.clone())) - .into(), - !entry.in_conflict(), - ), - // Done with the state channel, get it out of the cache - CloseState::Closed => (None.into(), true), - // A state channel was disputed. If we disputed it it would - // already have been sent and removed as part of Closing - // handling. If it was disputed by someone else we'll file - // our close here too to get in on the dispute - CloseState::Dispute => (Some(mk_close_txn(keypair, entry.clone())).into(), true), - } - } else { - (None.into(), false) - }; - if remove { - self.store.remove_state_channel(&message.sc_id); - } - if let Some(txn) = txn.await { - if let Some(gateway) = &mut self.gateway { - let _ = gateway - .close_sc(txn) - .inspect_err(|err| warn!(logger, "ignoring gateway close_sc error: {:?}", err)) - .await; - } else { - return Err(Error::no_service()); - } - } - Ok(()) - } - - async fn handle_state_channel_message( - &mut self, - logger: &Logger, - message: StateChannelMessage, - ) -> Result { - match message.msg() { - Msg::Response(response) => { - if let Some(packet) = Packet::from_state_channel_response(response.to_owned()) { - self.handle_downlink(logger, packet).await; - } - Ok(()) - } - Msg::Packet(_) => Err(Error::custom("unexpected state channel packet message")), - Msg::Offer(_) => Err(Error::custom("unexpected state channel offer message")), - Msg::Purchase(purchase) => { - let packet = self.store.dequeue_packet(&purchase.packet_hash); - let packet_ref = packet.as_ref(); - let state_channel_result = if let Some(purchase_sc) = &purchase.sc { - self.handle_purchase_state_channel(logger, packet_ref, purchase_sc) - .await - } else if let Some(purchase_sc_diff) = &purchase.sc_diff { - self.handle_purchase_state_channel_diff(logger, packet_ref, purchase_sc_diff) - .await - } else { - Ok(None) - }; - - match state_channel_result { - Err(Error::StateChannel(err)) => match *err { - // Overpaid state channels are ignored - StateChannelError::Overpaid { sc, .. } => { - warn!(logger, "ignoring overpaid state channel"; - "sc_id" => sc.id().to_b64url()); - self.store.ignore_state_channel(sc) - } - // Underpaid state channels are ignored - StateChannelError::Underpaid { sc, .. } => { - warn!(logger, "ignoring underpaid state channel"; - "sc_id" => sc.id().to_b64url()); - self.store.ignore_state_channel(sc) - } - // A previously ignored state channel - StateChannelError::Ignored { sc, .. } => { - warn!(logger, "ignored purchase state channel"; - "sc_id" => sc.id().to_b64url()); - Ok(()) - } - // A new channel was detected. We have no baseline - // for the received state channel in the purchase. - // Accept it, follow it for close actions and submit - // the packet - // - // TODO: Ideally we would check if the difference - // between last purchase channel (this is the harder - // part to infer) and the new one is enough to cover - // for the packet. - StateChannelError::NewChannel { sc } => { - info!(logger, "accepting new state channel"; - "sc_id" => sc.id().to_b64url()); - self.state_channel_follower - .send(sc.id(), sc.owner(), logger) - .await?; - self.store.store_state_channel(sc)?; - let _ = self - .send_packet(logger, packet_ref) - .map_err(|err| warn!(logger, "ignoring packet send error: {err:?}")) - .await; - self.send_packet_offers(logger).await - } - // TODO: Ideally we'd find the state channel that - // pays us back to most in the conflict between - // prev_sc, new_sc and conflicts_with and keep that - // one? - StateChannelError::CausalConflict { sc, conflicts_with } => { - warn!(logger, "ignoring non-causal purchase"; - "sc_id" => sc.id().to_b64url()); - self.store - .store_conflicting_state_channel(sc, conflicts_with) - } - StateChannelError::NotFound { sc_id } => { - warn!(logger, "accepting purchase with no local state channel"; - "sc_id" => sc_id.to_b64url()); - // Apparently we got an sc_diff on a purchase, but - // we have no local knowledge of that state channel. - // We tentatively accept the purchase by sending the - // packet and request the full state channel in the - // next offer. - self.first_offer = true; - let _ = self - .send_packet(logger, packet_ref) - .map_err(|err| warn!(logger, "ignoring packet send error: {err:?}")) - .await; - self.send_packet_offers(logger).await - } - err => { - info!(logger, "ignoring purchase: {err:?}"); - Ok(()) - } - }, - Err(err) => { - info!(logger, "ignoring purchase: {err:?}"); - Ok(()) - } - Ok(Some(new_sc)) => { - self.store.store_state_channel(new_sc)?; - let _ = self - .send_packet(logger, packet_ref) - .map_err(|err| warn!(logger, "ignoring packet send error: {err:?}")) - .await; - self.send_packet_offers(logger).await - } - Ok(None) => Ok(()), - } - } - Msg::Banner(banner) => { - // We ignore banners since they're not guaranteed to relate to - // the first received purchase - if let Some(banner_sc) = &banner.sc { - info!(logger, "received banner (ignored)"; - "sc_id" => banner_sc.id.to_b64url()); - } - self.send_packet_offers(logger).await - } - Msg::Reject(rejection) => { - debug!(logger, "packet rejected"; - "packet_hash" => rejection.packet_hash.to_b64()); - self.store.dequeue_packet(&rejection.packet_hash); - // We do not receive the hash of the packet that was rejected so - // we rely on the store cleanup to remove the implied packet. - // Try to send offers again in case we have space - self.send_packet_offers(logger).await - } - } - } - - async fn handle_purchase_state_channel( - &mut self, - _logger: &Logger, - packet: Option<&QuePacket>, - sc: &BlockchainStateChannelV1, - ) -> Result> { - if let Some(gateway) = &mut self.gateway { - let public_key = self.keypair.public_key(); - check_active(sc, gateway, &self.store) - .await - .and_then(|prev_sc| prev_sc.is_valid_purchase_sc(public_key, packet, sc)) - .map(Some) - } else { - Err(Error::no_service()) - } - } - - async fn handle_purchase_state_channel_diff( - &mut self, - _logger: &Logger, - packet: Option<&QuePacket>, - sc_diff: &BlockchainStateChannelDiffV1, - ) -> Result> { - let public_key = self.keypair.public_key(); - check_active_diff(sc_diff, &self.store) - .await - .and_then(|prev_sc| prev_sc.is_valid_purchase_sc_diff(public_key, packet, sc_diff)) - .map(Some) + self.store.store_waiting_packet(uplink, received)?; + self.send_waiting_packets(logger).await } async fn handle_downlink(&mut self, logger: &Logger, packet: Packet) { @@ -469,45 +148,24 @@ impl RouterClient { .await; } - async fn send_packet_offers(&mut self, logger: &Logger) -> Result { - if !self.state_channel.is_connected() { - return Ok(()); - } - if self.state_channel.capacity() == 0 || self.store.packet_queue_full() { - return Ok(()); - } + async fn send_waiting_packets(&mut self, logger: &Logger) -> Result { while let Some(packet) = self.store.pop_waiting_packet() { - self.send_offer(logger, &packet, self.first_offer).await?; - self.first_offer = false; - self.store.queue_packet(packet)?; - if self.state_channel.capacity() == 0 || self.store.packet_queue_full() { - return Ok(()); + if let Some(message) = self.send_packet(logger, &packet).await? { + match message.to_downlink() { + Ok(Some(packet)) => self.handle_downlink(logger, packet).await, + Ok(None) => (), + Err(err) => warn!(logger, "ignoring router response: {err:?}"), + } } } Ok(()) } - async fn send_offer( + async fn send_packet( &mut self, - _logger: &Logger, + logger: &Logger, packet: &QuePacket, - first_offer: bool, - ) -> Result { - StateChannelMessage::offer( - packet.packet().clone(), - self.keypair.clone(), - &self.region, - !first_offer, - ) - .and_then(|message| self.state_channel.send(message.to_message())) - .await - } - - async fn send_packet(&mut self, logger: &Logger, packet: Option<&QuePacket>) -> Result { - if packet.is_none() { - return Ok(()); - } - let packet = packet.unwrap(); + ) -> Result> { debug!(logger, "sending packet"; "packet_hash" => packet.hash().to_b64()); StateChannelMessage::packet( @@ -516,27 +174,8 @@ impl RouterClient { &self.region, packet.hold_time().as_millis() as u64, ) - .and_then(|message| self.state_channel.send(message.to_message())) + .and_then(|message| self.router.route(message.to_message())) + .map_ok(StateChannelMessage::from_message) .await } } - -async fn mk_close_txn( - keypair: Arc, - entry: StateChannelEntry, -) -> BlockchainTxnStateChannelCloseV1 { - let mut txn = BlockchainTxnStateChannelCloseV1 { - state_channel: Some(entry.sc.sc), - closer: keypair.public_key().into(), - conflicts_with: None, - fee: 0, - signature: vec![], - }; - if let Some(conflicts_with) = entry.conflicts_with { - txn.conflicts_with = Some(conflicts_with.sc); - } - let fee_config = TxnFeeConfig::default(); - txn.fee = txn.txn_fee(&fee_config).expect("close txn fee"); - txn.signature = txn.sign(keypair).await.expect("signature"); - txn -} diff --git a/src/router/dispatcher.rs b/src/router/dispatcher.rs index 1d70afa2..3c661bea 100644 --- a/src/router/dispatcher.rs +++ b/src/router/dispatcher.rs @@ -12,13 +12,21 @@ use futures::{ use helium_proto::BlockchainVarV1; use slog::{debug, info, o, warn, Logger}; use slog_scope; -use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + pin::Pin, + sync::Arc, + time::{Duration, Instant}, +}; use tokio::{task::JoinHandle, time}; use tokio_stream::{self, StreamExt, StreamMap}; #[derive(Debug)] pub enum Message { - Uplink(Packet), + Uplink { + packet: Packet, + received_time: Instant, + }, Config { keys: Vec, response: sync::ResponseSender>>, @@ -59,9 +67,12 @@ impl MessageSender { rx.recv().await? } - pub async fn uplink(&self, packet: Packet) -> Result { + pub async fn uplink(&self, packet: Packet, received_time: Instant) -> Result { self.0 - .send(Message::Uplink(packet)) + .send(Message::Uplink { + packet, + received_time, + }) .map_err(|_| Error::channel()) .await } @@ -256,8 +267,6 @@ impl Dispatcher { "pubkey" => gateway.uri.pubkey.to_string(), "uri" => gateway.uri.uri.to_string()); - // Notify of gateway change - self.notify_gateway_change(Some(gateway.clone())).await; // Initialize liveness check for gateway let mut gateway_check = time::interval(GATEWAY_CHECK_INTERVAL); loop { @@ -268,7 +277,7 @@ impl Dispatcher { }, gateway_message = streams.next() => match gateway_message { Some((gateway_stream, Ok(gateway_message))) => match gateway_stream { - GatewayStream::Routing => self.handle_routing_update(&mut gateway, &gateway_message, &shutdown, logger).await, + GatewayStream::Routing => self.handle_routing_update(&gateway_message, &shutdown, logger).await, GatewayStream::RegionParams => self.handle_region_params_update(&gateway_message, logger).await, }, Some((gateway_stream, Err(err))) => { @@ -317,12 +326,6 @@ impl Dispatcher { Ok(()) } - async fn notify_gateway_change(&self, gateway: Option) { - for router_entry in self.routers.values() { - router_entry.dispatch.gateway_changed(gateway.clone()).await; - } - } - async fn prepare_gateway_change( &mut self, backoff: &Backoff, @@ -333,8 +336,6 @@ impl Dispatcher { if shutdown.is_triggered() { return; } - // Tell routers to clear their gateway entries - self.notify_gateway_change(None).await; // Reset routing and region heigth for the next gateway self.routing_height = 0; @@ -366,7 +367,10 @@ impl Dispatcher { logger: &Logger, ) { match message { - Message::Uplink(packet) => self.handle_uplink(&packet, logger).await, + Message::Uplink { + packet, + received_time, + } => self.handle_uplink(&packet, received_time, logger).await, Message::Config { keys, response } => { let reply = if let Some(gateway) = gateway { gateway.config(keys).await @@ -396,11 +400,11 @@ impl Dispatcher { } } - async fn handle_uplink(&self, packet: &Packet, logger: &Logger) { + async fn handle_uplink(&self, packet: &Packet, received: Instant, logger: &Logger) { let mut handled = false; for router_entry in self.routers.values() { if router_entry.routing.matches_routing_info(packet.routing()) { - match router_entry.dispatch.uplink(packet.clone()).await { + match router_entry.dispatch.uplink(packet.clone(), received).await { Ok(()) => (), Err(err) => warn!(logger, "ignoring router dispatch error: {err:?}"), } @@ -412,7 +416,7 @@ impl Dispatcher { for (router_key, router_entry) in &self.routers { if default_routers.contains(&router_key.uri) { debug!(logger, "sending to default router"); - let _ = router_entry.dispatch.uplink(packet.clone()).await; + let _ = router_entry.dispatch.uplink(packet.clone(), received).await; } } } @@ -459,7 +463,6 @@ impl Dispatcher { async fn handle_routing_update( &mut self, - gateway: &mut GatewayService, response: &R, shutdown: &triggered::Listener, logger: &Logger, @@ -484,7 +487,7 @@ impl Dispatcher { while let Some(proto) = proto_stream.next().await { match Routing::from_proto(logger, proto) { Ok(routing) => { - self.handle_oui_routing_update(gateway, &routing, shutdown, logger) + self.handle_oui_routing_update(&routing, shutdown, logger) .await } Err(err) => warn!(logger, "failed to parse routing: {err:?}"), @@ -497,7 +500,6 @@ impl Dispatcher { #[allow(clippy::map_entry)] async fn handle_oui_routing_update( &mut self, - gateway: &mut GatewayService, routing: &Routing, shutdown: &triggered::Listener, logger: &Logger, @@ -512,7 +514,7 @@ impl Dispatcher { // immutable before borrowing as mutable to insert if !self.routers.contains_key(&key) { match self - .start_router(gateway, shutdown.clone(), routing.clone(), uri.clone()) + .start_router(shutdown.clone(), routing.clone(), uri.clone()) .await { Ok(router_entry) => { @@ -546,7 +548,6 @@ impl Dispatcher { async fn start_router( &self, - gateway: &mut GatewayService, shutdown: triggered::Listener, routing: Routing, uri: KeyedUri, @@ -559,7 +560,6 @@ impl Dispatcher { routing.oui, self.region, uri, - gateway.clone(), self.downlinks.clone(), self.keypair.clone(), self.cache_settings.clone(), diff --git a/src/router/mod.rs b/src/router/mod.rs index 774c56ac..cbdab218 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -8,4 +8,4 @@ pub use client::RouterClient; pub use dispatcher::Dispatcher; pub use filter::{DevAddrFilter, EuiFilter}; pub use routing::Routing; -pub use store::{QuePacket, RouterStore, StateChannelEntry}; +pub use store::{QuePacket, RouterStore}; diff --git a/src/router/store.rs b/src/router/store.rs index 32850cbd..66ddde42 100644 --- a/src/router/store.rs +++ b/src/router/store.rs @@ -1,30 +1,15 @@ -use crate::{state_channel::StateChannel, CacheSettings, Packet, Result}; +use crate::{CacheSettings, Packet, Result}; use std::{ - collections::{hash_map::Values, HashMap, VecDeque}, + collections::VecDeque, ops::Deref, time::{Duration, Instant}, }; pub struct RouterStore { - state_channels: HashMap, StateChannelEntry>, waiting_packets: VecDeque, - queued_packets: HashMap, QuePacket>, max_packets: u16, } -#[derive(Clone)] -pub struct StateChannelEntry { - pub(crate) ignore: bool, - pub(crate) sc: StateChannel, - pub(crate) conflicts_with: Option, -} - -impl StateChannelEntry { - pub fn in_conflict(&self) -> bool { - self.conflicts_with.is_some() - } -} - #[derive(Debug)] pub struct QuePacket { received: Instant, @@ -49,29 +34,19 @@ impl Deref for QuePacket { } } -impl From for QuePacket { - fn from(packet: Packet) -> Self { - let received = Instant::now(); - Self { received, packet } - } -} - impl RouterStore { pub fn new(settings: &CacheSettings) -> Self { let max_packets = settings.max_packets; let waiting_packets = VecDeque::new(); - let queued_packets = HashMap::new(); - let state_channels = HashMap::new(); Self { waiting_packets, - queued_packets, max_packets, - state_channels, } } - pub fn store_waiting_packet(&mut self, packet: Packet) -> Result { - self.waiting_packets.push_back(QuePacket::from(packet)); + pub fn store_waiting_packet(&mut self, packet: Packet, received: Instant) -> Result { + self.waiting_packets + .push_back(QuePacket { packet, received }); if self.waiting_packets_len() > self.max_packets as usize { self.waiting_packets.pop_front(); } @@ -86,91 +61,12 @@ impl RouterStore { self.waiting_packets.len() } - pub fn packet_queue_full(&self) -> bool { - self.packet_queue_len() >= self.max_packets as usize - } - - pub fn packet_queue_len(&self) -> usize { - self.queued_packets.len() - } - - pub fn queue_packet(&mut self, packet: QuePacket) -> Result { - self.queued_packets.insert(packet.hash(), packet); - Ok(()) - } - - /// Removes and returns the queued packets with the given packet_hash if it - /// exists. - pub fn dequeue_packet(&mut self, packet_hash: &[u8]) -> Option { - self.queued_packets.remove(packet_hash) - } - - /// Removes queued packets older than the given duration. Returns the number + /// Removes waiting packets older than the given duration. Returns the number /// of packets that were removed. - pub fn gc_queued_packets(&mut self, duration: Duration) -> usize { - let before_len = self.queued_packets.len(); - self.queued_packets - .retain(|_, packet| packet.received.elapsed() <= duration); - before_len - self.queued_packets.len() - } - - pub fn get_state_channel_entry(&self, sk: &[u8]) -> Option<&StateChannelEntry> { - self.state_channels.get(&sk.to_vec()) - } - - pub fn get_state_channel_entry_mut(&mut self, sk: &[u8]) -> Option<&mut StateChannelEntry> { - self.state_channels.get_mut(&sk.to_vec()) - } - - pub fn state_channel_entries(&self) -> Values<'_, Vec, StateChannelEntry> { - self.state_channels.values() - } - - pub fn store_conflicting_state_channel( - &mut self, - sc: StateChannel, - conflicts_with: StateChannel, - ) -> Result { - self.state_channels.insert( - sc.id().to_vec(), - StateChannelEntry { - ignore: true, - sc, - conflicts_with: Some(conflicts_with), - }, - ); - Ok(()) - } - - pub fn ignore_state_channel(&mut self, sc: StateChannel) -> Result { - self.state_channels.insert( - sc.id().to_vec(), - StateChannelEntry { - ignore: true, - sc, - conflicts_with: None, - }, - ); - Ok(()) - } - - pub fn store_state_channel(&mut self, sc: StateChannel) -> Result { - self.state_channels.insert( - sc.id().to_vec(), - StateChannelEntry { - ignore: false, - sc, - conflicts_with: None, - }, - ); - Ok(()) - } - - pub fn remove_state_channel(&mut self, sk: &[u8]) -> Option { - self.state_channels.remove(&sk.to_vec()) - } - - pub fn state_channel_count(&self) -> usize { - self.state_channels.len() + pub fn gc_waiting_packets(&mut self, duration: Duration) -> usize { + let before_len = self.waiting_packets.len(); + self.waiting_packets + .retain(|packet| packet.received.elapsed() <= duration); + before_len - self.waiting_packets.len() } } diff --git a/src/service/gateway.rs b/src/service/gateway.rs index dc9b16f8..669c0456 100644 --- a/src/service/gateway.rs +++ b/src/service/gateway.rs @@ -5,22 +5,18 @@ use crate::{ use helium_proto::{ gateway_resp_v1, services::{self, Channel, Endpoint}, - BlockchainTxnStateChannelCloseV1, BlockchainVarV1, GatewayConfigReqV1, GatewayConfigRespV1, - GatewayRegionParamsUpdateReqV1, GatewayRespV1, GatewayRoutingReqV1, GatewayScCloseReqV1, - GatewayScFollowReqV1, GatewayScFollowStreamedRespV1, GatewayScIsActiveReqV1, - GatewayScIsActiveRespV1, GatewayValidatorsReqV1, GatewayValidatorsRespV1, GatewayVersionReqV1, - GatewayVersionRespV1, Routing, + BlockchainVarV1, GatewayConfigReqV1, GatewayConfigRespV1, GatewayRegionParamsUpdateReqV1, + GatewayRespV1, GatewayRoutingReqV1, GatewayScIsActiveReqV1, GatewayScIsActiveRespV1, + GatewayValidatorsReqV1, GatewayValidatorsRespV1, GatewayVersionReqV1, GatewayVersionRespV1, + Routing, }; use rand::{rngs::OsRng, seq::SliceRandom}; -use slog::{info, Logger}; use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, - time::Duration, }; -use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, Stream}; +use tokio_stream::Stream; type GatewayClient = services::gateway::Client; pub use crate::service::version::GatewayVersion; @@ -50,7 +46,6 @@ pub(crate) trait Response { fn height(&self) -> u64; fn routings(&self) -> Result<&[Routing]>; fn region_params(&self) -> Result; - fn state_channel_response(&self) -> Result<&GatewayScFollowStreamedRespV1>; } impl Response for GatewayRespV1 { @@ -77,92 +72,6 @@ impl Response for GatewayRespV1 { )), } } - - fn state_channel_response(&self) -> Result<&GatewayScFollowStreamedRespV1> { - match &self.msg { - Some(gateway_resp_v1::Msg::FollowStreamedResp(res)) => Ok(res), - msg => Err(Error::custom( - format!("Unexpected gateway message {msg:?}",), - )), - } - } -} - -#[derive(Debug)] -pub struct StateChannelFollowService { - gateway: Option, - tx: Option>, - rx: Option, -} - -impl StateChannelFollowService { - pub async fn new(gateway: GatewayService) -> Result { - let mut result = Self { - tx: None, - rx: None, - gateway: None, - }; - result.set_gateway(Some(gateway)).await?; - Ok(result) - } - - pub async fn send(&mut self, id: &[u8], owner: &[u8], logger: &Logger) -> Result { - self.connect(logger).await?; - match self.tx.as_mut() { - Some(tx) => { - let msg = GatewayScFollowReqV1 { - sc_id: id.into(), - sc_owner: owner.into(), - }; - Ok(tx.send(msg).await?) - } - None => Err(Error::no_service()), - } - } - - pub async fn connect(&mut self, logger: &Logger) -> Result { - if self.tx.is_some() { - return Ok(()); - } - match self.gateway.as_mut() { - Some(gateway) => { - info!(logger, "connecting to gateway state channel updates"); - let (tx, client_rx) = mpsc::channel(3); - let streaming = gateway - .client - .follow_sc(ReceiverStream::new(client_rx)) - .await? - .into_inner(); - let rx = Streaming { - streaming, - verifier: gateway.uri.pubkey.clone(), - }; - self.tx = Some(tx); - self.rx = Some(rx); - Ok(()) - } - None => Err(Error::no_service()), - } - } - - pub async fn set_gateway(&mut self, gateway: Option) -> Result { - self.gateway = gateway; - self.tx = None; - self.rx = None; - Ok(()) - } -} - -impl Stream for StateChannelFollowService { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let Some(rx) = self.rx.as_mut() { - Pin::new(rx).poll_next(cx) - } else { - Poll::Pending - } - } } #[derive(Debug, Clone)] @@ -174,8 +83,8 @@ pub struct GatewayService { impl GatewayService { pub fn new(keyed_uri: &KeyedUri) -> Result { let channel = Endpoint::from(keyed_uri.uri.clone()) - .connect_timeout(Duration::from_secs(CONNECT_TIMEOUT)) - .timeout(Duration::from_secs(RPC_TIMEOUT)) + .connect_timeout(CONNECT_TIMEOUT) + .timeout(RPC_TIMEOUT) .connect_lazy(); Ok(Self { uri: keyed_uri.clone(), @@ -262,20 +171,6 @@ impl GatewayService { } } - pub async fn follow_sc(&mut self) -> Result { - StateChannelFollowService::new(self.clone()).await - } - - pub async fn close_sc(&mut self, close_txn: BlockchainTxnStateChannelCloseV1) -> Result { - let _ = self - .client - .close_sc(GatewayScCloseReqV1 { - close_txn: Some(close_txn), - }) - .await?; - Ok(()) - } - async fn get_config(&mut self, keys: Vec) -> Result { let resp = self .client diff --git a/src/service/mod.rs b/src/service/mod.rs index 8b5c59c9..5cae2a0b 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,5 +1,7 @@ -pub const CONNECT_TIMEOUT: u64 = 10; -pub const RPC_TIMEOUT: u64 = 5; +use std::time::Duration; + +pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(10); +pub const RPC_TIMEOUT: Duration = Duration::from_secs(5); pub mod gateway; pub mod router; diff --git a/src/service/router.rs b/src/service/router.rs index a9baaeb7..50119755 100644 --- a/src/service/router.rs +++ b/src/service/router.rs @@ -1,113 +1,29 @@ -use crate::{service::CONNECT_TIMEOUT, KeyedUri, Result}; +use crate::{ + service::{CONNECT_TIMEOUT, RPC_TIMEOUT}, + KeyedUri, Result, +}; use helium_proto::{ services::{self, Channel, Endpoint}, BlockchainStateChannelMessageV1, }; -use tokio::{sync::mpsc, time::Duration}; -use tokio_stream::wrappers::ReceiverStream; type RouterClient = services::router::RouterClient; -type StateChannelClient = services::router::StateChannelClient; #[derive(Debug)] pub struct RouterService { pub uri: KeyedUri, router_client: RouterClient, - state_channel_client: StateChannelClient, -} - -#[derive(Debug)] -pub struct StateChannelService { - client: StateChannelClient, - conduit: Option<( - mpsc::Sender, - tonic::Streaming, - )>, -} - -pub const CONDUIT_CAPACITY: usize = 50; - -impl StateChannelService { - pub async fn send(&mut self, msg: BlockchainStateChannelMessageV1) -> Result { - if self.conduit.is_none() { - self.conduit = Some(self.mk_conduit().await?) - } - let (tx, _) = self.conduit.as_ref().unwrap(); - Ok(tx.send(msg).await?) - } - - pub fn capacity(&self) -> usize { - self.conduit - .as_ref() - .map(|(tx, _)| tx.capacity()) - .unwrap_or(CONDUIT_CAPACITY) - } - - pub async fn message(&mut self) -> Result> { - if self.conduit.is_none() { - futures::future::pending::<()>().await; - return Ok(None); - } - let (_, rx) = self.conduit.as_mut().unwrap(); - match rx.message().await { - Ok(Some(msg)) => Ok(Some(msg)), - Ok(None) => { - self.disconnect(); - Ok(None) - } - Err(err) => { - self.disconnect(); - Err(err.into()) - } - } - } - - pub async fn connect(&mut self) -> Result { - if self.conduit.is_none() { - self.conduit = Some(self.mk_conduit().await?); - } - Ok(()) - } - - pub fn is_connected(&self) -> bool { - self.conduit.is_some() - } - - pub fn disconnect(&mut self) { - self.conduit = None; - } - - pub async fn reconnect(&mut self) -> Result { - self.disconnect(); - self.connect().await - } - - pub async fn mk_conduit( - &mut self, - ) -> Result<( - mpsc::Sender, - tonic::Streaming, - )> { - let (tx, client_rx) = mpsc::channel(CONDUIT_CAPACITY); - let rx = self - .client - .msg(ReceiverStream::new(client_rx)) - .await? - .into_inner(); - Ok((tx, rx)) - } } impl RouterService { pub fn new(keyed_uri: KeyedUri) -> Result { let router_channel = Endpoint::from(keyed_uri.uri.clone()) - .timeout(Duration::from_secs(CONNECT_TIMEOUT)) + .timeout(RPC_TIMEOUT) + .connect_timeout(CONNECT_TIMEOUT) .connect_lazy(); - let state_channel = router_channel.clone(); Ok(Self { uri: keyed_uri, router_client: RouterClient::new(router_channel), - state_channel_client: StateChannelClient::new(state_channel), }) } @@ -117,11 +33,4 @@ impl RouterService { ) -> Result { Ok(self.router_client.route(msg).await?.into_inner()) } - - pub fn state_channel(&mut self) -> Result { - Ok(StateChannelService { - client: self.state_channel_client.clone(), - conduit: None, - }) - } } diff --git a/src/state_channel/channel.rs b/src/state_channel/channel.rs deleted file mode 100644 index 7dbc5338..00000000 --- a/src/state_channel/channel.rs +++ /dev/null @@ -1,359 +0,0 @@ -use crate::{ - error::{DecodeError, StateChannelError, StateChannelSummaryError}, - router::{store::StateChannelEntry, QuePacket, RouterStore}, - service::gateway::GatewayService, - Error, MsgVerify, Result, -}; -use bytes::{Buf, BufMut, BytesMut}; -use helium_crypto::PublicKey; -use helium_proto::{ - blockchain_state_channel_diff_entry_v1, BlockchainStateChannelDiffAppendSummaryV1, - BlockchainStateChannelDiffUpdateSummaryV1, BlockchainStateChannelDiffV1, - BlockchainStateChannelSummaryV1, BlockchainStateChannelV1, Message, -}; -use sha2::{Digest, Sha256}; -use std::{convert::TryFrom, mem}; - -#[derive(PartialEq, Debug, Eq)] -pub enum StateChannelCausality { - Effect, - Cause, - Equal, - Conflict, -} - -#[derive(Debug, Clone)] -pub struct StateChannel { - pub(crate) sc: BlockchainStateChannelV1, - total_dcs: u64, - expiry_at_block: u64, - original_dc_amount: u64, -} - -impl From for BlockchainStateChannelV1 { - fn from(v: StateChannel) -> Self { - v.sc - } -} - -impl TryFrom<&[u8]> for StateChannel { - type Error = Error; - - fn try_from(v: &[u8]) -> Result { - let mut buf = v; - if buf.len() < (mem::size_of::() * 3) { - return Err(DecodeError::prost_decode("not enough data")); - } - let expiry_at_block = buf.get_u64(); - let original_dc_amount = buf.get_u64(); - let total_dcs = buf.get_u64(); - let sc = BlockchainStateChannelV1::decode(buf)?; - Ok(Self { - sc, - total_dcs, - expiry_at_block, - original_dc_amount, - }) - } -} - -impl StateChannel { - pub fn to_vec(&self) -> Result> { - let mut buf = BytesMut::new(); - buf.put_u64(self.expiry_at_block); - buf.put_u64(self.original_dc_amount); - buf.put_u64(self.total_dcs); - self.sc.encode(&mut buf)?; - Ok(buf.to_vec()) - } - - /// Validates this state channel for just the gateway with the given public key - /// - /// This assumes the caller will validate that the state channel is active. - pub fn is_valid_purchase_sc( - self, - public_key: &PublicKey, - packet: Option<&QuePacket>, - newer: &BlockchainStateChannelV1, - ) -> Result { - newer - .is_valid_owner() - .and_then(|_| newer.is_valid_for(public_key))?; - let new_sc = Self { - sc: newer.clone(), - total_dcs: newer.total_dcs(), - expiry_at_block: self.expiry_at_block, - original_dc_amount: self.original_dc_amount, - }; - let causality = (&self.sc).causally_compare_for(public_key, &newer); - // Chheck that the purchase is an effect of the current one to avoid - // double payment - if causality != StateChannelCausality::Cause { - return Err(StateChannelError::causal_conflict(self, new_sc)); - } - self.is_valid_packet_purchase(new_sc, packet) - } - - pub fn is_valid_purchase_sc_diff( - self, - _public_key: &PublicKey, - packet: Option<&QuePacket>, - diff: &BlockchainStateChannelDiffV1, - ) -> Result { - let mut new_sc = self.clone(); - new_sc.sc.nonce += diff.add_nonce; - for diff in &diff.diffs { - match &diff.entry { - Some(blockchain_state_channel_diff_entry_v1::Entry::Append( - BlockchainStateChannelDiffAppendSummaryV1 { - client_pubkeybin, - num_packets, - num_dcs, - }, - )) => { - let new_summary = BlockchainStateChannelSummaryV1 { - client_pubkeybin: client_pubkeybin.clone(), - num_packets: *num_packets, - num_dcs: *num_dcs, - }; - new_sc.sc.summaries.push(new_summary); - new_sc.total_dcs += num_dcs; - } - Some(blockchain_state_channel_diff_entry_v1::Entry::Add( - BlockchainStateChannelDiffUpdateSummaryV1 { - client_index, - add_packets, - add_dcs, - }, - )) => { - if let Some(summary) = new_sc.sc.summaries.get_mut(*client_index as usize) { - summary.num_packets += add_packets; - summary.num_dcs += add_dcs; - new_sc.total_dcs += add_dcs; - } - } - _ => (), - } - } - self.is_valid_packet_purchase(new_sc, packet) - } - - fn is_valid_packet_purchase( - &self, - new_sc: StateChannel, - packet: Option<&QuePacket>, - ) -> Result { - let original_dc_amount = new_sc.original_dc_amount; - if new_sc.total_dcs > original_dc_amount { - return Err(StateChannelError::overpaid(new_sc, original_dc_amount)); - } - if let Some(packet) = packet { - let dc_total = (&new_sc.sc).total_dcs(); - let dc_prev_total = (&self.sc).total_dcs(); - let dc_packet = packet.dc_payload(); - // Check that the dc change between the last state chanel and the - // new one is at least incremented by the dcs for the packet. - if (dc_total - dc_prev_total) < dc_packet { - return Err(StateChannelError::underpaid(new_sc)); - } - } - Ok(new_sc) - } - - pub fn id(&self) -> &[u8] { - &self.sc.id - } - - pub fn owner(&self) -> &[u8] { - &self.sc.owner - } - - pub fn amount(&self) -> u64 { - self.sc.credits - } - - pub fn hash(&self) -> Vec { - let mut buf = vec![]; - self.sc.encode(&mut buf).expect("encoded state channel"); - Sha256::digest(&buf).to_vec() - } -} - -pub trait StateChannelValidation { - fn is_valid_owner(&self) -> Result; - fn is_valid_for(&self, public_key: &PublicKey) -> Result; - fn total_dcs(&self) -> u64; - fn num_dcs_for(&self, public_key: &PublicKey) -> u64; - fn get_summary(&self, public_key: &PublicKey) -> Option<&BlockchainStateChannelSummaryV1>; - fn causally_compare_for(&self, public_key: &PublicKey, newer: &Self) -> StateChannelCausality; -} - -pub async fn check_active( - channel: &BlockchainStateChannelV1, - gateway: &mut GatewayService, - store: &RouterStore, -) -> Result { - match store.get_state_channel_entry(&channel.id) { - None => { - let resp = gateway.is_active_sc(&channel.id, &channel.owner).await?; - if !resp.active { - return Err(StateChannelError::inactive()); - } - let new_sc = StateChannel { - sc: channel.clone(), - total_dcs: channel.total_dcs(), - expiry_at_block: resp.sc_expiry_at_block, - original_dc_amount: resp.sc_original_dc_amount, - }; - Err(StateChannelError::new_channel(new_sc)) - } - Some(entry) => match entry { - // If the entry is ignored return an error - StateChannelEntry { - ignore: true, sc, .. - } => Err(StateChannelError::ignored(sc.clone())), - // Next is the conflict check - StateChannelEntry { - sc, - conflicts_with: Some(conflicts_with), - .. - } => Err(StateChannelError::causal_conflict( - sc.clone(), - conflicts_with.clone(), - )), - // After which we're ok for a active check - StateChannelEntry { sc, .. } => Ok(sc.clone()), - }, - } -} - -pub async fn check_active_diff( - diff: &BlockchainStateChannelDiffV1, - store: &RouterStore, -) -> Result { - match store.get_state_channel_entry(&diff.id) { - None => - // No entry is not good for a diff since there's no state channel to - // clone - { - Err(StateChannelError::not_found(&diff.id)) - } - Some(entry) => match entry { - // If the entry is ignored return an error - StateChannelEntry { - ignore: true, sc, .. - } => Err(StateChannelError::ignored(sc.clone())), - // Next is the conflict check - StateChannelEntry { - sc, - conflicts_with: Some(conflicts_with), - .. - } => Err(StateChannelError::causal_conflict( - sc.clone(), - conflicts_with.clone(), - )), - // After which we're ok for a active check - StateChannelEntry { sc, .. } => Ok(sc.clone()), - }, - } -} - -impl StateChannelValidation for &BlockchainStateChannelV1 { - fn is_valid_owner(&self) -> Result { - PublicKey::try_from(&self.owner[..]) - .map_err(|_| StateChannelError::invalid_owner()) - .and_then(|owner| self.verify(&owner)) - .map_err(|_| StateChannelError::invalid_owner())?; - Ok(()) - } - - fn is_valid_for(&self, public_key: &PublicKey) -> Result { - // Validate summary for this gateway - if let Some(summary) = self.get_summary(public_key) { - is_valid_summary(summary)?; - } - Ok(()) - } - - fn get_summary(&self, public_key: &PublicKey) -> Option<&BlockchainStateChannelSummaryV1> { - let public_keybin = public_key.to_vec(); - self.summaries - .iter() - .find(|summary| summary.client_pubkeybin == public_keybin) - } - - fn total_dcs(&self) -> u64 { - self.summaries - .iter() - .fold(0, |acc, summary| acc + summary.num_dcs) - } - - fn num_dcs_for(&self, public_key: &PublicKey) -> u64 { - let public_keybin = public_key.to_vec(); - self.summaries.iter().fold(0, |acc, summary| { - if summary.client_pubkeybin == public_keybin { - acc + summary.num_dcs - } else { - acc - } - }) - } - - fn causally_compare_for(&self, public_key: &PublicKey, newer: &Self) -> StateChannelCausality { - match (self.nonce, newer.nonce) { - (older_nonce, newer_nonce) if older_nonce == newer_nonce => { - if self.summaries == newer.summaries { - return StateChannelCausality::Equal; - } - StateChannelCausality::Conflict - } - (older_nonce, newer_nonce) if newer_nonce > older_nonce => { - match (self.get_summary(public_key), newer.get_summary(public_key)) { - (None, _) => StateChannelCausality::Cause, - (Some(_), None) => StateChannelCausality::Conflict, - (Some(older_summary), Some(newer_summary)) => { - if newer_summary.num_packets >= older_summary.num_packets - && newer_summary.num_dcs >= older_summary.num_dcs - { - StateChannelCausality::Cause - } else { - StateChannelCausality::Conflict - } - } - } - } - (_older_nonce, _newer_nonce) => { - match (self.get_summary(public_key), newer.get_summary(public_key)) { - (_, None) => StateChannelCausality::Effect, - (None, _) => StateChannelCausality::Conflict, - (Some(older_summary), Some(newer_summary)) => { - if newer_summary.num_packets <= older_summary.num_packets - && newer_summary.num_dcs <= older_summary.num_packets - { - StateChannelCausality::Effect - } else { - StateChannelCausality::Conflict - } - } - } - } - } - } -} - -fn is_valid_summary(summary: &BlockchainStateChannelSummaryV1) -> Result { - PublicKey::try_from(&summary.client_pubkeybin[..]).map_err(|_| { - StateChannelError::invalid_summary(StateChannelSummaryError::InvalidAddress) - })?; - if summary.num_dcs < summary.num_packets { - return Err(StateChannelError::invalid_summary( - StateChannelSummaryError::PacketDCMismatch, - )); - } - if summary.num_packets == 0 { - return Err(StateChannelError::invalid_summary( - StateChannelSummaryError::ZeroPacket, - )); - } - Ok(()) -} diff --git a/src/state_channel/message.rs b/src/state_channel/message.rs index 55c95502..9cb9dfe8 100644 --- a/src/state_channel/message.rs +++ b/src/state_channel/message.rs @@ -1,7 +1,7 @@ -use crate::{Keypair, MsgSign, Packet, Region, Result}; +use crate::{Error, Keypair, MsgSign, Packet, Region, Result}; use helium_proto::{ blockchain_state_channel_message_v1::Msg, BlockchainStateChannelMessageV1, - BlockchainStateChannelOfferV1, BlockchainStateChannelPacketV1, + BlockchainStateChannelPacketV1, }; use std::sync::Arc; @@ -23,28 +23,7 @@ impl StateChannelMessage { hold_time, }; packet.signature = packet.sign(keypair).await?; - Ok(StateChannelMessage::from(packet)) - } - - pub async fn offer( - packet: Packet, - keypair: Arc, - region: &Region, - req_diff: bool, - ) -> Result { - let frame = Packet::parse_frame(lorawan::Direction::Uplink, packet.payload())?; - let mut offer = BlockchainStateChannelOfferV1 { - packet_hash: packet.hash(), - payload_size: packet.payload().len() as u64, - fcnt: frame.fcnt().unwrap_or(0) as u32, - hotspot: keypair.public_key().into(), - region: region.into(), - routing: Packet::routing_information(&frame)?, - signature: vec![], - req_diff, - }; - offer.signature = offer.sign(keypair).await?; - Ok(Self::from(offer)) + Ok(Self::from(packet)) } pub fn msg(&self) -> &Msg { @@ -54,6 +33,17 @@ impl StateChannelMessage { pub fn to_message(self) -> BlockchainStateChannelMessageV1 { BlockchainStateChannelMessageV1 { msg: Some(self.0) } } + + pub fn to_downlink(self) -> Result> { + match self.0 { + Msg::Response(response) => Ok(Packet::from_state_channel_response(response)), + _ => Err(Error::custom("state channel message not a downlink packet")), + } + } + + pub fn from_message(msg: BlockchainStateChannelMessageV1) -> Option { + msg.msg.map(Self::from) + } } impl From for StateChannelMessage { @@ -62,25 +52,18 @@ impl From for StateChannelMessage { } } -macro_rules! from_msg { - ($msg_type:ty, $enum:path) => { - impl From<$msg_type> for StateChannelMessage { - fn from(inner: $msg_type) -> Self { - let msg = $enum(inner); - Self(msg) - } - } +impl From for StateChannelMessage { + fn from(inner: BlockchainStateChannelPacketV1) -> Self { + let msg = Msg::Packet(inner); + Self(msg) + } +} - impl From for $msg_type { - fn from(v: StateChannelMessage) -> $msg_type { - match v.0 { - $enum(inner) => inner, - _ => panic!("invalid state channel message conversion"), - } - } +impl From for BlockchainStateChannelPacketV1 { + fn from(v: StateChannelMessage) -> Self { + match v.0 { + Msg::Packet(inner) => inner, + _ => panic!("invalid state channel message conversion"), } - }; + } } - -from_msg!(BlockchainStateChannelPacketV1, Msg::Packet); -from_msg!(BlockchainStateChannelOfferV1, Msg::Offer); diff --git a/src/state_channel/mod.rs b/src/state_channel/mod.rs index ab91f1e5..2296a6b0 100644 --- a/src/state_channel/mod.rs +++ b/src/state_channel/mod.rs @@ -1,7 +1,3 @@ -mod channel; mod message; -pub use channel::{ - check_active, check_active_diff, StateChannel, StateChannelCausality, StateChannelValidation, -}; pub use message::StateChannelMessage; diff --git a/src/traits/msg_verify.rs b/src/traits/msg_verify.rs index 58f5518a..791d59b6 100644 --- a/src/traits/msg_verify.rs +++ b/src/traits/msg_verify.rs @@ -1,8 +1,7 @@ use crate::{Error, Result}; use helium_crypto::{PublicKey, Verify}; use helium_proto::{ - BlockchainStateChannelMessageV1, BlockchainStateChannelOfferV1, BlockchainStateChannelPacketV1, - BlockchainStateChannelV1, GatewayRespV1, Message, + BlockchainStateChannelMessageV1, BlockchainStateChannelPacketV1, GatewayRespV1, Message, }; pub trait MsgVerify { @@ -25,8 +24,6 @@ macro_rules! impl_msg_verify { impl_msg_verify!(GatewayRespV1, signature); impl_msg_verify!(BlockchainStateChannelPacketV1, signature); -impl_msg_verify!(BlockchainStateChannelOfferV1, signature); -impl_msg_verify!(BlockchainStateChannelV1, signature); impl MsgVerify for BlockchainStateChannelMessageV1 { fn verify(&self, verifier: &PublicKey) -> Result { @@ -34,7 +31,7 @@ impl MsgVerify for BlockchainStateChannelMessageV1 { match &self.msg { Some(Msg::Response(_m)) => Ok(()), Some(Msg::Packet(m)) => m.verify(verifier), - Some(Msg::Offer(m)) => m.verify(verifier), + Some(Msg::Offer(_m)) => Ok(()), Some(Msg::Purchase(_m)) => Ok(()), Some(Msg::Banner(_m)) => Ok(()), Some(Msg::Reject(_m)) => Ok(()), diff --git a/src/updater/releases.rs b/src/updater/releases.rs index 13aca52f..479af14a 100644 --- a/src/updater/releases.rs +++ b/src/updater/releases.rs @@ -206,12 +206,9 @@ impl Release { /// Find an asset with a given name in this release. Returns None if no such /// asset was found. pub fn asset_named(&self, name: &str) -> Option<&ReleaseAsset> { - for asset in &self.assets { - if asset.name.starts_with(name) { - return Some(asset); - } - } - None + self.assets + .iter() + .find(|&asset| asset.name.starts_with(name)) } }