Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@ repository = "https://github.com/dariusc93/pollable-map"
authors = ["Darius Clark"]
exclude = [".gitignore"]

[features]
default = ["std", "alloc"]
std = ["futures/std", "futures-timeout"]
alloc = ["futures/alloc"]


[dependencies]
futures = { version = "0.3.31", default-features = false, features = ["std", "alloc", "async-await"] }
futures-timeout = "0.1.3"
futures = { version = "0.3.31", default-features = false, features = ["async-await"] }
futures-timeout = { version = "0.1.3", optional = true }

[dev-dependencies]
futures = { version = "0.3.31", default-features = false, features = ["std", "alloc", "async-await", "thread-pool", "executor"] }
29 changes: 18 additions & 11 deletions src/common.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use futures::{FutureExt, Stream, StreamExt};
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::Stream;

#[cfg(feature = "std")]
use futures_timeout::{Timeout, TimeoutExt};
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

pub struct InnerMap<K, S> {
key: K,
Expand Down Expand Up @@ -125,49 +125,56 @@ where
}
}

#[cfg(feature = "std")]
pub struct Timed<F>(Timeout<F>);

#[cfg(feature = "std")]
impl<F> Timed<F> {
pub(crate) fn into_inner(self) -> F {
self.0.into_inner()
}
}

impl<F> Deref for Timed<F> {
#[cfg(feature = "std")]
impl<F> core::ops::Deref for Timed<F> {
type Target = F;
fn deref(&self) -> &Self::Target {
self.0.deref()
}
}

impl<F> DerefMut for Timed<F> {
#[cfg(feature = "std")]
impl<F> core::ops::DerefMut for Timed<F> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.deref_mut()
}
}

#[cfg(feature = "std")]
impl<F> Timed<F> {
pub(crate) fn new(item: F, timeout: Duration) -> Self {
pub(crate) fn new(item: F, timeout: core::time::Duration) -> Self {
Self(item.timeout(timeout))
}
}

#[cfg(feature = "std")]
impl<F> Future for Timed<F>
where
F: Future + Unpin,
{
type Output = std::io::Result<F::Output>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll_unpin(cx)
Pin::new(&mut self.0).poll(cx)
}
}

#[cfg(feature = "std")]
impl<F> Stream for Timed<F>
where
F: Stream + Unpin,
{
type Item = std::io::Result<F::Item>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
Pin::new(&mut self.0).poll_next(cx)
}
}
24 changes: 13 additions & 11 deletions src/futures.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
pub mod optional;
pub mod ordered;
pub mod set;
#[cfg(feature = "std")]
pub mod timeout_map;
#[cfg(feature = "std")]
pub mod timeout_set;

use crate::common::InnerMap;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll, Waker};
use futures::stream::{FusedStream, FuturesUnordered};
use futures::{Stream, StreamExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};

