TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/capy/ex/execution_context.hpp>
15 :
16 : #include <boost/corosio/native/native_scheduler.hpp>
17 : #include <boost/corosio/detail/scheduler_op.hpp>
18 : #include <boost/corosio/detail/thread_local_ptr.hpp>
19 :
#include <algorithm>
#include <atomic>
#include <chrono>
#include <coroutine>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
27 :
28 : #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
29 : #include <boost/corosio/detail/conditionally_enabled_event.hpp>
30 :
31 : namespace boost::corosio::detail {
32 :
33 : // Forward declaration
34 : class reactor_scheduler_base;
35 :
36 : /** Per-thread state for a reactor scheduler.
37 :
38 : Each thread running a scheduler's event loop has one of these
39 : on a thread-local stack. It holds a private work queue and
40 : inline completion budget for speculative I/O fast paths.
41 : */
42 : struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
43 : {
44 : /// Scheduler this context belongs to.
45 : reactor_scheduler_base const* key;
46 :
47 : /// Next context frame on this thread's stack.
48 : reactor_scheduler_context* next;
49 :
50 : /// Private work queue for reduced contention.
51 : op_queue private_queue;
52 :
53 : /// Unflushed work count for the private queue.
54 : std::int64_t private_outstanding_work;
55 :
56 : /// Remaining inline completions allowed this cycle.
57 : int inline_budget;
58 :
59 : /// Maximum inline budget (adaptive, 2-16).
60 : int inline_budget_max;
61 :
62 : /// True if no other thread absorbed queued work last cycle.
63 : bool unassisted;
64 :
65 : /// Construct a context frame linked to @a n.
66 HIT 419 : reactor_scheduler_context(
67 : reactor_scheduler_base const* k, reactor_scheduler_context* n)
68 419 : : key(k)
69 419 : , next(n)
70 419 : , private_outstanding_work(0)
71 419 : , inline_budget(0)
72 419 : , inline_budget_max(2)
73 419 : , unassisted(false)
74 : {
75 419 : }
76 : };
77 :
/// Thread-local context stack for reactor schedulers.
/// Head of a per-thread singly linked list of context frames; frames are
/// pushed and popped by reactor_thread_context_guard around run/poll calls.
inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
80 :
81 : /// Find the context frame for a scheduler on this thread.
82 : inline reactor_scheduler_context*
83 788081 : reactor_find_context(reactor_scheduler_base const* self) noexcept
84 : {
85 788081 : for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
86 : {
87 785109 : if (c->key == self)
88 785109 : return c;
89 : }
90 2972 : return nullptr;
91 : }
92 :
93 : /// Flush private work count to global counter.
94 : inline void
95 MIS 0 : reactor_flush_private_work(
96 : reactor_scheduler_context* ctx,
97 : std::atomic<std::int64_t>& outstanding_work) noexcept
98 : {
99 0 : if (ctx && ctx->private_outstanding_work > 0)
100 : {
101 0 : outstanding_work.fetch_add(
102 : ctx->private_outstanding_work, std::memory_order_relaxed);
103 0 : ctx->private_outstanding_work = 0;
104 : }
105 0 : }
106 :
107 : /** Drain private queue to global queue, flushing work count first.
108 :
109 : @return True if any ops were drained.
110 : */
111 : inline bool
112 HIT 2 : reactor_drain_private_queue(
113 : reactor_scheduler_context* ctx,
114 : std::atomic<std::int64_t>& outstanding_work,
115 : op_queue& completed_ops) noexcept
116 : {
117 2 : if (!ctx || ctx->private_queue.empty())
118 2 : return false;
119 :
120 MIS 0 : reactor_flush_private_work(ctx, outstanding_work);
121 0 : completed_ops.splice(ctx->private_queue);
122 0 : return true;
123 : }
124 :
/** Non-template base for reactor-backed scheduler implementations.

    Provides the complete threading model shared by epoll, kqueue,
    and select schedulers: signal state machine, inline completion
    budget, work counting, run/poll methods, and the do_one event
    loop.

    Derived classes provide platform-specific hooks by overriding:
    - `run_task(lock, ctx)` to run the reactor poll
    - `interrupt_reactor()` to wake a blocked reactor

    De-templated from the original CRTP design to eliminate
    duplicate instantiations when multiple backends are compiled
    into the same binary. Virtual dispatch for run_task (called
    once per reactor cycle, before a blocking syscall) has
    negligible overhead.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class reactor_scheduler_base
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;
    using context_type = reactor_scheduler_context;
    using mutex_type = conditionally_enabled_mutex;
    using lock_type = mutex_type::scoped_lock;
    using event_type = conditionally_enabled_event;

    /// Post a coroutine for deferred execution.
    void post(std::coroutine_handle<> h) const override;

    /// Post a scheduler operation for deferred execution.
    void post(scheduler_op* h) const override;

    /// Return true if called from a thread running this scheduler.
    bool running_in_this_thread() const noexcept override;

    /// Request the scheduler to stop dispatching handlers.
    void stop() override;

    /// Return true if the scheduler has been stopped.
    bool stopped() const noexcept override;

    /// Reset the stopped state so `run()` can resume.
    void restart() override;

    /// Run the event loop until no work remains.
    std::size_t run() override;

    /// Run until one handler completes or no work remains.
    std::size_t run_one() override;

    /// Run until one handler completes or @a usec elapses.
    std::size_t wait_one(long usec) override;

    /// Run ready handlers without blocking.
    std::size_t poll() override;

    /// Run at most one ready handler without blocking.
    std::size_t poll_one() override;

    /// Increment the outstanding work count.
    void work_started() noexcept override;

    /// Decrement the outstanding work count, stopping on zero.
    void work_finished() noexcept override;

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and
        no handler will be executed. Must be called from a scheduler
        thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Flushes private work count to the global counter, then
        transfers the queue under mutex protection.

        @param queue The private queue to drain.
        @param count Private work count to flush before draining.
    */
    void drain_thread_queue(op_queue& queue, std::int64_t count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations
        go to the thread's private queue (fast path). Otherwise,
        operations are added to the global queue under mutex and a
        waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

    /** Apply runtime configuration to the scheduler.

        Called by `io_context` after construction. Values that do
        not apply to this backend are silently ignored.

        @param max_events Event buffer size for epoll/kqueue.
        @param budget_init Starting inline completion budget.
        @param budget_max Hard ceiling on adaptive budget ramp-up.
        @param unassisted Budget when single-threaded.
    */
    virtual void configure_reactor(
        unsigned max_events,
        unsigned budget_init,
        unsigned budget_max,
        unsigned unassisted) noexcept;

    /** Enable or disable single-threaded (lockless) mode.

        When enabled, all scheduler mutex and condition variable
        operations become no-ops. Cross-thread post() is
        undefined behavior.
    */
    void configure_single_threaded(bool v) noexcept
    {
        // NOTE(review): single_threaded_ is not declared in this class;
        // presumably inherited from a base (native_scheduler) — confirm.
        single_threaded_ = v;
        mutex_.set_enabled(!v);
        cond_.set_enabled(!v);
    }

protected:
    reactor_scheduler_base() = default;

    /** Drain completed_ops during shutdown.

        Pops all operations from the global queue and destroys them,
        skipping the task sentinel. Signals all waiting threads.
        Derived classes call this from their shutdown() override
        before performing platform-specific cleanup.
    */
    void shutdown_drain();

    /// RAII guard that re-inserts the task sentinel after `run_task`.
    struct task_cleanup
    {
        reactor_scheduler_base const* sched;
        lock_type* lock;
        context_type* ctx;
        ~task_cleanup();
    };

    /// Protects completed_ops_ and the signal state machine below.
    mutable mutex_type mutex_{true};
    /// Condvar driven by signal_all / *_signal_one / wait_for_signal*.
    mutable event_type cond_{true};
    /// Global queue of ready operations; also holds the task sentinel.
    mutable op_queue completed_ops_;
    /// Count of unfinished work units; reaching zero triggers stop().
    mutable std::atomic<std::int64_t> outstanding_work_{0};
    std::atomic<bool> stopped_{false};
    /// True while some thread is inside run_task() (the reactor poll).
    mutable std::atomic<bool> task_running_{false};
    /// True once the blocked reactor has already been asked to wake.
    mutable bool task_interrupted_ = false;

    // Runtime-configurable reactor tuning parameters.
    // Defaults match the library's built-in values.
    unsigned max_events_per_poll_ = 128;
    unsigned inline_budget_initial_ = 2;
    unsigned inline_budget_max_ = 16;
    unsigned unassisted_budget_ = 4;

    /// Bit 0 of `state_`: set when the condvar should be signaled.
    static constexpr std::size_t signaled_bit = 1;

    /// Increment per waiting thread in `state_`.
    static constexpr std::size_t waiter_increment = 2;
    /// Packed signal state: bit 0 = signaled, higher bits = 2 x waiters.
    mutable std::size_t state_ = 0;

    /// Sentinel op that triggers a reactor poll when dequeued.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;

    /// Run the platform-specific reactor poll.
    virtual void
    run_task(lock_type& lock, context_type* ctx) = 0;

    /// Wake a blocked reactor (e.g. write to eventfd or pipe).
    virtual void interrupt_reactor() const = 0;

private:
    /// RAII guard that settles work accounting after a handler runs.
    struct work_cleanup
    {
        reactor_scheduler_base* sched;
        lock_type* lock;
        context_type* ctx;
        ~work_cleanup();
    };

    std::size_t do_one(
        lock_type& lock, long timeout_us, context_type* ctx);

    void signal_all(lock_type& lock) const;
    bool maybe_unlock_and_signal_one(lock_type& lock) const;
    bool unlock_and_signal_one(lock_type& lock) const;
    void clear_signal() const;
    void wait_for_signal(lock_type& lock) const;
    void wait_for_signal_for(
        lock_type& lock, long timeout_us) const;
    void wake_one_thread_and_unlock(lock_type& lock) const;
};
348 :
349 : /** RAII guard that pushes/pops a scheduler context frame.
350 :
351 : On construction, pushes a new context frame onto the
352 : thread-local stack. On destruction, drains any remaining
353 : private queue items to the global queue and pops the frame.
354 : */
355 : struct reactor_thread_context_guard
356 : {
357 : /// The context frame managed by this guard.
358 : reactor_scheduler_context frame_;
359 :
360 : /// Construct the guard, pushing a frame for @a sched.
361 HIT 419 : explicit reactor_thread_context_guard(
362 : reactor_scheduler_base const* sched) noexcept
363 419 : : frame_(sched, reactor_context_stack.get())
364 : {
365 419 : reactor_context_stack.set(&frame_);
366 419 : }
367 :
368 : /// Destroy the guard, draining private work and popping the frame.
369 419 : ~reactor_thread_context_guard() noexcept
370 : {
371 419 : if (!frame_.private_queue.empty())
372 MIS 0 : frame_.key->drain_thread_queue(
373 0 : frame_.private_queue, frame_.private_outstanding_work);
374 HIT 419 : reactor_context_stack.set(frame_.next);
375 419 : }
376 : };
377 :
378 : // ---- Inline implementations ------------------------------------------------
379 :
380 : inline void
381 MIS 0 : reactor_scheduler_base::configure_reactor(
382 : unsigned max_events,
383 : unsigned budget_init,
384 : unsigned budget_max,
385 : unsigned unassisted) noexcept
386 : {
387 0 : max_events_per_poll_ = max_events;
388 0 : inline_budget_initial_ = budget_init;
389 0 : inline_budget_max_ = budget_max;
390 0 : unassisted_budget_ = unassisted;
391 0 : }
392 :
393 : inline void
394 HIT 102268 : reactor_scheduler_base::reset_inline_budget() const noexcept
395 : {
396 102268 : if (auto* ctx = reactor_find_context(this))
397 : {
398 : // Cap when no other thread absorbed queued work
399 102268 : if (ctx->unassisted)
400 : {
401 102268 : ctx->inline_budget_max =
402 102268 : static_cast<int>(unassisted_budget_);
403 102268 : ctx->inline_budget =
404 102268 : static_cast<int>(unassisted_budget_);
405 102268 : return;
406 : }
407 : // Ramp up when previous cycle fully consumed budget
408 MIS 0 : if (ctx->inline_budget == 0)
409 0 : ctx->inline_budget_max = (std::min)(
410 0 : ctx->inline_budget_max * 2,
411 0 : static_cast<int>(inline_budget_max_));
412 0 : else if (ctx->inline_budget < ctx->inline_budget_max)
413 0 : ctx->inline_budget_max =
414 0 : static_cast<int>(inline_budget_initial_);
415 0 : ctx->inline_budget = ctx->inline_budget_max;
416 : }
417 : }
418 :
419 : inline bool
420 HIT 425872 : reactor_scheduler_base::try_consume_inline_budget() const noexcept
421 : {
422 425872 : if (auto* ctx = reactor_find_context(this))
423 : {
424 425872 : if (ctx->inline_budget > 0)
425 : {
426 340727 : --ctx->inline_budget;
427 340727 : return true;
428 : }
429 : }
430 85145 : return false;
431 : }
432 :
433 : inline void
434 2117 : reactor_scheduler_base::post(std::coroutine_handle<> h) const
435 : {
436 : struct post_handler final : scheduler_op
437 : {
438 : std::coroutine_handle<> h_;
439 :
440 2117 : explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
441 4234 : ~post_handler() override = default;
442 :
443 2108 : void operator()() override
444 : {
445 2108 : auto saved = h_;
446 2108 : delete this;
447 : // Ensure stores from the posting thread are visible
448 : std::atomic_thread_fence(std::memory_order_acquire);
449 2108 : saved.resume();
450 2108 : }
451 :
452 9 : void destroy() override
453 : {
454 9 : auto saved = h_;
455 9 : delete this;
456 9 : saved.destroy();
457 9 : }
458 : };
459 :
460 2117 : auto ph = std::make_unique<post_handler>(h);
461 :
462 2117 : if (auto* ctx = reactor_find_context(this))
463 : {
464 6 : ++ctx->private_outstanding_work;
465 6 : ctx->private_queue.push(ph.release());
466 6 : return;
467 : }
468 :
469 2111 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
470 :
471 2111 : lock_type lock(mutex_);
472 2111 : completed_ops_.push(ph.release());
473 2111 : wake_one_thread_and_unlock(lock);
474 2117 : }
475 :
476 : inline void
477 103570 : reactor_scheduler_base::post(scheduler_op* h) const
478 : {
479 103570 : if (auto* ctx = reactor_find_context(this))
480 : {
481 103398 : ++ctx->private_outstanding_work;
482 103398 : ctx->private_queue.push(h);
483 103398 : return;
484 : }
485 :
486 172 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
487 :
488 172 : lock_type lock(mutex_);
489 172 : completed_ops_.push(h);
490 172 : wake_one_thread_and_unlock(lock);
491 172 : }
492 :
493 : inline bool
494 1290 : reactor_scheduler_base::running_in_this_thread() const noexcept
495 : {
496 1290 : return reactor_find_context(this) != nullptr;
497 : }
498 :
499 : inline void
500 413 : reactor_scheduler_base::stop()
501 : {
502 413 : lock_type lock(mutex_);
503 413 : if (!stopped_.load(std::memory_order_acquire))
504 : {
505 374 : stopped_.store(true, std::memory_order_release);
506 374 : signal_all(lock);
507 374 : interrupt_reactor();
508 : }
509 413 : }
510 :
/// Return true if stop() has run and no restart() has followed.
inline bool
reactor_scheduler_base::stopped() const noexcept
{
    return stopped_.load(std::memory_order_acquire);
}
516 :
/// Clear the stopped flag so a subsequent run()/poll() can make progress.
inline void
reactor_scheduler_base::restart()
{
    stopped_.store(false, std::memory_order_release);
}
522 :
523 : inline std::size_t
524 388 : reactor_scheduler_base::run()
525 : {
526 776 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
527 : {
528 29 : stop();
529 29 : return 0;
530 : }
531 :
532 359 : reactor_thread_context_guard ctx(this);
533 359 : lock_type lock(mutex_);
534 :
535 359 : std::size_t n = 0;
536 : for (;;)
537 : {
538 275738 : if (!do_one(lock, -1, &ctx.frame_))
539 359 : break;
540 275379 : if (n != (std::numeric_limits<std::size_t>::max)())
541 275379 : ++n;
542 275379 : if (!lock.owns_lock())
543 180799 : lock.lock();
544 : }
545 359 : return n;
546 359 : }
547 :
548 : inline std::size_t
549 2 : reactor_scheduler_base::run_one()
550 : {
551 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
552 : {
553 MIS 0 : stop();
554 0 : return 0;
555 : }
556 :
557 HIT 2 : reactor_thread_context_guard ctx(this);
558 2 : lock_type lock(mutex_);
559 2 : return do_one(lock, -1, &ctx.frame_);
560 2 : }
561 :
562 : inline std::size_t
563 61 : reactor_scheduler_base::wait_one(long usec)
564 : {
565 122 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
566 : {
567 10 : stop();
568 10 : return 0;
569 : }
570 :
571 51 : reactor_thread_context_guard ctx(this);
572 51 : lock_type lock(mutex_);
573 51 : return do_one(lock, usec, &ctx.frame_);
574 51 : }
575 :
576 : inline std::size_t
577 6 : reactor_scheduler_base::poll()
578 : {
579 12 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
580 : {
581 1 : stop();
582 1 : return 0;
583 : }
584 :
585 5 : reactor_thread_context_guard ctx(this);
586 5 : lock_type lock(mutex_);
587 :
588 5 : std::size_t n = 0;
589 : for (;;)
590 : {
591 11 : if (!do_one(lock, 0, &ctx.frame_))
592 5 : break;
593 6 : if (n != (std::numeric_limits<std::size_t>::max)())
594 6 : ++n;
595 6 : if (!lock.owns_lock())
596 6 : lock.lock();
597 : }
598 5 : return n;
599 5 : }
600 :
601 : inline std::size_t
602 4 : reactor_scheduler_base::poll_one()
603 : {
604 8 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
605 : {
606 2 : stop();
607 2 : return 0;
608 : }
609 :
610 2 : reactor_thread_context_guard ctx(this);
611 2 : lock_type lock(mutex_);
612 2 : return do_one(lock, 0, &ctx.frame_);
613 2 : }
614 :
/// Increment the outstanding work count. Relaxed increment; the
/// matching decrement in work_finished() uses acq_rel.
inline void
reactor_scheduler_base::work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
620 :
621 : inline void
622 38495 : reactor_scheduler_base::work_finished() noexcept
623 : {
624 76990 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
625 366 : stop();
626 38495 : }
627 :
628 : inline void
629 152964 : reactor_scheduler_base::compensating_work_started() const noexcept
630 : {
631 152964 : auto* ctx = reactor_find_context(this);
632 152964 : if (ctx)
633 152964 : ++ctx->private_outstanding_work;
634 152964 : }
635 :
636 : inline void
637 MIS 0 : reactor_scheduler_base::drain_thread_queue(
638 : op_queue& queue, std::int64_t count) const
639 : {
640 0 : if (count > 0)
641 0 : outstanding_work_.fetch_add(count, std::memory_order_relaxed);
642 :
643 0 : lock_type lock(mutex_);
644 0 : completed_ops_.splice(queue);
645 0 : if (count > 0)
646 0 : maybe_unlock_and_signal_one(lock);
647 0 : }
648 :
649 : inline void
650 HIT 16806 : reactor_scheduler_base::post_deferred_completions(op_queue& ops) const
651 : {
652 16806 : if (ops.empty())
653 16806 : return;
654 :
655 MIS 0 : if (auto* ctx = reactor_find_context(this))
656 : {
657 0 : ctx->private_queue.splice(ops);
658 0 : return;
659 : }
660 :
661 0 : lock_type lock(mutex_);
662 0 : completed_ops_.splice(ops);
663 0 : wake_one_thread_and_unlock(lock);
664 0 : }
665 :
666 : inline void
667 HIT 515 : reactor_scheduler_base::shutdown_drain()
668 : {
669 515 : lock_type lock(mutex_);
670 :
671 1116 : while (auto* h = completed_ops_.pop())
672 : {
673 601 : if (h == &task_op_)
674 515 : continue;
675 86 : lock.unlock();
676 86 : h->destroy();
677 86 : lock.lock();
678 601 : }
679 :
680 515 : signal_all(lock);
681 515 : }
682 :
/// Set the signaled bit and wake every waiting thread. The unnamed
/// lock parameter documents that the scheduler mutex is held.
inline void
reactor_scheduler_base::signal_all(lock_type&) const
{
    state_ |= signaled_bit;
    cond_.notify_all();
}
689 :
690 : inline bool
691 2283 : reactor_scheduler_base::maybe_unlock_and_signal_one(
692 : lock_type& lock) const
693 : {
694 2283 : state_ |= signaled_bit;
695 2283 : if (state_ > signaled_bit)
696 : {
697 MIS 0 : lock.unlock();
698 0 : cond_.notify_one();
699 0 : return true;
700 : }
701 HIT 2283 : return false;
702 : }
703 :
704 : inline bool
705 324763 : reactor_scheduler_base::unlock_and_signal_one(
706 : lock_type& lock) const
707 : {
708 324763 : state_ |= signaled_bit;
709 324763 : bool have_waiters = state_ > signaled_bit;
710 324763 : lock.unlock();
711 324763 : if (have_waiters)
712 MIS 0 : cond_.notify_one();
713 HIT 324763 : return have_waiters;
714 : }
715 :
/// Clear the signaled bit before waiting; caller holds the mutex.
inline void
reactor_scheduler_base::clear_signal() const
{
    state_ &= ~signaled_bit;
}
721 :
/// Block until the signaled bit is set. The waiter count encoded in
/// state_ tells signalers whether a notify is needed.
inline void
reactor_scheduler_base::wait_for_signal(
    lock_type& lock) const
{
    while ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;  // register as a waiter
        cond_.wait(lock);
        state_ -= waiter_increment;  // deregister after wake-up
    }
}
733 :
/// Wait until the signaled bit is set or @a timeout_us elapses.
/// Deliberately a single `if`, not a loop: a timed wait makes one
/// attempt and the caller re-evaluates its own predicate afterwards.
inline void
reactor_scheduler_base::wait_for_signal_for(
    lock_type& lock, long timeout_us) const
{
    if ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;  // register as a waiter
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= waiter_increment;  // deregister after wake-up/timeout
    }
}
745 :
746 : inline void
747 HIT 2283 : reactor_scheduler_base::wake_one_thread_and_unlock(
748 : lock_type& lock) const
749 : {
750 2283 : if (maybe_unlock_and_signal_one(lock))
751 MIS 0 : return;
752 :
753 HIT 2283 : if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
754 : {
755 53 : task_interrupted_ = true;
756 53 : lock.unlock();
757 53 : interrupt_reactor();
758 : }
759 : else
760 : {
761 2230 : lock.unlock();
762 : }
763 : }
764 :
/// Settle work accounting after a single handler has executed (see do_one).
inline reactor_scheduler_base::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        // The handler that just ran consumed one unit of work. Net the
        // privately produced count against that unit: a surplus is
        // published to the global counter, a deficit (nothing produced)
        // decrements it via work_finished(), and exactly one cancels out.
        std::int64_t produced = ctx->private_outstanding_work;
        if (produced > 1)
            sched->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            sched->work_finished();
        ctx->private_outstanding_work = 0;

        // Expose any privately queued ops to other threads. The lock is
        // not held here — do_one released it before running the handler.
        if (!ctx->private_queue.empty())
        {
            lock->lock();
            sched->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No thread context: just account for the consumed work unit.
        sched->work_finished();
    }
}
788 :
789 380716 : inline reactor_scheduler_base::task_cleanup::~task_cleanup()
790 : {
791 190358 : if (!ctx)
792 MIS 0 : return;
793 :
794 HIT 190358 : if (ctx->private_outstanding_work > 0)
795 : {
796 8775 : sched->outstanding_work_.fetch_add(
797 8775 : ctx->private_outstanding_work, std::memory_order_relaxed);
798 8775 : ctx->private_outstanding_work = 0;
799 : }
800 :
801 190358 : if (!ctx->private_queue.empty())
802 : {
803 8775 : if (!lock->owns_lock())
804 MIS 0 : lock->lock();
805 HIT 8775 : sched->completed_ops_.splice(ctx->private_queue);
806 : }
807 190358 : }
808 :
/// Dispatch at most one handler (or run one reactor cycle per loop
/// iteration). Returns 1 if a handler ran, 0 on stop/idle/timeout.
/// `lock` is held on entry; on return it may or may not be held —
/// callers re-acquire via owns_lock() as needed.
inline std::size_t
reactor_scheduler_base::do_one(
    lock_type& lock, long timeout_us, context_type* ctx)
{
    for (;;)
    {
        if (stopped_.load(std::memory_order_acquire))
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel — time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers =
                !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());

            // Nothing queued and either no work at all or a pure poll
            // pass: put the sentinel back and report no handler run.
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            // The reactor must not block when handlers are pending or
            // the caller asked for a non-blocking pass.
            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Let another thread run the pending handlers while this
            // one is busy inside the reactor poll.
            if (more_handlers)
                unlock_and_signal_one(lock);

            try
            {
                run_task(lock, ctx);
            }
            catch (...)
            {
                // Keep task_running_ accurate even on a throwing poll.
                task_running_.store(false, std::memory_order_relaxed);
                throw;
            }

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // Wake a peer to take the remaining handlers; record
            // whether anyone helped (feeds the inline-budget heuristic).
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // work_cleanup settles the work count and flushes the
            // private queue once the handler has finished (even if it
            // throws).
            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            (*op)();
            return 1;
        }

        // Try private queue before blocking
        if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Queue empty: wait (bounded or unbounded) for a signal.
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
890 :
891 : } // namespace boost::corosio::detail
892 :
893 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
|