Skip to content

Commit

Permalink
Fix leaky connections
Browse files Browse the repository at this point in the history
Fixes #221

It's possible to trigger more approvals than are necessary, in turn
grabbing more connections than we need. This happens when we drop a
connection. The drop produces a notify, which doesn't get used until the
pool is empty. The first `Pool::get()` call on an empty pool will spawn
an connect task, immediately complete `notify.notified().await`, then
spawn a second connect task. Both will connect and we'll end up with 1
more connection than we need.

Rather than address the notify issue directly, this fix introduces some
bookkeeping that tracks the number of open `pool.get()` requests we have
waiting on connections. If the number of pending connections >= the
number of pending gets, we will not spawn any additional connect tasks.

I have additionally changed `notify.notify_waiters();` to a
`notify.notify_one();` call in the broken connection branch. A single
broken connection should only need to notify one pending task to spawn a
new connect task, not all of them.
  • Loading branch information
tneely committed Oct 14, 2024
1 parent cb99697 commit 298ad54
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 2 deletions.
3 changes: 2 additions & 1 deletion bb8/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ where
let mut wait_time_start = None;

let future = async {
let _guard = self.inner.request();
loop {
let (conn, approvals) = self.inner.pop();
self.spawn_replenishing_approvals(approvals);
Expand Down Expand Up @@ -158,7 +159,7 @@ where
}
let approvals = locked.dropped(1, &self.inner.statics);
self.spawn_replenishing_approvals(approvals);
self.inner.notify.notify_waiters();
self.inner.notify.notify_one();
}
}
}
Expand Down
32 changes: 31 additions & 1 deletion bb8/src/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,27 @@ where
pub(crate) statistics: AtomicStatistics,
}

pub(crate) struct GetGuard<M: ManageConnection + Send> {
inner: Arc<SharedPool<M>>,
}

impl<M: ManageConnection + Send> GetGuard<M> {
fn new(inner: Arc<SharedPool<M>>) -> Self {
{
let mut locked = inner.internals.lock();
locked.inflight_gets += 1;
}
GetGuard { inner }
}
}

impl<M: ManageConnection + Send> Drop for GetGuard<M> {
fn drop(&mut self) {
let mut locked = self.inner.internals.lock();
locked.inflight_gets -= 1;
}
}

impl<M> SharedPool<M>
where
M: ManageConnection + Send,
Expand All @@ -41,12 +62,19 @@ where
let conn = locked.conns.pop_front().map(|idle| idle.conn);
let approvals = match &conn {
Some(_) => locked.wanted(&self.statics),
None => locked.approvals(&self.statics, 1),
None => {
let approvals = min(1, locked.inflight_gets.saturating_sub(locked.pending_conns));
locked.approvals(&self.statics, approvals)
}
};

(conn, approvals)
}

pub(crate) fn request(self: &Arc<Self>) -> GetGuard<M> {
GetGuard::new(self.clone())
}

pub(crate) fn try_put(self: &Arc<Self>, conn: M::Connection) -> Result<(), M::Connection> {
let mut locked = self.internals.lock();
let mut approvals = locked.approvals(&self.statics, 1);
Expand Down Expand Up @@ -81,6 +109,7 @@ where
conns: VecDeque<IdleConn<M::Connection>>,
num_conns: u32,
pending_conns: u32,
inflight_gets: u32,
}

impl<M> PoolInternals<M>
Expand Down Expand Up @@ -202,6 +231,7 @@ where
conns: VecDeque::new(),
num_conns: 0,
pending_conns: 0,
inflight_gets: 0,
}
}
}
Expand Down
34 changes: 34 additions & 0 deletions bb8/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1068,3 +1068,37 @@ async fn test_add_checks_broken_connections() {
let res = pool.add(conn);
assert!(matches!(res, Err(AddError::Broken(_))));
}

#[tokio::test]
async fn test_reuse_on_drop() {
let pool = Pool::builder()
.min_idle(0)
.max_size(100)
.queue_strategy(QueueStrategy::Lifo)
.build(OkManager::<FakeConnection>::new())
.await
.unwrap();

// The first get should
// 1) see nothing in the pool,
// 2) spawn a single replenishing approval,
// 3) get notified of the new connection and grab it from the pool
let conn_0 = pool.get().await.expect("should connect");
// Dropping the connection queues up a notify
drop(conn_0);
// The second get should
// 1) see the first connection in the pool and grab it
let _conn_1: PooledConnection<OkManager<FakeConnection>> =
pool.get().await.expect("should connect");
// The third get will
// 1) see nothing in the pool,
// 2) spawn a single replenishing approval,
// 3) get notified of the new connection,
// 4) see nothing in the pool,
// 5) _not_ spawn a single replenishing approval,
// 6) get notified of the new connection and grab it from the pool
let _conn_2: PooledConnection<OkManager<FakeConnection>> =
pool.get().await.expect("should connect");

assert_eq!(pool.state().connections, 2);
}

0 comments on commit 298ad54

Please sign in to comment.