diff --git a/boost_manager/src/main.rs b/boost_manager/src/main.rs index 061868925..3fd948667 100644 --- a/boost_manager/src/main.rs +++ b/boost_manager/src/main.rs @@ -3,7 +3,6 @@ use boost_manager::{ activator::Activator, purger::Purger, settings::Settings, telemetry, updater::Updater, watcher::Watcher, }; -use chrono::Duration; use clap::Parser; use file_store::{ file_info_poller::LookbackBehavior, file_sink, file_source, file_upload, @@ -11,7 +10,10 @@ use file_store::{ }; use mobile_config::client::hex_boosting_client::HexBoostingClient; use solana::start_boost::SolanaRpc; -use std::path::{self, PathBuf}; +use std::{ + path::{self, PathBuf}, + time::Duration, +}; use task_manager::TaskManager; #[derive(Debug, clap::Parser)] @@ -103,7 +105,7 @@ impl Server { file_upload.clone(), concat!(env!("CARGO_PKG_NAME"), "_boosted_hex_update"), ) - .roll_time(Duration::minutes(5)) + .roll_time(Duration::from_secs(5 * 60)) .create() .await?; diff --git a/file_store/src/file_sink.rs b/file_store/src/file_sink.rs index 0081e53ca..f102e155a 100644 --- a/file_store/src/file_sink.rs +++ b/file_store/src/file_sink.rs @@ -1,9 +1,10 @@ use crate::{file_upload::FileUpload, Error, Result}; use async_compression::tokio::write::GzipEncoder; use bytes::Bytes; -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Utc}; use futures::{future::LocalBoxFuture, SinkExt, TryFutureExt}; use metrics::Label; +use std::time::Duration; use std::{ io, mem, path::{Path, PathBuf}, @@ -20,12 +21,12 @@ use tokio::{ }; use tokio_util::codec::{length_delimited::LengthDelimitedCodec, FramedWrite}; -pub const DEFAULT_SINK_ROLL_MINS: i64 = 3; +pub const DEFAULT_SINK_ROLL_SECS: u64 = 3 * 60; #[cfg(not(test))] -pub const SINK_CHECK_MILLIS: i64 = 60_000; +pub const SINK_CHECK_MILLIS: u64 = 60_000; #[cfg(test)] -pub const SINK_CHECK_MILLIS: i64 = 50; +pub const SINK_CHECK_MILLIS: u64 = 50; pub const MAX_FRAME_LENGTH: usize = 15_000_000; @@ -80,7 +81,7 @@ impl FileSinkBuilder { target_path: target_path.to_path_buf(), tmp_path: target_path.join("tmp"), max_size: 50_000_000, - roll_time: Duration::minutes(DEFAULT_SINK_ROLL_MINS), + roll_time: Duration::from_secs(DEFAULT_SINK_ROLL_SECS), file_upload, auto_commit: true, metric, @@ -340,11 +341,7 @@ impl FileSink { self.target_path.display() ); - let mut rollover_timer = time::interval( - Duration::milliseconds(SINK_CHECK_MILLIS) - .to_std() - .expect("valid sink roll time"), - ); + let mut rollover_timer = time::interval(Duration::from_millis(SINK_CHECK_MILLIS)); rollover_timer.set_missed_tick_behavior(time::MissedTickBehavior::Burst); loop { @@ -548,7 +545,7 @@ mod tests { file_upload, "fake_metric", ) - .roll_time(chrono::Duration::milliseconds(100)) + .roll_time(Duration::from_millis(100)) .create() .await .expect("failed to create file sink"); @@ -596,7 +593,7 @@ mod tests { file_upload, "fake_metric", ) - .roll_time(chrono::Duration::milliseconds(100)) + .roll_time(Duration::from_millis(100)) .auto_commit(false) .create() .await diff --git a/ingest/src/server_iot.rs b/ingest/src/server_iot.rs index 2bb4b7dcd..dbaf78397 100644 --- a/ingest/src/server_iot.rs +++ b/ingest/src/server_iot.rs @@ -1,6 +1,6 @@ use crate::Settings; use anyhow::{Error, Result}; -use chrono::{Duration, Utc}; +use chrono::Utc; use file_store::{ file_sink::{self, FileSinkClient}, file_upload, @@ -19,7 +19,7 @@ use helium_proto::services::poc_lora::{ LoraStreamSessionInitV1, LoraStreamSessionOfferV1, LoraWitnessIngestReportV1, LoraWitnessReportReqV1, LoraWitnessReportRespV1, }; -use std::{convert::TryFrom, net::SocketAddr, path::Path}; +use std::{convert::TryFrom, net::SocketAddr, path::Path, time::Duration}; use task_manager::{ManagedTask, TaskManager}; use tokio::{sync::mpsc::Sender, time::Instant}; use tokio_stream::wrappers::ReceiverStream; @@ -369,7 +369,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { file_upload.clone(), concat!(env!("CARGO_PKG_NAME"), "_beacon_report"), ) - .roll_time(Duration::minutes(5)) + .roll_time(Duration::from_secs(5 * 60)) .create() .await?; @@ -380,7 +380,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { file_upload.clone(), concat!(env!("CARGO_PKG_NAME"), "_witness_report"), ) - .roll_time(Duration::minutes(5)) + .roll_time(Duration::from_secs(5 * 60)) .create() .await?; diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index afc1943aa..06d57d5d0 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -1,6 +1,6 @@ use crate::Settings; use anyhow::{bail, Error, Result}; -use chrono::{Duration, Utc}; +use chrono::Utc; use file_store::{ file_sink::{self, FileSinkClient}, file_upload, @@ -27,8 +27,6 @@ use tonic::{ transport, Request, Response, Status, }; -const INGEST_WAIT_DURATION_MINUTES: i64 = 15; - pub type GrpcResult = std::result::Result, Status>; pub type VerifyResult = std::result::Result; @@ -354,7 +352,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { file_upload.clone(), concat!(env!("CARGO_PKG_NAME"), "_heartbeat_report"), ) - .roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES)) + .roll_time(settings.roll_time) .create() .await?; @@ -365,7 +363,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { file_upload.clone(), concat!(env!("CARGO_PKG_NAME"), "_wifi_heartbeat_report"), ) - .roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES)) + .roll_time(settings.roll_time) .create() .await?; @@ -376,7 +374,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { file_upload.clone(), concat!(env!("CARGO_PKG_NAME"), "_speedtest_report"), ) - .roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES)) + .roll_time(settings.roll_time) .create() .await?; @@ -390,7 +388,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { "_mobile_data_transfer_session_report" ), ) - .roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES)) + .roll_time(settings.roll_time) .create() .await?; @@ -401,7 +399,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { file_upload.clone(), concat!(env!("CARGO_PKG_NAME"), "_subscriber_location_report"), ) - .roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES)) + .roll_time(settings.roll_time) .create() .await?; @@ -412,7 +410,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { file_upload.clone(), concat!(env!("CARGO_PKG_NAME"), "_radio_threshold_ingest_report"), ) - .roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES)) + .roll_time(settings.roll_time) .create() .await?; @@ -426,7 +424,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { "_invalidated_radio_threshold_ingest_report" ), ) - .roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES)) + .roll_time(settings.roll_time) .create() .await?; @@ -437,7 +435,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { file_upload.clone(), concat!(env!("CARGO_PKG_NAME"), "_coverage_object_report"), ) - .roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES)) + .roll_time(settings.roll_time) .create() .await?; diff --git a/ingest/src/settings.rs b/ingest/src/settings.rs index 9c9b8213a..b08f53b01 100644 --- a/ingest/src/settings.rs +++ b/ingest/src/settings.rs @@ -33,6 +33,9 @@ pub struct Settings { /// Settings for exposed public API /// Target bucket for uploads pub output: file_store::Settings, + /// Timeout of session key session in seconds + #[serde(with = "humantime_serde", default = "default_roll_time")] + pub roll_time: Duration, /// API token required as part of a Bearer authentication GRPC request /// header. Used only by the mobile mode currently pub token: Option, @@ -40,6 +43,10 @@ pub struct Settings { pub metrics: poc_metrics::Settings, } +fn default_roll_time() -> Duration { + humantime::parse_duration("15 minutes").unwrap() +} + fn default_session_key_timeout() -> Duration { humantime::parse_duration("30 minutes").unwrap() } diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index a1f3d4351..bf8bc07fd 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -1,6 +1,5 @@ use crate::entropy_loader::EntropyLoader; use anyhow::Result; -use chrono::Duration as ChronoDuration; use clap::Parser; use file_store::{ entropy_report::EntropyReport, file_info_poller::LookbackBehavior, file_sink, file_source, @@ -13,7 +12,7 @@ use iot_verifier::{ tx_scaler::Server as DensityScaler, witness_updater::WitnessUpdater, Settings, }; use price::PriceTracker; -use std::path; +use std::{path, time::Duration}; use task_manager::TaskManager; #[derive(Debug, clap::Parser)] @@ -175,7 +174,7 @@ impl Server { file_upload.clone(), concat!(env!("CARGO_PKG_NAME"), "_non_rewardable_packet"), ) - .roll_time(ChronoDuration::minutes(5)) + .roll_time(Duration::from_secs(5 * 60)) .create() .await?; @@ -248,7 +247,7 @@ impl Server { file_upload.clone(), concat!(env!("CARGO_PKG_NAME"), "_invalid_beacon_report"), ) - .roll_time(ChronoDuration::minutes(5)) + .roll_time(Duration::from_secs(5 * 60)) .create() .await?; @@ -259,7 +258,7 @@ impl Server { file_upload.clone(), concat!(env!("CARGO_PKG_NAME"), "_invalid_witness_report"), ) - .roll_time(ChronoDuration::minutes(5)) + .roll_time(Duration::from_secs(5 * 60)) .create() .await?; @@ -269,7 +268,7 @@ impl Server { file_upload.clone(), concat!(env!("CARGO_PKG_NAME"), "_valid_poc"), ) - .roll_time(ChronoDuration::minutes(2)) + .roll_time(Duration::from_secs(2 * 60)) .create() .await?; diff --git a/mobile_verifier/src/boosting_oracles/data_sets.rs b/mobile_verifier/src/boosting_oracles/data_sets.rs index e0065e055..ca60b2358 100644 --- a/mobile_verifier/src/boosting_oracles/data_sets.rs +++ b/mobile_verifier/src/boosting_oracles/data_sets.rs @@ -258,7 +258,7 @@ impl DataSetDownloaderDaemon { concat!(env!("CARGO_PKG_NAME"), "_oracle_boosting_report"), ) .auto_commit(true) - .roll_time(chrono::Duration::minutes(15)) + .roll_time(Duration::from_secs(15 * 60)) .create() .await?; diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index c1ded037c..b6bb453af 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use crate::{ boosting_oracles::DataSetDownloaderDaemon, coverage::{new_coverage_object_notification_channel, CoverageDaemon}, @@ -11,7 +13,6 @@ use crate::{ telemetry, Settings, }; use anyhow::Result; -use chrono::Duration; use file_store::{file_sink, file_upload, FileStore, FileType}; use mobile_config::client::{ entity_client::EntityClient, hex_boosting_client::HexBoostingClient, AuthorizationClient, @@ -52,7 +53,7 @@ impl Cmd { concat!(env!("CARGO_PKG_NAME"), "_heartbeat"), ) .auto_commit(false) - .roll_time(Duration::minutes(15)) + .roll_time(Duration::from_secs(15 * 60)) .create() .await?; @@ -64,7 +65,7 @@ impl Cmd { concat!(env!("CARGO_PKG_NAME"), "_seniority_update"), ) .auto_commit(false) - .roll_time(Duration::minutes(15)) + .roll_time(Duration::from_secs(15 * 60)) .create() .await?; @@ -75,7 +76,7 @@ impl Cmd { concat!(env!("CARGO_PKG_NAME"), "_speedtest_average"), ) .auto_commit(false) - .roll_time(Duration::minutes(15)) + .roll_time(Duration::from_secs(15 * 60)) .create() .await?; diff --git a/mobile_verifier/src/coverage.rs b/mobile_verifier/src/coverage.rs index 3da4e98b4..47a254bc3 100644 --- a/mobile_verifier/src/coverage.rs +++ b/mobile_verifier/src/coverage.rs @@ -2,7 +2,7 @@ use crate::{ heartbeats::{HbType, KeyType, OwnedKeyType}, IsAuthorized, Settings, }; -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Utc}; use file_store::{ coverage::{self, CoverageObjectIngestReport}, file_info_poller::{FileInfoStream, LookbackBehavior}, @@ -38,7 +38,7 @@ use std::{ num::NonZeroU32, pin::pin, sync::Arc, - time::Instant, + time::{Duration, Instant}, }; use task_manager::{ManagedTask, TaskManager}; use tokio::sync::mpsc::{channel, Receiver, Sender}; @@ -89,7 +89,7 @@ impl CoverageDaemon { concat!(env!("CARGO_PKG_NAME"), "_coverage_object"), ) .auto_commit(false) - .roll_time(Duration::minutes(15)) + .roll_time(Duration::from_secs(15 * 60)) .create() .await?; diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index f6599d930..9809b3a0b 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -2,7 +2,7 @@ use crate::{ speedtests_average::{SpeedtestAverage, SPEEDTEST_LAPSE}, Settings, }; -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Utc}; use file_store::{ file_info_poller::{FileInfoStream, LookbackBehavior}, file_sink::{self, FileSinkClient}, @@ -22,7 +22,10 @@ use helium_proto::services::poc_mobile::{ }; use mobile_config::client::gateway_client::GatewayInfoResolver; use sqlx::{postgres::PgRow, FromRow, Pool, Postgres, Row, Transaction}; -use std::{collections::HashMap, time::Instant}; +use std::{ + collections::HashMap, + time::{Duration, Instant}, +}; use task_manager::{ManagedTask, TaskManager}; use tokio::sync::mpsc::Receiver; @@ -77,7 +80,7 @@ where concat!(env!("CARGO_PKG_NAME"), "_verified_speedtest"), ) .auto_commit(false) - .roll_time(Duration::minutes(15)) + .roll_time(Duration::from_secs(15 * 60)) .create() .await?; @@ -263,7 +266,7 @@ pub async fn get_latest_speedtests_for_pubkey( "#, ) .bind(pubkey) - .bind(timestamp - Duration::hours(SPEEDTEST_LAPSE)) + .bind(timestamp - chrono::Duration::hours(SPEEDTEST_LAPSE)) .bind(timestamp) .bind(SPEEDTEST_AVG_MAX_DATA_POINTS as i64) .fetch_all(exec) @@ -277,7 +280,7 @@ pub async fn aggregate_epoch_speedtests<'a>( ) -> Result { let mut speedtests = EpochSpeedTests::new(); // use latest speedtest which are no older than N hours, defined by SPEEDTEST_LAPSE - let start = epoch_end - Duration::hours(SPEEDTEST_LAPSE); + let start = epoch_end - chrono::Duration::hours(SPEEDTEST_LAPSE); // pull the last N most recent speedtests from prior to the epoch end for each pubkey let mut rows = sqlx::query_as::<_, Speedtest>( "select * from ( @@ -304,7 +307,7 @@ pub async fn clear_speedtests( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, epoch_end: &DateTime, ) -> Result<(), sqlx::Error> { - let oldest_ts = *epoch_end - Duration::hours(SPEEDTEST_LAPSE); + let oldest_ts = *epoch_end - chrono::Duration::hours(SPEEDTEST_LAPSE); sqlx::query("DELETE FROM speedtests WHERE timestamp < $1") .bind(oldest_ts) .execute(&mut *tx) diff --git a/poc_entropy/src/main.rs b/poc_entropy/src/main.rs index 1879ab59d..ddfea4944 100644 --- a/poc_entropy/src/main.rs +++ b/poc_entropy/src/main.rs @@ -1,13 +1,12 @@ use anyhow::{Error, Result}; -use chrono::Duration; use clap::Parser; use file_store::{file_sink, file_upload, FileType}; use futures_util::TryFutureExt; use poc_entropy::{entropy_generator::EntropyGenerator, server::ApiServer, Settings}; -use std::{net::SocketAddr, path}; +use std::{net::SocketAddr, path, time::Duration}; use tokio::{self, signal}; -const ENTROPY_SINK_ROLL_MINS: i64 = 2; +const ENTROPY_SINK_ROLL_SECS: u64 = 2 * 60; #[derive(Debug, clap::Parser)] #[clap(version = env!("CARGO_PKG_VERSION"))] @@ -77,7 +76,7 @@ impl Server { file_upload.clone(), concat!(env!("CARGO_PKG_NAME"), "_report_submission"), ) - .roll_time(Duration::minutes(ENTROPY_SINK_ROLL_MINS)) + .roll_time(Duration::from_secs(ENTROPY_SINK_ROLL_SECS)) .create() .await?; diff --git a/price/src/main.rs b/price/src/main.rs index 29a911c67..886e97b49 100644 --- a/price/src/main.rs +++ b/price/src/main.rs @@ -1,13 +1,15 @@ use anyhow::Result; -use chrono::Duration; use clap::Parser; use file_store::{file_sink, file_upload, FileType}; use helium_proto::BlockchainTokenTypeV1; use price::{cli::check, PriceGenerator, Settings}; -use std::path::{self, PathBuf}; +use std::{ + path::{self, PathBuf}, + time::Duration, +}; use task_manager::TaskManager; -const PRICE_SINK_ROLL_MINS: i64 = 3; +const PRICE_SINK_ROLL_SECS: u64 = 3 * 60; #[derive(Debug, clap::Parser)] #[clap(version = env!("CARGO_PKG_VERSION"))] @@ -86,7 +88,7 @@ impl Server { file_upload.clone(), concat!(env!("CARGO_PKG_NAME"), "_report_submission"), ) - .roll_time(Duration::minutes(PRICE_SINK_ROLL_MINS)) + .roll_time(Duration::from_secs(PRICE_SINK_ROLL_SECS)) .create() .await?;