LCOV - code coverage report
Current view: top level - corosio/native/detail/posix - posix_random_access_file_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 86.6 % 142 123 19
Test Date: 2026-03-27 21:00:37 Functions: 100.0 % 16 16

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_POSIX
      16                 : 
      17                 : #include <boost/corosio/native/detail/posix/posix_random_access_file.hpp>
      18                 : #include <boost/corosio/native/native_scheduler.hpp>
      19                 : #include <boost/corosio/detail/random_access_file_service.hpp>
      20                 : #include <boost/corosio/detail/thread_pool.hpp>
      21                 : 
      22                 : #include <limits>
      23                 : #include <mutex>
      24                 : #include <unordered_map>
      25                 : 
      26                 : namespace boost::corosio::detail {
      27                 : 
      28                 : /** Random-access file service for POSIX backends. */
      29                 : class BOOST_COROSIO_DECL posix_random_access_file_service final
      30                 :     : public random_access_file_service
      31                 : {
      32                 : public:
      33 HIT         515 :     posix_random_access_file_service(
      34                 :         capy::execution_context& ctx, scheduler& sched)
      35            1030 :         : sched_(&sched)
      36            1030 :         , pool_(get_or_create_pool(ctx))
      37             515 :         , single_threaded_(
      38             515 :               static_cast<native_scheduler&>(sched).single_threaded_)
      39                 :     {
      40             515 :     }
      41                 : 
      42            1030 :     ~posix_random_access_file_service() override = default;
      43                 : 
      44                 :     posix_random_access_file_service(
      45                 :         posix_random_access_file_service const&)            = delete;
      46                 :     posix_random_access_file_service& operator=(
      47                 :         posix_random_access_file_service const&) = delete;
      48                 : 
      49              24 :     io_object::implementation* construct() override
      50                 :     {
      51              24 :         auto ptr   = std::make_shared<posix_random_access_file>(*this);
      52              24 :         auto* impl = ptr.get();
      53                 : 
      54                 :         {
      55              24 :             std::lock_guard<std::mutex> lock(mutex_);
      56              24 :             file_list_.push_back(impl);
      57              24 :             file_ptrs_[impl] = std::move(ptr);
      58              24 :         }
      59                 : 
      60              24 :         return impl;
      61              24 :     }
      62                 : 
      63              24 :     void destroy(io_object::implementation* p) override
      64                 :     {
      65              24 :         auto& impl = static_cast<posix_random_access_file&>(*p);
      66              24 :         impl.cancel();
      67              24 :         impl.close_file();
      68              24 :         destroy_impl(impl);
      69              24 :     }
      70                 : 
      71              42 :     void close(io_object::handle& h) override
      72                 :     {
      73              42 :         if (h.get())
      74                 :         {
      75              42 :             auto& impl = static_cast<posix_random_access_file&>(*h.get());
      76              42 :             impl.cancel();
      77              42 :             impl.close_file();
      78                 :         }
      79              42 :     }
      80                 : 
      81              19 :     std::error_code open_file(
      82                 :         random_access_file::implementation& impl,
      83                 :         std::filesystem::path const& path,
      84                 :         file_base::flags mode) override
      85                 :     {
      86              19 :         if (single_threaded_)
      87 MIS           0 :             return std::make_error_code(std::errc::operation_not_supported);
      88 HIT          19 :         return static_cast<posix_random_access_file&>(impl).open_file(
      89              19 :             path, mode);
      90                 :     }
      91                 : 
      92             515 :     void shutdown() override
      93                 :     {
      94             515 :         std::lock_guard<std::mutex> lock(mutex_);
      95             515 :         for (auto* impl = file_list_.pop_front(); impl != nullptr;
      96 MIS           0 :              impl       = file_list_.pop_front())
      97                 :         {
      98               0 :             impl->cancel();
      99               0 :             impl->close_file();
     100                 :         }
     101 HIT         515 :         file_ptrs_.clear();
     102             515 :     }
     103                 : 
     104              24 :     void destroy_impl(posix_random_access_file& impl)
     105                 :     {
     106              24 :         std::lock_guard<std::mutex> lock(mutex_);
     107              24 :         file_list_.remove(&impl);
     108              24 :         file_ptrs_.erase(&impl);
     109              24 :     }
     110                 : 
     111             126 :     void post(scheduler_op* op)
     112                 :     {
     113             126 :         sched_->post(op);
     114             126 :     }
     115                 : 
     116                 :     void work_started() noexcept
     117                 :     {
     118                 :         sched_->work_started();
     119                 :     }
     120                 : 
     121                 :     void work_finished() noexcept
     122                 :     {
     123                 :         sched_->work_finished();
     124                 :     }
     125                 : 
     126             126 :     thread_pool& pool() noexcept
     127                 :     {
     128             126 :         return pool_;
     129                 :     }
     130                 : 
     131                 : private:
     132             515 :     static thread_pool& get_or_create_pool(capy::execution_context& ctx)
     133                 :     {
     134             515 :         auto* p = ctx.find_service<thread_pool>();
     135             515 :         if (p)
     136             515 :             return *p;
     137 MIS           0 :         return ctx.make_service<thread_pool>();
     138                 :     }
     139                 : 
     140                 :     scheduler* sched_;
     141                 :     thread_pool& pool_;
     142                 :     bool single_threaded_;
     143                 :     std::mutex mutex_;
     144                 :     intrusive_list<posix_random_access_file> file_list_;
     145                 :     std::unordered_map<
     146                 :         posix_random_access_file*,
     147                 :         std::shared_ptr<posix_random_access_file>>
     148                 :         file_ptrs_;
     149                 : };
     150                 : 
     151                 : /** Get or create the random-access file service for the given context. */
     152                 : inline posix_random_access_file_service&
     153 HIT         515 : get_random_access_file_service(capy::execution_context& ctx, scheduler& sched)
     154                 : {
     155             515 :     return ctx.make_service<posix_random_access_file_service>(sched);
     156                 : }
     157                 : 
     158                 : // ---------------------------------------------------------------------------
     159                 : // posix_random_access_file inline implementations (require complete service)
     160                 : // ---------------------------------------------------------------------------
     161                 : 
     162                 : inline std::coroutine_handle<>
     163             116 : posix_random_access_file::read_some_at(
     164                 :     std::uint64_t offset,
     165                 :     std::coroutine_handle<> h,
     166                 :     capy::executor_ref ex,
     167                 :     buffer_param param,
     168                 :     std::stop_token token,
     169                 :     std::error_code* ec,
     170                 :     std::size_t* bytes_out)
     171                 : {
     172             116 :     capy::mutable_buffer bufs[max_buffers];
     173             116 :     auto count = param.copy_to(bufs, max_buffers);
     174                 : 
     175             116 :     if (count == 0)
     176                 :     {
     177 MIS           0 :         *ec        = {};
     178               0 :         *bytes_out = 0;
     179               0 :         return h;
     180                 :     }
     181                 : 
     182 HIT         116 :     auto* op = new raf_op();
     183             116 :     op->is_read = true;
     184             116 :     op->offset  = offset;
     185                 : 
     186             116 :     op->iovec_count = static_cast<int>(count);
     187             232 :     for (int i = 0; i < op->iovec_count; ++i)
     188                 :     {
     189             116 :         op->iovecs[i].iov_base = bufs[i].data();
     190             116 :         op->iovecs[i].iov_len  = bufs[i].size();
     191                 :     }
     192                 : 
     193             116 :     op->h         = h;
     194             116 :     op->ex        = ex;
     195             116 :     op->ec_out    = ec;
     196             116 :     op->bytes_out = bytes_out;
     197             116 :     op->file_     = this;
     198             116 :     op->file_ref  = this->shared_from_this();
     199             116 :     op->start(token);
     200                 : 
     201             116 :     op->ex.on_work_started();
     202                 : 
     203                 :     {
     204             116 :         std::lock_guard<std::mutex> lock(ops_mutex_);
     205             116 :         outstanding_ops_.push_back(op);
     206             116 :     }
     207                 : 
     208             116 :     static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
     209             116 :     if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
     210                 :     {
     211 MIS           0 :         op->cancelled.store(true, std::memory_order_release);
     212               0 :         svc_.post(static_cast<scheduler_op*>(op));
     213                 :     }
     214 HIT         116 :     return std::noop_coroutine();
     215                 : }
     216                 : 
     217                 : inline std::coroutine_handle<>
     218              10 : posix_random_access_file::write_some_at(
     219                 :     std::uint64_t offset,
     220                 :     std::coroutine_handle<> h,
     221                 :     capy::executor_ref ex,
     222                 :     buffer_param param,
     223                 :     std::stop_token token,
     224                 :     std::error_code* ec,
     225                 :     std::size_t* bytes_out)
     226                 : {
     227              10 :     capy::mutable_buffer bufs[max_buffers];
     228              10 :     auto count = param.copy_to(bufs, max_buffers);
     229                 : 
     230              10 :     if (count == 0)
     231                 :     {
     232 MIS           0 :         *ec        = {};
     233               0 :         *bytes_out = 0;
     234               0 :         return h;
     235                 :     }
     236                 : 
     237 HIT          10 :     auto* op = new raf_op();
     238              10 :     op->is_read = false;
     239              10 :     op->offset  = offset;
     240                 : 
     241              10 :     op->iovec_count = static_cast<int>(count);
     242              20 :     for (int i = 0; i < op->iovec_count; ++i)
     243                 :     {
     244              10 :         op->iovecs[i].iov_base = bufs[i].data();
     245              10 :         op->iovecs[i].iov_len  = bufs[i].size();
     246                 :     }
     247                 : 
     248              10 :     op->h         = h;
     249              10 :     op->ex        = ex;
     250              10 :     op->ec_out    = ec;
     251              10 :     op->bytes_out = bytes_out;
     252              10 :     op->file_     = this;
     253              10 :     op->file_ref  = this->shared_from_this();
     254              10 :     op->start(token);
     255                 : 
     256              10 :     op->ex.on_work_started();
     257                 : 
     258                 :     {
     259              10 :         std::lock_guard<std::mutex> lock(ops_mutex_);
     260              10 :         outstanding_ops_.push_back(op);
     261              10 :     }
     262                 : 
     263              10 :     static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
     264              10 :     if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
     265                 :     {
     266 MIS           0 :         op->cancelled.store(true, std::memory_order_release);
     267               0 :         svc_.post(static_cast<scheduler_op*>(op));
     268                 :     }
     269 HIT          10 :     return std::noop_coroutine();
     270                 : }
     271                 : 
     272                 : // -- raf_op thread-pool work function --
     273                 : 
     274                 : inline void
     275             126 : posix_random_access_file::raf_op::do_work(pool_work_item* w) noexcept
     276                 : {
     277             126 :     auto* op   = static_cast<raf_op*>(w);
     278             126 :     auto* self = op->file_;
     279                 : 
     280             126 :     if (op->cancelled.load(std::memory_order_acquire))
     281                 :     {
     282               1 :         op->errn              = ECANCELED;
     283               1 :         op->bytes_transferred = 0;
     284                 :     }
     285             250 :     else if (op->offset >
     286             125 :              static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
     287                 :     {
     288 MIS           0 :         op->errn              = EOVERFLOW;
     289               0 :         op->bytes_transferred = 0;
     290                 :     }
     291                 :     else
     292                 :     {
     293                 :         ssize_t n;
     294 HIT         125 :         if (op->is_read)
     295                 :         {
     296                 :             do
     297                 :             {
     298             230 :                 n = ::preadv(self->fd_, op->iovecs, op->iovec_count,
     299             115 :                              static_cast<off_t>(op->offset));
     300                 :             }
     301             115 :             while (n < 0 && errno == EINTR);
     302                 :         }
     303                 :         else
     304                 :         {
     305                 :             do
     306                 :             {
     307              20 :                 n = ::pwritev(self->fd_, op->iovecs, op->iovec_count,
     308              10 :                               static_cast<off_t>(op->offset));
     309                 :             }
     310              10 :             while (n < 0 && errno == EINTR);
     311                 :         }
     312                 : 
     313             125 :         if (n >= 0)
     314                 :         {
     315             125 :             op->errn              = 0;
     316             125 :             op->bytes_transferred = static_cast<std::size_t>(n);
     317                 :         }
     318                 :         else
     319                 :         {
     320 MIS           0 :             op->errn              = errno;
     321               0 :             op->bytes_transferred = 0;
     322                 :         }
     323                 :     }
     324                 : 
     325 HIT         126 :     self->svc_.post(static_cast<scheduler_op*>(op));
     326             126 : }
     327                 : 
     328                 : } // namespace boost::corosio::detail
     329                 : 
     330                 : #endif // BOOST_COROSIO_POSIX
     331                 : 
     332                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
        

Generated by: LCOV version 2.3