Skip to content

Commit

Permalink
Follower (#3)
Browse files Browse the repository at this point in the history
* use blockchain-node follower service to look up hotspots and follow tens
* Ingest heartbeats and speed tests but not store them yet

Co-authored-by: Marc Nijdam <[email protected]>
Co-authored-by: jeffgrunewald <[email protected]>
Co-authored-by: Rahul Garg <[email protected]>
  • Loading branch information
4 people authored Jul 15, 2022
1 parent 3d1abc8 commit 31de316
Show file tree
Hide file tree
Showing 40 changed files with 2,403 additions and 749 deletions.
776 changes: 634 additions & 142 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 14 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,27 @@ thiserror = "1"
serde = {version = "1", features=["derive"]}
serde_json = "1"
clap = {version = "3", features = ["derive"]}
sqlx = {version = "0", features = ["postgres", "uuid", "chrono", "migrate", "macros", "runtime-tokio-rustls"]}
sqlx = {version = "0", features = ["postgres", "uuid", "decimal", "chrono", "migrate", "macros", "runtime-tokio-rustls"]}
tokio = { version = "1", default-features=false, features=["fs", "macros", "signal", "rt", "process", "time"] }
tracing = "0"
tracing-subscriber = { version = "0", features = ["env-filter"] }
axum = "0"
hyper = "*"
base64 = "0"
sha2 = "*"
http = "*"
tonic = "0"
lazy_static = "*"
chrono = {version = "0", features = ["serde"]}
tower-http = {version = "*", features = ["auth", "trace"]}
triggered = "0"
futures = "*"
futures-util = "*"
prost = "*"
csv = "1"
once_cell = "1"
async-compression = {version = "0", features = ["tokio", "gzip"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]}
helium-crypto = {git = "https://github.com/helium/helium-crypto-rs", tag="v0.3.4"}
rust_decimal = "1"
rust_decimal_macros = "1"
2 changes: 1 addition & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
fn main() {
// trigger recompilation when a new migration is added
println!("cargo:rerun-if-changed=migrations");
}
}
9 changes: 0 additions & 9 deletions migrations/2_cell_attach_event copy.sql

This file was deleted.

7 changes: 7 additions & 0 deletions migrations/2_follower.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
create table follower_meta (
key text primary key not null,
value text
);

insert into follower_meta (key, value)
values ('last_height', '995041')
17 changes: 0 additions & 17 deletions migrations/3_cell_heartbeat.sql

This file was deleted.

11 changes: 11 additions & 0 deletions migrations/3_gateway.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
create table gateway (
address text primary key not null,
owner text not null,
location text,

last_heartbeat timestamptz,
last_speedtest timestamptz,
last_attach timestamptz,

created_at timestamptz default now()
);
15 changes: 0 additions & 15 deletions migrations/4_cell_speedtest.sql

This file was deleted.

54 changes: 54 additions & 0 deletions src/api/attach_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use crate::{api::api_error, Error, Imsi, PublicKey, Result};
use axum::{extract::Extension, http::StatusCode, Json};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use sqlx::{PgPool, Row};

pub async fn create_cell_attach_event(
Json(event): Json<CellAttachEvent>,
Extension(pool): Extension<PgPool>,
) -> std::result::Result<Json<Value>, (StatusCode, String)> {
event
.insert_into(&pool)
.await
.map(|pubkey: PublicKey| {
json!({
"pubkey": pubkey,
})
})
.map(Json)
.map_err(api_error)
}

#[derive(sqlx::FromRow, Deserialize, Serialize)]
pub struct CellAttachEvent {
pub imsi: Imsi,
#[serde(alias = "publicAddress")]
pub pubkey: PublicKey,
#[serde(alias = "iso_timestamp")]
pub timestamp: DateTime<Utc>,
}

impl CellAttachEvent {
pub async fn insert_into<'e, 'c, E>(&self, executor: E) -> Result<PublicKey>
where
E: 'e + sqlx::Executor<'c, Database = sqlx::Postgres>,
{
sqlx::query(
r#"
insert into gateways (pubkey, owner, payer, height, txn_hash, block_timestamp, last_heartbeat, last_speedtest, last_attach)
values ($1, NULL, NULL, 0, NULL, NULL, NULL, NULL, $2)
on conflict (pubkey) do update set
last_attach = EXCLUDED.last_attach
returning pubkey
"#,
)
.bind(&self.pubkey)
.bind(&self.timestamp)
.fetch_one(executor)
.await
.and_then(|row| row.try_get("pubkey"))
.map_err(Error::from)
}
}
37 changes: 0 additions & 37 deletions src/api/attach_events.rs

This file was deleted.

Loading

0 comments on commit 31de316

Please sign in to comment.