Skip to content

Commit

Permalink
Merge pull request #769 from helium/andymck/runner-tidy-up
Browse files Browse the repository at this point in the history
runner refactor/tidy
  • Loading branch information
andymck authored May 22, 2024
2 parents 172f929 + 3ab7dd6 commit 6f56703
Showing 1 changed file with 102 additions and 67 deletions.
169 changes: 102 additions & 67 deletions iot_verifier/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
witness_updater::WitnessUpdater,
Settings,
};
use chrono::{Duration as ChronoDuration, Utc};
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use denylist::DenyList;
use file_store::{
file_sink::FileSinkClient,
Expand Down Expand Up @@ -283,6 +283,7 @@ where
&self.deny_list,
)
.await?;

match beacon_verify_result {
VerifyBeaconResult {
result: VerificationStatus::Valid,
Expand All @@ -309,8 +310,9 @@ where
if !verified_witnesses_result.failed_witnesses.is_empty() {
tracing::warn!("failed to handle witness");
for failed_witness_report in verified_witnesses_result.failed_witnesses {
let failed_witness = failed_witness_report.report;
let id = failed_witness.report_id(failed_witness_report.received_timestamp);
let id = failed_witness_report
.report
.report_id(failed_witness_report.received_timestamp);
Report::update_attempts(&self.pool, &id, Utc::now()).await?;
}
return Ok(());
Expand All @@ -327,15 +329,15 @@ where
.await;
}

let verified_witnesses = self
let final_verified_witnesses = self
.verify_witnesses_reciprocity(verified_witnesses_result.verified_witnesses)
.await?;

self.handle_valid_poc(
poc,
beacon_info,
beacon_verify_result.hex_scale,
verified_witnesses,
final_verified_witnesses,
)
.await
}
Expand All @@ -359,10 +361,6 @@ where
beacon_hex_scale: Option<Decimal>,
verified_witnesses: Vec<IotVerifiedWitnessReport>,
) -> anyhow::Result<()> {
let beacon_received_ts = poc.beacon_report.received_timestamp;
let packet_data = poc.beacon_report.report.data.clone();
let beacon_report_id = poc.beacon_report.report.report_id(beacon_received_ts);

let max_witnesses_per_poc = self.max_witnesses_per_poc as usize;

// filter witnesses into selected and unselected lists
Expand All @@ -374,55 +372,32 @@ where
// none of which will be rewarded
// we exclude self witnesses from the unselected lists
// these are dropped to the floor, never make it to s3
let (mut selected_witnesses, invalid_witnesses) = filter_witnesses(verified_witnesses);

// keep a subset of our selected and valid witnesses
let mut unselected_witnesses =
sort_and_split_witnesses(&mut selected_witnesses, max_witnesses_per_poc)?;

// concat the unselected valid witnesses and the invalid witnesses
// these will then form the unselected list on the poc
unselected_witnesses = [&unselected_witnesses[..], &invalid_witnesses[..]].concat();
let (mut selected_witnesses, unselected_witnesses) =
filter_and_split_witnesses(verified_witnesses, max_witnesses_per_poc)?;

// get the number of valid witnesses in our selected list
let num_valid_selected_witnesses = selected_witnesses.len();

// get reward units based on the count of valid selected witnesses
let beaconer_reward_units = poc_beaconer_reward_unit(num_valid_selected_witnesses as u32)?;
let witness_reward_units =
poc_per_witness_reward_unit(num_valid_selected_witnesses as u32)?;
// update the reward units for those valid witnesses within our selected list
selected_witnesses
.iter_mut()
.for_each(|witness| match witness.status {
VerificationStatus::Valid => witness.reward_unit = witness_reward_units,
VerificationStatus::Invalid => witness.reward_unit = Decimal::ZERO,
});

// metadata at this point will always be Some...
let (location, gain, elevation) = match beacon_info.metadata {
Some(metadata) => (Some(metadata.location), metadata.gain, metadata.elevation),
None => (None, 0, 0),
};
update_witness_reward_units(&mut selected_witnesses, num_valid_selected_witnesses)?;

let valid_beacon_report = IotValidBeaconReport {
received_timestamp: beacon_received_ts,
location,
gain,
elevation,
hex_scale: beacon_hex_scale
.ok_or(RunnerError::NotFound("invalid hex scaling factor"))?,
report: poc.beacon_report.report.clone(),
reward_unit: beaconer_reward_units,
};
let beacon_received_ts = poc.beacon_report.received_timestamp;
let beacon_report_id = poc.beacon_report.report.report_id(beacon_received_ts);
let packet_data = poc.beacon_report.report.data.clone();

let iot_poc: IotPoc = IotPoc {
poc_id: beacon_report_id.clone(),
beacon_report: valid_beacon_report,
selected_witnesses: selected_witnesses.clone(),
unselected_witnesses: unselected_witnesses.clone(),
};
// collect all the invalid reasons, we will use these later for metrics
let invalid_reasons = collect_invalid_witness_reasons(&unselected_witnesses);

let iot_poc = create_iot_poc(
poc,
beacon_hex_scale,
selected_witnesses,
unselected_witnesses,
beacon_report_id.clone(),
beacon_received_ts,
beacon_info,
)?;

// save the gateway shares for each poc to db
let mut transaction = self.pool.begin().await?;
for reward_share in GatewayPocShare::shares_from_poc(&iot_poc) {
reward_share.save(&mut transaction).await?;
Expand All @@ -440,11 +415,11 @@ where
return Ok(());
}
}
// write out metrics for any witness which failed verification
fire_invalid_witness_metric(&selected_witnesses);
fire_invalid_witness_metric(&unselected_witnesses);

