Skip to content

Commit

Permalink
Merge pull request #246 from superfly/handle-deser-error
Browse files Browse the repository at this point in the history
corro-client: handle deserialization error correctly
  • Loading branch information
pborzenkov authored Jul 30, 2024
2 parents f93048a + 6835bfc commit 8f8e552
Showing 1 changed file with 44 additions and 13 deletions.
57 changes: 44 additions & 13 deletions crates/corro-client/src/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
};

use bytes::{Buf, Bytes, BytesMut};
use corro_api_types::{ChangeId, TypedQueryEvent};
use corro_api_types::{ChangeId, QueryEvent, TypedQueryEvent};
use futures::{ready, Future, Stream};
use hyper::{client::HttpConnector, Body};
use pin_project_lite::pin_project;
Expand Down Expand Up @@ -144,24 +144,34 @@ where
Some(Ok(b)) => match serde_json::from_slice(&b) {
Ok(evt) => {
if let TypedQueryEvent::EndOfQuery { change_id, .. } = &evt {
self.observed_eoq = true;
self.last_change_id = *change_id;
self.handle_eoq(*change_id);
}

if let TypedQueryEvent::Change(_, _, _, change_id) = &evt {
match self.last_change_id {
Some(id) if id + 1 != *change_id => {
return Poll::Ready(Some(Err(SubscriptionError::MissedChange {
expected: id + 1,
got: *change_id,
})))
}
_ => (),
if let Err(e) = self.handle_change(*change_id) {
return Poll::Ready(Some(Err(e)));
}
self.last_change_id = Some(*change_id);
}

Poll::Ready(Some(Ok(evt)))
}
Err(e) => Poll::Ready(Some(Err(e.into()))),
Err(deser_err) => {
// It failed to deserialize, try untyped variant to extract the metadata
if let Ok(evt) = serde_json::from_slice::<QueryEvent>(&b) {
if let TypedQueryEvent::EndOfQuery { change_id, .. } = &evt {
self.handle_eoq(*change_id);
}

if let TypedQueryEvent::Change(_, _, _, change_id) = &evt {
if let Err(e) = self.handle_change(*change_id) {
return Poll::Ready(Some(Err(e)));
}
}
}

// But return the original error anyway (unless this is out-of-order event)
Poll::Ready(Some(Err(deser_err.into())))
}
},
Some(Err(e)) => match e {
LinesCodecError::MaxLineLengthExceeded => {
Expand All @@ -173,6 +183,27 @@ where
}
}

fn handle_eoq(&mut self, change_id: Option<ChangeId>) {
self.observed_eoq = true;
self.last_change_id = change_id;
}

fn handle_change(&mut self, change_id: ChangeId) -> Result<(), SubscriptionError> {
match self.last_change_id {
Some(id) if id + 1 != change_id => {
return Err(SubscriptionError::MissedChange {
expected: id + 1,
got: change_id,
})
}
_ => (),
}

self.last_change_id = Some(change_id);

Ok(())
}

fn poll_request(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down

0 comments on commit 8f8e552

Please sign in to comment.