diff --git a/Cargo.toml b/Cargo.toml index 82c1add..f215ef7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,10 +8,15 @@ repository = "https://github.com/dariusc93/pollable-map" authors = ["Darius Clark"] exclude = [".gitignore"] +[features] +default = ["std", "alloc"] +std = ["futures/std", "futures-timeout"] +alloc = ["futures/alloc"] + [dependencies] -futures = { version = "0.3.31", default-features = false, features = ["std", "alloc", "async-await"] } -futures-timeout = "0.1.3" +futures = { version = "0.3.31", default-features = false, features = ["async-await"] } +futures-timeout = { version = "0.1.3", optional = true } [dev-dependencies] futures = { version = "0.3.31", default-features = false, features = ["std", "alloc", "async-await", "thread-pool", "executor"] } \ No newline at end of file diff --git a/src/common.rs b/src/common.rs index 0b4490d..aae4dc9 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,10 +1,10 @@ -use futures::{FutureExt, Stream, StreamExt}; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use futures::Stream; + +#[cfg(feature = "std")] use futures_timeout::{Timeout, TimeoutExt}; -use std::future::Future; -use std::ops::{Deref, DerefMut}; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; pub struct InnerMap { key: K, @@ -125,49 +125,56 @@ where } } +#[cfg(feature = "std")] pub struct Timed(Timeout); +#[cfg(feature = "std")] impl Timed { pub(crate) fn into_inner(self) -> F { self.0.into_inner() } } -impl Deref for Timed { +#[cfg(feature = "std")] +impl core::ops::Deref for Timed { type Target = F; fn deref(&self) -> &Self::Target { self.0.deref() } } -impl DerefMut for Timed { +#[cfg(feature = "std")] +impl core::ops::DerefMut for Timed { fn deref_mut(&mut self) -> &mut Self::Target { self.0.deref_mut() } } +#[cfg(feature = "std")] impl Timed { - pub(crate) fn new(item: F, timeout: Duration) -> Self { + pub(crate) fn new(item: F, timeout: core::time::Duration) -> Self { Self(item.timeout(timeout)) } } +#[cfg(feature = "std")] impl Future for Timed where F: Future + Unpin, { type Output = std::io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.0.poll_unpin(cx) + Pin::new(&mut self.0).poll(cx) } } +#[cfg(feature = "std")] impl Stream for Timed where F: Stream + Unpin, { type Item = std::io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.0.poll_next_unpin(cx) + Pin::new(&mut self.0).poll_next(cx) } } diff --git a/src/futures.rs b/src/futures.rs index 3ecf39e..7075abb 100644 --- a/src/futures.rs +++ b/src/futures.rs @@ -1,15 +1,17 @@ pub mod optional; pub mod ordered; pub mod set; +#[cfg(feature = "std")] pub mod timeout_map; +#[cfg(feature = "std")] pub mod timeout_set; use crate::common::InnerMap; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll, Waker}; use futures::stream::{FusedStream, FuturesUnordered}; use futures::{Stream, StreamExt}; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll, Waker}; pub struct FutureMap { list: FuturesUnordered>, @@ -36,8 +38,8 @@ impl FutureMap { impl FutureMap where - K: Clone + PartialEq + Send + Unpin + 'static, - T: Future + Send + Unpin + 'static, + K: Clone + PartialEq + Unpin, + T: Future + Unpin, { /// Insert a future into the map with a unique key. /// The function will return true if the map does not have the key present, @@ -162,8 +164,8 @@ where impl FromIterator<(K, T)> for FutureMap where - K: Clone + PartialEq + Send + Unpin + 'static, - T: Future + Send + Unpin + 'static, + K: Clone + PartialEq + Unpin, + T: Future + Unpin, { fn from_iter>(iter: I) -> Self { let mut maps = Self::new(); @@ -176,8 +178,8 @@ where impl Stream for FutureMap where - K: Clone + PartialEq + Send + Unpin + 'static, - T: Future + Unpin + Send + 'static, + K: Clone + PartialEq + Unpin, + T: Future + Unpin, { type Item = (K, T::Output); @@ -220,8 +222,8 @@ where impl FusedStream for FutureMap where - K: Clone + PartialEq + Send + Unpin + 'static, - T: Future + Unpin + Send + 'static, + K: Clone + PartialEq + Unpin, + T: Future + Unpin, { fn is_terminated(&self) -> bool { self.list.is_terminated() diff --git a/src/futures/optional.rs b/src/futures/optional.rs index d762df6..4742c48 100644 --- a/src/futures/optional.rs +++ b/src/futures/optional.rs @@ -2,7 +2,7 @@ use crate::optional::Optional; /// A reusable future that is the equivalent to an `Option`. /// -/// By default, this future will be empty, which would return [`Poll::Pending`] when polled, +/// By default, this future will be empty, which would return [`Poll::Pending`] when polled, /// but if a [`Future`] is supplied either upon construction via [`OptionalFuture::new`] or /// is set via [`OptionalFuture::replace`], the future would then be polled once [`OptionalFuture`] /// is polled. Once the future is polled to completion, the results will be returned, with diff --git a/src/futures/ordered.rs b/src/futures/ordered.rs index 8295288..365b0d8 100644 --- a/src/futures/ordered.rs +++ b/src/futures/ordered.rs @@ -1,8 +1,8 @@ +use alloc::collections::VecDeque; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll, Waker}; use futures::Stream; -use std::collections::VecDeque; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll, Waker}; /// An unbounded queue of futures imposed a FIFO order while polling one future at a time /// and returning the output to stream before popping the next future in queue to be polled. @@ -57,7 +57,7 @@ impl OrderedFutureSet { impl FromIterator for OrderedFutureSet where - F: Future + Send + Unpin + 'static, + F: Future + Unpin, { fn from_iter>(iter: T) -> Self { let mut ordered = Self::new(); @@ -70,7 +70,7 @@ where impl Stream for OrderedFutureSet where - F: Future + Send + Unpin + 'static, + F: Future + Unpin, { type Item = F::Output; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { @@ -109,6 +109,8 @@ where #[cfg(test)] mod tests { use crate::futures::ordered::OrderedFutureSet; + use alloc::vec; + use alloc::vec::Vec; use futures::StreamExt; #[test] diff --git a/src/futures/set.rs b/src/futures/set.rs index 6154463..b8f95ae 100644 --- a/src/futures/set.rs +++ b/src/futures/set.rs @@ -1,10 +1,10 @@ -use std::future::Future; -use std::pin::Pin; +use core::future::Future; +use core::pin::Pin; use super::FutureMap; +use core::task::{Context, Poll}; use futures::stream::FusedStream; use futures::{Stream, StreamExt}; -use std::task::{Context, Poll}; pub struct FutureSet { id: i64, @@ -13,7 +13,7 @@ pub struct FutureSet { impl Default for FutureSet where - S: Future + Send + Unpin + 'static, + S: Future + Unpin, { fn default() -> Self { Self::new() @@ -22,7 +22,7 @@ where impl FutureSet where - S: Future + Send + Unpin + 'static, + S: Future + Unpin, { /// Creates an empty ['FutureSet`] pub fn new() -> Self { @@ -71,7 +71,7 @@ where impl FromIterator for FutureSet where - S: Future + Send + Unpin + 'static, + S: Future + Unpin, { fn from_iter>(iter: I) -> Self { let mut maps = Self::new(); @@ -84,7 +84,7 @@ where impl Stream for FutureSet where - S: Future + Send + Unpin + 'static, + S: Future + Unpin, { type Item = S::Output; @@ -101,7 +101,7 @@ where impl FusedStream for FutureSet where - S: Future + Send + Unpin + 'static, + S: Future + Unpin, { fn is_terminated(&self) -> bool { self.map.is_terminated() diff --git a/src/futures/timeout_map.rs b/src/futures/timeout_map.rs index 987a2f0..7fc3a42 100644 --- a/src/futures/timeout_map.rs +++ b/src/futures/timeout_map.rs @@ -1,12 +1,12 @@ use crate::common::Timed; use crate::futures::FutureMap; +use core::future::Future; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::task::{Context, Poll}; +use core::time::Duration; use futures::stream::FusedStream; use futures::{Stream, StreamExt}; -use std::future::Future; -use std::ops::{Deref, DerefMut}; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; pub struct TimeoutFutureMap { duration: Duration, @@ -28,8 +28,8 @@ impl DerefMut for TimeoutFutureMap { impl TimeoutFutureMap where - K: Clone + PartialEq + Send + Unpin + 'static, - F: Future + Send + Unpin + 'static, + K: Clone + PartialEq + Unpin, + F: Future + Unpin, { /// Create an empty [`TimeoutFutureMap`] pub fn new(duration: Duration) -> Self { @@ -49,8 +49,8 @@ where impl Stream for TimeoutFutureMap where - K: Clone + PartialEq + Send + Unpin + 'static, - F: Future + Send + Unpin + 'static, + K: Clone + PartialEq + Unpin, + F: Future + Unpin, { type Item = (K, std::io::Result); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -64,8 +64,8 @@ where impl FusedStream for TimeoutFutureMap where - K: Clone + PartialEq + Send + Unpin + 'static, - F: Future + Send + Unpin + 'static, + K: Clone + PartialEq + Unpin, + F: Future + Unpin, { fn is_terminated(&self) -> bool { self.map.is_terminated() diff --git a/src/futures/timeout_set.rs b/src/futures/timeout_set.rs index ff23f5d..fe50a27 100644 --- a/src/futures/timeout_set.rs +++ b/src/futures/timeout_set.rs @@ -1,12 +1,12 @@ use crate::common::Timed; use crate::futures::set::FutureSet; +use core::future::Future; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::task::{Context, Poll}; +use core::time::Duration; use futures::stream::FusedStream; use futures::{Stream, StreamExt}; -use std::future::Future; -use std::ops::{Deref, DerefMut}; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; pub struct TimeoutFutureSet { duration: Duration, @@ -28,7 +28,7 @@ impl DerefMut for TimeoutFutureSet { impl TimeoutFutureSet where - F: Future + Send + Unpin + 'static, + F: Future + Unpin, { /// Create an empty [`TimeoutFutureSet`] pub fn new(duration: Duration) -> Self { @@ -46,7 +46,7 @@ where impl Stream for TimeoutFutureSet where - F: Future + Send + Unpin + 'static, + F: Future + Unpin, { type Item = std::io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -60,7 +60,7 @@ where impl FusedStream for TimeoutFutureSet where - F: Future + Send + Unpin + 'static, + F: Future + Unpin, { fn is_terminated(&self) -> bool { self.set.is_terminated() diff --git a/src/lib.rs b/src/lib.rs index 2a36d95..b8c3d4c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,14 @@ +#![no_std] + +#[cfg(feature = "alloc")] +extern crate alloc; +#[cfg(feature = "std")] +extern crate std; + +#[cfg(feature = "alloc")] pub mod futures; + +#[cfg(feature = "alloc")] pub mod stream; pub(crate) mod common; diff --git a/src/optional.rs b/src/optional.rs index 756b6a0..11d6ec5 100644 --- a/src/optional.rs +++ b/src/optional.rs @@ -1,11 +1,12 @@ +#[cfg(feature = "std")] pub mod timeout; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll, Waker}; use futures::future::FusedFuture; use futures::stream::FusedStream; use futures::Stream; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll, Waker}; /// A reusable future or stream based on `Option`. /// diff --git a/src/optional/timeout.rs b/src/optional/timeout.rs index df2ea66..3c8b56a 100644 --- a/src/optional/timeout.rs +++ b/src/optional/timeout.rs @@ -1,11 +1,11 @@ use crate::common::Timed; use crate::optional::Optional; +use core::future::Future; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::task::{Context, Poll}; +use core::time::Duration; use futures::Stream; -use std::future::Future; -use std::ops::{Deref, DerefMut}; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; /// A reusable future or stream based on `Option` that will time out after a specific duration as elapse. pub struct TimeoutOptional { diff --git a/src/stream.rs b/src/stream.rs index 6d76fb5..efbc8bf 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,13 +1,15 @@ pub mod optional; pub mod set; +#[cfg(feature = "std")] pub mod timeout_map; +#[cfg(feature = "std")] pub mod timeout_set; use crate::common::InnerMap; +use core::pin::Pin; +use core::task::{Context, Poll, Waker}; use futures::stream::{FusedStream, SelectAll}; use futures::{Stream, StreamExt}; -use std::pin::Pin; -use std::task::{Context, Poll, Waker}; /// Combining multiple streams into one, with each stream having a unique key. pub struct StreamMap { @@ -19,7 +21,7 @@ pub struct StreamMap { impl Default for StreamMap where K: Clone + Unpin, - T: Stream + Send + Unpin + 'static, + T: Stream + Unpin, { fn default() -> Self { Self::new() @@ -29,7 +31,7 @@ where impl StreamMap where K: Clone + Unpin, - T: Stream + Send + Unpin + 'static, + T: Stream + Unpin, { /// Creates an empty [`StreamMap`] pub fn new() -> Self { @@ -43,8 +45,8 @@ where impl StreamMap where - K: Clone + PartialEq + Send + Unpin + 'static, - T: Stream + Send + Unpin + 'static, + K: Clone + PartialEq + Unpin, + T: Stream + Unpin, { /// Insert a stream into the map with a unique key. /// The function will return true if the map does not have the key present, @@ -169,8 +171,8 @@ where impl FromIterator<(K, T)> for StreamMap where - K: Clone + PartialEq + Send + Unpin + 'static, - T: Stream + Send + Unpin + 'static, + K: Clone + PartialEq + Unpin, + T: Stream + Unpin, { fn from_iter>(iter: I) -> Self { let mut maps = Self::new(); @@ -183,8 +185,8 @@ where impl Stream for StreamMap where - K: Clone + PartialEq + Send + Unpin + 'static, - T: Stream + Unpin + Send + 'static, + K: Clone + PartialEq + Unpin, + T: Stream + Unpin, { type Item = (K, T::Item); @@ -232,8 +234,8 @@ where impl FusedStream for StreamMap where - K: Clone + PartialEq + Send + Unpin + 'static, - T: Stream + Unpin + Send + 'static, + K: Clone + PartialEq + Unpin, + T: Stream + Unpin, { fn is_terminated(&self) -> bool { self.list.is_terminated() @@ -289,7 +291,7 @@ mod test { map.insert(1, futures::stream::once(async { 10 }).boxed()); map.insert(2, futures::stream::once(async { 20 }).boxed()); - map.insert(3, futures::stream::iter(vec![30, 40, 50]).boxed()); + map.insert(3, futures::stream::iter(alloc::vec![30, 40, 50]).boxed()); futures::executor::block_on(async move { assert_eq!(map.next().await, Some((1, 10))); diff --git a/src/stream/optional.rs b/src/stream/optional.rs index a8af22b..5d67e10 100644 --- a/src/stream/optional.rs +++ b/src/stream/optional.rs @@ -2,9 +2,9 @@ use crate::optional::Optional; /// A reusable stream that is the equivalent to an `Option`. /// -/// By default, this future will be empty, which would return [`Poll::Pending`] when polled, +/// By default, this future will be empty, which would return [`Poll::Pending`] when polled, /// but if a [`Stream`] is supplied either upon construction via [`OptionalStream::new`] or -/// is set via [`OptionalStream::replace`], the stream could later polled once [`OptionalStream`] +/// is set via [`OptionalStream::replace`], the stream could later poll once [`OptionalStream`] /// is polled. Once the stream is polled to completion, [`OptionalStream`] will be empty. #[deprecated(note = "Use Optional instead")] pub type OptionalStream = Optional; diff --git a/src/stream/set.rs b/src/stream/set.rs index 4ad3a1e..ba3e1f2 100644 --- a/src/stream/set.rs +++ b/src/stream/set.rs @@ -1,9 +1,9 @@ +use core::pin::Pin; use futures::{Stream, StreamExt}; -use std::pin::Pin; use super::StreamMap; +use core::task::{Context, Poll}; use futures::stream::FusedStream; -use std::task::{Context, Poll}; pub struct StreamSet { id: i64, @@ -12,7 +12,7 @@ pub struct StreamSet { impl Default for StreamSet where - S: Stream + Send + Unpin + 'static, + S: Stream + Unpin, { fn default() -> Self { Self::new() @@ -21,7 +21,7 @@ where impl StreamSet where - S: Stream + Send + Unpin + 'static, + S: Stream + Unpin, { /// Creates an empty ['StreamSet`] pub fn new() -> Self { @@ -70,7 +70,7 @@ where impl FromIterator for StreamSet where - S: Stream + Send + Unpin + 'static, + S: Stream + Unpin, { fn from_iter>(iter: I) -> Self { let mut maps = Self::new(); @@ -83,7 +83,7 @@ where impl Stream for StreamSet where - S: Stream + Send + Unpin + 'static, + S: Stream + Unpin, { type Item = S::Item; @@ -100,7 +100,7 @@ where impl FusedStream for StreamSet where - S: Stream + Send + Unpin + 'static, + S: Stream + Unpin, { fn is_terminated(&self) -> bool { self.map.is_terminated() diff --git a/src/stream/timeout_map.rs b/src/stream/timeout_map.rs index b82b75b..272d7cc 100644 --- a/src/stream/timeout_map.rs +++ b/src/stream/timeout_map.rs @@ -1,11 +1,11 @@ use crate::common::Timed; use crate::stream::StreamMap; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::task::{Context, Poll}; +use core::time::Duration; use futures::stream::FusedStream; use futures::{Stream, StreamExt}; -use std::ops::{Deref, DerefMut}; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; pub struct TimeoutStreamMap { duration: Duration, @@ -27,8 +27,8 @@ impl DerefMut for TimeoutStreamMap { impl TimeoutStreamMap where - K: Clone + PartialEq + Send + Unpin + 'static, - S: Stream + Send + Unpin + 'static, + K: Clone + PartialEq + Unpin, + S: Stream + Unpin, { /// Create an empty [`TimeoutStreamMap`] pub fn new(duration: Duration) -> Self { @@ -48,8 +48,8 @@ where impl Stream for TimeoutStreamMap where - K: Clone + PartialEq + Send + Unpin + 'static, - S: Stream + Send + Unpin + 'static, + K: Clone + PartialEq + Unpin, + S: Stream + Unpin, { type Item = (K, std::io::Result); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -63,8 +63,8 @@ where impl FusedStream for TimeoutStreamMap where - K: Clone + PartialEq + Send + Unpin + 'static, - S: Stream + Send + Unpin + 'static, + K: Clone + PartialEq + Unpin, + S: Stream + Unpin, { fn is_terminated(&self) -> bool { self.map.is_terminated() diff --git a/src/stream/timeout_set.rs b/src/stream/timeout_set.rs index eb52f3e..4190922 100644 --- a/src/stream/timeout_set.rs +++ b/src/stream/timeout_set.rs @@ -1,11 +1,11 @@ use crate::common::Timed; use crate::stream::set::StreamSet; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::task::{Context, Poll}; +use core::time::Duration; use futures::stream::FusedStream; use futures::{Stream, StreamExt}; -use std::ops::{Deref, DerefMut}; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; pub struct TimeoutStreamSet { duration: Duration, @@ -27,7 +27,7 @@ impl DerefMut for TimeoutStreamSet { impl TimeoutStreamSet where - S: Stream + Send + Unpin + 'static, + S: Stream + Unpin, { /// Create an empty ['TimeoutStreamSet'] pub fn new(duration: Duration) -> Self { @@ -45,7 +45,7 @@ where impl Stream for TimeoutStreamSet where - S: Stream + Send + Unpin + 'static, + S: Stream + Unpin, { type Item = std::io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -59,7 +59,7 @@ where impl FusedStream for TimeoutStreamSet where - S: Stream + Send + Unpin + 'static, + S: Stream + Unpin, { fn is_terminated(&self) -> bool { self.set.is_terminated()