-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathevent_loop.h
More file actions
736 lines (666 loc) · 30.2 KB
/
event_loop.h
File metadata and controls
736 lines (666 loc) · 30.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
#pragma once
/**
* Single-threaded event loop for serializing coroutine completions.
* An alternative to thread_pool that ensures all coroutine resumes
* happen on the event loop thread, enabling efficient single-threaded
* coroutine scheduling.
*
* Copyright (c) 2026, Jorma Rebane
* Distributed under MIT Software License
*/
#include "config.h"
#include "future_types.h"
#include "thread_pool.h" // parallel_task, pool_task_handle
#include "concurrent_queue.h"
#include "collections.h" // rpp::erase_if
#include "timer.h"
#include "threads.h"
#include "debugging.h"
#include <atomic>
#include <memory>
#include <optional>
#include <type_traits>
#if RPP_HAS_COROUTINES
namespace rpp
{
/**
* @brief A simple top-level coroutine return type for use with event_loop.
*
* `event_task` acts as a fire-and-forget coroutine handle that the event loop
* drives. It starts eagerly (no initial suspension), suspends at final_suspend
* so the caller can query completion, and captures unhandled exceptions.
*
* Usage:
* @code
* rpp::event_task myCoroutine(rpp::event_loop& loop) {
* int val = co_await loop.run_async([]{ return heavyWork(); });
* // resumed on event loop thread
* updateUI(val);
* }
*
* rpp::event_loop loop;
* auto task = myCoroutine(loop);
* loop.run_until_done(task); // drives the loop and rethrows on failure
* @endcode
*/
struct RPPAPI RPP_CORO_RETURN_TYPE event_task
{
    struct promise_type
    {
        std::exception_ptr exception;
        // optional completion hook, invoked at final_suspend (e.g. installed by event_loop::fork())
        rpp::delegate<void()> on_complete {};

        event_task get_return_object() noexcept
        {
            return event_task{ rpp::coro_handle<promise_type>::from_promise(*this) };
        }

        // eager start: the coroutine body begins running immediately on the calling thread
        rpp::suspend_never initial_suspend() noexcept { return {}; }

        // Final awaiter: fires on_complete (if set) and then suspends,
        // keeping the coroutine frame alive so the owner can inspect it.
        struct final_awaiter
        {
            bool await_ready() noexcept { return false; }
            void await_suspend(rpp::coro_handle<promise_type> coro) noexcept
            {
                auto& completion = coro.promise().on_complete;
                if (completion)
                    completion();
            }
            void await_resume() noexcept {}
        };
        final_awaiter final_suspend() noexcept { return {}; }
        void return_void() noexcept {}
        void unhandled_exception() noexcept { exception = std::current_exception(); }
    };

    // owning coroutine handle; destroyed in the destructor (RAII)
    rpp::coro_handle<promise_type> handle;

    explicit event_task(rpp::coro_handle<promise_type> h) noexcept : handle{h} {}
    event_task(event_task&& other) noexcept : handle{other.handle} { other.handle = nullptr; }
    ~event_task() { if (handle) handle.destroy(); }
    event_task& operator=(event_task&& other) noexcept
    {
        if (this != &other)
        {
            if (handle)
                handle.destroy();
            handle = other.handle;
            other.handle = nullptr;
        }
        return *this;
    }
    event_task(const event_task&) = delete;
    event_task& operator=(const event_task&) = delete;

    /** @returns true if the coroutine has finished, or if the handle is empty (moved-from) */
    bool done() const noexcept { return !handle || handle.done(); }

