include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp

82.7% Lines (124/150) 90.0% List of functions (9/10)
epoll_scheduler.hpp
f(x) Functions (10)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21
22 #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23 #include <boost/corosio/detail/timer_service.hpp>
24 #include <boost/corosio/native/detail/make_err.hpp>
25 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29
30 #include <boost/corosio/detail/except.hpp>
31
32 #include <atomic>
33 #include <chrono>
34 #include <cstdint>
35 #include <mutex>
36 #include <vector>
37
38 #include <errno.h>
39 #include <sys/epoll.h>
40 #include <sys/eventfd.h>
41 #include <sys/timerfd.h>
42 #include <unistd.h>
43
44 namespace boost::corosio::detail {
45
46 struct epoll_op;
47 struct descriptor_state;
48
49 /** Linux scheduler using epoll for I/O multiplexing.
50
51 This scheduler implements the scheduler interface using Linux epoll
52 for efficient I/O event notification. It uses a single reactor model
53 where one thread runs epoll_wait while other threads
54 wait on a condition variable for handler work. This design provides:
55
56 - Handler parallelism: N posted handlers can execute on N threads
57 - No thundering herd: condition_variable wakes exactly one thread
58 - IOCP parity: Behavior matches Windows I/O completion port semantics
59
60 When threads call run(), they first try to execute queued handlers.
61 If the queue is empty and no reactor is running, one thread becomes
62 the reactor and runs epoll_wait. Other threads wait on a condition
63 variable until handlers are available.
64
65 @par Thread Safety
66 All public member functions are thread-safe.
67 */
68 class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
69 {
70 public:
71 /** Construct the scheduler.
72
73 Creates an epoll instance, eventfd for reactor interruption,
74 and timerfd for kernel-managed timer expiry.
75
76 @param ctx Reference to the owning execution_context.
77 @param concurrency_hint Hint for expected thread count (unused).
78 */
79 epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
80
81 /// Destroy the scheduler.
82 ~epoll_scheduler() override;
83
84 epoll_scheduler(epoll_scheduler const&) = delete;
85 epoll_scheduler& operator=(epoll_scheduler const&) = delete;
86
87 /// Shut down the scheduler, draining pending operations.
88 void shutdown() override;
89
90 /// Apply runtime configuration, resizing the event buffer.
91 void configure_reactor(
92 unsigned max_events,
93 unsigned budget_init,
94 unsigned budget_max,
95 unsigned unassisted) override;
96
97 /** Return the epoll file descriptor.
98
99 Used by socket services to register file descriptors
100 for I/O event notification.
101
102 @return The epoll file descriptor.
103 */
104 int epoll_fd() const noexcept
105 {
106 return epoll_fd_;
107 }
108
109 /** Register a descriptor for persistent monitoring.
110
111 The fd is registered once and stays registered until explicitly
112 deregistered. Events are dispatched via descriptor_state which
113 tracks pending read/write/connect operations.
114
115 @param fd The file descriptor to register.
116 @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
117 */
118 void register_descriptor(int fd, descriptor_state* desc) const;
119
120 /** Deregister a persistently registered descriptor.
121
122 @param fd The file descriptor to deregister.
123 */
124 void deregister_descriptor(int fd) const;
125
126 private:
127 void
128 run_task(lock_type& lock, context_type* ctx,
129 long timeout_us) override;
130 void interrupt_reactor() const override;
131 void update_timerfd() const;
132
133 int epoll_fd_;
134 int event_fd_;
135 int timer_fd_;
136
137 // Edge-triggered eventfd state
138 mutable std::atomic<bool> eventfd_armed_{false};
139
140 // Set when the earliest timer changes; flushed before epoll_wait
141 mutable std::atomic<bool> timerfd_stale_{false};
142
143 // Event buffer sized from max_events_per_poll_ (set at construction,
144 // resized by configure_reactor via io_context_options).
145 std::vector<epoll_event> event_buffer_;
146 };
147
148 322x inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
149 322x : epoll_fd_(-1)
150 322x , event_fd_(-1)
151 322x , timer_fd_(-1)
152 644x , event_buffer_(max_events_per_poll_)
153 {
154 322x epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
155 322x if (epoll_fd_ < 0)
156 detail::throw_system_error(make_err(errno), "epoll_create1");
157
158 322x event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
159 322x if (event_fd_ < 0)
160 {
161 int errn = errno;
162 ::close(epoll_fd_);
163 detail::throw_system_error(make_err(errn), "eventfd");
164 }
165
166 322x timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
167 322x if (timer_fd_ < 0)
168 {
169 int errn = errno;
170 ::close(event_fd_);
171 ::close(epoll_fd_);
172 detail::throw_system_error(make_err(errn), "timerfd_create");
173 }
174
175 322x epoll_event ev{};
176 322x ev.events = EPOLLIN | EPOLLET;
177 322x ev.data.ptr = nullptr;
178 322x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
179 {
180 int errn = errno;
181 ::close(timer_fd_);
182 ::close(event_fd_);
183 ::close(epoll_fd_);
184 detail::throw_system_error(make_err(errn), "epoll_ctl");
185 }
186
187 322x epoll_event timer_ev{};
188 322x timer_ev.events = EPOLLIN | EPOLLERR;
189 322x timer_ev.data.ptr = &timer_fd_;
190 322x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
191 {
192 int errn = errno;
193 ::close(timer_fd_);
194 ::close(event_fd_);
195 ::close(epoll_fd_);
196 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
197 }
198
199 322x timer_svc_ = &get_timer_service(ctx, *this);
200 322x timer_svc_->set_on_earliest_changed(
201 5268x timer_service::callback(this, [](void* p) {
202 4946x auto* self = static_cast<epoll_scheduler*>(p);
203 4946x self->timerfd_stale_.store(true, std::memory_order_release);
204 4946x self->interrupt_reactor();
205 4946x }));
206
207 322x get_resolver_service(ctx, *this);
208 322x get_signal_service(ctx, *this);
209 322x get_stream_file_service(ctx, *this);
210 322x get_random_access_file_service(ctx, *this);
211
212 322x completed_ops_.push(&task_op_);
213 322x }
214
215 644x inline epoll_scheduler::~epoll_scheduler()
216 {
217 322x if (timer_fd_ >= 0)
218 322x ::close(timer_fd_);
219 322x if (event_fd_ >= 0)
220 322x ::close(event_fd_);
221 322x if (epoll_fd_ >= 0)
222 322x ::close(epoll_fd_);
223 644x }
224
225 inline void
226 322x epoll_scheduler::shutdown()
227 {
228 322x shutdown_drain();
229
230 322x if (event_fd_ >= 0)
231 322x interrupt_reactor();
232 322x }
233
234 inline void
235 epoll_scheduler::configure_reactor(
236 unsigned max_events,
237 unsigned budget_init,
238 unsigned budget_max,
239 unsigned unassisted)
240 {
241 reactor_scheduler_base::configure_reactor(
242 max_events, budget_init, budget_max, unassisted);
243 event_buffer_.resize(max_events_per_poll_);
244 }
245
246 inline void
247 9560x epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
248 {
249 9560x epoll_event ev{};
250 9560x ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
251 9560x ev.data.ptr = desc;
252
253 9560x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
254 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
255
256 9560x desc->registered_events = ev.events;
257 9560x desc->fd = fd;
258 9560x desc->scheduler_ = this;
259 9560x desc->mutex.set_enabled(!single_threaded_);
260 9560x desc->ready_events_.store(0, std::memory_order_relaxed);
261
262 9560x conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
263 9560x desc->impl_ref_.reset();
264 9560x desc->read_ready = false;
265 9560x desc->write_ready = false;
266 9560x }
267
268 inline void
269 9560x epoll_scheduler::deregister_descriptor(int fd) const
270 {
271 9560x ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
272 9560x }
273
274 inline void
275 5555x epoll_scheduler::interrupt_reactor() const
276 {
277 5555x bool expected = false;
278 5555x if (eventfd_armed_.compare_exchange_strong(
279 expected, true, std::memory_order_release,
280 std::memory_order_relaxed))
281 {
282 5343x std::uint64_t val = 1;
283 5343x [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
284 }
285 5555x }
286
287 inline void
288 9850x epoll_scheduler::update_timerfd() const
289 {
290 9850x auto nearest = timer_svc_->nearest_expiry();
291
292 9850x itimerspec ts{};
293 9850x int flags = 0;
294
295 9850x if (nearest == timer_service::time_point::max())
296 {
297 // No timers — disarm by setting to 0 (relative)
298 }
299 else
300 {
301 9796x auto now = std::chrono::steady_clock::now();
302 9796x if (nearest <= now)
303 {
304 // Use 1ns instead of 0 — zero disarms the timerfd
305 236x ts.it_value.tv_nsec = 1;
306 }
307 else
308 {
309 9560x auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
310 9560x nearest - now)
311 9560x .count();
312 9560x ts.it_value.tv_sec = nsec / 1000000000;
313 9560x ts.it_value.tv_nsec = nsec % 1000000000;
314 9560x if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
315 ts.it_value.tv_nsec = 1;
316 }
317 }
318
319 9850x if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
320 detail::throw_system_error(make_err(errno), "timerfd_settime");
321 9850x }
322
323 inline void
324 39888x epoll_scheduler::run_task(
325 lock_type& lock, context_type* ctx, long timeout_us)
326 {
327 int timeout_ms;
328 39888x if (task_interrupted_)
329 25760x timeout_ms = 0;
330 14128x else if (timeout_us < 0)
331 14120x timeout_ms = -1;
332 else
333 8x timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
334
335 39888x if (lock.owns_lock())
336 14128x lock.unlock();
337
338 39888x task_cleanup on_exit{this, &lock, ctx};
339
340 // Flush deferred timerfd programming before blocking
341 39888x if (timerfd_stale_.exchange(false, std::memory_order_acquire))
342 4924x update_timerfd();
343
344 39888x int nfds = ::epoll_wait(
345 epoll_fd_, event_buffer_.data(),
346 39888x static_cast<int>(event_buffer_.size()), timeout_ms);
347
348 39888x if (nfds < 0 && errno != EINTR)
349 detail::throw_system_error(make_err(errno), "epoll_wait");
350
351 39888x bool check_timers = false;
352 39888x op_queue local_ops;
353
354 89927x for (int i = 0; i < nfds; ++i)
355 {
356 50039x if (event_buffer_[i].data.ptr == nullptr)
357 {
358 std::uint64_t val;
359 // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
360 5021x [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
361 5021x eventfd_armed_.store(false, std::memory_order_relaxed);
362 5021x continue;
363 5021x }
364
365 45018x if (event_buffer_[i].data.ptr == &timer_fd_)
366 {
367 std::uint64_t expirations;
368 // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
369 [[maybe_unused]] auto r =
370 4926x ::read(timer_fd_, &expirations, sizeof(expirations));
371 4926x check_timers = true;
372 4926x continue;
373 4926x }
374
375 auto* desc =
376 40092x static_cast<descriptor_state*>(event_buffer_[i].data.ptr);
377 40092x desc->add_ready_events(event_buffer_[i].events);
378
379 40092x bool expected = false;
380 40092x if (desc->is_enqueued_.compare_exchange_strong(
381 expected, true, std::memory_order_release,
382 std::memory_order_relaxed))
383 {
384 40092x local_ops.push(desc);
385 }
386 }
387
388 39888x if (check_timers)
389 {
390 4926x timer_svc_->process_expired();
391 4926x update_timerfd();
392 }
393
394 39888x lock.lock();
395
396 39888x if (!local_ops.empty())
397 25223x completed_ops_.splice(local_ops);
398 39888x }
399
400 } // namespace boost::corosio::detail
401
402 #endif // BOOST_COROSIO_HAS_EPOLL
403
404 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
405