include/boost/corosio/native/detail/posix/posix_random_access_file_service.hpp

86.6% Lines (123/142) 100.0% List of functions (15/15)
posix_random_access_file_service.hpp
f(x) Functions (15)
Function Calls Lines Blocks
boost::corosio::detail::posix_random_access_file_service::posix_random_access_file_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :33 515x 100.0% 88.0% boost::corosio::detail::posix_random_access_file_service::~posix_random_access_file_service() :42 1030x 100.0% 100.0% boost::corosio::detail::posix_random_access_file_service::construct() :49 24x 100.0% 71.0% boost::corosio::detail::posix_random_access_file_service::destroy(boost::corosio::io_object::implementation*) :63 24x 100.0% 100.0% boost::corosio::detail::posix_random_access_file_service::close(boost::corosio::io_object::handle&) :71 42x 100.0% 100.0% boost::corosio::detail::posix_random_access_file_service::open_file(boost::corosio::random_access_file::implementation&, std::filesystem::__cxx11::path const&, boost::corosio::file_base::flags) :81 19x 80.0% 80.0% boost::corosio::detail::posix_random_access_file_service::shutdown() :92 515x 62.5% 70.0% boost::corosio::detail::posix_random_access_file_service::destroy_impl(boost::corosio::detail::posix_random_access_file&) :104 24x 100.0% 67.0% boost::corosio::detail::posix_random_access_file_service::post(boost::corosio::detail::scheduler_op*) :111 126x 100.0% 100.0% boost::corosio::detail::posix_random_access_file_service::pool() :126 126x 100.0% 100.0% boost::corosio::detail::posix_random_access_file_service::get_or_create_pool(boost::capy::execution_context&) :132 515x 80.0% 67.0% boost::corosio::detail::get_random_access_file_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :153 515x 100.0% 100.0% boost::corosio::detail::posix_random_access_file::read_some_at(unsigned long, std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :163 116x 83.3% 79.0% boost::corosio::detail::posix_random_access_file::write_some_at(unsigned long, std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :218 10x 83.3% 79.0% boost::corosio::detail::posix_random_access_file::raf_op::do_work(boost::corosio::detail::pool_work_item*) :275 126x 83.3% 76.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_POSIX
16
17 #include <boost/corosio/native/detail/posix/posix_random_access_file.hpp>
18 #include <boost/corosio/native/native_scheduler.hpp>
19 #include <boost/corosio/detail/random_access_file_service.hpp>
20 #include <boost/corosio/detail/thread_pool.hpp>
21
22 #include <limits>
23 #include <mutex>
24 #include <unordered_map>
25
26 namespace boost::corosio::detail {
27
28 /** Random-access file service for POSIX backends. */
29 class BOOST_COROSIO_DECL posix_random_access_file_service final
30 : public random_access_file_service
31 {
32 public:
33 515x posix_random_access_file_service(
34 capy::execution_context& ctx, scheduler& sched)
35 1030x : sched_(&sched)
36 1030x , pool_(get_or_create_pool(ctx))
37 515x , single_threaded_(
38 515x static_cast<native_scheduler&>(sched).single_threaded_)
39 {
40 515x }
41
42 1030x ~posix_random_access_file_service() override = default;
43
44 posix_random_access_file_service(
45 posix_random_access_file_service const&) = delete;
46 posix_random_access_file_service& operator=(
47 posix_random_access_file_service const&) = delete;
48
49 24x io_object::implementation* construct() override
50 {
51 24x auto ptr = std::make_shared<posix_random_access_file>(*this);
52 24x auto* impl = ptr.get();
53
54 {
55 24x std::lock_guard<std::mutex> lock(mutex_);
56 24x file_list_.push_back(impl);
57 24x file_ptrs_[impl] = std::move(ptr);
58 24x }
59
60 24x return impl;
61 24x }
62
63 24x void destroy(io_object::implementation* p) override
64 {
65 24x auto& impl = static_cast<posix_random_access_file&>(*p);
66 24x impl.cancel();
67 24x impl.close_file();
68 24x destroy_impl(impl);
69 24x }
70
71 42x void close(io_object::handle& h) override
72 {
73 42x if (h.get())
74 {
75 42x auto& impl = static_cast<posix_random_access_file&>(*h.get());
76 42x impl.cancel();
77 42x impl.close_file();
78 }
79 42x }
80
81 19x std::error_code open_file(
82 random_access_file::implementation& impl,
83 std::filesystem::path const& path,
84 file_base::flags mode) override
85 {
86 19x if (single_threaded_)
87 return std::make_error_code(std::errc::operation_not_supported);
88 19x return static_cast<posix_random_access_file&>(impl).open_file(
89 19x path, mode);
90 }
91
92 515x void shutdown() override
93 {
94 515x std::lock_guard<std::mutex> lock(mutex_);
95 515x for (auto* impl = file_list_.pop_front(); impl != nullptr;
96 impl = file_list_.pop_front())
97 {
98 impl->cancel();
99 impl->close_file();
100 }
101 515x file_ptrs_.clear();
102 515x }
103
104 24x void destroy_impl(posix_random_access_file& impl)
105 {
106 24x std::lock_guard<std::mutex> lock(mutex_);
107 24x file_list_.remove(&impl);
108 24x file_ptrs_.erase(&impl);
109 24x }
110
111 126x void post(scheduler_op* op)
112 {
113 126x sched_->post(op);
114 126x }
115
116 void work_started() noexcept
117 {
118 sched_->work_started();
119 }
120
121 void work_finished() noexcept
122 {
123 sched_->work_finished();
124 }
125
126 126x thread_pool& pool() noexcept
127 {
128 126x return pool_;
129 }
130
131 private:
132 515x static thread_pool& get_or_create_pool(capy::execution_context& ctx)
133 {
134 515x auto* p = ctx.find_service<thread_pool>();
135 515x if (p)
136 515x return *p;
137 return ctx.make_service<thread_pool>();
138 }
139
140 scheduler* sched_;
141 thread_pool& pool_;
142 bool single_threaded_;
143 std::mutex mutex_;
144 intrusive_list<posix_random_access_file> file_list_;
145 std::unordered_map<
146 posix_random_access_file*,
147 std::shared_ptr<posix_random_access_file>>
148 file_ptrs_;
149 };
150
151 /** Get or create the random-access file service for the given context. */
152 inline posix_random_access_file_service&
153 515x get_random_access_file_service(capy::execution_context& ctx, scheduler& sched)
154 {
155 515x return ctx.make_service<posix_random_access_file_service>(sched);
156 }
157
158 // ---------------------------------------------------------------------------
159 // posix_random_access_file inline implementations (require complete service)
160 // ---------------------------------------------------------------------------
161
162 inline std::coroutine_handle<>
163 116x posix_random_access_file::read_some_at(
164 std::uint64_t offset,
165 std::coroutine_handle<> h,
166 capy::executor_ref ex,
167 buffer_param param,
168 std::stop_token token,
169 std::error_code* ec,
170 std::size_t* bytes_out)
171 {
172 116x capy::mutable_buffer bufs[max_buffers];
173 116x auto count = param.copy_to(bufs, max_buffers);
174
175 116x if (count == 0)
176 {
177 *ec = {};
178 *bytes_out = 0;
179 return h;
180 }
181
182 116x auto* op = new raf_op();
183 116x op->is_read = true;
184 116x op->offset = offset;
185
186 116x op->iovec_count = static_cast<int>(count);
187 232x for (int i = 0; i < op->iovec_count; ++i)
188 {
189 116x op->iovecs[i].iov_base = bufs[i].data();
190 116x op->iovecs[i].iov_len = bufs[i].size();
191 }
192
193 116x op->h = h;
194 116x op->ex = ex;
195 116x op->ec_out = ec;
196 116x op->bytes_out = bytes_out;
197 116x op->file_ = this;
198 116x op->file_ref = this->shared_from_this();
199 116x op->start(token);
200
201 116x op->ex.on_work_started();
202
203 {
204 116x std::lock_guard<std::mutex> lock(ops_mutex_);
205 116x outstanding_ops_.push_back(op);
206 116x }
207
208 116x static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
209 116x if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
210 {
211 op->cancelled.store(true, std::memory_order_release);
212 svc_.post(static_cast<scheduler_op*>(op));
213 }
214 116x return std::noop_coroutine();
215 }
216
217 inline std::coroutine_handle<>
218 10x posix_random_access_file::write_some_at(
219 std::uint64_t offset,
220 std::coroutine_handle<> h,
221 capy::executor_ref ex,
222 buffer_param param,
223 std::stop_token token,
224 std::error_code* ec,
225 std::size_t* bytes_out)
226 {
227 10x capy::mutable_buffer bufs[max_buffers];
228 10x auto count = param.copy_to(bufs, max_buffers);
229
230 10x if (count == 0)
231 {
232 *ec = {};
233 *bytes_out = 0;
234 return h;
235 }
236
237 10x auto* op = new raf_op();
238 10x op->is_read = false;
239 10x op->offset = offset;
240
241 10x op->iovec_count = static_cast<int>(count);
242 20x for (int i = 0; i < op->iovec_count; ++i)
243 {
244 10x op->iovecs[i].iov_base = bufs[i].data();
245 10x op->iovecs[i].iov_len = bufs[i].size();
246 }
247
248 10x op->h = h;
249 10x op->ex = ex;
250 10x op->ec_out = ec;
251 10x op->bytes_out = bytes_out;
252 10x op->file_ = this;
253 10x op->file_ref = this->shared_from_this();
254 10x op->start(token);
255
256 10x op->ex.on_work_started();
257
258 {
259 10x std::lock_guard<std::mutex> lock(ops_mutex_);
260 10x outstanding_ops_.push_back(op);
261 10x }
262
263 10x static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
264 10x if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
265 {
266 op->cancelled.store(true, std::memory_order_release);
267 svc_.post(static_cast<scheduler_op*>(op));
268 }
269 10x return std::noop_coroutine();
270 }
271
272 // -- raf_op thread-pool work function --
273
274 inline void
275 126x posix_random_access_file::raf_op::do_work(pool_work_item* w) noexcept
276 {
277 126x auto* op = static_cast<raf_op*>(w);
278 126x auto* self = op->file_;
279
280 126x if (op->cancelled.load(std::memory_order_acquire))
281 {
282 1x op->errn = ECANCELED;
283 1x op->bytes_transferred = 0;
284 }
285 250x else if (op->offset >
286 125x static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
287 {
288 op->errn = EOVERFLOW;
289 op->bytes_transferred = 0;
290 }
291 else
292 {
293 ssize_t n;
294 125x if (op->is_read)
295 {
296 do
297 {
298 230x n = ::preadv(self->fd_, op->iovecs, op->iovec_count,
299 115x static_cast<off_t>(op->offset));
300 }
301 115x while (n < 0 && errno == EINTR);
302 }
303 else
304 {
305 do
306 {
307 20x n = ::pwritev(self->fd_, op->iovecs, op->iovec_count,
308 10x static_cast<off_t>(op->offset));
309 }
310 10x while (n < 0 && errno == EINTR);
311 }
312
313 125x if (n >= 0)
314 {
315 125x op->errn = 0;
316 125x op->bytes_transferred = static_cast<std::size_t>(n);
317 }
318 else
319 {
320 op->errn = errno;
321 op->bytes_transferred = 0;
322 }
323 }
324
325 126x self->svc_.post(static_cast<scheduler_op*>(op));
326 126x }
327
328 } // namespace boost::corosio::detail
329
330 #endif // BOOST_COROSIO_POSIX
331
332 #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
333