Skip to content

Commit

Permalink
mobile verifier support for invalidated threshold reports (#754)
Browse files Browse the repository at this point in the history
* ingestor support for hotspot thresholds
add cbsd_id support
refactor for proto msg renames

* bump proto

* bump proto

* mobile verifier support for hotspot thresholds

recut branch & add support for cbsd_id in hotspot thresholds

handle grandfathering

* bump proto

* tweaks

* review tweaks/fixes

* Update mobile_verifier/src/reward_shares.rs

Co-authored-by: Matthew Plant <[email protected]>

* fix it

* ingestor and filestore support for invalidated threshold reports

* handle invalidated threshold reports

* tweaks

* rebase artifacts

* tweaks

---------

Co-authored-by: Matthew Plant <[email protected]>
Co-authored-by: Matthew Plant <[email protected]>
  • Loading branch information
3 people authored Mar 7, 2024
1 parent 3694c31 commit 6d46823
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 5 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions mobile_verifier/migrations/27_subscriber_radio_threshold.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ CREATE TABLE IF NOT EXISTS radio_threshold (
bytes_threshold BIGINT NOT NULL,
subscriber_threshold INT NOT NULL,
threshold_timestamp TIMESTAMPTZ NOT NULL,
threshold_met BOOLEAN DEFAULT FALSE,
recv_timestamp TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE UNIQUE INDEX IF NOT EXISTS radio_threshold_hotspot_pubkey_cbsd_id_idx ON radio_threshold (hotspot_pubkey, cbsd_id);
CREATE UNIQUE INDEX IF NOT EXISTS radio_threshold_hotspot_pubkey_cbsd_id_idx ON radio_threshold (hotspot_pubkey, cbsd_id) NULLS NOT DISTINCT;

-- temp table for grandfathered radio thresholds
CREATE TABLE IF NOT EXISTS grandfathered_radio_threshold (
Expand All @@ -20,4 +21,4 @@ CREATE TABLE IF NOT EXISTS grandfathered_radio_threshold (
created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE UNIQUE INDEX IF NOT EXISTS grandfathered_radio_threshold_hotspot_pubkey_cbsd_id_idx ON grandfathered_radio_threshold (hotspot_pubkey, cbsd_id);
CREATE UNIQUE INDEX IF NOT EXISTS grandfathered_radio_threshold_hotspot_pubkey_cbsd_id_idx ON grandfathered_radio_threshold (hotspot_pubkey, cbsd_id) NULLS NOT DISTINCT;
36 changes: 36 additions & 0 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
boosting_oracles::Urbanization, coverage::CoverageDaemon, data_session::DataSessionIngestor,
geofence::Geofence, heartbeats::cbrs::HeartbeatDaemon as CellHeartbeatDaemon,
heartbeats::wifi::HeartbeatDaemon as WifiHeartbeatDaemon,
invalidated_radio_threshold::InvalidatedRadioThresholdIngestor,
radio_threshold::RadioThresholdIngestor, rewarder::Rewarder, speedtests::SpeedtestDaemon,
subscriber_location::SubscriberLocationIngestor, telemetry, Settings,
};
Expand All @@ -10,6 +11,7 @@ use chrono::Duration;
use file_store::{
coverage::CoverageObjectIngestReport, file_info_poller::LookbackBehavior, file_sink,
file_source, file_upload, heartbeat::CbrsHeartbeatIngestReport,
mobile_radio_invalidated_threshold::InvalidatedRadioThresholdIngestReport,
mobile_radio_threshold::RadioThresholdIngestReport,
mobile_subscriber::SubscriberLocationIngestReport, mobile_transfer::ValidDataTransferSession,
speedtest::CellSpeedtestIngestReport, wifi_heartbeat::WifiHeartbeatIngestReport, FileStore,
Expand Down Expand Up @@ -315,6 +317,37 @@ impl Cmd {
auth_client.clone(),
);

// invalidated radio threshold reports
let (invalidated_radio_threshold_ingest, invalidated_radio_threshold_ingest_server) =
file_source::continuous_source::<InvalidatedRadioThresholdIngestReport, _>()
.state(pool.clone())
.store(report_ingest.clone())
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::InvalidatedRadioThresholdIngestReport.to_string())
.create()
.await?;

let (verified_invalidated_radio_threshold, verified_invalidated_radio_threshold_server) =
file_sink::FileSinkBuilder::new(
FileType::VerifiedInvalidatedRadioThresholdIngestReport,
store_base_path,
concat!(
env!("CARGO_PKG_NAME"),
"_verified_invalidated_radio_threshold"
),
)
.file_upload(Some(file_upload.clone()))
.auto_commit(false)
.create()
.await?;

let invalidated_radio_threshold_ingestor = InvalidatedRadioThresholdIngestor::new(
pool.clone(),
invalidated_radio_threshold_ingest,
verified_invalidated_radio_threshold,
auth_client.clone(),
);

// data transfers
let (data_session_ingest, data_session_ingest_server) =
file_source::continuous_source::<ValidDataTransferSession, _>()
Expand Down Expand Up @@ -342,6 +375,8 @@ impl Cmd {
.add_task(subscriber_location_ingestor)
.add_task(radio_threshold_ingestor)
.add_task(verified_radio_threshold_server)
.add_task(invalidated_radio_threshold_ingestor)
.add_task(verified_invalidated_radio_threshold_server)
.add_task(data_session_ingest_server)
.add_task(price_daemon)
.add_task(cbrs_heartbeat_daemon)
Expand All @@ -354,6 +389,7 @@ impl Cmd {
.add_task(rewarder)
.add_task(subscriber_location_ingest_server)
.add_task(radio_threshold_ingest_server)
.add_task(invalidated_radio_threshold_ingest_server)
.add_task(data_session_ingestor)
.start()
.await
Expand Down
160 changes: 160 additions & 0 deletions mobile_verifier/src/invalidated_radio_threshold.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
use chrono::Utc;
use file_store::{
file_info_poller::FileInfoStream,
file_sink::FileSinkClient,
mobile_radio_invalidated_threshold::{
InvalidatedRadioThresholdIngestReport, InvalidatedRadioThresholdReportReq,
VerifiedInvalidatedRadioThresholdIngestReport,
},
};
use futures::{StreamExt, TryStreamExt};
use futures_util::TryFutureExt;
use helium_crypto::PublicKeyBinary;
use helium_proto::services::{
mobile_config::NetworkKeyRole,
poc_mobile::{
InvalidatedRadioThresholdReportVerificationStatus,
VerifiedInvalidatedRadioThresholdIngestReportV1,
},
};
use mobile_config::client::authorization_client::AuthorizationVerifier;
use sqlx::{PgPool, Postgres, Transaction};
use task_manager::ManagedTask;
use tokio::sync::mpsc::Receiver;

pub struct InvalidatedRadioThresholdIngestor<AV> {
pool: PgPool,
reports_receiver: Receiver<FileInfoStream<InvalidatedRadioThresholdIngestReport>>,
verified_report_sink: FileSinkClient,
authorization_verifier: AV,
}

impl<AV> ManagedTask for InvalidatedRadioThresholdIngestor<AV>
where
AV: AuthorizationVerifier + Send + Sync + 'static,
{
fn start_task(
self: Box<Self>,
shutdown: triggered::Listener,
) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<()>> {
let handle = tokio::spawn(self.run(shutdown));
Box::pin(
handle
.map_err(anyhow::Error::from)
.and_then(|result| async move { result.map_err(anyhow::Error::from) }),
)
}
}

impl<AV> InvalidatedRadioThresholdIngestor<AV>
where
AV: AuthorizationVerifier,
{
pub fn new(
pool: sqlx::Pool<Postgres>,
reports_receiver: Receiver<FileInfoStream<InvalidatedRadioThresholdIngestReport>>,
verified_report_sink: FileSinkClient,
authorization_verifier: AV,
) -> Self {
Self {
pool,
reports_receiver,
verified_report_sink,
authorization_verifier,
}
}

async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> {
tracing::info!("starting invalidated radio threshold ingestor");
loop {
tokio::select! {
biased;
_ = shutdown.clone() => break,
Some(file) = self.reports_receiver.recv() => {
self.process_file(file).await?;
}
}
}
tracing::info!("stopping invalidated radio threshold ingestor");
Ok(())
}

async fn process_file(
&self,
file_info_stream: FileInfoStream<InvalidatedRadioThresholdIngestReport>,
) -> anyhow::Result<()> {
let mut transaction = self.pool.begin().await?;
file_info_stream
.into_stream(&mut transaction)
.await?
.map(anyhow::Ok)
.try_fold(transaction, |mut transaction, ingest_report| async move {
// verify the report
let verified_report_status = self.verify_report(&ingest_report.report).await;

// if the report is valid then delete the thresholds from the DB
if verified_report_status == InvalidatedRadioThresholdReportVerificationStatus::InvalidatedThresholdReportStatusValid {
delete(&ingest_report, &mut transaction).await?;
}

// write out paper trail of verified report, valid or invalid
let verified_report_proto: VerifiedInvalidatedRadioThresholdIngestReportV1 =
VerifiedInvalidatedRadioThresholdIngestReport {
report: ingest_report,
status: verified_report_status,
timestamp: Utc::now(),
}
.into();
self.verified_report_sink
.write(
verified_report_proto,
&[("report_status", verified_report_status.as_str_name())],
)
.await?;
Ok(transaction)
})
.await?
.commit()
.await?;
self.verified_report_sink.commit().await?;
Ok(())
}

async fn verify_report(
&self,
report: &InvalidatedRadioThresholdReportReq,
) -> InvalidatedRadioThresholdReportVerificationStatus {
if !self.verify_known_carrier_key(&report.carrier_pub_key).await {
return InvalidatedRadioThresholdReportVerificationStatus::InvalidatedThresholdReportStatusInvalidCarrierKey;
};
InvalidatedRadioThresholdReportVerificationStatus::InvalidatedThresholdReportStatusValid
}

async fn verify_known_carrier_key(&self, public_key: &PublicKeyBinary) -> bool {
match self
.authorization_verifier
.verify_authorized_key(public_key, NetworkKeyRole::MobileCarrier)
.await
{
Ok(res) => res,
Err(_err) => false,
}
}
}

pub async fn delete(
ingest_report: &InvalidatedRadioThresholdIngestReport,
db: &mut Transaction<'_, Postgres>,
) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
DELETE FROM radio_threshold
WHERE hotspot_pubkey = $1 AND (cbsd_id is null or cbsd_id = $2)
"#,
)
.bind(ingest_report.report.hotspot_pubkey.to_string())
.bind(ingest_report.report.cbsd_id.clone())
.execute(&mut *db)
.await?;
Ok(())
}
1 change: 1 addition & 0 deletions mobile_verifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod coverage;
pub mod data_session;
pub mod geofence;
pub mod heartbeats;
pub mod invalidated_radio_threshold;
pub mod radio_threshold;
pub mod reward_shares;
pub mod rewarder;
Expand Down
3 changes: 2 additions & 1 deletion mobile_verifier/src/radio_threshold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,9 @@ pub async fn save(
bytes_threshold,
subscriber_threshold,
threshold_timestamp,
threshold_met,
recv_timestamp)
VALUES ($1, $2, $3, $4, $5, $6)
VALUES ($1, $2, $3, $4, $5, true, $6)
ON CONFLICT (hotspot_pubkey, cbsd_id)
DO UPDATE SET
bytes_threshold = EXCLUDED.bytes_threshold,
Expand Down

0 comments on commit 6d46823

Please sign in to comment.