TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_POSIX
16 :
17 : #include <boost/corosio/native/detail/posix/posix_random_access_file.hpp>
18 : #include <boost/corosio/native/native_scheduler.hpp>
19 : #include <boost/corosio/detail/random_access_file_service.hpp>
20 : #include <boost/corosio/detail/thread_pool.hpp>
21 :
22 : #include <limits>
23 : #include <mutex>
24 : #include <unordered_map>
25 :
26 : namespace boost::corosio::detail {
27 :
28 : /** Random-access file service for POSIX backends. */
29 : class BOOST_COROSIO_DECL posix_random_access_file_service final
30 : : public random_access_file_service
31 : {
32 : public:
33 HIT 515 : posix_random_access_file_service(
34 : capy::execution_context& ctx, scheduler& sched)
35 1030 : : sched_(&sched)
36 1030 : , pool_(get_or_create_pool(ctx))
37 515 : , single_threaded_(
38 515 : static_cast<native_scheduler&>(sched).single_threaded_)
39 : {
40 515 : }
41 :
42 1030 : ~posix_random_access_file_service() override = default;
43 :
44 : posix_random_access_file_service(
45 : posix_random_access_file_service const&) = delete;
46 : posix_random_access_file_service& operator=(
47 : posix_random_access_file_service const&) = delete;
48 :
49 24 : io_object::implementation* construct() override
50 : {
51 24 : auto ptr = std::make_shared<posix_random_access_file>(*this);
52 24 : auto* impl = ptr.get();
53 :
54 : {
55 24 : std::lock_guard<std::mutex> lock(mutex_);
56 24 : file_list_.push_back(impl);
57 24 : file_ptrs_[impl] = std::move(ptr);
58 24 : }
59 :
60 24 : return impl;
61 24 : }
62 :
63 24 : void destroy(io_object::implementation* p) override
64 : {
65 24 : auto& impl = static_cast<posix_random_access_file&>(*p);
66 24 : impl.cancel();
67 24 : impl.close_file();
68 24 : destroy_impl(impl);
69 24 : }
70 :
71 42 : void close(io_object::handle& h) override
72 : {
73 42 : if (h.get())
74 : {
75 42 : auto& impl = static_cast<posix_random_access_file&>(*h.get());
76 42 : impl.cancel();
77 42 : impl.close_file();
78 : }
79 42 : }
80 :
81 19 : std::error_code open_file(
82 : random_access_file::implementation& impl,
83 : std::filesystem::path const& path,
84 : file_base::flags mode) override
85 : {
86 19 : if (single_threaded_)
87 MIS 0 : return std::make_error_code(std::errc::operation_not_supported);
88 HIT 19 : return static_cast<posix_random_access_file&>(impl).open_file(
89 19 : path, mode);
90 : }
91 :
92 515 : void shutdown() override
93 : {
94 515 : std::lock_guard<std::mutex> lock(mutex_);
95 515 : for (auto* impl = file_list_.pop_front(); impl != nullptr;
96 MIS 0 : impl = file_list_.pop_front())
97 : {
98 0 : impl->cancel();
99 0 : impl->close_file();
100 : }
101 HIT 515 : file_ptrs_.clear();
102 515 : }
103 :
104 24 : void destroy_impl(posix_random_access_file& impl)
105 : {
106 24 : std::lock_guard<std::mutex> lock(mutex_);
107 24 : file_list_.remove(&impl);
108 24 : file_ptrs_.erase(&impl);
109 24 : }
110 :
111 126 : void post(scheduler_op* op)
112 : {
113 126 : sched_->post(op);
114 126 : }
115 :
116 : void work_started() noexcept
117 : {
118 : sched_->work_started();
119 : }
120 :
121 : void work_finished() noexcept
122 : {
123 : sched_->work_finished();
124 : }
125 :
126 126 : thread_pool& pool() noexcept
127 : {
128 126 : return pool_;
129 : }
130 :
131 : private:
132 515 : static thread_pool& get_or_create_pool(capy::execution_context& ctx)
133 : {
134 515 : auto* p = ctx.find_service<thread_pool>();
135 515 : if (p)
136 515 : return *p;
137 MIS 0 : return ctx.make_service<thread_pool>();
138 : }
139 :
140 : scheduler* sched_;
141 : thread_pool& pool_;
142 : bool single_threaded_;
143 : std::mutex mutex_;
144 : intrusive_list<posix_random_access_file> file_list_;
145 : std::unordered_map<
146 : posix_random_access_file*,
147 : std::shared_ptr<posix_random_access_file>>
148 : file_ptrs_;
149 : };
150 :
151 : /** Get or create the random-access file service for the given context. */
152 : inline posix_random_access_file_service&
153 HIT 515 : get_random_access_file_service(capy::execution_context& ctx, scheduler& sched)
154 : {
155 515 : return ctx.make_service<posix_random_access_file_service>(sched);
156 : }
157 :
158 : // ---------------------------------------------------------------------------
159 : // posix_random_access_file inline implementations (require complete service)
160 : // ---------------------------------------------------------------------------
161 :
162 : inline std::coroutine_handle<>
163 116 : posix_random_access_file::read_some_at(
164 : std::uint64_t offset,
165 : std::coroutine_handle<> h,
166 : capy::executor_ref ex,
167 : buffer_param param,
168 : std::stop_token token,
169 : std::error_code* ec,
170 : std::size_t* bytes_out)
171 : {
172 116 : capy::mutable_buffer bufs[max_buffers];
173 116 : auto count = param.copy_to(bufs, max_buffers);
174 :
175 116 : if (count == 0)
176 : {
177 MIS 0 : *ec = {};
178 0 : *bytes_out = 0;
179 0 : return h;
180 : }
181 :
182 HIT 116 : auto* op = new raf_op();
183 116 : op->is_read = true;
184 116 : op->offset = offset;
185 :
186 116 : op->iovec_count = static_cast<int>(count);
187 232 : for (int i = 0; i < op->iovec_count; ++i)
188 : {
189 116 : op->iovecs[i].iov_base = bufs[i].data();
190 116 : op->iovecs[i].iov_len = bufs[i].size();
191 : }
192 :
193 116 : op->h = h;
194 116 : op->ex = ex;
195 116 : op->ec_out = ec;
196 116 : op->bytes_out = bytes_out;
197 116 : op->file_ = this;
198 116 : op->file_ref = this->shared_from_this();
199 116 : op->start(token);
200 :
201 116 : op->ex.on_work_started();
202 :
203 : {
204 116 : std::lock_guard<std::mutex> lock(ops_mutex_);
205 116 : outstanding_ops_.push_back(op);
206 116 : }
207 :
208 116 : static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
209 116 : if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
210 : {
211 MIS 0 : op->cancelled.store(true, std::memory_order_release);
212 0 : svc_.post(static_cast<scheduler_op*>(op));
213 : }
214 HIT 116 : return std::noop_coroutine();
215 : }
216 :
217 : inline std::coroutine_handle<>
218 10 : posix_random_access_file::write_some_at(
219 : std::uint64_t offset,
220 : std::coroutine_handle<> h,
221 : capy::executor_ref ex,
222 : buffer_param param,
223 : std::stop_token token,
224 : std::error_code* ec,
225 : std::size_t* bytes_out)
226 : {
227 10 : capy::mutable_buffer bufs[max_buffers];
228 10 : auto count = param.copy_to(bufs, max_buffers);
229 :
230 10 : if (count == 0)
231 : {
232 MIS 0 : *ec = {};
233 0 : *bytes_out = 0;
234 0 : return h;
235 : }
236 :
237 HIT 10 : auto* op = new raf_op();
238 10 : op->is_read = false;
239 10 : op->offset = offset;
240 :
241 10 : op->iovec_count = static_cast<int>(count);
242 20 : for (int i = 0; i < op->iovec_count; ++i)
243 : {
244 10 : op->iovecs[i].iov_base = bufs[i].data();
245 10 : op->iovecs[i].iov_len = bufs[i].size();
246 : }
247 :
248 10 : op->h = h;
249 10 : op->ex = ex;
250 10 : op->ec_out = ec;
251 10 : op->bytes_out = bytes_out;
252 10 : op->file_ = this;
253 10 : op->file_ref = this->shared_from_this();
254 10 : op->start(token);
255 :
256 10 : op->ex.on_work_started();
257 :
258 : {
259 10 : std::lock_guard<std::mutex> lock(ops_mutex_);
260 10 : outstanding_ops_.push_back(op);
261 10 : }
262 :
263 10 : static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
264 10 : if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
265 : {
266 MIS 0 : op->cancelled.store(true, std::memory_order_release);
267 0 : svc_.post(static_cast<scheduler_op*>(op));
268 : }
269 HIT 10 : return std::noop_coroutine();
270 : }
271 :
272 : // -- raf_op thread-pool work function --
273 :
274 : inline void
275 126 : posix_random_access_file::raf_op::do_work(pool_work_item* w) noexcept
276 : {
277 126 : auto* op = static_cast<raf_op*>(w);
278 126 : auto* self = op->file_;
279 :
280 126 : if (op->cancelled.load(std::memory_order_acquire))
281 : {
282 1 : op->errn = ECANCELED;
283 1 : op->bytes_transferred = 0;
284 : }
285 250 : else if (op->offset >
286 125 : static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
287 : {
288 MIS 0 : op->errn = EOVERFLOW;
289 0 : op->bytes_transferred = 0;
290 : }
291 : else
292 : {
293 : ssize_t n;
294 HIT 125 : if (op->is_read)
295 : {
296 : do
297 : {
298 230 : n = ::preadv(self->fd_, op->iovecs, op->iovec_count,
299 115 : static_cast<off_t>(op->offset));
300 : }
301 115 : while (n < 0 && errno == EINTR);
302 : }
303 : else
304 : {
305 : do
306 : {
307 20 : n = ::pwritev(self->fd_, op->iovecs, op->iovec_count,
308 10 : static_cast<off_t>(op->offset));
309 : }
310 10 : while (n < 0 && errno == EINTR);
311 : }
312 :
313 125 : if (n >= 0)
314 : {
315 125 : op->errn = 0;
316 125 : op->bytes_transferred = static_cast<std::size_t>(n);
317 : }
318 : else
319 : {
320 MIS 0 : op->errn = errno;
321 0 : op->bytes_transferred = 0;
322 : }
323 : }
324 :
325 HIT 126 : self->svc_.post(static_cast<scheduler_op*>(op));
326 126 : }
327 :
328 : } // namespace boost::corosio::detail
329 :
330 : #endif // BOOST_COROSIO_POSIX
331 :
332 : #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
|