Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

security: Rate limit GetAddr responses #7955

Merged
merged 26 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
969819f
Updates ADDR_RESPONSE_LIMIT_DENOMINATOR to 4
arya2 Nov 16, 2023
ba0cfc6
Moves logic getting a fraction of Zebra's peers to a method in the ad…
arya2 Nov 16, 2023
70e0cc1
Adds and uses CachedPeerAddrs struct in inbound service
arya2 Nov 16, 2023
81cf5a5
moves and documents constant
arya2 Nov 16, 2023
f28c529
fixes test
arya2 Nov 16, 2023
a82a404
Apply suggestions from code review
arya2 Nov 17, 2023
78234ba
updates docs
arya2 Nov 17, 2023
a5d6b25
renames sanitized_window method
arya2 Nov 17, 2023
a431e1b
renames CachedPeerAddrs to CachedPeerAddrResponse
arya2 Nov 17, 2023
66e1cfc
updates test
arya2 Nov 17, 2023
788c620
moves try_refresh to per request
arya2 Nov 17, 2023
29f3aed
Make unused sanitization method pub(crate)
teor2345 Nov 19, 2023
2032361
Apply suggestions from code review
arya2 Nov 20, 2023
f2c2968
moves CachedPeerAddrResponse to a module
arya2 Nov 20, 2023
2ee9a20
updates unit test
arya2 Nov 20, 2023
4d9bc0f
fixes unit test
arya2 Nov 20, 2023
7c35da6
removes unnecessary condition
arya2 Nov 20, 2023
cca2857
clears cached getaddr response if it can't refresh for over a minute …
arya2 Nov 20, 2023
7c84398
tests that inbound service gives out the same addresses for every Pee…
arya2 Nov 20, 2023
f72a52b
Applies suggestion from code review
arya2 Nov 20, 2023
5327d17
fixes doc link
arya2 Nov 20, 2023
1e3fb0e
renames constant
arya2 Nov 20, 2023
a4a8963
Fix docs on new constant
arya2 Nov 20, 2023
f902adb
applies suggestion from code review
arya2 Nov 20, 2023
120b6a5
uses longer cache expiry time
arya2 Nov 20, 2023
48f419e
Adds code comments
arya2 Nov 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions zebra-network/src/address_book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tracing::Span;
use zebra_chain::{parameters::Network, serialization::DateTime32};

use crate::{
constants,
constants::{self, ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
meta_addr::MetaAddrChange,
protocol::external::{canonical_peer_addr, canonical_socket_addr},
types::MetaAddr,
Expand Down Expand Up @@ -268,7 +268,20 @@ impl AddressBook {

/// Get the active addresses in `self` in random order with sanitized timestamps,
/// including our local listener address.
pub fn sanitized(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
///
/// Limited to a the number of peer addresses Zebra should give out per `GetAddr` request.
pub fn fresh_get_addr_response(&self) -> Vec<MetaAddr> {
let now = Utc::now();
let mut peers = self.sanitized(now);
let address_limit = peers.len().div_ceil(ADDR_RESPONSE_LIMIT_DENOMINATOR);
peers.truncate(MAX_ADDRS_IN_MESSAGE.min(address_limit));

peers
}

/// Get the active addresses in `self` in random order with sanitized timestamps,
/// including our local listener address.
pub(crate) fn sanitized(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
use rand::seq::SliceRandom;
let _guard = self.span.enter();

Expand Down
18 changes: 15 additions & 3 deletions zebra-network/src/address_book/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use tracing::Span;
use zebra_chain::{parameters::Network::*, serialization::Duration32};

use crate::{
constants::{DEFAULT_MAX_CONNS_PER_IP, MAX_ADDRS_IN_ADDRESS_BOOK, MAX_PEER_ACTIVE_FOR_GOSSIP},
constants::{
ADDR_RESPONSE_LIMIT_DENOMINATOR, DEFAULT_MAX_CONNS_PER_IP, MAX_ADDRS_IN_ADDRESS_BOOK,
MAX_ADDRS_IN_MESSAGE, MAX_PEER_ACTIVE_FOR_GOSSIP,
},
meta_addr::{arbitrary::MAX_META_ADDR, MetaAddr, MetaAddrChange},
AddressBook,
};
Expand All @@ -36,8 +39,17 @@ proptest! {
addresses
);

for gossiped_address in address_book.sanitized(chrono_now) {
let duration_since_last_seen = gossiped_address
// Only recently reachable are sanitized
let sanitized = address_book.sanitized(chrono_now);
let gossiped = address_book.fresh_get_addr_response();

let expected_num_gossiped = sanitized.len().div_ceil(ADDR_RESPONSE_LIMIT_DENOMINATOR).min(MAX_ADDRS_IN_MESSAGE);
let num_gossiped = gossiped.len();

prop_assert_eq!(expected_num_gossiped, num_gossiped);

for sanitized_address in sanitized {
let duration_since_last_seen = sanitized_address
.last_seen()
.expect("Peer that was never seen before is being gossiped")
.saturating_elapsed(chrono_now)
Expand Down
9 changes: 5 additions & 4 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ pub const MAX_ADDRS_IN_MESSAGE: usize = 1000;
///
/// This limit makes sure that Zebra does not reveal its entire address book
/// in a single `Peers` response.
pub const ADDR_RESPONSE_LIMIT_DENOMINATOR: usize = 3;
pub const ADDR_RESPONSE_LIMIT_DENOMINATOR: usize = 4;

/// The maximum number of addresses Zebra will keep in its address book.
///
Expand Down Expand Up @@ -499,8 +499,9 @@ mod tests {
#[test]
#[allow(clippy::assertions_on_constants)]
fn ensure_address_limits_consistent() {
// Zebra 1.0.0-beta.2 address book metrics in December 2021.
const TYPICAL_MAINNET_ADDRESS_BOOK_SIZE: usize = 4_500;
// Estimated network address book size in November 2023, after the address book limit was increased.
// Zebra 1.0.0-beta.2 address book metrics in December 2021 showed 4500 peers.
const TYPICAL_MAINNET_ADDRESS_BOOK_SIZE: usize = 5_500;

let _init_guard = zebra_test::init();

Expand All @@ -515,7 +516,7 @@ mod tests {
);

assert!(
MAX_ADDRS_IN_ADDRESS_BOOK < TYPICAL_MAINNET_ADDRESS_BOOK_SIZE,
MAX_ADDRS_IN_ADDRESS_BOOK <= TYPICAL_MAINNET_ADDRESS_BOOK_SIZE,
"the address book limit should actually be used"
);
}
Expand Down
6 changes: 6 additions & 0 deletions zebra-network/src/protocol/internal/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,10 @@ impl Response {
pub fn is_inventory_download(&self) -> bool {
matches!(self, Response::Blocks(_) | Response::Transactions(_))
}

/// Returns true if self is the [`Response::Nil`] variant.
#[allow(dead_code)]
pub fn is_nil(&self) -> bool {
matches!(self, Self::Nil)
}
}
87 changes: 22 additions & 65 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,15 @@ use std::{
collections::HashSet,
future::Future,
pin::Pin,
sync::{Arc, TryLockError},
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use chrono::Utc;
use futures::{
future::{FutureExt, TryFutureExt},
stream::Stream,
};
use num_integer::div_ceil;
use tokio::sync::oneshot::{self, error::TryRecvError};
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt};

Expand All @@ -32,10 +30,7 @@ use zebra_chain::{
transaction::UnminedTxId,
};
use zebra_consensus::router::RouterError;
use zebra_network::{
constants::{ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
AddressBook, InventoryResponse,
};
use zebra_network::{AddressBook, InventoryResponse};
use zebra_node_services::mempool;

use crate::BoxError;
Expand All @@ -45,8 +40,11 @@ use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};

use InventoryResponse::*;

mod cached_peer_addr_response;
pub(crate) mod downloads;

use cached_peer_addr_response::CachedPeerAddrResponse;

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -135,8 +133,12 @@ pub enum Setup {
Initialized {
// Services
//
/// A shared list of peer addresses.
address_book: Arc<std::sync::Mutex<zn::AddressBook>>,
/// An owned partial list of peer addresses used as a `GetAddr` response, and
/// a shared list of peer addresses used to periodically refresh the partial list.
///
/// Refreshed from the address book in `poll_ready` method
/// after [`INBOUND_CACHED_ADDRS_REFRESH_INTERVAL`](cached_peer_addr_response::INBOUND_CACHED_ADDRS_REFRESH_INTERVAL).
cached_peer_addr_response: CachedPeerAddrResponse,

/// A `futures::Stream` that downloads and verifies gossiped blocks.
block_downloads: Pin<Box<GossipedBlockDownloads>>,
Expand Down Expand Up @@ -261,6 +263,8 @@ impl Service<zn::Request> for Inbound {
latest_chain_tip,
} = setup_data;

let cached_peer_addr_response = CachedPeerAddrResponse::new(address_book);

let block_downloads = Box::pin(BlockDownloads::new(
full_verify_concurrency_limit,
Timeout::new(block_download_peer_set, BLOCK_DOWNLOAD_TIMEOUT),
Expand All @@ -271,7 +275,7 @@ impl Service<zn::Request> for Inbound {

result = Ok(());
Setup::Initialized {
address_book,
cached_peer_addr_response,
block_downloads,
mempool,
state,
Expand Down Expand Up @@ -306,7 +310,7 @@ impl Service<zn::Request> for Inbound {
}
// Clean up completed download tasks, ignoring their results
Setup::Initialized {
address_book,
cached_peer_addr_response,
mut block_downloads,
mempool,
state,
Expand All @@ -321,7 +325,7 @@ impl Service<zn::Request> for Inbound {
result = Ok(());

Setup::Initialized {
address_book,
cached_peer_addr_response,
block_downloads,
mempool,
state,
Expand Down Expand Up @@ -352,13 +356,13 @@ impl Service<zn::Request> for Inbound {
/// and will cause callers to disconnect from the remote peer.
#[instrument(name = "inbound", skip(self, req))]
fn call(&mut self, req: zn::Request) -> Self::Future {
let (address_book, block_downloads, mempool, state) = match &mut self.setup {
let (cached_peer_addr_response, block_downloads, mempool, state) = match &mut self.setup {
Setup::Initialized {
address_book,
cached_peer_addr_response,
block_downloads,
mempool,
state,
} => (address_book, block_downloads, mempool, state),
} => (cached_peer_addr_response, block_downloads, mempool, state),
_ => {
debug!("ignoring request from remote peer during setup");
return async { Ok(zn::Response::Nil) }.boxed();
Expand All @@ -377,58 +381,11 @@ impl Service<zn::Request> for Inbound {
//
// If the address book is busy, try again inside the future. If it can't be locked
// twice, ignore the request.
let address_book = address_book.clone();

let get_peers = move || match address_book.try_lock() {
Ok(address_book) => Some(address_book.clone()),
Err(TryLockError::WouldBlock) => None,
Err(TryLockError::Poisoned(_)) => panic!("previous thread panicked while holding the address book lock"),
};

let peers = get_peers();
cached_peer_addr_response.try_refresh();
let response = cached_peer_addr_response.value();

async move {
// Correctness: get the current time inside the future.
//
// This time is used to filter outdated peers, so it doesn't matter much
// if we get it when the future is created, or when it starts running.
let now = Utc::now();

// If we didn't get the peers when the future was created, wait for other tasks
// to run, then try again when the future first runs.
if peers.is_none() {
tokio::task::yield_now().await;
}
let peers = peers.or_else(get_peers);
let is_busy = peers.is_none();

// Send a sanitized response
let mut peers = peers.map_or_else(Vec::new, |peers| peers.sanitized(now));

// Truncate the list
let address_limit = div_ceil(peers.len(), ADDR_RESPONSE_LIMIT_DENOMINATOR);
let address_limit = MAX_ADDRS_IN_MESSAGE.min(address_limit);
peers.truncate(address_limit);

if peers.is_empty() {
// Sometimes we don't know if the peer response will be empty until we've
// sanitized them.
if is_busy {
info!(
"ignoring `Peers` request from remote peer because our address \
book is busy"
);
} else {
debug!(
"ignoring `Peers` request from remote peer because our address \
book has no available peers"
);
}

Ok(zn::Response::Nil)
} else {
Ok(zn::Response::Peers(peers))
}
Ok(response)
}.boxed()
}
zn::Request::BlocksByHash(hashes) => {
Expand Down
96 changes: 96 additions & 0 deletions zebrad/src/components/inbound/cached_peer_addr_response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
//! Periodically-refreshed GetAddr response for the inbound service.
//!
//! Used to avoid giving out Zebra's entire address book over a short duration.

use std::{
sync::{Mutex, TryLockError},
time::Instant,
};

use super::*;

/// The maximum duration that a `CachedPeerAddrResponse` is considered fresh before the inbound service
/// should get new peer addresses from the address book to send as a `GetAddr` response.
pub const INBOUND_CACHED_ADDRS_REFRESH_INTERVAL: Duration = Duration::from_secs(10 * 60);
teor2345 marked this conversation as resolved.
Show resolved Hide resolved

/// The maximum duration that a `CachedPeerAddrResponse` is considered fresh before the inbound service
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
/// should get new peer addresses from the address book to send as a `GetAddr` response.
const INBOUND_CACHED_ADDRS_MAX_FRESH_DURATION: Duration = Duration::from_secs(60);

/// Caches and refreshes a partial list of peer addresses to be returned as a `GetAddr` response.
pub struct CachedPeerAddrResponse {
/// A shared list of peer addresses.
address_book: Arc<Mutex<zn::AddressBook>>,

/// An owned list of peer addresses used as a `GetAddr` response.
value: zn::Response,

/// Instant after which `cached_addrs` should be refreshed.
refresh_time: Instant,
}

impl CachedPeerAddrResponse {
/// Creates a new empty [`CachedPeerAddrResponse`].
pub(super) fn new(address_book: Arc<Mutex<AddressBook>>) -> Self {
Self {
address_book,
value: zn::Response::Nil,
refresh_time: Instant::now(),
}
}

pub(super) fn value(&self) -> zn::Response {
self.value.clone()
}

/// Refreshes the `cached_addrs` if the time has past `refresh_time` or the cache is empty
pub(super) fn try_refresh(&mut self) {
let now = Instant::now();

// return early if there are some cached addresses, and they are still fresh
if now < self.refresh_time {
return;
}

let max_fresh_time = self.refresh_time + INBOUND_CACHED_ADDRS_MAX_FRESH_DURATION;

// try getting a lock on the address book if it's time to refresh the cached addresses
match self
.address_book
.try_lock()
.map(|book| book.fresh_get_addr_response())
{
// Update cached value and refresh_time, even if the address book is empty.
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
//
// Security: this avoids outdated gossiped peers. Outdated Zebra binaries will gradually lose all their peers,
// because those peers refuse to connect to outdated versions. So we don't want those outdated Zebra
// versions to keep gossiping old peer information either.
Ok(peers) if !peers.is_empty() => {
self.refresh_time = now + INBOUND_CACHED_ADDRS_REFRESH_INTERVAL;
self.value = zn::Response::Peers(peers);
}

Ok(_) if now > max_fresh_time => {
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
self.value = zn::Response::Nil;
}

Err(TryLockError::WouldBlock) if now > max_fresh_time => {
warn!("getaddrs response hasn't been refreshed in some time");
self.value = zn::Response::Nil;
}

Ok(_) => {
debug!(
"could not refresh cached response because our address \
book has no available peers"
);
}

Err(TryLockError::WouldBlock) => {}

Err(TryLockError::Poisoned(_)) => {
panic!("previous thread panicked while holding the address book lock")
}
};
}
}
Loading
Loading