Skip to content

Commit

Permalink
fix(repo): Added better deserialization
Browse files Browse the repository at this point in the history
  • Loading branch information
0xterminator committed Dec 12, 2024
1 parent 1965e6e commit a342601
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 29 deletions.
116 changes: 106 additions & 10 deletions crates/fuel-streams-ws/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use fuel_streams::{subjects::IntoSubject, types::FuelNetwork};
use fuel_streams::{
logs::Log,
subjects::IntoSubject,
types::{Block, FuelNetwork, Input, Output, Receipt, Transaction},
utxos::Utxo,
Streamable,
};
use reqwest::{
header::{
ACCEPT,
Expand All @@ -22,11 +28,14 @@ use url::Url;

use crate::server::{
http::models::{LoginRequest, LoginResponse},
ws::models::{
ClientMessage,
ServerMessage,
SubscriptionPayload,
SubscriptionType,
ws::{
errors::WsSubscriptionError,
models::{
ClientMessage,
ServerMessage,
SubscriptionPayload,
SubscriptionType,
},
},
};

Expand Down Expand Up @@ -165,6 +174,7 @@ impl WebSocketClient {

let (tx, rx) = mpsc::unbounded_channel::<ServerMessage>();
tokio::spawn(async move {
let mut subsciption_topic = String::new();

Check warning on line 177 in crates/fuel-streams-ws/src/client/mod.rs

View workflow job for this annotation

GitHub Actions / Cargo verifications

"subsciption" should be "subscription".
loop {
let msg = socket.read();
match msg {
Expand All @@ -174,12 +184,51 @@ impl WebSocketClient {
}
Message::Binary(bin) => {
println!("Received binary: {:?} bytes", bin.len());
let decoded =
let server_message =
serde_json::from_slice::<ServerMessage>(&bin)
.unwrap();
println!("Decoded server message: {:?}", decoded);
if tx.send(decoded).is_err() {
break;
println!(
"Decoded server message: {:?}",
server_message
);

match &server_message {
ServerMessage::Subscribed(sub) => {
println!(
"Subscribed to topic: {:?}",
sub.topic
);
let SubscriptionType::Stream(sub) =
&sub.topic;
subsciption_topic = sub.clone();

Check warning on line 203 in crates/fuel-streams-ws/src/client/mod.rs

View workflow job for this annotation

GitHub Actions / Cargo verifications

"subsciption" should be "subscription".
}
ServerMessage::Unsubscribed(sub) => {
println!(
"Unsubscribed from topic: {:?}",
sub.topic
);
}
ServerMessage::Update(update) => {
println!(
"Received update: {:?}",
update.len()
);
decode_print(
&subsciption_topic,

Check warning on line 217 in crates/fuel-streams-ws/src/client/mod.rs

View workflow job for this annotation

GitHub Actions / Cargo verifications

"subsciption" should be "subscription".
update.clone(),
)
.unwrap();
if tx.send(server_message).is_err() {
break;
}
}
ServerMessage::Error(err) => {
println!(
"Received error from ws: {:?}",
err
);
break;
}
}
}
Message::Ping(ping) => {
Expand Down Expand Up @@ -216,3 +265,50 @@ impl WebSocketClient {
Ok(rx)
}
}

pub fn decode_print(
name: &str,
s3_payload: Vec<u8>,
) -> Result<(), WsSubscriptionError> {
match name {
Transaction::NAME => {
let entity = serde_json::from_slice::<Transaction>(&s3_payload)
.map_err(WsSubscriptionError::UnparsablePayload)?;
println!("Transaction {:?}", entity);
}
Block::NAME => {
let entity = serde_json::from_slice::<Block>(&s3_payload)
.map_err(WsSubscriptionError::UnparsablePayload)?;
println!("Block {:?}", entity);
}
Input::NAME => {
let entity = serde_json::from_slice::<Input>(&s3_payload)
.map_err(WsSubscriptionError::UnparsablePayload)?;
println!("Input {:?}", entity);
}
Output::NAME => {
let entity = serde_json::from_slice::<Output>(&s3_payload)
.map_err(WsSubscriptionError::UnparsablePayload)?;
println!("Output {:?}", entity);
}
Receipt::NAME => {
let entity = serde_json::from_slice::<Receipt>(&s3_payload)
.map_err(WsSubscriptionError::UnparsablePayload)?;
println!("Receipt {:?}", entity);
}
Utxo::NAME => {
let entity = serde_json::from_slice::<Utxo>(&s3_payload)
.map_err(WsSubscriptionError::UnparsablePayload)?;
println!("Utxo {:?}", entity);
}
Log::NAME => {
let entity = serde_json::from_slice::<Log>(&s3_payload)
.map_err(WsSubscriptionError::UnparsablePayload)?;
println!("Log {:?}", entity);
}
_ => {
eprintln!("Unknown entity {:?}", name.to_string());
}
}
Ok(())
}
138 changes: 119 additions & 19 deletions crates/fuel-streams-ws/src/server/ws/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ use actix_web::{
Responder,
};
use actix_ws::{Message, Session};
use fuel_streams::{
logs::Log,
types::{Block, Input, Output, Receipt, Transaction},
utxos::Utxo,
StreamEncoder,
Streamable,
};
use futures::StreamExt;
use uuid::Uuid;

Expand All @@ -20,7 +27,7 @@ use crate::{
state::ServerState,
ws::{
fuel_streams::FuelStreamsExt,
models::{ServerMessage, SubscriptionType},
models::{ServerMessage, SubscriptionPayload, SubscriptionType},
},
},
telemetry::Telemetry,
Expand Down Expand Up @@ -119,6 +126,21 @@ pub async fn get_ws(
// start the streamer async
let mut stream_session = session.clone();

// reply to socket with subscription
send_message_to_socket(
&mut session,
ServerMessage::Subscribed(
SubscriptionPayload {
topic: SubscriptionType::Stream(
subject_wildcard.clone(),
),
from: None,
to: None,
},
),
)
.await;

// receive streaming in a background thread
let streams = streams.clone();
let telemetry = telemetry.clone();
Expand Down Expand Up @@ -149,21 +171,30 @@ pub async fn get_ws(
};

// consume and forward to the ws
while let Some(res) = sub.next().await {
let serialized_payload =
match stream_to_server_message(res) {
Ok(res) => res,
Err(e) => {
telemetry.update_error_metrics(
&subject_wildcard,
&e.to_string(),
);
tracing::error!("Error serializing received stream message: {:?}", e);
continue;
}
};
while let Some(s3_serialized_payload) =
sub.next().await
{
// decode and serialize back to ws payload
let serialized_ws_payload = match decode(
&subject_wildcard,
s3_serialized_payload,
)
.await
{
Ok(res) => res,
Err(e) => {
telemetry.update_error_metrics(
&subject_wildcard,
&e.to_string(),
);
tracing::error!("Error serializing received stream message: {:?}", e);
continue;
}
};

// send the payload over the stream
let _ = stream_session
.binary(serialized_payload)
.binary(serialized_ws_payload)
.await;
}
});
Expand All @@ -190,6 +221,21 @@ pub async fn get_ws(
return;
}

// send a message to the client about unsubscribing
send_message_to_socket(
&mut session,
ServerMessage::Unsubscribed(
SubscriptionPayload {
topic: SubscriptionType::Stream(
subject_wildcard,
),
from: None,
to: None,
},
),
)
.await;

// TODO: implement unsubscribe and session management
}
}
Expand Down Expand Up @@ -282,9 +328,63 @@ async fn close_socket_with_error(
telemetry.update_unsubscribed(user_id, &subject_wildcard);
}
telemetry.decrement_subscriptions_count();
let err = serde_json::to_vec(&ServerMessage::Error(e.to_string()))
.ok()
.unwrap_or_default();
let _ = session.binary(err).await;
send_message_to_socket(&mut session, ServerMessage::Error(e.to_string()))
.await;
let _ = session.close(None).await;
}

async fn send_message_to_socket(session: &mut Session, message: ServerMessage) {
let data = serde_json::to_vec(&message).ok().unwrap_or_default();
let _ = session.binary(data).await;
}

async fn decode(
name: &str,
s3_payload: Vec<u8>,
) -> Result<Vec<u8>, WsSubscriptionError> {
match name {
Transaction::NAME => {
let entity = Transaction::decode_or_panic(s3_payload);
let serialized_data = serde_json::to_vec(&entity)
.map_err(WsSubscriptionError::UnparsablePayload)?;
stream_to_server_message(serialized_data)
}
Block::NAME => {
let entity = Block::decode_or_panic(s3_payload);
let serialized_data = serde_json::to_vec(&entity)
.map_err(WsSubscriptionError::UnparsablePayload)?;
stream_to_server_message(serialized_data)
}
Input::NAME => {
let entity = Input::decode_or_panic(s3_payload);
let serialized_data = serde_json::to_vec(&entity)
.map_err(WsSubscriptionError::UnparsablePayload)?;
stream_to_server_message(serialized_data)
}
Output::NAME => {
let entity = Output::decode_or_panic(s3_payload);
let serialized_data = serde_json::to_vec(&entity)
.map_err(WsSubscriptionError::UnparsablePayload)?;
stream_to_server_message(serialized_data)
}
Receipt::NAME => {
let entity = Receipt::decode_or_panic(s3_payload);
let serialized_data = serde_json::to_vec(&entity)
.map_err(WsSubscriptionError::UnparsablePayload)?;
stream_to_server_message(serialized_data)
}
Utxo::NAME => {
let entity = Utxo::decode_or_panic(s3_payload);
let serialized_data = serde_json::to_vec(&entity)
.map_err(WsSubscriptionError::UnparsablePayload)?;
stream_to_server_message(serialized_data)
}
Log::NAME => {
let entity = Log::decode_or_panic(s3_payload);
let serialized_data = serde_json::to_vec(&entity)
.map_err(WsSubscriptionError::UnparsablePayload)?;
stream_to_server_message(serialized_data)
}
_ => Err(WsSubscriptionError::UnknownSubjectName(name.to_string())),
}
}

0 comments on commit a342601

Please sign in to comment.