Skip to content

Commit

Permalink
update run loop in mobile-packet-verifier to have predictable ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
bbalser committed Oct 24, 2024
1 parent 4976c4e commit 34b759b
Showing 1 changed file with 15 additions and 14 deletions.
29 changes: 15 additions & 14 deletions mobile_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,13 @@ where
S: SolanaNetwork,
MCR: MobileConfigResolverExt,
{
pub async fn run(mut self, shutdown: triggered::Listener) -> Result<()> {
pub async fn run(mut self, mut shutdown: triggered::Listener) -> Result<()> {
// Set the initial burn period to one minute
let mut burn_time = Instant::now() + Duration::from_secs(60);
loop {
tokio::select! {
file = self.reports.recv() => {
let Some(file) = file else {
anyhow::bail!("FileInfoPoller sender was dropped unexpectedly");
};
tracing::info!("Verifying file: {}", file.file_info);
let ts = file.file_info.timestamp;
let mut transaction = self.pool.begin().await?;
let reports = file.into_stream(&mut transaction).await?;
crate::accumulate::accumulate_sessions(&self.mobile_config_resolver, &mut transaction, &self.verified_data_session_report_sink, ts, reports).await?;
transaction.commit().await?;
self.verified_data_session_report_sink.commit().await?;
},
biased;
_ = &mut shutdown => return Ok(()),
_ = sleep_until(burn_time) => {
// It's time to burn
match self.burner.burn(&self.pool).await {
Expand All @@ -102,7 +92,18 @@ where
}
}
}
_ = shutdown.clone() => return Ok(()),
file = self.reports.recv() => {
let Some(file) = file else {
anyhow::bail!("FileInfoPoller sender was dropped unexpectedly");
};
tracing::info!("Verifying file: {}", file.file_info);
let ts = file.file_info.timestamp;
let mut transaction = self.pool.begin().await?;
let reports = file.into_stream(&mut transaction).await?;
crate::accumulate::accumulate_sessions(&self.mobile_config_resolver, &mut transaction, &self.verified_data_session_report_sink, ts, reports).await?;
transaction.commit().await?;
self.verified_data_session_report_sink.commit().await?;
}
}
}
}
Expand Down

0 comments on commit 34b759b

Please sign in to comment.