Skip to content

Commit

Permalink
implement telemetry format v2 and new endpoint /nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
Trisfald committed Jun 3, 2024
1 parent 61a8c4c commit f157416
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 52 deletions.
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@ An application to collect NEAR clients telemetry data and store it in a relation
## Endpoints
Default port: `8080`

- `/nodes/mainnet`: POST node telemetry
- `/nodes/testnet`: POST node telemetry
- `/nodes/mainnet`: POST node telemetry v1+
- `/nodes/testnet`: POST node telemetry v1+
- `/nodes`: POST node telemetry v2+
- `/metrics`: Prometheus metrics
- `/healthz`: health check

## Telemetry versions

- `v1`: legacy telemetry format
- `v2`: new telemetry format, used since nearcore PR [#11444](https://github.com/near/nearcore/pull/11444)

## Development

### Requirements
Expand Down
1 change: 0 additions & 1 deletion res/example_telemetry_payload_v2

This file was deleted.

1 change: 1 addition & 0 deletions res/example_telemetry_payload_v2_mainnet
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"agent":{"build":"1.36.1-533-g1098b24d2","name":"near-rs","protocol_version":67,"version":"trunk"},"chain":{"account_id":"test.near","block_production_tracking_delay":0.1,"chain_id":"mainnet","is_validator":true,"latest_block_hash":"BauFWYx2gMrntNepXF6GN6j1rhQqQEVeLU39WKp2b4C2","latest_block_height":604,"max_block_production_delay":2.0,"max_block_wait_delay":6.0,"min_block_production_delay":0.6,"node_id":"ed25519:6Hat46Wuxrk1czrhENjJrS3GuYUXYDmMgFtGLFyWGWNq","num_peers":0,"status":"NoSync"},"extra_info":"{\"block_production_tracking_delay\":0.1,\"max_block_production_delay\":2.0,\"max_block_wait_delay\":6.0,\"min_block_production_delay\":0.6}","signature":"ed25519:yMax8NT8ptBu3tGa2LWFVSJRxLLRdXmwuVrE6wPnnXdHFqBDbXQvjP4btNGeraFsu75BzkSGvHV7xaJc3vQzUpQ","system":{"bandwidth_download":0,"bandwidth_upload":0,"boot_time_seconds":1715338798,"cpu_usage":0.0,"memory_usage":68648}}
1 change: 1 addition & 0 deletions res/example_telemetry_payload_v2_other
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"agent":{"build":"1.36.1-533-g1098b24d2","name":"near-rs","protocol_version":67,"version":"trunk"},"chain":{"account_id":"test.near","block_production_tracking_delay":0.1,"chain_id":"other","is_validator":true,"latest_block_hash":"BauFWYx2gMrntNepXF6GN6j1rhQqQEVeLU39WKp2b4C2","latest_block_height":604,"max_block_production_delay":2.0,"max_block_wait_delay":6.0,"min_block_production_delay":0.6,"node_id":"ed25519:6Hat46Wuxrk1czrhENjJrS3GuYUXYDmMgFtGLFyWGWNq","num_peers":0,"status":"NoSync"},"extra_info":"{\"block_production_tracking_delay\":0.1,\"max_block_production_delay\":2.0,\"max_block_wait_delay\":6.0,\"min_block_production_delay\":0.6}","signature":"ed25519:yMax8NT8ptBu3tGa2LWFVSJRxLLRdXmwuVrE6wPnnXdHFqBDbXQvjP4btNGeraFsu75BzkSGvHV7xaJc3vQzUpQ","system":{"bandwidth_download":0,"bandwidth_upload":0,"boot_time_seconds":1715338798,"cpu_usage":0.0,"memory_usage":68648}}
1 change: 1 addition & 0 deletions res/example_telemetry_payload_v2_testnet
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"agent":{"build":"1.36.1-533-g1098b24d2","name":"near-rs","protocol_version":67,"version":"trunk"},"chain":{"account_id":"test.near","block_production_tracking_delay":0.1,"chain_id":"testnet","is_validator":true,"latest_block_hash":"BauFWYx2gMrntNepXF6GN6j1rhQqQEVeLU39WKp2b4C2","latest_block_height":604,"max_block_production_delay":2.0,"max_block_wait_delay":6.0,"min_block_production_delay":0.6,"node_id":"ed25519:6Hat46Wuxrk1czrhENjJrS3GuYUXYDmMgFtGLFyWGWNq","num_peers":0,"status":"NoSync"},"extra_info":"{\"block_production_tracking_delay\":0.1,\"max_block_production_delay\":2.0,\"max_block_wait_delay\":6.0,\"min_block_production_delay\":0.6}","signature":"ed25519:yMax8NT8ptBu3tGa2LWFVSJRxLLRdXmwuVrE6wPnnXdHFqBDbXQvjP4btNGeraFsu75BzkSGvHV7xaJc3vQzUpQ","system":{"bandwidth_download":0,"bandwidth_upload":0,"boot_time_seconds":1715338798,"cpu_usage":0.0,"memory_usage":68648}}
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub enum Error {
DBError(#[from] sea_orm::DbErr),
#[error("input error ({0})/n{1}")]
InputError(String, String),
#[error("database not found error")]
DatabaseNotFound,
#[error("unknown error")]
Unknown,
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub struct Migration;

impl MigrationName for Migration {
fn name(&self) -> &str {
"m_20240603_000002_add_extra_fields"
"m_20240603_000002_node_v2"
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/migrator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use sea_orm_migration::prelude::*;

mod m20240508_000001_create_tables;
mod m20240603_000002_add_extra_fields;
mod m20240603_000002_node_v2;

pub struct Migrator;

Expand All @@ -10,7 +10,7 @@ impl MigratorTrait for Migrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![
Box::new(m20240508_000001_create_tables::Migration),
Box::new(m20240603_000002_add_extra_fields::Migration),
Box::new(m20240603_000002_node_v2::Migration),
]
}
}
71 changes: 57 additions & 14 deletions src/nodes.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt;
use std::{fmt, sync::Arc};

use axum::{extract::State, http::StatusCode, response::IntoResponse};
use sea_orm::{sea_query::OnConflict, ActiveValue, DatabaseConnection, EntityTrait, Iterable};
Expand All @@ -13,17 +13,19 @@ use crate::{
Error,
};

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
/// Identifies the chain a telemetry report belongs to.
///
/// The value is derived either from the `chain_id` field of the telemetry
/// JSON or from the HTTP path the report was posted to.
pub enum ChainId {
/// The chain whose id is the string `mainnet`.
Mainnet,
/// The chain whose id is the string `testnet`.
Testnet,
/// Any chain other than mainnet or testnet. Carries the chain id string
/// taken from the telemetry, or `"unknown"` when no chain id could be
/// determined at all.
Other(String),
}

impl fmt::Display for ChainId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
match self {
ChainId::Mainnet => write!(f, "mainnet"),
ChainId::Testnet => write!(f, "testnet"),
ChainId::Other(_) => write!(f, "other"),
}
}
}
Expand All @@ -32,29 +34,57 @@ pub(crate) async fn nodes_handler_mainnet(
state: State<ServerState>,
body: String,
) -> impl IntoResponse {
nodes_handler(state, body, ChainId::Mainnet).await
nodes_handler_impl(state, body, Some(ChainId::Mainnet)).await
}

