Skip to content

Commit

Permalink
send packets as quickly as possible (#271)
Browse files Browse the repository at this point in the history
* send packets as quickly as possible
* Queue packets until we have a connection
* Carry through the time we received the packet from the radio

Co-authored-by: Marc Nijdam <[email protected]>
Co-authored-by: macpie <[email protected]>
Co-authored-by: michaeldjeffrey <[email protected]>
  • Loading branch information
4 people authored Jul 28, 2022
1 parent c00717b commit 06f1ebd
Show file tree
Hide file tree
Showing 14 changed files with 123 additions and 1,259 deletions.
94 changes: 0 additions & 94 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::state_channel;
use std::net;
use thiserror::Error;

Expand All @@ -20,8 +19,6 @@ pub enum Error {
Decode(#[from] DecodeError),
#[error("service error: {0}")]
Service(#[from] ServiceError),
#[error("state channel error")]
StateChannel(#[from] Box<StateChannelError>),
#[error("semtech udp error")]
Semtech(#[from] semtech_udp::server_runtime::Error),
#[error("time error")]
Expand Down Expand Up @@ -80,47 +77,6 @@ pub enum ServiceError {
LocalClientConnect(helium_proto::services::Error),
}

#[allow(clippy::large_enum_variant)]
#[derive(Error, Debug)]
pub enum StateChannelError {
#[error("ignored state channel")]
Ignored { sc: state_channel::StateChannel },
#[error("inactive state channel")]
Inactive,
#[error("state channel not found")]
NotFound { sc_id: Vec<u8> },
#[error("invalid owner for state channel")]
InvalidOwner,
#[error("state channel summary error")]
Summary(#[from] StateChannelSummaryError),
#[error("new state channel error")]
NewChannel { sc: state_channel::StateChannel },
#[error("state channel causal conflict")]
CausalConflict {
sc: state_channel::StateChannel,
conflicts_with: state_channel::StateChannel,
},
#[error("state channel overpaid")]
Overpaid {
sc: state_channel::StateChannel,
original_dc_amount: u64,
},
#[error("state channel underpaid for a packet")]
Underpaid { sc: state_channel::StateChannel },
#[error("state channel balance too low")]
LowBalance,
}

#[derive(Error, Debug)]
pub enum StateChannelSummaryError {
#[error("zero state channel packet summary")]
ZeroPacket,
#[error("zero state channel packet over dc count")]
PacketDCMismatch,
#[error("invalid address")]
InvalidAddress,
}

#[derive(Debug, Error)]
pub enum RegionError {
#[error("no region params found or active")]
Expand Down Expand Up @@ -178,56 +134,6 @@ impl DecodeError {
}
}

// State Channel Errors
impl StateChannelError {
pub fn invalid_owner() -> Error {
Error::StateChannel(Box::new(Self::InvalidOwner))
}

pub fn invalid_summary(err: StateChannelSummaryError) -> Error {
Error::StateChannel(Box::new(Self::Summary(err)))
}

pub fn inactive() -> Error {
Error::StateChannel(Box::new(Self::Inactive))
}

pub fn not_found(sc_id: &[u8]) -> Error {
let sc_id = sc_id.to_vec();
Error::StateChannel(Box::new(Self::NotFound { sc_id }))
}

pub fn ignored(sc: state_channel::StateChannel) -> Error {
Error::StateChannel(Box::new(Self::Ignored { sc }))
}

pub fn new_channel(sc: state_channel::StateChannel) -> Error {
Error::StateChannel(Box::new(Self::NewChannel { sc }))
}

pub fn causal_conflict(
sc: state_channel::StateChannel,
conflicts_with: state_channel::StateChannel,
) -> Error {
Error::StateChannel(Box::new(Self::CausalConflict { sc, conflicts_with }))
}

pub fn overpaid(sc: state_channel::StateChannel, original_dc_amount: u64) -> Error {
Error::StateChannel(Box::new(Self::Overpaid {
sc,
original_dc_amount,
}))
}

pub fn underpaid(sc: state_channel::StateChannel) -> Error {
Error::StateChannel(Box::new(Self::Underpaid { sc }))
}

pub fn low_balance() -> Error {
Error::StateChannel(Box::new(Self::LowBalance))
}
}

impl RegionError {
pub fn no_region_params() -> Error {
Error::Region(RegionError::NoRegionParams)
Expand Down
11 changes: 7 additions & 4 deletions src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use semtech_udp::{
tx_ack, MacAddress,
};
use slog::{debug, info, o, warn, Logger};
use std::{convert::TryFrom, time::Duration};
use std::{
convert::TryFrom,
time::{Duration, Instant},
};
use tokio::sync::mpsc;

pub const DOWNLINK_TIMEOUT_SECS: u64 = 5;
Expand Down Expand Up @@ -112,7 +115,7 @@ impl Gateway {
Ok(packet) if packet.is_longfi() => {
info!(logger, "ignoring longfi packet");
}
Ok(packet) => self.handle_uplink(logger, packet).await,
Ok(packet) => self.handle_uplink(logger, packet, Instant::now()).await,
Err(err) => {
warn!(logger, "ignoring push_data: {err:?}");
}
Expand All @@ -127,9 +130,9 @@ impl Gateway {
Ok(())
}

async fn handle_uplink(&mut self, logger: &Logger, packet: Packet) {
async fn handle_uplink(&mut self, logger: &Logger, packet: Packet, received: Instant) {
info!(logger, "uplink {} from {}", packet, self.downlink_mac);
match self.uplinks.uplink(packet).await {
match self.uplinks.uplink(packet, received).await {
Ok(()) => (),
Err(err) => warn!(logger, "ignoring uplink error {:?}", err),
}
Expand Down
Loading

0 comments on commit 06f1ebd

Please sign in to comment.