Skip to content

Commit

Permalink
Limit in-flight plotting sectors until the very end
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Oct 22, 2024
1 parent 86d410b commit 3c49bb2
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 8 deletions.
2 changes: 0 additions & 2 deletions crates/subspace-farmer/src/cluster/plotter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -970,8 +970,6 @@ async fn send_publish_progress(
}
}

response_sender.close_channel();

return;
}
SectorPlottingProgress::Error { error } => ClusterSectorPlottingProgress::Error { error },
Expand Down
14 changes: 11 additions & 3 deletions crates/subspace-farmer/src/plotter/cpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::num::NonZeroUsize;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::time::Instant;
use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::{PublicKey, SectorIndex};
Expand Down Expand Up @@ -454,12 +455,19 @@ where
SectorPlottingProgress::Finished {
plotted_sector,
time: start.elapsed(),
sector: Box::pin(stream::once(async move { Ok(sector) })),
sector: Box::pin({
let mut sector = Some(Ok(sector));

stream::poll_fn(move |_cx| {
// Just so that permit is dropped with stream itself
let _downloading_permit = &downloading_permit;

Poll::Ready(sector.take())
})
}),
},
)
.await;

drop(downloading_permit);
}
};

Expand Down
14 changes: 11 additions & 3 deletions crates/subspace-farmer/src/plotter/gpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::num::TryFromIntError;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::time::Instant;
use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::{PublicKey, SectorIndex};
Expand Down Expand Up @@ -440,12 +441,19 @@ where
SectorPlottingProgress::Finished {
plotted_sector,
time: start.elapsed(),
sector: Box::pin(stream::once(async move { Ok(sector) })),
sector: Box::pin({
let mut sector = Some(Ok(sector));

stream::poll_fn(move |_cx| {
// Just so that permit is dropped with stream itself
let _downloading_permit = &downloading_permit;

Poll::Ready(sector.take())
})
}),
},
)
.await;

drop(downloading_permit);
}
};

Expand Down

0 comments on commit 3c49bb2

Please sign in to comment.