Skip to content

Commit

Permalink
serialize last witness DB writes (#735)
Browse files Browse the repository at this point in the history
tidy up

mor tidy

remove branch dividian

misc tweaks

tokio tweaks

remove ded code

tidy

use witness report ts, not beacon ts for witness reciprocity

fix fmting

Refactor cache to RwLock and hide behind impl

Added metric around witness_updater queue size

Refacotr
  • Loading branch information
andymck authored Feb 27, 2024
1 parent 40101d2 commit f3cd719
Show file tree
Hide file tree
Showing 10 changed files with 276 additions and 75 deletions.
36 changes: 25 additions & 11 deletions iot_verifier/src/last_beacon.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
use chrono::{DateTime, Utc};
use helium_crypto::PublicKeyBinary;
use serde::{Deserialize, Serialize};
use sqlx::{postgres::PgRow, FromRow, Row};

#[derive(sqlx::FromRow, Deserialize, Serialize, Debug)]
#[sqlx(type_name = "last_beacon")]
#[derive(Deserialize, Serialize, Debug)]
pub struct LastBeacon {
pub id: Vec<u8>,
pub id: PublicKeyBinary,
pub timestamp: DateTime<Utc>,
}

impl FromRow<'_, PgRow> for LastBeacon {
fn from_row(row: &PgRow) -> sqlx::Result<Self> {
Ok(Self {
id: row.get::<Vec<u8>, &str>("id").into(),
timestamp: row.get::<DateTime<Utc>, &str>("timestamp"),
})
}
}

impl LastBeacon {
pub async fn insert_kv<'c, E>(executor: E, id: &[u8], val: &str) -> anyhow::Result<Self>
pub async fn insert_kv<'c, E>(
executor: E,
id: &PublicKeyBinary,
val: &str,
) -> anyhow::Result<Self>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
Expand All @@ -20,19 +34,19 @@ impl LastBeacon {
returning *;
"#,
)
.bind(id)
.bind(id.as_ref())
.bind(val)
.fetch_one(executor)
.await?)
}

pub async fn get<'c, E>(executor: E, id: &[u8]) -> anyhow::Result<Option<Self>>
pub async fn get<'c, E>(executor: E, id: &PublicKeyBinary) -> anyhow::Result<Option<Self>>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
Ok(
sqlx::query_as::<_, LastBeacon>(r#" select * from last_beacon where id = $1;"#)
.bind(id)
.bind(id.as_ref())
.fetch_optional(executor)
.await?,
)
Expand All @@ -55,7 +69,7 @@ impl LastBeacon {

pub async fn last_timestamp<'c, E>(
executor: E,
id: &[u8],
id: &PublicKeyBinary,
) -> anyhow::Result<Option<DateTime<Utc>>>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
Expand All @@ -66,15 +80,15 @@ impl LastBeacon {
where id = $1
"#,
)
.bind(id)
.bind(id.as_ref())
.fetch_optional(executor)
.await?;
Ok(height)
}

pub async fn update_last_timestamp<'c, E>(
executor: E,
id: &[u8],
id: &PublicKeyBinary,
timestamp: DateTime<Utc>,
) -> anyhow::Result<()>
where
Expand All @@ -88,7 +102,7 @@ impl LastBeacon {
timestamp = EXCLUDED.timestamp
"#,
)
.bind(id)
.bind(id.as_ref())
.bind(timestamp)
.execute(executor)
.await?;
Expand Down
44 changes: 29 additions & 15 deletions iot_verifier/src/last_witness.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
use chrono::{DateTime, Utc};

use helium_crypto::PublicKeyBinary;
use serde::{Deserialize, Serialize};
use sqlx::{postgres::PgRow, FromRow, Row};

#[derive(sqlx::FromRow, Deserialize, Serialize, Debug)]
#[sqlx(type_name = "last_witness")]
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct LastWitness {
pub id: Vec<u8>,
pub id: PublicKeyBinary,
pub timestamp: DateTime<Utc>,
}

impl FromRow<'_, PgRow> for LastWitness {
fn from_row(row: &PgRow) -> sqlx::Result<Self> {
Ok(Self {
id: row.get::<Vec<u8>, &str>("id").into(),
timestamp: row.get::<DateTime<Utc>, &str>("timestamp"),
})
}
}

impl LastWitness {
pub async fn insert_kv<'c, E>(executor: E, id: &[u8], val: &str) -> anyhow::Result<Self>
pub async fn insert_kv<'c, E>(
executor: E,
id: &PublicKeyBinary,
val: &str,
) -> anyhow::Result<Self>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
Expand All @@ -22,27 +34,27 @@ impl LastWitness {
returning *;
"#,
)
.bind(id)
.bind(id.as_ref())
.bind(val)
.fetch_one(executor)
.await?)
}

pub async fn get<'c, E>(executor: E, id: &[u8]) -> anyhow::Result<Option<Self>>
pub async fn get<'c, E>(executor: E, id: &PublicKeyBinary) -> anyhow::Result<Option<Self>>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
Ok(
sqlx::query_as::<_, LastWitness>(r#" select * from last_witness where id = $1;"#)
.bind(id)
.bind(id.as_ref())
.fetch_optional(executor)
.await?,
)
}

pub async fn last_timestamp<'c, E>(
executor: E,
id: &[u8],
id: &PublicKeyBinary,
) -> anyhow::Result<Option<DateTime<Utc>>>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
Expand All @@ -53,15 +65,15 @@ impl LastWitness {
where id = $1
"#,
)
.bind(id)
.bind(id.as_ref())
.fetch_optional(executor)
.await?;
Ok(height)
}

pub async fn update_last_timestamp<'c, E>(
executor: E,
id: &[u8],
id: &PublicKeyBinary,
timestamp: DateTime<Utc>,
) -> anyhow::Result<()>
where
Expand All @@ -75,7 +87,7 @@ impl LastWitness {
timestamp = EXCLUDED.timestamp
"#,
)
.bind(id)
.bind(id.as_ref())
.bind(timestamp)
.execute(executor)
.await?;
Expand All @@ -84,16 +96,18 @@ impl LastWitness {

pub async fn bulk_update_last_timestamps(
db: impl sqlx::PgExecutor<'_> + sqlx::Acquire<'_, Database = sqlx::Postgres> + Copy,
ids: Vec<(PublicKeyBinary, DateTime<Utc>)>,
ids: Vec<&LastWitness>,
) -> anyhow::Result<()> {
const NUMBER_OF_FIELDS_IN_QUERY: u16 = 2;
const MAX_BATCH_ENTRIES: usize = (u16::MAX / NUMBER_OF_FIELDS_IN_QUERY) as usize;
let mut txn = db.begin().await?;
for updates in ids.chunks(MAX_BATCH_ENTRIES) {
let mut query_builder: sqlx::QueryBuilder<sqlx::Postgres> =
sqlx::QueryBuilder::new(" insert into last_witness (id, timestamp) ");
query_builder.push_values(updates, |mut builder, (id, ts)| {
builder.push_bind(id.as_ref()).push_bind(ts);
query_builder.push_values(updates, |mut builder, last_witness| {
builder
.push_bind(last_witness.id.as_ref())
.push_bind(last_witness.timestamp);
});
query_builder.push(" on conflict (id) do update set timestamp = EXCLUDED.timestamp ");
query_builder.build().execute(&mut *txn).await?;
Expand Down
1 change: 1 addition & 0 deletions iot_verifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ pub mod runner;
mod settings;
pub mod telemetry;
pub mod tx_scaler;
pub mod witness_updater;
pub use settings::Settings;
8 changes: 7 additions & 1 deletion iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use iot_config::client::Client as IotConfigClient;
use iot_verifier::{
entropy_loader, gateway_cache::GatewayCache, gateway_updater::GatewayUpdater, loader,
packet_loader, purger, rewarder::Rewarder, runner, telemetry,
tx_scaler::Server as DensityScaler, Settings,
tx_scaler::Server as DensityScaler, witness_updater::WitnessUpdater, Settings,
};
use price::PriceTracker;
use std::path;
Expand Down Expand Up @@ -77,6 +77,10 @@ impl Server {

let iot_config_client = IotConfigClient::from_settings(&settings.iot_config_client)?;

// create the witness updater to handle serialization of last witness updates to db
// also exposes a cache of the last witness updates
let (witness_updater, witness_updater_server) = WitnessUpdater::new(pool.clone()).await?;

// *
// setup caches
// *
Expand Down Expand Up @@ -288,6 +292,7 @@ impl Server {
runner_invalid_witness_sink,
runner_poc_sink,
density_scaler.hex_density_map.clone(),
witness_updater,
)
.await?;

Expand All @@ -300,6 +305,7 @@ impl Server {
.add_task(purger_invalid_witness_sink_server)
.add_task(runner_invalid_beacon_sink_server)
.add_task(runner_invalid_witness_sink_server)
.add_task(witness_updater_server)
.add_task(runner_poc_sink_server)
.add_task(price_daemon)
.add_task(density_scaler)
Expand Down
Loading

0 comments on commit f3cd719

Please sign in to comment.