1  
//
1  
//
2  
// Copyright (c) 2026 Michael Vandeberg
2  
// Copyright (c) 2026 Michael Vandeberg
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_POSIX
15  
#if BOOST_COROSIO_POSIX
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/corosio/stream_file.hpp>
18  
#include <boost/corosio/stream_file.hpp>
19  
#include <boost/corosio/file_base.hpp>
19  
#include <boost/corosio/file_base.hpp>
20  
#include <boost/corosio/detail/intrusive.hpp>
20  
#include <boost/corosio/detail/intrusive.hpp>
21  
#include <boost/corosio/detail/dispatch_coro.hpp>
21  
#include <boost/corosio/detail/dispatch_coro.hpp>
22  
#include <boost/corosio/detail/scheduler_op.hpp>
22  
#include <boost/corosio/detail/scheduler_op.hpp>
23  
#include <boost/corosio/detail/continuation_op.hpp>
23  
#include <boost/corosio/detail/continuation_op.hpp>
24  
#include <boost/corosio/detail/thread_pool.hpp>
24  
#include <boost/corosio/detail/thread_pool.hpp>
25  
#include <boost/corosio/detail/scheduler.hpp>
25  
#include <boost/corosio/detail/scheduler.hpp>
26  
#include <boost/corosio/detail/buffer_param.hpp>
26  
#include <boost/corosio/detail/buffer_param.hpp>
27  
#include <boost/corosio/native/detail/make_err.hpp>
27  
#include <boost/corosio/native/detail/make_err.hpp>
28  
#include <boost/capy/ex/executor_ref.hpp>
28  
#include <boost/capy/ex/executor_ref.hpp>
29  
#include <boost/capy/error.hpp>
29  
#include <boost/capy/error.hpp>
30  
#include <boost/capy/buffers.hpp>
30  
#include <boost/capy/buffers.hpp>
31  

31  

32  
#include <atomic>
32  
#include <atomic>
33  
#include <coroutine>
33  
#include <coroutine>
34  
#include <cstddef>
34  
#include <cstddef>
35  
#include <cstdint>
35  
#include <cstdint>
36  
#include <filesystem>
36  
#include <filesystem>
37  
#include <limits>
37  
#include <limits>
38  
#include <memory>
38  
#include <memory>
39  
#include <optional>
39  
#include <optional>
40  
#include <stop_token>
40  
#include <stop_token>
41  
#include <system_error>
41  
#include <system_error>
42  

42  

43  
#include <errno.h>
43  
#include <errno.h>
44  
#include <fcntl.h>
44  
#include <fcntl.h>
45  
#include <sys/stat.h>
45  
#include <sys/stat.h>
46  
#include <sys/uio.h>
46  
#include <sys/uio.h>
47  
#include <unistd.h>
47  
#include <unistd.h>
48  

48  

49  
/*
    POSIX Stream File Implementation
    =================================

    Regular files cannot be monitored by epoll/kqueue/select — the kernel
    always reports them as ready. Blocking I/O (preadv/pwritev) is therefore
    dispatched to a shared thread pool, with the completion posted back to
    the scheduler.

    This follows the same pattern as posix_resolver: pool_work_item for
    dispatch, scheduler_op for completion, shared_from_this for lifetime.

    Completion Flow
    ---------------
    1. read_some() sets up file_read_op and posts it to the thread pool
    2. A pool thread runs preadv() (blocking)
    3. The pool thread stores the results and posts a scheduler_op to the
       scheduler
    4. The scheduler invokes op(), which resumes the coroutine

    Single-Inflight Constraint
    --------------------------
    Only one asynchronous operation may be in flight at a time on a
    given file object. Concurrent reads and writes are not supported
    because both share offset_ without synchronization.
*/
73  

73  