pub struct FutureMap<K, S> {
list: FuturesUnordered<InnerMap<K, S>>,
Expand All @@ -36,8 +38,8 @@ impl<K, T> FutureMap<K, T> {

impl<K, T> FutureMap<K, T>
where
K: Clone + PartialEq + Send + Unpin + 'static,
T: Future + Send + Unpin + 'static,
K: Clone + PartialEq + Unpin,
T: Future + Unpin,
{
/// Insert a future into the map with a unique key.
/// The function will return true if the map does not have the key present,
Expand Down Expand Up @@ -162,8 +164,8 @@ where

impl<K, T> FromIterator<(K, T)> for FutureMap<K, T>
where
K: Clone + PartialEq + Send + Unpin + 'static,
T: Future + Send + Unpin + 'static,
K: Clone + PartialEq + Unpin,
T: Future + Unpin,
{
fn from_iter<I: IntoIterator<Item = (K, T)>>(iter: I) -> Self {
let mut maps = Self::new();
Expand All @@ -176,8 +178,8 @@ where

impl<K, T> Stream for FutureMap<K, T>
where
K: Clone + PartialEq + Send + Unpin + 'static,
T: Future + Unpin + Send + 'static,
K: Clone + PartialEq + Unpin,
T: Future + Unpin,
{
type Item = (K, T::Output);

Expand Down Expand Up @@ -220,8 +222,8 @@ where

impl<K, T> FusedStream for FutureMap<K, T>
where
K: Clone + PartialEq + Send + Unpin + 'static,
T: Future + Unpin + Send + 'static,
K: Clone + PartialEq + Unpin,
T: Future + Unpin,
{
fn is_terminated(&self) -> bool {
self.list.is_terminated()
Expand Down
2 changes: 1 addition & 1 deletion src/futures/optional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::optional::Optional;

/// A reusable future that is the equivalent to an `Option`.
///
/// By default, this future will be empty, which would return [`Poll::Pending`] when polled,
/// By default, this future will be empty, which would return [`Poll::Pending`] when polled,
/// but if a [`Future`] is supplied either upon construction via [`OptionalFuture::new`] or
/// is set via [`OptionalFuture::replace`], the future would then be polled once [`OptionalFuture`]
/// is polled. Once the future is polled to completion, the results will be returned, with
Expand Down
14 changes: 8 additions & 6 deletions src/futures/ordered.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use alloc::collections::VecDeque;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll, Waker};
use futures::Stream;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};

/// An unbounded queue of futures imposed a FIFO order while polling one future at a time
/// and returning the output to stream before popping the next future in queue to be polled.
Expand Down Expand Up @@ -57,7 +57,7 @@ impl<F> OrderedFutureSet<F> {

impl<F> FromIterator<F> for OrderedFutureSet<F>
where
F: Future + Send + Unpin + 'static,
F: Future + Unpin,
{
fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
let mut ordered = Self::new();
Expand All @@ -70,7 +70,7 @@ where

impl<F> Stream for OrderedFutureSet<F>
where
F: Future + Send + Unpin + 'static,
F: Future + Unpin,
{
type Item = F::Output;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
Expand Down Expand Up @@ -109,6 +109,8 @@ where
#[cfg(test)]
mod tests {
use crate::futures::ordered::OrderedFutureSet;
use alloc::vec;
use alloc::vec::Vec;
use futures::StreamExt;

#[test]
Expand Down
16 changes: 8 additions & 8 deletions src/futures/set.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::future::Future;
use std::pin::Pin;
use core::future::Future;
use core::pin::Pin;

use super::FutureMap;
use core::task::{Context, Poll};
use futures::stream::FusedStream;
use futures::{Stream, StreamExt};
use std::task::{Context, Poll};

pub struct FutureSet<S> {
id: i64,
Expand All @@ -13,7 +13,7 @@ pub struct FutureSet<S> {

impl<S> Default for FutureSet<S>
where
S: Future + Send + Unpin + 'static,
S: Future + Unpin,
{
fn default() -> Self {
Self::new()
Expand All @@ -22,7 +22,7 @@ where

impl<S> FutureSet<S>
where
S: Future + Send + Unpin + 'static,
S: Future + Unpin,
{
/// Creates an empty ['FutureSet`]
pub fn new() -> Self {
Expand Down Expand Up @@ -71,7 +71,7 @@ where

impl<S> FromIterator<S> for FutureSet<S>
where
S: Future + Send + Unpin + 'static,
S: Future + Unpin,
{
fn from_iter<I: IntoIterator<Item = S>>(iter: I) -> Self {
let mut maps = Self::new();
Expand All @@ -84,7 +84,7 @@ where

impl<S> Stream for FutureSet<S>
where
S: Future + Send + Unpin + 'static,
S: Future + Unpin,
{
type Item = S::Output;

Expand All @@ -101,7 +101,7 @@ where

impl<S> FusedStream for FutureSet<S>
where
S: Future + Send + Unpin + 'static,
S: Future + Unpin,
{
fn is_terminated(&self) -> bool {
self.map.is_terminated()
Expand Down
22 changes: 11 additions & 11 deletions src/futures/timeout_map.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::common::Timed;
use crate::futures::FutureMap;
use core::future::Future;
use core::ops::{Deref, DerefMut};
use core::pin::Pin;
use core::task::{Context, Poll};
use core::time::Duration;
use futures::stream::FusedStream;
use futures::{Stream, StreamExt};
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

pub struct TimeoutFutureMap<K, F> {
duration: Duration,
Expand All @@ -28,8 +28,8 @@ impl<K, F> DerefMut for TimeoutFutureMap<K, F> {

impl<K, F> TimeoutFutureMap<K, F>
where
K: Clone + PartialEq + Send + Unpin + 'static,
F: Future + Send + Unpin + 'static,
K: Clone + PartialEq + Unpin,
F: Future + Unpin,
{
/// Create an empty [`TimeoutFutureMap`]
pub fn new(duration: Duration) -> Self {
Expand All @@ -49,8 +49,8 @@ where

impl<K, F> Stream for TimeoutFutureMap<K, F>
where
K: Clone + PartialEq + Send + Unpin + 'static,
F: Future + Send + Unpin + 'static,
K: Clone + PartialEq + Unpin,
F: Future + Unpin,
{
type Item = (K, std::io::Result<F::Output>);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand All @@ -64,8 +64,8 @@ where

impl<K, F> FusedStream for TimeoutFutureMap<K, F>
where
K: Clone + PartialEq + Send + Unpin + 'static,
F: Future + Send + Unpin + 'static,
K: Clone + PartialEq + Unpin,
F: Future + Unpin,
{
fn is_terminated(&self) -> bool {
self.map.is_terminated()
Expand Down
16 changes: 8 additions & 8 deletions src/futures/timeout_set.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::common::Timed;
use crate::futures::set::FutureSet;
use core::future::Future;
use core::ops::{Deref, DerefMut};
use core::pin::Pin;
use core::task::{Context, Poll};
use core::time::Duration;
use futures::stream::FusedStream;
use futures::{Stream, StreamExt};
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

pub struct TimeoutFutureSet<S> {
duration: Duration,
Expand All @@ -28,7 +28,7 @@ impl<S> DerefMut for TimeoutFutureSet<S> {

impl<F> TimeoutFutureSet<F>
where
F: Future + Send + Unpin + 'static,
F: Future + Unpin,
{
/// Create an empty [`TimeoutFutureSet`]
pub fn new(duration: Duration) -> Self {
Expand All @@ -46,7 +46,7 @@ where

impl<F> Stream for TimeoutFutureSet<F>
where
F: Future + Send + Unpin + 'static,
F: Future + Unpin,
{
type Item = std::io::Result<F::Output>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand All @@ -60,7 +60,7 @@ where

impl<F> FusedStream for TimeoutFutureSet<F>
where
F: Future + Send + Unpin + 'static,
F: Future + Unpin,
{
fn is_terminated(&self) -> bool {
self.set.is_terminated()
Expand Down
10 changes: 10 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
#![no_std]

#[cfg(feature = "alloc")]
extern crate alloc;
#[cfg(feature = "std")]
extern crate std;

#[cfg(feature = "alloc")]
pub mod futures;

#[cfg(feature = "alloc")]
pub mod stream;

pub(crate) mod common;
Expand Down
7 changes: 4 additions & 3 deletions src/optional.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#[cfg(feature = "std")]
pub mod timeout;

use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll, Waker};
use futures::future::FusedFuture;
use futures::stream::FusedStream;
use futures::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};

/// A reusable future or stream based on `Option`.
///
Expand Down
Loading
Loading