LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_descriptor_state.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 74.3 % 74 55 19
Test Date: 2026-03-27 21:00:37 Functions: 100.0 % 4 4

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
      12                 : 
      13                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      14                 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
      15                 : 
      16                 : #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
      17                 : 
      18                 : #include <atomic>
      19                 : #include <cstdint>
      20                 : #include <memory>
      21                 : 
      22                 : #include <errno.h>
      23                 : #include <sys/socket.h>
      24                 : 
      25                 : namespace boost::corosio::detail {
      26                 : 
      27                 : /// Shared reactor event constants.
      28                 : /// These match epoll numeric values; kqueue maps its events to the same.
      29                 : static constexpr std::uint32_t reactor_event_read  = 0x001;
      30                 : static constexpr std::uint32_t reactor_event_write = 0x004;
      31                 : static constexpr std::uint32_t reactor_event_error = 0x008;
      32                 : 
      33                 : /** Per-descriptor state shared across reactor backends.
      34                 : 
      35                 :     Tracks pending operations for a file descriptor. The fd is registered
      36                 :     once with the reactor and stays registered until closed. Uses deferred
      37                 :     I/O: the reactor sets ready_events atomically, then enqueues this state.
      38                 :     When popped by the scheduler, invoke_deferred_io() performs I/O under
      39                 :     the mutex and queues completed ops.
      40                 : 
      41                 :     Non-template: uses reactor_op_base pointers so the scheduler and
      42                 :     descriptor_state code exist as a single copy in the binary regardless
      43                 :     of how many backends are compiled in.
      44                 : 
      45                 :     @par Thread Safety
      46                 :     The mutex protects operation pointers and ready flags. ready_events_
      47                 :     and is_enqueued_ are atomic for lock-free reactor access.
      48                 : */
      49                 : struct reactor_descriptor_state : scheduler_op
      50                 : {
      51                 :     /// Protects operation pointers and ready/cancel flags.
      52                 :     /// Becomes a no-op in single-threaded mode.
      53                 :     conditionally_enabled_mutex mutex{true};
      54                 : 
      55                 :     /// Pending read operation (guarded by `mutex`).
      56                 :     reactor_op_base* read_op = nullptr;
      57                 : 
      58                 :     /// Pending write operation (guarded by `mutex`).
      59                 :     reactor_op_base* write_op = nullptr;
      60                 : 
      61                 :     /// Pending connect operation (guarded by `mutex`).
      62                 :     reactor_op_base* connect_op = nullptr;
      63                 : 
      64                 :     /// True if a read edge event arrived before an op was registered.
      65                 :     bool read_ready = false;
      66                 : 
      67                 :     /// True if a write edge event arrived before an op was registered.
      68                 :     bool write_ready = false;
      69                 : 
      70                 :     /// Deferred read cancellation (IOCP-style cancel semantics).
      71                 :     bool read_cancel_pending = false;
      72                 : 
      73                 :     /// Deferred write cancellation (IOCP-style cancel semantics).
      74                 :     bool write_cancel_pending = false;
      75                 : 
      76                 :     /// Deferred connect cancellation (IOCP-style cancel semantics).
      77                 :     bool connect_cancel_pending = false;
      78                 : 
      79                 :     /// Event mask set during registration (no mutex needed).
      80                 :     std::uint32_t registered_events = 0;
      81                 : 
      82                 :     /// File descriptor this state tracks.
      83                 :     int fd = -1;
      84                 : 
      85                 :     /// Accumulated ready events (set by reactor, read by scheduler).
      86                 :     std::atomic<std::uint32_t> ready_events_{0};
      87                 : 
      88                 :     /// True while this state is queued in the scheduler's completed_ops.
      89                 :     std::atomic<bool> is_enqueued_{false};
      90                 : 
      91                 :     /// Owning scheduler for posting completions.
      92                 :     reactor_scheduler_base const* scheduler_ = nullptr;
      93                 : 
      94                 :     /// Prevents impl destruction while queued in the scheduler.
      95                 :     std::shared_ptr<void> impl_ref_;
      96                 : 
      97                 :     /// Add ready events atomically.
      98                 :     /// Release pairs with the consumer's acquire exchange on
      99                 :     /// ready_events_ so the consumer sees all flags. On x86 (TSO)
     100                 :     /// this compiles to the same LOCK OR as relaxed.
     101 HIT      169839 :     void add_ready_events(std::uint32_t ev) noexcept
     102                 :     {
     103          169839 :         ready_events_.fetch_or(ev, std::memory_order_release);
     104          169839 :     }
     105                 : 
     106                 :     /// Invoke deferred I/O and dispatch completions.
     107          169770 :     void operator()() override
     108                 :     {
     109          169770 :         invoke_deferred_io();
     110          169770 :     }
     111                 : 
     112                 :     /// Destroy without invoking.
     113                 :     /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
     114                 :     /// the self-referential cycle set by close_socket().
     115              69 :     void destroy() override
     116                 :     {
     117              69 :         impl_ref_.reset();
     118              69 :     }
     119                 : 
     120                 :     /** Perform deferred I/O and queue completions.
     121                 : 
     122                 :         Performs I/O under the mutex and queues completed ops. EAGAIN
     123                 :         ops stay parked in their slot for re-delivery on the next
     124                 :         edge event.
     125                 :     */
     126                 :     void invoke_deferred_io();
     127                 : };
     128                 : 
     129                 : inline void
     130          169770 : reactor_descriptor_state::invoke_deferred_io()
     131                 : {
     132          169770 :     std::shared_ptr<void> prevent_impl_destruction;
     133          169770 :     op_queue local_ops;
     134                 : 
     135                 :     {
     136          169770 :         conditionally_enabled_mutex::scoped_lock lock(mutex);
     137                 : 
     138                 :         // Must clear is_enqueued_ and move impl_ref_ under the same
     139                 :         // lock that processes I/O. close_socket() checks is_enqueued_
     140                 :         // under this mutex — without atomicity between the flag store
     141                 :         // and the ref move, close_socket() could see is_enqueued_==false,
     142                 :         // skip setting impl_ref_, and destroy the impl under us.
     143          169770 :         prevent_impl_destruction = std::move(impl_ref_);
     144          169770 :         is_enqueued_.store(false, std::memory_order_release);
     145                 : 
     146          169770 :         std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
     147          169770 :         if (ev == 0)
     148                 :         {
     149                 :             // Mutex unlocks here; compensate for work_cleanup's decrement
     150 MIS           0 :             scheduler_->compensating_work_started();
     151               0 :             return;
     152                 :         }
     153                 : 
     154 HIT      169770 :         int err = 0;
     155          169770 :         if (ev & reactor_event_error)
     156                 :         {
     157               1 :             socklen_t len = sizeof(err);
     158               1 :             if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     159 MIS           0 :                 err = errno;
     160 HIT           1 :             if (err == 0)
     161 MIS           0 :                 err = EIO;
     162                 :         }
     163                 : 
     164 HIT      169770 :         if (ev & reactor_event_read)
     165                 :         {
     166          141269 :             if (read_op)
     167                 :             {
     168            8492 :                 auto* rd = read_op;
     169            8492 :                 if (err)
     170 MIS           0 :                     rd->complete(err, 0);
     171                 :                 else
     172 HIT        8492 :                     rd->perform_io();
     173                 : 
     174            8492 :                 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
     175                 :                 {
     176              44 :                     rd->errn = 0;
     177                 :                 }
     178                 :                 else
     179                 :                 {
     180            8448 :                     read_op = nullptr;
     181            8448 :                     local_ops.push(rd);
     182                 :                 }
     183                 :             }
     184                 :             else
     185                 :             {
     186          132777 :                 read_ready = true;
     187                 :             }
     188                 :         }
     189          169770 :         if (ev & reactor_event_write)
     190                 :         {
     191           36355 :             bool had_write_op = (connect_op || write_op);
     192           36355 :             if (connect_op)
     193                 :             {
     194            8358 :                 auto* cn = connect_op;
     195            8358 :                 if (err)
     196               1 :                     cn->complete(err, 0);
     197                 :                 else
     198            8357 :                     cn->perform_io();
     199            8358 :                 connect_op = nullptr;
     200            8358 :                 local_ops.push(cn);
     201                 :             }
     202           36355 :             if (write_op)
     203                 :             {
     204 MIS           0 :                 auto* wr = write_op;
     205               0 :                 if (err)
     206               0 :                     wr->complete(err, 0);
     207                 :                 else
     208               0 :                     wr->perform_io();
     209                 : 
     210               0 :                 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
     211                 :                 {
     212               0 :                     wr->errn = 0;
     213                 :                 }
     214                 :                 else
     215                 :                 {
     216               0 :                     write_op = nullptr;
     217               0 :                     local_ops.push(wr);
     218                 :                 }
     219                 :             }
     220 HIT       36355 :             if (!had_write_op)
     221           27997 :                 write_ready = true;
     222                 :         }
     223          169770 :         if (err)
     224                 :         {
     225               1 :             if (read_op)
     226                 :             {
     227 MIS           0 :                 read_op->complete(err, 0);
     228               0 :                 local_ops.push(std::exchange(read_op, nullptr));
     229                 :             }
     230 HIT           1 :             if (write_op)
     231                 :             {
     232 MIS           0 :                 write_op->complete(err, 0);
     233               0 :                 local_ops.push(std::exchange(write_op, nullptr));
     234                 :             }
     235 HIT           1 :             if (connect_op)
     236                 :             {
     237 MIS           0 :                 connect_op->complete(err, 0);
     238               0 :                 local_ops.push(std::exchange(connect_op, nullptr));
     239                 :             }
     240                 :         }
     241 HIT      169770 :     }
     242                 : 
     243                 :     // Execute first handler inline — the scheduler's work_cleanup
     244                 :     // accounts for this as the "consumed" work item
     245          169770 :     scheduler_op* first = local_ops.pop();
     246          169770 :     if (first)
     247                 :     {
     248           16806 :         scheduler_->post_deferred_completions(local_ops);
     249           16806 :         (*first)();
     250                 :     }
     251                 :     else
     252                 :     {
     253          152964 :         scheduler_->compensating_work_started();
     254                 :     }
     255          169770 : }
     256                 : 
     257                 : } // namespace boost::corosio::detail
     258                 : 
     259                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
        

Generated by: LCOV version 2.3