TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_POSIX
16 :
17 : #include <boost/corosio/native/detail/posix/posix_stream_file.hpp>
18 : #include <boost/corosio/native/native_scheduler.hpp>
19 : #include <boost/corosio/detail/file_service.hpp>
20 : #include <boost/corosio/detail/thread_pool.hpp>
21 :
22 : #include <mutex>
23 : #include <unordered_map>
24 :
25 : namespace boost::corosio::detail {
26 :
27 : /** Stream file service for POSIX backends.
28 :
29 : Owns all posix_stream_file instances. Thread lifecycle is
30 : managed by the thread_pool service (shared with resolver).
31 : */
32 : class BOOST_COROSIO_DECL posix_stream_file_service final
33 : : public file_service
34 : {
35 : public:
36 HIT 515 : posix_stream_file_service(
37 : capy::execution_context& ctx, scheduler& sched)
38 1030 : : sched_(&sched)
39 1030 : , pool_(get_or_create_pool(ctx))
40 515 : , single_threaded_(
41 515 : static_cast<native_scheduler&>(sched).single_threaded_)
42 : {
43 515 : }
44 :
45 1030 : ~posix_stream_file_service() override = default;
46 :
47 : posix_stream_file_service(posix_stream_file_service const&) = delete;
48 : posix_stream_file_service& operator=(posix_stream_file_service const&) = delete;
49 :
50 26 : io_object::implementation* construct() override
51 : {
52 26 : auto ptr = std::make_shared<posix_stream_file>(*this);
53 26 : auto* impl = ptr.get();
54 :
55 : {
56 26 : std::lock_guard<std::mutex> lock(mutex_);
57 26 : file_list_.push_back(impl);
58 26 : file_ptrs_[impl] = std::move(ptr);
59 26 : }
60 :
61 26 : return impl;
62 26 : }
63 :
64 26 : void destroy(io_object::implementation* p) override
65 : {
66 26 : auto& impl = static_cast<posix_stream_file&>(*p);
67 26 : impl.cancel();
68 26 : impl.close_file();
69 26 : destroy_impl(impl);
70 26 : }
71 :
72 43 : void close(io_object::handle& h) override
73 : {
74 43 : if (h.get())
75 : {
76 43 : auto& impl = static_cast<posix_stream_file&>(*h.get());
77 43 : impl.cancel();
78 43 : impl.close_file();
79 : }
80 43 : }
81 :
82 19 : std::error_code open_file(
83 : stream_file::implementation& impl,
84 : std::filesystem::path const& path,
85 : file_base::flags mode) override
86 : {
87 19 : if (single_threaded_)
88 MIS 0 : return std::make_error_code(std::errc::operation_not_supported);
89 HIT 19 : return static_cast<posix_stream_file&>(impl).open_file(path, mode);
90 : }
91 :
92 515 : void shutdown() override
93 : {
94 515 : std::lock_guard<std::mutex> lock(mutex_);
95 515 : for (auto* impl = file_list_.pop_front(); impl != nullptr;
96 MIS 0 : impl = file_list_.pop_front())
97 : {
98 0 : impl->cancel();
99 0 : impl->close_file();
100 : }
101 HIT 515 : file_ptrs_.clear();
102 515 : }
103 :
104 26 : void destroy_impl(posix_stream_file& impl)
105 : {
106 26 : std::lock_guard<std::mutex> lock(mutex_);
107 26 : file_list_.remove(&impl);
108 26 : file_ptrs_.erase(&impl);
109 26 : }
110 :
111 12 : void post(scheduler_op* op)
112 : {
113 12 : sched_->post(op);
114 12 : }
115 :
116 : void work_started() noexcept
117 : {
118 : sched_->work_started();
119 : }
120 :
121 : void work_finished() noexcept
122 : {
123 : sched_->work_finished();
124 : }
125 :
126 12 : thread_pool& pool() noexcept
127 : {
128 12 : return pool_;
129 : }
130 :
131 : private:
132 515 : static thread_pool& get_or_create_pool(capy::execution_context& ctx)
133 : {
134 515 : auto* p = ctx.find_service<thread_pool>();
135 515 : if (p)
136 515 : return *p;
137 MIS 0 : return ctx.make_service<thread_pool>();
138 : }
139 :
140 : scheduler* sched_;
141 : thread_pool& pool_;
142 : bool single_threaded_;
143 : std::mutex mutex_;
144 : intrusive_list<posix_stream_file> file_list_;
145 : std::unordered_map<posix_stream_file*, std::shared_ptr<posix_stream_file>>
146 : file_ptrs_;
147 : };
148 :
149 : /** Get or create the stream file service for the given context. */
150 : inline posix_stream_file_service&
151 HIT 515 : get_stream_file_service(capy::execution_context& ctx, scheduler& sched)
152 : {
153 515 : return ctx.make_service<posix_stream_file_service>(sched);
154 : }
155 :
156 : // ---------------------------------------------------------------------------
157 : // posix_stream_file inline implementations (require complete service type)
158 : // ---------------------------------------------------------------------------
159 :
160 : inline std::coroutine_handle<>
161 6 : posix_stream_file::read_some(
162 : std::coroutine_handle<> h,
163 : capy::executor_ref ex,
164 : buffer_param param,
165 : std::stop_token token,
166 : std::error_code* ec,
167 : std::size_t* bytes_out)
168 : {
169 6 : auto& op = read_op_;
170 6 : op.reset();
171 6 : op.is_read = true;
172 :
173 6 : capy::mutable_buffer bufs[max_buffers];
174 6 : op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
175 :
176 6 : if (op.iovec_count == 0)
177 : {
178 MIS 0 : *ec = {};
179 0 : *bytes_out = 0;
180 0 : op.cont_op.cont.h = h;
181 0 : return dispatch_coro(ex, op.cont_op.cont);
182 : }
183 :
184 HIT 12 : for (int i = 0; i < op.iovec_count; ++i)
185 : {
186 6 : op.iovecs[i].iov_base = bufs[i].data();
187 6 : op.iovecs[i].iov_len = bufs[i].size();
188 : }
189 :
190 6 : op.h = h;
191 6 : op.ex = ex;
192 6 : op.ec_out = ec;
193 6 : op.bytes_out = bytes_out;
194 6 : op.start(token);
195 :
196 6 : op.ex.on_work_started();
197 :
198 6 : read_pool_op_.file_ = this;
199 6 : read_pool_op_.ref_ = this->shared_from_this();
200 6 : read_pool_op_.func_ = &posix_stream_file::do_read_work;
201 6 : if (!svc_.pool().post(&read_pool_op_))
202 : {
203 MIS 0 : op.impl_ref = std::move(read_pool_op_.ref_);
204 0 : op.cancelled.store(true, std::memory_order_release);
205 0 : svc_.post(&read_op_);
206 : }
207 HIT 6 : return std::noop_coroutine();
208 : }
209 :
210 : inline void
211 6 : posix_stream_file::do_read_work(pool_work_item* w) noexcept
212 : {
213 6 : auto* pw = static_cast<pool_op*>(w);
214 6 : auto* self = pw->file_;
215 6 : auto& op = self->read_op_;
216 :
217 6 : if (!op.cancelled.load(std::memory_order_acquire))
218 : {
219 : ssize_t n;
220 : do
221 : {
222 10 : n = ::preadv(self->fd_, op.iovecs, op.iovec_count,
223 5 : static_cast<off_t>(self->offset_));
224 : }
225 5 : while (n < 0 && errno == EINTR);
226 :
227 5 : if (n >= 0)
228 : {
229 5 : op.errn = 0;
230 5 : op.bytes_transferred = static_cast<std::size_t>(n);
231 5 : self->offset_ += static_cast<std::uint64_t>(n);
232 : }
233 : else
234 : {
235 MIS 0 : op.errn = errno;
236 0 : op.bytes_transferred = 0;
237 : }
238 : }
239 :
240 HIT 6 : op.impl_ref = std::move(pw->ref_);
241 6 : self->svc_.post(&op);
242 6 : }
243 :
244 : inline std::coroutine_handle<>
245 6 : posix_stream_file::write_some(
246 : std::coroutine_handle<> h,
247 : capy::executor_ref ex,
248 : buffer_param param,
249 : std::stop_token token,
250 : std::error_code* ec,
251 : std::size_t* bytes_out)
252 : {
253 6 : auto& op = write_op_;
254 6 : op.reset();
255 6 : op.is_read = false;
256 :
257 6 : capy::mutable_buffer bufs[max_buffers];
258 6 : op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
259 :
260 6 : if (op.iovec_count == 0)
261 : {
262 MIS 0 : *ec = {};
263 0 : *bytes_out = 0;
264 0 : op.cont_op.cont.h = h;
265 0 : return dispatch_coro(ex, op.cont_op.cont);
266 : }
267 :
268 HIT 12 : for (int i = 0; i < op.iovec_count; ++i)
269 : {
270 6 : op.iovecs[i].iov_base = bufs[i].data();
271 6 : op.iovecs[i].iov_len = bufs[i].size();
272 : }
273 :
274 6 : op.h = h;
275 6 : op.ex = ex;
276 6 : op.ec_out = ec;
277 6 : op.bytes_out = bytes_out;
278 6 : op.start(token);
279 :
280 6 : op.ex.on_work_started();
281 :
282 6 : write_pool_op_.file_ = this;
283 6 : write_pool_op_.ref_ = this->shared_from_this();
284 6 : write_pool_op_.func_ = &posix_stream_file::do_write_work;
285 6 : if (!svc_.pool().post(&write_pool_op_))
286 : {
287 MIS 0 : op.impl_ref = std::move(write_pool_op_.ref_);
288 0 : op.cancelled.store(true, std::memory_order_release);
289 0 : svc_.post(&write_op_);
290 : }
291 HIT 6 : return std::noop_coroutine();
292 : }
293 :
294 : inline void
295 6 : posix_stream_file::do_write_work(pool_work_item* w) noexcept
296 : {
297 6 : auto* pw = static_cast<pool_op*>(w);
298 6 : auto* self = pw->file_;
299 6 : auto& op = self->write_op_;
300 :
301 6 : if (!op.cancelled.load(std::memory_order_acquire))
302 : {
303 : ssize_t n;
304 : do
305 : {
306 12 : n = ::pwritev(self->fd_, op.iovecs, op.iovec_count,
307 6 : static_cast<off_t>(self->offset_));
308 : }
309 6 : while (n < 0 && errno == EINTR);
310 :
311 6 : if (n >= 0)
312 : {
313 6 : op.errn = 0;
314 6 : op.bytes_transferred = static_cast<std::size_t>(n);
315 6 : self->offset_ += static_cast<std::uint64_t>(n);
316 : }
317 : else
318 : {
319 MIS 0 : op.errn = errno;
320 0 : op.bytes_transferred = 0;
321 : }
322 : }
323 :
324 HIT 6 : op.impl_ref = std::move(pw->ref_);
325 6 : self->svc_.post(&op);
326 6 : }
327 :
328 : } // namespace boost::corosio::detail
329 :
330 : #endif // BOOST_COROSIO_POSIX
331 :
332 : #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
|