From 7e3c343b017522d125b6b7c2b09c14918adaf1be Mon Sep 17 00:00:00 2001 From: 0xterminator Date: Sun, 12 Jan 2025 06:25:41 +0200 Subject: [PATCH] feat(repo): Added web-utils crate and added telemetry to all svc (#373) --- Cargo.lock | 68 ++++-- Cargo.toml | 1 + Makefile | 4 + crates/fuel-web-utils/Cargo.toml | 55 +++++ crates/fuel-web-utils/src/lib.rs | 15 ++ crates/fuel-web-utils/src/server/api.rs | 210 ++++++++++++++++++ .../src/server/http/handlers.rs | 24 ++ crates/fuel-web-utils/src/server/http/mod.rs | 1 + .../src/server/middlewares/auth/jwt.rs} | 0 .../src/server/middlewares/auth/mod.rs | 2 + .../src/server/middlewares/auth/transform.rs} | 2 +- .../src/server/middlewares/mod.rs | 0 crates/fuel-web-utils/src/server/mod.rs | 4 + crates/fuel-web-utils/src/server/state.rs | 71 ++++++ .../src/shutdown.rs | 20 +- .../src/telemetry/elastic_search.rs | 0 .../fuel-web-utils/src/telemetry/metrics.rs | 34 +++ .../src/telemetry/mod.rs | 119 ++-------- .../src/telemetry/runtime.rs | 0 .../src/telemetry/system.rs | 0 crates/sv-consumer/Cargo.toml | 5 +- crates/sv-consumer/src/cli.rs | 18 ++ crates/sv-consumer/src/lib.rs | 2 + crates/sv-consumer/src/main.rs | 36 ++- crates/sv-consumer/src/metrics.rs | 39 ++++ crates/sv-consumer/src/state.rs | 121 ++++++++++ crates/sv-publisher/Cargo.toml | 5 + crates/sv-publisher/src/cli.rs | 17 ++ crates/sv-publisher/src/lib.rs | 3 +- crates/sv-publisher/src/main.rs | 50 ++++- crates/sv-publisher/src/metrics.rs | 187 ++++++++++++++++ crates/sv-publisher/src/state.rs | 59 +++++ crates/sv-webserver/Cargo.toml | 18 +- crates/sv-webserver/src/lib.rs | 2 +- crates/sv-webserver/src/main.rs | 22 +- .../src/{telemetry => }/metrics.rs | 78 ++++++- crates/sv-webserver/src/server/api.rs | 88 -------- crates/sv-webserver/src/server/context.rs | 68 ------ .../sv-webserver/src/server/http/handlers.rs | 40 +--- crates/sv-webserver/src/server/mod.rs | 5 +- crates/sv-webserver/src/server/state.rs | 75 +++++-- crates/sv-webserver/src/server/svc.rs | 28 +++ .../src/server/ws/{socket.rs => handlers.rs} | 45 ++-- crates/sv-webserver/src/server/ws/mod.rs | 3 +- crates/sv-webserver/src/server/ws/state.rs | 55 ----- scripts/run_publisher.sh | 2 +- scripts/run_webserver.sh | 2 +- 47 files changed, 1250 insertions(+), 453 deletions(-) create mode 100644 crates/fuel-web-utils/Cargo.toml create mode 100644 crates/fuel-web-utils/src/lib.rs create mode 100644 crates/fuel-web-utils/src/server/api.rs create mode 100644 crates/fuel-web-utils/src/server/http/handlers.rs create mode 100644 crates/fuel-web-utils/src/server/http/mod.rs rename crates/{sv-webserver/src/server/auth.rs => fuel-web-utils/src/server/middlewares/auth/jwt.rs} (100%) create mode 100644 crates/fuel-web-utils/src/server/middlewares/auth/mod.rs rename crates/{sv-webserver/src/server/middlewares/auth.rs => fuel-web-utils/src/server/middlewares/auth/transform.rs} (98%) rename crates/{sv-webserver => fuel-web-utils}/src/server/middlewares/mod.rs (100%) create mode 100644 crates/fuel-web-utils/src/server/mod.rs create mode 100644 crates/fuel-web-utils/src/server/state.rs rename crates/{sv-publisher => fuel-web-utils}/src/shutdown.rs (79%) rename crates/{sv-webserver => fuel-web-utils}/src/telemetry/elastic_search.rs (100%) create mode 100644 crates/fuel-web-utils/src/telemetry/metrics.rs rename crates/{sv-webserver => fuel-web-utils}/src/telemetry/mod.rs (60%) rename crates/{sv-webserver => fuel-web-utils}/src/telemetry/runtime.rs (100%) rename crates/{sv-webserver => fuel-web-utils}/src/telemetry/system.rs (100%) create mode 100644 crates/sv-consumer/src/metrics.rs create mode 100644 crates/sv-consumer/src/state.rs create mode 100644 crates/sv-publisher/src/metrics.rs create mode 100644 crates/sv-publisher/src/state.rs rename crates/sv-webserver/src/{telemetry => }/metrics.rs (76%) delete mode 100644 crates/sv-webserver/src/server/api.rs delete mode 100644 crates/sv-webserver/src/server/context.rs create mode 100644 crates/sv-webserver/src/server/svc.rs rename crates/sv-webserver/src/server/ws/{socket.rs => handlers.rs} (91%) delete mode 100644 crates/sv-webserver/src/server/ws/state.rs diff --git a/Cargo.lock b/Cargo.lock index a95fa303..c3dc6999 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4354,6 +4354,46 @@ dependencies = [ "tai64", ] +[[package]] +name = "fuel-web-utils" +version = "0.0.16" +dependencies = [ + "actix-cors", + "actix-server", + "actix-service", + "actix-web", + "anyhow", + "async-trait", + "chrono", + "derive_more 1.0.0", + "displaydoc", + "dotenvy", + "elasticsearch", + "fuel-data-parser", + "fuel-streams-nats", + "futures", + "futures-util", + "jsonwebtoken 9.3.0", + "num_cpus", + "parking_lot", + "prometheus", + "rand", + "rust_decimal", + "serde", + "serde_json", + "serde_prometheus", + "sysinfo", + "thiserror 2.0.9", + "time", + "tokio", + "tokio-util", + "tracing", + "tracing-actix-web", + "url", + "urlencoding", + "uuid", +] + [[package]] name = "funty" version = "2.0.0" @@ -9112,18 +9152,21 @@ version = "0.0.16" dependencies = [ "anyhow", "async-nats", + "async-trait", "clap 4.5.23", "displaydoc", "dotenvy", "fuel-core", "fuel-streams-core", "fuel-streams-executors", + "fuel-web-utils", "futures", "hex", "num_cpus", "openssl", + "prometheus", + "serde", "serde_json", - "sv-publisher", "thiserror 2.0.9", "tokio", "tokio-util", @@ -9137,6 +9180,7 @@ version = "0.0.16" dependencies = [ "anyhow", "async-nats", + "async-trait", "clap 4.5.23", "displaydoc", "fuel-core", @@ -9144,8 +9188,12 @@ dependencies = [ "fuel-core-types 0.40.2", "fuel-streams-core", "fuel-streams-executors", + "fuel-web-utils", "futures", "openssl", + "prometheus", + "serde", + "serde_json", "thiserror 2.0.9", "tokio", "tokio-util", @@ -9156,45 +9204,31 @@ dependencies = [ name = "sv-webserver" version = "0.0.16" dependencies = [ - "actix-cors", - "actix-server", - "actix-service", "actix-web", "actix-ws", "anyhow", "async-nats", - "bytestring", - "chrono", + "async-trait", "clap 4.5.23", - "derive_more 1.0.0", "displaydoc", "dotenvy", - "elasticsearch", "fuel-data-parser", "fuel-streams-core", "fuel-streams-nats", "fuel-streams-storage", + "fuel-web-utils", "futures", - "futures-util", - "jsonwebtoken 9.3.0", "num_cpus", "openssl", "parking_lot", "prometheus", - "rand", - "rust_decimal", "serde", "serde_json", - "serde_prometheus", - "sysinfo", "thiserror 2.0.9", "time", "tokio", "tracing", - "tracing-actix-web", "tracing-subscriber", - "url", - "urlencoding", "uuid", "validator", ] diff --git a/Cargo.toml b/Cargo.toml index 489ffafa..43383a9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ thiserror = "2.0" fuel-streams = { version = "0.0.16", path = "crates/fuel-streams" } fuel-data-parser = { version = "0.0.16", path = "crates/fuel-data-parser" } +fuel-web-utils = { version = "0.0.16", path = "crates/fuel-web-utils" } fuel-streams-core = { version = "0.0.16", path = "crates/fuel-streams-core" } fuel-streams-macros = { version = "0.0.16", path = "crates/fuel-streams-macros" } fuel-streams-nats = { version = "0.0.16", path = "crates/fuel-streams-nats" } diff --git a/Makefile b/Makefile index cd54fd14..59a5d530 100644 --- a/Makefile +++ b/Makefile @@ -230,9 +230,11 @@ run-publisher-testnet-profiling: run-consumer: NATS_CORE_URL="localhost:4222" run-consumer: NATS_PUBLISHER_URL="localhost:4223" +run-consumer: PORT="9003" run-consumer: cargo run --package sv-consumer --profile dev -- \ --nats-core-url $(NATS_CORE_URL) \ + --port $(PORT) \ --nats-publisher-url $(NATS_PUBLISHER_URL) # ------------------------------------------------------------ @@ -241,9 +243,11 @@ run-consumer: run-consumer: NATS_URL="localhost:4222" run-consumer: NATS_PUBLISHER_URL="localhost:4333" +run-consumer: PORT="9003" run-consumer: cargo run --package sv-consumer --profile dev -- \ --nats-url $(NATS_URL) \ + --port $(PORT) \ --nats-publisher-url $(NATS_PUBLISHER_URL) # ------------------------------------------------------------ diff --git a/crates/fuel-web-utils/Cargo.toml b/crates/fuel-web-utils/Cargo.toml new file mode 100644 index 00000000..d6176b14 --- /dev/null +++ b/crates/fuel-web-utils/Cargo.toml @@ -0,0 +1,55 @@ +[package] +name = "fuel-web-utils" +description = "Fuel library for web utils" +authors = { workspace = true } +keywords = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +version = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +actix-cors = { workspace = true } +actix-server = { workspace = true } +actix-service = "2.0.2" +actix-web = { workspace = true } +anyhow = { workspace = true } +async-trait = { workspace = true } +chrono = { workspace = true } +derive_more = { version = "1.0", features = ["full"] } +displaydoc = { workspace = true } +dotenvy = { workspace = true } +elasticsearch = "8.15.0-alpha.1" +fuel-data-parser = { workspace = true } +fuel-streams-nats = { workspace = true, features = ["test-helpers"] } +futures = { workspace = true } +futures-util = { workspace = true } +jsonwebtoken = "9.3.0" +num_cpus = { workspace = true } +parking_lot = { version = "0.12", features = ["serde"] } +prometheus = { version = "0.13", features = ["process"] } +rand = { workspace = true } +rust_decimal = { version = "1.13" } +serde = { workspace = true } +serde_json = { workspace = true } +serde_prometheus = { version = "0.2" } +sysinfo = { version = "0.29" } +thiserror = "2.0" +time = { version = "0.3", features = ["serde"] } +tokio = { workspace = true } +tokio-util = "0.7.13" +tracing = { workspace = true } +tracing-actix-web = { workspace = true } +url = "2.5" +urlencoding = "2.1" +uuid = { version = "1.11.0", features = ["serde", "v4"] } + +# in an individual package Cargo.toml +[package.metadata.cargo-machete] +ignored = ["fuel-data-parser"] + +[features] +default = [] +test-helpers = [] diff --git a/crates/fuel-web-utils/src/lib.rs b/crates/fuel-web-utils/src/lib.rs new file mode 100644 index 00000000..7bea99ab --- /dev/null +++ b/crates/fuel-web-utils/src/lib.rs @@ -0,0 +1,15 @@ +pub mod server; +pub mod shutdown; +pub mod telemetry; + +use std::sync::LazyLock; + +pub static MAX_WORKERS: LazyLock = LazyLock::new(|| { + let available_cpus = num_cpus::get(); + let default_threads = 2 * available_cpus; + + dotenvy::var("MAX_WORKERS") + .ok() + .and_then(|val| val.parse().ok()) + .unwrap_or(default_threads) +}); diff --git a/crates/fuel-web-utils/src/server/api.rs b/crates/fuel-web-utils/src/server/api.rs new file mode 100644 index 00000000..f3db8124 --- /dev/null +++ b/crates/fuel-web-utils/src/server/api.rs @@ -0,0 +1,210 @@ +use std::{ + net::{Ipv4Addr, SocketAddrV4}, + sync::Arc, +}; + +use actix_cors::Cors; +use actix_server::{Server, ServerHandle}; +use actix_web::{ + http::{self, Method}, + middleware::{Compress, Logger as ActixLogger}, + web::{self, ServiceConfig}, + App, + HttpServer, +}; +use tokio::task::JoinHandle; +use tracing_actix_web::TracingLogger; + +use super::{ + http::handlers::{get_health, get_metrics}, + state::StateProvider, +}; +use crate::MAX_WORKERS; + +const API_VERSION: &str = "v1"; + +pub fn with_prefixed_route(route: &str) -> String { + format!("/api/{}/{}", API_VERSION, route) +} + +type ConfigureRoutes = + Option>; + +pub struct ApiServerBuilder { + port: u16, + state: Arc, + configure_routes: ConfigureRoutes, +} + +impl ApiServerBuilder { + pub fn new(port: u16, state: T) -> Self { + Self { + port, + state: Arc::new(state), + configure_routes: None, + } + } + + /// Add dynamic routes to the server + pub fn with_dynamic_routes(mut self, configure: F) -> Self + where + F: Fn(&mut ServiceConfig) + Send + Sync + 'static, + { + self.configure_routes = Some(Arc::new(configure)); + self + } + + /// Build and run the server + pub fn build(self) -> anyhow::Result { + let server_addr = std::net::SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::UNSPECIFIED, + self.port, + )); + let state = self.state.clone(); + let configure_routes = self.configure_routes.clone(); + + let server = HttpServer::new(move || { + let state = state.clone(); + + // Create CORS middleware + let cors = Cors::default() + .allow_any_origin() + .allowed_methods(vec![ + Method::GET, + Method::POST, + Method::PUT, + Method::OPTIONS, + Method::DELETE, + Method::PATCH, + Method::TRACE, + ]) + .allowed_headers(vec![ + http::header::AUTHORIZATION, + http::header::ACCEPT, + ]) + .allowed_header(http::header::CONTENT_TYPE) + .max_age(3600); + + App::new() + .app_data(web::Data::new(state)) + .wrap(ActixLogger::default()) + .wrap(TracingLogger::default()) + .wrap(Compress::default()) + .wrap(cors) + // Mandatory routes + .service( + web::resource(with_prefixed_route("health")).route(web::get().to(get_health::)), + ) + .service( + web::resource(with_prefixed_route("metrics")).route(web::get().to(get_metrics::)), + ) + // Optional custom routes + .configure(|cfg| { + if let Some(configure_routes) = configure_routes.as_ref() { + configure_routes(cfg); + } + }) + }) + .bind(server_addr)? + .workers(*MAX_WORKERS) // or any configurable value + .shutdown_timeout(20) + .run(); + + Ok(server) + } +} + +pub async fn spawn_web_server(server: Server) -> JoinHandle<()> { + tokio::spawn(async move { + tracing::info!("Starting actix server ..."); + if let Err(err) = server.await { + tracing::error!("Actix Web server error: {:?}", err); + } + }) +} + +pub async fn build_and_spawn_web_server< + T: StateProvider + Send + Sync + 'static, +>( + port: u16, + state: T, +) -> anyhow::Result { + let server = ApiServerBuilder::new(port, state).build()?; + let server_handle = server.handle(); + spawn_web_server(server).await; + Ok(server_handle) +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use actix_web::{http, test, web, App, HttpResponse}; + + use crate::server::{ + api::with_prefixed_route, + state::{DefaultHealthResponse, DefaultServerState, StateProvider}, + }; + + #[actix_web::test] + async fn test_default_health_route() { + let state = DefaultServerState::new(); + let test_route = with_prefixed_route("health"); + + let app = test::init_service( + App::new().app_data(web::Data::new(state.clone())).route( + &test_route, + web::get().to( + |state: web::Data| async move { + if !state.is_healthy().await { + return HttpResponse::ServiceUnavailable() + .body("Service Unavailable"); + } + HttpResponse::Ok().json(state.get_health().await) + }, + ), + ), + ) + .await; + + let uptime = Duration::from_secs(2); + tokio::time::sleep(uptime).await; + + let req = test::TestRequest::get().uri(&test_route).to_request(); + let resp = test::call_service(&app, req).await; + + assert_eq!(resp.status(), http::StatusCode::OK); + + let result: DefaultHealthResponse = test::read_body_json(resp).await; + assert!(result.uptime >= uptime.as_secs()); + } + + #[actix_web::test] + async fn test_default_metrics_route() { + let state = DefaultServerState::new(); + let test_route = with_prefixed_route("metrics"); + + let app = test::init_service( + App::new().app_data(web::Data::new(state.clone())).route( + &test_route, + web::get().to( + |state: web::Data| async move { + HttpResponse::Ok().json(state.get_metrics().await) + }, + ), + ), + ) + .await; + + let uptime = Duration::from_secs(2); + tokio::time::sleep(uptime).await; + + let req = test::TestRequest::get().uri(&test_route).to_request(); + let resp = test::call_service(&app, req).await; + + assert_eq!(resp.status(), http::StatusCode::OK); + + let result: String = test::read_body_json(resp).await; + assert!(result.contains("uptime")); + } +} diff --git a/crates/fuel-web-utils/src/server/http/handlers.rs b/crates/fuel-web-utils/src/server/http/handlers.rs new file mode 100644 index 00000000..52e981f3 --- /dev/null +++ b/crates/fuel-web-utils/src/server/http/handlers.rs @@ -0,0 +1,24 @@ +use actix_web::{web, HttpResponse, Result}; + +use crate::server::state::StateProvider; + +pub async fn get_metrics( + state: web::Data, +) -> Result { + Ok(HttpResponse::Ok() + .content_type( + "application/openmetrics-text; version=1.0.0; charset=utf-8", + ) + .body(state.get_metrics().await)) +} + +pub async fn get_health( + state: web::Data, +) -> Result { + if !state.is_healthy().await { + return Ok( + HttpResponse::ServiceUnavailable().body("Service Unavailable") + ); + } + Ok(HttpResponse::Ok().json(state.get_health().await)) +} diff --git a/crates/fuel-web-utils/src/server/http/mod.rs b/crates/fuel-web-utils/src/server/http/mod.rs new file mode 100644 index 00000000..c3d44956 --- /dev/null +++ b/crates/fuel-web-utils/src/server/http/mod.rs @@ -0,0 +1 @@ +pub mod handlers; diff --git a/crates/sv-webserver/src/server/auth.rs b/crates/fuel-web-utils/src/server/middlewares/auth/jwt.rs similarity index 100% rename from crates/sv-webserver/src/server/auth.rs rename to crates/fuel-web-utils/src/server/middlewares/auth/jwt.rs diff --git a/crates/fuel-web-utils/src/server/middlewares/auth/mod.rs b/crates/fuel-web-utils/src/server/middlewares/auth/mod.rs new file mode 100644 index 00000000..96a68b36 --- /dev/null +++ b/crates/fuel-web-utils/src/server/middlewares/auth/mod.rs @@ -0,0 +1,2 @@ +pub mod jwt; +pub mod transform; diff --git a/crates/sv-webserver/src/server/middlewares/auth.rs b/crates/fuel-web-utils/src/server/middlewares/auth/transform.rs similarity index 98% rename from crates/sv-webserver/src/server/middlewares/auth.rs rename to crates/fuel-web-utils/src/server/middlewares/auth/transform.rs index e53cd9ec..da23f140 100644 --- a/crates/sv-webserver/src/server/middlewares/auth.rs +++ b/crates/fuel-web-utils/src/server/middlewares/auth/transform.rs @@ -12,7 +12,7 @@ use actix_web::{ }; use futures_util::future::{ready, LocalBoxFuture, Ready}; -use crate::server::auth::authorize_request; +use super::jwt::authorize_request; pub struct JwtAuth { jwt_secret: String, diff --git a/crates/sv-webserver/src/server/middlewares/mod.rs b/crates/fuel-web-utils/src/server/middlewares/mod.rs similarity index 100% rename from crates/sv-webserver/src/server/middlewares/mod.rs rename to crates/fuel-web-utils/src/server/middlewares/mod.rs diff --git a/crates/fuel-web-utils/src/server/mod.rs b/crates/fuel-web-utils/src/server/mod.rs new file mode 100644 index 00000000..994ccf17 --- /dev/null +++ b/crates/fuel-web-utils/src/server/mod.rs @@ -0,0 +1,4 @@ +pub mod api; +pub mod http; +pub mod middlewares; +pub mod state; diff --git a/crates/fuel-web-utils/src/server/state.rs b/crates/fuel-web-utils/src/server/state.rs new file mode 100644 index 00000000..5863d885 --- /dev/null +++ b/crates/fuel-web-utils/src/server/state.rs @@ -0,0 +1,71 @@ +use std::time::{Duration, Instant}; + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +#[async_trait] +pub trait StateProvider: Send + Sync { + /// Returns if the server is healthy + async fn is_healthy(&self) -> bool; + + /// Returns the health information + async fn get_health(&self) -> serde_json::Value; + + /// Returns the metrics in a string format + async fn get_metrics(&self) -> String; +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct DefaultHealthResponse { + pub uptime: u64, + pub is_healthy: bool, +} + +#[derive(Clone)] +pub struct DefaultServerState { + pub start_time: Instant, +} + +impl Default for DefaultServerState { + fn default() -> Self { + Self::new() + } +} + +impl DefaultServerState { + pub fn new() -> Self { + Self { + start_time: Instant::now(), + } + } + + pub async fn get_health(&self) -> DefaultHealthResponse { + DefaultHealthResponse { + uptime: self.uptime().as_secs(), + is_healthy: true, + } + } + + pub fn uptime(&self) -> Duration { + self.start_time.elapsed() + } +} + +#[async_trait] +impl StateProvider for DefaultServerState { + async fn is_healthy(&self) -> bool { + true + } + + async fn get_health(&self) -> serde_json::Value { + serde_json::to_value(DefaultHealthResponse { + uptime: self.uptime().as_secs(), + is_healthy: true, + }) + .unwrap_or(serde_json::json!({})) + } + + async fn get_metrics(&self) -> String { + format!("uptime: {}s", self.uptime().as_secs()) + } +} diff --git a/crates/sv-publisher/src/shutdown.rs b/crates/fuel-web-utils/src/shutdown.rs similarity index 79% rename from crates/sv-publisher/src/shutdown.rs rename to crates/fuel-web-utils/src/shutdown.rs index 6d66e7b1..19ce2fb1 100644 --- a/crates/sv-publisher/src/shutdown.rs +++ b/crates/fuel-web-utils/src/shutdown.rs @@ -1,7 +1,25 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; +use fuel_streams_nats::NatsClient; use tokio_util::sync::CancellationToken; +pub const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(90); + +pub async fn shutdown_nats_with_timeout(nats_client: &NatsClient) { + let _ = tokio::time::timeout(GRACEFUL_SHUTDOWN_TIMEOUT, async { + tracing::info!("Flushing in-flight messages to nats ..."); + match nats_client.nats_client.flush().await { + Ok(_) => { + tracing::info!("Flushed all streams successfully!"); + } + Err(e) => { + tracing::error!("Failed to flush all streams: {:?}", e); + } + } + }) + .await; +} + #[derive(Clone)] pub struct ShutdownController { token: CancellationToken, diff --git a/crates/sv-webserver/src/telemetry/elastic_search.rs b/crates/fuel-web-utils/src/telemetry/elastic_search.rs similarity index 100% rename from crates/sv-webserver/src/telemetry/elastic_search.rs rename to crates/fuel-web-utils/src/telemetry/elastic_search.rs diff --git a/crates/fuel-web-utils/src/telemetry/metrics.rs b/crates/fuel-web-utils/src/telemetry/metrics.rs new file mode 100644 index 00000000..eb1f3db2 --- /dev/null +++ b/crates/fuel-web-utils/src/telemetry/metrics.rs @@ -0,0 +1,34 @@ +use async_trait::async_trait; +use prometheus::Registry; +use rand::{distributions::Alphanumeric, Rng}; + +#[async_trait] +pub trait TelemetryMetrics: Send + Sync + 'static { + fn registry(&self) -> &Registry; + + fn metrics(&self) -> Option + where + Self: std::marker::Sized; + + fn generate_random_prefix() -> String { + rand::thread_rng() + .sample_iter(&Alphanumeric) + .filter(|c| c.is_ascii_alphabetic()) + .take(6) + .map(char::from) + .collect() + } + + fn gather_metrics(&self) -> String { + use prometheus::Encoder; + + let encoder = prometheus::TextEncoder::new(); + let mut buffer = Vec::new(); + + if let Err(e) = encoder.encode(&self.registry().gather(), &mut buffer) { + tracing::error!("could not encode custom metrics: {}", e); + } + + String::from_utf8(buffer).unwrap_or_default() + } +} diff --git a/crates/sv-webserver/src/telemetry/mod.rs b/crates/fuel-web-utils/src/telemetry/mod.rs similarity index 60% rename from crates/sv-webserver/src/telemetry/mod.rs rename to crates/fuel-web-utils/src/telemetry/mod.rs index db2a82db..e85d567a 100644 --- a/crates/sv-webserver/src/telemetry/mod.rs +++ b/crates/fuel-web-utils/src/telemetry/mod.rs @@ -13,34 +13,28 @@ use elastic_search::{ ElasticSearch, LogEntry, }; -use metrics::Metrics; +use metrics::TelemetryMetrics; // TODO: Consider using tokio's Rwlock instead use parking_lot::RwLock; use runtime::Runtime; use system::{System, SystemMetricsWrapper}; #[derive(Clone)] -pub struct Telemetry { +pub struct Telemetry { runtime: Arc, system: Arc>, - metrics: Option>, + metrics: Option>, elastic_search: Option>, } -impl Telemetry { +impl Telemetry { const DEDICATED_THREADS: usize = 2; - pub async fn new(prefix: Option) -> anyhow::Result> { + pub async fn new(metrics: Option) -> anyhow::Result> { let runtime = Runtime::new(Self::DEDICATED_THREADS, Duration::from_secs(20)); let system = Arc::new(RwLock::new(System::new().await)); - let metrics = if should_use_metrics() { - Some(Arc::new(Metrics::new(prefix)?)) - } else { - None - }; - let elastic_search = if should_use_elasticsearch() { Some(Arc::new(new_elastic_search().await?)) } else { @@ -50,7 +44,11 @@ impl Telemetry { Ok(Arc::new(Self { runtime: Arc::new(runtime), system, - metrics, + metrics: if should_use_metrics() { + metrics.map(Arc::new) + } else { + None + }, elastic_search, })) } @@ -78,6 +76,10 @@ impl Telemetry { Ok(()) } + pub fn base_metrics(&self) -> Option { + self.metrics.clone().and_then(|m| m.metrics()) + } + pub fn log_info(&self, message: &str) { let entry = LogEntry::new("INFO", message); self.maybe_elog(entry); @@ -97,70 +99,9 @@ impl Telemetry { } } - pub fn update_user_subscription_metrics( - &self, - user_id: uuid::Uuid, - subject_wildcard: &str, - ) { - self.maybe_use_metrics(|metrics| { - // Increment total user subscribed messages - metrics - .user_subscribed_messages - .with_label_values(&[ - user_id.to_string().as_str(), - subject_wildcard, - ]) - .inc(); - - // Increment throughput for the subscribed messages - metrics - .subs_messages_throughput - .with_label_values(&[subject_wildcard]) - .inc(); - }); - } - - pub fn update_error_metrics( - &self, - subject_wildcard: &str, - error_type: &str, - ) { - self.maybe_use_metrics(|metrics| { - metrics - .subs_messages_error_rates - .with_label_values(&[subject_wildcard, error_type]) - .inc(); - }); - } - - pub fn increment_subscriptions_count(&self) { - self.maybe_use_metrics(|metrics| { - metrics.total_ws_subs.with_label_values(&[]).inc(); - }); - } - - pub fn decrement_subscriptions_count(&self) { - self.maybe_use_metrics(|metrics| { - metrics.total_ws_subs.with_label_values(&[]).inc(); - }); - } - - pub fn update_unsubscribed( - &self, - user_id: uuid::Uuid, - subject_wildcard: &str, - ) { - self.maybe_use_metrics(|metrics| { - metrics - .user_subscribed_messages - .with_label_values(&[&user_id.to_string(), subject_wildcard]) - .dec(); - }); - } - pub fn maybe_use_metrics(&self, f: F) where - F: Fn(&Metrics), + F: Fn(&M), { if let Some(metrics) = &self.metrics { f(metrics); @@ -171,29 +112,15 @@ impl Telemetry { use prometheus::Encoder; let encoder = prometheus::TextEncoder::new(); + // if no metrics, return if self.metrics.is_none() { return "".to_string(); } - // fetch all measured metrics - let mut buffer = Vec::new(); - if let Err(e) = encoder.encode( - &self.metrics.as_ref().unwrap().registry.gather(), - &mut buffer, - ) { - tracing::error!("could not encode custom metrics: {}", e); - }; - let mut res = match String::from_utf8(buffer.clone()) { - Ok(v) => v, - Err(e) => { - tracing::error!( - "custom metrics could not be from_utf8'd: {}", - e - ); - String::default() - } - }; - buffer.clear(); + let mut result = String::new(); + if let Some(metrics) = &self.metrics { + result.push_str(&metrics.gather_metrics()); + } let mut buffer = Vec::new(); if let Err(e) = encoder.encode(&prometheus::gather(), &mut buffer) { @@ -211,7 +138,7 @@ impl Telemetry { }; buffer.clear(); - res.push_str(&res_custom); + result.push_str(&res_custom); // now fetch and add system metrics let system_metrics = match self.system.read().metrics() { @@ -237,9 +164,9 @@ impl Telemetry { String::default() } }; - res.push_str(&system_metrics); + result.push_str(&system_metrics); - res + result } } diff --git a/crates/sv-webserver/src/telemetry/runtime.rs b/crates/fuel-web-utils/src/telemetry/runtime.rs similarity index 100% rename from crates/sv-webserver/src/telemetry/runtime.rs rename to crates/fuel-web-utils/src/telemetry/runtime.rs diff --git a/crates/sv-webserver/src/telemetry/system.rs b/crates/fuel-web-utils/src/telemetry/system.rs similarity index 100% rename from crates/sv-webserver/src/telemetry/system.rs rename to crates/fuel-web-utils/src/telemetry/system.rs diff --git a/crates/sv-consumer/Cargo.toml b/crates/sv-consumer/Cargo.toml index ca105b03..21f20c74 100644 --- a/crates/sv-consumer/Cargo.toml +++ b/crates/sv-consumer/Cargo.toml @@ -18,17 +18,20 @@ path = "src/main.rs" [dependencies] anyhow = { workspace = true } async-nats = { workspace = true } +async-trait = { workspace = true } clap = { workspace = true } displaydoc = { workspace = true } dotenvy = { workspace = true } fuel-core = { workspace = true, default-features = false, features = ["p2p", "relayer", "rocksdb"] } fuel-streams-core = { workspace = true, features = ["test-helpers"] } fuel-streams-executors = { workspace = true, features = ["test-helpers"] } +fuel-web-utils = { workspace = true } futures = { workspace = true } hex = { workspace = true } num_cpus = { workspace = true } +prometheus = { version = "0.13", features = ["process"] } +serde = { workspace = true } serde_json = { workspace = true } -sv-publisher = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-util = "0.7.13" diff --git a/crates/sv-consumer/src/cli.rs b/crates/sv-consumer/src/cli.rs index 6b51f6bf..b7e2b709 100644 --- a/crates/sv-consumer/src/cli.rs +++ b/crates/sv-consumer/src/cli.rs @@ -2,6 +2,15 @@ use clap::Parser; #[derive(Clone, Parser)] pub struct Cli { + /// API port number + #[arg( + long, + value_name = "PORT", + env = "PORT", + default_value = "9003", + help = "Port number for the API server" + )] + pub port: u16, /// Fuel Network to connect to. #[arg( long, @@ -11,6 +20,7 @@ pub struct Cli { help = "NATS URL to connect to." )] pub nats_url: String, + /// Nats publisher URL #[arg( long, value_name = "NATS_PUBLISHER_URL", @@ -19,4 +29,12 @@ pub struct Cli { help = "NATS Publisher URL to connect to." )] pub nats_publisher_url: String, + /// Use metrics + #[arg( + long, + env = "USE_METRICS", + default_value = "false", + help = "Enable metrics" + )] + pub use_metrics: bool, } diff --git a/crates/sv-consumer/src/lib.rs b/crates/sv-consumer/src/lib.rs index 1cddb9ed..3fa43b53 100644 --- a/crates/sv-consumer/src/lib.rs +++ b/crates/sv-consumer/src/lib.rs @@ -3,6 +3,8 @@ use std::sync::Arc; use fuel_streams_core::prelude::*; pub mod cli; +pub mod metrics; +pub mod state; #[derive(Debug, Clone, Default)] pub enum Client { diff --git a/crates/sv-consumer/src/main.rs b/crates/sv-consumer/src/main.rs index 34a2bece..0084784e 100644 --- a/crates/sv-consumer/src/main.rs +++ b/crates/sv-consumer/src/main.rs @@ -15,9 +15,13 @@ use clap::Parser; use displaydoc::Display as DisplayDoc; use fuel_streams_core::prelude::*; use fuel_streams_executors::*; +use fuel_web_utils::{ + server::api::build_and_spawn_web_server, + shutdown::{shutdown_nats_with_timeout, ShutdownController}, + telemetry::Telemetry, +}; use futures::{future::try_join_all, stream::FuturesUnordered, StreamExt}; -use sv_consumer::{cli::Cli, Client}; -use sv_publisher::shutdown::ShutdownController; +use sv_consumer::{cli::Cli, state::ServerState, Client}; use tokio_util::sync::CancellationToken; use tracing::level_filters::LevelFilter; use tracing_subscriber::fmt::time; @@ -46,6 +50,10 @@ pub enum ConsumerError { Semaphore(#[from] tokio::sync::AcquireError), /// Failed to setup storage: {0} Storage(#[from] fuel_streams_core::storage::StorageError), + /// Failed to start telemetry + TelemetryStart, + /// Failed to start web server + WebServerStart, } #[tokio::main] @@ -145,8 +153,25 @@ async fn process_messages( let storage = setup_storage().await?; let (_, publisher_stream) = FuelStreams::setup_all(&core_client, &publisher_client, &storage).await; - let fuel_streams: Arc = publisher_stream.arc(); + + let telemetry = Telemetry::new(None) + .await + .map_err(|_| ConsumerError::TelemetryStart)?; + telemetry + .start() + .await + .map_err(|_| ConsumerError::TelemetryStart)?; + + let server_state = ServerState::new( + publisher_client.clone(), + Arc::clone(&telemetry), + Arc::clone(&fuel_streams), + ); + let server_handle = build_and_spawn_web_server(cli.port, server_state) + .await + .map_err(|_| ConsumerError::WebServerStart)?; + let semaphore = Arc::new(tokio::sync::Semaphore::new(64)); while !token.is_cancelled() { let mut messages = @@ -195,6 +220,11 @@ async fn process_messages( log_task(results, start_time, end_time, payload); } } + tracing::info!("Stopping actix server ..."); + server_handle.stop(true).await; + tracing::info!("Actix server stopped. Goodbye!"); + shutdown_nats_with_timeout(&publisher_client).await; + Ok(()) } diff --git a/crates/sv-consumer/src/metrics.rs b/crates/sv-consumer/src/metrics.rs new file mode 100644 index 00000000..36ec02c8 --- /dev/null +++ b/crates/sv-consumer/src/metrics.rs @@ -0,0 +1,39 @@ +use async_trait::async_trait; +use fuel_web_utils::telemetry::metrics::TelemetryMetrics; +use prometheus::Registry; + +#[derive(Clone, Debug)] +pub struct Metrics { + // TODO: add more metrics + pub registry: Registry, +} + +impl Default for Metrics { + fn default() -> Self { + Metrics::new(None).expect("Failed to create default Metrics") + } +} + +#[async_trait] +impl TelemetryMetrics for Metrics { + fn registry(&self) -> &Registry { + &self.registry + } + + fn metrics(&self) -> Option { + Some(self.clone()) + } +} + +impl Metrics { + pub fn new_with_random_prefix() -> anyhow::Result { + Metrics::new(Some(Metrics::generate_random_prefix())) + } + + pub fn new(prefix: Option) -> anyhow::Result { + let registry = + Registry::new_custom(prefix, None).expect("registry to be created"); + + Ok(Self { registry }) + } +} diff --git a/crates/sv-consumer/src/state.rs b/crates/sv-consumer/src/state.rs new file mode 100644 index 00000000..ef373d86 --- /dev/null +++ b/crates/sv-consumer/src/state.rs @@ -0,0 +1,121 @@ +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use async_nats::jetstream::stream::State; +use async_trait::async_trait; +use fuel_streams_core::{nats::NatsClient, FuelStreamsExt}; +use fuel_web_utils::{server::state::StateProvider, telemetry::Telemetry}; +use serde::{Deserialize, Serialize}; + +use crate::metrics::Metrics; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct StreamInfo { + consumers: Vec, + state: StreamState, + stream_name: String, +} + +#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +pub struct StreamState { + /// The number of messages contained in this stream + pub messages: u64, + /// The number of bytes of all messages contained in this stream + pub bytes: u64, + /// The lowest sequence number still present in this stream + #[serde(rename = "first_seq")] + pub first_sequence: u64, + /// The time associated with the oldest message still present in this stream + #[serde(rename = "first_ts")] + pub first_timestamp: i64, + /// The last sequence number assigned to a message in this stream + #[serde(rename = "last_seq")] + pub last_sequence: u64, + /// The time that the last message was received by this stream + #[serde(rename = "last_ts")] + pub last_timestamp: i64, + /// The number of consumers configured to consume this stream + pub consumer_count: usize, +} + +impl From for StreamState { + fn from(state: State) -> Self { + StreamState { + messages: state.messages, + bytes: state.bytes, + first_sequence: state.first_sequence, + first_timestamp: state.first_timestamp.unix_timestamp(), + last_sequence: state.last_sequence, + last_timestamp: state.last_timestamp.unix_timestamp(), + consumer_count: state.consumer_count, + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct HealthResponse { + pub uptime_secs: u64, + pub is_healthy: bool, + pub streams_info: Vec, +} + +pub struct ServerState { + pub start_time: Instant, + pub nats_client: Arc, + pub telemetry: Arc>, + pub fuel_streams: Arc, +} + +impl ServerState { + pub fn new( + nats_client: Arc, + telemetry: Arc>, + fuel_streams: Arc, + ) -> Self { + Self { + start_time: Instant::now(), + nats_client, + telemetry, + fuel_streams, + } + } + + pub fn uptime(&self) -> Duration { + self.start_time.elapsed() + } +} + +#[async_trait] +impl StateProvider for ServerState { + async fn is_healthy(&self) -> bool { + self.nats_client.is_connected() + } + + async fn get_health(&self) -> serde_json::Value { + let streams_info = self + .fuel_streams + .get_consumers_and_state() + .await + .unwrap_or_default() + .into_iter() + .map(|res| StreamInfo { + consumers: res.1, + state: res.2.into(), + stream_name: res.0, + }) + .collect::>(); + + let resp = HealthResponse { + uptime_secs: self.uptime().as_secs(), + is_healthy: self.is_healthy().await, + streams_info, + }; + serde_json::to_value(resp).unwrap_or(serde_json::json!({})) + } + + async fn get_metrics(&self) -> String { + self.telemetry.get_metrics().await + } +} diff --git a/crates/sv-publisher/Cargo.toml b/crates/sv-publisher/Cargo.toml index b6fee200..a5098ecb 100644 --- a/crates/sv-publisher/Cargo.toml +++ b/crates/sv-publisher/Cargo.toml @@ -18,6 +18,7 @@ path = "src/main.rs" [dependencies] anyhow = { workspace = true } async-nats = { workspace = true } +async-trait = { workspace = true } clap = { workspace = true } displaydoc = { workspace = true } fuel-core = { workspace = true, default-features = false, features = ["p2p", "relayer", "rocksdb"] } @@ -29,7 +30,11 @@ fuel-core-bin = { workspace = true, default-features = false, features = [ fuel-core-types = { workspace = true, default-features = false, features = ["std", "serde"] } fuel-streams-core = { workspace = true, features = ["test-helpers"] } fuel-streams-executors = { workspace = true, features = ["test-helpers"] } +fuel-web-utils = { workspace = true } futures = { workspace = true } +prometheus = { version = "0.13", features = ["process"] } +serde = { workspace = true } +serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-util = "0.7.13" diff --git a/crates/sv-publisher/src/cli.rs b/crates/sv-publisher/src/cli.rs index 4573fedd..35ab6a36 100644 --- a/crates/sv-publisher/src/cli.rs +++ b/crates/sv-publisher/src/cli.rs @@ -9,6 +9,15 @@ use clap::Parser; /// - `fuel_core_config`: Configuration for the Fuel Core service, parsed using a flattened command. #[derive(Clone, Parser)] pub struct Cli { + /// API port number + #[arg( + long, + value_name = "PORT", + env = "PORT", + default_value = "9003", + help = "Port number for the API server" + )] + pub port: u16, /// Flattened command structure for Fuel Core configuration. #[command(flatten)] pub fuel_core_config: fuel_core_bin::cli::run::Command, @@ -21,4 +30,12 @@ pub struct Cli { help = "NATS URL to connect to." )] pub nats_url: String, + /// Use metrics + #[arg( + long, + env = "USE_METRICS", + default_value = "false", + help = "Enable metrics" + )] + pub use_metrics: bool, } diff --git a/crates/sv-publisher/src/lib.rs b/crates/sv-publisher/src/lib.rs index 5bf4a4b0..8575ea14 100644 --- a/crates/sv-publisher/src/lib.rs +++ b/crates/sv-publisher/src/lib.rs @@ -1,2 +1,3 @@ pub mod cli; -pub mod shutdown; +pub mod metrics; +pub mod state; diff --git a/crates/sv-publisher/src/main.rs b/crates/sv-publisher/src/main.rs index 63bbe695..d9dc54aa 100644 --- a/crates/sv-publisher/src/main.rs +++ b/crates/sv-publisher/src/main.rs @@ -10,8 +10,13 @@ use displaydoc::Display as DisplayDoc; use fuel_core_types::blockchain::SealedBlock; use fuel_streams_core::prelude::*; use fuel_streams_executors::*; +use fuel_web_utils::{ + server::api::build_and_spawn_web_server, + shutdown::{shutdown_nats_with_timeout, ShutdownController}, + telemetry::Telemetry, +}; use futures::StreamExt; -use sv_publisher::{cli::Cli, shutdown::ShutdownController}; +use sv_publisher::{cli::Cli, metrics::Metrics, state::ServerState}; use thiserror::Error; use tokio_util::sync::CancellationToken; @@ -30,8 +35,17 @@ async fn main() -> anyhow::Result<()> { let fuel_core: Arc = FuelCore::new(config).await?; fuel_core.start().await?; + let telemetry = Telemetry::new(None).await?; + telemetry.start().await?; + let storage = setup_storage().await?; let nats_client = setup_nats(&cli.nats_url).await?; + + let server_state = + ServerState::new(nats_client.clone(), Arc::clone(&telemetry)); + let server_handle = + build_and_spawn_web_server(cli.port, server_state).await?; + let last_block_height = Arc::new(fuel_core.get_latest_block_height()?); let last_published = Arc::new(find_last_published_height(&nats_client, &storage).await?); @@ -49,12 +63,14 @@ async fn main() -> anyhow::Result<()> { last_block_height, last_published, shutdown.token().clone(), + Arc::clone(&telemetry), ); let live = process_live_blocks( &nats_client.jetstream, fuel_core.clone(), shutdown.token().clone(), + Arc::clone(&telemetry) ); tokio::join!(historical, live) @@ -64,7 +80,11 @@ async fn main() -> anyhow::Result<()> { } _ = shutdown.wait_for_shutdown() => { tracing::info!("Shutdown signal received, waiting for processing to complete..."); - fuel_core.stop().await + fuel_core.stop().await; + tracing::info!("Stopping actix server ..."); + server_handle.stop(true).await; + tracing::info!("Actix server stopped. Goodbye!"); + shutdown_nats_with_timeout(&nats_client).await; } } @@ -138,6 +158,7 @@ fn process_historical_blocks( last_block_height: Arc, last_published_height: Arc, token: CancellationToken, + telemetry: Arc>, ) -> tokio::task::JoinHandle<()> { let jetstream = nats_client.jetstream.clone(); tokio::spawn(async move { @@ -153,11 +174,18 @@ fn process_historical_blocks( let fuel_core = fuel_core.clone(); let sealed_block = fuel_core.get_sealed_block_by_height(height); let sealed_block = Arc::new(sealed_block); + let telemetry = telemetry.clone(); async move { - publish_block(&jetstream, &fuel_core, &sealed_block).await + publish_block( + &jetstream, + &fuel_core, + &sealed_block, + telemetry, + ) + .await } }) - .buffer_unordered(100) + .buffered(100) .take_until(token.cancelled()) .collect::>() .await; @@ -168,6 +196,7 @@ async fn process_live_blocks( jetstream: &Context, fuel_core: Arc, token: CancellationToken, + telemetry: Arc>, ) -> Result<(), LiveBlockProcessingError> { let mut subscription = fuel_core.blocks_subscription(); while let Ok(data) = subscription.recv().await { @@ -175,7 +204,8 @@ async fn process_live_blocks( break; } let sealed_block = Arc::new(data.sealed_block.clone()); - publish_block(jetstream, &fuel_core, &sealed_block).await?; + publish_block(jetstream, &fuel_core, &sealed_block, telemetry.clone()) + .await?; } Ok(()) } @@ -194,13 +224,16 @@ async fn publish_block( jetstream: &Context, fuel_core: &Arc, sealed_block: &Arc, + telemetry: Arc>, ) -> Result<(), PublishError> { let metadata = Metadata::new(fuel_core, sealed_block); let fuel_core = Arc::clone(fuel_core); let payload = BlockPayload::new(fuel_core, sealed_block, &metadata)?; + let encoded_payload = payload.encode().await?; + let payload_size = encoded_payload.len(); let publish = Publish::build() .message_id(payload.message_id()) - .payload(payload.encode().await?.into()); + .payload(encoded_payload.into()); jetstream .send_publish(payload.subject(), publish) @@ -209,6 +242,11 @@ async fn publish_block( .await .map_err(PublishError::NatsPublish)?; + if let Some(metrics) = telemetry.base_metrics() { + metrics + .update_publisher_success_metrics(&payload.subject(), payload_size); + } + tracing::info!("New block submitted: {}", payload.block_height()); Ok(()) } diff --git a/crates/sv-publisher/src/metrics.rs b/crates/sv-publisher/src/metrics.rs new file mode 100644 index 00000000..9b89a7d9 --- /dev/null +++ b/crates/sv-publisher/src/metrics.rs @@ -0,0 +1,187 @@ +use async_trait::async_trait; +use fuel_web_utils::telemetry::metrics::TelemetryMetrics; +use prometheus::{ + register_histogram_vec, + register_int_counter_vec, + register_int_gauge_vec, + HistogramVec, + IntCounterVec, + IntGaugeVec, + Registry, +}; + +#[derive(Clone, Debug)] +pub struct Metrics { + pub registry: Registry, + pub total_subs: IntGaugeVec, + pub total_published_messages: IntCounterVec, + pub total_failed_messages: IntCounterVec, + pub last_published_block_height: IntGaugeVec, + pub last_published_block_timestamp: IntGaugeVec, + pub published_messages_throughput: IntCounterVec, + pub publishing_latency_histogram: HistogramVec, + pub message_size_histogram: HistogramVec, + pub error_rates: IntCounterVec, +} + +impl Default for Metrics { + fn default() -> Self { + Metrics::new(None).expect("Failed to create default Metrics") + } +} + +#[async_trait] +impl TelemetryMetrics for Metrics { + fn registry(&self) -> &Registry { + &self.registry + } + + fn metrics(&self) -> Option { + Some(self.clone()) + } +} + +impl Metrics { + pub fn new_with_random_prefix() -> anyhow::Result { + Metrics::new(Some(Metrics::generate_random_prefix())) + } + + pub fn new(prefix: Option) -> anyhow::Result { + let metric_prefix = prefix + .clone() + .map(|p| format!("{}_", p)) + .unwrap_or_default(); + + let total_subs = register_int_gauge_vec!( + format!("{}publisher_metrics_total_subscriptions", metric_prefix), + "A metric counting the number of active subscriptions", + &[], + ) + .expect("metric must be created"); + + let total_published_messages = register_int_counter_vec!( + format!( + "{}publisher_metrics_total_published_messages", + metric_prefix + ), + "A metric counting the number of published messages", + &[], + ) + .expect("metric must be created"); + + let total_failed_messages = register_int_counter_vec!( + format!("{}publisher_metrics_total_failed_messages", metric_prefix), + "A metric counting the number of unpublished and failed messages", + &[], + ) + .expect("metric must be created"); + + let last_published_block_height = register_int_gauge_vec!( + format!( + "{}publisher_metrics_last_published_block_height", + metric_prefix + ), + "A metric that represents the last published block height", + &[], + ) + .expect("metric must be created"); + + let last_published_block_timestamp = register_int_gauge_vec!( + format!( + "{}publisher_metrics_last_published_block_timestamp", + metric_prefix + ), + "A metric that represents the last published transaction timestamp", + &[], + ) + .expect("metric must be created"); + + let published_messages_throughput = register_int_counter_vec!( + format!("{}publisher_metrics_published_messages_throughput", metric_prefix), + "A metric counting the number of published messages per subject wildcard", + &["subject_wildcard"], + ) + .expect("metric must be created"); + + // New histogram metric for block latency + let publishing_latency_histogram = register_histogram_vec!( + format!("{}publisher_metrics_block_latency_seconds", metric_prefix), + "Histogram of latencies between receiving and publishing a block", + &["subject_wildcard"], + // buckets for latency measurement (e.g., 0.1s, 0.5s, 1s, 5s, 10s) + vec![0.1, 0.5, 1.0, 5.0, 10.0], + ) + .expect("metric must be created"); + + let message_size_histogram = register_histogram_vec!( + format!("{}publisher_metrics_message_size_bytes", metric_prefix), + "Histogram of message sizes in bytes", + &["subject_wildcard"], + vec![100.0, 500.0, 1000.0, 5000.0, 10000.0, 100000.0, 1000000.0] + ) + .expect("metric must be created"); + + let error_rates = register_int_counter_vec!( + format!("{}publisher_metrics_error_rates", metric_prefix), + "A metric counting errors or failures during message processing", + &["subject_wildcard", "error_type"], + ) + .expect("metric must be created"); + + let registry = + Registry::new_custom(prefix, None).expect("registry to be created"); + registry.register(Box::new(total_subs.clone()))?; + registry.register(Box::new(total_published_messages.clone()))?; + registry.register(Box::new(total_failed_messages.clone()))?; + registry.register(Box::new(last_published_block_height.clone()))?; + registry.register(Box::new(last_published_block_timestamp.clone()))?; + registry.register(Box::new(published_messages_throughput.clone()))?; + registry.register(Box::new(publishing_latency_histogram.clone()))?; + registry.register(Box::new(message_size_histogram.clone()))?; + registry.register(Box::new(error_rates.clone()))?; + + Ok(Self { + registry, + total_subs, + total_published_messages, + total_failed_messages, + last_published_block_height, + last_published_block_timestamp, + published_messages_throughput, + publishing_latency_histogram, + message_size_histogram, + error_rates, + }) + } + + pub fn update_publisher_success_metrics( + &self, + subject: &str, + published_data_size: usize, + ) { + // Update message size histogram + self.message_size_histogram + .with_label_values(&[subject]) + .observe(published_data_size as f64); + + // Increment total published messages + self.total_published_messages.with_label_values(&[]).inc(); + + // Increment throughput for the published messages + self.published_messages_throughput + .with_label_values(&[subject]) + .inc(); + } + + pub fn update_publisher_error_metrics(&self, subject: &str, error: &str) { + self.error_rates.with_label_values(&[subject, error]).inc(); + } + + pub fn record_streams_count(&self, count: usize) { + self.total_subs.with_label_values(&[]).set(count as i64); + } + + pub fn record_failed_publishing(&self) { + self.total_failed_messages.with_label_values(&[]).inc(); + } +} diff --git a/crates/sv-publisher/src/state.rs b/crates/sv-publisher/src/state.rs new file mode 100644 index 00000000..6d6e86ed --- /dev/null +++ b/crates/sv-publisher/src/state.rs @@ -0,0 +1,59 @@ +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use async_trait::async_trait; +use fuel_streams_core::nats::NatsClient; +use fuel_web_utils::{server::state::StateProvider, telemetry::Telemetry}; +use serde::{Deserialize, Serialize}; + +use crate::metrics::Metrics; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct HealthResponse { + pub uptime_secs: u64, + pub is_healthy: bool, +} + +pub struct ServerState { + pub start_time: Instant, + pub nats_client: NatsClient, + pub telemetry: Arc>, +} + +impl ServerState { + pub fn new( + nats_client: NatsClient, + telemetry: Arc>, + ) -> Self { + Self { + start_time: Instant::now(), + nats_client, + telemetry, + } + } + + pub fn uptime(&self) -> Duration { + self.start_time.elapsed() + } +} + +#[async_trait] +impl StateProvider for ServerState { + async fn is_healthy(&self) -> bool { + self.nats_client.is_connected() + } + + async fn get_health(&self) -> serde_json::Value { + let resp = HealthResponse { + uptime_secs: self.uptime().as_secs(), + is_healthy: self.is_healthy().await, + }; + serde_json::to_value(resp).unwrap_or(serde_json::json!({})) + } + + async fn get_metrics(&self) -> String { + self.telemetry.get_metrics().await + } +} diff --git a/crates/sv-webserver/Cargo.toml b/crates/sv-webserver/Cargo.toml index 81b60db4..26c00403 100644 --- a/crates/sv-webserver/Cargo.toml +++ b/crates/sv-webserver/Cargo.toml @@ -15,44 +15,30 @@ name = "sv-webserver" path = "src/main.rs" [dependencies] -actix-cors = { workspace = true } -actix-server = { workspace = true } -actix-service = "2.0.2" actix-web = { workspace = true } actix-ws = "0.3.0" anyhow = { workspace = true } async-nats = { workspace = true } -bytestring = "1.4.0" -chrono = { workspace = true } +async-trait = { workspace = true } clap = { workspace = true } -derive_more = { version = "1.0", features = ["full"] } displaydoc = { workspace = true } dotenvy = { workspace = true } -elasticsearch = "8.15.0-alpha.1" fuel-data-parser = { workspace = true } fuel-streams-core = { workspace = true, features = ["test-helpers"] } fuel-streams-nats = { workspace = true, features = ["test-helpers"] } fuel-streams-storage = { workspace = true, features = ["test-helpers"] } +fuel-web-utils = { workspace = true } futures = { workspace = true } -futures-util = { workspace = true } -jsonwebtoken = "9.3.0" num_cpus = { workspace = true } parking_lot = { version = "0.12", features = ["serde"] } prometheus = { version = "0.13", features = ["process"] } -rand = { workspace = true } -rust_decimal = { version = "1.13" } serde = { workspace = true } serde_json = { workspace = true } -serde_prometheus = { version = "0.2" } -sysinfo = { version = "0.29" } thiserror = "2.0" time = { version = "0.3", features = ["serde"] } tokio = { workspace = true } tracing = { workspace = true } -tracing-actix-web = { workspace = true } tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } -url = "2.5" -urlencoding = "2.1" uuid = { version = "1.11.0", features = ["serde", "v4"] } validator = { version = "0.19.0", features = ["derive"] } diff --git a/crates/sv-webserver/src/lib.rs b/crates/sv-webserver/src/lib.rs index ed3faedc..eef8857c 100644 --- a/crates/sv-webserver/src/lib.rs +++ b/crates/sv-webserver/src/lib.rs @@ -1,7 +1,7 @@ pub mod cli; pub mod config; +pub mod metrics; pub mod server; -pub mod telemetry; use std::sync::LazyLock; diff --git a/crates/sv-webserver/src/main.rs b/crates/sv-webserver/src/main.rs index 253f6fb3..9b908dac 100644 --- a/crates/sv-webserver/src/main.rs +++ b/crates/sv-webserver/src/main.rs @@ -1,6 +1,7 @@ +use fuel_web_utils::server::api::{spawn_web_server, ApiServerBuilder}; use sv_webserver::{ config::Config, - server::{api::create_api, context::Context, state::ServerState}, + server::{state::ServerState, svc::svc_handlers}, }; use tracing::level_filters::LevelFilter; use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter}; @@ -22,20 +23,13 @@ async fn main() -> anyhow::Result<()> { } let config = Config::load()?; - let context = Context::new(&config).await?; - let state = ServerState::new(context).await; - let server = create_api(&config, state)?; + let server_state = ServerState::new(&config).await?; + let server = ApiServerBuilder::new(config.api.port, server_state.clone()) + .with_dynamic_routes(svc_handlers(server_state)) + .build()?; let server_handle = server.handle(); - - // spawn the server in the background - let jh = tokio::spawn(async move { - tracing::info!("Starting actix server ..."); - if let Err(err) = server.await { - tracing::error!("Actix Web server error: {:?}", err); - } - }); - - let _ = tokio::join!(jh); + let server_task = spawn_web_server(server).await; + let _ = tokio::join!(server_task); // Await the Actix server shutdown tracing::info!("Stopping actix server ..."); diff --git a/crates/sv-webserver/src/telemetry/metrics.rs b/crates/sv-webserver/src/metrics.rs similarity index 76% rename from crates/sv-webserver/src/telemetry/metrics.rs rename to crates/sv-webserver/src/metrics.rs index 5a65f1a0..0b23c56b 100644 --- a/crates/sv-webserver/src/telemetry/metrics.rs +++ b/crates/sv-webserver/src/metrics.rs @@ -1,3 +1,5 @@ +use async_trait::async_trait; +use fuel_web_utils::telemetry::metrics::TelemetryMetrics; use prometheus::{ register_int_counter_vec, register_int_gauge_vec, @@ -5,7 +7,6 @@ use prometheus::{ IntGaugeVec, Registry, }; -use rand::{distributions::Alphanumeric, Rng}; #[derive(Clone, Debug)] pub struct Metrics { @@ -22,16 +23,18 @@ impl Default for Metrics { } } -impl Metrics { - pub fn generate_random_prefix() -> String { - rand::thread_rng() - .sample_iter(&Alphanumeric) - .filter(|c| c.is_ascii_alphabetic()) - .take(6) - .map(char::from) - .collect() +#[async_trait] +impl TelemetryMetrics for Metrics { + fn registry(&self) -> &Registry { + &self.registry } + fn metrics(&self) -> Option { + Some(self.clone()) + } +} + +impl Metrics { pub fn new_with_random_prefix() -> anyhow::Result { Metrics::new(Some(Metrics::generate_random_prefix())) } @@ -89,6 +92,63 @@ impl Metrics { subs_messages_error_rates, }) } + + pub fn update_user_subscription_metrics( + &self, + user_id: uuid::Uuid, + subject_wildcard: &str, + ) { + // Increment total user subscribed messages + self.user_subscribed_messages + .with_label_values(&[ + user_id.to_string().as_str(), + subject_wildcard, + ]) + .inc(); + + // Increment throughput for the subscribed messages + self.subs_messages_throughput + .with_label_values(&[subject_wildcard]) + .inc(); + } + + pub fn update_error_metrics( + &self, + subject_wildcard: &str, + error_type: &str, + ) { + self.subs_messages_error_rates + .with_label_values(&[subject_wildcard, error_type]) + .inc(); + } + + pub fn increment_subscriptions_count(&self) { + self.total_ws_subs.with_label_values(&[]).inc(); + } + + pub fn decrement_subscriptions_count(&self) { + self.total_ws_subs.with_label_values(&[]).inc(); + } + + pub fn update_unsubscribed( + &self, + user_id: uuid::Uuid, + subject_wildcard: &str, + ) { + self.user_subscribed_messages + .with_label_values(&[&user_id.to_string(), subject_wildcard]) + .dec(); + } + + pub fn update_subscribed( + &self, + user_id: uuid::Uuid, + subject_wildcard: &str, + ) { + self.user_subscribed_messages + .with_label_values(&[&user_id.to_string(), subject_wildcard]) + .inc(); + } } #[cfg(test)] diff --git a/crates/sv-webserver/src/server/api.rs b/crates/sv-webserver/src/server/api.rs deleted file mode 100644 index 00876a1d..00000000 --- a/crates/sv-webserver/src/server/api.rs +++ /dev/null @@ -1,88 +0,0 @@ -use std::net::{Ipv4Addr, SocketAddrV4}; - -use actix_cors::Cors; -use actix_server::Server; -use actix_web::{ - http::{self, Method}, - middleware::{Compress, Logger as ActixLogger}, - web, - App, - HttpServer, -}; -use tracing_actix_web::TracingLogger; - -use super::{ - http::handlers::{get_health, get_metrics, request_jwt}, - middlewares::auth::JwtAuth, - state::ServerState, - ws::socket::get_ws, -}; -use crate::{config::Config, STREAMER_MAX_WORKERS}; - -const API_VERSION: &str = "v1"; - -fn with_prefixed_route(route: &str) -> String { - format!("/api/{}/{}", API_VERSION, route) -} - -pub fn create_api( - config: &Config, - state: ServerState, -) -> anyhow::Result { - let server_addr = std::net::SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::UNSPECIFIED, - config.api.port, - )); - let jwt_secret = config.auth.jwt_secret.clone(); - let server = HttpServer::new(move || { - let jwt_secret = jwt_secret.clone(); - // create cors - let cors = Cors::default() - .allow_any_origin() - .allowed_methods(vec![ - Method::GET, - Method::POST, - Method::PUT, - Method::OPTIONS, - Method::DELETE, - Method::PATCH, - Method::TRACE, - ]) - .allowed_headers(vec![ - http::header::AUTHORIZATION, - http::header::ACCEPT, - ]) - .allowed_header(http::header::CONTENT_TYPE) - .max_age(3600); - - App::new() - .app_data(web::Data::new(state.clone())) - .wrap(ActixLogger::default()) - .wrap(TracingLogger::default()) - .wrap(Compress::default()) - .wrap(cors) - .service( - web::resource(with_prefixed_route("health")) - .route(web::get().to(get_health)), - ) - .service( - web::resource(with_prefixed_route("metrics")) - .route(web::get().to(get_metrics)), - ) - .service( - web::resource(with_prefixed_route("jwt")) - .route(web::post().to(request_jwt)), - ) - .service( - web::resource(with_prefixed_route("ws")) - .wrap(JwtAuth::new(jwt_secret)) - .route(web::get().to(get_ws)), - ) - }) - .bind(server_addr)? - .workers(*STREAMER_MAX_WORKERS) - .shutdown_timeout(20) - .run(); - - Ok(server) -} diff --git a/crates/sv-webserver/src/server/context.rs b/crates/sv-webserver/src/server/context.rs deleted file mode 100644 index 0da19c2b..00000000 --- a/crates/sv-webserver/src/server/context.rs +++ /dev/null @@ -1,68 +0,0 @@ -use std::{sync::Arc, time::Duration}; - -use fuel_streams_core::prelude::*; -use fuel_streams_storage::S3Storage; - -use crate::{config::Config, telemetry::Telemetry}; - -pub const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(90); - -#[allow(dead_code)] -#[derive(Clone)] -pub struct Context { - pub nats_client: NatsClient, - pub fuel_streams: Arc, - pub telemetry: Arc, - pub storage: Option>, - pub jwt_secret: String, -} - -impl Context { - pub async fn new(config: &Config) -> anyhow::Result { - let nats_client_opts = NatsClientOpts::admin_opts() - .with_url(config.nats.url.clone()) - .with_domain("CORE"); - let nats_client = NatsClient::connect(&nats_client_opts).await?; - let storage_opts = S3StorageOpts::admin_opts(); - let storage = Arc::new(S3Storage::new(storage_opts).await?); - let fuel_streams = - Arc::new(FuelStreams::new(&nats_client, &storage).await); - let telemetry = Telemetry::new(None).await?; - telemetry.start().await?; - - Ok(Context { - fuel_streams, - nats_client, - telemetry, - storage: if config.s3.enabled { - Some(storage) - } else { - None - }, - jwt_secret: config.auth.jwt_secret.clone(), - }) - } - - #[allow(dead_code)] - async fn shutdown_services_with_timeout(&self) -> anyhow::Result<()> { - tokio::time::timeout(GRACEFUL_SHUTDOWN_TIMEOUT, async { - Context::flush_await_all_streams(&self.nats_client).await; - }) - .await?; - - Ok(()) - } - - #[allow(dead_code)] - async fn flush_await_all_streams(nats_client: &NatsClient) { - tracing::info!("Flushing in-flight messages to nats ..."); - match nats_client.nats_client.flush().await { - Ok(_) => { - tracing::info!("Flushed all streams successfully!"); - } - Err(e) => { - tracing::error!("Failed to flush all streams: {:?}", e); - } - } - } -} diff --git a/crates/sv-webserver/src/server/http/handlers.rs b/crates/sv-webserver/src/server/http/handlers.rs index 45fd8aa8..74b7084d 100644 --- a/crates/sv-webserver/src/server/http/handlers.rs +++ b/crates/sv-webserver/src/server/http/handlers.rs @@ -1,11 +1,17 @@ use std::{collections::HashMap, sync::LazyLock}; -use actix_web::{web, HttpResponse, Result}; +use actix_web::{web, HttpResponse}; +use fuel_web_utils::server::middlewares::auth::jwt::{ + create_jwt, + AuthError, + UserError, + UserRole, +}; use uuid::Uuid; use super::models::{LoginRequest, LoginResponse}; use crate::server::{ - auth::{create_jwt, AuthError, UserError, UserRole}, + // auth::{create_jwt, AuthError, UserError, UserRole}, state::ServerState, }; @@ -23,26 +29,7 @@ pub static AUTH_DATA: LazyLock> = ]) }); -pub async fn get_metrics( - state: web::Data, -) -> Result { - Ok(HttpResponse::Ok() - .content_type( - "application/openmetrics-text; version=1.0.0; charset=utf-8", - ) - .body(state.context.telemetry.get_metrics().await)) -} - -pub async fn get_health(state: web::Data) -> Result { - if !state.is_healthy() { - return Ok( - HttpResponse::ServiceUnavailable().body("Service Unavailable") - ); - } - Ok(HttpResponse::Ok().json(state.get_health().await)) -} - -// request jwt +// request jwt endpoint pub async fn request_jwt( state: web::Data, req_body: web::Json, @@ -58,12 +45,9 @@ pub async fn request_jwt( } // if all good, generate a jwt with the user role encoded - let jwt_token = create_jwt( - &uuid.to_string(), - user_role, - state.context.jwt_secret.as_bytes(), - ) - .map_err(|_| AuthError::JWTTokenCreationError)?; + let jwt_token = + create_jwt(&uuid.to_string(), user_role, state.jwt_secret.as_bytes()) + .map_err(|_| AuthError::JWTTokenCreationError)?; Ok(HttpResponse::Ok().json(&LoginResponse { id: uuid.to_owned(), diff --git a/crates/sv-webserver/src/server/mod.rs b/crates/sv-webserver/src/server/mod.rs index 1ecd4c65..7869e3b1 100644 --- a/crates/sv-webserver/src/server/mod.rs +++ b/crates/sv-webserver/src/server/mod.rs @@ -1,7 +1,4 @@ -pub mod api; -pub mod auth; -pub mod context; pub mod http; -pub mod middlewares; pub mod state; +pub mod svc; pub mod ws; diff --git a/crates/sv-webserver/src/server/state.rs b/crates/sv-webserver/src/server/state.rs index 19e379d2..911cc9fa 100644 --- a/crates/sv-webserver/src/server/state.rs +++ b/crates/sv-webserver/src/server/state.rs @@ -4,11 +4,15 @@ use std::{ }; use async_nats::jetstream::stream::State; -use fuel_streams_core::prelude::FuelStreamsExt; +use async_trait::async_trait; +use fuel_streams_core::{nats::NatsClient, FuelStreams, FuelStreamsExt}; +use fuel_streams_nats::NatsClientOpts; +use fuel_streams_storage::{S3Storage, S3StorageOpts, Storage, StorageConfig}; +use fuel_web_utils::{server::state::StateProvider, telemetry::Telemetry}; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; -use super::context::Context; +use crate::{config::Config, metrics::Metrics}; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct StreamInfo { @@ -55,38 +59,64 @@ impl From for StreamState { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct HealthResponse { - pub uptime: u64, + pub uptime_secs: u64, + pub is_healthy: bool, pub streams_info: Vec, } #[derive(Clone)] pub struct ServerState { - pub context: Context, pub start_time: Instant, + pub nats_client: NatsClient, + pub telemetry: Arc>, + pub fuel_streams: Arc, pub connection_count: Arc>, + pub storage: Option>, + pub jwt_secret: String, } impl ServerState { - pub async fn new(context: Context) -> Self { - Self { + pub async fn new(config: &Config) -> anyhow::Result { + let nats_client_opts = NatsClientOpts::admin_opts() + .with_url(config.nats.url.clone()) + .with_domain("CORE"); + let nats_client = NatsClient::connect(&nats_client_opts).await?; + let storage_opts = S3StorageOpts::admin_opts(); + let storage = Arc::new(S3Storage::new(storage_opts).await?); + let fuel_streams = + Arc::new(FuelStreams::new(&nats_client, &storage).await); + let metrics = Metrics::new_with_random_prefix()?; + let telemetry = Telemetry::new(Some(metrics)).await?; + telemetry.start().await?; + + Ok(Self { start_time: Instant::now(), + fuel_streams, + nats_client, + telemetry, + storage: if config.s3.enabled { + Some(storage) + } else { + None + }, + jwt_secret: config.auth.jwt_secret.clone(), connection_count: Arc::new(RwLock::new(0)), - context, - } + }) + } + + pub fn uptime(&self) -> Duration { + self.start_time.elapsed() } } -impl ServerState { - pub fn is_healthy(&self) -> bool { - if !self.context.nats_client.is_connected() { - return false; - } - true +#[async_trait] +impl StateProvider for ServerState { + async fn is_healthy(&self) -> bool { + self.nats_client.is_connected() } - pub async fn get_health(&self) -> HealthResponse { + async fn get_health(&self) -> serde_json::Value { let streams_info = self - .context .fuel_streams .get_consumers_and_state() .await @@ -98,13 +128,16 @@ impl ServerState { stream_name: res.0, }) .collect::>(); - HealthResponse { - uptime: self.uptime().as_secs(), + + let resp = HealthResponse { + uptime_secs: self.uptime().as_secs(), + is_healthy: self.is_healthy().await, streams_info, - } + }; + serde_json::to_value(resp).unwrap_or(serde_json::json!({})) } - pub fn uptime(&self) -> Duration { - self.start_time.elapsed() + async fn get_metrics(&self) -> String { + self.telemetry.get_metrics().await } } diff --git a/crates/sv-webserver/src/server/svc.rs b/crates/sv-webserver/src/server/svc.rs new file mode 100644 index 00000000..0a139385 --- /dev/null +++ b/crates/sv-webserver/src/server/svc.rs @@ -0,0 +1,28 @@ +use actix_web::web; +use fuel_web_utils::server::{ + api::with_prefixed_route, + middlewares::auth::transform::JwtAuth, +}; + +use super::http::handlers::request_jwt; +use crate::server::{state::ServerState, ws::handlers::get_ws}; + +pub fn svc_handlers( + state: ServerState, +) -> impl Fn(&mut web::ServiceConfig) + Send + Sync + 'static { + move |cfg: &mut web::ServiceConfig| { + cfg.service( + web::resource(with_prefixed_route("jwt")) + .route(web::post().to(request_jwt)), + ); + cfg.service( + web::resource(with_prefixed_route("ws")) + .wrap(JwtAuth::new(state.jwt_secret.clone())) + .route(web::get().to({ + move |req, body, state: web::Data| { + get_ws(req, body, state) + } + })), + ); + } +} diff --git a/crates/sv-webserver/src/server/ws/socket.rs b/crates/sv-webserver/src/server/ws/handlers.rs similarity index 91% rename from crates/sv-webserver/src/server/ws/socket.rs rename to crates/sv-webserver/src/server/ws/handlers.rs index 8c132356..85a06b74 100644 --- a/crates/sv-webserver/src/server/ws/socket.rs +++ b/crates/sv-webserver/src/server/ws/handlers.rs @@ -8,6 +8,7 @@ use actix_web::{ }; use actix_ws::{Message, Session}; use fuel_streams_core::prelude::*; +use fuel_web_utils::telemetry::Telemetry; use futures::StreamExt; use uuid::Uuid; @@ -16,11 +17,11 @@ use super::{ models::{ClientMessage, ResponseMessage}, }; use crate::{ + metrics::Metrics, server::{ state::ServerState, ws::models::{ServerMessage, SubscriptionPayload}, }, - telemetry::Telemetry, }; static _NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); @@ -50,12 +51,9 @@ pub async fn get_ws( // split the request into response, session, and message stream let (response, session, mut msg_stream) = actix_ws::handle(&req, body)?; - // record the new subscription - state.context.telemetry.increment_subscriptions_count(); - // spawm an actor handling the ws connection - let streams = state.context.fuel_streams.clone(); - let telemetry = state.context.telemetry.clone(); + let streams = state.fuel_streams.clone(); + let telemetry = state.telemetry.clone(); actix_web::rt::spawn(async move { tracing::info!("Ws opened for user id {:?}", user_id.to_string()); while let Some(Ok(msg)) = msg_stream.recv().await { @@ -136,7 +134,7 @@ async fn handle_binary_message( msg: Bytes, user_id: uuid::Uuid, mut session: Session, - telemetry: Arc, + telemetry: Arc>, streams: Arc, ) -> Result<(), WsSubscriptionError> { tracing::info!("Received binary {:?}", msg); @@ -191,10 +189,12 @@ async fn handle_binary_message( let telemetry = telemetry.clone(); actix_web::rt::spawn(async move { // update metrics - telemetry.update_user_subscription_metrics( - user_id, - &subject_wildcard, - ); + if let Some(metrics) = telemetry.base_metrics() { + metrics.update_user_subscription_metrics( + user_id, + &subject_wildcard, + ); + } // subscribe to the stream let config = SubscriptionConfig { @@ -234,10 +234,12 @@ async fn handle_binary_message( { Ok(res) => res, Err(e) => { - telemetry.update_error_metrics( - &subject_wildcard, - &e.to_string(), - ); + if let Some(metrics) = telemetry.base_metrics() { + metrics.update_error_metrics( + &subject_wildcard, + &e.to_string(), + ); + } tracing::error!("Error serializing received stream message: {:?}", e); continue; } @@ -313,14 +315,19 @@ async fn close_socket_with_error( user_id: uuid::Uuid, mut session: Session, subject_wildcard: Option, - telemetry: Arc, + telemetry: Arc>, ) { tracing::error!("ws subscription error: {:?}", e.to_string()); if let Some(subject_wildcard) = subject_wildcard { - telemetry.update_error_metrics(&subject_wildcard, &e.to_string()); - telemetry.update_unsubscribed(user_id, &subject_wildcard); + if let Some(metrics) = telemetry.base_metrics() { + metrics.update_error_metrics(&subject_wildcard, &e.to_string()); + metrics.update_unsubscribed(user_id, &subject_wildcard); + } + } + + if let Some(metrics) = telemetry.base_metrics() { + metrics.decrement_subscriptions_count(); } - telemetry.decrement_subscriptions_count(); send_message_to_socket(&mut session, ServerMessage::Error(e.to_string())) .await; let _ = session.close(None).await; diff --git a/crates/sv-webserver/src/server/ws/mod.rs b/crates/sv-webserver/src/server/ws/mod.rs index bb1b9404..4fef2941 100644 --- a/crates/sv-webserver/src/server/ws/mod.rs +++ b/crates/sv-webserver/src/server/ws/mod.rs @@ -1,4 +1,3 @@ pub mod errors; +pub mod handlers; pub mod models; -pub mod socket; -pub mod state; diff --git a/crates/sv-webserver/src/server/ws/state.rs b/crates/sv-webserver/src/server/ws/state.rs deleted file mode 100644 index 4ee4ced1..00000000 --- a/crates/sv-webserver/src/server/ws/state.rs +++ /dev/null @@ -1,55 +0,0 @@ -use std::sync::Arc; - -use actix_ws::Session; -use bytestring::ByteString; -use futures_util::{stream::FuturesUnordered, StreamExt as _}; -use tokio::sync::Mutex; - -#[allow(dead_code)] -#[derive(Clone)] -struct WsClient { - inner: Arc>, -} - -#[allow(dead_code)] -struct WsClientInner { - sessions: Vec, -} - -#[allow(dead_code)] -impl WsClient { - fn new() -> Self { - WsClient { - inner: Arc::new(Mutex::new(WsClientInner { - sessions: Vec::new(), - })), - } - } - - async fn insert(&self, session: Session) { - self.inner.lock().await.sessions.push(session); - } - - async fn broadcast(&self, msg: impl Into) { - let msg = msg.into(); - - let mut inner = self.inner.lock().await; - let mut unordered = FuturesUnordered::new(); - - for mut session in inner.sessions.drain(..) { - let msg = msg.clone(); - - unordered.push(async move { - let res = session.text(msg).await; - res.map(|_| session) - .map_err(|_| tracing::debug!("Dropping session")) - }); - } - - while let Some(res) = unordered.next().await { - if let Ok(session) = res { - inner.sessions.push(session); - } - } - } -} diff --git a/scripts/run_publisher.sh b/scripts/run_publisher.sh index 3ffa1868..bc2cd3b6 100755 --- a/scripts/run_publisher.sh +++ b/scripts/run_publisher.sh @@ -15,7 +15,7 @@ usage() { echo " Default: profiling" echo " --port : Specify the port number" echo " Default: 4000" - echo " --telemtry-port : Specify the telemetry port number" + echo " --telemetry-port : Specify the telemetry port number" echo " Default: 8080" echo " --extra-args : Optional additional arguments to append (in quotes)" echo "" diff --git a/scripts/run_webserver.sh b/scripts/run_webserver.sh index 5b91c749..f375349c 100755 --- a/scripts/run_webserver.sh +++ b/scripts/run_webserver.sh @@ -16,7 +16,7 @@ usage() { echo "Usage: $0 [options]" echo "Options:" echo " --mode : Specify the run mode (dev|profiling)" - echo " --port : Port number for the API server (default: 9003)" + echo " --port : Port number for the API server (default: 9003)" echo " --nats-url : NATS URL (default: nats://localhost:4222)" echo " --extra-args : Optional additional arguments to append (in quotes)" echo ""