diff --git a/Cargo.lock b/Cargo.lock index 885237365..ea3e21f96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1567,6 +1567,7 @@ dependencies = [ "helium-proto", "http 0.2.11", "http-serde", + "humantime-serde", "lazy_static", "metrics", "metrics-exporter-prometheus", @@ -2500,6 +2501,7 @@ dependencies = [ "chrono", "config", "helium-crypto", + "humantime-serde", "reqwest", "serde", "serde_json", @@ -3419,7 +3421,7 @@ dependencies = [ "bs58 0.5.0", "byteorder", "ed25519-compact", - "getrandom 0.1.16", + "getrandom 0.2.10", "k256", "lazy_static", "multihash", @@ -3660,6 +3662,16 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "0.14.28" @@ -3885,6 +3897,7 @@ dependencies = [ "helium-crypto", "helium-proto", "http 0.2.11", + "humantime-serde", "metrics", "metrics-exporter-prometheus", "poc-metrics", @@ -3954,6 +3967,7 @@ dependencies = [ "hextree", "http 0.2.11", "http-serde", + "humantime-serde", "libflate", "metrics", "metrics-exporter-prometheus", @@ -3993,6 +4007,7 @@ dependencies = [ "helium-proto", "http 0.2.11", "http-serde", + "humantime-serde", "iot-config", "metrics", "poc-metrics", @@ -4032,6 +4047,7 @@ dependencies = [ "helium-crypto", "helium-proto", "http-serde", + "humantime-serde", "iot-config", "itertools", "lazy_static", @@ -4647,6 +4663,7 @@ dependencies = [ "helium-proto", "http 0.2.11", "http-serde", + "humantime-serde", "metrics", "mobile-config", "poc-metrics", @@ -4686,6 +4703,7 @@ dependencies = [ "hextree", "http-serde", "humantime", + "humantime-serde", "lazy_static", "metrics", "metrics-exporter-prometheus", @@ -5396,6 +5414,7 @@ dependencies = [ "futures-util", "helium-anchor-gen", "helium-proto", + "humantime-serde", "metrics", "metrics-exporter-prometheus", "poc-metrics", @@ -5958,6 +5977,7 @@ dependencies = [ "futures-util", "helium-crypto", "helium-proto", + "humantime-serde", "lazy_static", "metrics", "metrics-exporter-prometheus", diff --git a/Cargo.toml b/Cargo.toml index 48b60ed99..41399f4b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,6 +80,7 @@ reqwest = { version = "0", default-features = false, features = [ ] } beacon = { git = "https://github.com/helium/proto", branch = "master" } humantime = "2" +humantime-serde = "1" metrics = ">=0.22" metrics-exporter-prometheus = "0" tracing = "0" diff --git a/boost_manager/Cargo.toml b/boost_manager/Cargo.toml index 802beea11..cadf8b971 100644 --- a/boost_manager/Cargo.toml +++ b/boost_manager/Cargo.toml @@ -48,3 +48,4 @@ http = {workspace = true} http-serde = {workspace = true} solana = {path = "../solana"} solana-sdk = {workspace = true} +humantime-serde = { workspace = true } diff --git a/boost_manager/pkg/settings-template.toml b/boost_manager/pkg/settings-template.toml index 46cb7f764..c325a6815 100644 --- a/boost_manager/pkg/settings-template.toml +++ b/boost_manager/pkg/settings-template.toml @@ -3,7 +3,7 @@ log = "boost_manager=info,solana=debug" # Cache location for generated boost manager outputs; Required cache = "/tmp/oracles/boost-manager" -start_after = 1702602001 +start_after = "2024-12-15 01:00:00Z" enable_solana_integration = true diff --git a/boost_manager/src/db.rs b/boost_manager/src/db.rs index 6dfac3ccb..6503b4f49 100644 --- a/boost_manager/src/db.rs +++ b/boost_manager/src/db.rs @@ -1,7 +1,8 @@ use crate::OnChainStatus; -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Utc}; use file_store::hex_boost::BoostedHexActivation; use sqlx::{postgres::PgRow, FromRow, Pool, Postgres, Row, Transaction}; +use std::time::Duration; const MAX_RETRIES: i32 = 10; const MAX_BATCH_COUNT: i32 = 200; diff --git a/boost_manager/src/main.rs b/boost_manager/src/main.rs index adb022086..f3be11d49 100644 --- a/boost_manager/src/main.rs +++ b/boost_manager/src/main.rs @@ -85,6 +85,8 @@ impl Server { file_upload::FileUpload::from_settings_tm(&settings.output).await?; let store_base_path = path::Path::new(&settings.cache); + let reward_check_interval = settings.reward_check_interval; + // setup the received for the rewards manifest files let file_store = FileStore::from_settings(&settings.verifier).await?; let (manifest_receiver, manifest_server) = @@ -92,9 +94,9 @@ impl Server { .state(pool.clone()) .store(file_store) .prefix(FileType::RewardManifest.to_string()) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) - .poll_duration(settings.reward_check_interval()) - .offset(settings.reward_check_interval() * 2) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) + .poll_duration(reward_check_interval) + .offset(reward_check_interval * 2) .create() .await?; @@ -124,12 +126,12 @@ impl Server { let updater = Updater::new( pool.clone(), settings.enable_solana_integration, - settings.activation_check_interval(), + settings.activation_check_interval, settings.txn_batch_size(), solana, )?; - let purger = Purger::new(pool.clone(), settings.retention_period()); + let purger = Purger::new(pool.clone(), settings.retention_period); TaskManager::builder() .add_task(file_upload_server) diff --git a/boost_manager/src/purger.rs b/boost_manager/src/purger.rs index 48d691d95..0e6c41949 100644 --- a/boost_manager/src/purger.rs +++ b/boost_manager/src/purger.rs @@ -1,5 +1,4 @@ use crate::db; -use chrono::Duration as ChronoDuration; use futures::{future::LocalBoxFuture, TryFutureExt}; use sqlx::{Pool, Postgres}; use std::time::Duration; @@ -9,7 +8,7 @@ const PURGE_INTERVAL: Duration = Duration::from_secs(30); pub struct Purger { pool: Pool, - retention_period: ChronoDuration, + retention_period: Duration, } impl ManagedTask for Purger { @@ -27,7 +26,7 @@ impl ManagedTask for Purger { } impl Purger { - pub fn new(pool: Pool, retention_period: ChronoDuration) -> Self { + pub fn new(pool: Pool, retention_period: Duration) -> Self { Self { pool, retention_period, @@ -50,7 +49,7 @@ impl Purger { } } -pub async fn purge(pool: &Pool, retention_period: ChronoDuration) -> anyhow::Result<()> { +pub async fn purge(pool: &Pool, retention_period: Duration) -> anyhow::Result<()> { let num_records_purged = db::purge_stale_records(pool, retention_period).await?; tracing::info!("purged {} stale records", num_records_purged); Ok(()) diff --git a/boost_manager/src/settings.rs b/boost_manager/src/settings.rs index 6ff34cd5a..2679a781d 100644 --- a/boost_manager/src/settings.rs +++ b/boost_manager/src/settings.rs @@ -1,5 +1,6 @@ -use chrono::{DateTime, Duration as ChronoDuration, TimeZone, Utc}; +use chrono::{DateTime, Utc}; use config::{Config, Environment, File}; +use humantime_serde::re::humantime; use serde::Deserialize; use std::{path::Path, time::Duration}; @@ -11,13 +12,16 @@ pub struct Settings { pub log: String, /// Cache location for generated verified reports pub cache: String, - /// Reward files check interval in seconds. (Default is 900; 15 minutes) - #[serde(default = "default_reward_check_interval")] - pub reward_check_interval: i64, - /// Hex Activation check interval in seconds. (Default is 900; 15 minutes) + /// Reward files check interval in seconds. (Default is 15 minutes) + #[serde(with = "humantime_serde", default = "default_reward_check_interval")] + pub reward_check_interval: Duration, + /// Hex Activation check interval in seconds. (Default is 15 minutes) /// determines how often we will check the DB for queued txns to solana - #[serde(default = "default_activation_check_interval")] - pub activation_check_interval: i64, + #[serde( + with = "humantime_serde", + default = "default_activation_check_interval" + )] + pub activation_check_interval: Duration, pub database: db_store::Settings, pub verifier: file_store::Settings, pub mobile_config_client: mobile_config::ClientSettings, @@ -27,33 +31,33 @@ pub struct Settings { pub enable_solana_integration: bool, pub solana: Option, #[serde(default = "default_start_after")] - pub start_after: u64, + pub start_after: DateTime, // the number of records to fit per solana txn #[serde(default = "default_txn_batch_size")] pub txn_batch_size: u32, // default retention period in seconds - #[serde(default = "default_retention_period")] - pub retention_period: i64, + #[serde(with = "humantime_serde", default = "default_retention_period")] + pub retention_period: Duration, } -fn default_retention_period() -> i64 { - 86400 * 7 // 7 days +fn default_retention_period() -> Duration { + humantime::parse_duration("7 days").unwrap() } fn default_txn_batch_size() -> u32 { 18 } -fn default_reward_check_interval() -> i64 { - 900 +fn default_reward_check_interval() -> Duration { + humantime::parse_duration("15 minutes").unwrap() } -fn default_activation_check_interval() -> i64 { - 900 +fn default_activation_check_interval() -> Duration { + humantime::parse_duration("15 minutes").unwrap() } -pub fn default_start_after() -> u64 { - 0 +pub fn default_start_after() -> DateTime { + DateTime::UNIX_EPOCH } pub fn default_log() -> String { @@ -83,25 +87,7 @@ impl Settings { .and_then(|config| config.try_deserialize()) } - pub fn reward_check_interval(&self) -> ChronoDuration { - ChronoDuration::seconds(self.reward_check_interval) - } - - pub fn activation_check_interval(&self) -> Duration { - Duration::from_secs(self.activation_check_interval as u64) - } - - pub fn retention_period(&self) -> ChronoDuration { - ChronoDuration::seconds(self.retention_period) - } - pub fn txn_batch_size(&self) -> usize { self.txn_batch_size as usize } - - pub fn start_after(&self) -> DateTime { - Utc.timestamp_opt(self.start_after as i64, 0) - .single() - .unwrap() - } } diff --git a/boost_manager/tests/integrations/purger_tests.rs b/boost_manager/tests/integrations/purger_tests.rs index 254be53e9..654875106 100644 --- a/boost_manager/tests/integrations/purger_tests.rs +++ b/boost_manager/tests/integrations/purger_tests.rs @@ -18,7 +18,7 @@ async fn test_purge(pool: PgPool) -> anyhow::Result<()> { assert_eq!(7, count); // do da purge - purger::purge(&pool, Duration::days(7)).await?; + purger::purge(&pool, Duration::days(7).to_std()?).await?; // assert the db contains the expected number of records post purge let count: i64 = sqlx::query_scalar("select count(*) from activated_hexes") diff --git a/denylist/Cargo.toml b/denylist/Cargo.toml index 01943323b..43eb9d98b 100644 --- a/denylist/Cargo.toml +++ b/denylist/Cargo.toml @@ -22,3 +22,4 @@ serde_json = { workspace = true } config = { workspace = true } chrono = { workspace = true } xorf-generator = { git = "https://github.com/helium/xorf-generator", branch = "main" } +humantime-serde = { workspace = true } diff --git a/denylist/src/settings.rs b/denylist/src/settings.rs index 670797fff..1c46da190 100644 --- a/denylist/src/settings.rs +++ b/denylist/src/settings.rs @@ -1,6 +1,7 @@ use crate::{Error, Result}; use config::{Config, Environment, File}; use helium_crypto::PublicKey; +use humantime_serde::re::humantime; use serde::Deserialize; use std::{path::Path, str::FromStr, time::Duration}; @@ -13,9 +14,9 @@ pub struct Settings { /// Listen address for http requests for entropy. Default "0.0.0.0:8080" #[serde(default = "default_denylist_url")] pub denylist_url: String, - /// Cadence at which we poll for an updated denylist (secs) - #[serde(default = "default_trigger_interval")] - pub trigger: u64, + /// Cadence at which we poll for an updated denylist (Default: 6hours) + #[serde(with = "humantime_serde", default = "default_trigger_interval")] + pub trigger_interval: Duration, // vec of b58 helium encoded pubkeys // used to verify signature of denylist filters #[serde(default)] @@ -30,8 +31,8 @@ pub fn default_denylist_url() -> String { "https://api.github.com/repos/helium/denylist/releases/latest".to_string() } -fn default_trigger_interval() -> u64 { - 21600 +fn default_trigger_interval() -> Duration { + humantime::parse_duration("6 hours").unwrap() } impl Settings { @@ -58,10 +59,6 @@ impl Settings { .map_err(Error::from) } - pub fn trigger_interval(&self) -> Duration { - Duration::from_secs(self.trigger) - } - pub fn sign_keys(&self) -> std::result::Result, helium_crypto::Error> { self.sign_keys .iter() diff --git a/file_store/src/file_info_poller.rs b/file_store/src/file_info_poller.rs index 112ebea72..a7bf6950a 100644 --- a/file_store/src/file_info_poller.rs +++ b/file_store/src/file_info_poller.rs @@ -1,11 +1,11 @@ use crate::{file_store, traits::MsgDecode, Error, FileInfo, FileStore, Result}; use aws_sdk_s3::types::ByteStream; -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Utc}; use derive_builder::Builder; use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt}; use futures_util::TryFutureExt; use retainer::Cache; -use std::{collections::VecDeque, marker::PhantomData, sync::Arc}; +use std::{collections::VecDeque, marker::PhantomData, sync::Arc, time::Duration}; use task_manager::ManagedTask; use tokio::sync::mpsc::{Receiver, Sender}; @@ -85,14 +85,14 @@ pub enum LookbackBehavior { #[derive(Debug, Clone, Builder)] #[builder(pattern = "owned")] pub struct FileInfoPollerConfig { - #[builder(default = "Duration::seconds(DEFAULT_POLL_DURATION_SECS)")] + #[builder(default = "DEFAULT_POLL_DURATION")] poll_duration: Duration, state: S, store: FileStore, prefix: String, parser: P, lookback: LookbackBehavior, - #[builder(default = "Duration::minutes(10)")] + #[builder(default = "Duration::from_secs(10 * 60)")] offset: Duration, #[builder(default = "5")] queue_size: usize, @@ -262,10 +262,7 @@ where } fn poll_duration(&self) -> std::time::Duration { - self.config - .poll_duration - .to_std() - .unwrap_or(DEFAULT_POLL_DURATION) + self.config.poll_duration } async fn is_already_processed(&self, file_info: &FileInfo) -> Result { diff --git a/ingest/Cargo.toml b/ingest/Cargo.toml index f6cbc5d26..b8df84ebe 100644 --- a/ingest/Cargo.toml +++ b/ingest/Cargo.toml @@ -37,6 +37,7 @@ metrics-exporter-prometheus = { workspace = true } task-manager = { path = "../task_manager" } rand = { workspace = true } custom-tracing = { path = "../custom_tracing", features = ["grpc"] } +humantime-serde = { workspace = true } [dev-dependencies] backon = "0" diff --git a/ingest/src/server_iot.rs b/ingest/src/server_iot.rs index df8040d71..737d3254d 100644 --- a/ingest/src/server_iot.rs +++ b/ingest/src/server_iot.rs @@ -372,18 +372,18 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { .create() .await?; - let grpc_addr = settings.listen; let grpc_server = GrpcServer { beacon_report_sink, witness_report_sink, required_network: settings.network, - address: grpc_addr, - session_key_offer_timeout: settings.session_key_offer_timeout(), - session_key_timeout: settings.session_key_timeout(), + address: settings.listen_addr, + session_key_offer_timeout: settings.session_key_offer_timeout, + session_key_timeout: settings.session_key_timeout, }; tracing::info!( - "grpc listening on {grpc_addr} and server mode {:?}", + "grpc listening on {} and server mode {:?}", + settings.listen_addr, settings.mode ); diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index 34bfc40f3..60ac67b34 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -449,7 +449,6 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { bail!("expected valid api token in settings"); }; - let grpc_addr = settings.listen; let grpc_server = GrpcServer { heartbeat_report_sink, wifi_heartbeat_report_sink, @@ -460,12 +459,13 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { invalidated_radio_threshold_report_sink, coverage_object_report_sink, required_network: settings.network, - address: grpc_addr, + address: settings.listen_addr, api_token, }; tracing::info!( - "grpc listening on {grpc_addr} and server mode {:?}", + "grpc listening on {} and server mode {:?}", + settings.listen_addr, settings.mode ); diff --git a/ingest/src/settings.rs b/ingest/src/settings.rs index 266e0e72a..3b0f648f6 100644 --- a/ingest/src/settings.rs +++ b/ingest/src/settings.rs @@ -1,7 +1,8 @@ use config::{Config, Environment, File}; use helium_crypto::Network; +use humantime_serde::re::humantime; use serde::Deserialize; -use std::{net::SocketAddr, path::Path}; +use std::{net::SocketAddr, path::Path, time::Duration}; #[derive(Debug, Deserialize)] pub struct Settings { @@ -16,17 +17,20 @@ pub struct Settings { pub mode: Mode, /// Listen address. Required. Default is 0.0.0.0:9081 #[serde(default = "default_listen_addr")] - pub listen: SocketAddr, + pub listen_addr: SocketAddr, /// Local folder for storing intermediate files pub cache: String, /// Network required in all public keys: mainnet | testnet pub network: Network, /// Timeout of session key offer in seconds - #[serde(default = "default_session_key_offer_timeout")] - pub session_key_offer_timeout: u64, + #[serde( + with = "humantime_serde", + default = "default_session_key_offer_timeout" + )] + pub session_key_offer_timeout: Duration, /// Timeout of session key session in seconds - #[serde(default = "default_session_key_timeout")] - pub session_key_timeout: u64, + #[serde(with = "humantime_serde", default = "default_session_key_timeout")] + pub session_key_timeout: Duration, /// Settings for exposed public API /// Target bucket for uploads pub output: file_store::Settings, @@ -37,12 +41,12 @@ pub struct Settings { pub metrics: poc_metrics::Settings, } -pub fn default_session_key_timeout() -> u64 { - 30 * 60 +pub fn default_session_key_timeout() -> Duration { + humantime::parse_duration("30 minutes").unwrap() } -pub fn default_session_key_offer_timeout() -> u64 { - 5 +pub fn default_session_key_offer_timeout() -> Duration { + humantime::parse_duration("5 seconds").unwrap() } pub fn default_listen_addr() -> SocketAddr { @@ -92,12 +96,4 @@ impl Settings { .build() .and_then(|config| config.try_deserialize()) } - - pub fn session_key_offer_timeout(&self) -> std::time::Duration { - std::time::Duration::from_secs(self.session_key_offer_timeout) - } - - pub fn session_key_timeout(&self) -> std::time::Duration { - std::time::Duration::from_secs(self.session_key_timeout) - } } diff --git a/iot_config/Cargo.toml b/iot_config/Cargo.toml index b7c724ab0..218dffc0f 100644 --- a/iot_config/Cargo.toml +++ b/iot_config/Cargo.toml @@ -42,6 +42,7 @@ tracing = {workspace = true} tracing-subscriber = {workspace = true} triggered = {workspace = true} task-manager = { path = "../task_manager" } +humantime-serde = { workspace = true } [dev-dependencies] rand = { workspace = true } diff --git a/iot_config/src/db_cleaner.rs b/iot_config/src/db_cleaner.rs index ae089e6fa..7da7b440a 100644 --- a/iot_config/src/db_cleaner.rs +++ b/iot_config/src/db_cleaner.rs @@ -1,6 +1,7 @@ -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Utc}; use futures::TryFutureExt; use sqlx::{Pool, Postgres, Transaction}; +use std::time::Duration; use task_manager::ManagedTask; const SLEEP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(12 * 60 * 60); diff --git a/iot_config/src/main.rs b/iot_config/src/main.rs index 7cb6efdc5..04982a68b 100644 --- a/iot_config/src/main.rs +++ b/iot_config/src/main.rs @@ -117,7 +117,7 @@ impl Daemon { admin_svc, }; - let db_cleaner = DbCleaner::new(pool.clone(), settings.deleted_entry_retention()); + let db_cleaner = DbCleaner::new(pool.clone(), settings.deleted_entry_retention); TaskManager::builder() .add_task(grpc_server) diff --git a/iot_config/src/settings.rs b/iot_config/src/settings.rs index 6bea2940c..ebf3c3dbf 100644 --- a/iot_config/src/settings.rs +++ b/iot_config/src/settings.rs @@ -1,7 +1,7 @@ -use chrono::Duration; use config::{Config, Environment, File}; +use humantime_serde::re::humantime; use serde::Deserialize; -use std::{net::SocketAddr, path::Path, str::FromStr}; +use std::{net::SocketAddr, path::Path, str::FromStr, time::Duration}; #[derive(Debug, Deserialize)] pub struct Settings { @@ -16,8 +16,8 @@ pub struct Settings { pub keypair: String, /// B58 encoded public key of the admin keypair pub admin: String, - #[serde(default = "default_deleted_entry_retention")] - pub deleted_entry_retention: u64, + #[serde(with = "humantime_serde", default = "default_deleted_entry_retention")] + pub deleted_entry_retention: Duration, pub database: db_store::Settings, /// Settings passed to the db_store crate for connecting to /// the database for Solana on-chain data @@ -33,9 +33,8 @@ pub fn default_listen_addr() -> SocketAddr { "0.0.0.0:8080".parse().unwrap() } -pub fn default_deleted_entry_retention() -> u64 { - // 48 hours - 48 * 60 * 60 +pub fn default_deleted_entry_retention() -> Duration { + humantime::parse_duration("48 hours").unwrap() } impl Settings { @@ -70,8 +69,4 @@ impl Settings { pub fn admin_pubkey(&self) -> Result { helium_crypto::PublicKey::from_str(&self.admin) } - - pub fn deleted_entry_retention(&self) -> Duration { - Duration::seconds(self.deleted_entry_retention as i64) - } } diff --git a/iot_packet_verifier/Cargo.toml b/iot_packet_verifier/Cargo.toml index 3acf411b7..6250136d2 100644 --- a/iot_packet_verifier/Cargo.toml +++ b/iot_packet_verifier/Cargo.toml @@ -35,3 +35,4 @@ tracing-subscriber = {workspace = true} triggered = {workspace = true} http = {workspace = true} http-serde = {workspace = true} +humantime-serde = { workspace = true } diff --git a/iot_packet_verifier/src/burner.rs b/iot_packet_verifier/src/burner.rs index 4f0dda388..750860511 100644 --- a/iot_packet_verifier/src/burner.rs +++ b/iot_packet_verifier/src/burner.rs @@ -49,11 +49,16 @@ pub enum BurnError { } impl Burner { - pub fn new(pending_tables: P, balances: &BalanceCache, burn_period: u64, solana: S) -> Self { + pub fn new( + pending_tables: P, + balances: &BalanceCache, + burn_period: Duration, + solana: S, + ) -> Self { Self { pending_tables, balances: balances.balances(), - burn_period: Duration::from_secs(60 * burn_period), + burn_period, solana, } } diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index b26d22b3e..0d05e3cec 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -18,7 +18,7 @@ use futures_util::TryFutureExt; use iot_config::client::{org_client::Orgs, OrgClient}; use solana::burn::SolanaRpc; use sqlx::{Pool, Postgres}; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use task_manager::{ManagedTask, TaskManager}; use tokio::sync::{mpsc::Receiver, Mutex}; @@ -167,7 +167,7 @@ impl Cmd { file_source::continuous_source::() .state(pool.clone()) .store(file_store) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) .prefix(FileType::IotPacketReport.to_string()) .create() .await?; @@ -199,7 +199,7 @@ impl Cmd { solana, balance_store, minimum_allowed_balance, - Duration::from_secs(60 * monitor_funds_period), + monitor_funds_period, shutdown, ) .map_err(anyhow::Error::from) diff --git a/iot_packet_verifier/src/settings.rs b/iot_packet_verifier/src/settings.rs index c67aeb886..e7bd45c52 100644 --- a/iot_packet_verifier/src/settings.rs +++ b/iot_packet_verifier/src/settings.rs @@ -1,7 +1,8 @@ -use chrono::{DateTime, TimeZone, Utc}; +use chrono::{DateTime, Utc}; use config::{Config, ConfigError, Environment, File}; +use humantime_serde::re::humantime; use serde::Deserialize; -use std::path::Path; +use std::{path::Path, time::Duration}; #[derive(Debug, Deserialize)] pub struct Settings { @@ -12,8 +13,8 @@ pub struct Settings { /// Cache location for generated verified reports pub cache: String, /// Data credit burn period in minutes. Default is 1. - #[serde(default = "default_burn_period")] - pub burn_period: u64, + #[serde(with = "humantime_serde", default = "default_burn_period")] + pub burn_period: Duration, pub database: db_store::Settings, pub ingest: file_store::Settings, pub iot_config_client: iot_config::client::Settings, @@ -26,19 +27,19 @@ pub struct Settings { pub minimum_allowed_balance: u64, pub solana: Option, #[serde(default = "default_start_after")] - pub start_after: u64, + pub start_after: DateTime, /// Number of minutes we should sleep before checking to re-enable /// any disabled orgs. #[serde(default = "default_monitor_funds_period")] - pub monitor_funds_period: u64, + pub monitor_funds_period: Duration, } -pub fn default_start_after() -> u64 { - 0 +pub fn default_start_after() -> DateTime { + DateTime::UNIX_EPOCH } -pub fn default_burn_period() -> u64 { - 1 +pub fn default_burn_period() -> Duration { + humantime::parse_duration("1 minute").unwrap() } pub fn default_log() -> String { @@ -49,8 +50,8 @@ pub fn default_minimum_allowed_balance() -> u64 { 3_500_000 } -pub fn default_monitor_funds_period() -> u64 { - 30 +pub fn default_monitor_funds_period() -> Duration { + humantime::parse_duration("30 minutes").unwrap() } impl Settings { @@ -75,10 +76,4 @@ impl Settings { .build() .and_then(|config| config.try_deserialize()) } - - pub fn start_after(&self) -> DateTime { - Utc.timestamp_opt(self.start_after as i64, 0) - .single() - .unwrap() - } } diff --git a/iot_packet_verifier/tests/integration_tests.rs b/iot_packet_verifier/tests/integration_tests.rs index b20587b36..c3ca415ac 100644 --- a/iot_packet_verifier/tests/integration_tests.rs +++ b/iot_packet_verifier/tests/integration_tests.rs @@ -452,7 +452,7 @@ async fn test_end_to_end() { let mut burner = Burner::new( pending_tables.clone(), &balance_cache, - 0, // Burn period does not matter, we manually burn + Duration::default(), // Burn period does not matter, we manually burn solana_network.clone(), ); diff --git a/iot_verifier/Cargo.toml b/iot_verifier/Cargo.toml index 6bbf783c8..234e1ccc6 100644 --- a/iot_verifier/Cargo.toml +++ b/iot_verifier/Cargo.toml @@ -55,3 +55,4 @@ price = { path = "../price" } tokio-util = { workspace = true } tokio-stream = { workspace = true } task-manager = { path = "../task_manager" } +humantime-serde = { workspace = true } diff --git a/iot_verifier/src/entropy.rs b/iot_verifier/src/entropy.rs index d582cd920..08d2fcbc7 100644 --- a/iot_verifier/src/entropy.rs +++ b/iot_verifier/src/entropy.rs @@ -1,12 +1,13 @@ -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use std::time::Duration; /// measurement in seconds of a piece of entropy /// its lifespan will be valid from entropy.timestamp to entropy.timestamp + ENTROPY_LIFESPAN /// any beacon or witness report received after this period and before the ENTROPY_STALE_PERIOD /// defined in the purger module will be rejected due to being outside of the entropy lifespan /// TODO: determine a sane value here -pub const ENTROPY_LIFESPAN: i64 = 180; +pub const ENTROPY_LIFESPAN: Duration = Duration::from_secs(180); #[derive(sqlx::Type, Serialize, Deserialize, Debug)] #[sqlx(type_name = "report_type", rename_all = "lowercase")] diff --git a/iot_verifier/src/gateway_updater.rs b/iot_verifier/src/gateway_updater.rs index 1826775d1..f403a4ae2 100644 --- a/iot_verifier/src/gateway_updater.rs +++ b/iot_verifier/src/gateway_updater.rs @@ -1,8 +1,7 @@ -use chrono::Duration; use futures::{future::LocalBoxFuture, stream::StreamExt, TryFutureExt}; use helium_crypto::PublicKeyBinary; use iot_config::{client::Gateways, gateway_info::GatewayInfo}; -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; use task_manager::ManagedTask; use tokio::sync::watch; use tokio::time; @@ -64,11 +63,7 @@ where pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { tracing::info!("starting gateway_updater"); - let mut trigger_timer = time::interval( - self.refresh_interval - .to_std() - .expect("valid interval in seconds"), - ); + let mut trigger_timer = time::interval(self.refresh_interval); loop { tokio::select! { biased; diff --git a/iot_verifier/src/loader.rs b/iot_verifier/src/loader.rs index 0be84f53d..c2c70ded7 100644 --- a/iot_verifier/src/loader.rs +++ b/iot_verifier/src/loader.rs @@ -6,7 +6,7 @@ use crate::{ Settings, }; use chrono::DateTime; -use chrono::{Duration as ChronoDuration, Utc}; +use chrono::Utc; use file_store::{ iot_beacon_report::IotBeaconIngestReport, iot_witness_report::IotWitnessIngestReport, @@ -15,8 +15,9 @@ use file_store::{ }; use futures::{future::LocalBoxFuture, stream, StreamExt}; use helium_crypto::PublicKeyBinary; +use humantime_serde::re::humantime; use sqlx::PgPool; -use std::{hash::Hasher, ops::DerefMut, str::FromStr}; +use std::{hash::Hasher, ops::DerefMut, str::FromStr, time::Duration}; use task_manager::ManagedTask; use tokio::{ sync::Mutex, @@ -31,9 +32,9 @@ pub struct Loader { ingest_store: FileStore, pool: PgPool, poll_time: time::Duration, - window_width: ChronoDuration, - ingestor_rollup_time: ChronoDuration, - max_lookback_age: ChronoDuration, + window_width: Duration, + ingestor_rollup_time: Duration, + max_lookback_age: Duration, gateway_cache: GatewayCache, } @@ -67,17 +68,13 @@ impl Loader { ) -> Result { tracing::info!("from_settings verifier loader"); let ingest_store = FileStore::from_settings(&settings.ingest).await?; - let poll_time = settings.poc_loader_poll_time(); - let window_width = settings.poc_loader_window_width(); - let ingestor_rollup_time = settings.ingestor_rollup_time(); - let max_lookback_age = settings.loader_window_max_lookback_age(); Ok(Self { pool, ingest_store, - poll_time, - window_width, - ingestor_rollup_time, - max_lookback_age, + poll_time: settings.poc_loader_poll_time, + window_width: settings.poc_loader_window_width, + ingestor_rollup_time: settings.ingestor_rollup_time, + max_lookback_age: settings.loader_window_max_lookback_age, gateway_cache, }) } @@ -121,10 +118,11 @@ impl Loader { .max(window_max_lookback); let before_max = after + self.window_width; let before = (now - (self.window_width * 3)).min(before_max); - let cur_window_width = before - after; + let cur_window_width = (before - after).to_std()?; tracing::info!( "sliding window, after: {after}, before: {before}, cur width: {:?}, required width: {:?}", - cur_window_width.num_minutes(), self.window_width.num_minutes() + humantime::format_duration(cur_window_width), + humantime::format_duration(self.window_width) ); // if the current window width is less than our expected width // then do nothing @@ -195,12 +193,13 @@ impl Loader { // to account for the potential of the ingestor write time for // witness reports being out of sync with that of beacon files // for witnesses we do need the filter but not the arc + let two_minutes = Duration::from_secs(120); match self .process_events( FileType::IotWitnessIngestReport, &self.ingest_store, - after - (self.ingestor_rollup_time + ChronoDuration::seconds(120)), - before + (self.ingestor_rollup_time + ChronoDuration::seconds(120)), + after - (self.ingestor_rollup_time + two_minutes), + before + (self.ingestor_rollup_time + two_minutes), None, Some(&filter), ) diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index 3fd50ab9a..4efcc15ed 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -84,11 +84,9 @@ impl Server { // * // setup caches // * - let (gateway_updater_receiver, gateway_updater_server) = GatewayUpdater::new( - settings.gateway_refresh_interval(), - iot_config_client.clone(), - ) - .await?; + let (gateway_updater_receiver, gateway_updater_server) = + GatewayUpdater::new(settings.gateway_refresh_interval, iot_config_client.clone()) + .await?; let gateway_cache = GatewayCache::new(gateway_updater_receiver.clone()); // * @@ -106,7 +104,7 @@ impl Server { // setup the density scaler requirements // * let density_scaler = DensityScaler::new( - settings.loader_window_max_lookback_age(), + settings.loader_window_max_lookback_age, pool.clone(), gateway_updater_receiver, ) @@ -143,17 +141,17 @@ impl Server { pool: pool.clone(), rewards_sink, reward_manifests_sink, - reward_period_hours: settings.rewards, - reward_offset: settings.reward_offset_duration(), + reward_period_hours: settings.reward_period, + reward_offset: settings.reward_period_offset, price_tracker, }; // * // setup entropy requirements // * - let max_lookback_age = settings.loader_window_max_lookback_age(); + let max_lookback_age = settings.loader_window_max_lookback_age; let entropy_store = FileStore::from_settings(&settings.entropy).await?; - let entropy_interval = settings.entropy_interval(); + let entropy_interval = settings.entropy_interval; let (entropy_loader_receiver, entropy_loader_server) = file_source::continuous_source::() .state(pool.clone()) @@ -186,7 +184,7 @@ impl Server { .await?; let packet_store = FileStore::from_settings(&settings.packet_ingest).await?; - let packet_interval = settings.packet_interval(); + let packet_interval = settings.packet_interval; let (pk_loader_receiver, pk_loader_server) = file_source::continuous_source::() .state(pool.clone()) @@ -232,15 +230,11 @@ impl Server { .create() .await?; - let base_stale_period = settings.base_stale_period(); - let beacon_stale_period = settings.beacon_stale_period(); - let witness_stale_period = settings.witness_stale_period(); - let entropy_stale_period = settings.entropy_stale_period(); let purger = purger::Purger::new( - base_stale_period, - beacon_stale_period, - witness_stale_period, - entropy_stale_period, + settings.base_stale_period, + settings.beacon_stale_period, + settings.witness_stale_period, + settings.entropy_stale_period, pool.clone(), purger_invalid_beacon_sink, purger_invalid_witness_sink, diff --git a/iot_verifier/src/poc.rs b/iot_verifier/src/poc.rs index 62c498eed..fdc8e1fb7 100644 --- a/iot_verifier/src/poc.rs +++ b/iot_verifier/src/poc.rs @@ -9,7 +9,7 @@ use crate::{ witness_updater::WitnessUpdater, }; use beacon; -use chrono::{DateTime, Duration, DurationRound, Utc}; +use chrono::{DateTime, DurationRound, Utc}; use denylist::denylist::DenyList; use file_store::{ iot_beacon_report::{IotBeaconIngestReport, IotBeaconReport}, @@ -31,7 +31,7 @@ use iot_config::{ use lazy_static::lazy_static; use rust_decimal::Decimal; use sqlx::PgPool; -use std::f64::consts::PI; +use std::{f64::consts::PI, time::Duration}; pub type GenericVerifyResult = Result; @@ -55,9 +55,9 @@ lazy_static! { /// would disqualify the hotspot from validating further beacons static ref DEFAULT_TX_SCALE: Decimal = Decimal::new(2000, 4); /// max permitted lag between the first witness and all subsequent witnesses - static ref MAX_WITNESS_LAG: Duration = Duration::milliseconds(1500); + static ref MAX_WITNESS_LAG: chrono::Duration = chrono::Duration::milliseconds(1500); /// max permitted lag between the beaconer and a witness - static ref MAX_BEACON_TO_WITNESS_LAG: Duration = Duration::milliseconds(4000); + static ref MAX_BEACON_TO_WITNESS_LAG: chrono::Duration = chrono::Duration::milliseconds(4000); } #[derive(Debug, PartialEq)] pub struct InvalidResponse { @@ -98,7 +98,7 @@ impl Poc { entropy_start: DateTime, entropy_version: i32, ) -> Self { - let entropy_end = entropy_start + Duration::seconds(ENTROPY_LIFESPAN); + let entropy_end = entropy_start + ENTROPY_LIFESPAN; Self { pool, beacon_interval, @@ -158,7 +158,7 @@ impl Poc { &self.beacon_report, &beaconer_info, &beaconer_region_info.region_params, - self.beacon_interval, + chrono::Duration::from_std(self.beacon_interval)?, ) { Ok(()) => { let tx_scale = hex_density_map @@ -380,7 +380,7 @@ pub fn do_beacon_verifications( beacon_report: &IotBeaconIngestReport, beaconer_info: &GatewayInfo, beaconer_region_params: &[BlockchainRegionParamV1], - beacon_interval: Duration, + beacon_interval: chrono::Duration, ) -> GenericVerifyResult { tracing::debug!( "verifying beacon from beaconer: {:?}", @@ -489,7 +489,7 @@ pub fn do_witness_verifications( fn verify_beacon_schedule( last_beacon: &Option, beacon_received_ts: DateTime, - beacon_interval: Duration, + beacon_interval: chrono::Duration, ) -> GenericVerifyResult { match last_beacon { Some(last_beacon) => { diff --git a/iot_verifier/src/poc_report.rs b/iot_verifier/src/poc_report.rs index 9235ee251..450e641f8 100644 --- a/iot_verifier/src/poc_report.rs +++ b/iot_verifier/src/poc_report.rs @@ -1,7 +1,8 @@ use crate::entropy::ENTROPY_LIFESPAN; -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use sqlx::{Postgres, Transaction}; +use std::time::Duration; const REPORT_INSERT_SQL: &str = "insert into poc_report ( id, @@ -190,7 +191,7 @@ impl Report { where E: sqlx::Executor<'c, Database = sqlx::Postgres>, { - let entropy_min_time = Utc::now() - Duration::seconds(ENTROPY_LIFESPAN); + let entropy_min_time = Utc::now() - ENTROPY_LIFESPAN; Ok(sqlx::query_as::<_, Self>( r#" select poc_report.id, diff --git a/iot_verifier/src/purger.rs b/iot_verifier/src/purger.rs index 6f52ec05b..c9a01d7db 100644 --- a/iot_verifier/src/purger.rs +++ b/iot_verifier/src/purger.rs @@ -1,5 +1,4 @@ use crate::{entropy::Entropy, poc_report::Report, telemetry}; -use chrono::Duration; use file_store::{ file_sink::FileSinkClient, iot_beacon_report::IotBeaconIngestReport, @@ -15,9 +14,9 @@ use futures::{ use helium_proto::services::poc_lora::{ InvalidParticipantSide, InvalidReason, LoraInvalidBeaconReportV1, LoraInvalidWitnessReportV1, }; -use lazy_static::lazy_static; +use humantime_serde::re::humantime; use sqlx::{PgPool, Postgres}; -use std::ops::DerefMut; +use std::{ops::DerefMut, time::Duration}; use task_manager::ManagedTask; use tokio::{ sync::Mutex, @@ -27,15 +26,6 @@ use tokio::{ const DB_POLL_TIME: time::Duration = time::Duration::from_secs(60 * 35); const PURGER_WORKERS: usize = 50; -lazy_static! { - /// the period after which a beacon report in the DB will be deemed stale - static ref BEACON_STALE_PERIOD: Duration = Duration::minutes(45); - /// the period after which a witness report in the DB will be deemed stale - static ref WITNESS_STALE_PERIOD: Duration = Duration::minutes(45); - /// the period after which an entropy entry in the DB will be deemed stale - static ref ENTROPY_STALE_PERIOD: Duration = Duration::minutes(60); -} - pub struct Purger { pub pool: PgPool, pub base_stale_period: Duration, @@ -110,7 +100,8 @@ impl Purger { // once the report is safely on s3 we can then proceed to purge from the db let beacon_stale_period = self.base_stale_period + self.beacon_stale_period; tracing::info!( - "starting query get_stale_pending_beacons with stale period: {beacon_stale_period}" + "starting query get_stale_pending_beacons with stale period: {}", + humantime::format_duration(beacon_stale_period) ); let stale_beacons = Report::get_stale_beacons(&self.pool, beacon_stale_period).await?; tracing::info!("completed query get_stale_beacons"); @@ -132,7 +123,8 @@ impl Purger { let witness_stale_period = self.base_stale_period + self.witness_stale_period; tracing::info!( - "starting query get_stale_pending_witnesses with stale period: {witness_stale_period}" + "starting query get_stale_pending_witnesses with stale period: {}", + humantime::format_duration(witness_stale_period) ); let stale_witnesses = Report::get_stale_witnesses(&self.pool, witness_stale_period).await?; tracing::info!("completed query get_stale_witnesses"); diff --git a/iot_verifier/src/rewarder.rs b/iot_verifier/src/rewarder.rs index 1db4be290..2c273fc37 100644 --- a/iot_verifier/src/rewarder.rs +++ b/iot_verifier/src/rewarder.rs @@ -2,7 +2,7 @@ use crate::{ reward_share::{self, GatewayShares}, telemetry, }; -use chrono::{DateTime, Duration, TimeZone, Utc}; +use chrono::{DateTime, TimeZone, Utc}; use db_store::meta; use file_store::{file_sink, traits::TimestampEncode}; use futures::future::LocalBoxFuture; @@ -10,22 +10,23 @@ use helium_proto::services::poc_lora as proto; use helium_proto::services::poc_lora::iot_reward_share::Reward as ProtoReward; use helium_proto::services::poc_lora::{UnallocatedReward, UnallocatedRewardType}; use helium_proto::RewardManifest; +use humantime_serde::re::humantime; use price::PriceTracker; use reward_scheduler::Scheduler; use rust_decimal::prelude::*; use rust_decimal_macros::dec; use sqlx::{PgExecutor, PgPool, Pool, Postgres}; -use std::ops::Range; +use std::{ops::Range, time::Duration}; use task_manager::ManagedTask; use tokio::time::sleep; -const REWARDS_NOT_CURRENT_DELAY_PERIOD: i64 = 5; +const REWARDS_NOT_CURRENT_DELAY_PERIOD: Duration = Duration::from_secs(5 * 60); pub struct Rewarder { pub pool: Pool, pub rewards_sink: file_sink::FileSinkClient, pub reward_manifests_sink: file_sink::FileSinkClient, - pub reward_period_hours: i64, + pub reward_period_hours: Duration, pub reward_offset: Duration, pub price_tracker: PriceTracker, } @@ -44,7 +45,7 @@ impl Rewarder { pool: PgPool, rewards_sink: file_sink::FileSinkClient, reward_manifests_sink: file_sink::FileSinkClient, - reward_period_hours: i64, + reward_period_hours: Duration, reward_offset: Duration, price_tracker: PriceTracker, ) -> Self { @@ -61,7 +62,7 @@ impl Rewarder { pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { tracing::info!("Starting rewarder"); - let reward_period_length = Duration::hours(self.reward_period_hours); + let reward_period_length = self.reward_period_hours; loop { let now = Utc::now(); @@ -87,9 +88,10 @@ impl Rewarder { scheduler.sleep_duration(Utc::now())? } else { tracing::info!( - "rewards will be retried in {REWARDS_NOT_CURRENT_DELAY_PERIOD} minutes:" + "rewards will be retried in {}", + humantime::format_duration(REWARDS_NOT_CURRENT_DELAY_PERIOD) ); - Duration::minutes(REWARDS_NOT_CURRENT_DELAY_PERIOD).to_std()? + REWARDS_NOT_CURRENT_DELAY_PERIOD } } else { scheduler.sleep_duration(Utc::now())? diff --git a/iot_verifier/src/runner.rs b/iot_verifier/src/runner.rs index 4e0656ec7..7ef74ffff 100644 --- a/iot_verifier/src/runner.rs +++ b/iot_verifier/src/runner.rs @@ -50,7 +50,7 @@ lazy_static! { pub struct Runner { pub pool: PgPool, - pub beacon_interval: ChronoDuration, + pub beacon_interval: Duration, pub max_witnesses_per_poc: u64, pub beacon_max_retries: u64, pub witness_max_retries: u64, @@ -110,13 +110,13 @@ where hex_density_map: HexDensityMap, witness_updater: WitnessUpdater, ) -> anyhow::Result { - let beacon_interval = settings.beacon_interval()?; + let beacon_interval = settings.beacon_interval; let max_witnesses_per_poc = settings.max_witnesses_per_poc; let beacon_max_retries = settings.beacon_max_retries; let witness_max_retries = settings.witness_max_retries; let deny_list_latest_url = settings.denylist.denylist_url.clone(); let mut deny_list = DenyList::new(&settings.denylist)?; - let region_cache = RegionCache::new(settings.region_params_refresh_interval(), gateways)?; + let region_cache = RegionCache::new(settings.region_params_refresh_interval, gateways)?; // force update to latest in order to update the tag name // when first run, the denylist will load the local filter // but we dont save the tag name so it defaults to 0 @@ -139,7 +139,7 @@ where beacon_max_retries, witness_max_retries, deny_list_latest_url, - deny_list_trigger_interval: settings.denylist.trigger_interval(), + deny_list_trigger_interval: settings.denylist.trigger_interval, deny_list, invalid_beacon_sink, invalid_witness_sink, diff --git a/iot_verifier/src/settings.rs b/iot_verifier/src/settings.rs index c4f8e74a3..34993eb9c 100644 --- a/iot_verifier/src/settings.rs +++ b/iot_verifier/src/settings.rs @@ -1,9 +1,8 @@ use anyhow::bail; -use chrono::Duration; use config::{Config, Environment, File}; +use humantime_serde::re::humantime; use serde::Deserialize; -use std::path::Path; -use tokio::time; +use std::{path::Path, time::Duration}; #[derive(Debug, Deserialize, Clone)] pub struct Settings { @@ -17,17 +16,17 @@ pub struct Settings { /// if this is set, this value will be added to the entropy and report /// stale periods and is to prevent data being unnecessarily purged /// in the event the verifier is down for an extended period of time - #[serde(default = "default_base_stale_period")] - pub base_stale_period: i64, + #[serde(with = "humantime_serde", default = "default_base_stale_period")] + pub base_stale_period: Duration, /// the period after which a beacon report in the DB will be deemed stale - #[serde(default = "default_beacon_stale_period")] - pub beacon_stale_period: i64, + #[serde(with = "humantime_serde", default = "default_beacon_stale_period")] + pub beacon_stale_period: Duration, /// the period after which a witness report in the DB will be deemed stale - #[serde(default = "default_witness_stale_period")] - pub witness_stale_period: i64, + #[serde(with = "humantime_serde", default = "default_witness_stale_period")] + pub witness_stale_period: Duration, /// the period after which an entropy report in the DB will be deemed stale - #[serde(default = "default_entropy_stale_period")] - pub entropy_stale_period: i64, + #[serde(with = "humantime_serde", default = "default_entropy_stale_period")] + pub entropy_stale_period: Duration, pub database: db_store::Settings, pub iot_config_client: iot_config::client::Settings, pub ingest: file_store::Settings, @@ -39,50 +38,55 @@ pub struct Settings { pub denylist: denylist::Settings, pub price_tracker: price::price_tracker::Settings, /// Reward period in hours. (Default to 24) - #[serde(default = "default_reward_period")] - pub rewards: i64, + #[serde(with = "humantime_serde", default = "default_reward_period")] + pub reward_period: Duration, /// Reward calculation offset in minutes, rewards will be calculated at the end - /// of the reward period + reward_offset_minutes - #[serde(default = "default_reward_offset_minutes")] - pub reward_offset_minutes: i64, + /// of the reward_period + reward_period_offset + #[serde(with = "humantime_serde", default = "default_reward_period_offset")] + pub reward_period_offset: Duration, #[serde(default = "default_max_witnesses_per_poc")] pub max_witnesses_per_poc: u64, /// The cadence at which hotspots are permitted to beacon (in seconds) /// this should be a factor of 24 so that we can have clear /// beaconing bucket sizes - #[serde(default = "default_beacon_interval")] - pub beacon_interval: u64, + #[serde(with = "humantime_serde", default = "default_beacon_interval")] + pub beacon_interval: Duration, + // FIXME: unused /// Trigger interval for generating a transmit scaling map - #[serde(default = "default_transmit_scale_interval")] - pub transmit_scale_interval: i64, + #[serde(with = "humantime_serde", default = "default_transmit_scale_interval")] + pub transmit_scale_interval: Duration, // roll up time defined in the ingestors ( in seconds ) // ie the time after which they will write out files to s3 // this will be used when padding out the witness // loader window before and after values - #[serde(default = "default_ingestor_rollup_time")] - pub ingestor_rollup_time: i64, + #[serde(with = "humantime_serde", default = "default_ingestor_rollup_time")] + pub ingestor_rollup_time: Duration, /// window width for the poc report loader ( in seconds ) /// each poll the loader will load reports from start time to start time + window width /// NOTE: the window width should be as a minimum equal to the ingestor roll up period /// any less and the verifier will potentially miss incoming reports - #[serde(default = "default_poc_loader_window_width")] - pub poc_loader_window_width: i64, + #[serde(with = "humantime_serde", default = "default_poc_loader_window_width")] + pub poc_loader_window_width: Duration, /// cadence for how often to look for poc reports from s3 buckets - #[serde(default = "default_poc_loader_poll_time")] - pub poc_loader_poll_time: u64, + #[serde(with = "humantime_serde", default = "default_poc_loader_poll_time")] + pub poc_loader_poll_time: Duration, + // FIXME: unused /// the lifespan of a piece of entropy - #[serde(default = "default_entropy_lifespan ")] - pub entropy_lifespan: i64, + #[serde(with = "humantime_serde", default = "default_entropy_lifespan ")] + pub entropy_lifespan: Duration, /// max window age for the poc report loader ( in seconds ) /// the starting point of the window will never be older than now - max age - #[serde(default = "default_loader_window_max_lookback_age")] - pub loader_window_max_lookback_age: i64, + #[serde( + with = "humantime_serde", + default = "default_loader_window_max_lookback_age" + )] + pub loader_window_max_lookback_age: Duration, /// File store poll interval for incoming entropy reports, in seconds - #[serde(default = "default_entropy_interval")] - pub entropy_interval: i64, - /// File store poll interval for incoming packets, in seconds. (Default is 900; 15 minutes) - #[serde(default = "default_packet_interval")] - pub packet_interval: i64, + #[serde(with = "humantime_serde", default = "default_entropy_interval")] + pub entropy_interval: Duration, + /// File store poll interval for incoming packets, in seconds. (Default 15 minutes) + #[serde(with = "humantime_serde", default = "default_packet_interval")] + pub packet_interval: Duration, /// the max number of times a beacon report will be retried /// after this the report will be ignored and eventually be purged #[serde(default = "default_beacon_max_retries")] @@ -92,103 +96,97 @@ pub struct Settings { #[serde(default = "default_witness_max_retries")] pub witness_max_retries: u64, /// interval at which gateways are refreshed - #[serde(default = "default_gateway_refresh_interval")] - pub gateway_refresh_interval: i64, + #[serde(with = "humantime_serde", default = "default_gateway_refresh_interval")] + pub gateway_refresh_interval: Duration, /// interval at which region params in the cache are refreshed - #[serde(default = "default_region_params_refresh_interval")] - pub region_params_refresh_interval: u64, + #[serde( + with = "humantime_serde", + default = "default_region_params_refresh_interval" + )] + pub region_params_refresh_interval: Duration, } -// Default: 30 minutes -fn default_gateway_refresh_interval() -> i64 { - 30 * 60 +fn default_gateway_refresh_interval() -> Duration { + humantime::parse_duration("30 minutes").unwrap() } -// Default: 30 minutes -fn default_region_params_refresh_interval() -> u64 { - 30 * 60 +fn default_region_params_refresh_interval() -> Duration { + humantime::parse_duration("30 minutes").unwrap() } -// Default: 60 minutes // this should be at least poc_loader_window_width * 2 -pub fn default_loader_window_max_lookback_age() -> i64 { - 60 * 60 +fn default_loader_window_max_lookback_age() -> Duration { + humantime::parse_duration("60 minutes").unwrap() } -// Default: 5 minutes -fn default_entropy_interval() -> i64 { - 5 * 60 +fn default_entropy_interval() -> Duration { + humantime::parse_duration("5 minutes").unwrap() } -// Default: 5 minutes -pub fn default_entropy_lifespan() -> i64 { - 5 * 60 +fn default_entropy_lifespan() -> Duration { + humantime::parse_duration("5 minutes").unwrap() } -// Default: 5 minutes -pub fn default_poc_loader_window_width() -> i64 { - 5 * 60 +fn default_poc_loader_window_width() -> Duration { + humantime::parse_duration("5 minutes").unwrap() } -// Default: 5 minutes -pub fn default_ingestor_rollup_time() -> i64 { - 5 * 60 +fn default_ingestor_rollup_time() -> Duration { + humantime::parse_duration("5 minutes").unwrap() } -// Default: 5 minutes + // in normal operational mode the poll time should be set same as that of the window width // however, if for example we are loading historic data, ie looking back 24hours, we will want // the loader to be catching up as quickly as possible and so we will want to poll more often // in order to iterate quickly over the historic data // the average time it takes to load the data available within with window width needs to be // considered here -pub fn default_poc_loader_poll_time() -> u64 { - 5 * 60 +fn default_poc_loader_poll_time() -> Duration { + humantime::parse_duration("5 minutes").unwrap() } -// Default: 6 hours -pub fn default_beacon_interval() -> u64 { - 6 * 60 * 60 +fn default_beacon_interval() -> Duration { + humantime::parse_duration("6 hours").unwrap() } -// Default: 30 min -pub fn default_transmit_scale_interval() -> i64 { - 1800 +fn default_transmit_scale_interval() -> Duration { + humantime::parse_duration("30 minutes").unwrap() } -pub fn default_log() -> String { +fn default_log() -> String { "iot_verifier=debug,poc_store=info".to_string() } -pub fn default_base_stale_period() -> i64 { - 0 +fn default_base_stale_period() -> Duration { + Duration::default() } -pub fn default_beacon_stale_period() -> i64 { - 60 * 45 +fn default_beacon_stale_period() -> Duration { + humantime::parse_duration("45 minutes").unwrap() } -pub fn default_witness_stale_period() -> i64 { - 60 * 45 +fn default_witness_stale_period() -> Duration { + humantime::parse_duration("45 minutes").unwrap() } -pub fn default_entropy_stale_period() -> i64 { - 60 * 60 +fn default_entropy_stale_period() -> Duration { + humantime::parse_duration("60 minutes").unwrap() } -fn default_reward_period() -> i64 { - 24 +fn default_reward_period() -> Duration { + humantime::parse_duration("24 hours").unwrap() } -fn default_reward_offset_minutes() -> i64 { - 30 +fn default_reward_period_offset() -> Duration { + humantime::parse_duration("30 minutes").unwrap() } -pub fn default_max_witnesses_per_poc() -> u64 { +fn default_max_witnesses_per_poc() -> u64 { 14 } -fn default_packet_interval() -> i64 { - 900 +fn default_packet_interval() -> Duration { + humantime::parse_duration("15 minutes").unwrap() } // runner runs at 30 sec intervals @@ -229,64 +227,13 @@ impl Settings { .and_then(|config| config.try_deserialize()) } - pub fn reward_offset_duration(&self) -> Duration { - Duration::minutes(self.reward_offset_minutes) - } - - pub fn poc_loader_window_width(&self) -> Duration { - Duration::seconds(self.poc_loader_window_width) - } - - pub fn ingestor_rollup_time(&self) -> Duration { - Duration::seconds(self.ingestor_rollup_time) - } - - pub fn poc_loader_poll_time(&self) -> time::Duration { - time::Duration::from_secs(self.poc_loader_poll_time) - } - - pub fn loader_window_max_lookback_age(&self) -> Duration { - Duration::seconds(self.loader_window_max_lookback_age) - } - - pub fn entropy_lifespan(&self) -> Duration { - Duration::seconds(self.entropy_lifespan) - } - - pub fn base_stale_period(&self) -> Duration { - Duration::seconds(self.base_stale_period) - } - - pub fn beacon_stale_period(&self) -> Duration { - Duration::seconds(self.beacon_stale_period) - } - - pub fn witness_stale_period(&self) -> Duration { - Duration::seconds(self.witness_stale_period) - } - - pub fn entropy_stale_period(&self) -> Duration { - Duration::seconds(self.entropy_stale_period) - } - - pub fn entropy_interval(&self) -> Duration { - Duration::seconds(self.entropy_interval) - } - pub fn packet_interval(&self) -> Duration { - Duration::seconds(self.packet_interval) - } - pub fn gateway_refresh_interval(&self) -> Duration { - Duration::seconds(self.gateway_refresh_interval) - } - pub fn region_params_refresh_interval(&self) -> time::Duration { - time::Duration::from_secs(self.region_params_refresh_interval) - } pub fn beacon_interval(&self) -> anyhow::Result { + // FIXME: // validate the beacon_interval value is a factor of 24, if not bail out - if (24 * 60 * 60) % self.beacon_interval != 0 { + if (24 * 60 * 60) % self.beacon_interval.as_secs() != 0 { bail!("beacon interval is not a factor of 24") } else { - Ok(Duration::seconds(self.beacon_interval as i64)) + Ok(self.beacon_interval) } } } diff --git a/iot_verifier/src/tx_scaler.rs b/iot_verifier/src/tx_scaler.rs index 5728c8e49..eaf8c0d21 100644 --- a/iot_verifier/src/tx_scaler.rs +++ b/iot_verifier/src/tx_scaler.rs @@ -3,16 +3,17 @@ use crate::{ hex_density::{compute_hex_density_map, GlobalHexMap, HexDensityMap}, last_beacon_reciprocity::LastBeaconReciprocity, }; -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Utc}; use futures::future::LocalBoxFuture; use helium_crypto::PublicKeyBinary; use sqlx::PgPool; -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; use task_manager::ManagedTask; // The number in minutes within which the gateway has registered a beacon // to the oracle for inclusion in transmit scaling density calculations -const HIP_17_INTERACTIVITY_LIMIT: i64 = 3600; +// 60 hours +const HIP_17_INTERACTIVITY_LIMIT: Duration = Duration::from_secs(60 * 60 * 60); pub struct Server { pub hex_density_map: HexDensityMap, @@ -101,7 +102,7 @@ impl Server { &self, now: DateTime, ) -> anyhow::Result>> { - let interactivity_deadline = now - Duration::minutes(HIP_17_INTERACTIVITY_LIMIT); + let interactivity_deadline = now - HIP_17_INTERACTIVITY_LIMIT; Ok( LastBeaconReciprocity::get_all_since(&self.pool, interactivity_deadline) .await? diff --git a/iot_verifier/tests/integrations/common/mod.rs b/iot_verifier/tests/integrations/common/mod.rs index c9f00f01c..0dea4c251 100644 --- a/iot_verifier/tests/integrations/common/mod.rs +++ b/iot_verifier/tests/integrations/common/mod.rs @@ -419,11 +419,10 @@ pub const BEACONER5: &str = "112BwpY6ARmnMsPZE9iBauh6EJVDvH7MimZtvWnd99nXmmGcKeM pub const WITNESS1: &str = "13ABbtvMrRK8jgYrT3h6Y9Zu44nS6829kzsamiQn9Eefeu3VAZs"; pub const WITNESS2: &str = "112e5E4NCpZ88ivqoXeyWwiVCC4mJFv4kMPowycNMXjoDRSP6ZnS"; -#[allow(dead_code)] + pub const UNKNOWN_GATEWAY1: &str = "1YiZUsuCwxE7xyxjke1ogehv5WSuYZ9o7uM2ZKvRpytyqb8Be63"; pub const NO_METADATA_GATEWAY1: &str = "1YpopKVbRDELWGR3nMd1MAU8a5GxP1uQSDj9AeXHEi3fHSsWGRi"; -#[allow(dead_code)] pub const DENIED_PUBKEY1: &str = "112bUGwooPd1dCDd3h3yZwskjxCzBsQNKeaJTuUF4hSgYedcsFa9"; pub const LOCAL_ENTROPY: [u8; 4] = [233, 70, 25, 176]; @@ -436,7 +435,7 @@ pub const POC_DATA: [u8; 51] = [ 203, 122, 146, 49, 241, 156, 148, 74, 246, 68, 17, 8, 212, 48, 6, 152, 58, 221, 158, 186, 101, 37, 59, 135, 126, 18, 72, 244, 65, 174, ]; -#[allow(dead_code)] + pub const ENTROPY_TIMESTAMP: i64 = 1677163710000; const EU868_PARAMS: &[u8] = &[ diff --git a/iot_verifier/tests/integrations/purger_tests.rs b/iot_verifier/tests/integrations/purger_tests.rs index 6daafb641..876917dce 100644 --- a/iot_verifier/tests/integrations/purger_tests.rs +++ b/iot_verifier/tests/integrations/purger_tests.rs @@ -1,5 +1,5 @@ use crate::common; -use chrono::{Duration as ChronoDuration, TimeZone, Utc}; +use chrono::{TimeZone, Utc}; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_lora::{ InvalidParticipantSide, InvalidReason, LoraBeaconReportReqV1, LoraWitnessReportReqV1, @@ -16,10 +16,10 @@ async fn test_purger(pool: PgPool) -> anyhow::Result<()> { let (invalid_witness_client, mut invalid_witnesses) = common::create_file_sink(); // default stale periods after which the purger will delete reports from the db - let base_stale_period = ChronoDuration::seconds(0); - let beacon_stale_period = ChronoDuration::seconds(3); - let witness_stale_period = ChronoDuration::seconds(3); - let entropy_stale_period = ChronoDuration::seconds(3); + let base_stale_period = Duration::from_secs(0); + let beacon_stale_period = Duration::from_secs(3); + let witness_stale_period = Duration::from_secs(3); + let entropy_stale_period = Duration::from_secs(3); // create the purger let purger = Purger { @@ -34,7 +34,7 @@ async fn test_purger(pool: PgPool) -> anyhow::Result<()> { // default reports timestamp let entropy_ts = Utc.timestamp_millis_opt(common::ENTROPY_TIMESTAMP).unwrap(); - let report_ts = entropy_ts + ChronoDuration::minutes(1); + let report_ts = entropy_ts + Duration::from_secs(60); // // inject a beacon, witness & entropy report into the db diff --git a/iot_verifier/tests/integrations/runner_tests.rs b/iot_verifier/tests/integrations/runner_tests.rs index 628980600..3c8fd7fec 100644 --- a/iot_verifier/tests/integrations/runner_tests.rs +++ b/iot_verifier/tests/integrations/runner_tests.rs @@ -1,6 +1,6 @@ use crate::common::{self, MockFileSinkReceiver}; use async_trait::async_trait; -use chrono::{DateTime, Duration as ChronoDuration, TimeZone, Utc}; +use chrono::{DateTime, TimeZone, Utc}; use denylist::DenyList; use futures_util::{stream, StreamExt as FuturesStreamExt}; use helium_crypto::PublicKeyBinary; @@ -23,7 +23,9 @@ use sqlx::PgPool; use std::{self, str::FromStr, time::Duration}; lazy_static! { - static ref BEACON_INTERVAL: ChronoDuration = ChronoDuration::seconds(21600); + static ref BEACON_INTERVAL: Duration = Duration::from_secs(21600); + static ref BEACON_INTERVAL_PLUS_TWO_HOURS: Duration = + *BEACON_INTERVAL + Duration::from_secs(2 * 60 * 60); } #[derive(Debug, Clone)] pub struct MockIotConfigClient { @@ -65,7 +67,7 @@ struct TestContext { } impl TestContext { - async fn setup(pool: PgPool, beacon_interval: ChronoDuration) -> anyhow::Result { + async fn setup(pool: PgPool, beacon_interval: Duration) -> anyhow::Result { // setup file sinks let (invalid_beacon_client, invalid_beacons) = common::create_file_sink(); let (invalid_witness_client, invalid_witnesses) = common::create_file_sink(); @@ -82,7 +84,7 @@ impl TestContext { let deny_list: DenyList = vec![PublicKeyBinary::from_str(common::DENIED_PUBKEY1).unwrap()] .try_into() .unwrap(); - let refresh_interval = ChronoDuration::seconds(30); + let refresh_interval = Duration::from_secs(30); let (gateway_updater_receiver, _gateway_updater_server) = GatewayUpdater::new(refresh_interval, iot_config_client.clone()).await?; let gateway_cache = GatewayCache::new(gateway_updater_receiver.clone()); @@ -115,7 +117,7 @@ impl TestContext { // and all beacon and witness reports will be created // with a received_ts based on an offset from this ts let entropy_ts = Utc.timestamp_millis_opt(common::ENTROPY_TIMESTAMP).unwrap(); - let report_ts = entropy_ts + ChronoDuration::minutes(1); + let report_ts = entropy_ts + Duration::from_secs(60); // add the entropy to the DB common::inject_entropy_report(pool.clone(), entropy_ts).await?; @@ -151,25 +153,25 @@ async fn valid_beacon_and_witness(pool: PgPool) -> anyhow::Result<()> { common::inject_last_beacon( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_witness( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_beacon( &mut txn, witness_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_witness( &mut txn, witness_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; txn.commit().await?; @@ -219,25 +221,25 @@ async fn confirm_valid_reports_unmodified(pool: PgPool) -> anyhow::Result<()> { common::inject_last_beacon( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_witness( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_beacon( &mut txn, witness_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_witness( &mut txn, witness_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; txn.commit().await?; @@ -280,13 +282,13 @@ async fn confirm_invalid_reports_unmodified(pool: PgPool) -> anyhow::Result<()> common::inject_last_beacon( &mut txn, witness_to_inject.report.pub_key.clone(), - now - ChronoDuration::hours(1), + now - chrono::Duration::hours(1).to_std()?, ) .await?; common::inject_last_witness( &mut txn, witness_to_inject.report.pub_key.clone(), - now - ChronoDuration::hours(1), + now - chrono::Duration::hours(1).to_std()?, ) .await?; txn.commit().await?; @@ -330,25 +332,25 @@ async fn confirm_valid_beacon_invalid_witness_reports_unmodified( common::inject_last_beacon( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_witness( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_beacon( &mut txn, witness_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_witness( &mut txn, witness_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; txn.commit().await?; @@ -395,25 +397,25 @@ async fn valid_beacon_irregular_schedule_with_witness(pool: PgPool) -> anyhow::R common::inject_last_beacon( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_witness( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_beacon( &mut txn, witness_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_witness( &mut txn, witness_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; txn.commit().await?; @@ -498,13 +500,13 @@ async fn valid_beacon_irregular_schedule_no_witness(pool: PgPool) -> anyhow::Res common::inject_last_beacon( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_witness( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; txn.commit().await?; @@ -591,25 +593,25 @@ async fn invalid_beacon_irregular_schedule_with_witness(pool: PgPool) -> anyhow: common::inject_last_beacon( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_witness( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_beacon( &mut txn, witness_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_witness( &mut txn, witness_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; txn.commit().await?; @@ -702,13 +704,13 @@ async fn valid_beacon_gateway_not_found(pool: PgPool) -> anyhow::Result<()> { common::inject_last_beacon( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_witness( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; txn.commit().await?; @@ -765,25 +767,25 @@ async fn invalid_witness_no_metadata(pool: PgPool) -> anyhow::Result<()> { common::inject_last_beacon( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_witness( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_beacon( &mut txn, witness_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_witness( &mut txn, witness_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; txn.commit().await?; @@ -841,13 +843,13 @@ async fn invalid_beacon_no_gateway_found(pool: PgPool) -> anyhow::Result<()> { common::inject_last_beacon( &mut txn, witness_to_inject.report.pub_key.clone(), - now - ChronoDuration::hours(1), + now - chrono::Duration::hours(1), ) .await?; common::inject_last_witness( &mut txn, witness_to_inject.report.pub_key.clone(), - now - ChronoDuration::hours(1), + now - chrono::Duration::hours(1), ) .await?; txn.commit().await?; @@ -932,7 +934,7 @@ async fn invalid_beacon_bad_payload(pool: PgPool) -> anyhow::Result<()> { ctx.runner.handle_db_tick().await?; tokio::time::sleep(Duration::from_secs(3)).await; let mut txn = pool.begin().await?; - let beacon_report = Report::get_stale_beacons(&mut txn, ChronoDuration::seconds(1)).await?; + let beacon_report = Report::get_stale_beacons(&mut txn, Duration::from_secs(1)).await?; // max attempts is 2, once that is exceeded the report is no longer retried // so even tho we called handle_db_tick 5 times above, the report was only retried twice assert_eq!(2, beacon_report[0].attempts); @@ -963,13 +965,13 @@ async fn valid_beacon_and_witness_no_beacon_reciprocity(pool: PgPool) -> anyhow: common::inject_last_beacon( &mut txn, witness_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_witness( &mut txn, witness_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; txn.commit().await?; @@ -1022,13 +1024,13 @@ async fn valid_beacon_and_witness_no_witness_reciprocity(pool: PgPool) -> anyhow common::inject_last_beacon( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; common::inject_last_witness( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + now - (*BEACON_INTERVAL_PLUS_TWO_HOURS), ) .await?; txn.commit().await?; @@ -1070,7 +1072,7 @@ async fn valid_beacon_and_witness_no_witness_reciprocity(pool: PgPool) -> anyhow #[sqlx::test] async fn valid_new_gateway_witness_first_reciprocity(pool: PgPool) -> anyhow::Result<()> { - let test_beacon_interval = ChronoDuration::seconds(5); + let test_beacon_interval = Duration::from_secs(5); let mut ctx = TestContext::setup(pool.clone(), test_beacon_interval).await?; let now = ctx.entropy_ts; @@ -1099,13 +1101,13 @@ async fn valid_new_gateway_witness_first_reciprocity(pool: PgPool) -> anyhow::Re common::inject_last_beacon( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (test_beacon_interval + ChronoDuration::seconds(10)), + now - (test_beacon_interval + Duration::from_secs(10)), ) .await?; common::inject_last_witness( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (test_beacon_interval + ChronoDuration::seconds(10)), + now - (test_beacon_interval + Duration::from_secs(10)), ) .await?; txn.commit().await?; @@ -1198,13 +1200,13 @@ async fn valid_new_gateway_witness_first_reciprocity(pool: PgPool) -> anyhow::Re common::inject_last_beacon( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (test_beacon_interval + ChronoDuration::seconds(10)), + now - (test_beacon_interval + Duration::from_secs(10)), ) .await?; common::inject_last_witness( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (test_beacon_interval + ChronoDuration::seconds(10)), + now - (test_beacon_interval + Duration::from_secs(10)), ) .await?; txn.commit().await?; @@ -1238,7 +1240,7 @@ async fn valid_new_gateway_witness_first_reciprocity(pool: PgPool) -> anyhow::Re #[sqlx::test] async fn valid_new_gateway_beacon_first_reciprocity(pool: PgPool) -> anyhow::Result<()> { - let test_beacon_interval = ChronoDuration::seconds(5); + let test_beacon_interval = Duration::from_secs(5); let mut ctx = TestContext::setup(pool.clone(), test_beacon_interval).await?; let now = ctx.entropy_ts; @@ -1295,13 +1297,13 @@ async fn valid_new_gateway_beacon_first_reciprocity(pool: PgPool) -> anyhow::Res common::inject_last_beacon( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (test_beacon_interval + ChronoDuration::seconds(10)), + now - (test_beacon_interval + Duration::from_secs(10)), ) .await?; common::inject_last_witness( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (test_beacon_interval + ChronoDuration::seconds(10)), + now - (test_beacon_interval + Duration::from_secs(10)), ) .await?; txn.commit().await?; @@ -1340,7 +1342,7 @@ async fn valid_new_gateway_beacon_first_reciprocity(pool: PgPool) -> anyhow::Res tokio::time::sleep(Duration::from_secs(6)).await; let beacon_to_inject = common::create_valid_beacon_report( common::BEACONER5, - ctx.entropy_ts + ChronoDuration::seconds(5), + ctx.entropy_ts + Duration::from_secs(5), ); let witness_to_inject = common::create_valid_witness_report(common::WITNESS2, ctx.entropy_ts); common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; @@ -1351,13 +1353,13 @@ async fn valid_new_gateway_beacon_first_reciprocity(pool: PgPool) -> anyhow::Res common::inject_last_beacon( &mut txn, witness_to_inject.report.pub_key.clone(), - now - (test_beacon_interval + ChronoDuration::seconds(10)), + now - (test_beacon_interval + Duration::from_secs(10)), ) .await?; common::inject_last_witness( &mut txn, witness_to_inject.report.pub_key.clone(), - now - (test_beacon_interval + ChronoDuration::seconds(10)), + now - (test_beacon_interval + Duration::from_secs(10)), ) .await?; txn.commit().await?; @@ -1392,7 +1394,7 @@ async fn valid_new_gateway_beacon_first_reciprocity(pool: PgPool) -> anyhow::Res #[sqlx::test] async fn valid_lone_wolf_beacon(pool: PgPool) -> anyhow::Result<()> { - let test_beacon_interval = ChronoDuration::seconds(5); + let test_beacon_interval = Duration::from_secs(5); let mut ctx = TestContext::setup(pool.clone(), test_beacon_interval).await?; let now = ctx.entropy_ts; @@ -1446,13 +1448,13 @@ async fn valid_lone_wolf_beacon(pool: PgPool) -> anyhow::Result<()> { common::inject_last_beacon( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (test_beacon_interval + ChronoDuration::seconds(10)), + now - (test_beacon_interval + Duration::from_secs(10)), ) .await?; common::inject_last_witness( &mut txn, beacon_to_inject.report.pub_key.clone(), - now - (test_beacon_interval + ChronoDuration::seconds(10)), + now - (test_beacon_interval + Duration::from_secs(10)), ) .await?; txn.commit().await?; @@ -1494,7 +1496,7 @@ async fn valid_lone_wolf_beacon(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn valid_two_isolated_gateways_beaconing_and_witnessing(pool: PgPool) -> anyhow::Result<()> { - let test_beacon_interval = ChronoDuration::seconds(5); + let test_beacon_interval = Duration::from_secs(5); let mut ctx = TestContext::setup(pool.clone(), test_beacon_interval).await?; // simulate two gateways with no recent activity coming online and @@ -1576,11 +1578,11 @@ async fn valid_two_isolated_gateways_beaconing_and_witnessing(pool: PgPool) -> a let beacon_to_inject = common::create_valid_beacon_report( common::BEACONER1, - ctx.entropy_ts + ChronoDuration::seconds(5), + ctx.entropy_ts + chrono::Duration::seconds(5), ); let witness_to_inject = common::create_valid_witness_report( common::WITNESS1, - ctx.entropy_ts + ChronoDuration::seconds(5), + ctx.entropy_ts + chrono::Duration::seconds(5), ); common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; common::inject_witness_report(pool.clone(), witness_to_inject.clone()).await?; diff --git a/mobile_packet_verifier/Cargo.toml b/mobile_packet_verifier/Cargo.toml index 6e49b156d..820a84280 100644 --- a/mobile_packet_verifier/Cargo.toml +++ b/mobile_packet_verifier/Cargo.toml @@ -35,3 +35,4 @@ triggered = {workspace = true} http = {workspace = true} http-serde = {workspace = true} sha2 = {workspace = true} +humantime-serde = { workspace=true } \ No newline at end of file diff --git a/mobile_packet_verifier/pkg/settings-template.toml b/mobile_packet_verifier/pkg/settings-template.toml index 1e47a6b8e..836071aed 100644 --- a/mobile_packet_verifier/pkg/settings-template.toml +++ b/mobile_packet_verifier/pkg/settings-template.toml @@ -8,11 +8,11 @@ cache = "/var/data/verified-reports" # We will burn data credits from the solana chain every `burn_period` hours. # Default value is 1 hour. -burn_period = 1 +burn_period = "1 hour" # In case burn fails we will accelarate and burn in `min_burn_period` minutes instead of `burn_period` hours. # Default value is 15 minutes. -min_burn_period = 15 +min_burn_period = "15 minutes" # If set to true, enables integration with the Solana network. This includes # checking payer balances and burning data credits. If this is disabled, all diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index 6692783f9..767132e1b 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -46,8 +46,8 @@ impl Daemon { pool, burner, reports, - burn_period: settings.burn_period(), - min_burn_period: settings.min_burn_period(), + burn_period: settings.burn_period, + min_burn_period: settings.min_burn_period, gateway_info_resolver, authorization_verifier, invalid_data_session_report_sink, @@ -169,7 +169,7 @@ impl Cmd { Utc.timestamp_millis_opt(0).unwrap(), )) .prefix(FileType::DataTransferSessionIngestReport.to_string()) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) .create() .await?; diff --git a/mobile_packet_verifier/src/event_ids.rs b/mobile_packet_verifier/src/event_ids.rs index caac3f22f..88b6747b8 100644 --- a/mobile_packet_verifier/src/event_ids.rs +++ b/mobile_packet_verifier/src/event_ids.rs @@ -1,5 +1,6 @@ -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Utc}; use sqlx::{Pool, Postgres, Transaction}; +use std::time::Duration; use task_manager::ManagedTask; use crate::settings::Settings; @@ -37,13 +38,13 @@ impl EventIdPurger { pub fn from_settings(conn: Pool, settings: &Settings) -> Self { Self { conn, - interval: settings.purger_interval(), - max_age: settings.purger_max_age(), + interval: settings.purger_interval, + max_age: settings.purger_max_age, } } pub async fn run(self, mut shutdown: triggered::Listener) -> anyhow::Result<()> { - let mut timer = tokio::time::interval(self.interval.to_std()?); + let mut timer = tokio::time::interval(self.interval); loop { tokio::select! { diff --git a/mobile_packet_verifier/src/settings.rs b/mobile_packet_verifier/src/settings.rs index f4b190d6e..72ef9b586 100644 --- a/mobile_packet_verifier/src/settings.rs +++ b/mobile_packet_verifier/src/settings.rs @@ -1,7 +1,8 @@ -use chrono::{DateTime, Duration, TimeZone, Utc}; +use chrono::{DateTime, Utc}; use config::{Config, ConfigError, Environment, File}; +use humantime_serde::re::humantime; use serde::Deserialize; -use std::path::Path; +use std::{path::Path, time::Duration}; #[derive(Debug, Deserialize)] pub struct Settings { @@ -11,12 +12,12 @@ pub struct Settings { pub log: String, /// Cache location for generated verified reports pub cache: String, - /// Burn period in hours. (Default is 1) - #[serde(default = "default_burn_period")] - pub burn_period: i64, - /// Minimum burn period when error, in minutes. (Default is 15) - #[serde(default = "default_min_burn_period")] - pub min_burn_period: i64, + /// Burn period in hours. (Default is 1 hour) + #[serde(with = "humantime_serde", default = "default_burn_period")] + pub burn_period: Duration, + /// Minimum burn period when error. (Default is 15 minutes) + #[serde(with = "humantime_serde", default = "default_min_burn_period")] + pub min_burn_period: Duration, pub database: db_store::Settings, pub ingest: file_store::Settings, pub output: file_store::Settings, @@ -26,39 +27,35 @@ pub struct Settings { pub solana: Option, pub config_client: mobile_config::ClientSettings, #[serde(default = "default_start_after")] - pub start_after: u64, - #[serde(default = "default_purger_interval_in_hours")] - pub purger_interval_in_hours: u64, - #[serde(default = "default_purger_max_age_in_hours")] - pub purger_max_age_in_hours: u64, + pub start_after: DateTime, + #[serde(with = "humantime_serde", default = "default_purger_interval")] + pub purger_interval: Duration, + #[serde(with = "humantime_serde", default = "default_purger_max_age")] + pub purger_max_age: Duration, } -pub fn default_purger_interval_in_hours() -> u64 { - 1 +fn default_purger_interval() -> Duration { + humantime::parse_duration("1 hour").unwrap() } -pub fn default_purger_max_age_in_hours() -> u64 { - 24 +fn default_purger_max_age() -> Duration { + humantime::parse_duration("24 hours").unwrap() } -pub fn default_start_after() -> u64 { - 0 +fn default_start_after() -> DateTime { + DateTime::UNIX_EPOCH } -pub fn default_url() -> http::Uri { - http::Uri::from_static("http://127.0.0.1:8080") -} - -pub fn default_log() -> String { +fn default_log() -> String { "mobile_packet_verifier=debug,poc_store=info".to_string() } -pub fn default_burn_period() -> i64 { - 1 +fn default_burn_period() -> Duration { + humantime::parse_duration("1 hour").unwrap() } -pub fn default_min_burn_period() -> i64 { - 15 +fn default_min_burn_period() -> Duration { + humantime::parse_duration("15 minutes").unwrap() } impl Settings { @@ -83,26 +80,4 @@ impl Settings { .build() .and_then(|config| config.try_deserialize()) } - - pub fn start_after(&self) -> DateTime { - Utc.timestamp_opt(self.start_after as i64, 0) - .single() - .unwrap() - } - - pub fn burn_period(&self) -> tokio::time::Duration { - tokio::time::Duration::from_secs(60 * 60 * self.burn_period as u64) - } - - pub fn min_burn_period(&self) -> tokio::time::Duration { - tokio::time::Duration::from_secs(60 * self.min_burn_period as u64) - } - - pub fn purger_interval(&self) -> Duration { - Duration::hours(self.purger_interval_in_hours as i64) - } - - pub fn purger_max_age(&self) -> Duration { - Duration::hours(self.purger_max_age_in_hours as i64) - } } diff --git a/mobile_verifier/Cargo.toml b/mobile_verifier/Cargo.toml index 00889525f..2620b6536 100644 --- a/mobile_verifier/Cargo.toml +++ b/mobile_verifier/Cargo.toml @@ -51,6 +51,7 @@ retainer = {workspace = true} uuid = {workspace = true} task-manager = {path = "../task_manager"} solana-sdk = {workspace = true} +humantime-serde = { workspace = true } [dev-dependencies] backon = "0" diff --git a/mobile_verifier/pkg/settings-template.toml b/mobile_verifier/pkg/settings-template.toml index 519525106..57b462eca 100644 --- a/mobile_verifier/pkg/settings-template.toml +++ b/mobile_verifier/pkg/settings-template.toml @@ -7,14 +7,11 @@ cache = "/var/data/verfied-reports" # Reward period in hours. (Default is 24) -# rewards = 24 - -# Verifications per rewards period. Default is 8 -# verifications = 8 +# rewards = "24 hours" # Verification offset in minutes, verification will occur at the end of -# the verification period + verification_offset_minutes; Default = 30 -# verification_offset_minutes = 30 +# the reward period + reward_offset; Default = 30 minutes +# reward_offset_minutes = "30 minutes" [database] diff --git a/mobile_verifier/src/coverage.rs b/mobile_verifier/src/coverage.rs index 9401f0070..506d7e389 100644 --- a/mobile_verifier/src/coverage.rs +++ b/mobile_verifier/src/coverage.rs @@ -115,7 +115,7 @@ impl CoverageDaemon { file_source::continuous_source::() .state(pool.clone()) .store(file_store) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) .prefix(FileType::CoverageObjectIngestReport.to_string()) .create() .await?; diff --git a/mobile_verifier/src/data_session.rs b/mobile_verifier/src/data_session.rs index 2aa744177..b519fed64 100644 --- a/mobile_verifier/src/data_session.rs +++ b/mobile_verifier/src/data_session.rs @@ -49,7 +49,7 @@ impl DataSessionIngestor { file_source::continuous_source::() .state(pool.clone()) .store(data_transfer_ingest) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) .prefix(FileType::ValidDataTransferSession.to_string()) .create() .await?; diff --git a/mobile_verifier/src/heartbeats/cbrs.rs b/mobile_verifier/src/heartbeats/cbrs.rs index 3560c1e60..0275915b6 100644 --- a/mobile_verifier/src/heartbeats/cbrs.rs +++ b/mobile_verifier/src/heartbeats/cbrs.rs @@ -56,7 +56,7 @@ where file_source::continuous_source::() .state(pool.clone()) .store(file_store) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) .prefix(FileType::CbrsHeartbeatIngestReport.to_string()) .queue_size(1) .create() @@ -66,7 +66,7 @@ where pool, gateway_resolver, cbrs_heartbeats, - settings.modeled_coverage_start(), + settings.modeled_coverage_start, settings.max_asserted_distance_deviation, settings.max_distance_from_coverage, valid_heartbeats, diff --git a/mobile_verifier/src/heartbeats/wifi.rs b/mobile_verifier/src/heartbeats/wifi.rs index 03a3701a2..51433e8e7 100644 --- a/mobile_verifier/src/heartbeats/wifi.rs +++ b/mobile_verifier/src/heartbeats/wifi.rs @@ -55,7 +55,7 @@ where file_source::continuous_source::() .state(pool.clone()) .store(file_store) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) .prefix(FileType::WifiHeartbeatIngestReport.to_string()) .create() .await?; @@ -64,7 +64,7 @@ where pool, gateway_resolver, wifi_heartbeats, - settings.modeled_coverage_start(), + settings.modeled_coverage_start, settings.max_asserted_distance_deviation, settings.max_distance_from_coverage, valid_heartbeats, diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index 2821cf78c..fb058ced0 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -97,7 +97,7 @@ where file_source::continuous_source::() .state(pool.clone()) .store(file_store.clone()) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) .prefix(FileType::RadioThresholdIngestReport.to_string()) .create() .await?; @@ -107,7 +107,7 @@ where file_source::continuous_source::() .state(pool.clone()) .store(file_store.clone()) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) .prefix(FileType::InvalidatedRadioThresholdIngestReport.to_string()) .create() .await?; diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 70989003a..8a0f7d839 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -8,7 +8,7 @@ use crate::{ subscriber_location, telemetry, Settings, }; use anyhow::bail; -use chrono::{DateTime, Duration, TimeZone, Utc}; +use chrono::{DateTime, TimeZone, Utc}; use db_store::meta; use file_store::{ file_sink::{self, FileSinkClient}, @@ -34,7 +34,7 @@ use reward_scheduler::Scheduler; use rust_decimal::{prelude::*, Decimal}; use rust_decimal_macros::dec; use sqlx::{PgExecutor, Pool, Postgres}; -use std::ops::Range; +use std::{ops::Range, time::Duration}; use task_manager::{ManagedTask, TaskManager}; use tokio::time::sleep; @@ -67,7 +67,6 @@ where ) -> anyhow::Result { let (price_tracker, price_daemon) = PriceTracker::new_tm(&settings.price_tracker).await?; - let reward_period_hours = settings.rewards; let (mobile_rewards, mobile_rewards_server) = file_sink::FileSinkBuilder::new( FileType::MobileRewardShare, settings.store_base_path(), @@ -92,8 +91,8 @@ where pool.clone(), carrier_service_verifier, hex_boosting_info_resolver, - Duration::hours(reward_period_hours), - Duration::minutes(settings.reward_offset_minutes), + settings.reward_period, + settings.reward_period_offset, mobile_rewards, reward_manifests, price_tracker, @@ -149,7 +148,7 @@ where self.reward(&scheduler).await?; continue; } else { - Duration::minutes(REWARDS_NOT_CURRENT_DELAY_PERIOD).to_std()? + chrono::Duration::minutes(REWARDS_NOT_CURRENT_DELAY_PERIOD).to_std()? } } else { scheduler.sleep_duration(now)? diff --git a/mobile_verifier/src/settings.rs b/mobile_verifier/src/settings.rs index 54e64cc43..e154fdb2e 100644 --- a/mobile_verifier/src/settings.rs +++ b/mobile_verifier/src/settings.rs @@ -1,7 +1,11 @@ -use chrono::{DateTime, TimeZone, Utc}; +use chrono::{DateTime, Utc}; use config::{Config, ConfigError, Environment, File}; +use humantime_serde::re::humantime; use serde::Deserialize; -use std::path::{Path, PathBuf}; +use std::{ + path::{Path, PathBuf}, + time::Duration, +}; #[derive(Debug, Deserialize)] pub struct Settings { @@ -11,11 +15,11 @@ pub struct Settings { pub log: String, /// Cache location for generated verified reports pub cache: String, - /// Reward period in hours. (Default is 24) - #[serde(default = "default_reward_period")] - pub rewards: i64, - #[serde(default = "default_reward_offset_minutes")] - pub reward_offset_minutes: i64, + /// Reward period in hours. (Default is 24 hours) + #[serde(with = "humantime_serde", default = "default_reward_period")] + pub reward_period: Duration, + #[serde(with = "humantime_serde", default = "default_reward_period_offset")] + pub reward_period_offset: Duration, pub database: db_store::Settings, pub ingest: file_store::Settings, pub data_transfer_ingest: file_store::Settings, @@ -24,8 +28,8 @@ pub struct Settings { pub price_tracker: price::price_tracker::Settings, pub config_client: mobile_config::ClientSettings, #[serde(default = "default_start_after")] - pub start_after: u64, - pub modeled_coverage_start: u64, + pub start_after: DateTime, + pub modeled_coverage_start: DateTime, /// Max distance in meters between the heartbeat and all of the hexes in /// its respective coverage object #[serde(default = "default_max_distance_from_coverage")] @@ -51,29 +55,29 @@ fn default_fencing_resolution() -> u8 { 7 } -pub fn default_max_distance_from_coverage() -> u32 { +fn default_max_distance_from_coverage() -> u32 { // Default is 2 km 2000 } -pub fn default_max_asserted_distance_deviation() -> u32 { +fn default_max_asserted_distance_deviation() -> u32 { 100 } -pub fn default_log() -> String { +fn default_log() -> String { "mobile_verifier=debug,poc_store=info".to_string() } -pub fn default_start_after() -> u64 { - 0 +fn default_start_after() -> DateTime { + DateTime::UNIX_EPOCH } -pub fn default_reward_period() -> i64 { - 24 +fn default_reward_period() -> Duration { + humantime::parse_duration("24 hours").unwrap() } -pub fn default_reward_offset_minutes() -> i64 { - 30 +fn default_reward_period_offset() -> Duration { + humantime::parse_duration("30 minutes").unwrap() } impl Settings { @@ -99,18 +103,6 @@ impl Settings { .and_then(|config| config.try_deserialize()) } - pub fn start_after(&self) -> DateTime { - Utc.timestamp_opt(self.start_after as i64, 0) - .single() - .unwrap() - } - - pub fn modeled_coverage_start(&self) -> DateTime { - Utc.timestamp_opt(self.modeled_coverage_start as i64, 0) - .single() - .unwrap() - } - pub fn usa_region_paths(&self) -> anyhow::Result> { let paths = std::fs::read_dir(&self.usa_geofence_regions)?; Ok(paths diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index 2704e2da4..f6599d930 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -85,7 +85,7 @@ where file_source::continuous_source::() .state(pool.clone()) .store(file_store) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) .prefix(FileType::CellSpeedtestIngestReport.to_string()) .create() .await?; diff --git a/mobile_verifier/src/subscriber_location.rs b/mobile_verifier/src/subscriber_location.rs index 75c32c923..930c20983 100644 --- a/mobile_verifier/src/subscriber_location.rs +++ b/mobile_verifier/src/subscriber_location.rs @@ -67,7 +67,7 @@ where file_source::continuous_source::() .state(pool.clone()) .store(file_store.clone()) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) .prefix(FileType::SubscriberLocationIngestReport.to_string()) .create() .await?; diff --git a/price/Cargo.toml b/price/Cargo.toml index c98b65c15..0ce8851fc 100644 --- a/price/Cargo.toml +++ b/price/Cargo.toml @@ -32,3 +32,4 @@ triggered = {workspace = true} solana-client = {workspace = true} solana-sdk = {workspace = true} task-manager = {path = "../task_manager"} +humantime-serde = { workspace = true } \ No newline at end of file diff --git a/price/pkg/settings-template.toml b/price/pkg/settings-template.toml index c176b5b1c..dfa2278a7 100644 --- a/price/pkg/settings-template.toml +++ b/price/pkg/settings-template.toml @@ -6,7 +6,7 @@ source = "https://api.devnet.solana.com" # Price tick interval (secs). Default = 60s. Optional. -interval = 60 +interval = "60 seconds" # Cache folder to use. Default blow # diff --git a/price/src/price_generator.rs b/price/src/price_generator.rs index 2459c00af..9624a7ab1 100644 --- a/price/src/price_generator.rs +++ b/price/src/price_generator.rs @@ -1,13 +1,13 @@ use crate::{metrics::Metrics, Settings}; use anyhow::{anyhow, bail, Error, Result}; -use chrono::{DateTime, Duration, TimeZone, Utc}; +use chrono::{DateTime, TimeZone, Utc}; use file_store::file_sink; use futures::{future::LocalBoxFuture, TryFutureExt}; use helium_proto::{BlockchainTokenTypeV1, PriceReportV1}; use serde::{Deserialize, Serialize}; use solana_client::nonblocking::rpc_client::RpcClient; use solana_sdk::pubkey::Pubkey as SolPubkey; -use std::{cmp::Ordering, path::PathBuf, str::FromStr}; +use std::{cmp::Ordering, path::PathBuf, str::FromStr, time::Duration}; use task_manager::ManagedTask; use tokio::{fs, time}; @@ -90,12 +90,12 @@ impl PriceGenerator { client, key: settings.price_key(token_type)?, default_price: settings.default_price(token_type)?, - interval_duration: settings.interval().to_std()?, - stale_price_duration: settings.stale_price_duration(), + interval_duration: settings.interval, + stale_price_duration: settings.stale_price_duration, latest_price_file: PathBuf::from_str(&settings.cache)? .join(format!("{token_type:?}.latest")), file_sink: Some(file_sink), - pyth_price_interval: settings.pyth_price_interval().to_std()?, + pyth_price_interval: settings.pyth_price_interval, }) } diff --git a/price/src/settings.rs b/price/src/settings.rs index a4655c40a..1da3adfe9 100644 --- a/price/src/settings.rs +++ b/price/src/settings.rs @@ -1,10 +1,10 @@ use anyhow::{anyhow, Result}; -use chrono::Duration; use config::{Config, Environment, File}; use helium_proto::BlockchainTokenTypeV1; +use humantime_serde::re::humantime; use serde::Deserialize; use solana_sdk::pubkey::Pubkey as SolPubkey; -use std::{path::Path, str::FromStr}; +use std::{path::Path, str::FromStr, time::Duration}; #[derive(Debug, Deserialize, Clone)] pub struct ClusterConfig { @@ -48,41 +48,37 @@ pub struct Settings { /// Metrics settings pub metrics: poc_metrics::Settings, /// Tick interval (secs). Default = 60s. - #[serde(default = "default_interval")] - pub interval: i64, + #[serde(with = "humantime_serde", default = "default_interval")] + pub interval: Duration, /// Cluster Configuration - #[serde(default = "default_cluster")] + #[serde(default)] pub cluster: ClusterConfig, /// How long to use a stale price in minutes - #[serde(default = "default_stale_price_minutes")] - pub stale_price_minutes: u64, + #[serde(with = "humantime_serde", default = "default_stale_price_duration")] + pub stale_price_duration: Duration, /// Interval when retrieving a pyth price from on chain - #[serde(default = "default_pyth_price_interval")] - pub pyth_price_interval_in_seconds: u64, + #[serde(with = "humantime_serde", default = "default_pyth_price_interval")] + pub pyth_price_interval: Duration, } -pub fn default_pyth_price_interval() -> u64 { - 60 * 60 * 2 +fn default_pyth_price_interval() -> Duration { + humantime::parse_duration("2 hours").unwrap() } -pub fn default_source() -> String { +fn default_source() -> String { "https://api.devnet.solana.com".to_string() } -pub fn default_log() -> String { +fn default_log() -> String { "price=debug".to_string() } -pub fn default_interval() -> i64 { - 60 +fn default_interval() -> Duration { + humantime::parse_duration("1 minute").unwrap() } -pub fn default_stale_price_minutes() -> u64 { - 12 * 60 -} - -pub fn default_cluster() -> ClusterConfig { - ClusterConfig::default() +fn default_stale_price_duration() -> Duration { + humantime::parse_duration("12 hours").unwrap() } pub fn default_cache() -> String { @@ -112,18 +108,6 @@ impl Settings { .and_then(|config| config.try_deserialize()) } - pub fn interval(&self) -> Duration { - Duration::seconds(self.interval) - } - - pub fn pyth_price_interval(&self) -> Duration { - Duration::seconds(self.pyth_price_interval_in_seconds as i64) - } - - pub fn stale_price_duration(&self) -> Duration { - Duration::minutes(self.stale_price_minutes as i64) - } - pub fn price_key(&self, token_type: BlockchainTokenTypeV1) -> Result> { self.key(token_type)? .as_ref() diff --git a/reward_index/Cargo.toml b/reward_index/Cargo.toml index ff25749d7..29c34841c 100644 --- a/reward_index/Cargo.toml +++ b/reward_index/Cargo.toml @@ -40,3 +40,4 @@ rust_decimal_macros = {workspace = true} tonic = {workspace = true} rand = {workspace = true} async-trait = {workspace = true} +humantime-serde = { workspace = true } \ No newline at end of file diff --git a/reward_index/pkg/settings-template.toml b/reward_index/pkg/settings-template.toml index 7d1c1e51d..0e939a68d 100644 --- a/reward_index/pkg/settings-template.toml +++ b/reward_index/pkg/settings-template.toml @@ -5,7 +5,7 @@ # Interval for checking verifier bucket (in seconds). Default below (15 minutes) # -# interval = 900 +# interval = "15 minutes" # Mode to operate the indexer in. "iot" or "mobile" mode = "iot" diff --git a/reward_index/src/main.rs b/reward_index/src/main.rs index 9adbc8a2c..600c6764f 100644 --- a/reward_index/src/main.rs +++ b/reward_index/src/main.rs @@ -1,5 +1,4 @@ use anyhow::Result; -use chrono::{TimeZone, Utc}; use clap::Parser; use file_store::{ file_info_poller::LookbackBehavior, file_source, reward_manifest::RewardManifest, FileStore, @@ -76,18 +75,13 @@ impl Server { telemetry::initialize(&pool).await?; let file_store = FileStore::from_settings(&settings.verifier).await?; - let (receiver, server) = file_source::continuous_source::() .state(pool.clone()) .store(file_store) .prefix(FileType::RewardManifest.to_string()) - .lookback(LookbackBehavior::StartAfter( - Utc.timestamp_opt(settings.start_after as i64, 0) - .single() - .unwrap(), - )) - .poll_duration(settings.interval()) - .offset(settings.interval() * 2) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) + .poll_duration(settings.interval) + .offset(settings.interval * 2) .create() .await?; let source_join_handle = server.start(shutdown_listener.clone()).await?; diff --git a/reward_index/src/settings.rs b/reward_index/src/settings.rs index 790f1375b..e41231059 100644 --- a/reward_index/src/settings.rs +++ b/reward_index/src/settings.rs @@ -1,7 +1,8 @@ -use chrono::Duration; +use chrono::{DateTime, Utc}; use config::{Config, Environment, File}; +use humantime_serde::re::humantime; use serde::Deserialize; -use std::{fmt, path::Path}; +use std::{fmt, path::Path, time::Duration}; /// Mode to start the indexer in. Each mode uses different files from /// the verifier @@ -28,8 +29,8 @@ pub struct Settings { #[serde(default = "default_log")] pub log: String, /// Check interval in seconds. (Default is 900; 15 minutes) - #[serde(default = "default_interval")] - pub interval: i64, + #[serde(with = "humantime_serde", default = "default_interval")] + pub interval: Duration, /// Mode to run the server in (iot or mobile). Required pub mode: Mode, pub database: db_store::Settings, @@ -38,14 +39,18 @@ pub struct Settings { pub operation_fund_key: Option, pub unallocated_reward_entity_key: Option, #[serde(default = "default_start_after")] - pub start_after: u64, + pub start_after: DateTime, } -pub fn default_start_after() -> u64 { - 0 +fn default_interval() -> Duration { + humantime::parse_duration("15 minutes").unwrap() } -pub fn default_log() -> String { +fn default_start_after() -> DateTime { + DateTime::UNIX_EPOCH +} + +fn default_log() -> String { "reward_index=debug,poc_store=info".to_string() } @@ -72,10 +77,6 @@ impl Settings { .and_then(|config| config.try_deserialize()) } - pub fn interval(&self) -> Duration { - Duration::seconds(self.interval) - } - pub fn operation_fund_key(&self) -> Option { self.operation_fund_key.clone() } @@ -84,7 +85,3 @@ impl Settings { self.unallocated_reward_entity_key.clone() } } - -fn default_interval() -> i64 { - 900 -} diff --git a/reward_scheduler/src/lib.rs b/reward_scheduler/src/lib.rs index edd8efc78..788f4eb79 100644 --- a/reward_scheduler/src/lib.rs +++ b/reward_scheduler/src/lib.rs @@ -1,5 +1,5 @@ -use chrono::{DateTime, Duration, Utc}; -use std::ops::Range; +use chrono::{DateTime, Utc}; +use std::{ops::Range, time::Duration}; #[derive(Debug)] pub struct Scheduler { @@ -43,7 +43,7 @@ impl Scheduler { let duration = if self.reward_period.end + self.reward_offset > now { self.reward_period.end + self.reward_offset - now } else if next_reward_period.end + self.reward_offset <= now { - Duration::zero() + chrono::Duration::zero() } else { (next_reward_period.end + self.reward_offset) - now }; @@ -63,11 +63,11 @@ mod tests { } fn reward_period_length() -> Duration { - Duration::hours(24) + chrono::Duration::hours(24).to_std().unwrap() } fn standard_duration(minutes: i64) -> Result { - Duration::minutes(minutes) + chrono::Duration::minutes(minutes) .to_std() .map_err(|_| OutOfRangeError) } @@ -78,7 +78,7 @@ mod tests { reward_period_length(), dt(2022, 12, 1, 0, 0, 0), dt(2022, 12, 2, 0, 0, 0), - Duration::minutes(30), + chrono::Duration::minutes(30).to_std().unwrap(), ); let now = dt(2022, 12, 1, 1, 0, 0); @@ -98,7 +98,7 @@ mod tests { reward_period_length(), dt(2022, 12, 1, 0, 0, 0), dt(2022, 12, 2, 0, 0, 0), - Duration::minutes(30), + chrono::Duration::minutes(30).to_std().unwrap(), ); let now = dt(2022, 12, 2, 0, 30, 0); @@ -122,7 +122,7 @@ mod tests { reward_period_length(), dt(2022, 12, 1, 0, 0, 0), dt(2022, 12, 2, 0, 0, 0), - Duration::minutes(30), + chrono::Duration::minutes(30).to_std().unwrap(), ); let now = dt(2022, 12, 2, 0, 15, 0);