diff --git a/.gitignore b/.gitignore index e44210bb..c3ed4a0a 100644 --- a/.gitignore +++ b/.gitignore @@ -13,7 +13,7 @@ lcov.info node_modules/ docker/db* tmp -/docker/db* +**/**/docker/db* build_rs_cov.profraw profile.json .rust-version diff --git a/Makefile b/Makefile index 937e3fb6..4c771304 100644 --- a/Makefile +++ b/Makefile @@ -209,18 +209,46 @@ run-publisher: check-network $(if $(TELEMETRY_PORT),--telemetry-port $(TELEMETRY_PORT),) \ $(if $(extra_args),--extra-args "$(extra_args)",) -run-mainnet-dev: +run-publisher-mainnet-dev: $(MAKE) run-publisher NETWORK=mainnet MODE=dev -run-mainnet-profiling: +run-publisher-mainnet-profiling: $(MAKE) run-publisher NETWORK=mainnet MODE=profiling -run-testnet-dev: +run-publisher-testnet-dev: $(MAKE) run-publisher NETWORK=testnet MODE=dev -run-testnet-profiling: +run-publisher-testnet-profiling: $(MAKE) run-publisher NETWORK=testnet MODE=profiling +# ------------------------------------------------------------ +# Streamer Run Commands +# ------------------------------------------------------------ + +run-streamer-local: + cargo run --package fuel-streams-ws --bin ws-streamer -- --config-path crates/fuel-streams-ws/config.toml + +run-client-local: + cargo run --package fuel-streams-ws --bin ws-client -- --config-path crates/fuel-streams-ws/config.toml + +run-streamer: check-network + @./scripts/run_streamer.sh \ + --mode $(MODE) \ + $(if $(CONFIG_PATH),--config-path $(CONFIG_PATH),) \ + $(if $(extra_args),--extra-args "$(extra_args)",) + +run-streamer-mainnet-dev: + $(MAKE) run-streamer NETWORK=mainnet MODE=dev CONFIG_PATH=crates/fuel-streams-ws/config.toml + +run-streamer-mainnet-profiling: + $(MAKE) run-streamer NETWORK=mainnet MODE=profiling CONFIG_PATH=crates/fuel-streams-ws/config.toml + +run-streamer-testnet-dev: + $(MAKE) run-streamer NETWORK=testnet MODE=dev CONFIG_PATH=crates/fuel-streams-ws/config.toml + +run-streamer-testnet-profiling: + $(MAKE) run-streamer NETWORK=testnet MODE=profiling CONFIG_PATH=crates/fuel-streams-ws/config.toml + # ------------------------------------------------------------ # Docker Compose # ------------------------------------------------------------ diff --git a/Tiltfile b/Tiltfile index 8d6a0b54..2618c336 100755 --- a/Tiltfile +++ b/Tiltfile @@ -33,7 +33,7 @@ custom_build( # Build streamer ws image with proper configuration for Minikube custom_build( ref='fuel-streams-ws:latest', - command=['./cluster/scripts/build_ws_streamer.sh'], + command=['./cluster/scripts/build_streamer.sh'], deps=[ './src', './Cargo.toml', diff --git a/cluster/scripts/build_ws_streamer.sh b/cluster/scripts/build_streamer.sh similarity index 100% rename from cluster/scripts/build_ws_streamer.sh rename to cluster/scripts/build_streamer.sh diff --git a/crates/fuel-streams-core/src/nats/nats_client_opts.rs b/crates/fuel-streams-core/src/nats/nats_client_opts.rs index 139ae32f..a4299c26 100644 --- a/crates/fuel-streams-core/src/nats/nats_client_opts.rs +++ b/crates/fuel-streams-core/src/nats/nats_client_opts.rs @@ -1,6 +1,7 @@ -use std::time::Duration; +use std::{str::FromStr, time::Duration}; use async_nats::ConnectOptions; +use serde::{Deserialize, Serialize}; use super::NatsNamespace; @@ -11,7 +12,10 @@ pub enum NatsUserRole { Default, } -#[derive(Debug, Copy, Clone, Default, clap::ValueEnum)] +#[derive( + Debug, Copy, Clone, Default, clap::ValueEnum, Serialize, Deserialize, +)] +#[serde(rename_all = "lowercase")] pub enum FuelNetwork { Local, #[default] @@ -19,6 +23,18 @@ pub enum FuelNetwork { Mainnet, } +impl FromStr for FuelNetwork { + type Err = (); + fn from_str(s: &str) -> Result { + match s { + "local" => Ok(FuelNetwork::Local), + "testnet" => Ok(FuelNetwork::Testnet), + "mainnet" => Ok(FuelNetwork::Mainnet), + _ => Err(()), + } + } +} + impl FuelNetwork { pub fn to_url(&self) -> String { match self { diff --git a/crates/fuel-streams-ws/config.toml b/crates/fuel-streams-ws/config.toml index 3a5984fe..99a63c94 100644 --- a/crates/fuel-streams-ws/config.toml +++ b/crates/fuel-streams-ws/config.toml @@ -4,6 +4,9 @@ port = 9003 [nats] url = "nats://localhost:4222" +[fuel] +network = "mainnet" + [s3] enabled = false region = "us-west-rack1" diff --git a/crates/fuel-streams-ws/src/bin/ws-client.rs b/crates/fuel-streams-ws/src/bin/ws-client.rs index 18c8319b..1d9be1fc 100755 --- a/crates/fuel-streams-ws/src/bin/ws-client.rs +++ b/crates/fuel-streams-ws/src/bin/ws-client.rs @@ -1,157 +1,31 @@ -use fuel_streams_ws::server::{ - http::models::{LoginRequest, LoginResponse}, - ws::models::{ +use fuel_streams::types::FuelNetwork; +use fuel_streams_ws::{ + client::WebSocketClient, + server::ws::models::{ ClientMessage, - ServerMessage, SubscriptionPayload, SubscriptionType, }, }; -use reqwest::header::{ACCEPT, CONTENT_TYPE}; -use tungstenite::{ - handshake::client::{generate_key, Request}, - Message, -}; -use url::Url; #[tokio::main] -async fn main() -> Result<(), Box> { - // get jwt token - let jwt_url = Url::parse("http://localhost:9003/api/v1/jwt").unwrap(); - let client = reqwest::Client::new(); - let json_body = serde_json::to_string(&LoginRequest { - username: "client".to_string(), - password: "client".to_string(), - })?; - let response = client - .get(jwt_url) - .header(ACCEPT, "application/json") - .header(CONTENT_TYPE, "application/json") - .body(json_body) - .send() - .await?; - let jwt = if response.status().is_success() { - let json_body = response.json::().await?; - println!("Jwt endpoint response JSON: {:?}", json_body); - json_body.jwt_token - } else { - panic!("Failed to fetch jwt data: {}", response.status()); - }; - - // open websocket connection - let url = Url::parse("ws://localhost:9003/api/v1/ws").unwrap(); - println!("Using websocket url: {:?}", url.as_str()); - - // url.query_pairs_mut() - // .append_pair("Authorization", &urlencoding::encode(&format!("Bearer {}", jwt))); - // let (mut socket, response) = connect(url.as_str()).expect("Can't connect to ws"); +async fn main() -> anyhow::Result<()> { + let mut client = + WebSocketClient::new(FuelNetwork::Mainnet, "admin", "admin").await?; - // Create the WebSocket request with the Authorization header - let host = url.host_str().expect("Invalid host"); - let request = Request::builder() - .uri(url.as_str()) - .header("Authorization", format!("Bearer {}", jwt)) - .header("Host", host) - .header("Upgrade", "websocket") - .header("Connection", "Upgrade") - .header("Sec-WebSocket-Key", generate_key()) - .header("Sec-WebSocket-Version", "13") - .body(()) - .expect("Failed to build request"); + client.connect()?; - let (mut socket, response) = - match tungstenite::client::connect_with_config(request, None, 5) { - Ok((socket, response)) => (socket, response), - Err(err) => { - eprintln!( - "Failed to connect to the server: {:?}", - err.to_string() - ); - panic!("Failed to connect to the server"); - } - }; + client.send_message(ClientMessage::Subscribe(SubscriptionPayload { + topic: SubscriptionType::Stream("blocks.*.*".to_string()), + from: None, + to: None, + }))?; - for (header, value) in response.headers() { - println!("* {}: {:?}", header, value); - } + let mut receiver = client.listen()?; - println!("Connected to the server"); - println!("Response HTTP code: {}", response.status()); - println!("Response contains the following headers:"); - for (ref header, _value) in response.headers() { - println!("* {}", header); + while let Some(message) = receiver.recv().await { + println!("Received: {:?}", message); } - let stream_topic_wildcard = "blocks.*.*".to_owned(); - let msg = ClientMessage::Subscribe(SubscriptionPayload { - topic: SubscriptionType::Stream(stream_topic_wildcard.clone()), - }); - socket - .send(Message::Text(serde_json::to_string(&msg).unwrap())) - .unwrap(); - - socket - .send(Message::Binary(serde_json::to_vec(&msg).unwrap())) - .unwrap(); - - socket - .send(Message::Ping(serde_json::to_vec(&msg).unwrap())) - .unwrap(); - - socket - .send(Message::Pong(serde_json::to_vec(&msg).unwrap())) - .unwrap(); - - let bad_sub = serde_json::json!({ - "subscribe": { - "topics": { - "stream": stream_topic_wildcard - } - } - }); - socket - .send(Message::Binary(serde_json::to_vec(&bad_sub).unwrap())) - .unwrap(); - - let jh = tokio::spawn(async move { - loop { - let msg = socket.read(); - println!("Received: {:?}", msg); - match msg { - Ok(msg) => match msg { - Message::Text(text) => { - println!("Received text: {:?}", text); - } - Message::Binary(bin) => { - println!("Received binary: {:?}", bin); - let decoded = - serde_json::from_slice::(&bin) - .unwrap(); - println!("Received server message: {:?}", decoded); - } - Message::Ping(ping) => { - println!("Received ping: {:?}", ping); - } - Message::Pong(pong) => { - println!("Received pong: {:?}", pong); - } - Message::Close(close) => { - println!("Received close: {:?}", close); - break; - } - _ => { - println!("Received unknown message type"); - } - }, - Err(e) => { - println!("Error reading message: {:?}", e); - break; - } - } - } - }); - - jh.await?; - Ok(()) } diff --git a/crates/fuel-streams-ws/src/client/mod.rs b/crates/fuel-streams-ws/src/client/mod.rs new file mode 100644 index 00000000..5108384a --- /dev/null +++ b/crates/fuel-streams-ws/src/client/mod.rs @@ -0,0 +1,183 @@ +use fuel_streams::types::FuelNetwork; +use reqwest::{ + header::{ + ACCEPT, + AUTHORIZATION, + CONNECTION, + CONTENT_TYPE, + HOST, + SEC_WEBSOCKET_KEY, + SEC_WEBSOCKET_VERSION, + UPGRADE, + }, + Client as HttpClient, +}; +use tokio::sync::mpsc; +use tungstenite::{ + handshake::client::generate_key, + protocol::Message, + stream::MaybeTlsStream, +}; +use url::Url; +use urls::{get_web_url, get_ws_url}; + +use crate::server::{ + http::models::{LoginRequest, LoginResponse}, + ws::models::{ClientMessage, ServerMessage}, +}; + +pub mod urls; + +#[derive(Debug)] +pub struct WebSocketClient { + socket: Option>>, + jwt_token: String, + ws_url: Url, +} + +impl WebSocketClient { + pub async fn new( + newtork: FuelNetwork, + username: &str, + password: &str, + ) -> anyhow::Result { + let jwt_token = Self::fetch_jwt(newtork, username, password).await?; + + let ws_url = get_ws_url(newtork) + .join("/api/v1/ws") + .expect("valid relative path"); + + Ok(Self { + socket: None, + jwt_token, + ws_url, + }) + } + + async fn fetch_jwt( + newtork: FuelNetwork, + username: &str, + password: &str, + ) -> anyhow::Result { + let client = HttpClient::new(); + let json_body = serde_json::to_string(&LoginRequest { + username: username.to_string(), + password: password.to_string(), + })?; + + let api_url = get_web_url(newtork) + .join("/api/v1/jwt") + .expect("valid relative path"); + + let response = client + .get(api_url) + .header(ACCEPT, "application/json") + .header(CONTENT_TYPE, "application/json") + .body(json_body) + .send() + .await?; + + if response.status().is_success() { + let json_body = response.json::().await?; + Ok(json_body.jwt_token) + } else { + Err(anyhow::anyhow!( + "Failed to fetch JWT: {}", + response.status() + )) + } + } + + pub fn connect(&mut self) -> anyhow::Result<()> { + let host = self.ws_url.host_str().expect("Invalid host"); + let request = tungstenite::handshake::client::Request::builder() + .uri(self.ws_url.as_str()) + .header(AUTHORIZATION, format!("Bearer {}", self.jwt_token)) + .header(HOST, host) + .header(UPGRADE, "websocket") + .header(CONNECTION, "Upgrade") + .header(SEC_WEBSOCKET_KEY, generate_key()) + .header(SEC_WEBSOCKET_VERSION, "13") + .body(()) + .expect("Failed to build request"); + + let (socket, _response) = tungstenite::connect(request)?; + self.socket = Some(socket); + + Ok(()) + } + + pub fn send_message( + &mut self, + message: ClientMessage, + ) -> anyhow::Result<()> { + let socket = self + .socket + .as_mut() + .ok_or_else(|| anyhow::anyhow!("Socket not connected"))?; + let serialized = serde_json::to_vec(&message)?; + socket.send(Message::Binary(serialized))?; + Ok(()) + } + + pub fn listen( + &mut self, + ) -> anyhow::Result> { + let mut socket = self + .socket + .take() + .ok_or_else(|| anyhow::anyhow!("Socket not connected"))?; + + let (tx, rx) = mpsc::unbounded_channel::(); + tokio::spawn(async move { + loop { + let msg = socket.read(); + match msg { + Ok(msg) => match msg { + Message::Text(text) => { + println!("Received text: {:?} bytes", text.len()); + } + Message::Binary(bin) => { + println!("Received binary: {:?} bytes", bin.len()); + let decoded = + serde_json::from_slice::(&bin) + .unwrap(); + println!("Decoded server message: {:?}", decoded); + if tx.send(decoded).is_err() { + break; + } + } + Message::Ping(ping) => { + println!("Received ping: {:?} bytes", ping.len()); + } + Message::Pong(pong) => { + println!("Received pong: {:?} bytes", pong.len()); + } + Message::Close(close) => { + let close_code = close + .as_ref() + .map(|c| c.code.to_string()) + .unwrap_or_default(); + let close_reason = close + .as_ref() + .map(|c| c.reason.to_string()) + .unwrap_or_default(); + println!("Received close with code: {:?} and reason: {:?}", close_code, close_reason); + break; + } + _ => { + eprintln!("Received unknown message type"); + break; + } + }, + Err(e) => { + eprintln!("Error reading message: {:?}", e); + break; + } + } + } + }); + + Ok(rx) + } +} diff --git a/crates/fuel-streams-ws/src/client/urls.rs b/crates/fuel-streams-ws/src/client/urls.rs new file mode 100644 index 00000000..665d6f1e --- /dev/null +++ b/crates/fuel-streams-ws/src/client/urls.rs @@ -0,0 +1,30 @@ +use fuel_streams::types::FuelNetwork; +use url::Url; + +pub fn get_web_url(network: FuelNetwork) -> Url { + match network { + FuelNetwork::Local => { + Url::parse("http://0.0.0.0:9003").expect("working url") + } + FuelNetwork::Testnet => { + Url::parse("http://0.0.0.0:9003").expect("working url") + } + FuelNetwork::Mainnet => { + Url::parse("http://0.0.0.0:9003").expect("working url") + } + } +} + +pub fn get_ws_url(network: FuelNetwork) -> Url { + match network { + FuelNetwork::Local => { + Url::parse("ws://0.0.0.0:9003").expect("working url") + } + FuelNetwork::Testnet => { + Url::parse("ws://0.0.0.0:9003").expect("working url") + } + FuelNetwork::Mainnet => { + Url::parse("ws://0.0.0.0:9003").expect("working url") + } + } +} diff --git a/crates/fuel-streams-ws/src/config.rs b/crates/fuel-streams-ws/src/config.rs index 9ea85a36..3f91f006 100644 --- a/crates/fuel-streams-ws/src/config.rs +++ b/crates/fuel-streams-ws/src/config.rs @@ -7,6 +7,7 @@ use std::{ use confy::ConfyError; use displaydoc::Display as DisplayDoc; +use fuel_streams::types::FuelNetwork; use serde::{Deserialize, Deserializer}; use thiserror::Error; use tokio::{fs::File, io::AsyncReadExt}; @@ -25,8 +26,8 @@ pub enum Error { ReadMeta(std::io::Error), /// Failed to read env config: {0} Confy(ConfyError), - /// Missing config element: {0} - MissingConfigElement(&'static str), + /// Undecodable config element: {0} + UndecodableConfigElement(&'static str), /// Parse int error: {0} ParseInt(ParseIntError), /// Parse bool error: {0} @@ -62,6 +63,12 @@ pub struct AuthConfig { pub jwt_secret: String, } +#[derive(Clone, Debug, Deserialize)] +#[serde(deny_unknown_fields, rename_all = "kebab-case")] +pub struct FuelConfig { + pub network: FuelNetwork, +} + #[derive(Clone, Debug, Deserialize)] #[serde(deny_unknown_fields, rename_all = "kebab-case")] pub struct NatsConfig { @@ -75,6 +82,7 @@ pub struct Config { pub auth: AuthConfig, pub s3: S3Config, pub nats: NatsConfig, + pub fuel: FuelConfig, } impl Default for Config { @@ -94,6 +102,9 @@ impl Default for Config { bucket: String::new(), endpoint: String::new(), }, + fuel: FuelConfig { + network: FuelNetwork::Local, + }, } } } @@ -163,6 +174,13 @@ impl Config { if let Ok(jwt_secret) = dotenvy::var("JWT_AUTH_SECRET") { config.auth.jwt_secret = jwt_secret; } + + // ----------------------FUEL-------------------------------- + if let Ok(network) = dotenvy::var("NETWORK") { + config.fuel.network = FuelNetwork::from_str(&network) + .map_err(|_| Error::UndecodableConfigElement("NETWORK"))?; + } + Ok(config) } } diff --git a/crates/fuel-streams-ws/src/lib.rs b/crates/fuel-streams-ws/src/lib.rs index 445241f1..80eb33c0 100644 --- a/crates/fuel-streams-ws/src/lib.rs +++ b/crates/fuel-streams-ws/src/lib.rs @@ -1,4 +1,5 @@ pub mod cli; +pub mod client; pub mod config; pub mod server; pub mod systems; diff --git a/crates/fuel-streams-ws/src/server/context.rs b/crates/fuel-streams-ws/src/server/context.rs index 5969de2d..8749bd5b 100644 --- a/crates/fuel-streams-ws/src/server/context.rs +++ b/crates/fuel-streams-ws/src/server/context.rs @@ -1,6 +1,7 @@ use std::{sync::Arc, time::Duration}; use aws_sdk_s3::Client as S3Client; +use fuel_streams::client::Client; use fuel_streams_core::prelude::*; use crate::{ @@ -15,7 +16,8 @@ pub const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(90); #[allow(dead_code)] #[derive(Clone)] pub struct Context { - pub nats_client: Arc, + pub client: Client, + pub nats_client: NatsClient, pub streams: Arc, pub telemetry: Arc, pub s3_client: Option>, @@ -28,12 +30,14 @@ impl Context { .with_custom_url(config.nats.url.clone()); let nats_client = NatsClient::connect(&nats_client_opts).await?; let streams = Arc::new(Streams::new(&nats_client).await); + let client = Client::connect(config.fuel.network).await?; let telemetry = Telemetry::new(None).await?; telemetry.start().await?; Ok(Context { streams, - nats_client: Arc::new(nats_client), + nats_client, + client, telemetry, s3_client: if config.s3.enabled { Some(Arc::new(s3_connect(config.s3.clone()).await)) @@ -44,13 +48,18 @@ impl Context { }) } - pub async fn default(nats_client: &NatsClient) -> anyhow::Result { + pub async fn default( + nats_client: &NatsClient, + fuel_network: FuelNetwork, + ) -> anyhow::Result { + let client = Client::connect(fuel_network).await?; Ok(Context { streams: Arc::new(Streams::new(nats_client).await), - nats_client: Arc::new(nats_client.clone()), + nats_client: nats_client.clone(), telemetry: Telemetry::new(None).await?, s3_client: None, jwt_secret: String::new(), + client, }) } diff --git a/crates/fuel-streams-ws/src/server/ws/errors.rs b/crates/fuel-streams-ws/src/server/ws/errors.rs index 7b4ffc80..e9b482ab 100644 --- a/crates/fuel-streams-ws/src/server/ws/errors.rs +++ b/crates/fuel-streams-ws/src/server/ws/errors.rs @@ -10,4 +10,6 @@ pub enum WsSubscriptionError { UnknownSubjectName(String), /// Unsupported wildcard pattern: `{0}` UnsupportedWildcardPattern(String), + /// Unserializable message payload: `{0}` + UnserializableMessagePayload(serde_json::Error), } diff --git a/crates/fuel-streams-ws/src/server/ws/models.rs b/crates/fuel-streams-ws/src/server/ws/models.rs index c3b70d92..35a2c690 100644 --- a/crates/fuel-streams-ws/src/server/ws/models.rs +++ b/crates/fuel-streams-ws/src/server/ws/models.rs @@ -10,6 +10,10 @@ pub enum SubscriptionType { #[serde(rename_all = "camelCase")] pub struct SubscriptionPayload { pub topic: SubscriptionType, + #[serde(skip_serializing_if = "Option::is_none")] + pub from: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub to: Option, } #[derive(Eq, PartialEq, Debug, Deserialize, Serialize)] @@ -24,6 +28,7 @@ pub enum ClientMessage { pub enum ServerMessage { Subscribed(SubscriptionPayload), Unsubscribed(SubscriptionPayload), + Update(Vec), Error(String), } @@ -36,6 +41,8 @@ mod tests { let stream_topic_wildcard = "blocks.*.*".to_owned(); let msg = ClientMessage::Subscribe(SubscriptionPayload { topic: SubscriptionType::Stream(stream_topic_wildcard.clone()), + from: None, + to: None, }); let ser_str_value = serde_json::to_string(&msg).unwrap(); println!("Ser value {:?}", ser_str_value); diff --git a/crates/fuel-streams-ws/src/server/ws/socket.rs b/crates/fuel-streams-ws/src/server/ws/socket.rs index 212c5737..939c1316 100644 --- a/crates/fuel-streams-ws/src/server/ws/socket.rs +++ b/crates/fuel-streams-ws/src/server/ws/socket.rs @@ -6,7 +6,8 @@ use actix_web::{ HttpRequest, Responder, }; -use actix_ws::{AggregatedMessage, Session}; +use actix_ws::{Message, Session}; +use fuel_streams::{types::Block, StreamData, Streamable}; use uuid::Uuid; use super::{ @@ -44,12 +45,12 @@ pub async fn get_ws( }; // split the request into response, session, and message stream - let (response, mut session, msg_stream) = actix_ws::handle(&req, body)?; + let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?; // increase the maximum allowed frame size to 1MiB and aggregate continuation frames - let mut msg_stream = msg_stream - .max_frame_size(1024 * 1024) - .aggregate_continuations(); + // let mut msg_stream = msg_stream + // .max_frame_size(1024 * 1024) + // .aggregate_continuations(); // record the new subscription state.context.telemetry.record_subscriptions_count(); @@ -59,19 +60,19 @@ pub async fn get_ws( tracing::info!("Ws opened for user id {:?}", user_id.to_string()); while let Some(Ok(msg)) = msg_stream.recv().await { match msg { - AggregatedMessage::Ping(bytes) => { + Message::Ping(bytes) => { tracing::info!("Received ping, {:?}", bytes); if session.pong(&bytes).await.is_err() { return; } } - AggregatedMessage::Pong(bytes) => { + Message::Pong(bytes) => { tracing::info!("Received pong, {:?}", bytes); } - AggregatedMessage::Text(string) => { + Message::Text(string) => { tracing::info!("Received text, {string}"); } - AggregatedMessage::Binary(bytes) => { + Message::Binary(bytes) => { tracing::info!("Received binary {:?}", bytes); let client_message = match parse_client_message(bytes) { Ok(msg) => msg, @@ -91,6 +92,7 @@ pub async fn get_ws( let SubscriptionType::Stream(subject_wildcard) = payload.topic; + // verify the subject name if let Err(e) = verify_subject_name(&subject_wildcard) { @@ -98,6 +100,7 @@ pub async fn get_ws( return; } + // update metrics state .context .telemetry @@ -105,6 +108,32 @@ pub async fn get_ws( user_id, &subject_wildcard, ); + + // start the streamer async + let mut stream_session = session.clone(); + let mut rx = + Streams::run_streamable_consumer::( + state.context.client.clone(), + ) + .await + .unwrap(); + + // receive in a background thread + actix_web::rt::spawn(async move { + while let Some(res) = rx.recv().await { + let serialized_payload = + match stream_to_server_message(res) { + Ok(res) => res, + Err(e) => { + tracing::error!("Error serializing received stream message: {:?}", e); + continue; + } + }; + let _ = stream_session + .binary(serialized_payload) + .await; + } + }); } ClientMessage::Unsubscribe(payload) => { tracing::info!( @@ -123,7 +152,7 @@ pub async fn get_ws( } } } - AggregatedMessage::Close(reason) => { + Message::Close(reason) => { tracing::info!( "Got close event, terminating session with reason {:?}", reason @@ -131,6 +160,11 @@ pub async fn get_ws( let _ = session.close(reason).await; return; } + _ => { + tracing::error!("Received unknown message type"); + let _ = session.close(None).await; + return; + } }; } }); @@ -146,6 +180,17 @@ fn parse_client_message( Ok(msg) } +fn stream_to_server_message( + msg: StreamData, +) -> Result, WsSubscriptionError> { + let serialized_data = serde_json::to_vec::>(&msg) + .map_err(WsSubscriptionError::UnserializableMessagePayload)?; + let server_message = + serde_json::to_vec(&ServerMessage::Update(serialized_data)) + .map_err(WsSubscriptionError::UnserializableMessagePayload)?; + Ok(server_message) +} + fn verify_subject_name( subject_wildcard: &str, ) -> Result { diff --git a/crates/fuel-streams-ws/src/server/ws/streams.rs b/crates/fuel-streams-ws/src/server/ws/streams.rs index 02fbfc7d..8147b1bb 100644 --- a/crates/fuel-streams-ws/src/server/ws/streams.rs +++ b/crates/fuel-streams-ws/src/server/ws/streams.rs @@ -2,6 +2,7 @@ use async_nats::{jetstream::stream::State as StreamState, RequestErrorKind}; use fuel_streams::{client::Client, types::Log, StreamConfig}; use fuel_streams_core::prelude::*; use futures_util::StreamExt; +use tokio::sync::mpsc; #[derive(Debug)] pub enum StreamableType { @@ -16,13 +17,13 @@ pub enum StreamableType { pub fn get_streamable_type(name: &str) -> Option { match name { - "transactions" => Some(StreamableType::Transaction), - "blocks" => Some(StreamableType::Block), - "inputs" => Some(StreamableType::Input), - "outputs" => Some(StreamableType::Output), - "receipts" => Some(StreamableType::Receipt), - "utxos" => Some(StreamableType::Utxo), - "logs" => Some(StreamableType::Log), + Transaction::NAME => Some(StreamableType::Transaction), + Block::NAME => Some(StreamableType::Block), + Input::NAME => Some(StreamableType::Input), + Output::NAME => Some(StreamableType::Output), + Receipt::NAME => Some(StreamableType::Receipt), + Utxo::NAME => Some(StreamableType::Utxo), + Log::NAME => Some(StreamableType::Log), _ => None, } } @@ -37,6 +38,7 @@ pub struct Streams { pub receipts: Stream, pub utxos: Stream, pub logs: Stream, + pub nats_client: NatsClient, } impl Streams { @@ -49,6 +51,7 @@ impl Streams { receipts: Stream::::new(nats_client).await, utxos: Stream::::new(nats_client).await, logs: Stream::::new(nats_client).await, + nats_client: nats_client.clone(), } } @@ -134,41 +137,41 @@ impl Streams { ]) } - pub async fn run_dynamic_consumer( - name: &str, - client: &Client, - ) -> anyhow::Result<()> { - match get_streamable_type(name) { - Some(StreamableType::Transaction) => { - Streams::run_streamable_consumer::(client).await - } - Some(StreamableType::Block) => { - Streams::run_streamable_consumer::(client).await - } - Some(StreamableType::Input) => { - Streams::run_streamable_consumer::(client).await - } - Some(StreamableType::Output) => { - Streams::run_streamable_consumer::(client).await - } - Some(StreamableType::Receipt) => { - Streams::run_streamable_consumer::(client).await - } - Some(StreamableType::Utxo) => { - Streams::run_streamable_consumer::(client).await - } - Some(StreamableType::Log) => { - Streams::run_streamable_consumer::(client).await - } - None => Err(anyhow::anyhow!("Unknown streamable type: {}", name)), - } - } - - pub async fn run_streamable_consumer( - client: &Client, - ) -> anyhow::Result<()> { + // pub async fn run_dynamic_consumer( + // name: &str, + // client: Client, + // ) -> anyhow::Result>> { + // match get_streamable_type(name) { + // Some(StreamableType::Transaction) => { + // let rx = Streams::run_streamable_consumer::(client).await + // } + // Some(StreamableType::Block) => { + // Streams::run_streamable_consumer::(client).await + // } + // Some(StreamableType::Input) => { + // Streams::run_streamable_consumer::(client).await + // } + // Some(StreamableType::Output) => { + // Streams::run_streamable_consumer::(client).await + // } + // Some(StreamableType::Receipt) => { + // Streams::run_streamable_consumer::(client).await + // } + // Some(StreamableType::Utxo) => { + // Streams::run_streamable_consumer::(client).await + // } + // Some(StreamableType::Log) => { + // Streams::run_streamable_consumer::(client).await + // } + // None => Err(anyhow::anyhow!("Unknown streamable type: {}", name)), + // } + // } + + pub async fn run_streamable_consumer( + client: Client, + ) -> anyhow::Result>> { // Create a new stream for blocks - let stream = fuel_streams::Stream::::new(client).await; + let stream = fuel_streams::Stream::::new(&client).await; // Configure the stream to start from the last published block let config = StreamConfig { @@ -178,17 +181,33 @@ impl Streams { // Subscribe to the block stream with the specified configuration let mut sub = stream.subscribe_with_config(config).await?; + let (tx, rx) = mpsc::unbounded_channel::>(); + // Process incoming blocks - while let Some(bytes) = sub.next().await { - match bytes.as_ref() { - Err(_) => {} - Ok(message) => { - let _decoded_msg = - S::decode_raw(message.payload.to_vec()).await; + actix_web::rt::spawn(async move { + while let Some(bytes) = sub.next().await { + match bytes.as_ref() { + Ok(message) => { + tracing::info!("Received message: {:?}", message); + let decoded_msg = + S::decode_raw(message.payload.to_vec()).await; + if let Err(e) = tx.send(decoded_msg) { + tracing::error!( + "Error sending decoded message: {:?}", + e + ); + } + } + Err(e) => { + tracing::error!( + "Error receiving message from stream: {:?}", + e + ); + } } } - } + }); - Ok(()) + Ok(rx) } } diff --git a/scripts/run_publisher.sh b/scripts/run_publisher.sh index df17d5b3..7d29d677 100755 --- a/scripts/run_publisher.sh +++ b/scripts/run_publisher.sh @@ -98,8 +98,8 @@ COMMON_ARGS=( "--relayer" "${RELAYER}" "--ip=0.0.0.0" "--service-name" "fuel-${NETWORK}-node" - "--db-path" "./docker/db-${NETWORK}" - "--snapshot" "./docker/chain-config/${NETWORK}" + "--db-path" "./cluster/docker/db-${NETWORK}" + "--snapshot" "./cluster/chain-config/${NETWORK}" "--nats-url" "nats://localhost:4222" "--port" "${PORT}" "--telemetry-port" "${TELEMETRY_PORT}" diff --git a/scripts/run_ws_streamer.sh b/scripts/run_streamer.sh similarity index 94% rename from scripts/run_ws_streamer.sh rename to scripts/run_streamer.sh index 8b87255f..17bead27 100755 --- a/scripts/run_ws_streamer.sh +++ b/scripts/run_streamer.sh @@ -17,7 +17,7 @@ usage() { echo "Examples:" echo " $0 # Runs with all defaults" echo " $0 --config-path # Runs with default config.toml" - echo " $0 --mod dev # Runs with dev mode" + echo " $0 --mode dev # Runs with dev mode" echo " $0 --config-path ../config.toml --mode dev # Custom config toml path and mode" exit 1 } @@ -49,7 +49,7 @@ done # ------------------------------ # Load Environment # ------------------------------ -source ./scripts/set_envs.sh +source ./scripts/set_env.sh # Print the configuration being used echo -e "\n==========================================" @@ -58,7 +58,7 @@ echo -e "==========================================" # Runtime Configuration echo "Runtime Settings:" -echo " → Mode: $MODE" +echo "→ Mode: $MODE" if [ -n "$CONFIG_PATH" ]; then echo "→ Config path: $CONFIG_PATH" fi @@ -68,7 +68,7 @@ fi # Environment Variables echo -e "\nEnvironment Variables:" -echo " → Use Metrics: ${USE_METRICS}..." +echo " → Use Metrics: ${USE_METRICS}" echo " → Use Elastic Logging: $USE_ELASTIC_LOGGING" echo " → S3 Enabled: $S3_ENABLED" echo " → S3 Region: $S3_REGION"