From 091964c8caacc647d7f2225b3fb4e5228b87e753 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=90=BD=E5=8F=B6=E4=B9=8C=E9=BE=9F?= Date: Tue, 28 May 2024 09:50:11 +0800 Subject: [PATCH] feat: add `StreamExt::take_until` This commit migrates `Stream::take_until` from futures into futures-lite with little modification. Co-authored-by: zhulin.zzz Co-authored-by: John Nunley --- src/stream.rs | 170 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 170 insertions(+) diff --git a/src/stream.rs b/src/stream.rs index e2632bd..77378b4 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -663,6 +663,176 @@ impl Stream for OnceFuture { } } +/// Take elements from this stream until the provided future resolves. +/// +/// This function will take elements from the stream until the provided +/// stopping future `fut` resolves. Once the `fut` future becomes ready, +/// this stream combinator will always return that the stream is done. +/// +/// The stopping future may return any type. Once the stream is stopped +/// the result of the stopping future may be accessed with `TakeUntil::take_result()`. +/// The stream may also be resumed with `TakeUntil::take_future()`. +/// See the documentation of [`TakeUntil`] for more information. +/// +/// ``` +/// use futures_lite::stream::{self, StreamExt, take_until}; +/// use futures_lite::future; +/// use std::task::Poll; +/// +/// let stream = stream::iter(1..=10); +/// +/// # spin_on::spin_on(async { +/// let mut i = 0; +/// let stop_fut = future::poll_fn(|_cx| { +/// i += 1; +/// if i <= 5 { +/// Poll::Pending +/// } else { +/// Poll::Ready(()) +/// } +/// }); +/// +/// let stream = take_until(stream, stop_fut); +/// +/// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::>().await); +/// # }); +pub fn take_until(stream: S, future: F) -> TakeUntil +where + S: Sized + Stream, + F: Future, +{ + TakeUntil { + stream, + fut: Some(future), + fut_result: None, + free: false, + } +} + +pin_project! { + /// Stream for the [`StreamExt::take_until()`] method. + #[derive(Clone, Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct TakeUntil { + #[pin] + stream: S, + // Contains the inner Future on start and None once the inner Future is resolved + // or taken out by the user. + #[pin] + fut: Option, + // Contains fut's return value once fut is resolved + fut_result: Option, + // Whether the future was taken out by the user. + free: bool, + } +} + +impl TakeUntil +where + St: Stream, + Fut: Future, +{ + /// Extract the stopping future out of the combinator. + /// + /// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet. + /// Taking out the future means the combinator will be yielding + /// elements from the wrapped stream without ever stopping it. + pub fn take_future(&mut self) -> Option { + if self.fut.is_some() { + self.free = true; + } + + self.fut.take() + } + + /// Once the stopping future is resolved, this method can be used + /// to extract the value returned by the stopping future. + /// + /// This may be used to retrieve arbitrary data from the stopping + /// future, for example a reason why the stream was stopped. + /// + /// This method will return `None` if the future isn't resolved yet, + /// or if the result was already taken out. + /// + /// # Examples + /// + /// ``` + /// # spin_on::spin_on(async { + /// use futures_lite::stream::{self, StreamExt, take_until}; + /// use futures_lite::future; + /// use std::task::Poll; + /// + /// let stream = stream::iter(1..=10); + /// + /// let mut i = 0; + /// let stop_fut = future::poll_fn(|_cx| { + /// i += 1; + /// if i <= 5 { + /// Poll::Pending + /// } else { + /// Poll::Ready("reason") + /// } + /// }); + /// + /// let mut stream = take_until(stream, stop_fut); + /// let _ = (&mut stream).collect::>().await; + /// + /// let result = stream.take_result().unwrap(); + /// assert_eq!(result, "reason"); + /// # }); + /// ``` + pub fn take_result(&mut self) -> Option { + self.fut_result.take() + } + + /// Whether the stream was stopped yet by the stopping future + /// being resolved. + pub fn is_stopped(&self) -> bool { + !self.free && self.fut.is_none() + } +} + +impl Stream for TakeUntil +where + St: Stream, + Fut: Future, +{ + type Item = St::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + if let Some(f) = this.fut.as_mut().as_pin_mut() { + if let Poll::Ready(result) = f.poll(cx) { + this.fut.set(None); + *this.fut_result = Some(result); + } + } + + if !*this.free && this.fut.is_none() { + // Future resolved, inner stream stopped + Poll::Ready(None) + } else { + // Future either not resolved yet or taken out by the user + let item = ready!(this.stream.poll_next(cx)); + if item.is_none() { + this.fut.set(None); + } + Poll::Ready(item) + } + } + + fn size_hint(&self) -> (usize, Option) { + if self.is_stopped() { + return (0, Some(0)); + } + + // Original stream can be truncated at any moment, so the lower bound isn't reliable. + let (_, upper_bound) = self.stream.size_hint(); + (0, upper_bound) + } +} + /// Extension trait for [`Stream`]. pub trait StreamExt: Stream { /// A convenience for calling [`Stream::poll_next()`] on `!`[`Unpin`] types.