diff --git a/crates/fuel-streams-ws/src/client/mod.rs b/crates/fuel-streams-ws/src/client/mod.rs index 287d3387..a3394d0b 100644 --- a/crates/fuel-streams-ws/src/client/mod.rs +++ b/crates/fuel-streams-ws/src/client/mod.rs @@ -1,4 +1,10 @@ -use fuel_streams::{subjects::IntoSubject, types::FuelNetwork}; +use fuel_streams::{ + logs::Log, + subjects::IntoSubject, + types::{Block, FuelNetwork, Input, Output, Receipt, Transaction}, + utxos::Utxo, + Streamable, +}; use reqwest::{ header::{ ACCEPT, @@ -22,11 +28,14 @@ use url::Url; use crate::server::{ http::models::{LoginRequest, LoginResponse}, - ws::models::{ - ClientMessage, - ServerMessage, - SubscriptionPayload, - SubscriptionType, + ws::{ + errors::WsSubscriptionError, + models::{ + ClientMessage, + ServerMessage, + SubscriptionPayload, + SubscriptionType, + }, }, }; @@ -165,6 +174,7 @@ impl WebSocketClient { let (tx, rx) = mpsc::unbounded_channel::(); tokio::spawn(async move { + let mut subsciption_topic = String::new(); loop { let msg = socket.read(); match msg { @@ -174,12 +184,51 @@ impl WebSocketClient { } Message::Binary(bin) => { println!("Received binary: {:?} bytes", bin.len()); - let decoded = + let server_message = serde_json::from_slice::(&bin) .unwrap(); - println!("Decoded server message: {:?}", decoded); - if tx.send(decoded).is_err() { - break; + println!( + "Decoded server message: {:?}", + server_message + ); + + match &server_message { + ServerMessage::Subscribed(sub) => { + println!( + "Subscribed to topic: {:?}", + sub.topic + ); + let SubscriptionType::Stream(sub) = + &sub.topic; + subsciption_topic = sub.clone(); + } + ServerMessage::Unsubscribed(sub) => { + println!( + "Unsubscribed from topic: {:?}", + sub.topic + ); + } + ServerMessage::Update(update) => { + println!( + "Received update: {:?}", + update.len() + ); + decode_print( + &subsciption_topic, + update.clone(), + ) + .unwrap(); + if tx.send(server_message).is_err() { + break; + } + } + ServerMessage::Error(err) => { + println!( + "Received error from ws: {:?}", + err + ); + break; + } } } Message::Ping(ping) => { @@ -216,3 +265,50 @@ impl WebSocketClient { Ok(rx) } } + +pub fn decode_print( + name: &str, + s3_payload: Vec, +) -> Result<(), WsSubscriptionError> { + match name { + Transaction::NAME => { + let entity = serde_json::from_slice::(&s3_payload) + .map_err(WsSubscriptionError::UnparsablePayload)?; + println!("Transaction {:?}", entity); + } + Block::NAME => { + let entity = serde_json::from_slice::(&s3_payload) + .map_err(WsSubscriptionError::UnparsablePayload)?; + println!("Block {:?}", entity); + } + Input::NAME => { + let entity = serde_json::from_slice::(&s3_payload) + .map_err(WsSubscriptionError::UnparsablePayload)?; + println!("Input {:?}", entity); + } + Output::NAME => { + let entity = serde_json::from_slice::(&s3_payload) + .map_err(WsSubscriptionError::UnparsablePayload)?; + println!("Output {:?}", entity); + } + Receipt::NAME => { + let entity = serde_json::from_slice::(&s3_payload) + .map_err(WsSubscriptionError::UnparsablePayload)?; + println!("Receipt {:?}", entity); + } + Utxo::NAME => { + let entity = serde_json::from_slice::(&s3_payload) + .map_err(WsSubscriptionError::UnparsablePayload)?; + println!("Utxo {:?}", entity); + } + Log::NAME => { + let entity = serde_json::from_slice::(&s3_payload) + .map_err(WsSubscriptionError::UnparsablePayload)?; + println!("Log {:?}", entity); + } + _ => { + eprintln!("Unknown entity {:?}", name.to_string()); + } + } + Ok(()) +} diff --git a/crates/fuel-streams-ws/src/server/ws/socket.rs b/crates/fuel-streams-ws/src/server/ws/socket.rs index 8ec84e11..8af8610d 100644 --- a/crates/fuel-streams-ws/src/server/ws/socket.rs +++ b/crates/fuel-streams-ws/src/server/ws/socket.rs @@ -7,6 +7,13 @@ use actix_web::{ Responder, }; use actix_ws::{Message, Session}; +use fuel_streams::{ + logs::Log, + types::{Block, Input, Output, Receipt, Transaction}, + utxos::Utxo, + StreamEncoder, + Streamable, +}; use futures::StreamExt; use uuid::Uuid; @@ -20,7 +27,7 @@ use crate::{ state::ServerState, ws::{ fuel_streams::FuelStreamsExt, - models::{ServerMessage, SubscriptionType}, + models::{ServerMessage, SubscriptionPayload, SubscriptionType}, }, }, telemetry::Telemetry, @@ -119,6 +126,21 @@ pub async fn get_ws( // start the streamer async let mut stream_session = session.clone(); + // reply to socket with subscription + send_message_to_socket( + &mut session, + ServerMessage::Subscribed( + SubscriptionPayload { + topic: SubscriptionType::Stream( + subject_wildcard.clone(), + ), + from: None, + to: None, + }, + ), + ) + .await; + // receive streaming in a background thread let streams = streams.clone(); let telemetry = telemetry.clone(); @@ -149,21 +171,30 @@ pub async fn get_ws( }; // consume and forward to the ws - while let Some(res) = sub.next().await { - let serialized_payload = - match stream_to_server_message(res) { - Ok(res) => res, - Err(e) => { - telemetry.update_error_metrics( - &subject_wildcard, - &e.to_string(), - ); - tracing::error!("Error serializing received stream message: {:?}", e); - continue; - } - }; + while let Some(s3_serialized_payload) = + sub.next().await + { + // decode and serialize back to ws payload + let serialized_ws_payload = match decode( + &subject_wildcard, + s3_serialized_payload, + ) + .await + { + Ok(res) => res, + Err(e) => { + telemetry.update_error_metrics( + &subject_wildcard, + &e.to_string(), + ); + tracing::error!("Error serializing received stream message: {:?}", e); + continue; + } + }; + + // send the payload over the stream let _ = stream_session - .binary(serialized_payload) + .binary(serialized_ws_payload) .await; } }); @@ -190,6 +221,21 @@ pub async fn get_ws( return; } + // send a message to the client about unsubscribing + send_message_to_socket( + &mut session, + ServerMessage::Unsubscribed( + SubscriptionPayload { + topic: SubscriptionType::Stream( + subject_wildcard, + ), + from: None, + to: None, + }, + ), + ) + .await; + // TODO: implement unsubscribe and session management } } @@ -282,9 +328,63 @@ async fn close_socket_with_error( telemetry.update_unsubscribed(user_id, &subject_wildcard); } telemetry.decrement_subscriptions_count(); - let err = serde_json::to_vec(&ServerMessage::Error(e.to_string())) - .ok() - .unwrap_or_default(); - let _ = session.binary(err).await; + send_message_to_socket(&mut session, ServerMessage::Error(e.to_string())) + .await; let _ = session.close(None).await; } + +async fn send_message_to_socket(session: &mut Session, message: ServerMessage) { + let data = serde_json::to_vec(&message).ok().unwrap_or_default(); + let _ = session.binary(data).await; +} + +async fn decode( + name: &str, + s3_payload: Vec, +) -> Result, WsSubscriptionError> { + match name { + Transaction::NAME => { + let entity = Transaction::decode_or_panic(s3_payload); + let serialized_data = serde_json::to_vec(&entity) + .map_err(WsSubscriptionError::UnparsablePayload)?; + stream_to_server_message(serialized_data) + } + Block::NAME => { + let entity = Block::decode_or_panic(s3_payload); + let serialized_data = serde_json::to_vec(&entity) + .map_err(WsSubscriptionError::UnparsablePayload)?; + stream_to_server_message(serialized_data) + } + Input::NAME => { + let entity = Input::decode_or_panic(s3_payload); + let serialized_data = serde_json::to_vec(&entity) + .map_err(WsSubscriptionError::UnparsablePayload)?; + stream_to_server_message(serialized_data) + } + Output::NAME => { + let entity = Output::decode_or_panic(s3_payload); + let serialized_data = serde_json::to_vec(&entity) + .map_err(WsSubscriptionError::UnparsablePayload)?; + stream_to_server_message(serialized_data) + } + Receipt::NAME => { + let entity = Receipt::decode_or_panic(s3_payload); + let serialized_data = serde_json::to_vec(&entity) + .map_err(WsSubscriptionError::UnparsablePayload)?; + stream_to_server_message(serialized_data) + } + Utxo::NAME => { + let entity = Utxo::decode_or_panic(s3_payload); + let serialized_data = serde_json::to_vec(&entity) + .map_err(WsSubscriptionError::UnparsablePayload)?; + stream_to_server_message(serialized_data) + } + Log::NAME => { + let entity = Log::decode_or_panic(s3_payload); + let serialized_data = serde_json::to_vec(&entity) + .map_err(WsSubscriptionError::UnparsablePayload)?; + stream_to_server_message(serialized_data) + } + _ => Err(WsSubscriptionError::UnknownSubjectName(name.to_string())), + } +}