From bbf29216c8b5e22bba5a42e82dd152fcbd47c83a Mon Sep 17 00:00:00 2001 From: 0xterminator Date: Thu, 12 Dec 2024 17:45:18 +0200 Subject: [PATCH] fix(repo): Added better deserialization --- crates/fuel-streams-ws/src/client/mod.rs | 116 ++++++++++++++++-- .../fuel-streams-ws/src/server/ws/socket.rs | 95 +++++++++++--- 2 files changed, 187 insertions(+), 24 deletions(-) diff --git a/crates/fuel-streams-ws/src/client/mod.rs b/crates/fuel-streams-ws/src/client/mod.rs index 287d3387..a3394d0b 100644 --- a/crates/fuel-streams-ws/src/client/mod.rs +++ b/crates/fuel-streams-ws/src/client/mod.rs @@ -1,4 +1,10 @@ -use fuel_streams::{subjects::IntoSubject, types::FuelNetwork}; +use fuel_streams::{ + logs::Log, + subjects::IntoSubject, + types::{Block, FuelNetwork, Input, Output, Receipt, Transaction}, + utxos::Utxo, + Streamable, +}; use reqwest::{ header::{ ACCEPT, @@ -22,11 +28,14 @@ use url::Url; use crate::server::{ http::models::{LoginRequest, LoginResponse}, - ws::models::{ - ClientMessage, - ServerMessage, - SubscriptionPayload, - SubscriptionType, + ws::{ + errors::WsSubscriptionError, + models::{ + ClientMessage, + ServerMessage, + SubscriptionPayload, + SubscriptionType, + }, }, }; @@ -165,6 +174,7 @@ impl WebSocketClient { let (tx, rx) = mpsc::unbounded_channel::(); tokio::spawn(async move { + let mut subsciption_topic = String::new(); loop { let msg = socket.read(); match msg { @@ -174,12 +184,51 @@ impl WebSocketClient { } Message::Binary(bin) => { println!("Received binary: {:?} bytes", bin.len()); - let decoded = + let server_message = serde_json::from_slice::(&bin) .unwrap(); - println!("Decoded server message: {:?}", decoded); - if tx.send(decoded).is_err() { - break; + println!( + "Decoded server message: {:?}", + server_message + ); + + match &server_message { + ServerMessage::Subscribed(sub) => { + println!( + "Subscribed to topic: {:?}", + sub.topic + ); + let SubscriptionType::Stream(sub) = + &sub.topic; + subsciption_topic = sub.clone(); + } + ServerMessage::Unsubscribed(sub) => { + println!( + "Unsubscribed from topic: {:?}", + sub.topic + ); + } + ServerMessage::Update(update) => { + println!( + "Received update: {:?}", + update.len() + ); + decode_print( + &subsciption_topic, + update.clone(), + ) + .unwrap(); + if tx.send(server_message).is_err() { + break; + } + } + ServerMessage::Error(err) => { + println!( + "Received error from ws: {:?}", + err + ); + break; + } } } Message::Ping(ping) => { @@ -216,3 +265,50 @@ impl WebSocketClient { Ok(rx) } } + +pub fn decode_print( + name: &str, + s3_payload: Vec, +) -> Result<(), WsSubscriptionError> { + match name { + Transaction::NAME => { + let entity = serde_json::from_slice::(&s3_payload) + .map_err(WsSubscriptionError::UnparsablePayload)?; + println!("Transaction {:?}", entity); + } + Block::NAME => { + let entity = serde_json::from_slice::(&s3_payload) + .map_err(WsSubscriptionError::UnparsablePayload)?; + println!("Block {:?}", entity); + } + Input::NAME => { + let entity = serde_json::from_slice::(&s3_payload) + .map_err(WsSubscriptionError::UnparsablePayload)?; + println!("Input {:?}", entity); + } + Output::NAME => { + let entity = serde_json::from_slice::(&s3_payload) + .map_err(WsSubscriptionError::UnparsablePayload)?; + println!("Output {:?}", entity); + } + Receipt::NAME => { + let entity = serde_json::from_slice::(&s3_payload) + .map_err(WsSubscriptionError::UnparsablePayload)?; + println!("Receipt {:?}", entity); + } + Utxo::NAME => { + let entity = serde_json::from_slice::(&s3_payload) + .map_err(WsSubscriptionError::UnparsablePayload)?; + println!("Utxo {:?}", entity); + } + Log::NAME => { + let entity = serde_json::from_slice::(&s3_payload) + .map_err(WsSubscriptionError::UnparsablePayload)?; + println!("Log {:?}", entity); + } + _ => { + eprintln!("Unknown entity {:?}", name.to_string()); + } + } + Ok(()) +} diff --git a/crates/fuel-streams-ws/src/server/ws/socket.rs b/crates/fuel-streams-ws/src/server/ws/socket.rs index 8ec84e11..21d4a0a5 100644 --- a/crates/fuel-streams-ws/src/server/ws/socket.rs +++ b/crates/fuel-streams-ws/src/server/ws/socket.rs @@ -7,6 +7,13 @@ use actix_web::{ Responder, }; use actix_ws::{Message, Session}; +use fuel_streams::{ + logs::Log, + types::{Block, Input, Output, Receipt, Transaction}, + utxos::Utxo, + StreamEncoder, + Streamable, +}; use futures::StreamExt; use uuid::Uuid; @@ -149,21 +156,30 @@ pub async fn get_ws( }; // consume and forward to the ws - while let Some(res) = sub.next().await { - let serialized_payload = - match stream_to_server_message(res) { - Ok(res) => res, - Err(e) => { - telemetry.update_error_metrics( - &subject_wildcard, - &e.to_string(), - ); - tracing::error!("Error serializing received stream message: {:?}", e); - continue; - } - }; + while let Some(s3_serialized_payload) = + sub.next().await + { + // decode and serialize back to ws payload + let serialized_ws_payload = match decode( + &subject_wildcard, + s3_serialized_payload, + ) + .await + { + Ok(res) => res, + Err(e) => { + telemetry.update_error_metrics( + &subject_wildcard, + &e.to_string(), + ); + tracing::error!("Error serializing received stream message: {:?}", e); + continue; + } + }; + + // send the payload over the stream let _ = stream_session - .binary(serialized_payload) + .binary(serialized_ws_payload) .await; } }); @@ -288,3 +304,54 @@ async fn close_socket_with_error( let _ = session.binary(err).await; let _ = session.close(None).await; } + +pub async fn decode( + name: &str, + s3_payload: Vec, +) -> Result, WsSubscriptionError> { + match name { + Transaction::NAME => { + let entity = Transaction::decode_or_panic(s3_payload); + let serialized_data = serde_json::to_vec(&entity) + .map_err(WsSubscriptionError::UnparsablePayload)?; + stream_to_server_message(serialized_data) + } + Block::NAME => { + let entity = Block::decode_or_panic(s3_payload); + let serialized_data = serde_json::to_vec(&entity) + .map_err(WsSubscriptionError::UnparsablePayload)?; + stream_to_server_message(serialized_data) + } + Input::NAME => { + let entity = Input::decode_or_panic(s3_payload); + let serialized_data = serde_json::to_vec(&entity) + .map_err(WsSubscriptionError::UnparsablePayload)?; + stream_to_server_message(serialized_data) + } + Output::NAME => { + let entity = Output::decode_or_panic(s3_payload); + let serialized_data = serde_json::to_vec(&entity) + .map_err(WsSubscriptionError::UnparsablePayload)?; + stream_to_server_message(serialized_data) + } + Receipt::NAME => { + let entity = Receipt::decode_or_panic(s3_payload); + let serialized_data = serde_json::to_vec(&entity) + .map_err(WsSubscriptionError::UnparsablePayload)?; + stream_to_server_message(serialized_data) + } + Utxo::NAME => { + let entity = Utxo::decode_or_panic(s3_payload); + let serialized_data = serde_json::to_vec(&entity) + .map_err(WsSubscriptionError::UnparsablePayload)?; + stream_to_server_message(serialized_data) + } + Log::NAME => { + let entity = Log::decode_or_panic(s3_payload); + let serialized_data = serde_json::to_vec(&entity) + .map_err(WsSubscriptionError::UnparsablePayload)?; + stream_to_server_message(serialized_data) + } + _ => Err(WsSubscriptionError::UnknownSubjectName(name.to_string())), + } +}