TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
12 :
13 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
15 :
16 : #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
17 :
18 : #include <atomic>
19 : #include <cstdint>
20 : #include <memory>
21 :
22 : #include <errno.h>
23 : #include <sys/socket.h>
24 :
25 : namespace boost::corosio::detail {
26 :
27 : /// Shared reactor event constants.
28 : /// These match epoll numeric values; kqueue maps its events to the same.
29 : static constexpr std::uint32_t reactor_event_read = 0x001;
30 : static constexpr std::uint32_t reactor_event_write = 0x004;
31 : static constexpr std::uint32_t reactor_event_error = 0x008;
32 :
33 : /** Per-descriptor state shared across reactor backends.
34 :
35 : Tracks pending operations for a file descriptor. The fd is registered
36 : once with the reactor and stays registered until closed. Uses deferred
37 : I/O: the reactor sets ready_events atomically, then enqueues this state.
38 : When popped by the scheduler, invoke_deferred_io() performs I/O under
39 : the mutex and queues completed ops.
40 :
41 : Non-template: uses reactor_op_base pointers so the scheduler and
42 : descriptor_state code exist as a single copy in the binary regardless
43 : of how many backends are compiled in.
44 :
45 : @par Thread Safety
46 : The mutex protects operation pointers and ready flags. ready_events_
47 : and is_enqueued_ are atomic for lock-free reactor access.
48 : */
49 : struct reactor_descriptor_state : scheduler_op
50 : {
51 : /// Protects operation pointers and ready/cancel flags.
52 : /// Becomes a no-op in single-threaded mode.
53 : conditionally_enabled_mutex mutex{true};
54 :
55 : /// Pending read operation (guarded by `mutex`).
56 : reactor_op_base* read_op = nullptr;
57 :
58 : /// Pending write operation (guarded by `mutex`).
59 : reactor_op_base* write_op = nullptr;
60 :
61 : /// Pending connect operation (guarded by `mutex`).
62 : reactor_op_base* connect_op = nullptr;
63 :
64 : /// True if a read edge event arrived before an op was registered.
65 : bool read_ready = false;
66 :
67 : /// True if a write edge event arrived before an op was registered.
68 : bool write_ready = false;
69 :
70 : /// Deferred read cancellation (IOCP-style cancel semantics).
71 : bool read_cancel_pending = false;
72 :
73 : /// Deferred write cancellation (IOCP-style cancel semantics).
74 : bool write_cancel_pending = false;
75 :
76 : /// Deferred connect cancellation (IOCP-style cancel semantics).
77 : bool connect_cancel_pending = false;
78 :
79 : /// Event mask set during registration (no mutex needed).
80 : std::uint32_t registered_events = 0;
81 :
82 : /// File descriptor this state tracks.
83 : int fd = -1;
84 :
85 : /// Accumulated ready events (set by reactor, read by scheduler).
86 : std::atomic<std::uint32_t> ready_events_{0};
87 :
88 : /// True while this state is queued in the scheduler's completed_ops.
89 : std::atomic<bool> is_enqueued_{false};
90 :
91 : /// Owning scheduler for posting completions.
92 : reactor_scheduler_base const* scheduler_ = nullptr;
93 :
94 : /// Prevents impl destruction while queued in the scheduler.
95 : std::shared_ptr<void> impl_ref_;
96 :
97 : /// Add ready events atomically.
98 : /// Release pairs with the consumer's acquire exchange on
99 : /// ready_events_ so the consumer sees all flags. On x86 (TSO)
100 : /// this compiles to the same LOCK OR as relaxed.
101 HIT 169839 : void add_ready_events(std::uint32_t ev) noexcept
102 : {
103 169839 : ready_events_.fetch_or(ev, std::memory_order_release);
104 169839 : }
105 :
106 : /// Invoke deferred I/O and dispatch completions.
107 169770 : void operator()() override
108 : {
109 169770 : invoke_deferred_io();
110 169770 : }
111 :
112 : /// Destroy without invoking.
113 : /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
114 : /// the self-referential cycle set by close_socket().
115 69 : void destroy() override
116 : {
117 69 : impl_ref_.reset();
118 69 : }
119 :
120 : /** Perform deferred I/O and queue completions.
121 :
122 : Performs I/O under the mutex and queues completed ops. EAGAIN
123 : ops stay parked in their slot for re-delivery on the next
124 : edge event.
125 : */
126 : void invoke_deferred_io();
127 : };
128 :
129 : inline void
130 169770 : reactor_descriptor_state::invoke_deferred_io()
131 : {
132 169770 : std::shared_ptr<void> prevent_impl_destruction;
133 169770 : op_queue local_ops;
134 :
135 : {
136 169770 : conditionally_enabled_mutex::scoped_lock lock(mutex);
137 :
138 : // Must clear is_enqueued_ and move impl_ref_ under the same
139 : // lock that processes I/O. close_socket() checks is_enqueued_
140 : // under this mutex — without atomicity between the flag store
141 : // and the ref move, close_socket() could see is_enqueued_==false,
142 : // skip setting impl_ref_, and destroy the impl under us.
143 169770 : prevent_impl_destruction = std::move(impl_ref_);
144 169770 : is_enqueued_.store(false, std::memory_order_release);
145 :
146 169770 : std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
147 169770 : if (ev == 0)
148 : {
149 : // Mutex unlocks here; compensate for work_cleanup's decrement
150 MIS 0 : scheduler_->compensating_work_started();
151 0 : return;
152 : }
153 :
154 HIT 169770 : int err = 0;
155 169770 : if (ev & reactor_event_error)
156 : {
157 1 : socklen_t len = sizeof(err);
158 1 : if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
159 MIS 0 : err = errno;
160 HIT 1 : if (err == 0)
161 MIS 0 : err = EIO;
162 : }
163 :
164 HIT 169770 : if (ev & reactor_event_read)
165 : {
166 141269 : if (read_op)
167 : {
168 8492 : auto* rd = read_op;
169 8492 : if (err)
170 MIS 0 : rd->complete(err, 0);
171 : else
172 HIT 8492 : rd->perform_io();
173 :
174 8492 : if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
175 : {
176 44 : rd->errn = 0;
177 : }
178 : else
179 : {
180 8448 : read_op = nullptr;
181 8448 : local_ops.push(rd);
182 : }
183 : }
184 : else
185 : {
186 132777 : read_ready = true;
187 : }
188 : }
189 169770 : if (ev & reactor_event_write)
190 : {
191 36355 : bool had_write_op = (connect_op || write_op);
192 36355 : if (connect_op)
193 : {
194 8358 : auto* cn = connect_op;
195 8358 : if (err)
196 1 : cn->complete(err, 0);
197 : else
198 8357 : cn->perform_io();
199 8358 : connect_op = nullptr;
200 8358 : local_ops.push(cn);
201 : }
202 36355 : if (write_op)
203 : {
204 MIS 0 : auto* wr = write_op;
205 0 : if (err)
206 0 : wr->complete(err, 0);
207 : else
208 0 : wr->perform_io();
209 :
210 0 : if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
211 : {
212 0 : wr->errn = 0;
213 : }
214 : else
215 : {
216 0 : write_op = nullptr;
217 0 : local_ops.push(wr);
218 : }
219 : }
220 HIT 36355 : if (!had_write_op)
221 27997 : write_ready = true;
222 : }
223 169770 : if (err)
224 : {
225 1 : if (read_op)
226 : {
227 MIS 0 : read_op->complete(err, 0);
228 0 : local_ops.push(std::exchange(read_op, nullptr));
229 : }
230 HIT 1 : if (write_op)
231 : {
232 MIS 0 : write_op->complete(err, 0);
233 0 : local_ops.push(std::exchange(write_op, nullptr));
234 : }
235 HIT 1 : if (connect_op)
236 : {
237 MIS 0 : connect_op->complete(err, 0);
238 0 : local_ops.push(std::exchange(connect_op, nullptr));
239 : }
240 : }
241 HIT 169770 : }
242 :
243 : // Execute first handler inline — the scheduler's work_cleanup
244 : // accounts for this as the "consumed" work item
245 169770 : scheduler_op* first = local_ops.pop();
246 169770 : if (first)
247 : {
248 16806 : scheduler_->post_deferred_completions(local_ops);
249 16806 : (*first)();
250 : }
251 : else
252 : {
253 152964 : scheduler_->compensating_work_started();
254 : }
255 169770 : }
256 :
257 : } // namespace boost::corosio::detail
258 :
259 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
|