include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp

82.2% Lines (120/146) 90.0% Functions (9/10)
epoll_scheduler.hpp
f(x) Functions (10)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21
22 #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23 #include <boost/corosio/detail/timer_service.hpp>
24 #include <boost/corosio/native/detail/make_err.hpp>
25 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29
30 #include <boost/corosio/detail/except.hpp>
31
32 #include <atomic>
33 #include <chrono>
34 #include <cstdint>
35 #include <mutex>
36 #include <vector>
37
38 #include <errno.h>
39 #include <sys/epoll.h>
40 #include <sys/eventfd.h>
41 #include <sys/timerfd.h>
42 #include <unistd.h>
43
44 namespace boost::corosio::detail {
45
46 struct epoll_op;
47 struct descriptor_state;
48
/** Linux scheduler using epoll for I/O multiplexing.

    This scheduler implements the scheduler interface using Linux epoll
    for efficient I/O event notification. It uses a single reactor model
    where one thread runs epoll_wait while other threads
    wait on a condition variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: condition_variable wakes exactly one thread
    - IOCP parity: Behavior matches Windows I/O completion port semantics

    When threads call run(), they first try to execute queued handlers.
    If the queue is empty and no reactor is running, one thread becomes
    the reactor and runs epoll_wait. Other threads wait on a condition
    variable until handlers are available.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
{
public:
    /** Construct the scheduler.

        Creates an epoll instance, eventfd for reactor interruption,
        and timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~epoll_scheduler() override;

    epoll_scheduler(epoll_scheduler const&) = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    /// Shut down the scheduler, draining pending operations.
    void shutdown() override;

    /// Apply runtime configuration, resizing the event buffer.
    void configure_reactor(
        unsigned max_events,
        unsigned budget_init,
        unsigned budget_max,
        unsigned unassisted) noexcept override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
    */
    int epoll_fd() const noexcept
    {
        return epoll_fd_;
    }

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

private:
    // Run one reactor iteration: block in epoll_wait, then dispatch
    // ready descriptors and timers. Called with `lock` possibly held;
    // see run_task definition for the locking protocol.
    void
    run_task(lock_type& lock, context_type* ctx) override;
    // Wake a thread blocked in epoll_wait by writing to event_fd_.
    void interrupt_reactor() const override;
    // Reprogram timer_fd_ from the timer service's nearest expiry.
    void update_timerfd() const;

    int epoll_fd_;   // epoll instance fd; -1 until constructed
    int event_fd_;   // eventfd used to interrupt the reactor; -1 until constructed
    int timer_fd_;   // timerfd for kernel-managed timer expiry; -1 until constructed

    // Edge-triggered eventfd state: true while a wakeup write is pending
    // and not yet drained by the reactor. Prevents redundant writes.
    mutable std::atomic<bool> eventfd_armed_{false};

    // Set when the earliest timer changes; flushed before epoll_wait
    mutable std::atomic<bool> timerfd_stale_{false};

    // Event buffer sized from max_events_per_poll_ (set at construction,
    // resized by configure_reactor via io_context_options).
    std::vector<epoll_event> event_buffer_;
};
146
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , event_buffer_(max_events_per_poll_)
{
    // Acquire the three kernel objects in order: epoll, eventfd, timerfd.
    // On any failure, close everything acquired so far (in reverse order)
    // before throwing, since the destructor will not run.
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        int errn = errno;  // save before close() can clobber errno
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    // Register the eventfd edge-triggered. data.ptr == nullptr is the
    // sentinel run_task uses to recognize a wakeup event.
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    // Register the timerfd level-triggered. data.ptr == &timer_fd_ is the
    // sentinel run_task uses to recognize timer expiry.
    epoll_event timer_ev{};
    timer_ev.events = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    // When the earliest timer changes, mark the timerfd stale and wake the
    // reactor; the actual timerfd_settime is deferred to run_task so it
    // happens on the reactor thread just before blocking.
    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(this, [](void* p) {
            auto* self = static_cast<epoll_scheduler*>(p);
            self->timerfd_stale_.store(true, std::memory_order_release);
            self->interrupt_reactor();
        }));

    // Eagerly construct dependent services so they are available before
    // any thread enters run().
    get_resolver_service(ctx, *this);
    get_signal_service(ctx, *this);
    get_stream_file_service(ctx, *this);
    get_random_access_file_service(ctx, *this);

    // Seed the queue with the task op so one thread becomes the reactor.
    completed_ops_.push(&task_op_);
}
213
214 640x inline epoll_scheduler::~epoll_scheduler()
215 {
216 320x if (timer_fd_ >= 0)
217 320x ::close(timer_fd_);
218 320x if (event_fd_ >= 0)
219 320x ::close(event_fd_);
220 320x if (epoll_fd_ >= 0)
221 320x ::close(epoll_fd_);
222 640x }
223
224 inline void
225 320x epoll_scheduler::shutdown()
226 {
227 320x shutdown_drain();
228
229 320x if (event_fd_ >= 0)
230 320x interrupt_reactor();
231 320x }
232
233 inline void
234 epoll_scheduler::configure_reactor(
235 unsigned max_events,
236 unsigned budget_init,
237 unsigned budget_max,
238 unsigned unassisted) noexcept
239 {
240 reactor_scheduler_base::configure_reactor(
241 max_events, budget_init, budget_max, unassisted);
242 event_buffer_.resize(max_events_per_poll_);
243 }
244
245 inline void
246 9765x epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
247 {
248 9765x epoll_event ev{};
249 9765x ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
250 9765x ev.data.ptr = desc;
251
252 9765x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
253 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
254
255 9765x desc->registered_events = ev.events;
256 9765x desc->fd = fd;
257 9765x desc->scheduler_ = this;
258 9765x desc->mutex.set_enabled(!single_threaded_);
259 9765x desc->ready_events_.store(0, std::memory_order_relaxed);
260
261 9765x conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
262 9765x desc->impl_ref_.reset();
263 9765x desc->read_ready = false;
264 9765x desc->write_ready = false;
265 9765x }
266
inline void
epoll_scheduler::deregister_descriptor(int fd) const
{
    // Best-effort removal: the return value is deliberately ignored.
    // The fd may already have been closed by the owning service, in which
    // case the kernel has dropped it from the interest list and EPOLL_CTL_DEL
    // fails harmlessly with EBADF/ENOENT.
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}
272
273 inline void
274 5655x epoll_scheduler::interrupt_reactor() const
275 {
276 5655x bool expected = false;
277 5655x if (eventfd_armed_.compare_exchange_strong(
278 expected, true, std::memory_order_release,
279 std::memory_order_relaxed))
280 {
281 5446x std::uint64_t val = 1;
282 5446x [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
283 }
284 5655x }
285
286 inline void
287 10060x epoll_scheduler::update_timerfd() const
288 {
289 10060x auto nearest = timer_svc_->nearest_expiry();
290
291 10060x itimerspec ts{};
292 10060x int flags = 0;
293
294 10060x if (nearest == timer_service::time_point::max())
295 {
296 // No timers — disarm by setting to 0 (relative)
297 }
298 else
299 {
300 10006x auto now = std::chrono::steady_clock::now();
301 10006x if (nearest <= now)
302 {
303 // Use 1ns instead of 0 — zero disarms the timerfd
304 151x ts.it_value.tv_nsec = 1;
305 }
306 else
307 {
308 9855x auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
309 9855x nearest - now)
310 9855x .count();
311 9855x ts.it_value.tv_sec = nsec / 1000000000;
312 9855x ts.it_value.tv_nsec = nsec % 1000000000;
313 9855x if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
314 ts.it_value.tv_nsec = 1;
315 }
316 }
317
318 10060x if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
319 detail::throw_system_error(make_err(errno), "timerfd_settime");
320 10060x }
321
inline void
epoll_scheduler::run_task(lock_type& lock, context_type* ctx)
{
    // If another thread asked for an interrupt, poll without blocking;
    // otherwise block indefinitely (timers are handled via timer_fd_).
    int timeout_ms = task_interrupted_ ? 0 : -1;

    // Drop the scheduler lock before blocking in epoll_wait so other
    // threads can run handlers concurrently.
    if (lock.owns_lock())
        lock.unlock();

    // RAII guard: restores scheduler bookkeeping (and re-queues the task
    // op) on every exit path, including exceptions.
    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    int nfds = ::epoll_wait(
        epoll_fd_, event_buffer_.data(),
        static_cast<int>(event_buffer_.size()), timeout_ms);

    // EINTR just means a signal interrupted the wait; treat as zero events.
    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;  // batched ready descriptors, spliced in under lock

    for (int i = 0; i < nfds; ++i)
    {
        // data.ptr == nullptr: the eventfd wakeup sentinel. Drain it and
        // clear the armed flag so future interrupts write again.
        if (event_buffer_[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        // data.ptr == &timer_fd_: the timerfd sentinel. Drain the
        // expiration count; process timers once after the loop.
        if (event_buffer_[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r =
                ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Otherwise data.ptr is a descriptor_state registered by
        // register_descriptor. Accumulate its ready events.
        auto* desc =
            static_cast<descriptor_state*>(event_buffer_[i].data.ptr);
        desc->add_ready_events(event_buffer_[i].events);

        // Enqueue the descriptor at most once: only the thread that flips
        // is_enqueued_ false->true pushes it onto the local queue.
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_release,
                std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    if (check_timers)
    {
        timer_svc_->process_expired();
        // Re-arm the timerfd for the next nearest expiry.
        update_timerfd();
    }

    // Reacquire the scheduler lock before touching the shared queue.
    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
391
392 } // namespace boost::corosio::detail
393
394 #endif // BOOST_COROSIO_HAS_EPOLL
395
396 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
397