diff --git a/CHANGELOG.md b/CHANGELOG.md index 37500e4..1ad8000 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,13 @@ The following icons are used to distinguish breaking changes from non-breaking c - 🔥: Breaking change (high impact: will require code changes for most users) - 💔: Breaking change (low impact: won't require code changes for most users) +## 0.7.7 + +### Added + +- Added `try_count` + + ## 0.7.6 ### Added diff --git a/Cargo.toml b/Cargo.toml index de516b6..3d8450f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "streamtools" -version = "0.7.6" +version = "0.7.7" edition = "2024" authors = ["extremeandy"] license = "MIT OR Apache-2.0" @@ -9,6 +9,8 @@ description = "Additional stream combinators" readme = "README.md" # Oldest 2024 edition compatible version rust-version = "1.85.0" +# Max 5 from here: https://crates.io/category_slugs +categories = ["algorithms", "asynchronous"] [dependencies] futures = "0.3.28" diff --git a/build.rs b/build.rs index 3ed75f1..1a8ed46 100644 --- a/build.rs +++ b/build.rs @@ -1,6 +1,6 @@ // Detect the rustc channel // -use rustc_version::{version_meta, Channel}; +use rustc_version::{Channel, version_meta}; fn main() { // Set cfg flags depending on release channel diff --git a/src/fast_forward.rs b/src/fast_forward.rs index 4358a7a..39ea6ce 100644 --- a/src/fast_forward.rs +++ b/src/fast_forward.rs @@ -1,6 +1,6 @@ use std::{pin::Pin, task::Poll}; -use futures::{stream::FusedStream, Stream, StreamExt}; +use futures::{Stream, StreamExt, stream::FusedStream}; use pin_project_lite::pin_project; pin_project! { @@ -35,7 +35,7 @@ where let Some(mut inner) = this.inner.as_mut().as_pin_mut() else { // Last time we polled, the inner stream terminated, but we yielded a value. // If we are here then it's time to terminate. - return Poll::Ready(None) + return Poll::Ready(None); }; let mut last_value = None; @@ -85,7 +85,7 @@ where #[cfg(test)] mod tests { - use futures::{stream, SinkExt}; + use futures::{SinkExt, stream}; use tokio_test::{assert_pending, assert_ready_eq}; use super::*; diff --git a/src/flatten_switch.rs b/src/flatten_switch.rs index fcdc26e..bcba2c8 100644 --- a/src/flatten_switch.rs +++ b/src/flatten_switch.rs @@ -1,5 +1,5 @@ -use futures::task; use futures::Stream; +use futures::task; use pin_project_lite::pin_project; use std::sync::Arc; use std::task::{Context, Poll}; @@ -110,7 +110,7 @@ where mod tests { use std::future; - use futures::{stream, FutureExt, StreamExt}; + use futures::{FutureExt, StreamExt, stream}; use parking_lot::Mutex; use tokio_test::{assert_pending, assert_ready_eq}; @@ -142,7 +142,7 @@ mod tests { #[tokio::test] async fn test_flatten_switch() { - use futures::{channel::mpsc, SinkExt, StreamExt}; + use futures::{SinkExt, StreamExt, channel::mpsc}; use tokio::sync::broadcast::{self, error::SendError}; use tokio_stream::wrappers::BroadcastStream; diff --git a/src/lib.rs b/src/lib.rs index e28dc9a..6c31fe6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,18 +14,20 @@ //! - `test-util`: Exposes utilities for testing streams, in particular: //! - [`delay_items`](crate::test_util::delay_items) //! - [`record_delay`](crate::StreamTools::record_delay) -#![doc(html_root_url = "https://docs.rs/streamtools/0.7.6/")] use either_or_both::EitherOrBoth; -use futures::{stream::Map, Stream}; -use merge_join_by::MergeJoinBy; +use futures::{Stream, TryStream, stream::Map}; use std::cmp::Ordering; +use merge_join_by::MergeJoinBy; +use try_count::TryCount; + mod fast_forward; mod flatten_switch; mod merge_join_by; mod outer_waker; mod sample; +mod try_count; #[cfg(feature = "tokio-time")] mod throttle_last; @@ -250,6 +252,37 @@ pub trait StreamTools: Stream { { RecordDelay::new(self) } + + /// Try to count the items in a stream of results. + /// + /// If an error is encountered it's returned right away and the stream is dropped. + /// `Ok` results are always dropped. + /// + /// See [`futures::StreamExt::count()`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.count) for a non short circuiting version of this. + /// + /// + /// # Example + /// + /// ```rust + /// # futures::executor::block_on(async { + /// use streamtools::StreamTools; + /// use futures::stream; + /// + /// let oks = stream::iter(vec![Ok::<&str, &str>("The"), Ok("values"), Ok("inside"), Ok("don't"), Ok("matter")]); + /// let count = oks.try_count().await; + /// assert_eq!(count, Ok(5)); + /// + /// let erring = stream::iter(vec![Ok("Short"), Ok("circuits"), Err("on"), Ok("first"), Err("Err")]); + /// let err = erring.try_count().await; + /// assert_eq!(err, Err("on")); + /// # }); + /// ``` + fn try_count(self) -> impl Future> + where + Self: Sized + TryStream, + { + TryCount::new(self) + } } impl StreamTools for T {} diff --git a/src/merge_join_by.rs b/src/merge_join_by.rs index a57cb0b..b604c8d 100644 --- a/src/merge_join_by.rs +++ b/src/merge_join_by.rs @@ -1,9 +1,9 @@ //! Implementation of the `StreamTools::merge_join_by` combinator. use either_or_both::EitherOrBoth; -use futures::stream::Fuse; use futures::Stream; use futures::StreamExt; +use futures::stream::Fuse; use pin_project_lite::pin_project; use std::{ cmp::{self, Ordering}, diff --git a/src/sample.rs b/src/sample.rs index 212885f..91c91ad 100644 --- a/src/sample.rs +++ b/src/sample.rs @@ -1,4 +1,4 @@ -use futures::{stream::FusedStream, Stream, StreamExt}; +use futures::{Stream, StreamExt, stream::FusedStream}; use pin_project_lite::pin_project; use std::{ pin::Pin, @@ -47,7 +47,7 @@ impl Stream for Sample { let Some(mut inner) = this.inner.as_mut().as_pin_mut() else { // Last time we polled, the inner stream terminated, but we yielded a value. // If we are here then it's time to terminate. - return Poll::Ready(None) + return Poll::Ready(None); }; // Fast forward to the latest value on the inner stream @@ -129,7 +129,7 @@ where mod tests { use std::future; - use futures::{stream, SinkExt, StreamExt}; + use futures::{SinkExt, StreamExt, stream}; use tokio_test::{assert_pending, assert_ready_eq}; @@ -218,8 +218,8 @@ mod tests { #[cfg(feature = "test-util")] #[tokio::test(flavor = "current_thread", start_paused = true)] async fn test_sample_with_interval() { - use crate::test_util::delay_items; use crate::StreamTools; + use crate::test_util::delay_items; let sampler = IntervalStream::new(tokio::time::interval(Duration::from_millis(1500))); diff --git a/src/test_util/delay_items.rs b/src/test_util/delay_items.rs index 1310288..4205064 100644 --- a/src/test_util/delay_items.rs +++ b/src/test_util/delay_items.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use futures::{stream, FutureExt, Stream, StreamExt}; +use futures::{FutureExt, Stream, StreamExt, stream}; /// Creates a stream from an iterator where items are delayed by the specified amount pub fn delay_items(items: impl IntoIterator) -> impl Stream { diff --git a/src/throttle_last.rs b/src/throttle_last.rs index 8abab84..4a4874f 100644 --- a/src/throttle_last.rs +++ b/src/throttle_last.rs @@ -3,7 +3,7 @@ use futures::{Future, StreamExt}; use tokio::time::{Duration, Instant, Sleep}; use std::pin::Pin; -use std::task::{self, ready, Poll}; +use std::task::{self, Poll, ready}; use pin_project_lite::pin_project; @@ -48,7 +48,7 @@ where let Some(mut inner) = this.inner.as_mut().as_pin_mut() else { // Last time we polled, the inner stream terminated, but we yielded a value. // If we are here then it's time to terminate. - return Poll::Ready(None) + return Poll::Ready(None); }; let dur = *this.duration; @@ -98,7 +98,7 @@ mod tests { use futures::StreamExt; - use crate::{test_util::delay_items, StreamTools, ThrottleLast}; + use crate::{StreamTools, ThrottleLast, test_util::delay_items}; #[tokio::test(flavor = "current_thread", start_paused = true)] async fn test_throttle_last() { diff --git a/src/try_count.rs b/src/try_count.rs new file mode 100644 index 0000000..7f248a5 --- /dev/null +++ b/src/try_count.rs @@ -0,0 +1,59 @@ +//! Implementation of `StreamTools::try_count()`. + +use futures::TryStream; +use pin_project_lite::pin_project; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +pin_project! { + /// Future for [`StreamTools::try_count()`]. + #[must_use = "streams do nothing unless polled"] + pub (crate) struct TryCount + where + S: TryStream, + { + #[pin] + stream: S, + accumulator: usize, + } +} + +impl TryCount +where + S: TryStream, +{ + pub(crate) fn new(try_stream: S) -> Self { + Self { + stream: try_stream, + // Start counting at 0. + accumulator: 0, + } + } +} + +// Just need to implement Future as this consumes the Stream. +impl Future for TryCount +where + S: TryStream, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + match this.stream.as_mut().try_poll_next(cx) { + // Count Ok and loop. + Poll::Ready(Some(Ok(_))) => *this.accumulator += 1, + // Short circuit on errors. + Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e)), + // Reached the end. Return the count. + Poll::Ready(None) => return Poll::Ready(Ok(*this.accumulator)), + // Nothing new at this moment, back to the executor. + Poll::Pending => return Poll::Pending, + } + } + } +}