Skip to content

Commit

Permalink
Improve
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Jul 30, 2024
1 parent 8de1010 commit fc5a0e9
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 32 deletions.
48 changes: 22 additions & 26 deletions relay-server/src/services/buffer/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,16 @@ use relay_base_schema::project::ProjectKey;

use crate::envelope::Envelope;
use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::sqlite_envelope_store::SqliteEnvelopeStore;
use crate::services::buffer::sqlite_envelope_store::{
SqliteEnvelopeStore, SqliteEnvelopeStoreError,
};

/// An error returned when doing an operation on [`SQLiteEnvelopeStack`].
#[derive(Debug, thiserror::Error)]
pub enum SqliteEnvelopeStackError {
/// The stack is empty.
#[error("the stack is empty")]
Empty,

/// The database encountered an unexpected error.
#[error("a database error occurred")]
DatabaseError(#[from] sqlx::Error),
/// The envelope store encountered an error.
#[error("an error occurred in the envelope store: {0}")]
EnvelopeStoreError(#[from] SqliteEnvelopeStoreError),
}

#[derive(Debug)]
Expand Down Expand Up @@ -99,8 +97,10 @@ impl SqliteEnvelopeStack {
// the buffer are lost. We are doing this on purposes, since if we were to have a
// database corruption during runtime, and we were to put the values back into the buffer
// we will end up with an infinite cycle.
// TODO: handle error.
self.envelope_store.insert_many(envelopes).await.unwrap();
self.envelope_store
.insert_many(envelopes)
.await
.map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?;

// If we successfully spooled to disk, we know that data should be there.
self.check_disk = true;
Expand All @@ -116,7 +116,6 @@ impl SqliteEnvelopeStack {
/// In case an envelope fails deserialization due to malformed data in the database, the affected
/// envelope will not be unspooled and unspooling will continue with the remaining envelopes.
async fn unspool_from_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> {
// TODO: handle error.
let envelopes = self
.envelope_store
.delete_many(
Expand All @@ -125,15 +124,9 @@ impl SqliteEnvelopeStack {
self.batch_size.get() as i64,
)
.await
.unwrap();
.map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?;

if envelopes.is_empty() {
// If there was a database error and no envelopes have been returned, we assume that we are
// in a critical state, so we return an error.
// if let Some(db_error) = db_error {
// return Err(SqliteEnvelopeStackError::DatabaseError(db_error));
// }

// In case no envelopes were unspooled, we will mark the disk as empty until another
// round of spooling takes place.
self.check_disk = false;
Expand Down Expand Up @@ -193,11 +186,13 @@ impl EnvelopeStack for SqliteEnvelopeStack {
self.unspool_from_disk().await?
}

Ok(self
let last = self
.batches_buffer
.back()
.and_then(|last_batch| last_batch.last())
.map(|boxed| boxed.as_ref()))
.map(|last_batch| last_batch.as_ref());

Ok(last)
}

async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
Expand All @@ -209,6 +204,9 @@ impl EnvelopeStack for SqliteEnvelopeStack {
self.batches_buffer_size -= 1;
last_batch.pop()
});
if result.is_none() {
return Ok(None);
}

// Since we might leave a batch without elements, we want to pop it from the buffer.
if self
Expand Down Expand Up @@ -362,7 +360,7 @@ mod tests {
let envelope = mock_envelope(Instant::now());
assert!(matches!(
stack.push(envelope).await,
Err(SqliteEnvelopeStackError::DatabaseError(_))
Err(SqliteEnvelopeStackError::EnvelopeStoreError(_))
));

// The stack now contains the last of the 3 elements that were added. If we add a new one
Expand Down Expand Up @@ -405,7 +403,7 @@ mod tests {
// We pop with an invalid db.
assert!(matches!(
stack.pop().await,
Err(SqliteEnvelopeStackError::DatabaseError(_))
Err(SqliteEnvelopeStackError::EnvelopeStoreError(_))
));
}

Expand All @@ -422,10 +420,8 @@ mod tests {
);

// We pop with no elements.
assert!(matches!(
stack.pop().await,
Err(SqliteEnvelopeStackError::Empty)
));
// We pop with no elements.
assert!(stack.pop().await.unwrap().is_none());
}

#[tokio::test]
Expand Down
29 changes: 23 additions & 6 deletions relay-server/src/services/buffer/sqlite_envelope_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use relay_base_schema::project::ProjectKey;
use relay_config::Config;

use crate::extractors::StartTime;
use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
use crate::Envelope;

pub struct InsertEnvelope {
Expand Down Expand Up @@ -53,11 +52,17 @@ pub enum SqliteEnvelopeStoreError {
#[error("an error occurred while spooling envelopes: {0}")]
SpoolingError(sqlx::Error),

#[error("an error occurred while unspooling envelopes: {0}")]
UnspoolingError(sqlx::Error),

#[error("no file path for the spool was provided")]
NoFilePath,

#[error("error during the migration of the database: {0}")]
MigrationError(MigrateError),

#[error("error while extracting the envelope from the database")]
EnvelopeExtractionError,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -206,6 +211,7 @@ impl SqliteEnvelopeStore {
}

let mut extracted_envelopes = Vec::with_capacity(limit as usize);
let mut db_error = None;
while let Some(envelope) = envelopes.as_mut().next().await {
let envelope = match envelope {
Ok(envelope) => envelope,
Expand All @@ -214,6 +220,8 @@ impl SqliteEnvelopeStore {
error = &err as &dyn Error,
"failed to unspool the envelopes from the disk",
);
db_error = Some(err);

continue;
}
};
Expand All @@ -231,6 +239,15 @@ impl SqliteEnvelopeStore {
}
}

// If we have no envelopes and there was at least one error, we signal total failure to the
// caller. We do this under the assumption that if there are envelopes and failures, we are
// fine with just logging the failure and not failing completely.
if extracted_envelopes.is_empty() {
if let Some(db_error) = db_error {
return Err(SqliteEnvelopeStoreError::UnspoolingError(db_error));
}
}

// We sort envelopes by `received_at`.
// Unfortunately we have to do this because SQLite `DELETE` with `RETURNING` doesn't
// return deleted rows in a specific order.
Expand All @@ -253,17 +270,17 @@ impl SqliteEnvelopeStore {
}

/// Deserializes an [`Envelope`] from a database row.
fn extract_envelope(row: SqliteRow) -> Result<Box<Envelope>, SqliteEnvelopeStackError> {
fn extract_envelope(row: SqliteRow) -> Result<Box<Envelope>, SqliteEnvelopeStoreError> {
let envelope_row: Vec<u8> = row
.try_get("envelope")
.map_err(|_| SqliteEnvelopeStackError::Empty)?;
.map_err(SqliteEnvelopeStoreError::UnspoolingError)?;
let envelope_bytes = bytes::Bytes::from(envelope_row);
let mut envelope =
Envelope::parse_bytes(envelope_bytes).map_err(|_| SqliteEnvelopeStackError::Empty)?;
let mut envelope = Envelope::parse_bytes(envelope_bytes)
.map_err(|_| SqliteEnvelopeStoreError::EnvelopeExtractionError)?;

let received_at: i64 = row
.try_get("received_at")
.map_err(|_| SqliteEnvelopeStackError::Empty)?;
.map_err(SqliteEnvelopeStoreError::UnspoolingError)?;
let start_time = StartTime::from_timestamp_millis(received_at as u64);

envelope.set_start_time(start_time.into_inner());
Expand Down

0 comments on commit fc5a0e9

Please sign in to comment.