    /** @brief Rethrows any unhandled exception captured by the coroutine; throws at most once */
    void rethrow_if_exception() const
    {
        std::exception_ptr captured;
        if (handle)
            captured = std::move(handle.promise().exception); // steal so repeat calls don't rethrow
        if (captured)
            std::rethrow_exception(captured);
    }
};
/**
* @brief A single-threaded event loop that serializes coroutine completions.
*
* Unlike thread_pool, which resumes coroutines on background threads,
* event_loop ensures that all coroutine resumes happen on the thread
* that is running the loop (typically the main thread).
*
* Background work (such as lambdas in co_await [&]{...}) is still dispatched
* to the configured thread_pool, but when the work completes, the coroutine
* resume is posted back to the event loop instead of running inline
* on the worker thread.
*
* This enables a programming model where:
* - Suspended coroutines are paused while other pending tasks can resume
* - All resumes are serialized onto a single thread (no data races)
* - The user drives the loop via run_once(), run_until_idle() or run_loop()
* - fork() launches concurrent coroutine paths with event-driven join_forks()
* - run_async() supports semaphore waits and queue pops (resumes on loop thread)
*
* @code
* rpp::event_loop loop;
*
* rpp::cfuture<std::string> fetchData() {
* std::string raw = co_await loop.run_async([&]{
* return downloadFile(url); // runs on thread pool
* });
* // NOTE: always resumed on event_loop thread, parse data on main thread
* // the coroutine can be cancelled by throwing an exception in downloadFile()
* co_return parseData(raw);
* }
*
* auto future = fetchData();
* loop.run_until_idle(); // drives event processing until no work remains
* auto result = future.get();
* @endcode
*/
class RPPAPI event_loop
{
    friend class event_loop_test;

    // A single unit of work for the loop thread: either a coroutine handle to
    // resume, or a generic callback to invoke (exactly one of them is set).
    struct resume_event
    {
        rpp::coro_handle<> handle {};
        rpp::delegate<void()> callback {};
        // resume via coroutine handle
        explicit resume_event(rpp::coro_handle<> h) noexcept : handle{h} {}
        // resume via generic callback
        explicit resume_event(rpp::delegate<void()> cb) noexcept : callback{std::move(cb)} {}
        resume_event() noexcept = default;
        ~resume_event() noexcept = default;
    };

    // the thread that owns and drives this event loop, initialized in CTOR
    std::atomic_uint64_t owner_thread_id {0};
    // number of tasks currently suspended in a background task
    std::atomic_int num_background_suspended {0};
    // true if the infinite run_loop() was started, instead of run_once()
    std::atomic_bool loop_running { false };
    // pool that executes all background work submitted via start_in_background()
    rpp::thread_pool& background_pool;
    // thread-safe FIFO queue of resume events, drained by the loop thread
    rpp::concurrent_queue<resume_event> resume_queue;
    // user-provided exception handler for any unhandled exceptions from background tasks
    rpp::delegate<void(std::exception_ptr)> except_handler {};
    // fork tracking: stores event_tasks for forked coroutines
    // NOTE: only touched on the loop thread (fork() guards against other threads)
    std::vector<event_task> fork_tasks;
    std::atomic<int> num_active_forks {0};
    rpp::coro_handle<> fork_joiner {}; // coroutine waiting in join_forks()

public:
    /**
     * @brief Initializes a new event loop.
     * @param main_thr_id Captures the main thread ID where the loop will supposedly run.
     * @param background_task_pool Optional thread pool for running background tasks.
     *                             If null, then global thread pool is used.
     */
    event_loop(rpp::uint64 main_thr_id = 0/*0=rpp::get_thread_id()*/,
               rpp::thread_pool* background_task_pool = nullptr) noexcept;
    ~event_loop() noexcept;
    NOCOPY_NOMOVE(event_loop)

