Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ The following icons are used to distinguish breaking changes from non-breaking c
- 🔥: Breaking change (high impact: will require code changes for most users)
- 💔: Breaking change (low impact: won't require code changes for most users)

## 0.7.7

### Added

- Added `try_count`


## 0.7.6

### Added
Expand Down
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "streamtools"
version = "0.7.6"
version = "0.7.7"
edition = "2024"
authors = ["extremeandy"]
license = "MIT OR Apache-2.0"
Expand All @@ -9,6 +9,8 @@ description = "Additional stream combinators"
readme = "README.md"
# Oldest 2024 edition compatible version
rust-version = "1.85.0"
# Max 5 from here: https://crates.io/category_slugs
categories = ["algorithms", "asynchronous"]

[dependencies]
futures = "0.3.28"
Expand Down
2 changes: 1 addition & 1 deletion build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Detect the rustc channel
//
use rustc_version::{version_meta, Channel};
use rustc_version::{Channel, version_meta};

fn main() {
// Set cfg flags depending on release channel
Expand Down
6 changes: 3 additions & 3 deletions src/fast_forward.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{pin::Pin, task::Poll};

use futures::{stream::FusedStream, Stream, StreamExt};
use futures::{Stream, StreamExt, stream::FusedStream};
use pin_project_lite::pin_project;

pin_project! {
Expand Down Expand Up @@ -35,7 +35,7 @@ where
let Some(mut inner) = this.inner.as_mut().as_pin_mut() else {
// Last time we polled, the inner stream terminated, but we yielded a value.
// If we are here then it's time to terminate.
return Poll::Ready(None)
return Poll::Ready(None);
};

let mut last_value = None;
Expand Down Expand Up @@ -85,7 +85,7 @@ where

#[cfg(test)]
mod tests {
use futures::{stream, SinkExt};
use futures::{SinkExt, stream};
use tokio_test::{assert_pending, assert_ready_eq};

use super::*;
Expand Down
6 changes: 3 additions & 3 deletions src/flatten_switch.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures::task;
use futures::Stream;
use futures::task;
use pin_project_lite::pin_project;
use std::sync::Arc;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -110,7 +110,7 @@ where
mod tests {
use std::future;

use futures::{stream, FutureExt, StreamExt};
use futures::{FutureExt, StreamExt, stream};
use parking_lot::Mutex;
use tokio_test::{assert_pending, assert_ready_eq};

Expand Down Expand Up @@ -142,7 +142,7 @@ mod tests {

#[tokio::test]
async fn test_flatten_switch() {
use futures::{channel::mpsc, SinkExt, StreamExt};
use futures::{SinkExt, StreamExt, channel::mpsc};
use tokio::sync::broadcast::{self, error::SendError};
use tokio_stream::wrappers::BroadcastStream;

Expand Down
39 changes: 36 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@
//! - `test-util`: Exposes utilities for testing streams, in particular:
//! - [`delay_items`](crate::test_util::delay_items)
//! - [`record_delay`](crate::StreamTools::record_delay)
#![doc(html_root_url = "https://docs.rs/streamtools/0.7.6/")]

use either_or_both::EitherOrBoth;
use futures::{stream::Map, Stream};
use merge_join_by::MergeJoinBy;
use futures::{Stream, TryStream, stream::Map};
use std::cmp::Ordering;

use merge_join_by::MergeJoinBy;
use try_count::TryCount;

mod fast_forward;
mod flatten_switch;
mod merge_join_by;
mod outer_waker;
mod sample;
mod try_count;

#[cfg(feature = "tokio-time")]
mod throttle_last;
Expand Down Expand Up @@ -250,6 +252,37 @@ pub trait StreamTools: Stream {
{
RecordDelay::new(self)
}

/// Try to count the items in a stream of results.
///
/// If an error is encountered it's returned right away and the stream is dropped.
/// `Ok` results are always dropped.
///
/// See [`futures::StreamExt::count()`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.count) for a non short circuiting version of this.
///
///
/// # Example
///
/// ```rust
/// # futures::executor::block_on(async {
/// use streamtools::StreamTools;
/// use futures::stream;
///
/// let oks = stream::iter(vec![Ok::<&str, &str>("The"), Ok("values"), Ok("inside"), Ok("don't"), Ok("matter")]);
/// let count = oks.try_count().await;
/// assert_eq!(count, Ok(5));
///
/// let erring = stream::iter(vec![Ok("Short"), Ok("circuits"), Err("on"), Ok("first"), Err("Err")]);
/// let err = erring.try_count().await;
/// assert_eq!(err, Err("on"));
/// # });
/// ```
fn try_count(self) -> impl Future<Output = Result<usize, Self::Error>>
where
Self: Sized + TryStream,
{
TryCount::new(self)
}
}

impl<T: Stream> StreamTools for T {}
Expand Down
2 changes: 1 addition & 1 deletion src/merge_join_by.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Implementation of the `StreamTools::merge_join_by` combinator.

use either_or_both::EitherOrBoth;
use futures::stream::Fuse;
use futures::Stream;
use futures::StreamExt;
use futures::stream::Fuse;
use pin_project_lite::pin_project;
use std::{
cmp::{self, Ordering},
Expand Down
8 changes: 4 additions & 4 deletions src/sample.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::{stream::FusedStream, Stream, StreamExt};
use futures::{Stream, StreamExt, stream::FusedStream};
use pin_project_lite::pin_project;
use std::{
pin::Pin,
Expand Down Expand Up @@ -47,7 +47,7 @@ impl<T: Stream, S: Stream> Stream for Sample<T, S> {
let Some(mut inner) = this.inner.as_mut().as_pin_mut() else {
// Last time we polled, the inner stream terminated, but we yielded a value.
// If we are here then it's time to terminate.
return Poll::Ready(None)
return Poll::Ready(None);
};

// Fast forward to the latest value on the inner stream
Expand Down Expand Up @@ -129,7 +129,7 @@ where
mod tests {
use std::future;

use futures::{stream, SinkExt, StreamExt};
use futures::{SinkExt, StreamExt, stream};

use tokio_test::{assert_pending, assert_ready_eq};

Expand Down Expand Up @@ -218,8 +218,8 @@ mod tests {
#[cfg(feature = "test-util")]
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_sample_with_interval() {
use crate::test_util::delay_items;
use crate::StreamTools;
use crate::test_util::delay_items;

let sampler = IntervalStream::new(tokio::time::interval(Duration::from_millis(1500)));

Expand Down
2 changes: 1 addition & 1 deletion src/test_util/delay_items.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Duration;

use futures::{stream, FutureExt, Stream, StreamExt};
use futures::{FutureExt, Stream, StreamExt, stream};

/// Creates a stream from an iterator where items are delayed by the specified amount
pub fn delay_items<T>(items: impl IntoIterator<Item = (Duration, T)>) -> impl Stream<Item = T> {
Expand Down
6 changes: 3 additions & 3 deletions src/throttle_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use futures::{Future, StreamExt};
use tokio::time::{Duration, Instant, Sleep};

use std::pin::Pin;
use std::task::{self, ready, Poll};
use std::task::{self, Poll, ready};

use pin_project_lite::pin_project;

Expand Down Expand Up @@ -48,7 +48,7 @@ where
let Some(mut inner) = this.inner.as_mut().as_pin_mut() else {
// Last time we polled, the inner stream terminated, but we yielded a value.
// If we are here then it's time to terminate.
return Poll::Ready(None)
return Poll::Ready(None);
};

let dur = *this.duration;
Expand Down Expand Up @@ -98,7 +98,7 @@ mod tests {

use futures::StreamExt;

use crate::{test_util::delay_items, StreamTools, ThrottleLast};
use crate::{StreamTools, ThrottleLast, test_util::delay_items};

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_throttle_last() {
Expand Down
59 changes: 59 additions & 0 deletions src/try_count.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//! Implementation of `StreamTools::try_count()`.

use futures::TryStream;
use pin_project_lite::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};

pin_project! {
/// Future for [`StreamTools::try_count()`].
#[must_use = "streams do nothing unless polled"]
pub (crate) struct TryCount<S>
where
S: TryStream,
{
#[pin]
stream: S,
accumulator: usize,
}
}

impl<S> TryCount<S>
where
S: TryStream,
{
pub(crate) fn new(try_stream: S) -> Self {
Self {
stream: try_stream,
// Start counting at 0.
accumulator: 0,
}
}
}

// Just need to implement Future as this consumes the Stream.
impl<S> Future for TryCount<S>
where
S: TryStream,
{
type Output = Result<usize, S::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

loop {
match this.stream.as_mut().try_poll_next(cx) {
// Count Ok and loop.
Poll::Ready(Some(Ok(_))) => *this.accumulator += 1,
// Short circuit on errors.
Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e)),
// Reached the end. Return the count.
Poll::Ready(None) => return Poll::Ready(Ok(*this.accumulator)),
// Nothing new at this moment, back to the executor.
Poll::Pending => return Poll::Pending,
}
}
}
}
Loading