Skip to content

Commit

Permalink
scope and spawn instead of par iter
Browse files Browse the repository at this point in the history
  • Loading branch information
RCasatta authored and romanz committed Nov 17, 2023
1 parent d789651 commit bec551b
Showing 1 changed file with 41 additions and 28 deletions.
69 changes: 41 additions & 28 deletions src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ use bitcoin::{
};
use bitcoin_slices::{bsl, Parse};
use crossbeam_channel::{bounded, select, Receiver, Sender};
use rayon::iter::ParallelIterator;
use rayon::prelude::IntoParallelIterator;

use std::io::{self, ErrorKind, Write};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
Expand Down Expand Up @@ -102,8 +100,8 @@ impl Connection {
R: Send + Sync,
{
self.blocks_duration.observe_duration("total", || {
let mut result = vec![];
let blockhashes: Vec<BlockHash> = blockhashes.into_iter().collect();
let blockhashes_len = blockhashes.len();
if blockhashes.is_empty() {
return Ok(vec![]);
}
Expand All @@ -112,31 +110,46 @@ impl Connection {
self.req_send.send(Request::get_blocks(&blockhashes))
})?;

for hash in blockhashes {
let block = self.blocks_duration.observe_duration("response", || {
let block = self
.blocks_recv
.recv()
.with_context(|| format!("failed to get block {}", hash))?;
let header = bsl::BlockHeader::parse(&block[..])
.expect("core returned invalid blockheader")
.parsed_owned();
ensure!(
&header.block_hash_sha2()[..] == hash.as_byte_array(),
"got unexpected block"
);
Ok(block)
})?;
result.push((hash, block));
}

Ok(result
.into_par_iter()
.map(|(hash, block)| {
self.blocks_duration
.observe_duration("process", || func(hash, block))
})
.collect())
rayon::scope(|s| {
let (send, receive) = std::sync::mpsc::channel();

for hash in blockhashes {
let block = self.blocks_duration.observe_duration("response", || {
let block = self
.blocks_recv
.recv()
.with_context(|| format!("failed to get block {}", hash))?;
let header = bsl::BlockHeader::parse(&block[..])
.expect("core returned invalid blockheader")
.parsed_owned();
ensure!(
&header.block_hash_sha2()[..] == hash.as_byte_array(),
"got unexpected block"
);
Ok(block)
})?;
let func = &func;
let blocks_duration = &self.blocks_duration;
let send = send.clone();
s.spawn(move |_| {
let r = blocks_duration.observe_duration("process", || func(hash, block));
let _ = send.send(r);
});
}
let result: Result<Vec<_>, std::sync::mpsc::RecvError> =
(0..blockhashes_len).map(|_| receive.recv()).collect();

match result {
Ok(result) => {
if result.len() == blockhashes_len {
Ok(result)
} else {
bail!("asked {blockhashes_len} blocks, returned {}", result.len(),);
}
}
Err(e) => bail!("recv error {e:?}"),
}
})
})
}

Expand Down

0 comments on commit bec551b

Please sign in to comment.