    /** @returns true if there are background tasks currently in progress */
    bool has_background_tasks() const noexcept { return num_background_suspended.load(std::memory_order_acquire) > 0; }
    /** @returns the number of pending background tasks. */
    int background_tasks() const noexcept { return num_background_suspended.load(std::memory_order_acquire); }
    /** @returns true if there are any pending resume events for the main thread to handle */
    bool has_pending_completions() const noexcept { return !resume_queue.empty(); }
    /** @returns the number of pending resume events for the main thread. */
    int pending_completions() const noexcept { return int(resume_queue.size()); }
    /** @returns true if there are any pending tasks or resume events that the loop should process */
    bool has_pending_work() const noexcept { return has_background_tasks() || has_pending_completions(); }
    /** @returns the thread ID of the event loop thread. Set in CTOR. */
    rpp::uint64 main_thread_id() const noexcept { return owner_thread_id.load(std::memory_order_acquire); }

    /**
     * @brief Signals the event loop to return and finalize all pending tasks.
     */
    void stop() noexcept;

    /**
     * @brief Waits on all pending tasks to fully drain the event loop
     * @param timeout Maximum time to wait for pending tasks to complete.
     * @returns true if all tasks were completed within timeout
     */
    bool wait_on_all(rpp::Duration timeout = rpp::seconds(1)) noexcept;

    /**
     * @brief By default exceptions are swallowed and logged as warnings.
     *        This allows the event loop to handle these errors without crashing.
     */
    void set_except_handler(rpp::delegate<void(std::exception_ptr)> handler) noexcept
    { except_handler = std::move(handler); }

    /**
     * @brief Runs the event loop until stop() is called and all pending tasks are completed.
     *
     * Processes all queued coroutine resumes and waits for new ones.
     * Returns when stop() is called and attempts to drain any remaining pending tasks before exiting.
     * Automatically cleans up completed forks; fork exceptions go through except_handler.
     * @returns true if all pending tasks were completed, false if some tasks still pending
     */
    bool run_loop() noexcept;

    /**
     * @brief Processes at most one pending resume event.
     *
     * @param timeout Maximum time to wait for a resume event.
     *                Use Duration::zero() for non-blocking poll.
     * @returns true if a resume was processed, false if timed out
     */
    bool run_once(rpp::Duration timeout) noexcept;

    /**
     * @brief Runs the event loop until there are no more background tasks and no more resume events.
     *
     * Automatically cleans up completed forks; fork exceptions go through except_handler.
     * @returns Number of resume events processed before the loop became idle
     */
    int run_until_idle() noexcept;

    /**
     * @brief Drives the event loop until the given event_task completes,
     *        then rethrows any exception captured by the coroutine.
     *
     * This is the simplest way to run a top-level event_task coroutine:
     * @code
     *     auto task = myCoroutine(loop);
     *     loop.run_until_done(task);
     * @endcode
     */
    void run_until_done(event_task& task);

private:
    // forward declarations for fork API (used by fork() template)
    void cleanup_forks() noexcept;
    void notify_fork_joiner() noexcept;

public:
    // ─── Fork API ────────────────────────────────────────────

    /**
     * @brief Forks a new coroutine execution path on the event loop.
     *
     * The callback is invoked immediately on the event loop thread (eager start).
     * It must return `rpp::event_task`. The resulting coroutine is tracked
     * internally — no handle management needed.
     *
     * Must be called on the event loop thread.
     *
     * @code
     *     loop.fork([&]() -> rpp::event_task {
     *         auto result = co_await loop.run_async([&]{ return heavyWork(); });
     *         updateState(result);
     *     });
     *     loop.fork([&]() -> rpp::event_task { ... });
     *     loop.run_until_idle(); // drives all forks, cleans up on completion
     * @endcode
     */
    template<typename F>
    void fork(F&& coro_factory)
    {
        // must be called on the event loop thread (fork_tasks is not thread-safe)
        if (rpp::get_thread_id() != owner_thread_id.load(std::memory_order_acquire))
        {
            LogError("event_loop::fork() must be called on the event loop thread");
            return;
        }
        // clean up completed forks to avoid unbounded growth
        cleanup_forks();
        num_active_forks.fetch_add(1, std::memory_order_acq_rel);
        // Heap-allocate the lambda to keep it alive for the coroutine's lifetime.
        // Lambda coroutines store a `this` pointer to the lambda object — if the
        // lambda is a temporary and is destroyed before the coroutine completes,
        // all by-reference captures dangle.
        auto stored = std::make_shared<std::decay_t<F>>(std::forward<F>(coro_factory));
        auto task = (*stored)(); // starts eagerly on event loop thread
        if (task.done())
        {
            // completed synchronously (no co_await in callback)
            num_active_forks.fetch_sub(1, std::memory_order_acq_rel);
            notify_fork_joiner();
        }
        else
        {
            // Capture shared_ptr in on_complete to extend lambda lifetime.
            // The delegate lives in the promise (coroutine frame), which stays
            // alive until the event_task destructor calls handle.destroy().
            task.handle.promise().on_complete = [this, stored]()
            {
                num_active_forks.fetch_sub(1, std::memory_order_acq_rel);
                notify_fork_joiner();
            };
        }
        // tracked even if already done — drain_forks()/cleanup_forks() inspects it later
        fork_tasks.push_back(std::move(task));
    }

