Skip to content

Commit

Permalink
Merge pull request #709 from helium/andymck/reciprocity-check
Browse files Browse the repository at this point in the history
reciprocity checks
  • Loading branch information
andymck authored Feb 12, 2024
2 parents d079cb1 + cd32bc6 commit 934065c
Show file tree
Hide file tree
Showing 11 changed files with 903 additions and 100 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions iot_verifier/migrations/14_last_witness.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
create table last_witness (
id bytea primary key not null,
timestamp timestamptz not null
);
-- seed last_witness with timestamps from last_beacon
insert into last_witness (id, timestamp)
select id, timestamp from last_beacon
where timestamp > now() - interval '7 day';
44 changes: 14 additions & 30 deletions iot_verifier/src/last_beacon.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use chrono::{DateTime, Utc};
use file_store::traits::TimestampDecode;
use serde::{Deserialize, Serialize};

#[derive(sqlx::FromRow, Deserialize, Serialize, Debug)]
Expand All @@ -9,20 +8,8 @@ pub struct LastBeacon {
pub timestamp: DateTime<Utc>,
}

#[derive(thiserror::Error, Debug)]
pub enum LastBeaconError {
#[error("database error: {0}")]
DatabaseError(#[from] sqlx::Error),
#[error("file store error: {0}")]
FileStoreError(#[from] file_store::Error),
}

impl LastBeacon {
pub async fn insert_kv<'c, E>(
executor: E,
id: &[u8],
val: &str,
) -> Result<Self, LastBeaconError>
pub async fn insert_kv<'c, E>(executor: E, id: &[u8], val: &str) -> anyhow::Result<Self>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
Expand All @@ -39,7 +26,7 @@ impl LastBeacon {
.await?)
}

pub async fn get<'c, E>(executor: E, id: &[u8]) -> Result<Option<Self>, LastBeaconError>
pub async fn get<'c, E>(executor: E, id: &[u8]) -> anyhow::Result<Option<Self>>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
Expand All @@ -52,47 +39,44 @@ impl LastBeacon {
}

pub async fn get_all_since<'c, E>(
deadline: DateTime<Utc>,
executor: E,
) -> Result<Vec<Self>, sqlx::Error>
timestamp: DateTime<Utc>,
) -> anyhow::Result<Vec<Self>>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres> + 'c,
{
sqlx::query_as::<_, Self>(r#" select * from last_beacon where timestamp >= $1; "#)
.bind(deadline)
.fetch_all(executor)
.await
Ok(
sqlx::query_as::<_, Self>(r#" select * from last_beacon where timestamp >= $1; "#)
.bind(timestamp)
.fetch_all(executor)
.await?,
)
}

pub async fn last_timestamp<'c, E>(
executor: E,
id: &[u8],
) -> Result<Option<DateTime<Utc>>, LastBeaconError>
) -> anyhow::Result<Option<DateTime<Utc>>>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let height = sqlx::query_scalar::<_, String>(
let height = sqlx::query_scalar(
r#"
select timestamp from last_beacon
where id = $1
"#,
)
.bind(id)
.fetch_optional(executor)
.await?
.and_then(|v| {
v.parse::<u64>()
.map_or_else(|_| None, |secs| Some(secs.to_timestamp()))
})
.transpose()?;
.await?;
Ok(height)
}

pub async fn update_last_timestamp<'c, E>(
executor: E,
id: &[u8],
timestamp: DateTime<Utc>,
) -> Result<(), LastBeaconError>
) -> anyhow::Result<()>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
Expand Down
104 changes: 104 additions & 0 deletions iot_verifier/src/last_witness.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use chrono::{DateTime, Utc};

use helium_crypto::PublicKeyBinary;
use serde::{Deserialize, Serialize};

#[derive(sqlx::FromRow, Deserialize, Serialize, Debug)]
#[sqlx(type_name = "last_witness")]
pub struct LastWitness {
pub id: Vec<u8>,
pub timestamp: DateTime<Utc>,
}

impl LastWitness {
pub async fn insert_kv<'c, E>(executor: E, id: &[u8], val: &str) -> anyhow::Result<Self>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
Ok(sqlx::query_as::<_, Self>(
r#" insert into last_witness ( id, timestamp )
values ($1, $2)
on conflict (key) do nothing
returning *;
"#,
)
.bind(id)
.bind(val)
.fetch_one(executor)
.await?)
}

pub async fn get<'c, E>(executor: E, id: &[u8]) -> anyhow::Result<Option<Self>>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
Ok(
sqlx::query_as::<_, LastWitness>(r#" select * from last_witness where id = $1;"#)
.bind(id)
.fetch_optional(executor)
.await?,
)
}

pub async fn last_timestamp<'c, E>(
executor: E,
id: &[u8],
) -> anyhow::Result<Option<DateTime<Utc>>>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let height = sqlx::query_scalar(
r#"
select timestamp from last_witness
where id = $1
"#,
)
.bind(id)
.fetch_optional(executor)
.await?;
Ok(height)
}

pub async fn update_last_timestamp<'c, E>(
executor: E,
id: &[u8],
timestamp: DateTime<Utc>,
) -> anyhow::Result<()>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let _ = sqlx::query(
r#"
insert into last_witness (id, timestamp)
values ($1, $2)
on conflict (id) do update set
timestamp = EXCLUDED.timestamp
"#,
)
.bind(id)
.bind(timestamp)
.execute(executor)
.await?;
Ok(())
}

pub async fn bulk_update_last_timestamps(
db: impl sqlx::PgExecutor<'_> + sqlx::Acquire<'_, Database = sqlx::Postgres> + Copy,
ids: Vec<(PublicKeyBinary, DateTime<Utc>)>,
) -> anyhow::Result<()> {
const NUMBER_OF_FIELDS_IN_QUERY: u16 = 2;
const MAX_BATCH_ENTRIES: usize = (u16::MAX / NUMBER_OF_FIELDS_IN_QUERY) as usize;
let mut txn = db.begin().await?;
for updates in ids.chunks(MAX_BATCH_ENTRIES) {
let mut query_builder: sqlx::QueryBuilder<sqlx::Postgres> =
sqlx::QueryBuilder::new(" insert into last_witness (id, timestamp) ");
query_builder.push_values(updates, |mut builder, (id, ts)| {
builder.push_bind(id.as_ref()).push_bind(ts);
});
query_builder.push(" on conflict (id) do update set timestamp = EXCLUDED.timestamp ");
query_builder.build().execute(&mut *txn).await?;
}
txn.commit().await?;
Ok(())
}
}
1 change: 1 addition & 0 deletions iot_verifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod gateway_cache;
pub mod gateway_updater;
pub mod hex_density;
pub mod last_beacon;
pub mod last_witness;
pub mod loader;
pub mod meta;
pub mod packet_loader;
Expand Down
Loading

0 comments on commit 934065c

Please sign in to comment.