TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 :
20 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21 :
22 : #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23 : #include <boost/corosio/detail/timer_service.hpp>
24 : #include <boost/corosio/native/detail/make_err.hpp>
25 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 : #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 : #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29 :
30 : #include <boost/corosio/detail/except.hpp>
31 :
32 : #include <atomic>
33 : #include <chrono>
34 : #include <cstdint>
35 : #include <mutex>
36 : #include <vector>
37 :
38 : #include <errno.h>
39 : #include <sys/epoll.h>
40 : #include <sys/eventfd.h>
41 : #include <sys/timerfd.h>
42 : #include <unistd.h>
43 :
44 : namespace boost::corosio::detail {
45 :
46 : struct epoll_op;
47 : struct descriptor_state;
48 :
49 : /** Linux scheduler using epoll for I/O multiplexing.
50 :
51 : This scheduler implements the scheduler interface using Linux epoll
52 : for efficient I/O event notification. It uses a single reactor model
53 : where one thread runs epoll_wait while other threads
54 : wait on a condition variable for handler work. This design provides:
55 :
56 : - Handler parallelism: N posted handlers can execute on N threads
57 : - No thundering herd: condition_variable wakes exactly one thread
58 : - IOCP parity: Behavior matches Windows I/O completion port semantics
59 :
60 : When threads call run(), they first try to execute queued handlers.
61 : If the queue is empty and no reactor is running, one thread becomes
62 : the reactor and runs epoll_wait. Other threads wait on a condition
63 : variable until handlers are available.
64 :
65 : @par Thread Safety
66 : All public member functions are thread-safe.
67 : */
68 : class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
69 : {
70 : public:
71 : /** Construct the scheduler.
72 :
73 : Creates an epoll instance, eventfd for reactor interruption,
74 : and timerfd for kernel-managed timer expiry.
75 :
76 : @param ctx Reference to the owning execution_context.
77 : @param concurrency_hint Hint for expected thread count (unused).
78 : */
79 : epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
80 :
81 : /// Destroy the scheduler.
82 : ~epoll_scheduler() override;
83 :
84 : epoll_scheduler(epoll_scheduler const&) = delete;
85 : epoll_scheduler& operator=(epoll_scheduler const&) = delete;
86 :
87 : /// Shut down the scheduler, draining pending operations.
88 : void shutdown() override;
89 :
90 : /// Apply runtime configuration, resizing the event buffer.
91 : void configure_reactor(
92 : unsigned max_events,
93 : unsigned budget_init,
94 : unsigned budget_max,
95 : unsigned unassisted) noexcept override;
96 :
97 : /** Return the epoll file descriptor.
98 :
99 : Used by socket services to register file descriptors
100 : for I/O event notification.
101 :
102 : @return The epoll file descriptor.
103 : */
104 : int epoll_fd() const noexcept
105 : {
106 : return epoll_fd_;
107 : }
108 :
109 : /** Register a descriptor for persistent monitoring.
110 :
111 : The fd is registered once and stays registered until explicitly
112 : deregistered. Events are dispatched via descriptor_state which
113 : tracks pending read/write/connect operations.
114 :
115 : @param fd The file descriptor to register.
116 : @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
117 : */
118 : void register_descriptor(int fd, descriptor_state* desc) const;
119 :
120 : /** Deregister a persistently registered descriptor.
121 :
122 : @param fd The file descriptor to deregister.
123 : */
124 : void deregister_descriptor(int fd) const;
125 :
126 : private:
127 : void
128 : run_task(lock_type& lock, context_type* ctx) override;
129 : void interrupt_reactor() const override;
130 : void update_timerfd() const;
131 :
132 : int epoll_fd_;
133 : int event_fd_;
134 : int timer_fd_;
135 :
136 : // Edge-triggered eventfd state
137 : mutable std::atomic<bool> eventfd_armed_{false};
138 :
139 : // Set when the earliest timer changes; flushed before epoll_wait
140 : mutable std::atomic<bool> timerfd_stale_{false};
141 :
142 : // Event buffer sized from max_events_per_poll_ (set at construction,
143 : // resized by configure_reactor via io_context_options).
144 : std::vector<epoll_event> event_buffer_;
145 : };
146 :
147 HIT 320 : inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
148 320 : : epoll_fd_(-1)
149 320 : , event_fd_(-1)
150 320 : , timer_fd_(-1)
151 640 : , event_buffer_(max_events_per_poll_)
152 : {
153 320 : epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
154 320 : if (epoll_fd_ < 0)
155 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_create1");
156 :
157 HIT 320 : event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
158 320 : if (event_fd_ < 0)
159 : {
160 MIS 0 : int errn = errno;
161 0 : ::close(epoll_fd_);
162 0 : detail::throw_system_error(make_err(errn), "eventfd");
163 : }
164 :
165 HIT 320 : timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
166 320 : if (timer_fd_ < 0)
167 : {
168 MIS 0 : int errn = errno;
169 0 : ::close(event_fd_);
170 0 : ::close(epoll_fd_);
171 0 : detail::throw_system_error(make_err(errn), "timerfd_create");
172 : }
173 :
174 HIT 320 : epoll_event ev{};
175 320 : ev.events = EPOLLIN | EPOLLET;
176 320 : ev.data.ptr = nullptr;
177 320 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
178 : {
179 MIS 0 : int errn = errno;
180 0 : ::close(timer_fd_);
181 0 : ::close(event_fd_);
182 0 : ::close(epoll_fd_);
183 0 : detail::throw_system_error(make_err(errn), "epoll_ctl");
184 : }
185 :
186 HIT 320 : epoll_event timer_ev{};
187 320 : timer_ev.events = EPOLLIN | EPOLLERR;
188 320 : timer_ev.data.ptr = &timer_fd_;
189 320 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
190 : {
191 MIS 0 : int errn = errno;
192 0 : ::close(timer_fd_);
193 0 : ::close(event_fd_);
194 0 : ::close(epoll_fd_);
195 0 : detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
196 : }
197 :
198 HIT 320 : timer_svc_ = &get_timer_service(ctx, *this);
199 320 : timer_svc_->set_on_earliest_changed(
200 5371 : timer_service::callback(this, [](void* p) {
201 5051 : auto* self = static_cast<epoll_scheduler*>(p);
202 5051 : self->timerfd_stale_.store(true, std::memory_order_release);
203 5051 : self->interrupt_reactor();
204 5051 : }));
205 :
206 320 : get_resolver_service(ctx, *this);
207 320 : get_signal_service(ctx, *this);
208 320 : get_stream_file_service(ctx, *this);
209 320 : get_random_access_file_service(ctx, *this);
210 :
211 320 : completed_ops_.push(&task_op_);
212 320 : }
213 :
214 640 : inline epoll_scheduler::~epoll_scheduler()
215 : {
216 320 : if (timer_fd_ >= 0)
217 320 : ::close(timer_fd_);
218 320 : if (event_fd_ >= 0)
219 320 : ::close(event_fd_);
220 320 : if (epoll_fd_ >= 0)
221 320 : ::close(epoll_fd_);
222 640 : }
223 :
224 : inline void
225 320 : epoll_scheduler::shutdown()
226 : {
227 320 : shutdown_drain();
228 :
229 320 : if (event_fd_ >= 0)
230 320 : interrupt_reactor();
231 320 : }
232 :
233 : inline void
234 MIS 0 : epoll_scheduler::configure_reactor(
235 : unsigned max_events,
236 : unsigned budget_init,
237 : unsigned budget_max,
238 : unsigned unassisted) noexcept
239 : {
240 0 : reactor_scheduler_base::configure_reactor(
241 : max_events, budget_init, budget_max, unassisted);
242 0 : event_buffer_.resize(max_events_per_poll_);
243 0 : }
244 :
245 : inline void
246 HIT 9765 : epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
247 : {
248 9765 : epoll_event ev{};
249 9765 : ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
250 9765 : ev.data.ptr = desc;
251 :
252 9765 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
253 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
254 :
255 HIT 9765 : desc->registered_events = ev.events;
256 9765 : desc->fd = fd;
257 9765 : desc->scheduler_ = this;
258 9765 : desc->mutex.set_enabled(!single_threaded_);
259 9765 : desc->ready_events_.store(0, std::memory_order_relaxed);
260 :
261 9765 : conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
262 9765 : desc->impl_ref_.reset();
263 9765 : desc->read_ready = false;
264 9765 : desc->write_ready = false;
265 9765 : }
266 :
267 : inline void
268 9765 : epoll_scheduler::deregister_descriptor(int fd) const
269 : {
270 9765 : ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
271 9765 : }
272 :
273 : inline void
274 5655 : epoll_scheduler::interrupt_reactor() const
275 : {
276 5655 : bool expected = false;
277 5655 : if (eventfd_armed_.compare_exchange_strong(
278 : expected, true, std::memory_order_release,
279 : std::memory_order_relaxed))
280 : {
281 5446 : std::uint64_t val = 1;
282 5446 : [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
283 : }
284 5655 : }
285 :
286 : inline void
287 10060 : epoll_scheduler::update_timerfd() const
288 : {
289 10060 : auto nearest = timer_svc_->nearest_expiry();
290 :
291 10060 : itimerspec ts{};
292 10060 : int flags = 0;
293 :
294 10060 : if (nearest == timer_service::time_point::max())
295 : {
296 : // No timers — disarm by setting to 0 (relative)
297 : }
298 : else
299 : {
300 10006 : auto now = std::chrono::steady_clock::now();
301 10006 : if (nearest <= now)
302 : {
303 : // Use 1ns instead of 0 — zero disarms the timerfd
304 151 : ts.it_value.tv_nsec = 1;
305 : }
306 : else
307 : {
308 9855 : auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
309 9855 : nearest - now)
310 9855 : .count();
311 9855 : ts.it_value.tv_sec = nsec / 1000000000;
312 9855 : ts.it_value.tv_nsec = nsec % 1000000000;
313 9855 : if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
314 MIS 0 : ts.it_value.tv_nsec = 1;
315 : }
316 : }
317 :
318 HIT 10060 : if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
319 MIS 0 : detail::throw_system_error(make_err(errno), "timerfd_settime");
320 HIT 10060 : }
321 :
322 : inline void
323 38631 : epoll_scheduler::run_task(lock_type& lock, context_type* ctx)
324 : {
325 38631 : int timeout_ms = task_interrupted_ ? 0 : -1;
326 :
327 38631 : if (lock.owns_lock())
328 14509 : lock.unlock();
329 :
330 38631 : task_cleanup on_exit{this, &lock, ctx};
331 :
332 : // Flush deferred timerfd programming before blocking
333 38631 : if (timerfd_stale_.exchange(false, std::memory_order_acquire))
334 5029 : update_timerfd();
335 :
336 38631 : int nfds = ::epoll_wait(
337 : epoll_fd_, event_buffer_.data(),
338 38631 : static_cast<int>(event_buffer_.size()), timeout_ms);
339 :
340 38631 : if (nfds < 0 && errno != EINTR)
341 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_wait");
342 :
343 HIT 38631 : bool check_timers = false;
344 38631 : op_queue local_ops;
345 :
346 86465 : for (int i = 0; i < nfds; ++i)
347 : {
348 47834 : if (event_buffer_[i].data.ptr == nullptr)
349 : {
350 : std::uint64_t val;
351 : // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
352 5126 : [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
353 5126 : eventfd_armed_.store(false, std::memory_order_relaxed);
354 5126 : continue;
355 5126 : }
356 :
357 42708 : if (event_buffer_[i].data.ptr == &timer_fd_)
358 : {
359 : std::uint64_t expirations;
360 : // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
361 : [[maybe_unused]] auto r =
362 5031 : ::read(timer_fd_, &expirations, sizeof(expirations));
363 5031 : check_timers = true;
364 5031 : continue;
365 5031 : }
366 :
367 : auto* desc =
368 37677 : static_cast<descriptor_state*>(event_buffer_[i].data.ptr);
369 37677 : desc->add_ready_events(event_buffer_[i].events);
370 :
371 37677 : bool expected = false;
372 37677 : if (desc->is_enqueued_.compare_exchange_strong(
373 : expected, true, std::memory_order_release,
374 : std::memory_order_relaxed))
375 : {
376 37677 : local_ops.push(desc);
377 : }
378 : }
379 :
380 38631 : if (check_timers)
381 : {
382 5031 : timer_svc_->process_expired();
383 5031 : update_timerfd();
384 : }
385 :
386 38631 : lock.lock();
387 :
388 38631 : if (!local_ops.empty())
389 23581 : completed_ops_.splice(local_ops);
390 38631 : }
391 :
392 : } // namespace boost::corosio::detail
393 :
394 : #endif // BOOST_COROSIO_HAS_EPOLL
395 :
396 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
|