Report::delete_poc(&self.pool, &packet_data).await?;

// write out metrics for any witness which failed verification
fire_invalid_witness_metric(invalid_reasons);
telemetry::decrement_num_beacons();
Ok(())
}
Expand Down Expand Up @@ -575,6 +550,54 @@ where
}
}

fn create_iot_poc(
poc: Poc,
beacon_hex_scale: Option<Decimal>,
selected_witnesses: Vec<IotVerifiedWitnessReport>,
unselected_witnesses: Vec<IotVerifiedWitnessReport>,
beacon_report_id: Vec<u8>,
beacon_received_ts: DateTime<Utc>,
beacon_info: GatewayInfo,
) -> anyhow::Result<IotPoc> {
let (location, gain, elevation) = match beacon_info.metadata {
Some(metadata) => (Some(metadata.location), metadata.gain, metadata.elevation),
None => (None, 0, 0),
};
let num_valid_selected_witnesses = selected_witnesses.len();
let beaconer_reward_units = poc_beaconer_reward_unit(num_valid_selected_witnesses as u32)?;
let valid_beacon_report = IotValidBeaconReport {
received_timestamp: beacon_received_ts,
location,
gain,
elevation,
hex_scale: beacon_hex_scale.ok_or(RunnerError::NotFound("invalid hex scaling factor"))?,
report: poc.beacon_report.report,
reward_unit: beaconer_reward_units,
};
Ok(IotPoc {
poc_id: beacon_report_id,
beacon_report: valid_beacon_report,
selected_witnesses,
unselected_witnesses,
})
}

fn update_witness_reward_units(
selected_witnesses: &mut [IotVerifiedWitnessReport],
num_valid_selected_witnesses: usize,
) -> anyhow::Result<()> {
// get reward units based on the count of valid selected witnesses
let witness_reward_units = poc_per_witness_reward_unit(num_valid_selected_witnesses as u32)?;
// update the reward units for those valid witnesses within our selected list
selected_witnesses
.iter_mut()
.for_each(|witness| match witness.status {
VerificationStatus::Valid => witness.reward_unit = witness_reward_units,
VerificationStatus::Invalid => witness.reward_unit = Decimal::ZERO,
});
Ok(())
}

fn poc_beaconer_reward_unit(num_witnesses: u32) -> anyhow::Result<Decimal> {
let reward_units = if num_witnesses == 0 {
Decimal::ZERO
Expand Down Expand Up @@ -633,10 +656,11 @@ fn sort_and_split_witnesses(
Ok(unselected_witnesses)
}

fn filter_witnesses(
fn filter_and_split_witnesses(
witnesses: Vec<IotVerifiedWitnessReport>,
) -> (Vec<IotVerifiedWitnessReport>, Vec<IotVerifiedWitnessReport>) {
let (valid_witnesses, invalid_witnesses) = witnesses
max_witnesses_per_poc: usize,
) -> anyhow::Result<(Vec<IotVerifiedWitnessReport>, Vec<IotVerifiedWitnessReport>)> {
let (mut selected_witnesses, invalid_witnesses) = witnesses
.into_iter()
.filter(|witness| {
matches!(
Expand All @@ -645,7 +669,15 @@ fn filter_witnesses(
)
})
.partition(|witness| witness.status == VerificationStatus::Valid);
(valid_witnesses, invalid_witnesses)

// keep a subset of our selected and valid witnesses
let mut unselected_witnesses =
sort_and_split_witnesses(&mut selected_witnesses, max_witnesses_per_poc)?;

// concat the unselected valid witnesses and the invalid witnesses
// these will then form the unselected list on the poc
unselected_witnesses.extend(invalid_witnesses);
Ok((selected_witnesses, unselected_witnesses))
}

fn filter_witness(invalid_reason: InvalidReason) -> FilterStatus {
Expand All @@ -654,17 +686,18 @@ fn filter_witness(invalid_reason: InvalidReason) -> FilterStatus {
_ => FilterStatus::Include,
}
}

fn fire_invalid_witness_metric(witnesses: &[IotVerifiedWitnessReport]) {
fn collect_invalid_witness_reasons(witnesses: &[IotVerifiedWitnessReport]) -> Vec<InvalidReason> {
witnesses
.iter()
.filter(|witness| !matches!(witness.invalid_reason, InvalidReason::ReasonNone))
.for_each(|witness| {
telemetry::increment_invalid_witnesses(&[(
"reason",
witness.invalid_reason.clone().as_str_name(),
)])
});
.map(|witness| witness.invalid_reason)
.collect()
}

fn fire_invalid_witness_metric(invalid_reasons: Vec<InvalidReason>) {
invalid_reasons.iter().for_each(|reason| {
telemetry::increment_invalid_witnesses(&[("reason", reason.as_str_name())])
})
}

#[cfg(test)]
Expand All @@ -677,6 +710,7 @@ mod tests {
use helium_proto::DataRate;
use rust_decimal::Decimal;
use std::str::FromStr;
const NUM_WITNESSES_PER_POC: usize = 14;

#[test]
fn witness_filtering() {
Expand Down Expand Up @@ -752,7 +786,8 @@ mod tests {
};

let witnesses = vec![witness1, witness2, witness3, witness4];
let (included_witnesses, excluded_witnesses) = filter_witnesses(witnesses);
let (included_witnesses, excluded_witnesses) =
filter_and_split_witnesses(witnesses, NUM_WITNESSES_PER_POC).unwrap();
assert_eq!(2, excluded_witnesses.len());
assert_eq!(1, included_witnesses.len());
assert_eq!(
Expand Down

0 comments on commit 6f56703

Please sign in to comment.