74  
namespace boost::corosio::detail {
74  
namespace boost::corosio::detail {
75  

75  

76  
struct scheduler;
76  
struct scheduler;
77  
class posix_stream_file_service;
77  
class posix_stream_file_service;
78  

78  

79  
/** Stream file implementation for POSIX backends.
79  
/** Stream file implementation for POSIX backends.
80  

80  

81  
    Each instance contains embedded operation objects (read_op_, write_op_)
81  
    Each instance contains embedded operation objects (read_op_, write_op_)
82  
    that are reused across calls. This avoids per-operation heap allocation.
82  
    that are reused across calls. This avoids per-operation heap allocation.
83  
*/
83  
*/
84  
class posix_stream_file final
84  
class posix_stream_file final
85  
    : public stream_file::implementation
85  
    : public stream_file::implementation
86  
    , public std::enable_shared_from_this<posix_stream_file>
86  
    , public std::enable_shared_from_this<posix_stream_file>
87  
    , public intrusive_list<posix_stream_file>::node
87  
    , public intrusive_list<posix_stream_file>::node
88  
{
88  
{
89  
    friend class posix_stream_file_service;
89  
    friend class posix_stream_file_service;
90  

90  

91  
public:
91  
public:
92  
    static constexpr std::size_t max_buffers = 16;
92  
    static constexpr std::size_t max_buffers = 16;
93  

93  

94  
    /** Operation state for a single file read or write. */
94  
    /** Operation state for a single file read or write. */
95  
    struct file_op : scheduler_op
95  
    struct file_op : scheduler_op
96  
    {
96  
    {
97  
        struct canceller
97  
        struct canceller
98  
        {
98  
        {
99  
            file_op* op;
99  
            file_op* op;
100  
            void operator()() const noexcept
100  
            void operator()() const noexcept
101  
            {
101  
            {
102  
                op->request_cancel();
102  
                op->request_cancel();
103  
            }
103  
            }
104  
        };
104  
        };
105  

105  

106  
        // Coroutine state
106  
        // Coroutine state
107  
        std::coroutine_handle<> h;
107  
        std::coroutine_handle<> h;
108  
        detail::continuation_op cont_op;
108  
        detail::continuation_op cont_op;
109  
        capy::executor_ref ex;
109  
        capy::executor_ref ex;
110  

110  

111  
        // Output pointers
111  
        // Output pointers
112  
        std::error_code* ec_out = nullptr;
112  
        std::error_code* ec_out = nullptr;
113  
        std::size_t* bytes_out  = nullptr;
113  
        std::size_t* bytes_out  = nullptr;
114  

114  

115  
        // Buffer data (copied from buffer_param at submission time)
115  
        // Buffer data (copied from buffer_param at submission time)
116  
        iovec iovecs[max_buffers];
116  
        iovec iovecs[max_buffers];
117  
        int iovec_count = 0;
117  
        int iovec_count = 0;
118  

118  

119  
        // Result storage (populated by worker thread)
119  
        // Result storage (populated by worker thread)
120  
        int errn                    = 0;
120  
        int errn                    = 0;
121  
        std::size_t bytes_transferred = 0;
121  
        std::size_t bytes_transferred = 0;
122  
        bool is_read                = false;
122  
        bool is_read                = false;
123  

123  

124  
        // Thread coordination
124  
        // Thread coordination
125  
        std::atomic<bool> cancelled{false};
125  
        std::atomic<bool> cancelled{false};
126  
        std::optional<std::stop_callback<canceller>> stop_cb;
126  
        std::optional<std::stop_callback<canceller>> stop_cb;
127  

127  

128  
        /// Prevents use-after-free when file is closed with pending ops.
128  
        /// Prevents use-after-free when file is closed with pending ops.
129  
        std::shared_ptr<void> impl_ref;
129  
        std::shared_ptr<void> impl_ref;
130  

130  

131  
        file_op() = default;
131  
        file_op() = default;
132  

132  

133  
        void reset() noexcept
133  
        void reset() noexcept
134  
        {
134  
        {
135  
            iovec_count       = 0;
135  
            iovec_count       = 0;
136  
            errn              = 0;
136  
            errn              = 0;
137  
            bytes_transferred = 0;
137  
            bytes_transferred = 0;
138  
            is_read           = false;
138  
            is_read           = false;
139  
            cancelled.store(false, std::memory_order_relaxed);
139  
            cancelled.store(false, std::memory_order_relaxed);
140  
            stop_cb.reset();
140  
            stop_cb.reset();
141  
            impl_ref.reset();
141  
            impl_ref.reset();
142  
            ec_out    = nullptr;
142  
            ec_out    = nullptr;
143  
            bytes_out = nullptr;
143  
            bytes_out = nullptr;
144  
        }
144  
        }
145  

145  

146  
        void operator()() override;
146  
        void operator()() override;
147  
        void destroy() override;
147  
        void destroy() override;
148  

148  

149  
        void request_cancel() noexcept
149  
        void request_cancel() noexcept
150  
        {
150  
        {
151  
            cancelled.store(true, std::memory_order_release);
151  
            cancelled.store(true, std::memory_order_release);
152  
        }
152  
        }
153  

153  

154  
        void start(std::stop_token const& token)
154  
        void start(std::stop_token const& token)
155  
        {
155  
        {
156  
            cancelled.store(false, std::memory_order_release);
156  
            cancelled.store(false, std::memory_order_release);
157  
            stop_cb.reset();
157  
            stop_cb.reset();
158  
            if (token.stop_possible())
158  
            if (token.stop_possible())
159  
                stop_cb.emplace(token, canceller{this});
159  
                stop_cb.emplace(token, canceller{this});
160  
        }
160  
        }
161  
    };
161  
    };
162  

162  

163  
    /** Pool work item for thread pool dispatch. */
163  
    /** Pool work item for thread pool dispatch. */
164  
    struct pool_op : pool_work_item
164  
    struct pool_op : pool_work_item
165  
    {
165  
    {
166  
        posix_stream_file* file_ = nullptr;
166  
        posix_stream_file* file_ = nullptr;
167  
        std::shared_ptr<posix_stream_file> ref_;
167  
        std::shared_ptr<posix_stream_file> ref_;
168  
    };
168  
    };
169  

169  

170  
    explicit posix_stream_file(posix_stream_file_service& svc) noexcept;
170  
    explicit posix_stream_file(posix_stream_file_service& svc) noexcept;
171  

171  

172  
    // -- io_stream::implementation --
172  
    // -- io_stream::implementation --
173  

173  

174  
    std::coroutine_handle<> read_some(
174  
    std::coroutine_handle<> read_some(
175  
        std::coroutine_handle<>,
175  
        std::coroutine_handle<>,
176  
        capy::executor_ref,
176  
        capy::executor_ref,
177  
        buffer_param,
177  
        buffer_param,
178  
        std::stop_token,
178  
        std::stop_token,
179  
        std::error_code*,
179  
        std::error_code*,
180  
        std::size_t*) override;
180  
        std::size_t*) override;
181  

181  

182  
    std::coroutine_handle<> write_some(
182  
    std::coroutine_handle<> write_some(
183  
        std::coroutine_handle<>,
183  
        std::coroutine_handle<>,
184  
        capy::executor_ref,
184  
        capy::executor_ref,
185  
        buffer_param,
185  
        buffer_param,
186  
        std::stop_token,
186  
        std::stop_token,
187  
        std::error_code*,
187  
        std::error_code*,
188  
        std::size_t*) override;
188  
        std::size_t*) override;
189  

189  

190  
    // -- stream_file::implementation --
190  
    // -- stream_file::implementation --
191  

191  

192  
    native_handle_type native_handle() const noexcept override
192  
    native_handle_type native_handle() const noexcept override
193  
    {
193  
    {
194  
        return fd_;
194  
        return fd_;
195  
    }
195  
    }
196  

196  

197  
    void cancel() noexcept override
197  
    void cancel() noexcept override
198  
    {
198  
    {
199  
        read_op_.request_cancel();
199  
        read_op_.request_cancel();
200  
        write_op_.request_cancel();
200  
        write_op_.request_cancel();
201  
    }
201  
    }
202  

202  

203  
    std::uint64_t size() const override;
203  
    std::uint64_t size() const override;
204  
    void resize(std::uint64_t new_size) override;
204  
    void resize(std::uint64_t new_size) override;
205  
    void sync_data() override;
205  
    void sync_data() override;
206  
    void sync_all() override;
206  
    void sync_all() override;
207  
    native_handle_type release() override;
207  
    native_handle_type release() override;
208  
    void assign(native_handle_type handle) override;
208  
    void assign(native_handle_type handle) override;
209  
    std::uint64_t seek(std::int64_t offset, file_base::seek_basis origin) override;
209  
    std::uint64_t seek(std::int64_t offset, file_base::seek_basis origin) override;
210  

210  

211  
    // -- Internal --
211  
    // -- Internal --
212  

212  

213  
    /** Open the file and store the fd. */
213  
    /** Open the file and store the fd. */
214  
    std::error_code open_file(
214  
    std::error_code open_file(
215  
        std::filesystem::path const& path, file_base::flags mode);
215  
        std::filesystem::path const& path, file_base::flags mode);
216  

216  

217  
    /** Close the file descriptor. */
217  
    /** Close the file descriptor. */
218  
    void close_file() noexcept;
218  
    void close_file() noexcept;
219  

219  

220  
private:
220  
private:
221  
    posix_stream_file_service& svc_;
221  
    posix_stream_file_service& svc_;
222  
    int fd_ = -1;
222  
    int fd_ = -1;
223  
    std::uint64_t offset_ = 0;
223  
    std::uint64_t offset_ = 0;
224  

224  

225  
    file_op read_op_;
225  
    file_op read_op_;
226  
    file_op write_op_;
226  
    file_op write_op_;
227  
    pool_op read_pool_op_;
227  
    pool_op read_pool_op_;
228  
    pool_op write_pool_op_;
228  
    pool_op write_pool_op_;
229  

229  

230  
    static void do_read_work(pool_work_item*) noexcept;
230  
    static void do_read_work(pool_work_item*) noexcept;
231  
    static void do_write_work(pool_work_item*) noexcept;
231  
    static void do_write_work(pool_work_item*) noexcept;
232  
};
232  
};
233  

233  

234  
// ---------------------------------------------------------------------------
234  
// ---------------------------------------------------------------------------
235  
// Inline implementation
235  
// Inline implementation
236  
// ---------------------------------------------------------------------------
236  
// ---------------------------------------------------------------------------
237  

237  

238  
inline
238  
inline
239  
posix_stream_file::posix_stream_file(posix_stream_file_service& svc) noexcept
239  
posix_stream_file::posix_stream_file(posix_stream_file_service& svc) noexcept
240  
    : svc_(svc)
240  
    : svc_(svc)
241  
{
241  
{
242  
}
242  
}
243  

243  

244  
inline std::error_code
244  
inline std::error_code
245  
posix_stream_file::open_file(
245  
posix_stream_file::open_file(
246  
    std::filesystem::path const& path, file_base::flags mode)
246  
    std::filesystem::path const& path, file_base::flags mode)
247  
{
247  
{
248  
    close_file();
248  
    close_file();
249  

249  

250  
    int oflags = 0;
250  
    int oflags = 0;
251  

251  

252  
    // Access mode
252  
    // Access mode
253  
    unsigned access = static_cast<unsigned>(mode) & 3u;
253  
    unsigned access = static_cast<unsigned>(mode) & 3u;
254  
    if (access == static_cast<unsigned>(file_base::read_write))
254  
    if (access == static_cast<unsigned>(file_base::read_write))
255  
        oflags |= O_RDWR;
255  
        oflags |= O_RDWR;
256  
    else if (access == static_cast<unsigned>(file_base::write_only))
256  
    else if (access == static_cast<unsigned>(file_base::write_only))
257  
        oflags |= O_WRONLY;
257  
        oflags |= O_WRONLY;
258  
    else
258  
    else
259  
        oflags |= O_RDONLY;
259  
        oflags |= O_RDONLY;
260  

260  

261  
    // Creation flags
261  
    // Creation flags
262  
    if ((mode & file_base::create) != file_base::flags(0))
262  
    if ((mode & file_base::create) != file_base::flags(0))
263  
        oflags |= O_CREAT;
263  
        oflags |= O_CREAT;
264  
    if ((mode & file_base::exclusive) != file_base::flags(0))
264  
    if ((mode & file_base::exclusive) != file_base::flags(0))
265  
        oflags |= O_EXCL;
265  
        oflags |= O_EXCL;
266  
    if ((mode & file_base::truncate) != file_base::flags(0))
266  
    if ((mode & file_base::truncate) != file_base::flags(0))
267  
        oflags |= O_TRUNC;
267  
        oflags |= O_TRUNC;
268  
    if ((mode & file_base::append) != file_base::flags(0))
268  
    if ((mode & file_base::append) != file_base::flags(0))
269  
        oflags |= O_APPEND;
269  
        oflags |= O_APPEND;
270  
    if ((mode & file_base::sync_all_on_write) != file_base::flags(0))
270  
    if ((mode & file_base::sync_all_on_write) != file_base::flags(0))
271  
        oflags |= O_SYNC;
271  
        oflags |= O_SYNC;
272  

272  

273  
    int fd = ::open(path.c_str(), oflags, 0666);
273  
    int fd = ::open(path.c_str(), oflags, 0666);
274  
    if (fd < 0)
274  
    if (fd < 0)
275  
        return make_err(errno);
275  
        return make_err(errno);
276  

276  

277  
    fd_     = fd;
277  
    fd_     = fd;
278  
    offset_ = 0;
278  
    offset_ = 0;
279  

279  

280  
    // Append mode: position at end-of-file (preadv/pwritev use
280  
    // Append mode: position at end-of-file (preadv/pwritev use
281  
    // explicit offsets, so O_APPEND alone is not sufficient).
281  
    // explicit offsets, so O_APPEND alone is not sufficient).
282  
    if ((mode & file_base::append) != file_base::flags(0))
282  
    if ((mode & file_base::append) != file_base::flags(0))
283  
    {
283  
    {
284  
        struct stat st;
284  
        struct stat st;
285  
        if (::fstat(fd, &st) < 0)
285  
        if (::fstat(fd, &st) < 0)
286  
        {
286  
        {
287  
            int err = errno;
287  
            int err = errno;
288  
            ::close(fd);
288  
            ::close(fd);
289  
            fd_ = -1;
289  
            fd_ = -1;
290  
            return make_err(err);
290  
            return make_err(err);
291  
        }
291  
        }
292  
        offset_ = static_cast<std::uint64_t>(st.st_size);
292  
        offset_ = static_cast<std::uint64_t>(st.st_size);
293  
    }
293  
    }
294  

294  

295  
#ifdef POSIX_FADV_SEQUENTIAL
295  
#ifdef POSIX_FADV_SEQUENTIAL
296  
    ::posix_fadvise(fd_, 0, 0, POSIX_FADV_SEQUENTIAL);
296  
    ::posix_fadvise(fd_, 0, 0, POSIX_FADV_SEQUENTIAL);
297  
#endif
297  
#endif
298  

298  

299  
    return {};
299  
    return {};
300  
}
300  
}
301  

301  

302  
inline void
302  
inline void
303  
posix_stream_file::close_file() noexcept
303  
posix_stream_file::close_file() noexcept
304  
{
304  
{
305  
    if (fd_ >= 0)
305  
    if (fd_ >= 0)
306  
    {
306  
    {
307  
        ::close(fd_);
307  
        ::close(fd_);
308  
        fd_ = -1;
308  
        fd_ = -1;
309  
    }
309  
    }
310  
}
310  
}
311  

311  

312  
inline std::uint64_t
312  
inline std::uint64_t
313  
posix_stream_file::size() const
313  
posix_stream_file::size() const
314  
{
314  
{
315  
    struct stat st;
315  
    struct stat st;
316  
    if (::fstat(fd_, &st) < 0)
316  
    if (::fstat(fd_, &st) < 0)
317  
        throw_system_error(make_err(errno), "stream_file::size");
317  
        throw_system_error(make_err(errno), "stream_file::size");
318  
    return static_cast<std::uint64_t>(st.st_size);
318  
    return static_cast<std::uint64_t>(st.st_size);
319  
}
319  
}
320  

320  

321  
inline void
321  
inline void
322  
posix_stream_file::resize(std::uint64_t new_size)
322  
posix_stream_file::resize(std::uint64_t new_size)
323  
{
323  
{
324  
    if (new_size > static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
324  
    if (new_size > static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
325  
        throw_system_error(make_err(EOVERFLOW), "stream_file::resize");
325  
        throw_system_error(make_err(EOVERFLOW), "stream_file::resize");
326  
    if (::ftruncate(fd_, static_cast<off_t>(new_size)) < 0)
326  
    if (::ftruncate(fd_, static_cast<off_t>(new_size)) < 0)
327  
        throw_system_error(make_err(errno), "stream_file::resize");
327  
        throw_system_error(make_err(errno), "stream_file::resize");
328  
}
328  
}
329  

329  

330  
inline void
330  
inline void
331  
posix_stream_file::sync_data()
331  
posix_stream_file::sync_data()
332  
{
332  
{
333  
#if BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
333  
#if BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
334  
    if (::fdatasync(fd_) < 0)
334  
    if (::fdatasync(fd_) < 0)
335  
#else // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
335  
#else // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
336  
    if (::fsync(fd_) < 0)
336  
    if (::fsync(fd_) < 0)
337  
#endif // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
337  
#endif // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
338  
        throw_system_error(make_err(errno), "stream_file::sync_data");
338  
        throw_system_error(make_err(errno), "stream_file::sync_data");
339  
}
339  
}
340  

340  

341  
inline void
341  
inline void
342  
posix_stream_file::sync_all()
342  
posix_stream_file::sync_all()
343  
{
343  
{
344  
    if (::fsync(fd_) < 0)
344  
    if (::fsync(fd_) < 0)
345  
        throw_system_error(make_err(errno), "stream_file::sync_all");
345  
        throw_system_error(make_err(errno), "stream_file::sync_all");
346  
}
346  
}
347  

347  

348  
/** Give up ownership of the descriptor without closing it.

    The object is left with no descriptor (fd_ == -1) and a tracked
    offset of zero.

    @return The descriptor that was held, or -1 if there was none.
*/
inline native_handle_type
posix_stream_file::release()
{
    int const handed_over = fd_;

    offset_ = 0;
    fd_     = -1;

    return handed_over;
}
356  

356  

357  
inline void
357  
inline void
358  
posix_stream_file::assign(native_handle_type handle)
358  
posix_stream_file::assign(native_handle_type handle)
359  
{
359  
{
360  
    close_file();
360  
    close_file();
361  
    fd_ = handle;
361  
    fd_ = handle;
362  
    offset_ = 0;
362  
    offset_ = 0;
363  
}
363  
}
364  

364  

365  
inline std::uint64_t
365  
inline std::uint64_t
366  
posix_stream_file::seek(std::int64_t offset, file_base::seek_basis origin)
366  
posix_stream_file::seek(std::int64_t offset, file_base::seek_basis origin)
367  
{
367  
{
368  
    // We track offset_ ourselves (not the kernel fd offset)
368  
    // We track offset_ ourselves (not the kernel fd offset)
369  
    // because preadv/pwritev use explicit offsets.
369  
    // because preadv/pwritev use explicit offsets.
370  
    std::int64_t new_pos;
370  
    std::int64_t new_pos;
371  

371  

372  
    if (origin == file_base::seek_set)
372  
    if (origin == file_base::seek_set)
373  
    {
373  
    {
374  
        new_pos = offset;
374  
        new_pos = offset;
375  
    }
375  
    }
376  
    else if (origin == file_base::seek_cur)
376  
    else if (origin == file_base::seek_cur)
377  
    {
377  
    {
378  
        new_pos = static_cast<std::int64_t>(offset_) + offset;
378  
        new_pos = static_cast<std::int64_t>(offset_) + offset;
379  
    }
379  
    }
380  
    else
380  
    else
381  
    {
381  
    {
382  
        struct stat st;
382  
        struct stat st;
383  
        if (::fstat(fd_, &st) < 0)
383  
        if (::fstat(fd_, &st) < 0)
384  
            throw_system_error(make_err(errno), "stream_file::seek");
384  
            throw_system_error(make_err(errno), "stream_file::seek");
385  
        new_pos = st.st_size + offset;
385  
        new_pos = st.st_size + offset;
386  
    }
386  
    }
387  

387  

388  
    if (new_pos < 0)
388  
    if (new_pos < 0)
389  
        throw_system_error(make_err(EINVAL), "stream_file::seek");
389  
        throw_system_error(make_err(EINVAL), "stream_file::seek");
390  
    if (new_pos > static_cast<std::int64_t>(std::numeric_limits<off_t>::max()))
390  
    if (new_pos > static_cast<std::int64_t>(std::numeric_limits<off_t>::max()))
391  
        throw_system_error(make_err(EOVERFLOW), "stream_file::seek");
391  
        throw_system_error(make_err(EOVERFLOW), "stream_file::seek");
392  

392  

393  
    offset_ = static_cast<std::uint64_t>(new_pos);
393  
    offset_ = static_cast<std::uint64_t>(new_pos);
394  

394  

395  
    return offset_;
395  
    return offset_;
396  
}
396  
}
397  

397  

398  
// -- file_op completion handler --
// (read_some, write_some, do_read_work, and do_write_work are
//  defined in posix_stream_file_service.hpp, after the service)
401  

401  

402  
inline void
402  
inline void
403  
posix_stream_file::file_op::operator()()
403  
posix_stream_file::file_op::operator()()
404  
{
404  
{
405  
    stop_cb.reset();
405  
    stop_cb.reset();
406  

406  

407  
    bool const was_cancelled = cancelled.load(std::memory_order_acquire);
407  
    bool const was_cancelled = cancelled.load(std::memory_order_acquire);
408  

408  

409  
    if (ec_out)
409  
    if (ec_out)
410  
    {
410  
    {
411  
        if (was_cancelled)
411  
        if (was_cancelled)
412  
            *ec_out = capy::error::canceled;
412  
            *ec_out = capy::error::canceled;
413  
        else if (errn != 0)
413  
        else if (errn != 0)
414  
            *ec_out = make_err(errn);
414  
            *ec_out = make_err(errn);
415  
        else if (is_read && bytes_transferred == 0)
415  
        else if (is_read && bytes_transferred == 0)
416  
            *ec_out = capy::error::eof;
416  
            *ec_out = capy::error::eof;
417  
        else
417  
        else
418  
            *ec_out = {};
418  
            *ec_out = {};
419  
    }
419  
    }
420  

420  

421  
    if (bytes_out)
421  
    if (bytes_out)
422  
        *bytes_out = was_cancelled ? 0 : bytes_transferred;
422  
        *bytes_out = was_cancelled ? 0 : bytes_transferred;
423  

423  

424  
    // Move impl_ref to a local so members remain valid through
424  
    // Move impl_ref to a local so members remain valid through
425  
    // dispatch — impl_ref may be the last shared_ptr keeping
425  
    // dispatch — impl_ref may be the last shared_ptr keeping
426  
    // the parent posix_stream_file (which embeds this file_op) alive.
426  
    // the parent posix_stream_file (which embeds this file_op) alive.
427  
    auto prevent_destroy = std::move(impl_ref);
427  
    auto prevent_destroy = std::move(impl_ref);
428  
    ex.on_work_finished();
428  
    ex.on_work_finished();
429  
    cont_op.cont.h = h;
429  
    cont_op.cont.h = h;
430  
    dispatch_coro(ex, cont_op.cont).resume();
430  
    dispatch_coro(ex, cont_op.cont).resume();
431  
}
431  
}
432  

432  

433  
inline void
433  
inline void
434  
posix_stream_file::file_op::destroy()
434  
posix_stream_file::file_op::destroy()
435  
{
435  
{
436  
    stop_cb.reset();
436  
    stop_cb.reset();
437  
    auto local_ex = ex;
437  
    auto local_ex = ex;
438  
    impl_ref.reset();
438  
    impl_ref.reset();
439  
    local_ex.on_work_finished();
439  
    local_ex.on_work_finished();
440  
}
440  
}
441  

441  

442  
} // namespace boost::corosio::detail
442  
} // namespace boost::corosio::detail
443  

443  

444  
#endif // BOOST_COROSIO_POSIX
444  
#endif // BOOST_COROSIO_POSIX
445  

445  

446  
#endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
446  
#endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP