diff --git a/Cargo.lock b/Cargo.lock index 1a4e053a7..a5aed6730 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1448,7 +1448,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#53373c6d7b6abe900a49a205593380787978bfb4" +source = "git+https://github.com/helium/proto?branch=master#c2b1dfa06e6f726ab19cd50444b2a90dd6c841e0" dependencies = [ "base64 0.21.0", "byteorder", @@ -3337,7 +3337,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#53373c6d7b6abe900a49a205593380787978bfb4" +source = "git+https://github.com/helium/proto?branch=master#c2b1dfa06e6f726ab19cd50444b2a90dd6c841e0" dependencies = [ "bytes", "prost", diff --git a/iot_verifier/migrations/14_last_witness.sql b/iot_verifier/migrations/14_last_witness.sql new file mode 100644 index 000000000..1c0e81a23 --- /dev/null +++ b/iot_verifier/migrations/14_last_witness.sql @@ -0,0 +1,8 @@ +create table last_witness ( + id bytea primary key not null, + timestamp timestamptz not null +); +-- seed last_witness with timestamps from last_beacon +insert into last_witness (id, timestamp) +select id, timestamp from last_beacon +where timestamp > now() - interval '7 day'; diff --git a/iot_verifier/src/last_beacon.rs b/iot_verifier/src/last_beacon.rs index 0a0cf3b77..e0becd293 100644 --- a/iot_verifier/src/last_beacon.rs +++ b/iot_verifier/src/last_beacon.rs @@ -1,5 +1,4 @@ use chrono::{DateTime, Utc}; -use file_store::traits::TimestampDecode; use serde::{Deserialize, Serialize}; #[derive(sqlx::FromRow, Deserialize, Serialize, Debug)] @@ -9,20 +8,8 @@ pub struct LastBeacon { pub timestamp: DateTime, } -#[derive(thiserror::Error, Debug)] -pub enum LastBeaconError { - #[error("database error: {0}")] - DatabaseError(#[from] sqlx::Error), - #[error("file store error: {0}")] - FileStoreError(#[from] file_store::Error), -} - impl LastBeacon { - pub async fn insert_kv<'c, E>( - executor: E, - id: &[u8], - val: &str, - ) -> Result + pub async fn insert_kv<'c, E>(executor: E, id: &[u8], val: &str) -> anyhow::Result where E: sqlx::Executor<'c, Database = sqlx::Postgres>, { @@ -39,7 +26,7 @@ impl LastBeacon { .await?) } - pub async fn get<'c, E>(executor: E, id: &[u8]) -> Result, LastBeaconError> + pub async fn get<'c, E>(executor: E, id: &[u8]) -> anyhow::Result> where E: sqlx::Executor<'c, Database = sqlx::Postgres>, { @@ -52,26 +39,28 @@ impl LastBeacon { } pub async fn get_all_since<'c, E>( - deadline: DateTime, executor: E, - ) -> Result, sqlx::Error> + timestamp: DateTime, + ) -> anyhow::Result> where E: sqlx::Executor<'c, Database = sqlx::Postgres> + 'c, { - sqlx::query_as::<_, Self>(r#" select * from last_beacon where timestamp >= $1; "#) - .bind(deadline) - .fetch_all(executor) - .await + Ok( + sqlx::query_as::<_, Self>(r#" select * from last_beacon where timestamp >= $1; "#) + .bind(timestamp) + .fetch_all(executor) + .await?, + ) } pub async fn last_timestamp<'c, E>( executor: E, id: &[u8], - ) -> Result>, LastBeaconError> + ) -> anyhow::Result>> where E: sqlx::Executor<'c, Database = sqlx::Postgres>, { - let height = sqlx::query_scalar::<_, String>( + let height = sqlx::query_scalar( r#" select timestamp from last_beacon where id = $1 @@ -79,12 +68,7 @@ impl LastBeacon { ) .bind(id) .fetch_optional(executor) - .await? - .and_then(|v| { - v.parse::() - .map_or_else(|_| None, |secs| Some(secs.to_timestamp())) - }) - .transpose()?; + .await?; Ok(height) } @@ -92,7 +76,7 @@ impl LastBeacon { executor: E, id: &[u8], timestamp: DateTime, - ) -> Result<(), LastBeaconError> + ) -> anyhow::Result<()> where E: sqlx::Executor<'c, Database = sqlx::Postgres>, { diff --git a/iot_verifier/src/last_witness.rs b/iot_verifier/src/last_witness.rs new file mode 100644 index 000000000..325bd3b54 --- /dev/null +++ b/iot_verifier/src/last_witness.rs @@ -0,0 +1,104 @@ +use chrono::{DateTime, Utc}; + +use helium_crypto::PublicKeyBinary; +use serde::{Deserialize, Serialize}; + +#[derive(sqlx::FromRow, Deserialize, Serialize, Debug)] +#[sqlx(type_name = "last_witness")] +pub struct LastWitness { + pub id: Vec, + pub timestamp: DateTime, +} + +impl LastWitness { + pub async fn insert_kv<'c, E>(executor: E, id: &[u8], val: &str) -> anyhow::Result + where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, + { + Ok(sqlx::query_as::<_, Self>( + r#" insert into last_witness ( id, timestamp ) + values ($1, $2) + on conflict (key) do nothing + returning *; + "#, + ) + .bind(id) + .bind(val) + .fetch_one(executor) + .await?) + } + + pub async fn get<'c, E>(executor: E, id: &[u8]) -> anyhow::Result> + where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, + { + Ok( + sqlx::query_as::<_, LastWitness>(r#" select * from last_witness where id = $1;"#) + .bind(id) + .fetch_optional(executor) + .await?, + ) + } + + pub async fn last_timestamp<'c, E>( + executor: E, + id: &[u8], + ) -> anyhow::Result>> + where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, + { + let height = sqlx::query_scalar( + r#" + select timestamp from last_witness + where id = $1 + "#, + ) + .bind(id) + .fetch_optional(executor) + .await?; + Ok(height) + } + + pub async fn update_last_timestamp<'c, E>( + executor: E, + id: &[u8], + timestamp: DateTime, + ) -> anyhow::Result<()> + where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, + { + let _ = sqlx::query( + r#" + insert into last_witness (id, timestamp) + values ($1, $2) + on conflict (id) do update set + timestamp = EXCLUDED.timestamp + "#, + ) + .bind(id) + .bind(timestamp) + .execute(executor) + .await?; + Ok(()) + } + + pub async fn bulk_update_last_timestamps( + db: impl sqlx::PgExecutor<'_> + sqlx::Acquire<'_, Database = sqlx::Postgres> + Copy, + ids: Vec<(PublicKeyBinary, DateTime)>, + ) -> anyhow::Result<()> { + const NUMBER_OF_FIELDS_IN_QUERY: u16 = 2; + const MAX_BATCH_ENTRIES: usize = (u16::MAX / NUMBER_OF_FIELDS_IN_QUERY) as usize; + let mut txn = db.begin().await?; + for updates in ids.chunks(MAX_BATCH_ENTRIES) { + let mut query_builder: sqlx::QueryBuilder = + sqlx::QueryBuilder::new(" insert into last_witness (id, timestamp) "); + query_builder.push_values(updates, |mut builder, (id, ts)| { + builder.push_bind(id.as_ref()).push_bind(ts); + }); + query_builder.push(" on conflict (id) do update set timestamp = EXCLUDED.timestamp "); + query_builder.build().execute(&mut *txn).await?; + } + txn.commit().await?; + Ok(()) + } +} diff --git a/iot_verifier/src/lib.rs b/iot_verifier/src/lib.rs index 136145fd6..0dc3b7bb8 100644 --- a/iot_verifier/src/lib.rs +++ b/iot_verifier/src/lib.rs @@ -4,6 +4,7 @@ pub mod gateway_cache; pub mod gateway_updater; pub mod hex_density; pub mod last_beacon; +pub mod last_witness; pub mod loader; pub mod meta; pub mod packet_loader; diff --git a/iot_verifier/src/poc.rs b/iot_verifier/src/poc.rs index 2bee5171f..c5e0ad56f 100644 --- a/iot_verifier/src/poc.rs +++ b/iot_verifier/src/poc.rs @@ -1,10 +1,7 @@ use crate::{ - entropy::ENTROPY_LIFESPAN, - gateway_cache::GatewayCache, - gateway_cache::GatewayCacheError, - hex_density::HexDensityMap, - last_beacon::{LastBeacon, LastBeaconError}, - region_cache::{RegionCache, RegionCacheError}, + entropy::ENTROPY_LIFESPAN, gateway_cache::GatewayCache, gateway_cache::GatewayCacheError, + hex_density::HexDensityMap, last_beacon::LastBeacon, last_witness::LastWitness, + region_cache::RegionCache, }; use beacon; use chrono::{DateTime, Duration, DurationRound, Utc}; @@ -29,7 +26,7 @@ use iot_config::{ use lazy_static::lazy_static; use rust_decimal::Decimal; use sqlx::PgPool; -use std::{convert::Infallible, f64::consts::PI}; +use std::f64::consts::PI; pub type GenericVerifyResult = std::result::Result; @@ -56,6 +53,8 @@ lazy_static! { static ref MAX_WITNESS_LAG: Duration = Duration::milliseconds(1500); /// max permitted lag between the beaconer and a witness static ref MAX_BEACON_TO_WITNESS_LAG: Duration = Duration::milliseconds(4000); + /// the duration in which a beaconer or witness must have a valid opposite report from + static ref RECIPROCITY_WINDOW: Duration = Duration::hours(48); } #[derive(Debug, PartialEq)] @@ -65,6 +64,8 @@ pub struct InvalidResponse { } pub struct Poc { + pool: PgPool, + beacon_interval: Duration, beacon_report: IotBeaconIngestReport, witness_reports: Vec, entropy_start: DateTime, @@ -80,27 +81,16 @@ pub struct VerifyBeaconResult { pub hex_scale: Option, } +#[derive(Clone, Debug)] pub struct VerifyWitnessesResult { pub verified_witnesses: Vec, pub failed_witnesses: Vec, } -#[derive(thiserror::Error, Debug)] -pub enum VerificationError { - #[error("not found: {0}")] - NotFound(&'static str), - #[error("last beacon error: {0}")] - LastBeaconError(#[from] LastBeaconError), - #[error("calc distance error: {0}")] - CalcDistanceError(#[from] CalcDistanceError), - #[error("error querying gateway info from iot config service")] - GatewayCache(#[from] GatewayCacheError), - #[error("error querying region info from iot config service")] - RegionCache(#[from] RegionCacheError), -} - impl Poc { pub async fn new( + pool: PgPool, + beacon_interval: Duration, beacon_report: IotBeaconIngestReport, witness_reports: Vec, entropy_start: DateTime, @@ -108,6 +98,8 @@ impl Poc { ) -> Self { let entropy_end = entropy_start + Duration::seconds(ENTROPY_LIFESPAN); Self { + pool, + beacon_interval, beacon_report, witness_reports, entropy_start, @@ -116,16 +108,13 @@ impl Poc { } } - #[allow(clippy::too_many_arguments)] pub async fn verify_beacon( &mut self, hex_density_map: &HexDensityMap, gateway_cache: &GatewayCache, region_cache: &RegionCache, - pool: &PgPool, - beacon_interval: Duration, deny_list: &DenyList, - ) -> Result> + ) -> anyhow::Result where G: Gateways, { @@ -154,10 +143,10 @@ impl Poc { .await { Ok(res) => res, - Err(err) => return Err(VerificationError::RegionCache(err)), + Err(err) => return Err(anyhow::Error::from(err)), }; // we have beaconer info, proceed to verifications - let last_beacon = LastBeacon::get(pool, beaconer_pub_key.as_ref()).await?; + let last_beacon = LastBeacon::get(&self.pool, beaconer_pub_key.as_ref()).await?; match do_beacon_verifications( deny_list, self.entropy_start, @@ -167,14 +156,32 @@ impl Poc { &self.beacon_report, &beaconer_info, &beaconer_region_info.region_params, - beacon_interval, + self.beacon_interval, ) { Ok(()) => { let tx_scale = hex_density_map .get(beaconer_metadata.location) .await .unwrap_or(*DEFAULT_TX_SCALE); - Ok(VerifyBeaconResult::valid(beaconer_info, tx_scale)) + // update 'last beacon' timestamp if the beacon has passed regular validations + LastBeacon::update_last_timestamp( + &self.pool, + beaconer_pub_key.as_ref(), + self.beacon_report.received_timestamp, + ) + .await?; + // post regular validations, check for beacon reciprocity + // if this check fails we will invalidate the beacon + // even tho it has passed all regular validations + if !self.verify_beacon_reciprocity().await? { + Ok(VerifyBeaconResult::invalid( + InvalidReason::GatewayNoValidWitnesses, + None, + beaconer_info, + )) + } else { + Ok(VerifyBeaconResult::valid(beaconer_info, tx_scale)) + } } Err(invalid_response) => Ok(VerifyBeaconResult::invalid( invalid_response.reason, @@ -190,11 +197,13 @@ impl Poc { hex_density_map: &HexDensityMap, gateway_cache: &GatewayCache, deny_list: &DenyList, - ) -> Result> { + ) -> anyhow::Result { + let mut witnesses_to_update: Vec<(PublicKeyBinary, DateTime)> = Vec::new(); let mut verified_witnesses: Vec = Vec::new(); let mut failed_witnesses: Vec = Vec::new(); let mut existing_gateways: Vec = Vec::new(); let witnesses = self.witness_reports.clone(); + if !witnesses.is_empty() { let witness_earliest_received_ts = witnesses[0].received_timestamp; for witness_report in witnesses { @@ -214,10 +223,30 @@ impl Poc { ) .await { - Ok(verified_witness) => { + Ok(mut verified_witness) => { // track which gateways we have saw a witness report from existing_gateways.push(verified_witness.report.pub_key.clone()); - verified_witnesses.push(verified_witness) + if verified_witness.status == VerificationStatus::Valid { + // add to list of witness to update last timestamp off + witnesses_to_update.push(( + witness_report.report.pub_key.clone(), + verified_witness.received_timestamp, + )); + // post regular validations, check for witness reciprocity + // if this check fails we will invalidate the witness + // even tho it has passed all regular validations + if !self + .verify_witness_reciprocity(&witness_report.report.pub_key) + .await? + { + verified_witness.status = VerificationStatus::Invalid; + verified_witness.invalid_reason = + InvalidReason::GatewayNoValidBeacons; + verified_witness.participant_side = + InvalidParticipantSide::Witness; + } + }; + verified_witnesses.push(verified_witness); } Err(_) => failed_witnesses.push(witness_report), } @@ -238,6 +267,12 @@ impl Poc { } } } + + // update the last witness timestamp for any witness which has successfully passed regular validations + if !witnesses_to_update.is_empty() { + LastWitness::bulk_update_last_timestamps(&self.pool, witnesses_to_update).await? + }; + let resp = VerifyWitnessesResult { verified_witnesses, failed_witnesses, @@ -253,7 +288,7 @@ impl Poc { gateway_cache: &GatewayCache, hex_density_map: &HexDensityMap, witness_first_ts: DateTime, - ) -> Result> { + ) -> anyhow::Result { let witness = &witness_report.report; let witness_pub_key = witness.pub_key.clone(); // pull the witness info from our follower @@ -342,6 +377,28 @@ impl Poc { )), } } + + async fn verify_beacon_reciprocity(&self) -> anyhow::Result { + let last_witness = + LastWitness::get(&self.pool, self.beacon_report.report.pub_key.as_ref()).await?; + if let Some(last_witness) = last_witness { + if self.beacon_report.received_timestamp - last_witness.timestamp < *RECIPROCITY_WINDOW + { + return Ok(true); + } + }; + Ok(false) + } + + async fn verify_witness_reciprocity(&self, pubkey: &PublicKeyBinary) -> anyhow::Result { + let last_beacon = LastBeacon::get(&self.pool, pubkey.as_ref()).await?; + if let Some(last_beacon) = last_beacon { + if self.beacon_report.received_timestamp - last_beacon.timestamp < *RECIPROCITY_WINDOW { + return Ok(true); + } + }; + Ok(false) + } } #[allow(clippy::too_many_arguments)] diff --git a/iot_verifier/src/poc_report.rs b/iot_verifier/src/poc_report.rs index 0a228d753..9235ee251 100644 --- a/iot_verifier/src/poc_report.rs +++ b/iot_verifier/src/poc_report.rs @@ -1,6 +1,7 @@ use crate::entropy::ENTROPY_LIFESPAN; use chrono::{DateTime, Duration, Utc}; use serde::{Deserialize, Serialize}; +use sqlx::{Postgres, Transaction}; const REPORT_INSERT_SQL: &str = "insert into poc_report ( id, @@ -102,13 +103,10 @@ impl Report { Ok(()) } - pub async fn bulk_insert<'c, E>( - executor: E, + pub async fn bulk_insert( + txn: &mut Transaction<'_, Postgres>, bindings: Vec, - ) -> Result<(), ReportError> - where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, - { + ) -> Result<(), ReportError> { let mut query_builder: sqlx::QueryBuilder = sqlx::QueryBuilder::new(REPORT_INSERT_SQL); query_builder.push_values(bindings, |mut b, insert| { @@ -124,7 +122,7 @@ impl Report { query_builder.push(" on conflict (id) do nothing "); let query = query_builder.build(); query - .execute(executor) + .execute(&mut *txn) .await .map(|_| ()) .map_err(ReportError::from) diff --git a/iot_verifier/src/runner.rs b/iot_verifier/src/runner.rs index 48252de6a..d4ac65e5f 100644 --- a/iot_verifier/src/runner.rs +++ b/iot_verifier/src/runner.rs @@ -1,7 +1,6 @@ use crate::{ gateway_cache::GatewayCache, hex_density::HexDensityMap, - last_beacon::LastBeacon, poc::{Poc, VerifyBeaconResult}, poc_report::Report, region_cache::RegionCache, @@ -222,6 +221,7 @@ where } async fn handle_beacon_report(&self, db_beacon: Report) -> anyhow::Result<()> { + // TODO: look at wrapping all db access from this point onwards in a transaction let entropy_start_time = match db_beacon.timestamp { Some(v) => v, None => return Ok(()), @@ -252,6 +252,8 @@ where // create the struct defining this POC let mut poc = Poc::new( + self.pool.clone(), + self.beacon_interval, beacon_report.clone(), witnesses.clone(), entropy_start_time, @@ -265,8 +267,6 @@ where &self.hex_density_map, &self.gateway_cache, &self.region_cache, - &self.pool, - self.beacon_interval, &self.deny_list, ) .await?; @@ -282,6 +282,7 @@ where &self.deny_list, ) .await?; + // check if there are any failed witnesses // if so update the DB attempts count // and halt here, let things be reprocessed next tick @@ -464,7 +465,6 @@ where unselected_witnesses: Vec, ) -> anyhow::Result<()> { let received_timestamp = valid_beacon_report.received_timestamp; - let pub_key = valid_beacon_report.report.pub_key.clone(); let beacon_id = valid_beacon_report.report.report_id(received_timestamp); let packet_data = valid_beacon_report.report.data.clone(); let beacon_report_id = valid_beacon_report.report.report_id(received_timestamp); @@ -500,8 +500,7 @@ where // but could nae get it to get a way past the lack of COPY fire_invalid_witness_metric(&selected_witnesses); fire_invalid_witness_metric(&unselected_witnesses); - // update timestamp of last beacon for the beaconer - LastBeacon::update_last_timestamp(&self.pool, pub_key.as_ref(), received_timestamp).await?; + Report::delete_poc(&self.pool, &packet_data).await?; telemetry::decrement_num_beacons(); Ok(()) diff --git a/iot_verifier/src/tx_scaler.rs b/iot_verifier/src/tx_scaler.rs index 8d382c217..3b8a81257 100644 --- a/iot_verifier/src/tx_scaler.rs +++ b/iot_verifier/src/tx_scaler.rs @@ -75,10 +75,7 @@ impl Server { let refresh_start = Utc::now() - self.refresh_offset; tracing::info!("density_scaler: generating hex scaling map, starting at {refresh_start:?}"); let mut global_map = GlobalHexMap::new(); - let active_gateways = self - .gateways_recent_activity(refresh_start) - .await - .map_err(sqlx::Error::from)?; + let active_gateways = self.gateways_recent_activity(refresh_start).await?; for k in active_gateways.keys() { let pubkey = PublicKeyBinary::from(k.clone()); if let Some(gateway_info) = self.gateway_cache_receiver.borrow().get(&pubkey) { @@ -104,10 +101,10 @@ impl Server { async fn gateways_recent_activity( &self, now: DateTime, - ) -> Result, DateTime>, sqlx::Error> { + ) -> anyhow::Result, DateTime>> { let interactivity_deadline = now - Duration::minutes(HIP_17_INTERACTIVITY_LIMIT); Ok( - LastBeacon::get_all_since(interactivity_deadline, &self.pool) + LastBeacon::get_all_since(&self.pool, interactivity_deadline) .await? .into_iter() .map(|beacon| (beacon.id, beacon.timestamp)) diff --git a/iot_verifier/tests/common/mod.rs b/iot_verifier/tests/common/mod.rs index 1e4558475..d461ed94b 100644 --- a/iot_verifier/tests/common/mod.rs +++ b/iot_verifier/tests/common/mod.rs @@ -21,11 +21,13 @@ use iot_config::{ }; use iot_verifier::{ entropy::Entropy, + last_beacon::LastBeacon, + last_witness::LastWitness, poc_report::{InsertBindings, IotStatus, Report, ReportType}, }; use prost::Message; -use sqlx::PgPool; +use sqlx::{PgPool, Postgres, Transaction}; use std::{self, ops::DerefMut, str::FromStr}; use tokio::{sync::mpsc::error::TryRecvError, sync::Mutex, time::timeout}; @@ -303,6 +305,24 @@ pub async fn inject_entropy_report(pool: PgPool, ts: DateTime) -> anyhow::R Ok(()) } +#[allow(dead_code)] +pub async fn inject_last_beacon( + txn: &mut Transaction<'_, Postgres>, + gateway: PublicKeyBinary, + ts: DateTime, +) -> anyhow::Result<()> { + LastBeacon::update_last_timestamp(&mut *txn, gateway.as_ref(), ts).await +} + +#[allow(dead_code)] +pub async fn inject_last_witness( + txn: &mut Transaction<'_, Postgres>, + gateway: PublicKeyBinary, + ts: DateTime, +) -> anyhow::Result<()> { + LastWitness::update_last_timestamp(&mut *txn, gateway.as_ref(), ts).await +} + #[allow(dead_code)] pub fn valid_gateway() -> GatewayInfo { GatewayInfo { @@ -355,6 +375,16 @@ pub fn valid_gateway_stream() -> Vec { }), is_full_hotspot: true, }, + GatewayInfo { + address: PublicKeyBinary::from_str(BEACONER5).unwrap(), + metadata: Some(GatewayMetadata { + location: 627111975465463807, + elevation: 0, + gain: 20, + region: ProtoRegion::Eu868, + }), + is_full_hotspot: true, + }, GatewayInfo { address: PublicKeyBinary::from_str(WITNESS1).unwrap(), metadata: Some(GatewayMetadata { @@ -365,6 +395,16 @@ pub fn valid_gateway_stream() -> Vec { }), is_full_hotspot: true, }, + GatewayInfo { + address: PublicKeyBinary::from_str(WITNESS2).unwrap(), + metadata: Some(GatewayMetadata { + location: 631615575095659519, + elevation: 0, + gain: 20, + region: ProtoRegion::Eu868, + }), + is_full_hotspot: true, + }, GatewayInfo { address: PublicKeyBinary::from_str(NO_METADATA_GATEWAY1).unwrap(), metadata: None, @@ -389,8 +429,10 @@ pub const BEACONER1: &str = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT pub const BEACONER2: &str = "11z69eJ3czc92k6snrfR9ek7g2uRWXosFbnG9v4bXgwhfUCivUo"; pub const BEACONER3: &str = "1ZPNnNd9k5qiQXXigKifQpCPiy5HTbszQDSyLM56ywk7ihNRvt6"; pub const BEACONER4: &str = "1ZAxCrEsigGVbLUM37Jki6p88kyZ5NVqjVC6oHSbqu49t7bQDym"; +pub const BEACONER5: &str = "112BwpY6ARmnMsPZE9iBauh6EJVDvH7MimZtvWnd99nXmmGcKeMD"; pub const WITNESS1: &str = "13ABbtvMrRK8jgYrT3h6Y9Zu44nS6829kzsamiQn9Eefeu3VAZs"; +pub const WITNESS2: &str = "112e5E4NCpZ88ivqoXeyWwiVCC4mJFv4kMPowycNMXjoDRSP6ZnS"; #[allow(dead_code)] pub const UNKNOWN_GATEWAY1: &str = "1YiZUsuCwxE7xyxjke1ogehv5WSuYZ9o7uM2ZKvRpytyqb8Be63"; pub const NO_METADATA_GATEWAY1: &str = "1YpopKVbRDELWGR3nMd1MAU8a5GxP1uQSDj9AeXHEi3fHSsWGRi"; diff --git a/iot_verifier/tests/runner_tests.rs b/iot_verifier/tests/runner_tests.rs index 1de1598b5..937e19d38 100644 --- a/iot_verifier/tests/runner_tests.rs +++ b/iot_verifier/tests/runner_tests.rs @@ -7,6 +7,7 @@ use futures_util::{stream, StreamExt as FuturesStreamExt}; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_lora::{ InvalidParticipantSide, InvalidReason, LoraBeaconReportReqV1, LoraWitnessReportReqV1, + VerificationStatus, }; use helium_proto::Region as ProtoRegion; use iot_config::{ @@ -17,9 +18,13 @@ use iot_verifier::{ gateway_cache::GatewayCache, gateway_updater::GatewayUpdater, poc_report::Report, region_cache::RegionCache, runner::Runner, tx_scaler::Server as DensityScaler, }; +use lazy_static::lazy_static; use sqlx::PgPool; use std::{self, str::FromStr, time::Duration}; +lazy_static! { + static ref BEACON_INTERVAL: ChronoDuration = ChronoDuration::seconds(21600); +} #[derive(Debug, Clone)] pub struct MockIotConfigClient { resolve_gateway: GatewayInfo, @@ -60,7 +65,7 @@ struct TestContext { } impl TestContext { - async fn setup(pool: PgPool) -> anyhow::Result { + async fn setup(pool: PgPool, beacon_interval: ChronoDuration) -> anyhow::Result { // setup file sinks let (invalid_beacon_client, invalid_beacons) = common::create_file_sink(); let (invalid_witness_client, invalid_witnesses) = common::create_file_sink(); @@ -88,7 +93,7 @@ impl TestContext { // create the runner let runner = Runner { pool: pool.clone(), - beacon_interval: ChronoDuration::seconds(21600), + beacon_interval, max_witnesses_per_poc: 16, beacon_max_retries: 2, witness_max_retries: 2, @@ -125,21 +130,53 @@ impl TestContext { #[sqlx::test] async fn valid_beacon_and_witness(pool: PgPool) -> anyhow::Result<()> { - let mut ctx = TestContext::setup(pool.clone()).await?; + let mut ctx = TestContext::setup(pool.clone(), *BEACON_INTERVAL).await?; + let now = ctx.entropy_ts; // test with a valid beacon and a valid witness - // let beacon_to_inject = common::create_valid_beacon_report(common::BEACONER1, ctx.entropy_ts); let witness_to_inject = common::create_valid_witness_report(common::WITNESS1, ctx.entropy_ts); common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; common::inject_witness_report(pool.clone(), witness_to_inject.clone()).await?; + + // inject last beacons and witness reports into the DB + // avoid the reports declared invalid due to reciprocity check + // when setting the last time consider the beacon interval setup + let mut txn = pool.begin().await?; + common::inject_last_beacon( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + common::inject_last_witness( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + common::inject_last_beacon( + &mut txn, + witness_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + common::inject_last_witness( + &mut txn, + witness_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + txn.commit().await?; + ctx.runner.handle_db_tick().await?; let valid_poc = ctx.valid_pocs.receive_valid_poc().await; assert_eq!(1, valid_poc.selected_witnesses.len()); assert_eq!(0, valid_poc.unselected_witnesses.len()); let valid_beacon = valid_poc.beacon_report.unwrap().report.clone().unwrap(); - let valid_witness = valid_poc.selected_witnesses[0].report.clone().unwrap(); + let valid_witness_report = valid_poc.selected_witnesses[0].clone(); + let valid_witness = valid_witness_report.report.unwrap(); // assert the pubkeys in the outputted reports // match those which we injected assert_eq!( @@ -160,17 +197,54 @@ async fn valid_beacon_and_witness(pool: PgPool) -> anyhow::Result<()> { valid_witness, LoraWitnessReportReqV1::from(witness_to_inject.clone()) ); - + // assert the witness reports status + assert_eq!( + valid_witness_report.status, + VerificationStatus::Valid as i32 + ); Ok(()) } #[sqlx::test] async fn valid_beacon_and_no_witness(pool: PgPool) -> anyhow::Result<()> { - let mut ctx = TestContext::setup(pool.clone()).await?; + let mut ctx = TestContext::setup(pool.clone(), *BEACON_INTERVAL).await?; + let now = ctx.entropy_ts; // test with a valid beacon and no witnesses let beacon_to_inject = common::create_valid_beacon_report(common::BEACONER1, ctx.entropy_ts); common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; + + let mut txn = pool.begin().await?; + common::inject_last_beacon( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + common::inject_last_witness( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + + // inject last beacons and witness reports into the DB + // avoid the reports declared invalid due to reciprocity check + // when setting the last time consider the beacon interval setup + common::inject_last_beacon( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + common::inject_last_witness( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + txn.commit().await?; + ctx.runner.handle_db_tick().await?; let valid_poc = ctx.valid_pocs.receive_valid_poc().await; @@ -193,8 +267,9 @@ async fn valid_beacon_and_no_witness(pool: PgPool) -> anyhow::Result<()> { } #[sqlx::test] -async fn invalid_beacon_gateway_not_found(pool: PgPool) -> anyhow::Result<()> { - let mut ctx = TestContext::setup(pool.clone()).await?; +async fn valid_beacon_gateway_not_found(pool: PgPool) -> anyhow::Result<()> { + let mut ctx = TestContext::setup(pool.clone(), *BEACON_INTERVAL).await?; + let now = ctx.entropy_ts; // // test with a valid beacon and an invalid witness // witness is invalid due to not found in iot config @@ -204,6 +279,25 @@ async fn invalid_beacon_gateway_not_found(pool: PgPool) -> anyhow::Result<()> { common::create_valid_witness_report(common::UNKNOWN_GATEWAY1, ctx.entropy_ts); common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; common::inject_witness_report(pool.clone(), witness_to_inject.clone()).await?; + + // inject last beacons and witness reports into the DB + // avoid the reports declared invalid due to reciprocity check + // when setting the last time consider the beacon interval setup + let mut txn = pool.begin().await?; + common::inject_last_beacon( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + common::inject_last_witness( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + txn.commit().await?; + ctx.runner.handle_db_tick().await?; let valid_poc = ctx.valid_pocs.receive_valid_poc().await; @@ -241,13 +335,18 @@ async fn invalid_beacon_gateway_not_found(pool: PgPool) -> anyhow::Result<()> { invalid_witness, LoraWitnessReportReqV1::from(witness_to_inject.clone()) ); - + // assert the witness reports status + assert_eq!( + invalid_witness_report.status, + VerificationStatus::Invalid as i32 + ); Ok(()) } #[sqlx::test] async fn invalid_witness_no_metadata(pool: PgPool) -> anyhow::Result<()> { - let mut ctx = TestContext::setup(pool.clone()).await?; + let mut ctx = TestContext::setup(pool.clone(), *BEACON_INTERVAL).await?; + let now = ctx.entropy_ts; // // test with a valid beacon and an invalid witness // witness is invalid due no metadata in iot config @@ -257,6 +356,34 @@ async fn invalid_witness_no_metadata(pool: PgPool) -> anyhow::Result<()> { common::create_valid_witness_report(common::NO_METADATA_GATEWAY1, ctx.entropy_ts); common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; common::inject_witness_report(pool.clone(), witness_to_inject.clone()).await?; + + let mut txn = pool.begin().await?; + common::inject_last_beacon( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + common::inject_last_witness( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + common::inject_last_beacon( + &mut txn, + witness_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + common::inject_last_witness( + &mut txn, + witness_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + txn.commit().await?; + ctx.runner.handle_db_tick().await?; let valid_poc = ctx.valid_pocs.receive_valid_poc().await; @@ -275,6 +402,11 @@ async fn invalid_witness_no_metadata(pool: PgPool) -> anyhow::Result<()> { PublicKeyBinary::from(invalid_witness_report.report.unwrap().pub_key.clone()), PublicKeyBinary::from_str(common::NO_METADATA_GATEWAY1).unwrap() ); + // assert the witness reports status + assert_eq!( + VerificationStatus::Invalid as i32, + invalid_witness_report.status + ); // assert the invalid details assert_eq!( InvalidReason::NotAsserted as i32, @@ -300,7 +432,8 @@ async fn invalid_witness_no_metadata(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn invalid_beacon_no_gateway_found(pool: PgPool) -> anyhow::Result<()> { - let mut ctx = TestContext::setup(pool.clone()).await?; + let mut ctx = TestContext::setup(pool.clone(), *BEACON_INTERVAL).await?; + let now = Utc::now(); // // test with an invalid beacon & 1 witness // beacon is invalid as GW is unknown @@ -310,12 +443,28 @@ async fn invalid_beacon_no_gateway_found(pool: PgPool) -> anyhow::Result<()> { let witness_to_inject = common::create_valid_witness_report(common::WITNESS1, ctx.entropy_ts); common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; common::inject_witness_report(pool.clone(), witness_to_inject.clone()).await?; + + let mut txn = pool.begin().await?; + common::inject_last_beacon( + &mut txn, + witness_to_inject.report.pub_key.clone(), + now - ChronoDuration::hours(1), + ) + .await?; + common::inject_last_witness( + &mut txn, + witness_to_inject.report.pub_key.clone(), + now - ChronoDuration::hours(1), + ) + .await?; + txn.commit().await?; + ctx.runner.handle_db_tick().await?; let invalid_beacon_report = ctx.invalid_beacons.receive_invalid_beacon().await; let invalid_witness_report = ctx.invalid_witnesses.receive_invalid_witness().await; let invalid_beacon = invalid_beacon_report.report.clone().unwrap(); - let invalid_witness = invalid_witness_report.report.clone().unwrap(); + let invalid_witness = invalid_witness_report.clone().report.unwrap(); // assert the pubkeys in the outputted reports // match those which we injected assert_eq!( @@ -355,7 +504,7 @@ async fn invalid_beacon_no_gateway_found(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn invalid_beacon_gateway_not_found_no_witnesses(pool: PgPool) -> anyhow::Result<()> { - let mut ctx = TestContext::setup(pool.clone()).await?; + let mut ctx = TestContext::setup(pool.clone(), *BEACON_INTERVAL).await?; // // test with an invalid beacon, no witnesses // beacon is invalid as GW is unknown @@ -390,7 +539,7 @@ async fn invalid_beacon_gateway_not_found_no_witnesses(pool: PgPool) -> anyhow:: #[sqlx::test] async fn invalid_beacon_bad_payload(pool: PgPool) -> anyhow::Result<()> { - let mut ctx = TestContext::setup(pool.clone()).await?; + let mut ctx = TestContext::setup(pool.clone(), *BEACON_INTERVAL).await?; // // test with an invalid beacon, no witnesses // the beacon will have an invalid payload, resulting in an error @@ -419,3 +568,467 @@ async fn invalid_beacon_bad_payload(pool: PgPool) -> anyhow::Result<()> { Ok(()) } + +#[sqlx::test] +async fn valid_beacon_and_witness_no_beacon_reciprocity(pool: PgPool) -> anyhow::Result<()> { + let mut ctx = TestContext::setup(pool.clone(), *BEACON_INTERVAL).await?; + let now = ctx.entropy_ts; + + // test with a valid beacon and a valid witness + let beacon_to_inject = common::create_valid_beacon_report(common::BEACONER1, ctx.entropy_ts); + let witness_to_inject = common::create_valid_witness_report(common::WITNESS1, ctx.entropy_ts); + common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; + common::inject_witness_report(pool.clone(), witness_to_inject.clone()).await?; + + // inject last beacons and witness reports into the DB + // we will only insert these for the witness + // the beaconer will fail the reciprocity check + let mut txn = pool.begin().await?; + common::inject_last_beacon( + &mut txn, + witness_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + common::inject_last_witness( + &mut txn, + witness_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + txn.commit().await?; + + ctx.runner.handle_db_tick().await?; + + let invalid_beacon = ctx.invalid_beacons.receive_invalid_beacon().await; + let invalid_witness = ctx.invalid_witnesses.receive_invalid_witness().await; + // assert the pubkeys in the outputted reports + // match those which we injected + assert_eq!( + PublicKeyBinary::from(invalid_beacon.report.as_ref().unwrap().pub_key.clone()), + PublicKeyBinary::from_str(common::BEACONER1).unwrap() + ); + assert_eq!( + PublicKeyBinary::from(invalid_witness.report.as_ref().unwrap().pub_key.clone()), + PublicKeyBinary::from_str(common::WITNESS1).unwrap() + ); + // assert the invalid details + assert_eq!( + InvalidReason::GatewayNoValidWitnesses as i32, + invalid_beacon.reason + ); + assert_eq!( + InvalidReason::GatewayNoValidWitnesses as i32, + invalid_witness.reason + ); + assert_eq!( + InvalidParticipantSide::Beaconer as i32, + invalid_witness.participant_side + ); + // assert the beacon and witness reports outputted to filestore + // are unmodified from those submitted + assert_eq!( + invalid_beacon.report.unwrap(), + LoraBeaconReportReqV1::from(beacon_to_inject.clone()) + ); + assert_eq!( + invalid_witness.report.unwrap(), + LoraWitnessReportReqV1::from(witness_to_inject.clone()) + ); + Ok(()) +} + +#[sqlx::test] +async fn valid_beacon_and_witness_no_witness_reciprocity(pool: PgPool) -> anyhow::Result<()> { + let mut ctx = TestContext::setup(pool.clone(), *BEACON_INTERVAL).await?; + let now = ctx.entropy_ts; + + // test with a valid beacon and a valid witness + let beacon_to_inject = common::create_valid_beacon_report(common::BEACONER1, ctx.entropy_ts); + let witness_to_inject = common::create_valid_witness_report(common::WITNESS1, ctx.entropy_ts); + common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; + common::inject_witness_report(pool.clone(), witness_to_inject.clone()).await?; + + // inject last beacons and witness reports into the DB + // we will only insert these for the witness + // the beaconer will fail the reciprocity check + let mut txn = pool.begin().await?; + common::inject_last_beacon( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + common::inject_last_witness( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (*BEACON_INTERVAL + ChronoDuration::hours(2)), + ) + .await?; + txn.commit().await?; + + ctx.runner.handle_db_tick().await?; + + let valid_poc = ctx.valid_pocs.receive_valid_poc().await; + println!("{:?}", valid_poc); + assert_eq!(0, valid_poc.selected_witnesses.len()); + assert_eq!(1, valid_poc.unselected_witnesses.len()); + let valid_beacon = valid_poc.beacon_report.unwrap().report.clone().unwrap(); + let invalid_witness_report = valid_poc.unselected_witnesses[0].clone(); + let invalid_witness = invalid_witness_report.report.clone().unwrap(); + // assert the pubkeys in the outputted reports + // match those which we injected + assert_eq!( + PublicKeyBinary::from(valid_beacon.pub_key.clone()), + PublicKeyBinary::from_str(common::BEACONER1).unwrap() + ); + assert_eq!( + PublicKeyBinary::from(invalid_witness_report.report.unwrap().pub_key.clone()), + PublicKeyBinary::from_str(common::WITNESS1).unwrap() + ); + // assert the witness reports status + assert_eq!( + VerificationStatus::Invalid as i32, + invalid_witness_report.status + ); + // assert the invalid details + assert_eq!( + InvalidReason::GatewayNoValidBeacons as i32, + invalid_witness_report.invalid_reason + ); + assert_eq!( + InvalidParticipantSide::Witness as i32, + invalid_witness_report.participant_side + ); + // assert the beacon and witness reports outputted to filestore + // are unmodified from those submitted + assert_eq!( + valid_beacon, + LoraBeaconReportReqV1::from(beacon_to_inject.clone()) + ); + assert_eq!( + invalid_witness, + LoraWitnessReportReqV1::from(witness_to_inject.clone()) + ); + Ok(()) +} + +#[sqlx::test] +async fn valid_new_gateway_witness_first_reciprocity(pool: PgPool) -> anyhow::Result<()> { + let test_beacon_interval = ChronoDuration::seconds(5); + let mut ctx = TestContext::setup(pool.clone(), test_beacon_interval).await?; + let now = ctx.entropy_ts; + + // simulate a new gateway coming online or a gateway coming online after an extended period offline + // the gateway uses beaconer5 pubkey + // the gateways first activity will be to submit a witness report for a beacon it sees + // this will fail the reciprocity check as there will be no previous/recent beacon from this gateway + // whilst the witness will fail reciprocity, its last valid witness timestamp will be updated + // as all regular ( ie non reciprocity check ) validations passed + // the gateway will then subsequently beacon which will pass ( reciprocity check is ok as previously witnessed ) + // it will then witness again and this time it will pass ( reciprocity check is ok as previously beaconed ) + + // + // step 1 - generate a witness from beaconer5, + // this witness will be valid but will fail reciprocity check as no previous beacon + // last witness timestamp will be updated + // + + let beacon_to_inject = common::create_valid_beacon_report(common::BEACONER1, ctx.entropy_ts); + let witness_to_inject = common::create_valid_witness_report(common::BEACONER5, ctx.entropy_ts); + common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; + common::inject_witness_report(pool.clone(), witness_to_inject.clone()).await?; + + // inject last beacons and witness reports into the DB for beaconer 1 + let mut txn = pool.begin().await?; + common::inject_last_beacon( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (test_beacon_interval + ChronoDuration::seconds(10)), + ) + .await?; + common::inject_last_witness( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (test_beacon_interval + ChronoDuration::seconds(10)), + ) + .await?; + txn.commit().await?; + + ctx.runner.handle_db_tick().await?; + + let valid_poc = ctx.valid_pocs.receive_valid_poc().await; + println!("{:?}", valid_poc); + assert_eq!(0, valid_poc.selected_witnesses.len()); + assert_eq!(1, valid_poc.unselected_witnesses.len()); + let valid_beacon = valid_poc.beacon_report.unwrap().report.clone().unwrap(); + let invalid_witness_report = valid_poc.unselected_witnesses[0].clone(); + let invalid_witness = invalid_witness_report.report.unwrap(); + // assert the pubkeys in the outputted reports + // match those which we injected + assert_eq!( + PublicKeyBinary::from(valid_beacon.pub_key.clone()), + PublicKeyBinary::from_str(common::BEACONER1).unwrap() + ); + assert_eq!( + PublicKeyBinary::from(invalid_witness.pub_key.clone()), + PublicKeyBinary::from_str(common::BEACONER5).unwrap() + ); + // assert the witness reports status + assert_eq!( + VerificationStatus::Invalid as i32, + invalid_witness_report.status + ); + // assert the invalid details + assert_eq!( + InvalidReason::GatewayNoValidBeacons as i32, + invalid_witness_report.invalid_reason + ); + assert_eq!( + InvalidParticipantSide::Witness as i32, + invalid_witness_report.participant_side + ); + + // + // step 2 + // generate a beacon from beaconer5 + // as it will now have a previous valid witness, the beacon will pass the reciprocity check + // the gateway will then have a valid 'last beacon' and 'last witness' timestamp + // + + let beacon_to_inject = common::create_valid_beacon_report(common::BEACONER5, ctx.entropy_ts); + let witness_to_inject = common::create_valid_witness_report(common::BEACONER1, ctx.entropy_ts); + common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; + common::inject_witness_report(pool.clone(), witness_to_inject.clone()).await?; + + ctx.runner.handle_db_tick().await?; + + let valid_poc = ctx.valid_pocs.receive_valid_poc().await; + println!("{:?}", valid_poc); + assert_eq!(1, valid_poc.selected_witnesses.len()); + assert_eq!(0, valid_poc.unselected_witnesses.len()); + let valid_beacon = valid_poc.beacon_report.unwrap().report.clone().unwrap(); + let valid_witness_report = valid_poc.selected_witnesses[0].clone(); + let valid_witness = valid_witness_report.report.unwrap(); + // assert the pubkeys in the outputted reports + // match those which we injected + assert_eq!( + PublicKeyBinary::from(valid_beacon.pub_key.clone()), + PublicKeyBinary::from_str(common::BEACONER5).unwrap() + ); + assert_eq!( + PublicKeyBinary::from(valid_witness.pub_key.clone()), + PublicKeyBinary::from_str(common::BEACONER1).unwrap() + ); + // assert the witness reports status + assert_eq!( + VerificationStatus::Valid as i32, + valid_witness_report.status + ); + // + // step 3 + // generate a witness from beaconer5 + // as the gateway will have both a valid 'last beacon' and 'last witness' timestamp + // the reciprocity check will pass + // + + let beacon_to_inject = common::create_valid_beacon_report(common::BEACONER2, ctx.entropy_ts); + let witness_to_inject = common::create_valid_witness_report(common::BEACONER5, ctx.entropy_ts); + common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; + common::inject_witness_report(pool.clone(), witness_to_inject.clone()).await?; + + // inject last beacons and witness reports into the DB for beaconer 1 + // avoid the reports declared invalid due to reciprocity check + let mut txn = pool.begin().await?; + common::inject_last_beacon( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (test_beacon_interval + ChronoDuration::seconds(10)), + ) + .await?; + common::inject_last_witness( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (test_beacon_interval + ChronoDuration::seconds(10)), + ) + .await?; + txn.commit().await?; + + ctx.runner.handle_db_tick().await?; + + let valid_poc = ctx.valid_pocs.receive_valid_poc().await; + println!("{:?}", valid_poc); + assert_eq!(1, valid_poc.selected_witnesses.len()); + assert_eq!(0, valid_poc.unselected_witnesses.len()); + let valid_beacon = valid_poc.beacon_report.unwrap().report.clone().unwrap(); + let valid_witness_report = valid_poc.selected_witnesses[0].clone(); + let valid_witness = valid_witness_report.report.unwrap(); + // assert the pubkeys in the outputted reports + // match those which we injected + assert_eq!( + PublicKeyBinary::from(valid_beacon.pub_key.clone()), + PublicKeyBinary::from_str(common::BEACONER2).unwrap() + ); + assert_eq!( + PublicKeyBinary::from(valid_witness.pub_key.clone()), + PublicKeyBinary::from_str(common::BEACONER5).unwrap() + ); + // assert the witness reports status + assert_eq!( + VerificationStatus::Valid as i32, + valid_witness_report.status + ); + Ok(()) +} + +#[sqlx::test] +async fn valid_new_gateway_beacon_first_reciprocity(pool: PgPool) -> anyhow::Result<()> { + let test_beacon_interval = ChronoDuration::seconds(5); + let mut ctx = TestContext::setup(pool.clone(), test_beacon_interval).await?; + let now = ctx.entropy_ts; + + // simulate a new gateway coming online or a gateway coming online after an extended period offline + // the gateway uses beaconer5 pubkey + // the gateways first activity will be to submit a beacon report + // this will fail the reciprocity check due to no previous/recent witness from this gateway + // whilst the beacon will fail reciprocity, its last valid beacon timestamp will be updated + // as all regular ( ie non reciprocity check ) validations passed + // the gateway will then subsequently witness which will pass ( reciprocity check is ok as previously beacon ) + // it will then beacon again and this time it will pass ( reciprocity check is ok as previously witness ) + + // + // step 1 - generate a beacon from beaconer5, + // this beacon will be valid but will fail reciprocity check as no previous witness + // last beacon timestamp will be updated + // + let beacon_to_inject = common::create_valid_beacon_report(common::BEACONER5, ctx.entropy_ts); + common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; + + ctx.runner.handle_db_tick().await?; + + let invalid_beacon = ctx.invalid_beacons.receive_invalid_beacon().await; + let invalid_beacon_report = invalid_beacon.report.clone().unwrap(); + println!("{:?}", invalid_beacon); + // assert the pubkeys in the outputted reports + // match those which we injected + assert_eq!( + PublicKeyBinary::from(invalid_beacon_report.pub_key.clone()), + PublicKeyBinary::from_str(common::BEACONER5).unwrap() + ); + // assert the invalid details + assert_eq!( + InvalidReason::GatewayNoValidWitnesses as i32, + invalid_beacon.reason + ); + + // + // step 2 + // generate a witness from beaconer5 + // as it will now have a previous valid beacon, the witness will pass the reciprocity check + // the gateway will then have a valid 'last beacon' and 'last witness' timestamp + // + + let beacon_to_inject = common::create_valid_beacon_report(common::BEACONER1, ctx.entropy_ts); + let witness_to_inject = common::create_valid_witness_report(common::BEACONER5, ctx.entropy_ts); + common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; + common::inject_witness_report(pool.clone(), witness_to_inject.clone()).await?; + + // inject last beacons and witness reports into the DB for beaconer 1 + let mut txn = pool.begin().await?; + common::inject_last_beacon( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (test_beacon_interval + ChronoDuration::seconds(10)), + ) + .await?; + common::inject_last_witness( + &mut txn, + beacon_to_inject.report.pub_key.clone(), + now - (test_beacon_interval + ChronoDuration::seconds(10)), + ) + .await?; + txn.commit().await?; + + ctx.runner.handle_db_tick().await?; + + let valid_poc = ctx.valid_pocs.receive_valid_poc().await; + println!("{:?}", valid_poc); + assert_eq!(1, valid_poc.selected_witnesses.len()); + assert_eq!(0, valid_poc.unselected_witnesses.len()); + let valid_beacon = valid_poc.beacon_report.unwrap().report.clone().unwrap(); + let valid_witness_report = valid_poc.selected_witnesses[0].clone(); + let valid_witness = valid_witness_report.report.unwrap(); + // assert the pubkeys in the outputted reports + // match those which we injected + assert_eq!( + PublicKeyBinary::from(valid_beacon.pub_key.clone()), + PublicKeyBinary::from_str(common::BEACONER1).unwrap() + ); + assert_eq!( + PublicKeyBinary::from(valid_witness.pub_key.clone()), + PublicKeyBinary::from_str(common::BEACONER5).unwrap() + ); + // assert the witness reports status + assert_eq!( + VerificationStatus::Valid as i32, + valid_witness_report.status + ); + + // + // step 3 + // generate a beacon from beaconer5 + // as the gateway will now have both a valid 'last beacon' and 'last witness' timestamp + // the reciprocity check will pass + // + tokio::time::sleep(Duration::from_secs(6)).await; + let beacon_to_inject = common::create_valid_beacon_report( + common::BEACONER5, + ctx.entropy_ts + ChronoDuration::seconds(5), + ); + let witness_to_inject = common::create_valid_witness_report(common::WITNESS2, ctx.entropy_ts); + common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; + common::inject_witness_report(pool.clone(), witness_to_inject.clone()).await?; + + // inject last beacons and witness reports into the DB for witness 2 + let mut txn = pool.begin().await?; + common::inject_last_beacon( + &mut txn, + witness_to_inject.report.pub_key.clone(), + now - (test_beacon_interval + ChronoDuration::seconds(10)), + ) + .await?; + common::inject_last_witness( + &mut txn, + witness_to_inject.report.pub_key.clone(), + now - (test_beacon_interval + ChronoDuration::seconds(10)), + ) + .await?; + txn.commit().await?; + + ctx.runner.handle_db_tick().await?; + + let valid_poc = ctx.valid_pocs.receive_valid_poc().await; + println!("{:?}", valid_poc); + assert_eq!(1, valid_poc.selected_witnesses.len()); + assert_eq!(0, valid_poc.unselected_witnesses.len()); + let valid_beacon = valid_poc.beacon_report.unwrap().report.clone().unwrap(); + let valid_witness_report = valid_poc.selected_witnesses[0].clone(); + let valid_witness = valid_witness_report.report.unwrap(); + + // assert the pubkeys in the outputted reports + // match those which we injected + assert_eq!( + PublicKeyBinary::from(valid_beacon.pub_key.clone()), + PublicKeyBinary::from_str(common::BEACONER5).unwrap() + ); + assert_eq!( + PublicKeyBinary::from(valid_witness.pub_key.clone()), + PublicKeyBinary::from_str(common::WITNESS2).unwrap() + ); + // assert the witness reports status + assert_eq!( + VerificationStatus::Valid as i32, + valid_witness_report.status + ); + Ok(()) +}