include/boost/corosio/native/detail/posix/posix_stream_file_service.hpp

84.4% Lines (124/147) 100.0% List of functions (16/16)
posix_stream_file_service.hpp
f(x) Functions (16)
Function Calls Lines Blocks
boost::corosio::detail::posix_stream_file_service::posix_stream_file_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :36 515x 100.0% 88.0% boost::corosio::detail::posix_stream_file_service::~posix_stream_file_service() :45 1030x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::construct() :50 26x 100.0% 71.0% boost::corosio::detail::posix_stream_file_service::destroy(boost::corosio::io_object::implementation*) :64 26x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::close(boost::corosio::io_object::handle&) :72 43x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::open_file(boost::corosio::stream_file::implementation&, std::filesystem::__cxx11::path const&, boost::corosio::file_base::flags) :82 19x 75.0% 75.0% boost::corosio::detail::posix_stream_file_service::shutdown() :92 515x 62.5% 70.0% boost::corosio::detail::posix_stream_file_service::destroy_impl(boost::corosio::detail::posix_stream_file&) :104 26x 100.0% 67.0% boost::corosio::detail::posix_stream_file_service::post(boost::corosio::detail::scheduler_op*) :111 12x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::pool() :126 12x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::get_or_create_pool(boost::capy::execution_context&) :132 515x 80.0% 67.0% boost::corosio::detail::get_stream_file_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :151 515x 100.0% 100.0% boost::corosio::detail::posix_stream_file::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :161 6x 75.0% 73.0% boost::corosio::detail::posix_stream_file::do_read_work(boost::corosio::detail::pool_work_item*) :211 6x 88.2% 83.0% boost::corosio::detail::posix_stream_file::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :245 6x 75.0% 73.0% boost::corosio::detail::posix_stream_file::do_write_work(boost::corosio::detail::pool_work_item*) :295 6x 88.2% 83.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_POSIX
16
17 #include <boost/corosio/native/detail/posix/posix_stream_file.hpp>
18 #include <boost/corosio/native/native_scheduler.hpp>
19 #include <boost/corosio/detail/file_service.hpp>
20 #include <boost/corosio/detail/thread_pool.hpp>
21
22 #include <mutex>
23 #include <unordered_map>
24
25 namespace boost::corosio::detail {
26
27 /** Stream file service for POSIX backends.
28
29 Owns all posix_stream_file instances. Thread lifecycle is
30 managed by the thread_pool service (shared with resolver).
31 */
32 class BOOST_COROSIO_DECL posix_stream_file_service final
33 : public file_service
34 {
35 public:
36 515x posix_stream_file_service(
37 capy::execution_context& ctx, scheduler& sched)
38 1030x : sched_(&sched)
39 1030x , pool_(get_or_create_pool(ctx))
40 515x , single_threaded_(
41 515x static_cast<native_scheduler&>(sched).single_threaded_)
42 {
43 515x }
44
45 1030x ~posix_stream_file_service() override = default;
46
47 posix_stream_file_service(posix_stream_file_service const&) = delete;
48 posix_stream_file_service& operator=(posix_stream_file_service const&) = delete;
49
50 26x io_object::implementation* construct() override
51 {
52 26x auto ptr = std::make_shared<posix_stream_file>(*this);
53 26x auto* impl = ptr.get();
54
55 {
56 26x std::lock_guard<std::mutex> lock(mutex_);
57 26x file_list_.push_back(impl);
58 26x file_ptrs_[impl] = std::move(ptr);
59 26x }
60
61 26x return impl;
62 26x }
63
64 26x void destroy(io_object::implementation* p) override
65 {
66 26x auto& impl = static_cast<posix_stream_file&>(*p);
67 26x impl.cancel();
68 26x impl.close_file();
69 26x destroy_impl(impl);
70 26x }
71
72 43x void close(io_object::handle& h) override
73 {
74 43x if (h.get())
75 {
76 43x auto& impl = static_cast<posix_stream_file&>(*h.get());
77 43x impl.cancel();
78 43x impl.close_file();
79 }
80 43x }
81
82 19x std::error_code open_file(
83 stream_file::implementation& impl,
84 std::filesystem::path const& path,
85 file_base::flags mode) override
86 {
87 19x if (single_threaded_)
88 return std::make_error_code(std::errc::operation_not_supported);
89 19x return static_cast<posix_stream_file&>(impl).open_file(path, mode);
90 }
91
92 515x void shutdown() override
93 {
94 515x std::lock_guard<std::mutex> lock(mutex_);
95 515x for (auto* impl = file_list_.pop_front(); impl != nullptr;
96 impl = file_list_.pop_front())
97 {
98 impl->cancel();
99 impl->close_file();
100 }
101 515x file_ptrs_.clear();
102 515x }
103
104 26x void destroy_impl(posix_stream_file& impl)
105 {
106 26x std::lock_guard<std::mutex> lock(mutex_);
107 26x file_list_.remove(&impl);
108 26x file_ptrs_.erase(&impl);
109 26x }
110
111 12x void post(scheduler_op* op)
112 {
113 12x sched_->post(op);
114 12x }
115
116 void work_started() noexcept
117 {
118 sched_->work_started();
119 }
120
121 void work_finished() noexcept
122 {
123 sched_->work_finished();
124 }
125
126 12x thread_pool& pool() noexcept
127 {
128 12x return pool_;
129 }
130
131 private:
132 515x static thread_pool& get_or_create_pool(capy::execution_context& ctx)
133 {
134 515x auto* p = ctx.find_service<thread_pool>();
135 515x if (p)
136 515x return *p;
137 return ctx.make_service<thread_pool>();
138 }
139
140 scheduler* sched_;
141 thread_pool& pool_;
142 bool single_threaded_;
143 std::mutex mutex_;
144 intrusive_list<posix_stream_file> file_list_;
145 std::unordered_map<posix_stream_file*, std::shared_ptr<posix_stream_file>>
146 file_ptrs_;
147 };
148
149 /** Get or create the stream file service for the given context. */
150 inline posix_stream_file_service&
151 515x get_stream_file_service(capy::execution_context& ctx, scheduler& sched)
152 {
153 515x return ctx.make_service<posix_stream_file_service>(sched);
154 }
155
156 // ---------------------------------------------------------------------------
157 // posix_stream_file inline implementations (require complete service type)
158 // ---------------------------------------------------------------------------
159
160 inline std::coroutine_handle<>
161 6x posix_stream_file::read_some(
162 std::coroutine_handle<> h,
163 capy::executor_ref ex,
164 buffer_param param,
165 std::stop_token token,
166 std::error_code* ec,
167 std::size_t* bytes_out)
168 {
169 6x auto& op = read_op_;
170 6x op.reset();
171 6x op.is_read = true;
172
173 6x capy::mutable_buffer bufs[max_buffers];
174 6x op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
175
176 6x if (op.iovec_count == 0)
177 {
178 *ec = {};
179 *bytes_out = 0;
180 op.cont_op.cont.h = h;
181 return dispatch_coro(ex, op.cont_op.cont);
182 }
183
184 12x for (int i = 0; i < op.iovec_count; ++i)
185 {
186 6x op.iovecs[i].iov_base = bufs[i].data();
187 6x op.iovecs[i].iov_len = bufs[i].size();
188 }
189
190 6x op.h = h;
191 6x op.ex = ex;
192 6x op.ec_out = ec;
193 6x op.bytes_out = bytes_out;
194 6x op.start(token);
195
196 6x op.ex.on_work_started();
197
198 6x read_pool_op_.file_ = this;
199 6x read_pool_op_.ref_ = this->shared_from_this();
200 6x read_pool_op_.func_ = &posix_stream_file::do_read_work;
201 6x if (!svc_.pool().post(&read_pool_op_))
202 {
203 op.impl_ref = std::move(read_pool_op_.ref_);
204 op.cancelled.store(true, std::memory_order_release);
205 svc_.post(&read_op_);
206 }
207 6x return std::noop_coroutine();
208 }
209
210 inline void
211 6x posix_stream_file::do_read_work(pool_work_item* w) noexcept
212 {
213 6x auto* pw = static_cast<pool_op*>(w);
214 6x auto* self = pw->file_;
215 6x auto& op = self->read_op_;
216
217 6x if (!op.cancelled.load(std::memory_order_acquire))
218 {
219 ssize_t n;
220 do
221 {
222 10x n = ::preadv(self->fd_, op.iovecs, op.iovec_count,
223 5x static_cast<off_t>(self->offset_));
224 }
225 5x while (n < 0 && errno == EINTR);
226
227 5x if (n >= 0)
228 {
229 5x op.errn = 0;
230 5x op.bytes_transferred = static_cast<std::size_t>(n);
231 5x self->offset_ += static_cast<std::uint64_t>(n);
232 }
233 else
234 {
235 op.errn = errno;
236 op.bytes_transferred = 0;
237 }
238 }
239
240 6x op.impl_ref = std::move(pw->ref_);
241 6x self->svc_.post(&op);
242 6x }
243
244 inline std::coroutine_handle<>
245 6x posix_stream_file::write_some(
246 std::coroutine_handle<> h,
247 capy::executor_ref ex,
248 buffer_param param,
249 std::stop_token token,
250 std::error_code* ec,
251 std::size_t* bytes_out)
252 {
253 6x auto& op = write_op_;
254 6x op.reset();
255 6x op.is_read = false;
256
257 6x capy::mutable_buffer bufs[max_buffers];
258 6x op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
259
260 6x if (op.iovec_count == 0)
261 {
262 *ec = {};
263 *bytes_out = 0;
264 op.cont_op.cont.h = h;
265 return dispatch_coro(ex, op.cont_op.cont);
266 }
267
268 12x for (int i = 0; i < op.iovec_count; ++i)
269 {
270 6x op.iovecs[i].iov_base = bufs[i].data();
271 6x op.iovecs[i].iov_len = bufs[i].size();
272 }
273
274 6x op.h = h;
275 6x op.ex = ex;
276 6x op.ec_out = ec;
277 6x op.bytes_out = bytes_out;
278 6x op.start(token);
279
280 6x op.ex.on_work_started();
281
282 6x write_pool_op_.file_ = this;
283 6x write_pool_op_.ref_ = this->shared_from_this();
284 6x write_pool_op_.func_ = &posix_stream_file::do_write_work;
285 6x if (!svc_.pool().post(&write_pool_op_))
286 {
287 op.impl_ref = std::move(write_pool_op_.ref_);
288 op.cancelled.store(true, std::memory_order_release);
289 svc_.post(&write_op_);
290 }
291 6x return std::noop_coroutine();
292 }
293
294 inline void
295 6x posix_stream_file::do_write_work(pool_work_item* w) noexcept
296 {
297 6x auto* pw = static_cast<pool_op*>(w);
298 6x auto* self = pw->file_;
299 6x auto& op = self->write_op_;
300
301 6x if (!op.cancelled.load(std::memory_order_acquire))
302 {
303 ssize_t n;
304 do
305 {
306 12x n = ::pwritev(self->fd_, op.iovecs, op.iovec_count,
307 6x static_cast<off_t>(self->offset_));
308 }
309 6x while (n < 0 && errno == EINTR);
310
311 6x if (n >= 0)
312 {
313 6x op.errn = 0;
314 6x op.bytes_transferred = static_cast<std::size_t>(n);
315 6x self->offset_ += static_cast<std::uint64_t>(n);
316 }
317 else
318 {
319 op.errn = errno;
320 op.bytes_transferred = 0;
321 }
322 }
323
324 6x op.impl_ref = std::move(pw->ref_);
325 6x self->svc_.post(&op);
326 6x }
327
328 } // namespace boost::corosio::detail
329
330 #endif // BOOST_COROSIO_POSIX
331
332 #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
333