From 026f88d83aa2e1d80594dda2489fd0351c0ea9cf Mon Sep 17 00:00:00 2001 From: Jacob Rothstein Date: Sun, 24 Mar 2024 15:14:26 -0700 Subject: [PATCH] fix: correct usage of EventListener for scenarios that involve multiple wakes This resolves a potential bug that has not yet been observed with Stopper. It would only be encountered if the Event were notified before the atomic boolean was stored. Although the usage of atomics should make guard against this, the future logic still should take into consideration the possibility that the future was erroneously woken. I misunderstood the contract for EventListener, which turns out to be that each EventListener is only good for one wake, after which it needs to be replaced with a new listener. I thought that repeatedly polling the listener would repeatedly wake it on Event notification. refs: https://github.com/smol-rs/event-listener/issues/124 --- src/future_stopper.rs | 11 ++++++----- src/stopped.rs | 6 +++++- src/stream_stopper.rs | 6 +++++- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/future_stopper.rs b/src/future_stopper.rs index 6601a03..9eb5e7b 100644 --- a/src/future_stopper.rs +++ b/src/future_stopper.rs @@ -3,6 +3,7 @@ use event_listener::EventListener; use pin_project_lite::pin_project; use std::{ future::Future, + mem, pin::Pin, task::{Context, Poll}, }; @@ -40,12 +41,12 @@ impl Future for FutureStopper { } match Pin::new(&mut *this.event_listener).poll(cx) { - Poll::Ready(()) => continue, + Poll::Ready(()) => { + mem::replace(this.event_listener, this.stopper.0.event.listen()); + continue; + } Poll::Pending => { - return match this.future.poll(cx) { - Poll::Ready(output) => Poll::Ready(Some(output)), - Poll::Pending => Poll::Pending, - } + return this.future.poll(cx).map(Some); } }; } diff --git a/src/stopped.rs b/src/stopped.rs index 9e69967..0adea60 100644 --- a/src/stopped.rs +++ b/src/stopped.rs @@ -2,6 +2,7 @@ use crate::Stopper; use event_listener::EventListener; use std::{ future::{Future, IntoFuture}, + mem, pin::Pin, task::{Context, Poll}, }; @@ -39,7 +40,10 @@ impl Future for Stopped { } match Pin::new(&mut *event_listener).poll(cx) { - Poll::Ready(()) => continue, + Poll::Ready(()) => { + mem::replace(this.event_listener, this.stopper.0.event.listen()); + continue; + } Poll::Pending => return Poll::Pending, }; } diff --git a/src/stream_stopper.rs b/src/stream_stopper.rs index ffad82c..ab5f40a 100644 --- a/src/stream_stopper.rs +++ b/src/stream_stopper.rs @@ -4,6 +4,7 @@ use futures_lite::Stream; use std::{ fmt::{self, Debug, Formatter}, future::Future, + mem, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}, @@ -66,7 +67,10 @@ impl Stream for StreamStopper { } match Pin::new(&mut *this.event_listener).poll(cx) { - Poll::Ready(()) => continue, + Poll::Ready(()) => { + mem::replace(this.event_listener, this.stopper.0.event.listen()); + continue; + } Poll::Pending => return this.stream.poll_next(cx), }; }