include/boost/corosio/native/detail/epoll/epoll_tcp_service.hpp

79.3% Lines (46/58) 85.7% List of functions (12/14)
epoll_tcp_service.hpp
f(x) Functions (14)
Function Calls Lines Blocks
boost::corosio::detail::epoll_tcp_service::epoll_tcp_service(boost::capy::execution_context&) :99 322x 100.0% 100.0% boost::corosio::detail::epoll_connect_op::cancel() :112 0 0.0% 0.0% boost::corosio::detail::epoll_read_op::cancel() :121 98x 80.0% 75.0% boost::corosio::detail::epoll_write_op::cancel() :130 0 0.0% 0.0% boost::corosio::detail::epoll_op::operator()() :139 44105x 100.0% 100.0% boost::corosio::detail::epoll_connect_op::operator()() :145 4718x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::epoll_tcp_socket(boost::corosio::detail::epoll_tcp_service&) :150 14209x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::~epoll_tcp_socket() :155 14209x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::stop_token, std::error_code*) :158 4718x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :169 110004x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :181 109852x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::cancel() :193 95x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::close_socket() :199 42599x 100.0% 100.0% boost::corosio::detail::epoll_tcp_service::open_socket(boost::corosio::tcp_socket::implementation&, int, int, int) :205 4733x 94.4% 94.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/detail/tcp_service.hpp>
19
20 #include <boost/corosio/native/detail/epoll/epoll_tcp_socket.hpp>
21 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22 #include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23
24 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25
26 #include <coroutine>
27
28 #include <errno.h>
29 #include <netinet/in.h>
30 #include <netinet/tcp.h>
31 #include <sys/epoll.h>
32 #include <sys/socket.h>
33 #include <unistd.h>
34
35 /*
36 epoll Socket Implementation
37 ===========================
38
39 Each I/O operation follows the same pattern:
40 1. Try the syscall immediately (non-blocking socket)
41 2. If it succeeds or fails with a real error, post to completion queue
42 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
43
44 This "try first" approach avoids unnecessary epoll round-trips for
45 operations that can complete immediately (common for small reads/writes
46 on fast local connections).
47
48 One-Shot Registration
49 ---------------------
50 We use one-shot epoll registration: each operation registers, waits for
51 one event, then unregisters. This simplifies the state machine since we
52 don't need to track whether an fd is currently registered or handle
53 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
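54 simplicity is worth it.

NOTE(review): open_socket() in this file registers the fd with the
scheduler once, when the socket is opened, and its comment describes
that registration as edge-triggered. Confirm whether the per-operation
register/unregister description above still matches the implementation.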
55
56 Cancellation
57 ------------
58 See op.hpp for the completion/cancellation race handling via the
59 `registered` atomic. cancel() must complete pending operations (post
60 them with cancelled flag) so coroutines waiting on them can resume.
61 close_socket() calls cancel() first to ensure this.
62
63 Impl Lifetime with shared_ptr
64 -----------------------------
65 Socket impls use enable_shared_from_this. The service owns impls via
66 shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
67 removal. When a user calls close(), we call cancel() which posts pending
68 ops to the scheduler.
69
70 CRITICAL: The posted ops must keep the impl alive until they complete.
71 Otherwise the scheduler would process a freed op (use-after-free). The
72 cancel() method captures shared_from_this() into op.impl_ptr before
73 posting. When the op completes, impl_ptr is cleared, allowing the impl
74 to be destroyed if no other references exist.
75
76 Service Ownership
77 -----------------
78 epoll_tcp_service owns all socket impls. destroy_impl() removes the
79 shared_ptr from the map, but the impl may survive if ops still hold
80 impl_ptr refs. shutdown() closes all sockets and clears the map; any
81 in-flight ops will complete and release their refs.
82 */
83
84 namespace boost::corosio::detail {
85
86 /** epoll TCP service implementation.
87
88 Inherits from tcp_service to enable runtime polymorphism.
89 Uses key_type = tcp_service for service lookup.
90 */
91 class BOOST_COROSIO_DECL epoll_tcp_service final
92 : public reactor_socket_service<
93 epoll_tcp_service,
94 tcp_service,
95 epoll_scheduler,
96 epoll_tcp_socket>
97 {
98 public:
// Construct the service for an execution context. The context is passed
// straight through to the reactor_socket_service base; this class adds
// no state of its own in the constructor.
99 322x explicit epoll_tcp_service(capy::execution_context& ctx)
100 322x : reactor_socket_service(ctx)
101 {
102 322x }
103
// Open a new socket for `impl` using the given family/type/protocol and
// register its descriptor with the scheduler. Any socket `impl` already
// holds is closed first. Returns a default-constructed error_code on
// success, or an error derived from errno if ::socket() fails.
// `impl` is downcast with static_cast, so it must be an epoll_tcp_socket.
104 std::error_code open_socket(
105 tcp_socket::implementation& impl,
106 int family,
107 int type,
108 int protocol) override;
109 };
110
// Cancel this connect operation.
// When the op is attached to a socket (socket_impl_ is non-null) the
// request is routed through that socket's cancel_single_op(); otherwise
// the op only marks itself via request_cancel().
111 inline void
112 epoll_connect_op::cancel() noexcept
113 {
114 if (socket_impl_)
115 socket_impl_->cancel_single_op(*this);
116 else
117 request_cancel();
118 }
119
// Cancel this read operation.
// When the op is attached to a socket (socket_impl_ is non-null) the
// request is routed through that socket's cancel_single_op(); otherwise
// the op only marks itself via request_cancel().
119 inline void
120 epoll_read_op::cancel() noexcept
121 98x {
122 98x if (socket_impl_)
123 98x socket_impl_->cancel_single_op(*this);
124 else
125 request_cancel();
126 98x }
128
// Cancel this write operation.
// When the op is attached to a socket (socket_impl_ is non-null) the
// request is routed through that socket's cancel_single_op(); otherwise
// the op only marks itself via request_cancel().
129 inline void
130 epoll_write_op::cancel() noexcept
131 {
132 if (socket_impl_)
133 socket_impl_->cancel_single_op(*this);
134 else
135 request_cancel();
136 }
137
// Invoke the op: hands *this to complete_io_op(), which is declared
// outside this view (presumably in reactor_op_complete.hpp — confirm).
138 inline void
139 44105x epoll_op::operator()()
140 {
141 44105x complete_io_op(*this);
142 44105x }
143
// Invoke the connect op: hands *this to complete_connect_op() instead of
// the generic complete_io_op() used by epoll_op::operator()().
144 inline void
145 4718x epoll_connect_op::operator()()
146 {
147 4718x complete_connect_op(*this);
148 4718x }
149
// Construct a socket impl bound to its owning service. The service
// reference is forwarded to the reactor_stream_socket base; nothing else
// is initialized here.
150 14209x inline epoll_tcp_socket::epoll_tcp_socket(epoll_tcp_service& svc) noexcept
151 14209x : reactor_stream_socket(svc)
152 {
153 14209x }
154
// Defaulted destructor, defined out of line here rather than in the class.
155 14209x inline epoll_tcp_socket::~epoll_tcp_socket() = default;
156
// Start a connect to `ep`.
// Thin forwarder: every argument is passed unchanged to do_connect()
// (defined outside this view; presumably inherited from the
// reactor_stream_socket base — confirm), and its returned coroutine
// handle is returned as-is.
157 inline std::coroutine_handle<>
158 4718x epoll_tcp_socket::connect(
159 std::coroutine_handle<> h,
160 capy::executor_ref ex,
161 endpoint ep,
162 std::stop_token token,
163 std::error_code* ec)
164 {
165 4718x return do_connect(h, ex, ep, token, ec);
166 }
167
// Start a read into the buffers described by `param`.
// Thin forwarder: every argument is passed unchanged to do_read_some()
// (defined outside this view; presumably inherited from the
// reactor_stream_socket base — confirm), and its returned coroutine
// handle is returned as-is.
168 inline std::coroutine_handle<>
169 110004x epoll_tcp_socket::read_some(
170 std::coroutine_handle<> h,
171 capy::executor_ref ex,
172 buffer_param param,
173 std::stop_token token,
174 std::error_code* ec,
175 std::size_t* bytes_out)
176 {
177 110004x return do_read_some(h, ex, param, token, ec, bytes_out);
178 }
179
// Start a write from the buffers described by `param`.
// Thin forwarder: every argument is passed unchanged to do_write_some()
// (defined outside this view; presumably inherited from the
// reactor_stream_socket base — confirm), and its returned coroutine
// handle is returned as-is.
180 inline std::coroutine_handle<>
181 109852x epoll_tcp_socket::write_some(
182 std::coroutine_handle<> h,
183 capy::executor_ref ex,
184 buffer_param param,
185 std::stop_token token,
186 std::error_code* ec,
187 std::size_t* bytes_out)
188 {
189 109852x return do_write_some(h, ex, param, token, ec, bytes_out);
190 }
191
// Cancel operations on this socket by delegating to do_cancel(), which is
// defined outside this view.
192 inline void
193 95x epoll_tcp_socket::cancel() noexcept
194 {
195 95x do_cancel();
196 95x }
197
// Close this socket by delegating to do_close_socket(), which is defined
// outside this view. Also called by epoll_tcp_service::open_socket()
// before it creates a replacement descriptor.
198 inline void
199 42599x epoll_tcp_socket::close_socket() noexcept
200 {
201 42599x do_close_socket();
202 42599x }
203
// Open a new non-blocking, close-on-exec socket for `impl` and register
// its descriptor with the scheduler.
//
// impl      Socket implementation to (re)open; must be an
//           epoll_tcp_socket, since it is downcast with static_cast.
// family, type, protocol
//           Passed to ::socket(); SOCK_NONBLOCK and SOCK_CLOEXEC are
//           OR-ed into `type`.
//
// Returns a default-constructed error_code on success, or make_err(errno)
// if ::socket() fails. In the failure case the previous socket has
// already been closed.
204 inline std::error_code
205 4733x epoll_tcp_service::open_socket(
206 tcp_socket::implementation& impl, int family, int type, int protocol)
207 {
208 4733x auto* epoll_impl = static_cast<epoll_tcp_socket*>(&impl);
// Drop whatever socket the impl currently holds before creating a new one.
209 4733x epoll_impl->close_socket();
210
211 4733x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
212 4733x if (fd < 0)
213 return make_err(errno);
214
215 4733x if (family == AF_INET6)
216 {
// Request an IPv6-only socket.
// NOTE(review): the setsockopt() result is discarded, so a failure here
// is silent — presumably deliberate best-effort; confirm.
217 5x int one = 1;
218 5x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
219 }
220
221 4733x epoll_impl->fd_ = fd;
222
223 // Register fd with epoll (edge-triggered mode)
224 4733x epoll_impl->desc_state_.fd = fd;
225 {
// Clear the per-descriptor op slots under the descriptor mutex before
// the descriptor is handed to the scheduler.
226 4733x std::lock_guard lock(epoll_impl->desc_state_.mutex);
227 4733x epoll_impl->desc_state_.read_op = nullptr;
228 4733x epoll_impl->desc_state_.write_op = nullptr;
229 4733x epoll_impl->desc_state_.connect_op = nullptr;
230 4733x }
231 4733x scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
232
233 4733x return {};
234 }
235
236 } // namespace boost::corosio::detail
237
238 #endif // BOOST_COROSIO_HAS_EPOLL
239
240 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
241