feat: rename argument remote_peer to parent #141

Merged · 1 commit · Dec 12, 2023
src/task/mod.rs (18 changes: 7 additions & 11 deletions)

@@ -632,7 +632,7 @@ impl Task {
task_id: &str,
host_id: &str,
peer_id: &str,
- remote_peers: Vec<Peer>,
+ parents: Vec<Peer>,
interested_pieces: Vec<metadata::Piece>,
content_length: u64,
download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
@@ -643,7 +643,7 @@
self.config.clone(),
task_id,
interested_pieces.clone(),
- remote_peers.clone(),
+ parents.clone(),
);

// Initialize the join set.
@@ -660,35 +660,31 @@
async fn download_from_remote_peer(
task_id: String,
number: u32,
- remote_peer: Peer,
+ parent: Peer,
piece: Arc<piece::Piece>,
semaphore: Arc<Semaphore>,
) -> ClientResult<metadata::Piece> {
let _permit = semaphore.acquire().await.map_err(|err| {
error!("acquire semaphore error: {:?}", err);
Error::DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed {
piece_number: number,
- parent_id: remote_peer.id.clone(),
+ parent_id: parent.id.clone(),
})
})?;

let metadata = piece
- .download_from_remote_peer(
- task_id.as_str(),
- number,
- remote_peer.clone(),
- )
+ .download_from_remote_peer(task_id.as_str(), number, parent.clone())
.await
.map_err(|err| {
error!(
"download piece {} from remote peer {:?} error: {:?}",
number,
- remote_peer.id.clone(),
+ parent.id.clone(),
err
);
Error::DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed {
piece_number: number,
- parent_id: remote_peer.id.clone(),
+ parent_id: parent.id.clone(),
})
})?;
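The hunk above only renames the argument; the download pattern stays the same: each piece download runs as a task in a JoinSet and must first acquire a Semaphore permit. Below is a minimal, self-contained sketch of that pattern. The Peer struct, the download_piece helper, and the concurrency limit of 4 are simplified stand-ins for illustration, not the crate's actual code.

```rust
use std::sync::Arc;

use tokio::sync::Semaphore;
use tokio::task::JoinSet;

#[derive(Clone, Debug)]
struct Peer {
    id: String,
}

// Stand-in for the real piece download from a parent peer.
async fn download_piece(number: u32, parent: Peer) -> Result<u32, String> {
    println!("downloading piece {} from parent {}", number, parent.id);
    Ok(number)
}

#[tokio::main]
async fn main() {
    // At most 4 concurrent piece downloads, mirroring the shared semaphore in the diff.
    let semaphore = Arc::new(Semaphore::new(4));
    let parent = Peer { id: "parent-1".to_string() };

    let mut join_set = JoinSet::new();
    for number in 0..8u32 {
        let semaphore = semaphore.clone();
        let parent = parent.clone();
        join_set.spawn(async move {
            // Acquire a permit before downloading, as download_from_remote_peer does.
            let _permit = semaphore
                .acquire()
                .await
                .map_err(|err| format!("acquire semaphore error: {err}"))?;
            download_piece(number, parent).await
        });
    }

    // Drain the set and report per-piece results.
    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(Ok(number)) => println!("piece {} finished", number),
            Ok(Err(err)) => eprintln!("piece failed: {err}"),
            Err(err) => eprintln!("task panicked: {err}"),
        }
    }
}
```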
src/task/piece.rs (12 changes: 6 additions & 6 deletions)

@@ -253,16 +253,16 @@ impl Piece {
&self,
task_id: &str,
number: u32,
- remote_peer: Peer,
+ parent: Peer,
) -> Result<metadata::Piece> {
// Record the start of downloading piece.
self.storage.download_piece_started(task_id, number).await?;

// Create a dfdaemon client.
- let host = remote_peer
+ let host = parent
.host
.clone()
- .ok_or(Error::InvalidPeer(remote_peer.id.clone()))?;
+ .ok_or(Error::InvalidPeer(parent.id.clone()))?;
let dfdaemon_client =
DfdaemonClient::new(format!("http://{}:{}", host.ip, host.port)).await?;

@@ -308,7 +308,7 @@ impl Piece {
number,
piece.offset,
piece.digest.as_str(),
- remote_peer.id.as_str(),
+ parent.id.as_str(),
&mut content.as_slice(),
)
.await
@@ -336,10 +336,10 @@ impl Piece {
&self,
task_id: &str,
number: u32,
- remote_peer: Peer,
+ parent: Peer,
) -> Result<impl AsyncRead> {
// Download the piece from the remote peer.
- self.download_from_remote_peer(task_id, number, parent)
+ self.download_from_remote_peer(task_id, number, parent)
.await?;

// Return reader of the piece.
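For context on the first hunk above: the parent's host is optional, so the code unwraps it with ok_or and maps a missing host to an InvalidPeer error before dialing http://ip:port. Here is a small sketch of that step; Host, Peer, and Error are simplified stand-ins for the crate's own types.

```rust
#[derive(Clone, Debug)]
struct Host {
    ip: String,
    port: u16,
}

#[derive(Clone, Debug)]
struct Peer {
    id: String,
    host: Option<Host>,
}

#[derive(Debug)]
enum Error {
    InvalidPeer(String),
}

// Turn the parent's optional host into an endpoint, or fail with InvalidPeer.
fn parent_endpoint(parent: &Peer) -> Result<String, Error> {
    let host = parent
        .host
        .clone()
        .ok_or(Error::InvalidPeer(parent.id.clone()))?;
    Ok(format!("http://{}:{}", host.ip, host.port))
}

fn main() {
    let parent = Peer {
        id: "parent-1".to_string(),
        host: Some(Host { ip: "127.0.0.1".to_string(), port: 4000 }),
    };
    match parent_endpoint(&parent) {
        Ok(endpoint) => println!("dialing {}", endpoint),
        Err(err) => eprintln!("invalid parent: {:?}", err),
    }
}
```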
src/task/piece_collector.rs (58 changes: 29 additions & 29 deletions)

@@ -45,8 +45,8 @@ pub struct PieceCollector {
// task_id is the id of the task.
task_id: String,

- // peers is the peers to collect pieces from.
- peers: Vec<Peer>,
+ // parents is the parent peers.
+ parents: Vec<Peer>,

// interested_pieces is the pieces interested by the collector.
interested_pieces: Vec<metadata::Piece>,
@@ -56,12 +56,12 @@ pub struct PieceCollector {
}

impl PieceCollector {
- // NewPieceCollector returns a new PieceCollector.
+ // new creates a new PieceCollector.
pub fn new(
config: Arc<Config>,
task_id: &str,
interested_pieces: Vec<metadata::Piece>,
- peers: Vec<Peer>,
+ parents: Vec<Peer>,
) -> Self {
// Initialize collected_pieces.
let collected_pieces = Arc::new(DashMap::new());
@@ -75,24 +75,24 @@ impl PieceCollector {
Self {
config,
task_id: task_id.to_string(),
- peers,
+ parents,
interested_pieces,
collected_pieces,
}
}

- // Run runs the collector.
+ // run runs the piece collector.
pub async fn run(&self) -> Receiver<CollectedPiece> {
let task_id = self.task_id.clone();
- let peers = self.peers.clone();
+ let parents = self.parents.clone();
let interested_pieces = self.interested_pieces.clone();
let collected_pieces = self.collected_pieces.clone();
let collected_piece_timeout = self.config.download.piece_timeout;
let (collected_piece_tx, collected_piece_rx) = mpsc::channel(128);
tokio::spawn(async move {
- Self::collect_from_peers(
+ Self::collect_from_remote_peers(
task_id,
- peers,
+ parents,
interested_pieces,
collected_pieces,
collected_piece_tx,
@@ -107,31 +107,31 @@ impl PieceCollector {
collected_piece_rx
}

- // collect collects a piece from peers.
- async fn collect_from_peers(
+ // collect_from_remote_peers collects pieces from remote peers.
+ async fn collect_from_remote_peers(
task_id: String,
- peers: Vec<Peer>,
+ parents: Vec<Peer>,
interested_pieces: Vec<metadata::Piece>,
collected_pieces: Arc<DashMap<u32, DashSet<String>>>,
collected_piece_tx: Sender<CollectedPiece>,
collected_piece_timeout: Duration,
) -> Result<()> {
// Create a task to collect pieces from peers.
let mut join_set = JoinSet::new();
- for peer in peers.iter() {
+ for parent in parents.iter() {
async fn sync_pieces(
task_id: String,
- peer: Peer,
- peers: Vec<Peer>,
+ parent: Peer,
+ parents: Vec<Peer>,
interested_pieces: Vec<metadata::Piece>,
collected_pieces: Arc<DashMap<u32, DashSet<String>>>,
collected_piece_tx: Sender<CollectedPiece>,
collected_piece_timeout: Duration,
) -> Result<Peer> {
// If candidate_parent.host is None, skip it.
- let host = peer.host.clone().ok_or_else(|| {
- error!("peer {:?} host is empty", peer);
- Error::InvalidPeer(peer.id.clone())
+ let host = parent.host.clone().ok_or_else(|| {
+ error!("peer {:?} host is empty", parent);
+ Error::InvalidPeer(parent.id.clone())
})?;

// Create a dfdaemon client.
@@ -159,20 +159,20 @@ impl PieceCollector {
collected_pieces
.entry(message.piece_number)
.and_modify(|peers| {
- peers.insert(peer.id.clone());
+ peers.insert(parent.id.clone());
});

match collected_pieces.get(&message.piece_number) {
- Some(parents) => {
- if let Some(parent) = parents.iter().next() {
+ Some(parent_ids) => {
+ if let Some(parent_id) = parent_ids.iter().next() {
let number = message.piece_number;
- let parent = peers
+ let parent = parents
.iter()
- .find(|peer| peer.id == parent.as_str())
+ .find(|parent| parent.id == parent_id.as_str())
.ok_or_else(|| {
- error!("parent {} not found", parent.as_str());
- Error::InvalidPeer(parent.clone())
- })?;
+ error!("parent {} not found", parent_id.as_str());
+ Error::InvalidPeer(parent_id.clone())
+ })?;

collected_piece_tx
.send(CollectedPiece {
@@ -188,13 +188,13 @@
};
}

- Ok(peer)
+ Ok(parent)
}

join_set.spawn(sync_pieces(
task_id.clone(),
- peer.clone(),
- peers.clone(),
+ parent.clone(),
+ parents.clone(),
interested_pieces.clone(),
collected_pieces.clone(),
collected_piece_tx.clone(),
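Taken together, the collector in this file spawns one sync task per parent peer, records which parents announced each piece in a shared Arc<DashMap<u32, DashSet<String>>>, and forwards collected pieces over an mpsc channel. The sketch below condenses that flow under simplifying assumptions: the hard-coded piece numbers, parent ids, and CollectedPiece struct are stand-ins, and the real dfdaemon sync stream and timeout handling are omitted.

```rust
use std::sync::Arc;

use dashmap::{DashMap, DashSet};
use tokio::sync::mpsc;
use tokio::task::JoinSet;

#[derive(Clone, Debug)]
struct CollectedPiece {
    number: u32,
    parent_id: String,
}

#[tokio::main]
async fn main() {
    let parents = vec!["parent-1".to_string(), "parent-2".to_string()];

    // Shared map from piece number to the set of parents that hold it.
    let collected_pieces: Arc<DashMap<u32, DashSet<String>>> = Arc::new(DashMap::new());
    // Pre-register the piece numbers we want to collect.
    for number in 0..4u32 {
        collected_pieces.insert(number, DashSet::new());
    }

    let (tx, mut rx) = mpsc::channel::<CollectedPiece>(128);

    // One task per parent, mirroring the per-parent sync_pieces tasks.
    let mut join_set = JoinSet::new();
    for parent_id in parents {
        let collected_pieces = collected_pieces.clone();
        let tx = tx.clone();
        join_set.spawn(async move {
            // Stand-in for the piece numbers this parent would announce.
            for number in 0..4u32 {
                // Record that this parent holds the piece.
                if let Some(holders) = collected_pieces.get(&number) {
                    holders.insert(parent_id.clone());
                }
                // Forward the piece with the parent that announced it.
                let _ = tx
                    .send(CollectedPiece {
                        number,
                        parent_id: parent_id.clone(),
                    })
                    .await;
            }
        });
    }
    drop(tx);

    // Receive until every per-parent task has finished and dropped its sender.
    while let Some(piece) = rx.recv().await {
        println!("piece {} available from {}", piece.number, piece.parent_id);
    }
}
```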