pub(crate) async fn nodes_handler_testnet(
state: State<ServerState>,
body: String,
) -> impl IntoResponse {
nodes_handler(state, body, ChainId::Testnet).await
nodes_handler_impl(state, body, Some(ChainId::Testnet)).await
}

async fn nodes_handler(
pub(crate) async fn nodes_handler(state: State<ServerState>, body: String) -> impl IntoResponse {
nodes_handler_impl(state, body, None).await
}

async fn nodes_handler_impl(
state: State<ServerState>,
body: String,
chain: ChainId,
chain_from_path: Option<ChainId>,
) -> impl IntoResponse {
let now = Instant::now();

trace!("chain_from_path: {chain_from_path:?}, request body: {body}");

let telemetry: Result<TelemetryInfo, Error> =
serde_json::from_str(&body).map_err(|err| Error::InputError(err.to_string(), body));

let chain_from_telemetry = telemetry.as_ref().ok().and_then(|info| {
info.chain
.chain_id
.as_ref()
.map(|chain| match chain.as_str() {
"mainnet" => ChainId::Mainnet,
"testnet" => ChainId::Testnet,
_ => ChainId::Other(chain.clone()),
})
});
// Determine the chain-id. In order of priority:
// 1. chain-id sent inside the json
// 2. HTTP path
let chain = match (chain_from_telemetry, chain_from_path) {
(Some(chain), _) => chain,
(None, Some(chain)) => chain,
_ => ChainId::Other("unknown".to_string()),
};

debug!("received node telemetry for {chain}");
trace!("request body: {body}");

let labels = Labels::new(chain.to_string());
state.metrics.total_requests.get_or_create(&labels).inc();

let result = parse_and_store_telemetry(state.database(chain), body).await;
let result = store_telemetry(state.database(&chain), &chain, telemetry).await;

let elapsed = now.elapsed();
state
Expand Down Expand Up @@ -84,9 +114,22 @@ async fn nodes_handler(
}
}

async fn parse_and_store_telemetry(db: &DatabaseConnection, body: String) -> Result<(), Error> {
let telemetry: TelemetryInfo =
serde_json::from_str(&body).map_err(|err| Error::InputError(err.to_string(), body))?;
async fn store_telemetry(
db: Option<&Arc<DatabaseConnection>>,
chain: &ChainId,
telemetry: Result<TelemetryInfo, Error>,
) -> Result<(), Error> {
let telemetry = telemetry?;

if matches!(chain, ChainId::Other(_)) {
debug!("persisting telemetry for chains other than mainnet and testnet is disabled");
return Ok(());
}

let db: &DatabaseConnection = match db {
Some(db) => db,
None => return Err(Error::DatabaseNotFound),
};

let node = node::ActiveModel {
id: ActiveValue::Set(telemetry.chain.node_id),
Expand All @@ -111,8 +154,8 @@ async fn parse_and_store_telemetry(db: &DatabaseConnection, body: String) -> Res
min_block_production_delay: ActiveValue::Set(telemetry.chain.min_block_production_delay),
max_block_production_delay: ActiveValue::Set(telemetry.chain.max_block_production_delay),
max_block_wait_delay: ActiveValue::Set(telemetry.chain.max_block_wait_delay),
chain_id: ActiveValue::Set(None),
protocol_version: ActiveValue::Set(None),
chain_id: ActiveValue::Set(telemetry.chain.chain_id),
protocol_version: ActiveValue::Set(telemetry.agent.protocol_version.map(|n| n as i32)),
};

let on_conflict = OnConflict::column(node::Column::Id)
Expand Down
10 changes: 6 additions & 4 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tracing::info;

use crate::health::health_handler;
use crate::metrics::{create_registry_and_metrics, metric_handler, Metrics};
use crate::nodes::{nodes_handler_mainnet, nodes_handler_testnet, ChainId};
use crate::nodes::{nodes_handler, nodes_handler_mainnet, nodes_handler_testnet, ChainId};
use crate::Error;

pub struct Server {
Expand All @@ -30,10 +30,11 @@ pub(crate) struct ServerState {
}

impl ServerState {
pub(crate) fn database(&self, chain: ChainId) -> &Arc<DatabaseConnection> {
pub(crate) fn database(&self, chain: &ChainId) -> Option<&Arc<DatabaseConnection>> {
match chain {
ChainId::Mainnet => &self.db_mainnet,
ChainId::Testnet => &self.db_testnet,
ChainId::Mainnet => Some(&self.db_mainnet),
ChainId::Testnet => Some(&self.db_testnet),
ChainId::Other(_) => None,
}
}
}
Expand Down Expand Up @@ -73,6 +74,7 @@ impl Server {
.route("/healthz", get(health_handler))
.route("/nodes/mainnet", post(nodes_handler_mainnet))
.route("/nodes/testnet", post(nodes_handler_testnet))
.route("/nodes", post(nodes_handler))
.layer((
// Graceful shutdown will wait for outstanding requests to complete. Add a timeout so
// requests don't hang forever.
Expand Down
Loading

0 comments on commit f157416

Please sign in to comment.