Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 47 additions & 15 deletions Sources/AsyncTimeoutSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,42 @@ public extension AsyncSequence where Element: Sendable {
}

/// Creates an asynchronous sequence that throws error if any iteration
/// takes longer than provided `Duration`.
/// takes longer than provided `Duration` using the supplied `Clock`.
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
func timeout(duration: Duration) -> AsyncTimeoutSequence<Self> {
AsyncTimeoutSequence(base: self, duration: duration)
func timeout<C: Clock>(
duration: C.Duration,
tolerance: C.Instant.Duration? = nil,
clock: C = ContinuousClock()
) -> AsyncTimeoutSequence<Self> {
AsyncTimeoutSequence(
base: self,
duration: duration,
tolerance: tolerance,
clock: clock
)
}
}

public struct AsyncTimeoutSequence<Base: AsyncSequence>: AsyncSequence where Base.Element: Sendable {
public typealias Element = Base.Element

private let base: Base
private let interval: TimeoutInterval
private let interval: TimeoutInterval<Base.Element?>

public init(base: Base, seconds: TimeInterval) {
self.base = base
self.interval = .timeInterval(seconds)
}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
public init(base: Base, duration: Duration) {
public init<C: Clock>(
base: Base,
duration: C.Duration,
tolerance: C.Instant.Duration? = nil,
clock: C = ContinuousClock()
) {
self.base = base
self.interval = .duration(.init(duration))
self.interval = .duration(.init(duration: duration, tolerance: tolerance, clock: clock))
}

public func makeAsyncIterator() -> AsyncIterator {
Expand All @@ -73,9 +87,9 @@ public struct AsyncTimeoutSequence<Base: AsyncSequence>: AsyncSequence where Bas

public struct AsyncIterator: AsyncIteratorProtocol {
private var iterator: Base.AsyncIterator
private let interval: TimeoutInterval
private let interval: TimeoutInterval<Base.Element?>

init(iterator: Base.AsyncIterator, interval: TimeoutInterval) {
fileprivate init(iterator: Base.AsyncIterator, interval: TimeoutInterval<Base.Element?>) {
self.iterator = iterator
self.interval = interval
}
Expand All @@ -91,29 +105,47 @@ public struct AsyncTimeoutSequence<Base: AsyncSequence>: AsyncSequence where Bas
guard #available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) else {
fatalError("cannot occur")
}
return try await withThrowingTimeout(after: .now + durationBox.value) {
return try await durationBox.withThrowingTimeout {
try await self.iterator.next()
}
}
}
}
}

enum TimeoutInterval {
private enum TimeoutInterval<T: Sendable> {
case timeInterval(TimeInterval)
case duration(DurationBox)

struct DurationBox {
private let storage: Any
private typealias TimeoutClosure = (() async throws -> sending T) async throws -> sending T

private let storage: TimeoutClosure

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
var value: Duration {
storage as! Duration
init<C: Clock>(
duration: C.Duration,
tolerance: C.Instant.Duration? = nil,
clock: C
) {
self.storage = { closure in
try await Timeout.withThrowingTimeout(
after: clock.now.advanced(by: duration),
tolerance: tolerance,
clock: clock
) {
try await closure()
}
}
}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
init(_ duration: Duration) {
self.storage = duration
func withThrowingTimeout(
_ closure: () async throws -> sending T
) async throws -> T {
try await storage {
try await closure()
}
}
}
}
25 changes: 25 additions & 0 deletions Tests/AsyncTimeoutSequenceTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,29 @@ struct AsyncTimeoutSequenceTests {
try await iterator.next()
}
}

@Test
func timeoutDurationWithSuspendingClock() async throws {
let (stream, continuation) = AsyncStream<Int>.makeStream()
let t = Task {
continuation.yield(1)
try await Task.sleep(nanoseconds: 1_000)
continuation.yield(2)
try await Task.sleepIndefinitely()
}
defer { t.cancel() }
var iterator = stream
.timeout(
duration: .milliseconds(100),
tolerance: .zero,
clock: SuspendingClock()
)
.makeAsyncIterator()

#expect(try await iterator.next() == 1)
#expect(try await iterator.next() == 2)
await #expect(throws: TimeoutError.self) {
try await iterator.next()
}
}
}