//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_SELECT

#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>

#include <boost/corosio/native/detail/select/select_op.hpp>
#include <boost/corosio/detail/timer_service.hpp>
#include <boost/corosio/native/detail/make_err.hpp>
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>

#include <boost/corosio/detail/except.hpp>

#include <sys/select.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <limits>
#include <mutex>
#include <unordered_map>

namespace boost::corosio::detail {

struct select_op;
struct select_descriptor_state;

/** POSIX scheduler using select() for I/O multiplexing.

    This scheduler implements the scheduler interface using the POSIX select()
    call for I/O event notification. It inherits the shared reactor threading
    model from reactor_scheduler_base: signal state machine, inline completion
    budget, work counting, and the do_one event loop.

    The design mirrors epoll_scheduler for behavioral consistency:
    - Same single-reactor thread coordination model
    - Same deferred I/O pattern (reactor marks ready; workers do I/O)
    - Same timer integration pattern

    Known Limitations:
    - FD_SETSIZE (~1024) limits maximum concurrent connections
    - O(n) scanning: rebuilds fd_sets each iteration
    - Level-triggered only (no edge-triggered mode)

    @par Thread Safety
    All public member functions are thread-safe.
*/
class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler_base
{
public:
    /** Construct the scheduler.

        Creates a self-pipe for reactor interruption.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~select_scheduler() override;

    select_scheduler(select_scheduler const&)            = delete;
    select_scheduler& operator=(select_scheduler const&) = delete;

    /// Shut down the scheduler, draining pending operations.
    void shutdown() override;

    /** Return the maximum file descriptor value supported.

        Returns FD_SETSIZE - 1, the maximum fd value that can be
        monitored by select(). Operations with fd >= FD_SETSIZE
        will fail with EINVAL.

        @return The maximum supported file descriptor value.
    */
    static constexpr int max_fd() noexcept
    {
        return FD_SETSIZE - 1;
    }

    /** Register a descriptor for persistent monitoring.

        The fd is added to the registered_descs_ map and will be
        included in subsequent select() calls. The reactor is
        interrupted so a blocked select() rebuilds its fd_sets.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor state for this fd.
    */
    void register_descriptor(int fd, select_descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    /** Interrupt the reactor so it rebuilds its fd_sets.

        Called when a write or connect op is registered after
        the reactor's snapshot was taken. Without this, select()
        may block not watching for writability on the fd.
    */
    void notify_reactor() const;

private:
    // One reactor pass: snapshot registered fds, block in select(),
    // then queue descriptors that became ready.
    void
    run_task(lock_type& lock, context_type* ctx) override;
    // Wake a blocked select() by writing a byte to the self-pipe.
    void interrupt_reactor() const override;
    // Combine the requested timeout with the nearest timer expiry
    // (microseconds; negative means "no cap from the caller").
    long calculate_timeout(long requested_timeout_us) const;

    // Self-pipe for interrupting select()
    int pipe_fds_[2]; // [0]=read, [1]=write

    // Per-fd tracking for fd_set building
    mutable std::unordered_map<int, select_descriptor_state*> registered_descs_;
    // Highest fd to pass to select(); rebuilt when the max fd deregisters.
    mutable int max_fd_ = -1;
};

inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
142  
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
143  
    : pipe_fds_{-1, -1}
143  
    : pipe_fds_{-1, -1}
144  
    , max_fd_(-1)
144  
    , max_fd_(-1)
145  
{
145  
{
146  
    if (::pipe(pipe_fds_) < 0)
146  
    if (::pipe(pipe_fds_) < 0)
147  
        detail::throw_system_error(make_err(errno), "pipe");
147  
        detail::throw_system_error(make_err(errno), "pipe");
148  

148  

149  
    for (int i = 0; i < 2; ++i)
149  
    for (int i = 0; i < 2; ++i)
150  
    {
150  
    {
151  
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
151  
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
152  
        if (flags == -1)
152  
        if (flags == -1)
153  
        {
153  
        {
154  
            int errn = errno;
154  
            int errn = errno;
155  
            ::close(pipe_fds_[0]);
155  
            ::close(pipe_fds_[0]);
156  
            ::close(pipe_fds_[1]);
156  
            ::close(pipe_fds_[1]);
157  
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
157  
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
158  
        }
158  
        }
159  
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
159  
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
160  
        {
160  
        {
161  
            int errn = errno;
161  
            int errn = errno;
162  
            ::close(pipe_fds_[0]);
162  
            ::close(pipe_fds_[0]);
163  
            ::close(pipe_fds_[1]);
163  
            ::close(pipe_fds_[1]);
164  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
164  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
165  
        }
165  
        }
166  
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
166  
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
167  
        {
167  
        {
168  
            int errn = errno;
168  
            int errn = errno;
169  
            ::close(pipe_fds_[0]);
169  
            ::close(pipe_fds_[0]);
170  
            ::close(pipe_fds_[1]);
170  
            ::close(pipe_fds_[1]);
171  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
171  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
172  
        }
172  
        }
173  
    }
173  
    }
174  

174  

175  
    timer_svc_ = &get_timer_service(ctx, *this);
175  
    timer_svc_ = &get_timer_service(ctx, *this);
176  
    timer_svc_->set_on_earliest_changed(
176  
    timer_svc_->set_on_earliest_changed(
177  
        timer_service::callback(this, [](void* p) {
177  
        timer_service::callback(this, [](void* p) {
178  
            static_cast<select_scheduler*>(p)->interrupt_reactor();
178  
            static_cast<select_scheduler*>(p)->interrupt_reactor();
179  
        }));
179  
        }));
180  

180  

181  
    get_resolver_service(ctx, *this);
181  
    get_resolver_service(ctx, *this);
182  
    get_signal_service(ctx, *this);
182  
    get_signal_service(ctx, *this);
183  
    get_stream_file_service(ctx, *this);
183  
    get_stream_file_service(ctx, *this);
184  
    get_random_access_file_service(ctx, *this);
184  
    get_random_access_file_service(ctx, *this);
185  

185  

186  
    completed_ops_.push(&task_op_);
186  
    completed_ops_.push(&task_op_);
187  
}
187  
}
188  

188  

189  
inline select_scheduler::~select_scheduler()
{
    // Close whichever pipe ends were actually created (-1 means the
    // constructor never got that far).
    for (int fd : pipe_fds_)
    {
        if (fd >= 0)
            ::close(fd);
    }
}

inline void
197  
inline void
198  
select_scheduler::shutdown()
198  
select_scheduler::shutdown()
199  
{
199  
{
200  
    shutdown_drain();
200  
    shutdown_drain();
201  

201  

202  
    if (pipe_fds_[1] >= 0)
202  
    if (pipe_fds_[1] >= 0)
203  
        interrupt_reactor();
203  
        interrupt_reactor();
204  
}
204  
}
205  

205  

206  
inline void
206  
inline void
207  
select_scheduler::register_descriptor(
207  
select_scheduler::register_descriptor(
208  
    int fd, select_descriptor_state* desc) const
208  
    int fd, select_descriptor_state* desc) const
209  
{
209  
{
210  
    if (fd < 0 || fd >= FD_SETSIZE)
210  
    if (fd < 0 || fd >= FD_SETSIZE)
211  
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
211  
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
212  

212  

213  
    desc->registered_events = reactor_event_read | reactor_event_write;
213  
    desc->registered_events = reactor_event_read | reactor_event_write;
214  
    desc->fd                = fd;
214  
    desc->fd                = fd;
215  
    desc->scheduler_        = this;
215  
    desc->scheduler_        = this;
 
216 +
    desc->mutex.set_enabled(!single_threaded_);
216  
    desc->ready_events_.store(0, std::memory_order_relaxed);
217  
    desc->ready_events_.store(0, std::memory_order_relaxed);
217  

218  

218  
    {
219  
    {
219 -
        std::lock_guard lock(desc->mutex);
220 +
        conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
220  
        desc->impl_ref_.reset();
221  
        desc->impl_ref_.reset();
221  
        desc->read_ready  = false;
222  
        desc->read_ready  = false;
222  
        desc->write_ready = false;
223  
        desc->write_ready = false;
223  
    }
224  
    }
224  

225  

225  
    {
226  
    {
226 -
        std::lock_guard lock(mutex_);
227 +
        mutex_type::scoped_lock lock(mutex_);
227  
        registered_descs_[fd] = desc;
228  
        registered_descs_[fd] = desc;
228  
        if (fd > max_fd_)
229  
        if (fd > max_fd_)
229  
            max_fd_ = fd;
230  
            max_fd_ = fd;
230  
    }
231  
    }
231  

232  

232  
    interrupt_reactor();
233  
    interrupt_reactor();
233  
}
234  
}
234  

235  

235  
inline void
236  
inline void
236  
select_scheduler::deregister_descriptor(int fd) const
237  
select_scheduler::deregister_descriptor(int fd) const
237  
{
238  
{
238 -
    std::lock_guard lock(mutex_);
239 +
    mutex_type::scoped_lock lock(mutex_);
239  

240  

240  
    auto it = registered_descs_.find(fd);
241  
    auto it = registered_descs_.find(fd);
241  
    if (it == registered_descs_.end())
242  
    if (it == registered_descs_.end())
242  
        return;
243  
        return;
243  

244  

244  
    registered_descs_.erase(it);
245  
    registered_descs_.erase(it);
245  

246  

246  
    if (fd == max_fd_)
247  
    if (fd == max_fd_)
247  
    {
248  
    {
248  
        max_fd_ = pipe_fds_[0];
249  
        max_fd_ = pipe_fds_[0];
249  
        for (auto& [registered_fd, state] : registered_descs_)
250  
        for (auto& [registered_fd, state] : registered_descs_)
250  
        {
251  
        {
251  
            if (registered_fd > max_fd_)
252  
            if (registered_fd > max_fd_)
252  
                max_fd_ = registered_fd;
253  
                max_fd_ = registered_fd;
253  
        }
254  
        }
254  
    }
255  
    }
255  
}
256  
}
256  

257  

257  
inline void
select_scheduler::notify_reactor() const
{
    // A write/connect interest registered after the reactor snapshot
    // would otherwise be missed until the next wake-up; force a rebuild
    // of the fd_sets now.
    interrupt_reactor();
}

inline void
264  
inline void
264  
select_scheduler::interrupt_reactor() const
265  
select_scheduler::interrupt_reactor() const
265  
{
266  
{
266  
    char byte               = 1;
267  
    char byte               = 1;
267  
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
268  
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
268  
}
269  
}
269  

270  

270  
inline long
271  
inline long
271  
select_scheduler::calculate_timeout(long requested_timeout_us) const
272  
select_scheduler::calculate_timeout(long requested_timeout_us) const
272  
{
273  
{
273  
    if (requested_timeout_us == 0)
274  
    if (requested_timeout_us == 0)
274  
        return 0;
275  
        return 0;
275  

276  

276  
    auto nearest = timer_svc_->nearest_expiry();
277  
    auto nearest = timer_svc_->nearest_expiry();
277  
    if (nearest == timer_service::time_point::max())
278  
    if (nearest == timer_service::time_point::max())
278  
        return requested_timeout_us;
279  
        return requested_timeout_us;
279  

280  

280  
    auto now = std::chrono::steady_clock::now();
281  
    auto now = std::chrono::steady_clock::now();
281  
    if (nearest <= now)
282  
    if (nearest <= now)
282  
        return 0;
283  
        return 0;
283  

284  

284  
    auto timer_timeout_us =
285  
    auto timer_timeout_us =
285  
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
286  
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
286  
            .count();
287  
            .count();
287  

288  

288  
    constexpr auto long_max =
289  
    constexpr auto long_max =
289  
        static_cast<long long>((std::numeric_limits<long>::max)());
290  
        static_cast<long long>((std::numeric_limits<long>::max)());
290  
    auto capped_timer_us =
291  
    auto capped_timer_us =
291  
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
292  
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
292  
                              static_cast<long long>(0)),
293  
                              static_cast<long long>(0)),
293  
                   long_max);
294  
                   long_max);
294  

295  

295  
    if (requested_timeout_us < 0)
296  
    if (requested_timeout_us < 0)
296  
        return static_cast<long>(capped_timer_us);
297  
        return static_cast<long>(capped_timer_us);
297  

298  

298  
    return static_cast<long>(
299  
    return static_cast<long>(
299  
        (std::min)(static_cast<long long>(requested_timeout_us),
300  
        (std::min)(static_cast<long long>(requested_timeout_us),
300  
                   capped_timer_us));
301  
                   capped_timer_us));
301  
}
302  
}
302  

303  

303  
inline void
select_scheduler::run_task(
    lock_type& lock, context_type* ctx)
{
    // If an interrupt is already pending we must not block; otherwise
    // let the nearest timer expiry bound the wait (-1 = no caller cap).
    long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(-1);

    // Snapshot registered descriptors while holding lock.
    // Record which fds need write monitoring to avoid a hot loop:
    // select is level-triggered so writable sockets (nearly always
    // writable) would cause select() to return immediately every
    // iteration if unconditionally added to write_fds.
    struct fd_entry
    {
        int fd;
        select_descriptor_state* desc;
        bool needs_write;
    };
    fd_entry snapshot[FD_SETSIZE];
    int snapshot_count = 0;

    for (auto& [fd, desc] : registered_descs_)
    {
        if (snapshot_count < FD_SETSIZE)
        {
            conditionally_enabled_mutex::scoped_lock desc_lock(desc->mutex);
            snapshot[snapshot_count].fd   = fd;
            snapshot[snapshot_count].desc = desc;
            snapshot[snapshot_count].needs_write =
                (desc->write_op || desc->connect_op);
            ++snapshot_count;
        }
    }

    // Drop the scheduler lock before blocking in select().
    if (lock.owns_lock())
        lock.unlock();

    // NOTE(review): task_cleanup appears to be an RAII guard restoring
    // reactor/lock state on every exit path (early returns and throws
    // below) — see reactor_scheduler_base for its exact contract.
    task_cleanup on_exit{this, &lock, ctx};

    fd_set read_fds, write_fds, except_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&write_fds);
    FD_ZERO(&except_fds);

    // The self-pipe read end is always watched so interrupt_reactor()
    // can wake a blocked select().
    FD_SET(pipe_fds_[0], &read_fds);
    int nfds = pipe_fds_[0];

    for (int i = 0; i < snapshot_count; ++i)
    {
        int fd = snapshot[i].fd;
        FD_SET(fd, &read_fds);
        if (snapshot[i].needs_write)
            FD_SET(fd, &write_fds);
        FD_SET(fd, &except_fds);
        if (fd > nfds)
            nfds = fd;
    }

    struct timeval tv;
    struct timeval* tv_ptr = nullptr; // null timeval => block indefinitely
    if (effective_timeout_us >= 0)
    {
        tv.tv_sec  = effective_timeout_us / 1000000;
        tv.tv_usec = effective_timeout_us % 1000000;
        tv_ptr     = &tv;
    }

    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);

    // EINTR: signal interrupted select(), just retry.
    // EBADF: an fd was closed between snapshot and select(); retry
    // with a fresh snapshot from registered_descs_.
    if (ready < 0)
    {
        if (errno == EINTR || errno == EBADF)
            return;
        detail::throw_system_error(make_err(errno), "select");
    }

    // Process timers outside the lock
    timer_svc_->process_expired();

    op_queue local_ops;

    if (ready > 0)
    {
        // Drain the self-pipe so spurious wake-ups don't accumulate;
        // the pipe is non-blocking, so read() stops once it is empty.
        if (FD_ISSET(pipe_fds_[0], &read_fds))
        {
            char buf[256];
            while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
            {
            }
        }

        for (int i = 0; i < snapshot_count; ++i)
        {
            int fd                        = snapshot[i].fd;
            select_descriptor_state* desc = snapshot[i].desc;

            // Translate fd_set membership into reactor event flags.
            std::uint32_t flags = 0;
            if (FD_ISSET(fd, &read_fds))
                flags |= reactor_event_read;
            if (FD_ISSET(fd, &write_fds))
                flags |= reactor_event_write;
            if (FD_ISSET(fd, &except_fds))
                flags |= reactor_event_error;

            if (flags == 0)
                continue;

            desc->add_ready_events(flags);

            // Enqueue each descriptor at most once; is_enqueued_ is
            // presumably cleared by the worker that consumes the
            // readiness — confirm in select_descriptor_state.
            bool expected = false;
            if (desc->is_enqueued_.compare_exchange_strong(
                    expected, true, std::memory_order_release,
                    std::memory_order_relaxed))
            {
                local_ops.push(desc);
            }
        }
    }

    // Re-acquire the scheduler lock before touching completed_ops_.
    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_SELECT

#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP