libs/corosio/src/corosio/src/detail/epoll/op.hpp

84.3% Lines (97/115) 81.0% Functions (17/21) 65.0% Branches (26/40)
libs/corosio/src/corosio/src/detail/epoll/op.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
11 #define BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <coroutine>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include "src/detail/make_err.hpp"
26 #include "src/detail/dispatch_coro.hpp"
27 #include "src/detail/scheduler_op.hpp"
28 #include "src/detail/endpoint_convert.hpp"
29
30 #include <unistd.h>
31 #include <errno.h>
32
33 #include <atomic>
34 #include <cstddef>
35 #include <memory>
36 #include <mutex>
37 #include <optional>
38 #include <stop_token>
39
40 #include <netinet/in.h>
41 #include <sys/socket.h>
42 #include <sys/uio.h>
43
44 /*
45 epoll Operation State
46 =====================
47
48 Each async I/O operation has a corresponding epoll_op-derived struct that
49 holds the operation's state while it's in flight. The socket impl owns
50 fixed slots for each operation type (conn_, rd_, wr_), so only one
51 operation of each type can be pending per socket at a time.
52
53 Persistent Registration
54 -----------------------
55 File descriptors are registered with epoll once (via descriptor_state) and
56 stay registered until closed. The descriptor_state tracks which operations
57 are pending (read_op, write_op, connect_op). When an event arrives, the
58 reactor dispatches to the appropriate pending operation.
59
60 Impl Lifetime Management
61 ------------------------
62 When cancel() posts an op to the scheduler's ready queue, the socket impl
63 might be destroyed before the scheduler processes the op. The `impl_ptr`
64 member holds a shared_ptr to the impl, keeping it alive until the op
65 completes. This is set by cancel() and cleared in operator() after the
66 coroutine is resumed.
67
68 EOF Detection
69 -------------
70 For reads, 0 bytes with no error means EOF. But an empty user buffer also
71 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
72
73 SIGPIPE Prevention
74 ------------------
75 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
76 SIGPIPE when the peer has closed.
77 */
78
79 namespace boost::corosio::detail {
80
// Forward declarations. These types are completed in their own translation
// units (sockets.cpp, acceptors.cpp, scheduler sources); the ops below hold
// only pointers to them.
class epoll_socket_impl;
class epoll_acceptor_impl;
struct epoll_op;

class epoll_scheduler;
88
89 /** Per-descriptor state for persistent epoll registration.
90
91 Tracks pending operations for a file descriptor. The fd is registered
92 once with epoll and stays registered until closed.
93
94 This struct extends scheduler_op to support deferred I/O processing.
95 When epoll events arrive, the reactor sets ready_events and queues
96 this descriptor for processing. When popped from the scheduler queue,
97 operator() performs the actual I/O and queues completion handlers.
98
99 @par Deferred I/O Model
100 The reactor no longer performs I/O directly. Instead:
101 1. Reactor sets ready_events and queues descriptor_state
102 2. Scheduler pops descriptor_state and calls operator()
103 3. operator() performs I/O under mutex and queues completions
104
105 This eliminates per-descriptor mutex locking from the reactor hot path.
106
107 @par Thread Safety
108 The mutex protects operation pointers and ready flags during I/O.
109 ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
110 */
111 struct descriptor_state : scheduler_op
112 {
113 std::mutex mutex;
114
115 // Protected by mutex
116 epoll_op* read_op = nullptr;
117 epoll_op* write_op = nullptr;
118 epoll_op* connect_op = nullptr;
119
120 // Caches edge events that arrived before an op was registered
121 bool read_ready = false;
122 bool write_ready = false;
123
124 // Set during registration only (no mutex needed)
125 std::uint32_t registered_events = 0;
126 int fd = -1;
127
128 // For deferred I/O - set by reactor, read by scheduler
129 std::atomic<std::uint32_t> ready_events_{0};
130 std::atomic<bool> is_enqueued_{false};
131 epoll_scheduler const* scheduler_ = nullptr;
132
133 // Prevents impl destruction while this descriptor_state is queued.
134 // Set by close_socket() when is_enqueued_ is true, cleared by operator().
135 std::shared_ptr<void> impl_ref_;
136
137 /// Add ready events atomically.
138 86509 void add_ready_events(std::uint32_t ev) noexcept
139 {
140 86509 ready_events_.fetch_or(ev, std::memory_order_relaxed);
141 86509 }
142
143 /// Perform deferred I/O and queue completions.
144 void operator()() override;
145
146 /// Destroy without invoking.
147 /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
148 /// the self-referential cycle set by close_socket().
149 void destroy() override { impl_ref_.reset(); }
150 };
151
/** Base state for one in-flight async I/O operation.

    Holds the coroutine to resume, output slots for the result, and the
    cancellation machinery shared by connect/read/write/accept ops.
    See the file header for the lifetime and EOF-detection rationale.
*/
struct epoll_op : scheduler_op
{
    /// stop_callback functor: forwards a stop request to the owning op.
    struct canceller
    {
        epoll_op* op;
        void operator()() const noexcept;
    };

    std::coroutine_handle<> h;          // coroutine resumed on completion
    capy::executor_ref ex;              // executor the coroutine resumes on
    std::error_code* ec_out = nullptr;  // where the final error is written
    std::size_t* bytes_out = nullptr;   // where bytes transferred is written

    int fd = -1;
    int errn = 0;                       // errno captured by perform_io()
    std::size_t bytes_transferred = 0;

    std::atomic<bool> cancelled{false};
    std::optional<std::stop_callback<canceller>> stop_cb;

    // Prevents use-after-free when socket is closed with pending ops.
    // See "Impl Lifetime Management" in file header.
    std::shared_ptr<void> impl_ptr;

    // For stop_token cancellation - pointer to owning socket/acceptor impl.
    // When stop is requested, we call back to the impl to perform actual
    // I/O cancellation. start() sets exactly one of these; the other is
    // nulled.
    epoll_socket_impl* socket_impl_ = nullptr;
    epoll_acceptor_impl* acceptor_impl_ = nullptr;

    epoll_op() = default;

    /// Return the op to a pristine state so its fixed slot can be reused.
    void reset() noexcept
    {
        fd = -1;
        errn = 0;
        bytes_transferred = 0;
        cancelled.store(false, std::memory_order_relaxed);
        impl_ptr.reset();
        socket_impl_ = nullptr;
        acceptor_impl_ = nullptr;
    }

    /// Completion path: translate the captured result into the caller's
    /// output slots, then resume the awaiting coroutine on its executor.
    void operator()() override
    {
        stop_cb.reset();

        if (ec_out)
        {
            // Priority: cancellation, then OS error, then EOF (a genuine
            // read that moved zero bytes), then success.
            if (cancelled.load(std::memory_order_acquire))
                *ec_out = capy::error::canceled;
            else if (errn != 0)
                *ec_out = make_err(errn);
            else if (is_read_operation() && bytes_transferred == 0)
                *ec_out = capy::error::eof;
            else
                *ec_out = {};
        }

        if (bytes_out)
            *bytes_out = bytes_transferred;

        // Move to stack before resuming coroutine. The coroutine might close
        // the socket, releasing the last wrapper ref. If impl_ptr were the
        // last ref and we destroyed it while still in operator(), we'd have
        // use-after-free. Moving to local ensures destruction happens at
        // function exit, after all member accesses are complete.
        capy::executor_ref saved_ex( std::move( ex ) );
        std::coroutine_handle<> saved_h( std::move( h ) );
        auto prevent_premature_destruction = std::move(impl_ptr);
        dispatch_coro(saved_ex, saved_h).resume();
    }

    /// Overridden by read ops so operator() can map 0 bytes to EOF.
    virtual bool is_read_operation() const noexcept { return false; }
    virtual void cancel() noexcept = 0;

    /// Destroy without invoking (scheduler shutdown drain).
    void destroy() override
    {
        stop_cb.reset();
        impl_ptr.reset();
    }

    /// Mark the op as cancelled; operator() reports capy::error::canceled.
    void request_cancel() noexcept
    {
        cancelled.store(true, std::memory_order_release);
    }

    /// Arm the op for a socket operation and hook up stop_token support.
    void start(std::stop_token token, epoll_socket_impl* impl)
    {
        cancelled.store(false, std::memory_order_release);
        stop_cb.reset();
        socket_impl_ = impl;
        acceptor_impl_ = nullptr;

        // Only pay for the callback when a stop can actually be requested.
        if (token.stop_possible())
            stop_cb.emplace(token, canceller{this});
    }

    /// Arm the op for an acceptor operation and hook up stop_token support.
    void start(std::stop_token token, epoll_acceptor_impl* impl)
    {
        cancelled.store(false, std::memory_order_release);
        stop_cb.reset();
        socket_impl_ = nullptr;
        acceptor_impl_ = impl;

        if (token.stop_possible())
            stop_cb.emplace(token, canceller{this});
    }

    /// Record the syscall outcome for later translation in operator().
    void complete(int err, std::size_t bytes) noexcept
    {
        errn = err;
        bytes_transferred = bytes;
    }

    /// Perform the actual (re)attempted I/O; default op does nothing.
    virtual void perform_io() noexcept {}
};
268
269
270 struct epoll_connect_op : epoll_op
271 {
272 endpoint target_endpoint;
273
274 2716 void reset() noexcept
275 {
276 2716 epoll_op::reset();
277 2716 target_endpoint = endpoint{};
278 2716 }
279
280 2715 void perform_io() noexcept override
281 {
282 // connect() completion status is retrieved via SO_ERROR, not return value
283 2715 int err = 0;
284 2715 socklen_t len = sizeof(err);
285
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2715 times.
2715 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
286 err = errno;
287 2715 complete(err, 0);
288 2715 }
289
290 // Defined in sockets.cpp where epoll_socket_impl is complete
291 void operator()() override;
292 void cancel() noexcept override;
293 };
294
295
296 struct epoll_read_op : epoll_op
297 {
298 static constexpr std::size_t max_buffers = 16;
299 iovec iovecs[max_buffers];
300 int iovec_count = 0;
301 bool empty_buffer_read = false;
302
303 80970 bool is_read_operation() const noexcept override
304 {
305 80970 return !empty_buffer_read;
306 }
307
308 81164 void reset() noexcept
309 {
310 81164 epoll_op::reset();
311 81164 iovec_count = 0;
312 81164 empty_buffer_read = false;
313 81164 }
314
315 208 void perform_io() noexcept override
316 {
317 ssize_t n;
318 do {
319 208 n = ::readv(fd, iovecs, iovec_count);
320
3/4
✓ Branch 0 taken 115 times.
✓ Branch 1 taken 93 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 115 times.
208 } while (n < 0 && errno == EINTR);
321
322
2/2
✓ Branch 0 taken 93 times.
✓ Branch 1 taken 115 times.
208 if (n >= 0)
323 93 complete(0, static_cast<std::size_t>(n));
324 else
325 115 complete(errno, 0);
326 208 }
327
328 void cancel() noexcept override;
329 };
330
331
332 struct epoll_write_op : epoll_op
333 {
334 static constexpr std::size_t max_buffers = 16;
335 iovec iovecs[max_buffers];
336 int iovec_count = 0;
337
338 81002 void reset() noexcept
339 {
340 81002 epoll_op::reset();
341 81002 iovec_count = 0;
342 81002 }
343
344 void perform_io() noexcept override
345 {
346 msghdr msg{};
347 msg.msg_iov = iovecs;
348 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
349
350 ssize_t n;
351 do {
352 n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
353 } while (n < 0 && errno == EINTR);
354
355 if (n >= 0)
356 complete(0, static_cast<std::size_t>(n));
357 else
358 complete(errno, 0);
359 }
360
361 void cancel() noexcept override;
362 };
363
364
365 struct epoll_accept_op : epoll_op
366 {
367 int accepted_fd = -1;
368 io_object::io_object_impl* peer_impl = nullptr;
369 io_object::io_object_impl** impl_out = nullptr;
370
371 2723 void reset() noexcept
372 {
373 2723 epoll_op::reset();
374 2723 accepted_fd = -1;
375 2723 peer_impl = nullptr;
376 2723 impl_out = nullptr;
377 2723 }
378
379 2712 void perform_io() noexcept override
380 {
381 2712 sockaddr_in addr{};
382 2712 socklen_t addrlen = sizeof(addr);
383 int new_fd;
384 do {
385 2712 new_fd = ::accept4(fd, reinterpret_cast<sockaddr*>(&addr),
386 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
387
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 2712 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
2712 } while (new_fd < 0 && errno == EINTR);
388
389
1/2
✓ Branch 0 taken 2712 times.
✗ Branch 1 not taken.
2712 if (new_fd >= 0)
390 {
391 2712 accepted_fd = new_fd;
392 2712 complete(0, 0);
393 }
394 else
395 {
396 complete(errno, 0);
397 }
398 2712 }
399
400 // Defined in acceptors.cpp where epoll_acceptor_impl is complete
401 void operator()() override;
402 void cancel() noexcept override;
403 };
404
405 } // namespace boost::corosio::detail
406
407 #endif // BOOST_COROSIO_HAS_EPOLL
408
409 #endif // BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
410