    /**
     * @brief Returns the number of forked coroutines that have not yet completed.
     */
    int num_forks() const noexcept { return num_active_forks.load(std::memory_order_acquire); }

    /**
     * @brief Checks completed forks for exceptions and clears them.
     *
     * Rethrows the first exception found among completed forks.
     * Must be called on the event loop thread.
     */
    void drain_forks();

    /**
     * @brief Posts a coroutine handle to be resumed on the event loop thread.
     *        Thread-safe: can be called from any thread.
     */
    void post_resume(rpp::coro_handle<> handle) noexcept;

    /**
     * @brief Posts a generic callback to be executed on the event loop thread.
     *        Thread-safe: can be called from any thread.
     *
     * This is often known as `run_on_main_thread()` in GUI frameworks.
     */
    void post(rpp::delegate<void()>&& callback) noexcept;

    // ─── Awaiter types ──────────────────────────────────────────

    /**
     * @brief Awaiter that runs a void lambda on the thread pool and resumes
     *        the coroutine on the event loop thread.
     */
    struct RPP_CORO_RETURN_TYPE background_awaiter_void
    {
        event_loop& loop;
        rpp::delegate<void()> action;
        std::exception_ptr ex {};
        background_awaiter_void(event_loop& loop, rpp::delegate<void()> action) noexcept
            : loop{loop}, action{std::move(action)} {}
        bool await_ready() const noexcept { return false; }
        void await_suspend(rpp::coro_handle<> cont) noexcept
        {
            loop.start_in_background([this, cont]() mutable
            {
                try { action(); }
                catch (...) { ex = std::current_exception(); }
                // WARNING: do not deallocate action here, it can lead to a race-condition + memory corruption
                loop.post_resume_from_suspension(cont);
            });
        }
        // NOTE(review): unlike background_awaiter<T>::await_resume(), `action` is
        // not released here before rethrowing — confirm whether that is intentional
        void await_resume() { if (ex) std::rethrow_exception(ex); }
    };

    /**
     * @brief Awaiter that runs a lambda on the thread pool and resumes
     *        the coroutine on the event loop thread.
     */
    template<typename T> struct RPP_CORO_RETURN_TYPE background_awaiter
    {
        event_loop& loop;
        rpp::delegate<T()> action;
        std::optional<T> result {};
        std::exception_ptr ex {};
        background_awaiter(event_loop& loop, rpp::delegate<T()> action) noexcept
            : loop{loop}, action{std::move(action)} {}
        bool await_ready() const noexcept { return false; }
        void await_suspend(rpp::coro_handle<> cont) noexcept
        {
            loop.start_in_background([this, cont]() mutable
            {
                try { result.emplace(action()); }
                catch (...) { ex = std::current_exception(); }
                // WARNING: do not deallocate action here, it can lead to a race-condition + memory corruption
                loop.post_resume_from_suspension(cont);
            });
        }
        T await_resume()
        {
            // release resources before resuming the coroutine
            action = {};
            if (ex) std::rethrow_exception(ex);
            return std::move(*result);
        }
    };

    /**
     * @brief Awaiter that runs a future on the thread pool and resumes
     *        the coroutine on the event loop thread when the future is ready.
     */
    template<IsFuture Future> struct RPP_CORO_RETURN_TYPE background_awaiter_fut
    {
        event_loop& loop;
        rpp::delegate<Future()> action;
        Future f {};
        std::exception_ptr ex {};
        background_awaiter_fut(event_loop& loop, rpp::delegate<Future()> action) noexcept
            : loop{loop}, action{std::move(action)} {}
        bool await_ready() const noexcept { return false; }
        void await_suspend(rpp::coro_handle<> cont) noexcept
        {
            loop.start_in_background([this, cont]() mutable
            {
                try {
                    f = action(); // get the future from the lambda
                    f.wait(); // wait for the nested coroutine to finish (can throw)
                } catch (...) { ex = std::current_exception(); }
                // WARNING: do not deallocate action here, it can lead to a race-condition + memory corruption
                loop.post_resume_from_suspension(cont);
            });
        }
        // similar to future<T>, either gets the result T, or throws the caught exception
        auto await_resume()
        {
            // release resources before resuming the coroutine
            action = {};
            if (ex) std::rethrow_exception(ex);
            return f.get();
        }
    };

    /**
     * @brief Awaiter that waits for a cfuture on a background thread,
     *        then resumes the coroutine on the event loop thread.
     */
    template<typename T>
    struct RPP_CORO_RETURN_TYPE future_awaiter
    {
        event_loop& loop;
        rpp::cfuture<T> fut;
        std::exception_ptr ex {};
        future_awaiter(event_loop& loop, rpp::cfuture<T>&& fut) noexcept
            : loop{loop}, fut{std::move(fut)} {}
        future_awaiter(event_loop& loop, std::future<T>&& fut) noexcept
            : loop{loop}, fut{std::move(fut)} {}
        // ready if the future already has a result (zero-timeout poll)
        bool await_ready() const noexcept
        {
            return fut.valid() && fut.wait_for(std::chrono::microseconds{0}) != std::future_status::timeout;
        }
        void await_suspend(rpp::coro_handle<> cont) noexcept
        {
            if (!fut.valid())
            {
                // nothing to wait on; resume immediately on loop thread
                loop.post_resume(cont);
                return;
            }
            loop.start_in_background([this, cont]() mutable
            {
                try {
                    if (fut.valid())
                        fut.wait();
                } catch (...) { ex = std::current_exception(); }
                loop.post_resume_from_suspension(cont);
            });
        }
        auto await_resume()
        {
            if (ex) std::rethrow_exception(ex);
            // invalid future: return a default-constructed T (or nothing for void)
            if (!fut.valid())
            {
                if constexpr (std::is_void_v<T>) return;
                else return T{};
            }
            return fut.get();
        }
    };

    /**
     * @brief Creates an awaiter that runs the given lambda on the thread pool
     *        and resumes the coroutine on the event loop thread.
     *
     * NOTE: The lambda runs in a background thread context,
     *       but ALWAYS resumes on the Main Thread !
     *
     * @code
     *     std::string result = co_await loop.run_async([&]{
     *         return expensiveComputation();
     *     });
     *     // After co_await, we are back on the event loop thread
     *
     *     std::string result2 = co_await loop.run_async([&]() -> rpp::cfuture<std::string> {
     *         auto input = prepareExpensiveInput();
     *         std::string output = co_await loop.run_async([&]{
     *             return expensiveComputation(input);
     *         });
     *         // Resumes on event loop thread !
     *         ui.showResult(output);
     *         co_return output;
     *     });
     *     // After co_await, we are back on the event loop thread
     * @endcode
     */
    template<typename FutureOrCallback>
    RPP_CORO_WRAPPER auto run_async(FutureOrCallback&& fut_or_cb) noexcept
    {
        // dispatch on the argument kind: future, future-returning lambda, or plain lambda
        using Decayed = std::decay_t<FutureOrCallback>;
        if constexpr (IsFuture<Decayed>) // rpp::cfuture<R> or std::future<R>
        {
            using T = decltype(fut_or_cb.get());
            return future_awaiter<T>{ *this, std::move(fut_or_cb) };
        }
        else if constexpr (IsFunctionReturningFuture<Decayed>) // lambda[]() -> rpp::cfuture<R>
        {
            using Fut = decltype(fut_or_cb());
            return background_awaiter_fut<Fut>{ *this, std::move(fut_or_cb) };
        }
        else // lambda[]()->R or rpp::delegate<R()>
        {
            using R = decltype(fut_or_cb());
            if constexpr (std::is_void_v<R>) return background_awaiter_void{ *this, std::move(fut_or_cb) };
            else return background_awaiter<R>{ *this, std::move(fut_or_cb) };
        }
    }

    // ─── Semaphore / Queue await ────────────────────────────────

    /**
     * @brief Waits for a semaphore signal on a background thread,
     *        resumes on the event loop thread.
     * @code
     *     auto wr = co_await loop.await(sem, rpp::millis(100));
     *     if (wr == rpp::semaphore::notified) { // signaled }
     * @endcode
     */
    RPP_CORO_WRAPPER auto await(rpp::semaphore& sem, rpp::Duration timeout) noexcept
    {
        return run_async([&sem, timeout]() { return sem.wait(timeout); });
    }

    /**
     * @brief Pops from a queue on a background thread,
     *        resumes on the event loop thread.
     * @code
     *     std::string item;
     *     bool got = co_await loop.await(queue, item, rpp::millis(100));
     * @endcode
     */
    template<typename T>
    RPP_CORO_WRAPPER auto await(rpp::concurrent_queue<T>& queue, T& out, rpp::Duration timeout) noexcept
    {
        return run_async([&queue, &out, timeout]() { return queue.wait_pop(out, timeout); });
    }

    /**
     * @brief Pops from a queue on a background thread, returns std::optional<T>,
     *        resumes on the event loop thread.
     * @code
     *     auto item = co_await loop.await_pop(queue, rpp::millis(100));
     *     if (item) { use(*item); }
     * @endcode
     */
    template<typename T>
    RPP_CORO_WRAPPER auto await_pop(rpp::concurrent_queue<T>& queue, rpp::Duration timeout) noexcept
    {
        return run_async([&queue, timeout]() -> std::optional<T> {
            T item;
            if (queue.wait_pop(item, timeout)) return std::move(item);
            return std::nullopt;
        });
    }

    // ─── Sleep / delay awaiter ──────────────────────────────────

    /**
     * @brief Awaiter that sleeps on a background thread, then resumes
     *        on the event loop thread.
     */
    struct delay_awaiter
    {
        event_loop& loop;
        rpp::TimePoint end;
        delay_awaiter(event_loop& loop, rpp::TimePoint tp) noexcept : loop{loop}, end{tp} {}
        delay_awaiter(event_loop& loop, rpp::Duration d) noexcept : loop{loop}, end{rpp::TimePoint::monotonic_now() + d} {}
        // skip suspension entirely if the deadline has already passed
        bool await_ready() const noexcept { return rpp::TimePoint::monotonic_now() >= end; }
        void await_suspend(rpp::coro_handle<> cont) noexcept
        {
            loop.start_in_background([this, cont]() mutable
            {
                rpp::sleep_until(end);
                loop.post_resume_from_suspension(cont);
            });
        }
        void await_resume() const noexcept {}
    };

