1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  

19  

20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21  

21  

22  
#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28  
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
28  
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29  

29  

30  
#include <boost/corosio/detail/except.hpp>
30  
#include <boost/corosio/detail/except.hpp>
31  

31  

32  
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <mutex>
#include <vector>
36  

37  

37  
#include <errno.h>
38  
#include <errno.h>
38  
#include <sys/epoll.h>
39  
#include <sys/epoll.h>
39  
#include <sys/eventfd.h>
40  
#include <sys/eventfd.h>
40  
#include <sys/timerfd.h>
41  
#include <sys/timerfd.h>
41  
#include <unistd.h>
42  
#include <unistd.h>
42  

43  

43  
namespace boost::corosio::detail {
44  
namespace boost::corosio::detail {
44  

45  

45  
struct epoll_op;
46  
struct epoll_op;
46  
struct descriptor_state;
47  
struct descriptor_state;
47  

48  

48  
/** Linux scheduler using epoll for I/O multiplexing.
49  
/** Linux scheduler using epoll for I/O multiplexing.
49  

50  

50  
    This scheduler implements the scheduler interface using Linux epoll
51  
    This scheduler implements the scheduler interface using Linux epoll
51  
    for efficient I/O event notification. It uses a single reactor model
52  
    for efficient I/O event notification. It uses a single reactor model
52  
    where one thread runs epoll_wait while other threads
53  
    where one thread runs epoll_wait while other threads
53  
    wait on a condition variable for handler work. This design provides:
54  
    wait on a condition variable for handler work. This design provides:
54  

55  

55  
    - Handler parallelism: N posted handlers can execute on N threads
56  
    - Handler parallelism: N posted handlers can execute on N threads
56  
    - No thundering herd: condition_variable wakes exactly one thread
57  
    - No thundering herd: condition_variable wakes exactly one thread
57  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
58  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
58  

59  

59  
    When threads call run(), they first try to execute queued handlers.
60  
    When threads call run(), they first try to execute queued handlers.
60  
    If the queue is empty and no reactor is running, one thread becomes
61  
    If the queue is empty and no reactor is running, one thread becomes
61  
    the reactor and runs epoll_wait. Other threads wait on a condition
62  
    the reactor and runs epoll_wait. Other threads wait on a condition
62  
    variable until handlers are available.
63  
    variable until handlers are available.
63  

64  

64  
    @par Thread Safety
65  
    @par Thread Safety
65  
    All public member functions are thread-safe.
66  
    All public member functions are thread-safe.
66  
*/
67  
*/
67  
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
68  
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
68  
{
69  
{
69  
public:
70  
public:
70  
    /** Construct the scheduler.
71  
    /** Construct the scheduler.
71  

72  

72  
        Creates an epoll instance, eventfd for reactor interruption,
73  
        Creates an epoll instance, eventfd for reactor interruption,
73  
        and timerfd for kernel-managed timer expiry.
74  
        and timerfd for kernel-managed timer expiry.
74  

75  

75  
        @param ctx Reference to the owning execution_context.
76  
        @param ctx Reference to the owning execution_context.
76  
        @param concurrency_hint Hint for expected thread count (unused).
77  
        @param concurrency_hint Hint for expected thread count (unused).
77  
    */
78  
    */
78  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
79  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
79  

80  

80  
    /// Destroy the scheduler.
81  
    /// Destroy the scheduler.
81  
    ~epoll_scheduler() override;
82  
    ~epoll_scheduler() override;
82  

83  

83  
    epoll_scheduler(epoll_scheduler const&)            = delete;
84  
    epoll_scheduler(epoll_scheduler const&)            = delete;
84  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
85  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
85  

86  

86  
    /// Shut down the scheduler, draining pending operations.
87  
    /// Shut down the scheduler, draining pending operations.
87  
    void shutdown() override;
88  
    void shutdown() override;
88  

89  

 
90 +
    /// Apply runtime configuration, resizing the event buffer.
 
91 +
    void configure_reactor(
 
92 +
        unsigned max_events,
 
93 +
        unsigned budget_init,
 
94 +
        unsigned budget_max,
 
95 +
        unsigned unassisted) override;
 
96 +

89  
    /** Return the epoll file descriptor.
97  
    /** Return the epoll file descriptor.
90  

98  

91  
        Used by socket services to register file descriptors
99  
        Used by socket services to register file descriptors
92  
        for I/O event notification.
100  
        for I/O event notification.
93  

101  

94  
        @return The epoll file descriptor.
102  
        @return The epoll file descriptor.
95  
    */
103  
    */
96  
    int epoll_fd() const noexcept
104  
    int epoll_fd() const noexcept
97  
    {
105  
    {
98  
        return epoll_fd_;
106  
        return epoll_fd_;
99  
    }
107  
    }
100  

108  

101  
    /** Register a descriptor for persistent monitoring.
109  
    /** Register a descriptor for persistent monitoring.
102  

110  

103  
        The fd is registered once and stays registered until explicitly
111  
        The fd is registered once and stays registered until explicitly
104  
        deregistered. Events are dispatched via descriptor_state which
112  
        deregistered. Events are dispatched via descriptor_state which
105  
        tracks pending read/write/connect operations.
113  
        tracks pending read/write/connect operations.
106  

114  

107  
        @param fd The file descriptor to register.
115  
        @param fd The file descriptor to register.
108  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
116  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
109  
    */
117  
    */
110  
    void register_descriptor(int fd, descriptor_state* desc) const;
118  
    void register_descriptor(int fd, descriptor_state* desc) const;
111  

119  

112  
    /** Deregister a persistently registered descriptor.
120  
    /** Deregister a persistently registered descriptor.
113  

121  

114  
        @param fd The file descriptor to deregister.
122  
        @param fd The file descriptor to deregister.
115  
    */
123  
    */
116  
    void deregister_descriptor(int fd) const;
124  
    void deregister_descriptor(int fd) const;
117  

125  

118  
private:
126  
private:
119  
    void
127  
    void
120 -
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx,
128 +
    run_task(lock_type& lock, context_type* ctx,
121  
        long timeout_us) override;
129  
        long timeout_us) override;
122  
    void interrupt_reactor() const override;
130  
    void interrupt_reactor() const override;
123  
    void update_timerfd() const;
131  
    void update_timerfd() const;
124  

132  

125  
    int epoll_fd_;
133  
    int epoll_fd_;
126  
    int event_fd_;
134  
    int event_fd_;
127  
    int timer_fd_;
135  
    int timer_fd_;
128  

136  

129  
    // Edge-triggered eventfd state
137  
    // Edge-triggered eventfd state
130  
    mutable std::atomic<bool> eventfd_armed_{false};
138  
    mutable std::atomic<bool> eventfd_armed_{false};
131  

139  

132  
    // Set when the earliest timer changes; flushed before epoll_wait
140  
    // Set when the earliest timer changes; flushed before epoll_wait
133  
    mutable std::atomic<bool> timerfd_stale_{false};
141  
    mutable std::atomic<bool> timerfd_stale_{false};
 
142 +

 
143 +
    // Event buffer sized from max_events_per_poll_ (set at construction,
 
144 +
    // resized by configure_reactor via io_context_options).
 
145 +
    std::vector<epoll_event> event_buffer_;
134  
};
146  
};
135  

147  

136  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
148  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
137  
    : epoll_fd_(-1)
149  
    : epoll_fd_(-1)
138  
    , event_fd_(-1)
150  
    , event_fd_(-1)
139  
    , timer_fd_(-1)
151  
    , timer_fd_(-1)
 
152 +
    , event_buffer_(max_events_per_poll_)
140  
{
153  
{
141  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
154  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
142  
    if (epoll_fd_ < 0)
155  
    if (epoll_fd_ < 0)
143  
        detail::throw_system_error(make_err(errno), "epoll_create1");
156  
        detail::throw_system_error(make_err(errno), "epoll_create1");
144  

157  

145  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
158  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
146  
    if (event_fd_ < 0)
159  
    if (event_fd_ < 0)
147  
    {
160  
    {
148  
        int errn = errno;
161  
        int errn = errno;
149  
        ::close(epoll_fd_);
162  
        ::close(epoll_fd_);
150  
        detail::throw_system_error(make_err(errn), "eventfd");
163  
        detail::throw_system_error(make_err(errn), "eventfd");
151  
    }
164  
    }
152  

165  

153  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
166  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
154  
    if (timer_fd_ < 0)
167  
    if (timer_fd_ < 0)
155  
    {
168  
    {
156  
        int errn = errno;
169  
        int errn = errno;
157  
        ::close(event_fd_);
170  
        ::close(event_fd_);
158  
        ::close(epoll_fd_);
171  
        ::close(epoll_fd_);
159  
        detail::throw_system_error(make_err(errn), "timerfd_create");
172  
        detail::throw_system_error(make_err(errn), "timerfd_create");
160  
    }
173  
    }
161  

174  

162  
    epoll_event ev{};
175  
    epoll_event ev{};
163  
    ev.events   = EPOLLIN | EPOLLET;
176  
    ev.events   = EPOLLIN | EPOLLET;
164  
    ev.data.ptr = nullptr;
177  
    ev.data.ptr = nullptr;
165  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
178  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
166  
    {
179  
    {
167  
        int errn = errno;
180  
        int errn = errno;
168  
        ::close(timer_fd_);
181  
        ::close(timer_fd_);
169  
        ::close(event_fd_);
182  
        ::close(event_fd_);
170  
        ::close(epoll_fd_);
183  
        ::close(epoll_fd_);
171  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
184  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
172  
    }
185  
    }
173  

186  

174  
    epoll_event timer_ev{};
187  
    epoll_event timer_ev{};
175  
    timer_ev.events   = EPOLLIN | EPOLLERR;
188  
    timer_ev.events   = EPOLLIN | EPOLLERR;
176  
    timer_ev.data.ptr = &timer_fd_;
189  
    timer_ev.data.ptr = &timer_fd_;
177  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
190  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
178  
    {
191  
    {
179  
        int errn = errno;
192  
        int errn = errno;
180  
        ::close(timer_fd_);
193  
        ::close(timer_fd_);
181  
        ::close(event_fd_);
194  
        ::close(event_fd_);
182  
        ::close(epoll_fd_);
195  
        ::close(epoll_fd_);
183  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
196  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
184  
    }
197  
    }
185  

198  

186  
    timer_svc_ = &get_timer_service(ctx, *this);
199  
    timer_svc_ = &get_timer_service(ctx, *this);
187  
    timer_svc_->set_on_earliest_changed(
200  
    timer_svc_->set_on_earliest_changed(
188  
        timer_service::callback(this, [](void* p) {
201  
        timer_service::callback(this, [](void* p) {
189  
            auto* self = static_cast<epoll_scheduler*>(p);
202  
            auto* self = static_cast<epoll_scheduler*>(p);
190  
            self->timerfd_stale_.store(true, std::memory_order_release);
203  
            self->timerfd_stale_.store(true, std::memory_order_release);
191  
            self->interrupt_reactor();
204  
            self->interrupt_reactor();
192  
        }));
205  
        }));
193  

206  

194  
    get_resolver_service(ctx, *this);
207  
    get_resolver_service(ctx, *this);
195  
    get_signal_service(ctx, *this);
208  
    get_signal_service(ctx, *this);
196  
    get_stream_file_service(ctx, *this);
209  
    get_stream_file_service(ctx, *this);
197  
    get_random_access_file_service(ctx, *this);
210  
    get_random_access_file_service(ctx, *this);
198  

211  

199  
    completed_ops_.push(&task_op_);
212  
    completed_ops_.push(&task_op_);
200  
}
213  
}
201  

214  

202  
inline epoll_scheduler::~epoll_scheduler()
{
    // Close the kernel objects in reverse order of creation, skipping any
    // descriptor still holding the -1 sentinel.
    for (int const fd : {timer_fd_, event_fd_, epoll_fd_})
    {
        if (fd >= 0)
            ::close(fd);
    }
}
211  

224  

212  
inline void
225  
inline void
213  
epoll_scheduler::shutdown()
226  
epoll_scheduler::shutdown()
214  
{
227  
{
215  
    shutdown_drain();
228  
    shutdown_drain();
216  

229  

217  
    if (event_fd_ >= 0)
230  
    if (event_fd_ >= 0)
218  
        interrupt_reactor();
231  
        interrupt_reactor();
219  
}
232  
}
220  

233  

221  
inline void
234  
inline void
 
235 +
epoll_scheduler::configure_reactor(
 
236 +
    unsigned max_events,
 
237 +
    unsigned budget_init,
 
238 +
    unsigned budget_max,
 
239 +
    unsigned unassisted)
 
240 +
{
 
241 +
    reactor_scheduler_base::configure_reactor(
 
242 +
        max_events, budget_init, budget_max, unassisted);
 
243 +
    event_buffer_.resize(max_events_per_poll_);
 
244 +
}
 
245 +

 
246 +
inline void
222  
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
247  
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
223  
{
248  
{
224  
    epoll_event ev{};
249  
    epoll_event ev{};
225  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
250  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
226  
    ev.data.ptr = desc;
251  
    ev.data.ptr = desc;
227  

252  

228  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
253  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
229  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
254  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
230  

255  

231  
    desc->registered_events = ev.events;
256  
    desc->registered_events = ev.events;
232  
    desc->fd                = fd;
257  
    desc->fd                = fd;
233  
    desc->scheduler_        = this;
258  
    desc->scheduler_        = this;
 
259 +
    desc->mutex.set_enabled(!single_threaded_);
234  
    desc->ready_events_.store(0, std::memory_order_relaxed);
260  
    desc->ready_events_.store(0, std::memory_order_relaxed);
235  

261  

236 -
    std::lock_guard lock(desc->mutex);
262 +
    conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
237  
    desc->impl_ref_.reset();
263  
    desc->impl_ref_.reset();
238  
    desc->read_ready  = false;
264  
    desc->read_ready  = false;
239  
    desc->write_ready = false;
265  
    desc->write_ready = false;
240  
}
266  
}
241  

267  

242  
inline void
268  
inline void
243  
epoll_scheduler::deregister_descriptor(int fd) const
269  
epoll_scheduler::deregister_descriptor(int fd) const
244  
{
270  
{
245  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
271  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
246  
}
272  
}
247  

273  

248  
inline void
274  
inline void
249  
epoll_scheduler::interrupt_reactor() const
275  
epoll_scheduler::interrupt_reactor() const
250  
{
276  
{
251  
    bool expected = false;
277  
    bool expected = false;
252  
    if (eventfd_armed_.compare_exchange_strong(
278  
    if (eventfd_armed_.compare_exchange_strong(
253  
            expected, true, std::memory_order_release,
279  
            expected, true, std::memory_order_release,
254  
            std::memory_order_relaxed))
280  
            std::memory_order_relaxed))
255  
    {
281  
    {
256  
        std::uint64_t val       = 1;
282  
        std::uint64_t val       = 1;
257  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
283  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
258  
    }
284  
    }
259  
}
285  
}
260  

286  

261  
inline void
287  
inline void
262  
epoll_scheduler::update_timerfd() const
288  
epoll_scheduler::update_timerfd() const
263  
{
289  
{
264  
    auto nearest = timer_svc_->nearest_expiry();
290  
    auto nearest = timer_svc_->nearest_expiry();
265  

291  

266  
    itimerspec ts{};
292  
    itimerspec ts{};
267  
    int flags = 0;
293  
    int flags = 0;
268  

294  

269  
    if (nearest == timer_service::time_point::max())
295  
    if (nearest == timer_service::time_point::max())
270  
    {
296  
    {
271  
        // No timers — disarm by setting to 0 (relative)
297  
        // No timers — disarm by setting to 0 (relative)
272  
    }
298  
    }
273  
    else
299  
    else
274  
    {
300  
    {
275  
        auto now = std::chrono::steady_clock::now();
301  
        auto now = std::chrono::steady_clock::now();
276  
        if (nearest <= now)
302  
        if (nearest <= now)
277  
        {
303  
        {
278  
            // Use 1ns instead of 0 — zero disarms the timerfd
304  
            // Use 1ns instead of 0 — zero disarms the timerfd
279  
            ts.it_value.tv_nsec = 1;
305  
            ts.it_value.tv_nsec = 1;
280  
        }
306  
        }
281  
        else
307  
        else
282  
        {
308  
        {
283  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
309  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
284  
                            nearest - now)
310  
                            nearest - now)
285  
                            .count();
311  
                            .count();
286  
            ts.it_value.tv_sec  = nsec / 1000000000;
312  
            ts.it_value.tv_sec  = nsec / 1000000000;
287  
            ts.it_value.tv_nsec = nsec % 1000000000;
313  
            ts.it_value.tv_nsec = nsec % 1000000000;
288  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
314  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
289  
                ts.it_value.tv_nsec = 1;
315  
                ts.it_value.tv_nsec = 1;
290  
        }
316  
        }
291  
    }
317  
    }
292  

318  

293  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
319  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
294  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
320  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
295  
}
321  
}
296  

