Skip to content

Commit

Permalink
add cache for last witness reads
Browse files Browse the repository at this point in the history
  • Loading branch information
andymck committed Jan 16, 2024
1 parent 37847ea commit f31a120
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 10 deletions.
12 changes: 11 additions & 1 deletion iot_verifier/src/last_witness.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chrono::{DateTime, Utc};
use futures::Stream;
use serde::{Deserialize, Serialize};

#[derive(sqlx::FromRow, Deserialize, Serialize, Debug)]
Expand Down Expand Up @@ -31,13 +32,22 @@ impl LastWitness {
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
Ok(
sqlx::query_as::<_, LastWitness>(r#" select * from last_witness where id = $1;"#)
sqlx::query_as::<_, LastWitness>(r#" select * from last_witness where id = $1"#)
.bind(id)
.fetch_optional(executor)
.await?,
)
}

pub async fn get_all<'c, E>(
executor: E,
) -> impl Stream<Item = Result<LastWitness, sqlx::Error>> + 'c
where
E: sqlx::Executor<'c, Database = sqlx::Postgres> + 'c,
{
sqlx::query_as::<_, Self>(r#" select * from last_witness "#).fetch(executor)
}

pub async fn last_timestamp<'c, E>(
executor: E,
id: &[u8],
Expand Down
32 changes: 32 additions & 0 deletions iot_verifier/src/last_witness_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use crate::last_witness_cache_updater::MessageReceiver;
use chrono::{DateTime, Utc};
use helium_crypto::PublicKeyBinary;

#[derive(Clone)]
pub struct LastWitnessCache {
last_witness_cache_receiver: MessageReceiver,
}

impl LastWitnessCache {
pub fn new(last_witness_cache_receiver: MessageReceiver) -> Self {
Self {
last_witness_cache_receiver,
}
}

pub async fn resolve_last_witness_info(
&self,
address: &PublicKeyBinary,
) -> anyhow::Result<Option<DateTime<Utc>>> {
match self.last_witness_cache_receiver.borrow().get(address) {
Some(hit) => {
metrics::increment_counter!("last_witness_cache_hit");
Ok(Some(*hit))
}
None => {
metrics::increment_counter!("last_witness_cache_miss");
Ok(None)
}
}
}
}
93 changes: 93 additions & 0 deletions iot_verifier/src/last_witness_cache_updater.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use crate::last_witness::LastWitness;
use chrono::{DateTime, Duration, Utc};
use futures::{future::LocalBoxFuture, stream::StreamExt, TryFutureExt};
use helium_crypto::PublicKeyBinary;

use sqlx::PgPool;
use std::collections::HashMap;
use task_manager::ManagedTask;
use tokio::sync::watch;
use tokio::time;

pub type LastWitnessMap = HashMap<PublicKeyBinary, DateTime<Utc>>;
pub type MessageSender = watch::Sender<LastWitnessMap>;
pub type MessageReceiver = watch::Receiver<LastWitnessMap>;

pub struct LastWitnessUpdater {
pool: PgPool,
refresh_interval: Duration,
sender: MessageSender,
}

impl ManagedTask for LastWitnessUpdater {
fn start_task(
self: Box<Self>,
shutdown: triggered::Listener,
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
let handle = tokio::spawn(self.run(shutdown));
Box::pin(
handle
.map_err(anyhow::Error::from)
.and_then(|result| async move { result.map_err(anyhow::Error::from) }),
)
}
}

impl LastWitnessUpdater {
pub async fn new(
pool: PgPool,
refresh_interval: Duration,
) -> anyhow::Result<(MessageReceiver, Self)> {
let last_witness_map = refresh_last_witnesses(&pool).await?;
let (sender, receiver) = watch::channel(last_witness_map);
Ok((
receiver,
Self {
pool,
refresh_interval,
sender,
},
))
}

pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> {
tracing::info!("starting last_witness_updater");
let mut trigger_timer = time::interval(
self.refresh_interval
.to_std()
.expect("valid interval in seconds for last witness cache updater"),
);
loop {
tokio::select! {
biased;
_ = shutdown.clone() => break,
_ = trigger_timer.tick() => self.handle_refresh_tick().await?,
}
}
tracing::info!("stopping last_witness_updater");
Ok(())
}

async fn handle_refresh_tick(&mut self) -> anyhow::Result<()> {
tracing::info!("handling refresh tick");
let updated_map = refresh_last_witnesses(&self.pool).await?;
let count = updated_map.len();
if count > 0 {
tracing::info!("completed refreshing last witnesses, total count: {count}");
self.sender.send(updated_map)?;
} else {
tracing::warn!("failed to refresh last witnesses, empty map...");
}
Ok(())
}
}

pub async fn refresh_last_witnesses(pool: &PgPool) -> anyhow::Result<LastWitnessMap> {
tracing::info!("refreshing last witness cache");
let mut map = LastWitnessMap::new();
let mut stream = LastWitness::get_all(pool).await;
while let Some(last_witness_info) = stream.next().await.transpose()? {
map.insert(last_witness_info.id.into(), last_witness_info.timestamp);
}
Ok(map)
}
2 changes: 2 additions & 0 deletions iot_verifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ pub mod gateway_updater;
pub mod hex_density;
pub mod last_beacon;
pub mod last_witness;
pub mod last_witness_cache;
pub mod last_witness_cache_updater;
pub mod loader;
pub mod meta;
pub mod packet_loader;
Expand Down
9 changes: 8 additions & 1 deletion iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use file_store::{
};
use iot_config::client::Client as IotConfigClient;
use iot_verifier::{
entropy_loader, gateway_cache::GatewayCache, gateway_updater::GatewayUpdater, loader,
entropy_loader, gateway_cache::GatewayCache, gateway_updater::GatewayUpdater,
last_witness_cache::LastWitnessCache, last_witness_cache_updater::LastWitnessUpdater, loader,
packet_loader, purger, rewarder::Rewarder, runner, telemetry,
tx_scaler::Server as DensityScaler, Settings,
};
Expand Down Expand Up @@ -87,6 +88,10 @@ impl Server {
.await?;
let gateway_cache = GatewayCache::new(gateway_updater_receiver.clone());

let (last_witness_updater_receiver, last_witness_updater_server) =
LastWitnessUpdater::new(pool.clone(), settings.gateway_refresh_interval()).await?;
let last_witness_cache = LastWitnessCache::new(last_witness_updater_receiver.clone());

// *
// setup the price tracker requirements
// *
Expand Down Expand Up @@ -282,6 +287,7 @@ impl Server {
iot_config_client.clone(),
pool.clone(),
gateway_cache.clone(),
last_witness_cache.clone(),
runner_invalid_beacon_sink,
runner_invalid_witness_sink,
runner_poc_sink,
Expand All @@ -302,6 +308,7 @@ impl Server {
.add_task(price_daemon)
.add_task(density_scaler)
.add_task(gateway_updater_server)
.add_task(last_witness_updater_server)
.add_task(purger)
.add_task(runner)
.add_task(entropy_loader)
Expand Down
20 changes: 14 additions & 6 deletions iot_verifier/src/poc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
entropy::ENTROPY_LIFESPAN, gateway_cache::GatewayCache, gateway_cache::GatewayCacheError,
hex_density::HexDensityMap, last_beacon::LastBeacon, last_witness::LastWitness,
region_cache::RegionCache,
last_witness_cache::LastWitnessCache, region_cache::RegionCache,
};
use beacon;
use chrono::{DateTime, Duration, DurationRound, Utc};
Expand Down Expand Up @@ -196,6 +196,7 @@ impl Poc {
beacon_info: &GatewayInfo,
hex_density_map: &HexDensityMap,
gateway_cache: &GatewayCache,
last_witness_cache: &LastWitnessCache,
deny_list: &DenyList,
) -> anyhow::Result<VerifyWitnessesResult> {
let mut verified_witnesses: Vec<IotVerifiedWitnessReport> = Vec::new();
Expand Down Expand Up @@ -236,7 +237,10 @@ impl Poc {
// if this check fails we will invalidate the witness
// even tho it has passed all regular validations
if !self
.verify_witness_reciprocity(&witness_report.report.pub_key)
.verify_witness_reciprocity(
last_witness_cache,
&witness_report.report.pub_key,
)
.await?
{
verified_witness.status = VerificationStatus::Invalid;
Expand Down Expand Up @@ -385,10 +389,14 @@ impl Poc {
Ok(false)
}

async fn verify_witness_reciprocity(&self, pubkey: &PublicKeyBinary) -> anyhow::Result<bool> {
let last_beacon = LastBeacon::get(&self.pool, pubkey.as_ref()).await?;
if let Some(last_beacon) = last_beacon {
if self.beacon_report.received_timestamp - last_beacon.timestamp < *RECIPROCITY_WINDOW {
async fn verify_witness_reciprocity(
&self,
last_witness_cache: &LastWitnessCache,
pubkey: &PublicKeyBinary,
) -> anyhow::Result<bool> {
let last_beacon_ts = last_witness_cache.resolve_last_witness_info(pubkey).await?;
if let Some(last_beacon_ts) = last_beacon_ts {
if self.beacon_report.received_timestamp - last_beacon_ts < *RECIPROCITY_WINDOW {
return Ok(true);
}
};
Expand Down
5 changes: 5 additions & 0 deletions iot_verifier/src/runner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
gateway_cache::GatewayCache,
hex_density::HexDensityMap,
last_witness_cache::LastWitnessCache,
poc::{Poc, VerifyBeaconResult},
poc_report::Report,
region_cache::RegionCache,
Expand Down Expand Up @@ -50,6 +51,7 @@ pub struct Runner<G> {
pub deny_list: DenyList,
pub gateway_cache: GatewayCache,
pub region_cache: RegionCache<G>,
pub last_witness_cache: LastWitnessCache,
pub invalid_beacon_sink: FileSinkClient,
pub invalid_witness_sink: FileSinkClient,
pub poc_sink: FileSinkClient,
Expand Down Expand Up @@ -94,6 +96,7 @@ where
gateways: G,
pool: PgPool,
gateway_cache: GatewayCache,
last_witness_cache: LastWitnessCache,
invalid_beacon_sink: FileSinkClient,
invalid_witness_sink: FileSinkClient,
poc_sink: FileSinkClient,
Expand Down Expand Up @@ -125,6 +128,7 @@ where
max_witnesses_per_poc,
gateway_cache,
region_cache,
last_witness_cache,
beacon_max_retries,
witness_max_retries,
deny_list_latest_url,
Expand Down Expand Up @@ -278,6 +282,7 @@ where
&beacon_info,
&self.hex_density_map,
&self.gateway_cache,
&self.last_witness_cache,
&self.deny_list,
)
.await?;
Expand Down
11 changes: 9 additions & 2 deletions iot_verifier/tests/runner_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ use iot_config::{
gateway_info::{GatewayInfo, GatewayInfoStream},
};
use iot_verifier::{
gateway_cache::GatewayCache, gateway_updater::GatewayUpdater, poc_report::Report,
region_cache::RegionCache, runner::Runner, tx_scaler::Server as DensityScaler,
gateway_cache::GatewayCache, gateway_updater::GatewayUpdater,
last_witness_cache::LastWitnessCache, last_witness_cache_updater::LastWitnessUpdater,
poc_report::Report, region_cache::RegionCache, runner::Runner,
tx_scaler::Server as DensityScaler,
};
use lazy_static::lazy_static;
use sqlx::PgPool;
Expand Down Expand Up @@ -86,6 +88,9 @@ impl TestContext {
let (gateway_updater_receiver, _gateway_updater_server) =
GatewayUpdater::new(refresh_interval, iot_config_client.clone()).await?;
let gateway_cache = GatewayCache::new(gateway_updater_receiver.clone());
let (last_witness_updater_receiver, _last_witness_updater_server) =
LastWitnessUpdater::new(pool.clone(), refresh_interval).await?;
let last_witness_cache = LastWitnessCache::new(last_witness_updater_receiver.clone());
let density_scaler =
DensityScaler::new(refresh_interval, pool.clone(), gateway_updater_receiver).await?;
let region_cache = RegionCache::new(Duration::from_secs(60), iot_config_client.clone())?;
Expand All @@ -103,6 +108,7 @@ impl TestContext {
deny_list,
gateway_cache: gateway_cache.clone(),
region_cache,
last_witness_cache: last_witness_cache.clone(),
invalid_beacon_sink: invalid_beacon_client,
invalid_witness_sink: invalid_witness_client,
poc_sink: valid_poc_client,
Expand Down Expand Up @@ -921,6 +927,7 @@ async fn valid_new_gateway_beacon_first_reciprocity(pool: PgPool) -> anyhow::Res
now - (test_beacon_interval + ChronoDuration::seconds(10)),
)
.await?;

common::inject_last_witness(
pool.clone(),
beacon_to_inject.report.pub_key.clone(),
Expand Down

0 comments on commit f31a120

Please sign in to comment.