LCOV - code coverage report
Current view: top level - corosio/native/detail/posix - posix_stream_file_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 84.4 % 147 124 23
Test Date: 2026-03-27 21:00:37 Functions: 100.0 % 17 17

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_POSIX
      16                 : 
      17                 : #include <boost/corosio/native/detail/posix/posix_stream_file.hpp>
      18                 : #include <boost/corosio/native/native_scheduler.hpp>
      19                 : #include <boost/corosio/detail/file_service.hpp>
      20                 : #include <boost/corosio/detail/thread_pool.hpp>
      21                 : 
      22                 : #include <mutex>
      23                 : #include <unordered_map>
      24                 : 
      25                 : namespace boost::corosio::detail {
      26                 : 
      27                 : /** Stream file service for POSIX backends.
      28                 : 
      29                 :     Owns all posix_stream_file instances. Thread lifecycle is
      30                 :     managed by the thread_pool service (shared with resolver).
      31                 : */
      32                 : class BOOST_COROSIO_DECL posix_stream_file_service final
      33                 :     : public file_service
      34                 : {
      35                 : public:
      36 HIT         515 :     posix_stream_file_service(
      37                 :         capy::execution_context& ctx, scheduler& sched)
      38            1030 :         : sched_(&sched)
      39            1030 :         , pool_(get_or_create_pool(ctx))
      40             515 :         , single_threaded_(
      41             515 :               static_cast<native_scheduler&>(sched).single_threaded_)
      42                 :     {
      43             515 :     }
      44                 : 
      45            1030 :     ~posix_stream_file_service() override = default;
      46                 : 
      47                 :     posix_stream_file_service(posix_stream_file_service const&)            = delete;
      48                 :     posix_stream_file_service& operator=(posix_stream_file_service const&) = delete;
      49                 : 
      50              26 :     io_object::implementation* construct() override
      51                 :     {
      52              26 :         auto ptr   = std::make_shared<posix_stream_file>(*this);
      53              26 :         auto* impl = ptr.get();
      54                 : 
      55                 :         {
      56              26 :             std::lock_guard<std::mutex> lock(mutex_);
      57              26 :             file_list_.push_back(impl);
      58              26 :             file_ptrs_[impl] = std::move(ptr);
      59              26 :         }
      60                 : 
      61              26 :         return impl;
      62              26 :     }
      63                 : 
      64              26 :     void destroy(io_object::implementation* p) override
      65                 :     {
      66              26 :         auto& impl = static_cast<posix_stream_file&>(*p);
      67              26 :         impl.cancel();
      68              26 :         impl.close_file();
      69              26 :         destroy_impl(impl);
      70              26 :     }
      71                 : 
      72              43 :     void close(io_object::handle& h) override
      73                 :     {
      74              43 :         if (h.get())
      75                 :         {
      76              43 :             auto& impl = static_cast<posix_stream_file&>(*h.get());
      77              43 :             impl.cancel();
      78              43 :             impl.close_file();
      79                 :         }
      80              43 :     }
      81                 : 
      82              19 :     std::error_code open_file(
      83                 :         stream_file::implementation& impl,
      84                 :         std::filesystem::path const& path,
      85                 :         file_base::flags mode) override
      86                 :     {
      87              19 :         if (single_threaded_)
      88 MIS           0 :             return std::make_error_code(std::errc::operation_not_supported);
      89 HIT          19 :         return static_cast<posix_stream_file&>(impl).open_file(path, mode);
      90                 :     }
      91                 : 
      92             515 :     void shutdown() override
      93                 :     {
      94             515 :         std::lock_guard<std::mutex> lock(mutex_);
      95             515 :         for (auto* impl = file_list_.pop_front(); impl != nullptr;
      96 MIS           0 :              impl       = file_list_.pop_front())
      97                 :         {
      98               0 :             impl->cancel();
      99               0 :             impl->close_file();
     100                 :         }
     101 HIT         515 :         file_ptrs_.clear();
     102             515 :     }
     103                 : 
     104              26 :     void destroy_impl(posix_stream_file& impl)
     105                 :     {
     106              26 :         std::lock_guard<std::mutex> lock(mutex_);
     107              26 :         file_list_.remove(&impl);
     108              26 :         file_ptrs_.erase(&impl);
     109              26 :     }
     110                 : 
     111              12 :     void post(scheduler_op* op)
     112                 :     {
     113              12 :         sched_->post(op);
     114              12 :     }
     115                 : 
     116                 :     void work_started() noexcept
     117                 :     {
     118                 :         sched_->work_started();
     119                 :     }
     120                 : 
     121                 :     void work_finished() noexcept
     122                 :     {
     123                 :         sched_->work_finished();
     124                 :     }
     125                 : 
     126              12 :     thread_pool& pool() noexcept
     127                 :     {
     128              12 :         return pool_;
     129                 :     }
     130                 : 
     131                 : private:
     132             515 :     static thread_pool& get_or_create_pool(capy::execution_context& ctx)
     133                 :     {
     134             515 :         auto* p = ctx.find_service<thread_pool>();
     135             515 :         if (p)
     136             515 :             return *p;
     137 MIS           0 :         return ctx.make_service<thread_pool>();
     138                 :     }
     139                 : 
     140                 :     scheduler* sched_;
     141                 :     thread_pool& pool_;
     142                 :     bool single_threaded_;
     143                 :     std::mutex mutex_;
     144                 :     intrusive_list<posix_stream_file> file_list_;
     145                 :     std::unordered_map<posix_stream_file*, std::shared_ptr<posix_stream_file>>
     146                 :         file_ptrs_;
     147                 : };
     148                 : 
     149                 : /** Get or create the stream file service for the given context. */
     150                 : inline posix_stream_file_service&
     151 HIT         515 : get_stream_file_service(capy::execution_context& ctx, scheduler& sched)
     152                 : {
     153             515 :     return ctx.make_service<posix_stream_file_service>(sched);
     154                 : }
     155                 : 
     156                 : // ---------------------------------------------------------------------------
     157                 : // posix_stream_file inline implementations (require complete service type)
     158                 : // ---------------------------------------------------------------------------
     159                 : 
     160                 : inline std::coroutine_handle<>
     161               6 : posix_stream_file::read_some(
     162                 :     std::coroutine_handle<> h,
     163                 :     capy::executor_ref ex,
     164                 :     buffer_param param,
     165                 :     std::stop_token token,
     166                 :     std::error_code* ec,
     167                 :     std::size_t* bytes_out)
     168                 : {
     169               6 :     auto& op = read_op_;
     170               6 :     op.reset();
     171               6 :     op.is_read = true;
     172                 : 
     173               6 :     capy::mutable_buffer bufs[max_buffers];
     174               6 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
     175                 : 
     176               6 :     if (op.iovec_count == 0)
     177                 :     {
     178 MIS           0 :         *ec        = {};
     179               0 :         *bytes_out = 0;
     180               0 :         op.cont_op.cont.h = h;
     181               0 :         return dispatch_coro(ex, op.cont_op.cont);
     182                 :     }
     183                 : 
     184 HIT          12 :     for (int i = 0; i < op.iovec_count; ++i)
     185                 :     {
     186               6 :         op.iovecs[i].iov_base = bufs[i].data();
     187               6 :         op.iovecs[i].iov_len  = bufs[i].size();
     188                 :     }
     189                 : 
     190               6 :     op.h         = h;
     191               6 :     op.ex        = ex;
     192               6 :     op.ec_out    = ec;
     193               6 :     op.bytes_out = bytes_out;
     194               6 :     op.start(token);
     195                 : 
     196               6 :     op.ex.on_work_started();
     197                 : 
     198               6 :     read_pool_op_.file_ = this;
     199               6 :     read_pool_op_.ref_  = this->shared_from_this();
     200               6 :     read_pool_op_.func_ = &posix_stream_file::do_read_work;
     201               6 :     if (!svc_.pool().post(&read_pool_op_))
     202                 :     {
     203 MIS           0 :         op.impl_ref = std::move(read_pool_op_.ref_);
     204               0 :         op.cancelled.store(true, std::memory_order_release);
     205               0 :         svc_.post(&read_op_);
     206                 :     }
     207 HIT           6 :     return std::noop_coroutine();
     208                 : }
     209                 : 
     210                 : inline void
     211               6 : posix_stream_file::do_read_work(pool_work_item* w) noexcept
     212                 : {
     213               6 :     auto* pw   = static_cast<pool_op*>(w);
     214               6 :     auto* self = pw->file_;
     215               6 :     auto& op   = self->read_op_;
     216                 : 
     217               6 :     if (!op.cancelled.load(std::memory_order_acquire))
     218                 :     {
     219                 :         ssize_t n;
     220                 :         do
     221                 :         {
     222              10 :             n = ::preadv(self->fd_, op.iovecs, op.iovec_count,
     223               5 :                          static_cast<off_t>(self->offset_));
     224                 :         }
     225               5 :         while (n < 0 && errno == EINTR);
     226                 : 
     227               5 :         if (n >= 0)
     228                 :         {
     229               5 :             op.errn              = 0;
     230               5 :             op.bytes_transferred = static_cast<std::size_t>(n);
     231               5 :             self->offset_ += static_cast<std::uint64_t>(n);
     232                 :         }
     233                 :         else
     234                 :         {
     235 MIS           0 :             op.errn              = errno;
     236               0 :             op.bytes_transferred = 0;
     237                 :         }
     238                 :     }
     239                 : 
     240 HIT           6 :     op.impl_ref = std::move(pw->ref_);
     241               6 :     self->svc_.post(&op);
     242               6 : }
     243                 : 
     244                 : inline std::coroutine_handle<>
     245               6 : posix_stream_file::write_some(
     246                 :     std::coroutine_handle<> h,
     247                 :     capy::executor_ref ex,
     248                 :     buffer_param param,
     249                 :     std::stop_token token,
     250                 :     std::error_code* ec,
     251                 :     std::size_t* bytes_out)
     252                 : {
     253               6 :     auto& op = write_op_;
     254               6 :     op.reset();
     255               6 :     op.is_read = false;
     256                 : 
     257               6 :     capy::mutable_buffer bufs[max_buffers];
     258               6 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
     259                 : 
     260               6 :     if (op.iovec_count == 0)
     261                 :     {
     262 MIS           0 :         *ec        = {};
     263               0 :         *bytes_out = 0;
     264               0 :         op.cont_op.cont.h = h;
     265               0 :         return dispatch_coro(ex, op.cont_op.cont);
     266                 :     }
     267                 : 
     268 HIT          12 :     for (int i = 0; i < op.iovec_count; ++i)
     269                 :     {
     270               6 :         op.iovecs[i].iov_base = bufs[i].data();
     271               6 :         op.iovecs[i].iov_len  = bufs[i].size();
     272                 :     }
     273                 : 
     274               6 :     op.h         = h;
     275               6 :     op.ex        = ex;
     276               6 :     op.ec_out    = ec;
     277               6 :     op.bytes_out = bytes_out;
     278               6 :     op.start(token);
     279                 : 
     280               6 :     op.ex.on_work_started();
     281                 : 
     282               6 :     write_pool_op_.file_ = this;
     283               6 :     write_pool_op_.ref_  = this->shared_from_this();
     284               6 :     write_pool_op_.func_ = &posix_stream_file::do_write_work;
     285               6 :     if (!svc_.pool().post(&write_pool_op_))
     286                 :     {
     287 MIS           0 :         op.impl_ref = std::move(write_pool_op_.ref_);
     288               0 :         op.cancelled.store(true, std::memory_order_release);
     289               0 :         svc_.post(&write_op_);
     290                 :     }
     291 HIT           6 :     return std::noop_coroutine();
     292                 : }
     293                 : 
     294                 : inline void
     295               6 : posix_stream_file::do_write_work(pool_work_item* w) noexcept
     296                 : {
     297               6 :     auto* pw   = static_cast<pool_op*>(w);
     298               6 :     auto* self = pw->file_;
     299               6 :     auto& op   = self->write_op_;
     300                 : 
     301               6 :     if (!op.cancelled.load(std::memory_order_acquire))
     302                 :     {
     303                 :         ssize_t n;
     304                 :         do
     305                 :         {
     306              12 :             n = ::pwritev(self->fd_, op.iovecs, op.iovec_count,
     307               6 :                           static_cast<off_t>(self->offset_));
     308                 :         }
     309               6 :         while (n < 0 && errno == EINTR);
     310                 : 
     311               6 :         if (n >= 0)
     312                 :         {
     313               6 :             op.errn              = 0;
     314               6 :             op.bytes_transferred = static_cast<std::size_t>(n);
     315               6 :             self->offset_ += static_cast<std::uint64_t>(n);
     316                 :         }
     317                 :         else
     318                 :         {
     319 MIS           0 :             op.errn              = errno;
     320               0 :             op.bytes_transferred = 0;
     321                 :         }
     322                 :     }
     323                 : 
     324 HIT           6 :     op.impl_ref = std::move(pw->ref_);
     325               6 :     self->svc_.post(&op);
     326               6 : }
     327                 : 
     328                 : } // namespace boost::corosio::detail
     329                 : 
     330                 : #endif // BOOST_COROSIO_POSIX
     331                 : 
     332                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
        

Generated by: LCOV version 2.3