include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp

Lines: 74.4% (238/320)    Functions: 79.5% (35/44)
Functions (44), all in namespace boost::corosio::detail

Line   Calls      Lines    Blocks   Function
:66       419x    100.0%   100.0%   reactor_scheduler_context::reactor_scheduler_context(reactor_scheduler_base const*, reactor_scheduler_context*)
:83    788081x    100.0%    86.0%   reactor_find_context(reactor_scheduler_base const*)
:95         0       0.0%     0.0%   reactor_flush_private_work(reactor_scheduler_context*, std::atomic<long>&)
:112        2x     50.0%    64.0%   reactor_drain_private_queue(reactor_scheduler_context*, std::atomic<long>&, intrusive_queue<scheduler_op>&)
:262        0       0.0%     0.0%   reactor_scheduler_base::configure_single_threaded(bool)
:270      515x    100.0%   100.0%   reactor_scheduler_base::reactor_scheduler_base()
:315        0       0.0%     0.0%   reactor_scheduler_base::task_op::operator()()
:316        0       0.0%     0.0%   reactor_scheduler_base::task_op::destroy()
:361      419x    100.0%   100.0%   reactor_thread_context_guard::reactor_thread_context_guard(reactor_scheduler_base const*)
:369      419x     66.7%    80.0%   reactor_thread_context_guard::~reactor_thread_context_guard()
:381        0       0.0%     0.0%   reactor_scheduler_base::configure_reactor(unsigned int, unsigned int, unsigned int, unsigned int)
:394   102268x     50.0%    45.0%   reactor_scheduler_base::reset_inline_budget() const
:420   425872x    100.0%   100.0%   reactor_scheduler_base::try_consume_inline_budget() const
:434     2117x    100.0%    84.0%   reactor_scheduler_base::post(std::coroutine_handle<void>) const
:440     2117x    100.0%   100.0%   reactor_scheduler_base::post(std::coroutine_handle<void>) const::post_handler::post_handler(std::coroutine_handle<void>)
:441     4234x    100.0%   100.0%   reactor_scheduler_base::post(std::coroutine_handle<void>) const::post_handler::~post_handler()
:443     2108x    100.0%   100.0%   reactor_scheduler_base::post(std::coroutine_handle<void>) const::post_handler::operator()()
:452        9x    100.0%   100.0%   reactor_scheduler_base::post(std::coroutine_handle<void>) const::post_handler::destroy()
:477   103570x    100.0%    87.0%   reactor_scheduler_base::post(scheduler_op*) const
:494     1290x    100.0%   100.0%   reactor_scheduler_base::running_in_this_thread() const
:500      413x    100.0%    82.0%   reactor_scheduler_base::stop()
:512       21x    100.0%   100.0%   reactor_scheduler_base::stopped() const
:518       91x    100.0%   100.0%   reactor_scheduler_base::restart()
:524      388x    100.0%    76.0%   reactor_scheduler_base::run()
:549        2x     75.0%    64.0%   reactor_scheduler_base::run_one()
:563       61x    100.0%    70.0%   reactor_scheduler_base::wait_one(long)
:577        6x    100.0%    76.0%   reactor_scheduler_base::poll()
:602        4x    100.0%    70.0%   reactor_scheduler_base::poll_one()
:616    27427x    100.0%   100.0%   reactor_scheduler_base::work_started()
:622    38495x    100.0%   100.0%   reactor_scheduler_base::work_finished()
:629   152964x    100.0%   100.0%   reactor_scheduler_base::compensating_work_started() const
:637        0       0.0%     0.0%   reactor_scheduler_base::drain_thread_queue(intrusive_queue<scheduler_op>&, long) const
:650    16806x     30.0%    35.0%   reactor_scheduler_base::post_deferred_completions(intrusive_queue<scheduler_op>&) const
:667      515x    100.0%    88.0%   reactor_scheduler_base::shutdown_drain()
:684      889x    100.0%   100.0%   reactor_scheduler_base::signal_all(conditionally_enabled_mutex::scoped_lock&) const
:691     2283x     57.1%    50.0%   reactor_scheduler_base::maybe_unlock_and_signal_one(conditionally_enabled_mutex::scoped_lock&) const
:705   324763x     85.7%    80.0%   reactor_scheduler_base::unlock_and_signal_one(conditionally_enabled_mutex::scoped_lock&) const
:717        0       0.0%     0.0%   reactor_scheduler_base::clear_signal() const
:723        0       0.0%     0.0%   reactor_scheduler_base::wait_for_signal(conditionally_enabled_mutex::scoped_lock&) const
:735        0       0.0%     0.0%   reactor_scheduler_base::wait_for_signal_for(conditionally_enabled_mutex::scoped_lock&, long) const
:747     2283x     87.5%    92.0%   reactor_scheduler_base::wake_one_thread_and_unlock(conditionally_enabled_mutex::scoped_lock&) const
:765   275440x     92.3%    92.0%   reactor_scheduler_base::work_cleanup::~work_cleanup()
:789   190358x     83.3%    86.0%   reactor_scheduler_base::task_cleanup::~task_cleanup()
:810   275804x     73.8%    72.0%   reactor_scheduler_base::do_one(conditionally_enabled_mutex::scoped_lock&, long, reactor_scheduler_context*)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/capy/ex/execution_context.hpp>
15
16 #include <boost/corosio/native/native_scheduler.hpp>
17 #include <boost/corosio/detail/scheduler_op.hpp>
18 #include <boost/corosio/detail/thread_local_ptr.hpp>
19
20 #include <atomic>
21 #include <chrono>
22 #include <coroutine>
23 #include <cstddef>
24 #include <cstdint>
25 #include <limits>
26 #include <memory>
27
28 #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
29 #include <boost/corosio/detail/conditionally_enabled_event.hpp>
30
31 namespace boost::corosio::detail {
32
33 // Forward declaration
34 class reactor_scheduler_base;
35
36 /** Per-thread state for a reactor scheduler.
37
38 Each thread running a scheduler's event loop has one of these
39 on a thread-local stack. It holds a private work queue and
40 inline completion budget for speculative I/O fast paths.
41 */
42 struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
43 {
44 /// Scheduler this context belongs to.
45 reactor_scheduler_base const* key;
46
47 /// Next context frame on this thread's stack.
48 reactor_scheduler_context* next;
49
50 /// Private work queue for reduced contention.
51 op_queue private_queue;
52
53 /// Unflushed work count for the private queue.
54 std::int64_t private_outstanding_work;
55
56 /// Remaining inline completions allowed this cycle.
57 int inline_budget;
58
59 /// Maximum inline budget (adaptive, 2-16).
60 int inline_budget_max;
61
62 /// True if no other thread absorbed queued work last cycle.
63 bool unassisted;
64
65 /// Construct a context frame linked to @a n.
66 419x reactor_scheduler_context(
67 reactor_scheduler_base const* k, reactor_scheduler_context* n)
68 419x : key(k)
69 419x , next(n)
70 419x , private_outstanding_work(0)
71 419x , inline_budget(0)
72 419x , inline_budget_max(2)
73 419x , unassisted(false)
74 {
75 419x }
76 };
77
78 /// Thread-local context stack for reactor schedulers.
79 inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
80
81 /// Find the context frame for a scheduler on this thread.
82 inline reactor_scheduler_context*
83 788081x reactor_find_context(reactor_scheduler_base const* self) noexcept
84 {
85 788081x for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
86 {
87 785109x if (c->key == self)
88 785109x return c;
89 }
90 2972x return nullptr;
91 }
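For orientation, a minimal standalone sketch of the lookup pattern reactor_find_context relies on: every thread keeps an intrusive stack of frames, each keyed by its scheduler, so nested run() calls for different schedulers on one thread resolve to distinct frames. All names below (demo_*) are hypothetical and exist only for this sketch.

// Sketch only: simplified stand-ins for the thread-local frame stack.
// demo_sched, demo_frame, demo_stack and demo_find are hypothetical names.
#include <cassert>

struct demo_sched {};

struct demo_frame
{
    demo_sched const* key;
    demo_frame* next;
};

thread_local demo_frame* demo_stack = nullptr;

demo_frame* demo_find(demo_sched const* s) noexcept
{
    for (auto* f = demo_stack; f != nullptr; f = f->next)
        if (f->key == s)
            return f;
    return nullptr;
}

int main()
{
    demo_sched outer, inner;

    demo_frame f1{&outer, demo_stack};   // outer run() pushes a frame
    demo_stack = &f1;
    demo_frame f2{&inner, demo_stack};   // nested run() on another scheduler
    demo_stack = &f2;

    assert(demo_find(&outer) == &f1);    // each scheduler finds its own frame
    assert(demo_find(&inner) == &f2);

    demo_stack = f2.next;                // popping restores the previous frame
    demo_stack = f1.next;
    assert(demo_find(&outer) == nullptr);
}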
92
93 /// Flush private work count to global counter.
94 inline void
95 reactor_flush_private_work(
96 reactor_scheduler_context* ctx,
97 std::atomic<std::int64_t>& outstanding_work) noexcept
98 {
99 if (ctx && ctx->private_outstanding_work > 0)
100 {
101 outstanding_work.fetch_add(
102 ctx->private_outstanding_work, std::memory_order_relaxed);
103 ctx->private_outstanding_work = 0;
104 }
105 }
106
107 /** Drain private queue to global queue, flushing work count first.
108
109 @return True if any ops were drained.
110 */
111 inline bool
112 2x reactor_drain_private_queue(
113 reactor_scheduler_context* ctx,
114 std::atomic<std::int64_t>& outstanding_work,
115 op_queue& completed_ops) noexcept
116 {
117 2x if (!ctx || ctx->private_queue.empty())
118 2x return false;
119
120 reactor_flush_private_work(ctx, outstanding_work);
121 completed_ops.splice(ctx->private_queue);
122 return true;
123 }
124
125 /** Non-template base for reactor-backed scheduler implementations.
126
127 Provides the complete threading model shared by epoll, kqueue,
128 and select schedulers: signal state machine, inline completion
129 budget, work counting, run/poll methods, and the do_one event
130 loop.
131
132 Derived classes provide platform-specific hooks by overriding:
133 - `run_task(lock, ctx)` to run the reactor poll
134 - `interrupt_reactor()` to wake a blocked reactor
135
136 De-templated from the original CRTP design to eliminate
137 duplicate instantiations when multiple backends are compiled
138 into the same binary. Virtual dispatch for run_task (called
139 once per reactor cycle, before a blocking syscall) has
140 negligible overhead.
141
142 @par Thread Safety
143 All public member functions are thread-safe.
144 */
145 class reactor_scheduler_base
146 : public native_scheduler
147 , public capy::execution_context::service
148 {
149 public:
150 using key_type = scheduler;
151 using context_type = reactor_scheduler_context;
152 using mutex_type = conditionally_enabled_mutex;
153 using lock_type = mutex_type::scoped_lock;
154 using event_type = conditionally_enabled_event;
155
156 /// Post a coroutine for deferred execution.
157 void post(std::coroutine_handle<> h) const override;
158
159 /// Post a scheduler operation for deferred execution.
160 void post(scheduler_op* h) const override;
161
162 /// Return true if called from a thread running this scheduler.
163 bool running_in_this_thread() const noexcept override;
164
165 /// Request the scheduler to stop dispatching handlers.
166 void stop() override;
167
168 /// Return true if the scheduler has been stopped.
169 bool stopped() const noexcept override;
170
171 /// Reset the stopped state so `run()` can resume.
172 void restart() override;
173
174 /// Run the event loop until no work remains.
175 std::size_t run() override;
176
177 /// Run until one handler completes or no work remains.
178 std::size_t run_one() override;
179
180 /// Run until one handler completes or @a usec elapses.
181 std::size_t wait_one(long usec) override;
182
183 /// Run ready handlers without blocking.
184 std::size_t poll() override;
185
186 /// Run at most one ready handler without blocking.
187 std::size_t poll_one() override;
188
189 /// Increment the outstanding work count.
190 void work_started() noexcept override;
191
192 /// Decrement the outstanding work count, stopping on zero.
193 void work_finished() noexcept override;
194
195 /** Reset the thread's inline completion budget.
196
197 Called at the start of each posted completion handler to
198 grant a fresh budget for speculative inline completions.
199 */
200 void reset_inline_budget() const noexcept;
201
202 /** Consume one unit of inline budget if available.
203
204 @return True if budget was available and consumed.
205 */
206 bool try_consume_inline_budget() const noexcept;
207
208 /** Offset a forthcoming work_finished from work_cleanup.
209
210 Called by descriptor_state when all I/O returned EAGAIN and
211 no handler will be executed. Must be called from a scheduler
212 thread.
213 */
214 void compensating_work_started() const noexcept;
215
216 /** Drain work from thread context's private queue to global queue.
217
218 Flushes private work count to the global counter, then
219 transfers the queue under mutex protection.
220
221 @param queue The private queue to drain.
222 @param count Private work count to flush before draining.
223 */
224 void drain_thread_queue(op_queue& queue, std::int64_t count) const;
225
226 /** Post completed operations for deferred invocation.
227
228 If called from a thread running this scheduler, operations
229 go to the thread's private queue (fast path). Otherwise,
230 operations are added to the global queue under mutex and a
231 waiter is signaled.
232
233 @par Preconditions
234 work_started() must have been called for each operation.
235
236 @param ops Queue of operations to post.
237 */
238 void post_deferred_completions(op_queue& ops) const;
239
240 /** Apply runtime configuration to the scheduler.
241
242 Called by `io_context` after construction. Values that do
243 not apply to this backend are silently ignored.
244
245 @param max_events Event buffer size for epoll/kqueue.
246 @param budget_init Starting inline completion budget.
247 @param budget_max Hard ceiling on adaptive budget ramp-up.
248 @param unassisted Budget when single-threaded.
249 */
250 virtual void configure_reactor(
251 unsigned max_events,
252 unsigned budget_init,
253 unsigned budget_max,
254 unsigned unassisted) noexcept;
255
256 /** Enable or disable single-threaded (lockless) mode.
257
258 When enabled, all scheduler mutex and condition variable
259 operations become no-ops. Cross-thread post() is
260 undefined behavior.
261 */
262 void configure_single_threaded(bool v) noexcept
263 {
264 single_threaded_ = v;
265 mutex_.set_enabled(!v);
266 cond_.set_enabled(!v);
267 }
268
269 protected:
270 515x reactor_scheduler_base() = default;
271
272 /** Drain completed_ops during shutdown.
273
274 Pops all operations from the global queue and destroys them,
275 skipping the task sentinel. Signals all waiting threads.
276 Derived classes call this from their shutdown() override
277 before performing platform-specific cleanup.
278 */
279 void shutdown_drain();
280
281 /// RAII guard that re-inserts the task sentinel after `run_task`.
282 struct task_cleanup
283 {
284 reactor_scheduler_base const* sched;
285 lock_type* lock;
286 context_type* ctx;
287 ~task_cleanup();
288 };
289
290 mutable mutex_type mutex_{true};
291 mutable event_type cond_{true};
292 mutable op_queue completed_ops_;
293 mutable std::atomic<std::int64_t> outstanding_work_{0};
294 std::atomic<bool> stopped_{false};
295 mutable std::atomic<bool> task_running_{false};
296 mutable bool task_interrupted_ = false;
297
298 // Runtime-configurable reactor tuning parameters.
299 // Defaults match the library's built-in values.
300 unsigned max_events_per_poll_ = 128;
301 unsigned inline_budget_initial_ = 2;
302 unsigned inline_budget_max_ = 16;
303 unsigned unassisted_budget_ = 4;
304
305 /// Bit 0 of `state_`: set when the condvar should be signaled.
306 static constexpr std::size_t signaled_bit = 1;
307
308 /// Increment per waiting thread in `state_`.
309 static constexpr std::size_t waiter_increment = 2;
310 mutable std::size_t state_ = 0;
311
312 /// Sentinel op that triggers a reactor poll when dequeued.
313 struct task_op final : scheduler_op
314 {
315 void operator()() override {}
316 void destroy() override {}
317 };
318 task_op task_op_;
319
320 /// Run the platform-specific reactor poll.
321 virtual void
322 run_task(lock_type& lock, context_type* ctx) = 0;
323
324 /// Wake a blocked reactor (e.g. write to eventfd or pipe).
325 virtual void interrupt_reactor() const = 0;
326
327 private:
328 struct work_cleanup
329 {
330 reactor_scheduler_base* sched;
331 lock_type* lock;
332 context_type* ctx;
333 ~work_cleanup();
334 };
335
336 std::size_t do_one(
337 lock_type& lock, long timeout_us, context_type* ctx);
338
339 void signal_all(lock_type& lock) const;
340 bool maybe_unlock_and_signal_one(lock_type& lock) const;
341 bool unlock_and_signal_one(lock_type& lock) const;
342 void clear_signal() const;
343 void wait_for_signal(lock_type& lock) const;
344 void wait_for_signal_for(
345 lock_type& lock, long timeout_us) const;
346 void wake_one_thread_and_unlock(lock_type& lock) const;
347 };
348
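To make the two virtual hooks concrete, here is a self-contained sketch of the pattern using stand-in types; it does not derive from the real reactor_scheduler_base, and the poll/wake system calls are indicated only in comments.

// Sketch only: the run_task / interrupt_reactor split, modeled with a
// std::mutex in place of the conditionally enabled lock.
#include <mutex>

struct sketch_reactor_base
{
    virtual ~sketch_reactor_base() = default;

    // Analogue of run_task(lock, ctx): poll the OS demultiplexer once.
    virtual void run_task(std::unique_lock<std::mutex>& lock) = 0;

    // Analogue of interrupt_reactor(): wake a thread blocked in run_task.
    virtual void interrupt_reactor() const = 0;
};

struct sketch_epoll_reactor final : sketch_reactor_base
{
    void run_task(std::unique_lock<std::mutex>& lock) override
    {
        lock.unlock();
        // ... epoll_wait() / kevent() / select(), bounded by the configured
        //     max_events, collecting ready completions ...
        lock.lock();
    }

    void interrupt_reactor() const override
    {
        // ... write to an eventfd or self-pipe registered with the poller ...
    }
};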
349 /** RAII guard that pushes/pops a scheduler context frame.
350
351 On construction, pushes a new context frame onto the
352 thread-local stack. On destruction, drains any remaining
353 private queue items to the global queue and pops the frame.
354 */
355 struct reactor_thread_context_guard
356 {
357 /// The context frame managed by this guard.
358 reactor_scheduler_context frame_;
359
360 /// Construct the guard, pushing a frame for @a sched.
361 419x explicit reactor_thread_context_guard(
362 reactor_scheduler_base const* sched) noexcept
363 419x : frame_(sched, reactor_context_stack.get())
364 {
365 419x reactor_context_stack.set(&frame_);
366 419x }
367
368 /// Destroy the guard, draining private work and popping the frame.
369 419x ~reactor_thread_context_guard() noexcept
370 {
371 419x if (!frame_.private_queue.empty())
372 frame_.key->drain_thread_queue(
373 frame_.private_queue, frame_.private_outstanding_work);
374 419x reactor_context_stack.set(frame_.next);
375 419x }
376 };
377
378 // ---- Inline implementations ------------------------------------------------
379
380 inline void
381 reactor_scheduler_base::configure_reactor(
382 unsigned max_events,
383 unsigned budget_init,
384 unsigned budget_max,
385 unsigned unassisted) noexcept
386 {
387 max_events_per_poll_ = max_events;
388 inline_budget_initial_ = budget_init;
389 inline_budget_max_ = budget_max;
390 unassisted_budget_ = unassisted;
391 }
392
393 inline void
394 102268x reactor_scheduler_base::reset_inline_budget() const noexcept
395 {
396 102268x if (auto* ctx = reactor_find_context(this))
397 {
398 // Cap when no other thread absorbed queued work
399 102268x if (ctx->unassisted)
400 {
401 102268x ctx->inline_budget_max =
402 102268x static_cast<int>(unassisted_budget_);
403 102268x ctx->inline_budget =
404 102268x static_cast<int>(unassisted_budget_);
405 102268x return;
406 }
407 // Ramp up when previous cycle fully consumed budget
408 if (ctx->inline_budget == 0)
409 ctx->inline_budget_max = (std::min)(
410 ctx->inline_budget_max * 2,
411 static_cast<int>(inline_budget_max_));
412 else if (ctx->inline_budget < ctx->inline_budget_max)
413 ctx->inline_budget_max =
414 static_cast<int>(inline_budget_initial_);
415 ctx->inline_budget = ctx->inline_budget_max;
416 }
417 }
418
419 inline bool
420 425872x reactor_scheduler_base::try_consume_inline_budget() const noexcept
421 {
422 425872x if (auto* ctx = reactor_find_context(this))
423 {
424 425872x if (ctx->inline_budget > 0)
425 {
426 340727x --ctx->inline_budget;
427 340727x return true;
428 }
429 }
430 85145x return false;
431 }
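The two budget functions are meant to be used as a pair; a hedged sketch of the intended call pattern (the handler body, the I/O helpers and the loop condition are hypothetical):

// Sketch only: how a speculative fast path might consume the inline budget.
// try_complete_inline() and queue_for_reactor() are hypothetical helpers.
void on_posted_completion(reactor_scheduler_base const& sched)
{
    sched.reset_inline_budget();            // fresh budget per posted handler

    bool more_ready_io = true;              // placeholder condition
    while (more_ready_io)
    {
        if (sched.try_consume_inline_budget())
        {
            // try_complete_inline();       // run the completion right here
        }
        else
        {
            // queue_for_reactor();         // budget exhausted: defer instead
            break;
        }
        more_ready_io = false;              // placeholder: stop after one pass
    }
}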
432
433 inline void
434 2117x reactor_scheduler_base::post(std::coroutine_handle<> h) const
435 {
436 struct post_handler final : scheduler_op
437 {
438 std::coroutine_handle<> h_;
439
440 2117x explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
441 4234x ~post_handler() override = default;
442
443 2108x void operator()() override
444 {
445 2108x auto saved = h_;
446 2108x delete this;
447 // Ensure stores from the posting thread are visible
448 std::atomic_thread_fence(std::memory_order_acquire);
449 2108x saved.resume();
450 2108x }
451
452 9x void destroy() override
453 {
454 9x auto saved = h_;
455 9x delete this;
456 9x saved.destroy();
457 9x }
458 };
459
460 2117x auto ph = std::make_unique<post_handler>(h);
461
462 2117x if (auto* ctx = reactor_find_context(this))
463 {
464 6x ++ctx->private_outstanding_work;
465 6x ctx->private_queue.push(ph.release());
466 6x return;
467 }
468
469 2111x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
470
471 2111x lock_type lock(mutex_);
472 2111x completed_ops_.push(ph.release());
473 2111x wake_one_thread_and_unlock(lock);
474 2117x }
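A typical way a coroutine handle reaches this overload is from an awaitable's await_suspend; a minimal hypothetical sketch (not part of corosio):

// Sketch only: hypothetical awaitable that reschedules the current coroutine
// through post(); the handle resumes later on a thread running the scheduler.
struct sketch_repost_awaitable
{
    reactor_scheduler_base const* sched;

    bool await_ready() const noexcept { return false; }

    void await_suspend(std::coroutine_handle<> h) const
    {
        sched->post(h);     // ownership of resumption passes to the scheduler
    }

    void await_resume() const noexcept {}
};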
475
476 inline void
477 103570x reactor_scheduler_base::post(scheduler_op* h) const
478 {
479 103570x if (auto* ctx = reactor_find_context(this))
480 {
481 103398x ++ctx->private_outstanding_work;
482 103398x ctx->private_queue.push(h);
483 103398x return;
484 }
485
486 172x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
487
488 172x lock_type lock(mutex_);
489 172x completed_ops_.push(h);
490 172x wake_one_thread_and_unlock(lock);
491 172x }
492
493 inline bool
494 1290x reactor_scheduler_base::running_in_this_thread() const noexcept
495 {
496 1290x return reactor_find_context(this) != nullptr;
497 }
498
499 inline void
500 413x reactor_scheduler_base::stop()
501 {
502 413x lock_type lock(mutex_);
503 413x if (!stopped_.load(std::memory_order_acquire))
504 {
505 374x stopped_.store(true, std::memory_order_release);
506 374x signal_all(lock);
507 374x interrupt_reactor();
508 }
509 413x }
510
511 inline bool
512 21x reactor_scheduler_base::stopped() const noexcept
513 {
514 21x return stopped_.load(std::memory_order_acquire);
515 }
516
517 inline void
518 91x reactor_scheduler_base::restart()
519 {
520 91x stopped_.store(false, std::memory_order_release);
521 91x }
522
523 inline std::size_t
524 388x reactor_scheduler_base::run()
525 {
526 776x if (outstanding_work_.load(std::memory_order_acquire) == 0)
527 {
528 29x stop();
529 29x return 0;
530 }
531
532 359x reactor_thread_context_guard ctx(this);
533 359x lock_type lock(mutex_);
534
535 359x std::size_t n = 0;
536 for (;;)
537 {
538 275738x if (!do_one(lock, -1, &ctx.frame_))
539 359x break;
540 275379x if (n != (std::numeric_limits<std::size_t>::max)())
541 275379x ++n;
542 275379x if (!lock.owns_lock())
543 180799x lock.lock();
544 }
545 359x return n;
546 359x }
547
548 inline std::size_t
549 2x reactor_scheduler_base::run_one()
550 {
551 4x if (outstanding_work_.load(std::memory_order_acquire) == 0)
552 {
553 stop();
554 return 0;
555 }
556
557 2x reactor_thread_context_guard ctx(this);
558 2x lock_type lock(mutex_);
559 2x return do_one(lock, -1, &ctx.frame_);
560 2x }
561
562 inline std::size_t
563 61x reactor_scheduler_base::wait_one(long usec)
564 {
565 122x if (outstanding_work_.load(std::memory_order_acquire) == 0)
566 {
567 10x stop();
568 10x return 0;
569 }
570
571 51x reactor_thread_context_guard ctx(this);
572 51x lock_type lock(mutex_);
573 51x return do_one(lock, usec, &ctx.frame_);
574 51x }
575
576 inline std::size_t
577 6x reactor_scheduler_base::poll()
578 {
579 12x if (outstanding_work_.load(std::memory_order_acquire) == 0)
580 {
581 1x stop();
582 1x return 0;
583 }
584
585 5x reactor_thread_context_guard ctx(this);
586 5x lock_type lock(mutex_);
587
588 5x std::size_t n = 0;
589 for (;;)
590 {
591 11x if (!do_one(lock, 0, &ctx.frame_))
592 5x break;
593 6x if (n != (std::numeric_limits<std::size_t>::max)())
594 6x ++n;
595 6x if (!lock.owns_lock())
596 6x lock.lock();
597 }
598 5x return n;
599 5x }
600
601 inline std::size_t
602 4x reactor_scheduler_base::poll_one()
603 {
604 8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
605 {
606 2x stop();
607 2x return 0;
608 }
609
610 2x reactor_thread_context_guard ctx(this);
611 2x lock_type lock(mutex_);
612 2x return do_one(lock, 0, &ctx.frame_);
613 2x }
614
615 inline void
616 27427x reactor_scheduler_base::work_started() noexcept
617 {
618 27427x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
619 27427x }
620
621 inline void
622 38495x reactor_scheduler_base::work_finished() noexcept
623 {
624 76990x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
625 366x stop();
626 38495x }
627
628 inline void
629 152964x reactor_scheduler_base::compensating_work_started() const noexcept
630 {
631 152964x auto* ctx = reactor_find_context(this);
632 152964x if (ctx)
633 152964x ++ctx->private_outstanding_work;
634 152964x }
635
636 inline void
637 reactor_scheduler_base::drain_thread_queue(
638 op_queue& queue, std::int64_t count) const
639 {
640 if (count > 0)
641 outstanding_work_.fetch_add(count, std::memory_order_relaxed);
642
643 lock_type lock(mutex_);
644 completed_ops_.splice(queue);
645 if (count > 0)
646 maybe_unlock_and_signal_one(lock);
647 }
648
649 inline void
650 16806x reactor_scheduler_base::post_deferred_completions(op_queue& ops) const
651 {
652 16806x if (ops.empty())
653 16806x return;
654
655 if (auto* ctx = reactor_find_context(this))
656 {
657 ctx->private_queue.splice(ops);
658 return;
659 }
660
661 lock_type lock(mutex_);
662 completed_ops_.splice(ops);
663 wake_one_thread_and_unlock(lock);
664 }
665
666 inline void
667 515x reactor_scheduler_base::shutdown_drain()
668 {
669 515x lock_type lock(mutex_);
670
671 1116x while (auto* h = completed_ops_.pop())
672 {
673 601x if (h == &task_op_)
674 515x continue;
675 86x lock.unlock();
676 86x h->destroy();
677 86x lock.lock();
678 601x }
679
680 515x signal_all(lock);
681 515x }
682
683 inline void
684 889x reactor_scheduler_base::signal_all(lock_type&) const
685 {
686 889x state_ |= signaled_bit;
687 889x cond_.notify_all();
688 889x }
689
690 inline bool
691 2283x reactor_scheduler_base::maybe_unlock_and_signal_one(
692 lock_type& lock) const
693 {
694 2283x state_ |= signaled_bit;
695 2283x if (state_ > signaled_bit)
696 {
697 lock.unlock();
698 cond_.notify_one();
699 return true;
700 }
701 2283x return false;
702 }
703
704 inline bool
705 324763x reactor_scheduler_base::unlock_and_signal_one(
706 lock_type& lock) const
707 {
708 324763x state_ |= signaled_bit;
709 324763x bool have_waiters = state_ > signaled_bit;
710 324763x lock.unlock();
711 324763x if (have_waiters)
712 cond_.notify_one();
713 324763x return have_waiters;
714 }
715
716 inline void
717 reactor_scheduler_base::clear_signal() const
718 {
719 state_ &= ~signaled_bit;
720 }
721
722 inline void
723 reactor_scheduler_base::wait_for_signal(
724 lock_type& lock) const
725 {
726 while ((state_ & signaled_bit) == 0)
727 {
728 state_ += waiter_increment;
729 cond_.wait(lock);
730 state_ -= waiter_increment;
731 }
732 }
733
734 inline void
735 reactor_scheduler_base::wait_for_signal_for(
736 lock_type& lock, long timeout_us) const
737 {
738 if ((state_ & signaled_bit) == 0)
739 {
740 state_ += waiter_increment;
741 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
742 state_ -= waiter_increment;
743 }
744 }
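The signal helpers above share a small counter-based state machine; a standalone model of the encoding (bit 0 = signal pending, each waiter adds waiter_increment), assuming the same constants:

// Sketch only: models the signaled_bit / waiter_increment arithmetic without
// the mutex or condition variable.
#include <cassert>
#include <cstddef>

int main()
{
    constexpr std::size_t signaled_bit = 1;
    constexpr std::size_t waiter_increment = 2;
    std::size_t state = 0;

    // A consumer registers as a waiter before blocking (wait_for_signal).
    state += waiter_increment;                      // state == 2

    // A producer signals: set the bit, notify only if a waiter is present
    // (maybe_unlock_and_signal_one / unlock_and_signal_one).
    state |= signaled_bit;                          // state == 3
    assert(state > signaled_bit);                   // waiter present: notify

    // The woken waiter deregisters; do_one() clears the pending bit via
    // clear_signal() before it next waits.
    state -= waiter_increment;                      // state == 1
    state &= ~signaled_bit;                         // state == 0

    // Signaling with no waiters leaves the bit set, so the next
    // wait_for_signal() call returns without blocking.
    state |= signaled_bit;
    assert(!(state > signaled_bit));                // nobody to notify
    assert((state & signaled_bit) != 0);            // wait would fall through
}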
745
746 inline void
747 2283x reactor_scheduler_base::wake_one_thread_and_unlock(
748 lock_type& lock) const
749 {
750 2283x if (maybe_unlock_and_signal_one(lock))
751 return;
752
753 2283x if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
754 {
755 53x task_interrupted_ = true;
756 53x lock.unlock();
757 53x interrupt_reactor();
758 }
759 else
760 {
761 2230x lock.unlock();
762 }
763 }
764
765 275440x inline reactor_scheduler_base::work_cleanup::~work_cleanup()
766 {
767 275440x if (ctx)
768 {
769 275440x std::int64_t produced = ctx->private_outstanding_work;
770 275440x if (produced > 1)
771 15x sched->outstanding_work_.fetch_add(
772 produced - 1, std::memory_order_relaxed);
773 275425x else if (produced < 1)
774 27874x sched->work_finished();
775 275440x ctx->private_outstanding_work = 0;
776
777 275440x if (!ctx->private_queue.empty())
778 {
779 94602x lock->lock();
780 94602x sched->completed_ops_.splice(ctx->private_queue);
781 }
782 }
783 else
784 {
785 sched->work_finished();
786 }
787 275440x }
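The branches in work_cleanup encode a small piece of arithmetic: 'produced' counts the items the just-finished handler pushed onto the private queue (plus any compensating increments), while the handler itself consumes one unit of outstanding work. A standalone model of that netting (ignoring that work_finished() also stops the scheduler when the counter reaches zero):

// Sketch only: the net effect of work_cleanup on the global work counter.
#include <cassert>

long after_work_cleanup(long outstanding, long produced)
{
    if (produced > 1)
        return outstanding + (produced - 1);  // handler added net new work
    if (produced < 1)
        return outstanding - 1;               // only the handler's own unit ends
    return outstanding;                       // produced == 1: the two cancel
}

int main()
{
    assert(after_work_cleanup(10, 3) == 12);  // posted 3 ops, finished 1
    assert(after_work_cleanup(10, 1) == 10);  // posted 1 op, finished 1
    assert(after_work_cleanup(10, 0) == 9);   // posted nothing, finished 1
}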
788
789 380716x inline reactor_scheduler_base::task_cleanup::~task_cleanup()
790 {
791 190358x if (!ctx)
792 return;
793
794 190358x if (ctx->private_outstanding_work > 0)
795 {
796 8775x sched->outstanding_work_.fetch_add(
797 8775x ctx->private_outstanding_work, std::memory_order_relaxed);
798 8775x ctx->private_outstanding_work = 0;
799 }
800
801 190358x if (!ctx->private_queue.empty())
802 {
803 8775x if (!lock->owns_lock())
804 lock->lock();
805 8775x sched->completed_ops_.splice(ctx->private_queue);
806 }
807 190358x }
808
809 inline std::size_t
810 275804x reactor_scheduler_base::do_one(
811 lock_type& lock, long timeout_us, context_type* ctx)
812 {
813 for (;;)
814 {
815 466162x if (stopped_.load(std::memory_order_acquire))
816 357x return 0;
817
818 465805x scheduler_op* op = completed_ops_.pop();
819
820 // Handle reactor sentinel — time to poll for I/O
821 465805x if (op == &task_op_)
822 {
823 bool more_handlers =
824 190363x !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());
825
826 331403x if (!more_handlers &&
827 282080x (outstanding_work_.load(std::memory_order_acquire) == 0 ||
828 timeout_us == 0))
829 {
830 5x completed_ops_.push(&task_op_);
831 5x return 0;
832 }
833
834 190358x task_interrupted_ = more_handlers || timeout_us == 0;
835 190358x task_running_.store(true, std::memory_order_release);
836
837 190358x if (more_handlers)
838 49323x unlock_and_signal_one(lock);
839
840 try
841 {
842 190358x run_task(lock, ctx);
843 }
844 catch (...)
845 {
846 task_running_.store(false, std::memory_order_relaxed);
847 throw;
848 }
849
850 190358x task_running_.store(false, std::memory_order_relaxed);
851 190358x completed_ops_.push(&task_op_);
852 190358x continue;
853 190358x }
854
855 // Handle operation
856 275442x if (op != nullptr)
857 {
858 275440x bool more = !completed_ops_.empty();
859
860 275440x if (more)
861 275440x ctx->unassisted = !unlock_and_signal_one(lock);
862 else
863 {
864 ctx->unassisted = false;
865 lock.unlock();
866 }
867
868 275440x work_cleanup on_exit{this, &lock, ctx};
869 (void)on_exit;
870
871 275440x (*op)();
872 275440x return 1;
873 275440x }
874
875 // Try private queue before blocking
876 2x if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
877 continue;
878
879 4x if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
880 timeout_us == 0)
881 2x return 0;
882
883 clear_signal();
884 if (timeout_us < 0)
885 wait_for_signal(lock);
886 else
887 wait_for_signal_for(lock, timeout_us);
888 190358x }
889 }
890
891 } // namespace boost::corosio::detail
892
893 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
894