Skip to content

Commit

Permalink
added logging for participant
Browse files Browse the repository at this point in the history
  • Loading branch information
0xKitsune committed Feb 6, 2024
1 parent 0f927cd commit d100986
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 4 deletions.
1 change: 0 additions & 1 deletion src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ pub struct Coordinator {
impl Coordinator {
pub async fn new(config: CoordinatorConfig) -> eyre::Result<Self> {
tracing::info!("Initializing coordinator");

let database = Arc::new(CoordinatorDb::new(&config.db).await?);

tracing::info!("Fetching masks from database");
Expand Down
8 changes: 8 additions & 0 deletions src/db/participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,28 @@ pub struct ParticipantDb {

impl ParticipantDb {
pub async fn new(config: &DbConfig) -> eyre::Result<Self> {
tracing::info!("Connecting to database");

if config.create
&& !sqlx::Postgres::database_exists(&config.url).await?
{
tracing::info!("Creating database");
sqlx::Postgres::create_database(&config.url).await?;
}

let pool = sqlx::Pool::connect(&config.url).await?;

if config.migrate {
tracing::info!("Running migrations");
MIGRATOR.run(&pool).await?;
}

tracing::info!("Connected to database");

Ok(Self { pool })
}

#[tracing::instrument(skip(self))]
pub async fn fetch_shares(
&self,
id: usize,
Expand All @@ -46,6 +53,7 @@ impl ParticipantDb {
Ok(shares.into_iter().map(|(share,)| share).collect())
}

#[tracing::instrument(skip(self))]
pub async fn insert_shares(
&self,
shares: &[(u64, EncodedBits)],
Expand Down
2 changes: 2 additions & 0 deletions src/distance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ impl DistanceEngine {
Self { rotations }
}

#[tracing::instrument(skip(self, out, db), level = "debug")]
pub fn batch_process(&self, out: &mut [[u16; 31]], db: &[EncodedBits]) {
assert_eq!(out.len(), db.len());
out.par_iter_mut()
Expand Down Expand Up @@ -101,6 +102,7 @@ impl MasksEngine {
Self { rotations }
}

#[tracing::instrument(skip(self, out, db), level = "debug")]
pub fn batch_process(&self, out: &mut [[u16; 31]], db: &[Bits]) {
assert_eq!(out.len(), db.len());
out.par_iter_mut()
Expand Down
15 changes: 12 additions & 3 deletions src/participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use distance::Template;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{mpsc, Mutex};
use tracing::instrument;

use crate::config::ParticipantConfig;
use crate::db::participant::ParticipantDb;
Expand All @@ -17,7 +18,11 @@ pub struct Participant {

impl Participant {
pub async fn new(config: ParticipantConfig) -> eyre::Result<Self> {
tracing::info!("Initializing participant");

let database = ParticipantDb::new(&config.db).await?;

tracing::info!("Fetching shares from database");
let shares = database.fetch_shares(0).await?;
let shares = Arc::new(Mutex::new(shares));

Expand All @@ -32,7 +37,7 @@ impl Participant {
}

pub async fn spawn(&self) -> eyre::Result<()> {
tracing::info!("Starting participant");
tracing::info!("Spawning participant");

let batch_size = self.batch_size;

Expand All @@ -49,24 +54,27 @@ impl Participant {
.read_exact(bytemuck::bytes_of_mut(&mut template))
.await?;

tracing::info!(?template, "Received template");

//TODO: add id comm
tracing::info!("Received query");
let shares_ref = self.shares.clone();
// Process in worker thread
let (sender, mut receiver) = mpsc::channel(4);

let worker = tokio::task::spawn_blocking(move || {
calculate_share_distances(
shares_ref, template, batch_size, sender,
)
});

while let Some(buffer) = receiver.recv().await {
tracing::info!(batch_size = ?buffer.len(), "Sending batch result to coordinator");
stream.write_all(&buffer).await?;
}
worker.await??;
}
}

#[tracing::instrument(skip(self))]
pub async fn sync_shares(&self) -> eyre::Result<()> {
let mut shares = self.shares.lock().await;

Expand All @@ -79,6 +87,7 @@ impl Participant {
}
}

#[instrument(skip(shares, template, sender))]
fn calculate_share_distances(
shares: Arc<Mutex<Vec<EncodedBits>>>,
template: Template,
Expand Down

0 comments on commit d100986

Please sign in to comment.