1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  

19  

20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21  

21  

22  
#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28  
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
28  
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29  

29  

30  
#include <boost/corosio/detail/except.hpp>
30  
#include <boost/corosio/detail/except.hpp>
31  

31  

32  
#include <atomic>
32  
#include <atomic>
33  
#include <chrono>
33  
#include <chrono>
34  
#include <cstdint>
34  
#include <cstdint>
35  
#include <mutex>
35  
#include <mutex>
 
36 +
#include <vector>
36  

37  

37  
#include <errno.h>
38  
#include <errno.h>
38  
#include <sys/epoll.h>
39  
#include <sys/epoll.h>
39  
#include <sys/eventfd.h>
40  
#include <sys/eventfd.h>
40  
#include <sys/timerfd.h>
41  
#include <sys/timerfd.h>
41  
#include <unistd.h>
42  
#include <unistd.h>
42  

43  

43  
namespace boost::corosio::detail {
44  
namespace boost::corosio::detail {
44  

45  

45  
struct epoll_op;
46  
struct epoll_op;
46  
struct descriptor_state;
47  
struct descriptor_state;
47  

48  

48  
/** Linux scheduler using epoll for I/O multiplexing.
49  
/** Linux scheduler using epoll for I/O multiplexing.
49  

50  

50  
    This scheduler implements the scheduler interface using Linux epoll
51  
    This scheduler implements the scheduler interface using Linux epoll
51  
    for efficient I/O event notification. It uses a single reactor model
52  
    for efficient I/O event notification. It uses a single reactor model
52  
    where one thread runs epoll_wait while other threads
53  
    where one thread runs epoll_wait while other threads
53  
    wait on a condition variable for handler work. This design provides:
54  
    wait on a condition variable for handler work. This design provides:
54  

55  

55  
    - Handler parallelism: N posted handlers can execute on N threads
56  
    - Handler parallelism: N posted handlers can execute on N threads
56  
    - No thundering herd: condition_variable wakes exactly one thread
57  
    - No thundering herd: condition_variable wakes exactly one thread
57  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
58  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
58  

59  

59  
    When threads call run(), they first try to execute queued handlers.
60  
    When threads call run(), they first try to execute queued handlers.
60  
    If the queue is empty and no reactor is running, one thread becomes
61  
    If the queue is empty and no reactor is running, one thread becomes
61  
    the reactor and runs epoll_wait. Other threads wait on a condition
62  
    the reactor and runs epoll_wait. Other threads wait on a condition
62  
    variable until handlers are available.
63  
    variable until handlers are available.
63  

64  

64  
    @par Thread Safety
65  
    @par Thread Safety
65  
    All public member functions are thread-safe.
66  
    All public member functions are thread-safe.
66  
*/
67  
*/
67  
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
68  
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
68  
{
69  
{
69  
public:
70  
public:
70  
    /** Construct the scheduler.
71  
    /** Construct the scheduler.
71  

72  

72  
        Creates an epoll instance, eventfd for reactor interruption,
73  
        Creates an epoll instance, eventfd for reactor interruption,
73  
        and timerfd for kernel-managed timer expiry.
74  
        and timerfd for kernel-managed timer expiry.
74  

75  

75  
        @param ctx Reference to the owning execution_context.
76  
        @param ctx Reference to the owning execution_context.
76  
        @param concurrency_hint Hint for expected thread count (unused).
77  
        @param concurrency_hint Hint for expected thread count (unused).
77  
    */
78  
    */
78  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
79  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
79  

80  

80  
    /// Destroy the scheduler.
81  
    /// Destroy the scheduler.
81  
    ~epoll_scheduler() override;
82  
    ~epoll_scheduler() override;
82  

83  

83  
    epoll_scheduler(epoll_scheduler const&)            = delete;
84  
    epoll_scheduler(epoll_scheduler const&)            = delete;
84  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
85  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
85  

86  

86  
    /// Shut down the scheduler, draining pending operations.
87  
    /// Shut down the scheduler, draining pending operations.
87  
    void shutdown() override;
88  
    void shutdown() override;
88  

89  

 
90 +
    /// Apply runtime configuration, resizing the event buffer.
 
91 +
    void configure_reactor(
 
92 +
        unsigned max_events,
 
93 +
        unsigned budget_init,
 
94 +
        unsigned budget_max,
 
95 +
        unsigned unassisted) noexcept override;
 
96 +

89  
    /** Return the epoll file descriptor.
97  
    /** Return the epoll file descriptor.
90  

98  

91  
        Used by socket services to register file descriptors
99  
        Used by socket services to register file descriptors
92  
        for I/O event notification.
100  
        for I/O event notification.
93  

101  

94  
        @return The epoll file descriptor.
102  
        @return The epoll file descriptor.
95  
    */
103  
    */
96  
    int epoll_fd() const noexcept
104  
    int epoll_fd() const noexcept
97  
    {
105  
    {
98  
        return epoll_fd_;
106  
        return epoll_fd_;
99  
    }
107  
    }
100  

108  

101  
    /** Register a descriptor for persistent monitoring.
109  
    /** Register a descriptor for persistent monitoring.
102  

110  

103  
        The fd is registered once and stays registered until explicitly
111  
        The fd is registered once and stays registered until explicitly
104  
        deregistered. Events are dispatched via descriptor_state which
112  
        deregistered. Events are dispatched via descriptor_state which
105  
        tracks pending read/write/connect operations.
113  
        tracks pending read/write/connect operations.
106  

114  

107  
        @param fd The file descriptor to register.
115  
        @param fd The file descriptor to register.
108  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
116  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
109  
    */
117  
    */
110  
    void register_descriptor(int fd, descriptor_state* desc) const;
118  
    void register_descriptor(int fd, descriptor_state* desc) const;
111  

119  

112  
    /** Deregister a persistently registered descriptor.
120  
    /** Deregister a persistently registered descriptor.
113  

121  

114  
        @param fd The file descriptor to deregister.
122  
        @param fd The file descriptor to deregister.
115  
    */
123  
    */
116  
    void deregister_descriptor(int fd) const;
124  
    void deregister_descriptor(int fd) const;
117  

125  

118  
private:
126  
private:
119  
    void
127  
    void
120 -
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
128 +
    run_task(lock_type& lock, context_type* ctx) override;
121  
    void interrupt_reactor() const override;
129  
    void interrupt_reactor() const override;
122  
    void update_timerfd() const;
130  
    void update_timerfd() const;
123  

131  

124  
    int epoll_fd_;
132  
    int epoll_fd_;
125  
    int event_fd_;
133  
    int event_fd_;
126  
    int timer_fd_;
134  
    int timer_fd_;
127  

135  

128  
    // Edge-triggered eventfd state
136  
    // Edge-triggered eventfd state
129  
    mutable std::atomic<bool> eventfd_armed_{false};
137  
    mutable std::atomic<bool> eventfd_armed_{false};
130  

138  

131  
    // Set when the earliest timer changes; flushed before epoll_wait
139  
    // Set when the earliest timer changes; flushed before epoll_wait
132  
    mutable std::atomic<bool> timerfd_stale_{false};
140  
    mutable std::atomic<bool> timerfd_stale_{false};
 
141 +

 
142 +
    // Event buffer sized from max_events_per_poll_ (set at construction,
 
143 +
    // resized by configure_reactor via io_context_options).
 
144 +
    std::vector<epoll_event> event_buffer_;
133  
};
145  
};
134  

146  

135  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
147  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
136  
    : epoll_fd_(-1)
148  
    : epoll_fd_(-1)
137  
    , event_fd_(-1)
149  
    , event_fd_(-1)
138  
    , timer_fd_(-1)
150  
    , timer_fd_(-1)
 
151 +
    , event_buffer_(max_events_per_poll_)
139  
{
152  
{
140  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
153  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
141  
    if (epoll_fd_ < 0)
154  
    if (epoll_fd_ < 0)
142  
        detail::throw_system_error(make_err(errno), "epoll_create1");
155  
        detail::throw_system_error(make_err(errno), "epoll_create1");
143  

156  

144  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
157  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
145  
    if (event_fd_ < 0)
158  
    if (event_fd_ < 0)
146  
    {
159  
    {
147  
        int errn = errno;
160  
        int errn = errno;
148  
        ::close(epoll_fd_);
161  
        ::close(epoll_fd_);
149  
        detail::throw_system_error(make_err(errn), "eventfd");
162  
        detail::throw_system_error(make_err(errn), "eventfd");
150  
    }
163  
    }
151  

164  

152  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
165  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
153  
    if (timer_fd_ < 0)
166  
    if (timer_fd_ < 0)
154  
    {
167  
    {
155  
        int errn = errno;
168  
        int errn = errno;
156  
        ::close(event_fd_);
169  
        ::close(event_fd_);
157  
        ::close(epoll_fd_);
170  
        ::close(epoll_fd_);
158  
        detail::throw_system_error(make_err(errn), "timerfd_create");
171  
        detail::throw_system_error(make_err(errn), "timerfd_create");
159  
    }
172  
    }
160  

173  

161  
    epoll_event ev{};
174  
    epoll_event ev{};
162  
    ev.events   = EPOLLIN | EPOLLET;
175  
    ev.events   = EPOLLIN | EPOLLET;
163  
    ev.data.ptr = nullptr;
176  
    ev.data.ptr = nullptr;
164  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
177  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
165  
    {
178  
    {
166  
        int errn = errno;
179  
        int errn = errno;
167  
        ::close(timer_fd_);
180  
        ::close(timer_fd_);
168  
        ::close(event_fd_);
181  
        ::close(event_fd_);
169  
        ::close(epoll_fd_);
182  
        ::close(epoll_fd_);
170  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
183  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
171  
    }
184  
    }
172  

185  

173  
    epoll_event timer_ev{};
186  
    epoll_event timer_ev{};
174  
    timer_ev.events   = EPOLLIN | EPOLLERR;
187  
    timer_ev.events   = EPOLLIN | EPOLLERR;
175  
    timer_ev.data.ptr = &timer_fd_;
188  
    timer_ev.data.ptr = &timer_fd_;
176  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
189  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
177  
    {
190  
    {
178  
        int errn = errno;
191  
        int errn = errno;
179  
        ::close(timer_fd_);
192  
        ::close(timer_fd_);
180  
        ::close(event_fd_);
193  
        ::close(event_fd_);
181  
        ::close(epoll_fd_);
194  
        ::close(epoll_fd_);
182  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
195  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
183  
    }
196  
    }
184  

197  

185  
    timer_svc_ = &get_timer_service(ctx, *this);
198  
    timer_svc_ = &get_timer_service(ctx, *this);
186  
    timer_svc_->set_on_earliest_changed(
199  
    timer_svc_->set_on_earliest_changed(
187  
        timer_service::callback(this, [](void* p) {
200  
        timer_service::callback(this, [](void* p) {
188  
            auto* self = static_cast<epoll_scheduler*>(p);
201  
            auto* self = static_cast<epoll_scheduler*>(p);
189  
            self->timerfd_stale_.store(true, std::memory_order_release);
202  
            self->timerfd_stale_.store(true, std::memory_order_release);
190  
            self->interrupt_reactor();
203  
            self->interrupt_reactor();
191  
        }));
204  
        }));
192  

205  

193  
    get_resolver_service(ctx, *this);
206  
    get_resolver_service(ctx, *this);
194  
    get_signal_service(ctx, *this);
207  
    get_signal_service(ctx, *this);
195  
    get_stream_file_service(ctx, *this);
208  
    get_stream_file_service(ctx, *this);
196  
    get_random_access_file_service(ctx, *this);
209  
    get_random_access_file_service(ctx, *this);
197  

210  

198  
    completed_ops_.push(&task_op_);
211  
    completed_ops_.push(&task_op_);
199  
}
212  
}
200  

213  

201  
inline epoll_scheduler::~epoll_scheduler()
{
    // Release in reverse order of creation. A value of -1 (the initial
    // value set by the constructor) marks a descriptor that was never
    // opened.
    int const owned[] = {timer_fd_, event_fd_, epoll_fd_};
    for (int const fd : owned)
    {
        if (fd >= 0)
            ::close(fd);
    }
}
210  

223  

211  
inline void
224  
inline void
212  
epoll_scheduler::shutdown()
225  
epoll_scheduler::shutdown()
213  
{
226  
{
214  
    shutdown_drain();
227  
    shutdown_drain();
215  

228  

216  
    if (event_fd_ >= 0)
229  
    if (event_fd_ >= 0)
217  
        interrupt_reactor();
230  
        interrupt_reactor();
218  
}
231  
}
219  

232  

220  
inline void
233  
inline void
 
234 +
epoll_scheduler::configure_reactor(
 
235 +
    unsigned max_events,
 
236 +
    unsigned budget_init,
 
237 +
    unsigned budget_max,
 
238 +
    unsigned unassisted) noexcept
 
239 +
{
 
240 +
    reactor_scheduler_base::configure_reactor(
 
241 +
        max_events, budget_init, budget_max, unassisted);
 
242 +
    event_buffer_.resize(max_events_per_poll_);
 
243 +
}
 
244 +

 
245 +
inline void
221  
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
246  
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
222  
{
247  
{
223  
    epoll_event ev{};
248  
    epoll_event ev{};
224  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
249  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
225  
    ev.data.ptr = desc;
250  
    ev.data.ptr = desc;
226  

251  

227  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
252  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
228  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
253  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
229  

254  

230  
    desc->registered_events = ev.events;
255  
    desc->registered_events = ev.events;
231  
    desc->fd                = fd;
256  
    desc->fd                = fd;
232  
    desc->scheduler_        = this;
257  
    desc->scheduler_        = this;
 
258 +
    desc->mutex.set_enabled(!single_threaded_);
233  
    desc->ready_events_.store(0, std::memory_order_relaxed);
259  
    desc->ready_events_.store(0, std::memory_order_relaxed);
234  

260  

235 -
    std::lock_guard lock(desc->mutex);
261 +
    conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
236  
    desc->impl_ref_.reset();
262  
    desc->impl_ref_.reset();
237  
    desc->read_ready  = false;
263  
    desc->read_ready  = false;
238  
    desc->write_ready = false;
264  
    desc->write_ready = false;
239  
}
265  
}
240  

266  

241  
inline void
267  
inline void
242  
epoll_scheduler::deregister_descriptor(int fd) const
268  
epoll_scheduler::deregister_descriptor(int fd) const
243  
{
269  
{
244  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
270  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
245  
}
271  
}
246  

272  

247  
inline void
273  
inline void
248  
epoll_scheduler::interrupt_reactor() const
274  
epoll_scheduler::interrupt_reactor() const
249  
{
275  
{
250  
    bool expected = false;
276  
    bool expected = false;
251  
    if (eventfd_armed_.compare_exchange_strong(
277  
    if (eventfd_armed_.compare_exchange_strong(
252  
            expected, true, std::memory_order_release,
278  
            expected, true, std::memory_order_release,
253  
            std::memory_order_relaxed))
279  
            std::memory_order_relaxed))
254  
    {
280  
    {
255  
        std::uint64_t val       = 1;
281  
        std::uint64_t val       = 1;
256  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
282  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
257  
    }
283  
    }
258  
}
284  
}
259  

285  

260  
inline void
286  
inline void
261  
epoll_scheduler::update_timerfd() const
287  
epoll_scheduler::update_timerfd() const
262  
{
288  
{
263  
    auto nearest = timer_svc_->nearest_expiry();
289  
    auto nearest = timer_svc_->nearest_expiry();
264  

290  

265  
    itimerspec ts{};
291  
    itimerspec ts{};
266  
    int flags = 0;
292  
    int flags = 0;
267  

293  

268  
    if (nearest == timer_service::time_point::max())
294  
    if (nearest == timer_service::time_point::max())
269  
    {
295  
    {
270  
        // No timers — disarm by setting to 0 (relative)
296  
        // No timers — disarm by setting to 0 (relative)
271  
    }
297  
    }
272  
    else
298  
    else
273  
    {
299  
    {
274  
        auto now = std::chrono::steady_clock::now();
300  
        auto now = std::chrono::steady_clock::now();
275  
        if (nearest <= now)
301  
        if (nearest <= now)
276  
        {
302  
        {
277  
            // Use 1ns instead of 0 — zero disarms the timerfd
303  
            // Use 1ns instead of 0 — zero disarms the timerfd
278  
            ts.it_value.tv_nsec = 1;
304  
            ts.it_value.tv_nsec = 1;
279  
        }
305  
        }
280  
        else
306  
        else
281  
        {
307  
        {
282  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
308  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
283  
                            nearest - now)
309  
                            nearest - now)
284  
                            .count();
310  
                            .count();
285  
            ts.it_value.tv_sec  = nsec / 1000000000;
311  
            ts.it_value.tv_sec  = nsec / 1000000000;
286  
            ts.it_value.tv_nsec = nsec % 1000000000;
312  
            ts.it_value.tv_nsec = nsec % 1000000000;
287  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
313  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
288  
                ts.it_value.tv_nsec = 1;
314  
                ts.it_value.tv_nsec = 1;
289  
        }
315  
        }
290  
    }
316  
    }
291  

317  

292  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
318  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
293  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
319  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
294  
}
320  
}
295  

321  

296  
inline void
322  
inline void
297 -
epoll_scheduler::run_task(std::unique_lock<std::mutex>& lock, context_type* ctx)
323 +
epoll_scheduler::run_task(lock_type& lock, context_type* ctx)
298  
{
324  
{
299  
    int timeout_ms = task_interrupted_ ? 0 : -1;
325  
    int timeout_ms = task_interrupted_ ? 0 : -1;
300  

326  

301  
    if (lock.owns_lock())
327  
    if (lock.owns_lock())
302  
        lock.unlock();
328  
        lock.unlock();
303  

329  

304  
    task_cleanup on_exit{this, &lock, ctx};
330  
    task_cleanup on_exit{this, &lock, ctx};
305  

331  

306  
    // Flush deferred timerfd programming before blocking
332  
    // Flush deferred timerfd programming before blocking
307  
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
333  
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
308  
        update_timerfd();
334  
        update_timerfd();
309  

335  

310 -
    epoll_event events[128];
336 +
    int nfds = ::epoll_wait(
311 -
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
337 +
        epoll_fd_, event_buffer_.data(),
 
338 +
        static_cast<int>(event_buffer_.size()), timeout_ms);
312  

339  

313  
    if (nfds < 0 && errno != EINTR)
340  
    if (nfds < 0 && errno != EINTR)
314  
        detail::throw_system_error(make_err(errno), "epoll_wait");
341  
        detail::throw_system_error(make_err(errno), "epoll_wait");
315  

342  

316  
    bool check_timers = false;
343  
    bool check_timers = false;
317  
    op_queue local_ops;
344  
    op_queue local_ops;
318  

345  

319  
    for (int i = 0; i < nfds; ++i)
346  
    for (int i = 0; i < nfds; ++i)
320  
    {
347  
    {
321 -
        if (events[i].data.ptr == nullptr)
348 +
        if (event_buffer_[i].data.ptr == nullptr)
322  
        {
349  
        {
323  
            std::uint64_t val;
350  
            std::uint64_t val;
324  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
351  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
325  
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
352  
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
326  
            eventfd_armed_.store(false, std::memory_order_relaxed);
353  
            eventfd_armed_.store(false, std::memory_order_relaxed);
327  
            continue;
354  
            continue;
328  
        }
355  
        }
329  

356  

330 -
        if (events[i].data.ptr == &timer_fd_)
357 +
        if (event_buffer_[i].data.ptr == &timer_fd_)
331  
        {
358  
        {
332  
            std::uint64_t expirations;
359  
            std::uint64_t expirations;
333  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
360  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
334  
            [[maybe_unused]] auto r =
361  
            [[maybe_unused]] auto r =
335  
                ::read(timer_fd_, &expirations, sizeof(expirations));
362  
                ::read(timer_fd_, &expirations, sizeof(expirations));
336  
            check_timers = true;
363  
            check_timers = true;
337  
            continue;
364  
            continue;
338  
        }
365  
        }
339  

366  

340 -
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
367 +
        auto* desc =
341 -
        desc->add_ready_events(events[i].events);
368 +
            static_cast<descriptor_state*>(event_buffer_[i].data.ptr);
 
369 +
        desc->add_ready_events(event_buffer_[i].events);
342  

370  

343  
        bool expected = false;
371  
        bool expected = false;
344  
        if (desc->is_enqueued_.compare_exchange_strong(
372  
        if (desc->is_enqueued_.compare_exchange_strong(
345  
                expected, true, std::memory_order_release,
373  
                expected, true, std::memory_order_release,
346  
                std::memory_order_relaxed))
374  
                std::memory_order_relaxed))
347  
        {
375  
        {
348  
            local_ops.push(desc);
376  
            local_ops.push(desc);
349  
        }
377  
        }
350  
    }
378  
    }
351  

379  

352  
    if (check_timers)
380  
    if (check_timers)
353  
    {
381  
    {
354  
        timer_svc_->process_expired();
382  
        timer_svc_->process_expired();
355  
        update_timerfd();
383  
        update_timerfd();
356  
    }
384  
    }
357  

385  

358  
    lock.lock();
386  
    lock.lock();
359  

387  

360  
    if (!local_ops.empty())
388  
    if (!local_ops.empty())
361  
        completed_ops_.splice(local_ops);
389  
        completed_ops_.splice(local_ops);
362  
}
390  
}
363  

391  

364  
} // namespace boost::corosio::detail
392  
} // namespace boost::corosio::detail
365  

393  

366  
#endif // BOOST_COROSIO_HAS_EPOLL
394  
#endif // BOOST_COROSIO_HAS_EPOLL
367  

395  

368  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
396  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP