1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_SELECT
15  
#if BOOST_COROSIO_HAS_SELECT
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  

19  

20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21  

21  

22  
#include <boost/corosio/native/detail/select/select_op.hpp>
22  
#include <boost/corosio/native/detail/select/select_op.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28  
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
28  
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29  

29  

30  
#include <boost/corosio/detail/except.hpp>
30  
#include <boost/corosio/detail/except.hpp>
31  

31  

32  
#include <sys/select.h>
32  
#include <sys/select.h>
33  
#include <unistd.h>
33  
#include <unistd.h>
34  
#include <errno.h>
34  
#include <errno.h>
35  
#include <fcntl.h>
35  
#include <fcntl.h>
36  

36  

37  
#include <atomic>
37  
#include <atomic>
38  
#include <chrono>
38  
#include <chrono>
39  
#include <cstdint>
39  
#include <cstdint>
40  
#include <limits>
40  
#include <limits>
41  
#include <mutex>
41  
#include <mutex>
42  
#include <unordered_map>
42  
#include <unordered_map>
43  

43  

44  
namespace boost::corosio::detail {
44  
namespace boost::corosio::detail {
45  

45  

46  
struct select_op;
46  
struct select_op;
47  
struct select_descriptor_state;
47  
struct select_descriptor_state;
48  

48  

49  
/** POSIX scheduler using select() for I/O multiplexing.
49  
/** POSIX scheduler using select() for I/O multiplexing.
50  

50  

51  
    This scheduler implements the scheduler interface using the POSIX select()
51  
    This scheduler implements the scheduler interface using the POSIX select()
52  
    call for I/O event notification. It inherits the shared reactor threading
52  
    call for I/O event notification. It inherits the shared reactor threading
53  
    model from reactor_scheduler_base: signal state machine, inline completion
53  
    model from reactor_scheduler_base: signal state machine, inline completion
54  
    budget, work counting, and the do_one event loop.
54  
    budget, work counting, and the do_one event loop.
55  

55  

56  
    The design mirrors epoll_scheduler for behavioral consistency:
56  
    The design mirrors epoll_scheduler for behavioral consistency:
57  
    - Same single-reactor thread coordination model
57  
    - Same single-reactor thread coordination model
58  
    - Same deferred I/O pattern (reactor marks ready; workers do I/O)
58  
    - Same deferred I/O pattern (reactor marks ready; workers do I/O)
59  
    - Same timer integration pattern
59  
    - Same timer integration pattern
60  

60  

61  
    Known Limitations:
61  
    Known Limitations:
62  
    - FD_SETSIZE (~1024) limits maximum concurrent connections
62  
    - FD_SETSIZE (~1024) limits maximum concurrent connections
63  
    - O(n) scanning: rebuilds fd_sets each iteration
63  
    - O(n) scanning: rebuilds fd_sets each iteration
64  
    - Level-triggered only (no edge-triggered mode)
64  
    - Level-triggered only (no edge-triggered mode)
65  

65  

66  
    @par Thread Safety
66  
    @par Thread Safety
67  
    All public member functions are thread-safe.
67  
    All public member functions are thread-safe.
68  
*/
68  
*/
69  
class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler_base
69  
class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler_base
70  
{
70  
{
71  
public:
71  
public:
72  
    /** Construct the scheduler.
72  
    /** Construct the scheduler.
73  

73  

74  
        Creates a self-pipe for reactor interruption.
74  
        Creates a self-pipe for reactor interruption.
75  

75  

76  
        @param ctx Reference to the owning execution_context.
76  
        @param ctx Reference to the owning execution_context.
77  
        @param concurrency_hint Hint for expected thread count (unused).
77  
        @param concurrency_hint Hint for expected thread count (unused).
78  
    */
78  
    */
79  
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
79  
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
80  

80  

81  
    /// Destroy the scheduler.
81  
    /// Destroy the scheduler.
82  
    ~select_scheduler() override;
82  
    ~select_scheduler() override;
83  

83  

84  
    select_scheduler(select_scheduler const&)            = delete;
84  
    select_scheduler(select_scheduler const&)            = delete;
85  
    select_scheduler& operator=(select_scheduler const&) = delete;
85  
    select_scheduler& operator=(select_scheduler const&) = delete;
86  

86  

87  
    /// Shut down the scheduler, draining pending operations.
87  
    /// Shut down the scheduler, draining pending operations.
88  
    void shutdown() override;
88  
    void shutdown() override;
89  

89  

90  
    /** Return the maximum file descriptor value supported.
90  
    /** Return the maximum file descriptor value supported.
91  

91  

92  
        Returns FD_SETSIZE - 1, the maximum fd value that can be
92  
        Returns FD_SETSIZE - 1, the maximum fd value that can be
93  
        monitored by select(). Operations with fd >= FD_SETSIZE
93  
        monitored by select(). Operations with fd >= FD_SETSIZE
94  
        will fail with EINVAL.
94  
        will fail with EINVAL.
95  

95  

96  
        @return The maximum supported file descriptor value.
96  
        @return The maximum supported file descriptor value.
97  
    */
97  
    */
98  
    static constexpr int max_fd() noexcept
98  
    static constexpr int max_fd() noexcept
99  
    {
99  
    {
100  
        return FD_SETSIZE - 1;
100  
        return FD_SETSIZE - 1;
101  
    }
101  
    }
102  

102  

103  
    /** Register a descriptor for persistent monitoring.
103  
    /** Register a descriptor for persistent monitoring.
104  

104  

105  
        The fd is added to the registered_descs_ map and will be
105  
        The fd is added to the registered_descs_ map and will be
106  
        included in subsequent select() calls. The reactor is
106  
        included in subsequent select() calls. The reactor is
107  
        interrupted so a blocked select() rebuilds its fd_sets.
107  
        interrupted so a blocked select() rebuilds its fd_sets.
108  

108  

109  
        @param fd The file descriptor to register.
109  
        @param fd The file descriptor to register.
110  
        @param desc Pointer to descriptor state for this fd.
110  
        @param desc Pointer to descriptor state for this fd.
111  
    */
111  
    */
112  
    void register_descriptor(int fd, select_descriptor_state* desc) const;
112  
    void register_descriptor(int fd, select_descriptor_state* desc) const;
113  

113  

114  
    /** Deregister a persistently registered descriptor.
114  
    /** Deregister a persistently registered descriptor.
115  

115  

116  
        @param fd The file descriptor to deregister.
116  
        @param fd The file descriptor to deregister.
117  
    */
117  
    */
118  
    void deregister_descriptor(int fd) const;
118  
    void deregister_descriptor(int fd) const;
119  

119  

120  
    /** Interrupt the reactor so it rebuilds its fd_sets.
120  
    /** Interrupt the reactor so it rebuilds its fd_sets.
121  

121  

122  
        Called when a write or connect op is registered after
122  
        Called when a write or connect op is registered after
123  
        the reactor's snapshot was taken. Without this, select()
123  
        the reactor's snapshot was taken. Without this, select()
124  
        may block not watching for writability on the fd.
124  
        may block not watching for writability on the fd.
125  
    */
125  
    */
126  
    void notify_reactor() const;
126  
    void notify_reactor() const;
127  

127  

128  
private:
128  
private:
129  
    void
129  
    void
130 -
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx,
130 +
    run_task(lock_type& lock, context_type* ctx,
131  
        long timeout_us) override;
131  
        long timeout_us) override;
132  
    void interrupt_reactor() const override;
132  
    void interrupt_reactor() const override;
133  
    long calculate_timeout(long requested_timeout_us) const;
133  
    long calculate_timeout(long requested_timeout_us) const;
134  

134  

135  
    // Self-pipe for interrupting select()
135  
    // Self-pipe for interrupting select()
136  
    int pipe_fds_[2]; // [0]=read, [1]=write
136  
    int pipe_fds_[2]; // [0]=read, [1]=write
137  

137  

138  
    // Per-fd tracking for fd_set building
138  
    // Per-fd tracking for fd_set building
139  
    mutable std::unordered_map<int, select_descriptor_state*> registered_descs_;
139  
    mutable std::unordered_map<int, select_descriptor_state*> registered_descs_;
140  
    mutable int max_fd_ = -1;
140  
    mutable int max_fd_ = -1;
141  
};
141  
};
142  

142  

143  
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
143  
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
144  
    : pipe_fds_{-1, -1}
144  
    : pipe_fds_{-1, -1}
145  
    , max_fd_(-1)
145  
    , max_fd_(-1)
146  
{
146  
{
147  
    if (::pipe(pipe_fds_) < 0)
147  
    if (::pipe(pipe_fds_) < 0)
148  
        detail::throw_system_error(make_err(errno), "pipe");
148  
        detail::throw_system_error(make_err(errno), "pipe");
149  

149  

150  
    for (int i = 0; i < 2; ++i)
150  
    for (int i = 0; i < 2; ++i)
151  
    {
151  
    {
152  
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
152  
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
153  
        if (flags == -1)
153  
        if (flags == -1)
154  
        {
154  
        {
155  
            int errn = errno;
155  
            int errn = errno;
156  
            ::close(pipe_fds_[0]);
156  
            ::close(pipe_fds_[0]);
157  
            ::close(pipe_fds_[1]);
157  
            ::close(pipe_fds_[1]);
158  
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
158  
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
159  
        }
159  
        }
160  
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
160  
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
161  
        {
161  
        {
162  
            int errn = errno;
162  
            int errn = errno;
163  
            ::close(pipe_fds_[0]);
163  
            ::close(pipe_fds_[0]);
164  
            ::close(pipe_fds_[1]);
164  
            ::close(pipe_fds_[1]);
165  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
165  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
166  
        }
166  
        }
167  
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
167  
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
168  
        {
168  
        {
169  
            int errn = errno;
169  
            int errn = errno;
170  
            ::close(pipe_fds_[0]);
170  
            ::close(pipe_fds_[0]);
171  
            ::close(pipe_fds_[1]);
171  
            ::close(pipe_fds_[1]);
172  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
172  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
173  
        }
173  
        }
174  
    }
174  
    }
175  

175  

176  
    timer_svc_ = &get_timer_service(ctx, *this);
176  
    timer_svc_ = &get_timer_service(ctx, *this);
177  
    timer_svc_->set_on_earliest_changed(
177  
    timer_svc_->set_on_earliest_changed(
178  
        timer_service::callback(this, [](void* p) {
178  
        timer_service::callback(this, [](void* p) {
179  
            static_cast<select_scheduler*>(p)->interrupt_reactor();
179  
            static_cast<select_scheduler*>(p)->interrupt_reactor();
180  
        }));
180  
        }));
181  

181  

182  
    get_resolver_service(ctx, *this);
182  
    get_resolver_service(ctx, *this);
183  
    get_signal_service(ctx, *this);
183  
    get_signal_service(ctx, *this);
184  
    get_stream_file_service(ctx, *this);
184  
    get_stream_file_service(ctx, *this);
185  
    get_random_access_file_service(ctx, *this);
185  
    get_random_access_file_service(ctx, *this);
186  

186  

187  
    completed_ops_.push(&task_op_);
187  
    completed_ops_.push(&task_op_);
188  
}
188  
}
189  

189  

190  
inline select_scheduler::~select_scheduler()
190  
inline select_scheduler::~select_scheduler()
191  
{
191  
{
192  
    if (pipe_fds_[0] >= 0)
192  
    if (pipe_fds_[0] >= 0)
193  
        ::close(pipe_fds_[0]);
193  
        ::close(pipe_fds_[0]);
194  
    if (pipe_fds_[1] >= 0)
194  
    if (pipe_fds_[1] >= 0)
195  
        ::close(pipe_fds_[1]);
195  
        ::close(pipe_fds_[1]);
196  
}
196  
}
197  

197  

198  
inline void
198  
inline void
199  
select_scheduler::shutdown()
199  
select_scheduler::shutdown()
200  
{
200  
{
201  
    shutdown_drain();
201  
    shutdown_drain();
202  

202  

203  
    if (pipe_fds_[1] >= 0)
203  
    if (pipe_fds_[1] >= 0)
204  
        interrupt_reactor();
204  
        interrupt_reactor();
205  
}
205  
}
206  

206  

207  
inline void
207  
inline void
208  
select_scheduler::register_descriptor(
208  
select_scheduler::register_descriptor(
209  
    int fd, select_descriptor_state* desc) const
209  
    int fd, select_descriptor_state* desc) const
210  
{
210  
{
211  
    if (fd < 0 || fd >= FD_SETSIZE)
211  
    if (fd < 0 || fd >= FD_SETSIZE)
212  
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
212  
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
213  

213  

214  
    desc->registered_events = reactor_event_read | reactor_event_write;
214  
    desc->registered_events = reactor_event_read | reactor_event_write;
215  
    desc->fd                = fd;
215  
    desc->fd                = fd;
216  
    desc->scheduler_        = this;
216  
    desc->scheduler_        = this;
 
217 +
    desc->mutex.set_enabled(!single_threaded_);
217  
    desc->ready_events_.store(0, std::memory_order_relaxed);
218  
    desc->ready_events_.store(0, std::memory_order_relaxed);
218  

219  

219  
    {
220  
    {
220 -
        std::lock_guard lock(desc->mutex);
221 +
        conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
221  
        desc->impl_ref_.reset();
222  
        desc->impl_ref_.reset();
222  
        desc->read_ready  = false;
223  
        desc->read_ready  = false;
223  
        desc->write_ready = false;
224  
        desc->write_ready = false;
224  
    }
225  
    }
225  

226  

226  
    {
227  
    {
227 -
        std::lock_guard lock(mutex_);
228 +
        mutex_type::scoped_lock lock(mutex_);
228  
        registered_descs_[fd] = desc;
229  
        registered_descs_[fd] = desc;
229  
        if (fd > max_fd_)
230  
        if (fd > max_fd_)
230  
            max_fd_ = fd;
231  
            max_fd_ = fd;
231  
    }
232  
    }
232  

233  

233  
    interrupt_reactor();
234  
    interrupt_reactor();
234  
}
235  
}
235  

236  

236  
inline void
237  
inline void
237  
select_scheduler::deregister_descriptor(int fd) const
238  
select_scheduler::deregister_descriptor(int fd) const
238  
{
239  
{
239 -
    std::lock_guard lock(mutex_);
240 +
    mutex_type::scoped_lock lock(mutex_);
240  

241  

241  
    auto it = registered_descs_.find(fd);
242  
    auto it = registered_descs_.find(fd);
242  
    if (it == registered_descs_.end())
243  
    if (it == registered_descs_.end())
243  
        return;
244  
        return;
244  

245  

245  
    registered_descs_.erase(it);
246  
    registered_descs_.erase(it);
246  

247  

247  
    if (fd == max_fd_)
248  
    if (fd == max_fd_)
248  
    {
249  
    {
249  
        max_fd_ = pipe_fds_[0];
250  
        max_fd_ = pipe_fds_[0];
250  
        for (auto& [registered_fd, state] : registered_descs_)
251  
        for (auto& [registered_fd, state] : registered_descs_)
251  
        {
252  
        {
252  
            if (registered_fd > max_fd_)
253  
            if (registered_fd > max_fd_)
253  
                max_fd_ = registered_fd;
254  
                max_fd_ = registered_fd;
254  
        }
255  
        }
255  
    }
256  
    }
256  
}
257  
}
257  

258  

258  
inline void
259  
inline void
259  
select_scheduler::notify_reactor() const
260  
select_scheduler::notify_reactor() const
260  
{
261  
{
261  
    interrupt_reactor();
262  
    interrupt_reactor();
262  
}
263  
}
263  

264  

264  
inline void
265  
inline void
265  
select_scheduler::interrupt_reactor() const
266  
select_scheduler::interrupt_reactor() const
266  
{
267  
{
267  
    char byte               = 1;
268  
    char byte               = 1;
268  
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
269  
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
269  
}
270  
}
270  

271  

271  
inline long
272  
inline long
272  
select_scheduler::calculate_timeout(long requested_timeout_us) const
273  
select_scheduler::calculate_timeout(long requested_timeout_us) const
273  
{
274  
{
274  
    if (requested_timeout_us == 0)
275  
    if (requested_timeout_us == 0)
275  
        return 0;
276  
        return 0;
276  

277  

277  
    auto nearest = timer_svc_->nearest_expiry();
278  
    auto nearest = timer_svc_->nearest_expiry();
278  
    if (nearest == timer_service::time_point::max())
279  
    if (nearest == timer_service::time_point::max())
279  
        return requested_timeout_us;
280  
        return requested_timeout_us;
280  

281  

281  
    auto now = std::chrono::steady_clock::now();
282  
    auto now = std::chrono::steady_clock::now();
282  
    if (nearest <= now)
283  
    if (nearest <= now)
283  
        return 0;
284  
        return 0;
284  

285  

285  
    auto timer_timeout_us =
286  
    auto timer_timeout_us =
286  
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
287  
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
287  
            .count();
288  
            .count();
288  

289  

289  
    constexpr auto long_max =
290  
    constexpr auto long_max =
290  
        static_cast<long long>((std::numeric_limits<long>::max)());
291  
        static_cast<long long>((std::numeric_limits<long>::max)());
291  
    auto capped_timer_us =
292  
    auto capped_timer_us =
292  
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
293  
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
293  
                              static_cast<long long>(0)),
294  
                              static_cast<long long>(0)),
294  
                   long_max);
295  
                   long_max);
295  

296  

296  
    if (requested_timeout_us < 0)
297  
    if (requested_timeout_us < 0)
297  
        return static_cast<long>(capped_timer_us);
298  
        return static_cast<long>(capped_timer_us);
298  

299  

299  
    return static_cast<long>(
300  
    return static_cast<long>(
300  
        (std::min)(static_cast<long long>(requested_timeout_us),
301  
        (std::min)(static_cast<long long>(requested_timeout_us),
301  
                   capped_timer_us));
302  
                   capped_timer_us));
302  
}
303  
}
303  

304  

304  
inline void
305  
inline void
305  
select_scheduler::run_task(
306  
select_scheduler::run_task(
306 -
    std::unique_lock<std::mutex>& lock, context_type* ctx, long timeout_us)
307 +
    lock_type& lock, context_type* ctx, long timeout_us)
307  
{
308  
{
308  
    long effective_timeout_us =
309  
    long effective_timeout_us =
309  
        task_interrupted_ ? 0 : calculate_timeout(timeout_us);
310  
        task_interrupted_ ? 0 : calculate_timeout(timeout_us);
310  

311  

311  
    // Snapshot registered descriptors while holding lock.
312  
    // Snapshot registered descriptors while holding lock.
312  
    // Record which fds need write monitoring to avoid a hot loop:
313  
    // Record which fds need write monitoring to avoid a hot loop:
313  
    // select is level-triggered so writable sockets (nearly always
314  
    // select is level-triggered so writable sockets (nearly always
314  
    // writable) would cause select() to return immediately every
315  
    // writable) would cause select() to return immediately every
315  
    // iteration if unconditionally added to write_fds.
316  
    // iteration if unconditionally added to write_fds.
316  
    struct fd_entry
317  
    struct fd_entry
317  
    {
318  
    {
318  
        int fd;
319  
        int fd;
319  
        select_descriptor_state* desc;
320  
        select_descriptor_state* desc;
320  
        bool needs_write;
321  
        bool needs_write;
321  
    };
322  
    };
322  
    fd_entry snapshot[FD_SETSIZE];
323  
    fd_entry snapshot[FD_SETSIZE];
323  
    int snapshot_count = 0;
324  
    int snapshot_count = 0;
324  

325  

325  
    for (auto& [fd, desc] : registered_descs_)
326  
    for (auto& [fd, desc] : registered_descs_)
326  
    {
327  
    {
327  
        if (snapshot_count < FD_SETSIZE)
328  
        if (snapshot_count < FD_SETSIZE)
328  
        {
329  
        {
329 -
            std::lock_guard desc_lock(desc->mutex);
330 +
            conditionally_enabled_mutex::scoped_lock desc_lock(desc->mutex);
330  
            snapshot[snapshot_count].fd   = fd;
331  
            snapshot[snapshot_count].fd   = fd;
331  
            snapshot[snapshot_count].desc = desc;
332  
            snapshot[snapshot_count].desc = desc;
332  
            snapshot[snapshot_count].needs_write =
333  
            snapshot[snapshot_count].needs_write =
333  
                (desc->write_op || desc->connect_op);
334  
                (desc->write_op || desc->connect_op);
334  
            ++snapshot_count;
335  
            ++snapshot_count;
335  
        }
336  
        }
336  
    }
337  
    }
337  

338  

338  
    if (lock.owns_lock())
339  
    if (lock.owns_lock())
339  
        lock.unlock();
340  
        lock.unlock();
340  

341  

341  
    task_cleanup on_exit{this, &lock, ctx};
342  
    task_cleanup on_exit{this, &lock, ctx};
342  

343  

343  
    fd_set read_fds, write_fds, except_fds;
344  
    fd_set read_fds, write_fds, except_fds;
344  
    FD_ZERO(&read_fds);
345  
    FD_ZERO(&read_fds);
345  
    FD_ZERO(&write_fds);
346  
    FD_ZERO(&write_fds);
346  
    FD_ZERO(&except_fds);
347  
    FD_ZERO(&except_fds);
347  

348  

348  
    FD_SET(pipe_fds_[0], &read_fds);
349  
    FD_SET(pipe_fds_[0], &read_fds);
349  
    int nfds = pipe_fds_[0];
350  
    int nfds = pipe_fds_[0];
350  

351  

351  
    for (int i = 0; i < snapshot_count; ++i)
352  
    for (int i = 0; i < snapshot_count; ++i)
352  
    {
353  
    {
353  
        int fd = snapshot[i].fd;
354  
        int fd = snapshot[i].fd;
354  
        FD_SET(fd, &read_fds);
355  
        FD_SET(fd, &read_fds);
355  
        if (snapshot[i].needs_write)
356  
        if (snapshot[i].needs_write)
356  
            FD_SET(fd, &write_fds);
357  
            FD_SET(fd, &write_fds);
357  
        FD_SET(fd, &except_fds);
358  
        FD_SET(fd, &except_fds);
358  
        if (fd > nfds)
359  
        if (fd > nfds)
359  
            nfds = fd;
360  
            nfds = fd;
360  
    }
361  
    }
361  

362  

362  
    struct timeval tv;
363  
    struct timeval tv;
363  
    struct timeval* tv_ptr = nullptr;
364  
    struct timeval* tv_ptr = nullptr;
364  
    if (effective_timeout_us >= 0)
365  
    if (effective_timeout_us >= 0)
365  
    {
366  
    {
366  
        tv.tv_sec  = effective_timeout_us / 1000000;
367  
        tv.tv_sec  = effective_timeout_us / 1000000;
367  
        tv.tv_usec = effective_timeout_us % 1000000;
368  
        tv.tv_usec = effective_timeout_us % 1000000;
368  
        tv_ptr     = &tv;
369  
        tv_ptr     = &tv;
369  
    }
370  
    }
370  

371  

371  
    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
372  
    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
372  

373  

373  
    // EINTR: signal interrupted select(), just retry.
374  
    // EINTR: signal interrupted select(), just retry.
374  
    // EBADF: an fd was closed between snapshot and select(); retry
375  
    // EBADF: an fd was closed between snapshot and select(); retry
375  
    // with a fresh snapshot from registered_descs_.
376  
    // with a fresh snapshot from registered_descs_.
376  
    if (ready < 0)
377  
    if (ready < 0)
377  
    {
378  
    {
378  
        if (errno == EINTR || errno == EBADF)
379  
        if (errno == EINTR || errno == EBADF)
379  
            return;
380  
            return;
380  
        detail::throw_system_error(make_err(errno), "select");
381  
        detail::throw_system_error(make_err(errno), "select");
381  
    }
382  
    }
382  

383  

383  
    // Process timers outside the lock
384  
    // Process timers outside the lock
384  
    timer_svc_->process_expired();
385  
    timer_svc_->process_expired();
385  

386  

386  
    op_queue local_ops;
387  
    op_queue local_ops;
387  

388  

388  
    if (ready > 0)
389  
    if (ready > 0)
389  
    {
390  
    {
390  
        if (FD_ISSET(pipe_fds_[0], &read_fds))
391  
        if (FD_ISSET(pipe_fds_[0], &read_fds))
391  
        {
392  
        {
392  
            char buf[256];
393  
            char buf[256];
393  
            while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
394  
            while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
394  
            {
395  
            {
395  
            }
396  
            }
396  
        }
397  
        }
397  

398  

398  
        for (int i = 0; i < snapshot_count; ++i)
399  
        for (int i = 0; i < snapshot_count; ++i)
399  
        {
400  
        {
400  
            int fd                        = snapshot[i].fd;
401  
            int fd                        = snapshot[i].fd;
401  
            select_descriptor_state* desc = snapshot[i].desc;
402  
            select_descriptor_state* desc = snapshot[i].desc;
402  

403  

403  
            std::uint32_t flags = 0;
404  
            std::uint32_t flags = 0;
404  
            if (FD_ISSET(fd, &read_fds))
405  
            if (FD_ISSET(fd, &read_fds))
405  
                flags |= reactor_event_read;
406  
                flags |= reactor_event_read;
406  
            if (FD_ISSET(fd, &write_fds))
407  
            if (FD_ISSET(fd, &write_fds))
407  
                flags |= reactor_event_write;
408  
                flags |= reactor_event_write;
408  
            if (FD_ISSET(fd, &except_fds))
409  
            if (FD_ISSET(fd, &except_fds))
409  
                flags |= reactor_event_error;
410  
                flags |= reactor_event_error;
410  

411  

411  
            if (flags == 0)
412  
            if (flags == 0)
412  
                continue;
413  
                continue;
413  

414  

414  
            desc->add_ready_events(flags);
415  
            desc->add_ready_events(flags);
415  

416  

416  
            bool expected = false;
417  
            bool expected = false;
417  
            if (desc->is_enqueued_.compare_exchange_strong(
418  
            if (desc->is_enqueued_.compare_exchange_strong(
418  
                    expected, true, std::memory_order_release,
419  
                    expected, true, std::memory_order_release,
419  
                    std::memory_order_relaxed))
420  
                    std::memory_order_relaxed))
420  
            {
421  
            {
421  
                local_ops.push(desc);
422  
                local_ops.push(desc);
422  
            }
423  
            }
423  
        }
424  
        }
424  
    }
425  
    }
425  

426  

426  
    lock.lock();
427  
    lock.lock();
427  

428  

428  
    if (!local_ops.empty())
429  
    if (!local_ops.empty())
429  
        completed_ops_.splice(local_ops);
430  
        completed_ops_.splice(local_ops);
430  
}
431  
}
431  

432  

432  
} // namespace boost::corosio::detail
433  
} // namespace boost::corosio::detail
433  

434  

434  
#endif // BOOST_COROSIO_HAS_SELECT
435  
#endif // BOOST_COROSIO_HAS_SELECT
435  

436  

436  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
437  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP