Skip to content

Commit

Permalink
Add poc-injector working logic (#71)
Browse files Browse the repository at this point in the history
* Add working logic for poc_iot_injector

- Construct poc_receipt v2 transaction
- Add txn_service for txn submission
- Add reward_shares to poc_receipt & poc_witnesses
  • Loading branch information
vihu authored Oct 13, 2022
1 parent 9f66316 commit a5b8f9f
Show file tree
Hide file tree
Showing 18 changed files with 631 additions and 211 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ flamegraph.*
*.json
.envrc
.env
*.DS_STORE

!/minio/bucket-policy.json
*.bak
38 changes: 22 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mobile_rewards/src/traits/txn_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ mod test {
txn.reward_server_signature = sig.clone();

// Check that we can verify this signature
assert!(txn.verify(&kp.public_key(), &sig).is_ok());
assert!(txn.verify(kp.public_key(), &sig).is_ok());

let txn_hash = txn.hash().expect("unable to hash");
let txn_hash_b64url = txn_hash.to_b64_url().expect("unable to b64url enc");
Expand Down
21 changes: 14 additions & 7 deletions poc_iot_injector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,34 @@ license.workspace = true
authors.workspace = true

[dependencies]
rand = "*"
rust_decimal = { version = "1", features = [ "maths" ] }
dotenv = {workspace = true}
clap = {workspace = true}
thiserror = {workspace = true}
serde = {workspace = true}
serde_json = {workspace = true}
tokio = { workspace = true }
triggered = {workspace = true}
rand = "*"
http = {workspace = true}
base64 = {workspace = true}
sha2 = {workspace = true}
futures = {workspace = true}
futures-util = {workspace = true}
prost = {workspace = true}
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
chrono = {workspace = true}
sqlx = {workspace = true}
tonic = {workspace = true}
tokio = {workspace = true}
http = {workspace = true}
triggered = {workspace = true}
base64 = {workspace = true}
sha2 = {workspace = true}
rust_decimal_macros = {workspace = true}
metrics = {workspace = true }
metrics-exporter-prometheus = { workspace = true }
helium-crypto = { workspace = true }
helium-proto = { workspace = true }
helium-crypto = {workspace = true }
poc-metrics = { path = "../metrics" }
file-store = {path = "../file_store"}
db-store = {path = "../db_store"}

[package.metadata.deb]
depends = "$auto"
Expand Down
27 changes: 27 additions & 0 deletions poc_iot_injector/migrations/1_setup.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- This extension gives us `uuid_generate_v1mc()` which generates UUIDs that cluster better than `gen_random_uuid()`
-- while still being difficult to predict and enumerate.
-- Also, while unlikely, `gen_random_uuid()` can in theory produce collisions which can trigger spurious errors on
-- insertion, whereas it's much less likely with `uuid_generate_v1mc()`.
create extension if not exists "uuid-ossp";

create or replace function set_updated_at()
returns trigger as
$$
begin
NEW.updated_at = now();
return NEW;
end;
$$ language plpgsql;

create or replace function trigger_updated_at(tablename regclass)
returns void as
$$
begin
execute format('CREATE TRIGGER set_updated_at
BEFORE UPDATE
ON %s
FOR EACH ROW
WHEN (OLD is distinct from NEW)
EXECUTE FUNCTION set_updated_at();', tablename);
end;
$$ language plpgsql;
4 changes: 4 additions & 0 deletions poc_iot_injector/migrations/2_meta.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
create table meta (
key text primary key not null,
value text
);
9 changes: 5 additions & 4 deletions poc_iot_injector/pkg/deb/env-template
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
RUST_LOG=info
API_SOCKET_ADDR=0.0.0.0:8080
BUCKET=examplenet-poc-iot-injector
METRICS_SCRAPE_ENDPOINT=127.0.0.1:19000
ENTROPY_STORE=/tmp/
ENTROPY_URL=https://quicknode.url.pro:443/token
DATABASE_URL=db_url
FOLLOWER_URI=http://127.0.0.1:8080
BUCKET=examplenet-pociot-verified
POC_ORACLE_KEY=/tmp/poc_oracle_key
LAST_POC_SUBMISSION_TS=0000000000
49 changes: 49 additions & 0 deletions poc_iot_injector/src/cli/generate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use crate::{keypair::load_from_file, receipt_txn::handle_report_msg, Result};
use chrono::{NaiveDateTime, TimeZone, Utc};
use file_store::{FileStore, FileType};
use futures::stream::{self, StreamExt};
use helium_proto::{blockchain_txn::Txn, BlockchainTxn, Message};

/// Generate poc rewards
#[derive(Debug, clap::Args)]
pub struct Cmd {
/// Required start time to look for (inclusive)
#[clap(long)]
after: NaiveDateTime,
/// Required before time to look for (inclusive)
#[clap(long)]
before: NaiveDateTime,
}

impl Cmd {
pub async fn run(&self) -> Result {
let store = FileStore::from_env().await?;

let after_utc = Utc.from_utc_datetime(&self.after);
let before_utc = Utc.from_utc_datetime(&self.before);

let file_list = store
.list_all(FileType::LoraValidPoc, after_utc, before_utc)
.await?;

let mut stream = store.source(stream::iter(file_list).map(Ok).boxed());
let ts = before_utc.timestamp_millis();

let poc_injector_kp_path =
std::env::var("POC_ORACLE_KEY").unwrap_or_else(|_| String::from("/tmp/poc_oracle_key"));
let poc_oracle_key = load_from_file(&poc_injector_kp_path)?;

while let Some(Ok(msg)) = stream.next().await {
if let Ok(Some((txn, _hash, _hash_b64_url))) =
handle_report_msg(msg, &poc_oracle_key, ts)
{
let tx = BlockchainTxn {
txn: Some(Txn::PocReceiptsV2(txn)),
};

tracing::debug!("txn_bin: {:?}", tx.encode_to_vec());
}
}
Ok(())
}
}
2 changes: 2 additions & 0 deletions poc_iot_injector/src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod generate;
pub mod server;
37 changes: 37 additions & 0 deletions poc_iot_injector/src/cli/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use crate::{keypair::load_from_file, mk_db_pool, server::Server, Result};
use tokio::signal;

/// Start rewards server
#[derive(Debug, clap::Args)]
pub struct Cmd {}

impl Cmd {
pub async fn run(&self) -> Result {
// Install the prometheus metrics exporter
poc_metrics::install_metrics();

// Create database pool
let pool = mk_db_pool(10).await?;
sqlx::migrate!().run(&pool).await?;

// Configure shutdown trigger
let (shutdown_trigger, shutdown_listener) = triggered::trigger();
tokio::spawn(async move {
let _ = signal::ctrl_c().await;
shutdown_trigger.trigger()
});

// injector server keypair from env
let poc_injector_kp_path =
std::env::var("POC_ORACLE_KEY").unwrap_or_else(|_| String::from("/tmp/poc_oracle_key"));
let poc_oracle_key = load_from_file(&poc_injector_kp_path)?;

// poc_iot_injector server
let mut poc_iot_injector_server = Server::new(pool, poc_oracle_key).await?;

poc_iot_injector_server
.run(shutdown_listener.clone())
.await?;
Ok(())
}
}
64 changes: 62 additions & 2 deletions poc_iot_injector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,70 @@ pub type Result<T = ()> = std::result::Result<T, Error>;

#[derive(Error, Debug)]
pub enum Error {
#[error("environment error")]
DotEnv(#[from] dotenv::Error),
#[error("crypto error")]
Crypto(#[from] helium_crypto::Error),
#[error("io error")]
Io(#[from] std::io::Error),
#[error("environment error")]
DotEnv(#[from] dotenv::Error),
#[error("store error")]
Store(#[from] file_store::Error),
#[error("sql error")]
Sql(#[from] sqlx::Error),
#[error("env error")]
Env(#[from] std::env::VarError),
#[error("encode error")]
Encode(#[from] EncodeError),
#[error("decode error")]
Decode(#[from] DecodeError),
#[error("grpc {}", .0.message())]
Grpc(#[from] tonic::Status),
#[error("parse int error")]
ParseInt(#[from] std::num::ParseIntError),
#[error("env not found: {0}")]
EnvNotFound(String),
#[error("migration error")]
Migrate(#[from] sqlx::migrate::MigrateError),
#[error("zero witnesses error")]
ZeroWitnesses,
#[error("invalid exponent {0} error")]
InvalidExponent(String),
#[error("meta error")]
MetaError(#[from] db_store::MetaError),
}

#[derive(Error, Debug)]
pub enum DecodeError {
#[error("prost error")]
Prost(#[from] helium_proto::DecodeError),
#[error("parse int error")]
ParseInt(#[from] std::num::ParseIntError),
#[error("uri error")]
Uri(#[from] http::uri::InvalidUri),
}

#[derive(Error, Debug)]
pub enum EncodeError {
#[error("prost error")]
Prost(#[from] helium_proto::EncodeError),
#[error("json error")]
Json(#[from] serde_json::Error),
}

macro_rules! from_err {
($to_type:ty, $from_type:ty) => {
impl From<$from_type> for Error {
fn from(v: $from_type) -> Self {
Self::from(<$to_type>::from(v))
}
}
};
}

// Encode Errors
from_err!(EncodeError, prost::EncodeError);
from_err!(EncodeError, serde_json::Error);

// Decode Errors
from_err!(DecodeError, http::uri::InvalidUri);
from_err!(DecodeError, prost::DecodeError);
Loading

0 comments on commit a5b8f9f

Please sign in to comment.