    /**
     * @brief Creates an awaiter that sleeps for the given duration,
     *        then resumes on the event loop thread.
     * @code
     *     co_await loop.delay(rpp::millis(100));
     * @endcode
     */
    delay_awaiter delay(rpp::Duration duration) noexcept
    {
        return delay_awaiter{ *this, duration };
    }
    delay_awaiter delay_until(rpp::TimePoint until) noexcept
    {
        return delay_awaiter{ *this, until };
    }

    /**
     * @brief Awaiter that suspends the caller until all forks complete or timeout expires.
     *
     * Event-driven: fork completions resume the joiner directly via post_resume().
     * For timeout: a background timer resumes the joiner if forks haven't finished.
     */
    struct RPP_CORO_RETURN_TYPE join_forks_awaiter
    {
        event_loop& loop;
        rpp::Duration timeout;
        bool await_ready() const noexcept
        {
            return loop.num_active_forks.load(std::memory_order_acquire) == 0;
        }
        void await_suspend(rpp::coro_handle<> cont) noexcept
        {
            loop.fork_joiner = cont;
            // double-check: all forks may have completed between await_ready and here
            // (single-threaded event loop — cannot actually race, but defensive)
            if (loop.num_active_forks.load(std::memory_order_acquire) == 0)
            {
                loop.fork_joiner = {};
                loop.post_resume(cont);
                return;
            }
            // start timeout timer if finite (seconds(86400) is the "infinite" sentinel,
            // see the default argument of join_forks())
            if (timeout < rpp::seconds(86400))
            {
                rpp::TimePoint deadline = rpp::TimePoint::monotonic_now() + timeout;
                loop.start_in_background([&loop=loop, cont, deadline]() mutable // mutable because of `cont.resume()`
                {
                    rpp::sleep_until(deadline);
                    // post callback to event loop thread for thread-safe joiner check
                    loop.resume_queue.push(resume_event{rpp::delegate<void()>{
                        [&loop=loop, cont]() mutable // mutable because of `cont.resume()`
                        {
                            // only resume if this joiner is still the one waiting
                            // (a fork completion may have already resumed it)
                            if (loop.fork_joiner == cont)
                            {
                                loop.fork_joiner = {};
                                cont.resume();
                            }
                        }
                    }});
                    // balance the start_in_background counter increment
                    loop.num_background_suspended.fetch_sub(1, std::memory_order_acq_rel);
                });
            }
        }
        // @returns number of forks still active (0 = all done); rethrows via drain_forks()
        int await_resume()
        {
            loop.drain_forks();
            return loop.num_active_forks.load(std::memory_order_acquire);
        }
    };

    /**
     * @brief Awaitable join for all active forks, with optional timeout.
     *
     * Suspends the caller until all forks complete or the timeout expires.
     * Returns the number of forks still active (0 = all done).
     * Event-driven: zero polling overhead; fork completions resume the joiner directly.
     *
     * @code
     *     // wait indefinitely for all forks
     *     co_await loop.join_forks();
     *
     *     // wait with timeout — handle soft deadlock
     *     int remaining = co_await loop.join_forks(rpp::seconds(5));
     *     if (remaining > 0) { // some forks still running }
     * @endcode
     */
    RPP_CORO_WRAPPER join_forks_awaiter join_forks(rpp::Duration timeout = rpp::seconds(86400)) noexcept
    {
        return join_forks_awaiter{ *this, timeout };
    }

private:
    // Submits work to the background pool and bumps the suspension counter;
    // every submitted task must eventually decrement it (normally via
    // post_resume_from_suspension, or manually as in the join_forks timer).
    void start_in_background(task_delegate<void()>&& generic_task) noexcept
    {
        num_background_suspended.fetch_add(1, std::memory_order_acq_rel);
        background_pool.parallel_task_detached(std::move(generic_task));
    }
    // posts a resume event and decrements the background suspension count
    void post_resume_from_suspension(rpp::coro_handle<> handle) noexcept;
    // processes a single resume event
    void process_event(resume_event& event) noexcept;
};
} // namespace rpp
#endif // RPP_HAS_COROUTINES