322  

297  
inline void
323  
inline void
298  
epoll_scheduler::run_task(
324  
epoll_scheduler::run_task(
299 -
    std::unique_lock<std::mutex>& lock, context_type* ctx, long timeout_us)
325 +
    lock_type& lock, context_type* ctx, long timeout_us)
300  
{
326  
{
301  
    int timeout_ms;
327  
    int timeout_ms;
302  
    if (task_interrupted_)
328  
    if (task_interrupted_)
303  
        timeout_ms = 0;
329  
        timeout_ms = 0;
304  
    else if (timeout_us < 0)
330  
    else if (timeout_us < 0)
305  
        timeout_ms = -1;
331  
        timeout_ms = -1;
306  
    else
332  
    else
307  
        timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
333  
        timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
308  

334  

309  
    if (lock.owns_lock())
335  
    if (lock.owns_lock())
310  
        lock.unlock();
336  
        lock.unlock();
311  

337  

312  
    task_cleanup on_exit{this, &lock, ctx};
338  
    task_cleanup on_exit{this, &lock, ctx};
313  

339  

314  
    // Flush deferred timerfd programming before blocking
340  
    // Flush deferred timerfd programming before blocking
315  
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
341  
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
316  
        update_timerfd();
342  
        update_timerfd();
317  

343  

318 -
    epoll_event events[128];
344 +
    int nfds = ::epoll_wait(
319 -
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
345 +
        epoll_fd_, event_buffer_.data(),
 
346 +
        static_cast<int>(event_buffer_.size()), timeout_ms);
320  

347  

321  
    if (nfds < 0 && errno != EINTR)
348  
    if (nfds < 0 && errno != EINTR)
322  
        detail::throw_system_error(make_err(errno), "epoll_wait");
349  
        detail::throw_system_error(make_err(errno), "epoll_wait");
323  

350  

324  
    bool check_timers = false;
351  
    bool check_timers = false;
325  
    op_queue local_ops;
352  
    op_queue local_ops;
326  

353  

327  
    for (int i = 0; i < nfds; ++i)
354  
    for (int i = 0; i < nfds; ++i)
328  
    {
355  
    {
329 -
        if (events[i].data.ptr == nullptr)
356 +
        if (event_buffer_[i].data.ptr == nullptr)
330  
        {
357  
        {
331  
            std::uint64_t val;
358  
            std::uint64_t val;
332  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
359  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
333  
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
360  
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
334  
            eventfd_armed_.store(false, std::memory_order_relaxed);
361  
            eventfd_armed_.store(false, std::memory_order_relaxed);
335  
            continue;
362  
            continue;
336  
        }
363  
        }
337  

364  

338 -
        if (events[i].data.ptr == &timer_fd_)
365 +
        if (event_buffer_[i].data.ptr == &timer_fd_)
339  
        {
366  
        {
340  
            std::uint64_t expirations;
367  
            std::uint64_t expirations;
341  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
368  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
342  
            [[maybe_unused]] auto r =
369  
            [[maybe_unused]] auto r =
343  
                ::read(timer_fd_, &expirations, sizeof(expirations));
370  
                ::read(timer_fd_, &expirations, sizeof(expirations));
344  
            check_timers = true;
371  
            check_timers = true;
345  
            continue;
372  
            continue;
346  
        }
373  
        }
347  

374  

348 -
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
375 +
        auto* desc =
349 -
        desc->add_ready_events(events[i].events);
376 +
            static_cast<descriptor_state*>(event_buffer_[i].data.ptr);
 
377 +
        desc->add_ready_events(event_buffer_[i].events);
350  

378  

351  
        bool expected = false;
379  
        bool expected = false;
352  
        if (desc->is_enqueued_.compare_exchange_strong(
380  
        if (desc->is_enqueued_.compare_exchange_strong(
353  
                expected, true, std::memory_order_release,
381  
                expected, true, std::memory_order_release,
354  
                std::memory_order_relaxed))
382  
                std::memory_order_relaxed))
355  
        {
383  
        {
356  
            local_ops.push(desc);
384  
            local_ops.push(desc);
357  
        }
385  
        }
358  
    }
386  
    }
359  

387  

360  
    if (check_timers)
388  
    if (check_timers)
361  
    {
389  
    {
362  
        timer_svc_->process_expired();
390  
        timer_svc_->process_expired();
363  
        update_timerfd();
391  
        update_timerfd();
364  
    }
392  
    }
365  

393  

366  
    lock.lock();
394  
    lock.lock();
367  

395  

368  
    if (!local_ops.empty())
396  
    if (!local_ops.empty())
369  
        completed_ops_.splice(local_ops);
397  
        completed_ops_.splice(local_ops);
370  
}
398  
}
371  

399  

372  
} // namespace boost::corosio::detail
400  
} // namespace boost::corosio::detail
373  

401  

374  
#endif // BOOST_COROSIO_HAS_EPOLL
402  
#endif // BOOST_COROSIO_HAS_EPOLL
375  

403  

376  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
404  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP