libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

80.7% Lines (392/486) 89.1% Functions (41/46) 69.4% Branches (206/297)
libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/posix/resolver_service.hpp"
18 #include "src/detail/posix/signals.hpp"
19
20 #include <boost/corosio/detail/except.hpp>
21 #include <boost/corosio/detail/thread_local_ptr.hpp>
22
23 #include <chrono>
24 #include <limits>
25 #include <utility>
26
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <sys/epoll.h>
30 #include <sys/eventfd.h>
31 #include <sys/socket.h>
32 #include <sys/timerfd.h>
33 #include <unistd.h>
34
35 /*
36 epoll Scheduler - Single Reactor Model
37 ======================================
38
39 This scheduler uses a thread coordination strategy to provide handler
40 parallelism and avoid the thundering herd problem.
41 Instead of all threads blocking on epoll_wait(), one thread becomes the
42 "reactor" while others wait on a condition variable for handler work.
43
44 Thread Model
45 ------------
46 - ONE thread runs epoll_wait() at a time (the reactor thread)
47 - OTHER threads wait on cond_ (condition variable) for handlers
48 - When work is posted, exactly one waiting thread wakes via notify_one()
49 - This matches Windows IOCP semantics where N posted items wake N threads
50
51 Event Loop Structure (do_one)
52 -----------------------------
53 1. Lock mutex, try to pop handler from queue
54 2. If got handler: execute it (unlocked), return
55 3. If queue empty and no reactor running: become reactor
56 - Run epoll_wait (unlocked), queue I/O completions, loop back
57 4. If queue empty and reactor running: wait on condvar for work
58
59 The task_running_ flag ensures only one thread owns epoll_wait().
60 After the reactor queues I/O completions, it loops back to try getting
61 a handler, giving priority to handler execution over more I/O polling.
62
63 Signaling State (state_)
64 ------------------------
65 The state_ variable encodes two pieces of information:
66 - Bit 0: signaled flag (1 = signaled, persists until cleared)
67 - Upper bits: waiter count (each waiter adds 2 before blocking)
68
69 This allows efficient coordination:
70 - Signalers only call notify when waiters exist (state_ > 1)
71 - Waiters check if already signaled before blocking (fast-path)
72
73 Wake Coordination (wake_one_thread_and_unlock)
74 ----------------------------------------------
75 When posting work:
76 - If waiters exist (state_ > 1): signal and notify_one()
77 - Else if reactor running: interrupt via eventfd write
78 - Else: no-op (thread will find work when it checks queue)
79
80 This avoids waking threads unnecessarily. With cascading wakes,
81 each handler execution wakes at most one additional thread if
82 more work exists in the queue.
83
84 Work Counting
85 -------------
86 outstanding_work_ tracks pending operations. When it hits zero, run()
87 returns. Each operation increments on start, decrements on completion.
88
89 Timer Integration
90 -----------------
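Timers are handled by timer_service. The scheduler arms a timerfd,
registered with epoll, for the nearest timer expiry, so epoll_wait itself
needs no timer-derived timeout. When the earliest expiry changes,
timer_service invokes the scheduler's callback. The callback marks the
timerfd stale and, if the reactor is running, calls interrupt_reactor();
run_task() then reprograms the timerfd before its next epoll_wait.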
95 */
96
97 namespace boost::corosio::detail {
98
99 struct scheduler_context
100 {
101 epoll_scheduler const* key;
102 scheduler_context* next;
103 op_queue private_queue;
104 long private_outstanding_work;
105
106 167 scheduler_context(epoll_scheduler const* k, scheduler_context* n)
107 167 : key(k)
108 167 , next(n)
109 167 , private_outstanding_work(0)
110 {
111 167 }
112 };
113
114 namespace {
115
116 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
117
118 struct thread_context_guard
119 {
120 scheduler_context frame_;
121
122 167 explicit thread_context_guard(
123 epoll_scheduler const* ctx) noexcept
124 167 : frame_(ctx, context_stack.get())
125 {
126 167 context_stack.set(&frame_);
127 167 }
128
129 167 ~thread_context_guard() noexcept
130 {
131
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 167 times.
167 if (!frame_.private_queue.empty())
132 frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
133 167 context_stack.set(frame_.next);
134 167 }
135 };
136
137 scheduler_context*
138 250697 find_context(epoll_scheduler const* self) noexcept
139 {
140
2/2
✓ Branch 1 taken 249048 times.
✓ Branch 2 taken 1649 times.
250697 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
141
1/2
✓ Branch 0 taken 249048 times.
✗ Branch 1 not taken.
249048 if (c->key == self)
142 249048 return c;
143 1649 return nullptr;
144 }
145
146 } // namespace
147
/** Process the readiness events accumulated for this descriptor.

    Runs as a queued operation after the reactor (run_task) has recorded
    events via add_ready_events() and enqueued this descriptor. Under the
    descriptor mutex it performs, or fails with the socket error, the
    pending read/connect/write operations and collects the finished ones.
    It then executes the first finished operation inline and hands the
    rest to the scheduler.
*/
void
descriptor_state::
operator()()
{
    // Allow the reactor to enqueue this descriptor again for new events.
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    // Atomically consume every event recorded since the last run.
    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        // Nothing to process. work_cleanup treats each executed operation
        // as consuming one work item, so report one produced item to keep
        // the count balanced.
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    int err = 0;
    if (ev & EPOLLERR)
    {
        // Fetch the pending socket error. Fall back to errno if the query
        // itself fails, and to EIO if no specific error is reported.
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO;
    }

    {
        std::lock_guard lock(mutex);
        if (ev & EPOLLIN)
        {
            if (read_op)
            {
                auto* rd = read_op;
                if (err)
                    rd->complete(err, 0);
                else
                    rd->perform_io();

                // Would-block: keep the read pending until the next
                // readiness edge. Otherwise the read is finished.
                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
                {
                    rd->errn = 0;
                }
                else
                {
                    read_op = nullptr;
                    local_ops.push(rd);
                }
            }
            else
            {
                // No reader is waiting. Remember the readiness, because the
                // descriptor is registered edge-triggered (EPOLLET).
                // Presumably the next read attempt consults this flag.
                read_ready = true;
            }
        }

        if (ev & EPOLLOUT)
        {
            // Sample before the ops below are cleared. Write readiness is
            // cached only when no connect/write consumed this edge.
            bool had_write_op = (connect_op || write_op);
            if (connect_op)
            {
                auto* cn = connect_op;
                if (err)
                    cn->complete(err, 0);
                else
                    cn->perform_io();
                connect_op = nullptr;
                local_ops.push(cn);
            }

            if (write_op)
            {
                auto* wr = write_op;
                if (err)
                    wr->complete(err, 0);
                else
                    wr->perform_io();

                // Would-block: keep the write pending. Otherwise it is done.
                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
                {
                    wr->errn = 0;
                }
                else
                {
                    write_op = nullptr;
                    local_ops.push(wr);
                }
            }

            if (!had_write_op)
                write_ready = true;
        }

        // On error, fail every operation not already finished above, for
        // example a read that is pending when only EPOLLOUT was reported.
        if (err)
        {
            if (read_op)
            {
                read_op->complete(err, 0);
                local_ops.push(std::exchange(read_op, nullptr));
            }

            if (write_op)
            {
                write_op->complete(err, 0);
                local_ops.push(std::exchange(write_op, nullptr));
            }

            if (connect_op)
            {
                connect_op->complete(err, 0);
                local_ops.push(std::exchange(connect_op, nullptr));
            }
        }
    }

    // Execute first handler inline — the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        // Queue the remaining completions before running the first one.
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        // No operation finished. Balance work_cleanup's consumption, as
        // in the ev == 0 case above.
        scheduler_->compensating_work_started();
    }
}
271
272 189 epoll_scheduler::
273 epoll_scheduler(
274 capy::execution_context& ctx,
275 189 int)
276 189 : epoll_fd_(-1)
277 189 , event_fd_(-1)
278 189 , timer_fd_(-1)
279 189 , outstanding_work_(0)
280 189 , stopped_(false)
281 189 , shutdown_(false)
282 189 , task_running_{false}
283 189 , task_interrupted_(false)
284 378 , state_(0)
285 {
286 189 epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
287
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (epoll_fd_ < 0)
288 detail::throw_system_error(make_err(errno), "epoll_create1");
289
290 189 event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
291
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (event_fd_ < 0)
292 {
293 int errn = errno;
294 ::close(epoll_fd_);
295 detail::throw_system_error(make_err(errn), "eventfd");
296 }
297
298 189 timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
299
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (timer_fd_ < 0)
300 {
301 int errn = errno;
302 ::close(event_fd_);
303 ::close(epoll_fd_);
304 detail::throw_system_error(make_err(errn), "timerfd_create");
305 }
306
307 189 epoll_event ev{};
308 189 ev.events = EPOLLIN | EPOLLET;
309 189 ev.data.ptr = nullptr;
310
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 189 times.
189 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
311 {
312 int errn = errno;
313 ::close(timer_fd_);
314 ::close(event_fd_);
315 ::close(epoll_fd_);
316 detail::throw_system_error(make_err(errn), "epoll_ctl");
317 }
318
319 189 epoll_event timer_ev{};
320 189 timer_ev.events = EPOLLIN | EPOLLERR;
321 189 timer_ev.data.ptr = &timer_fd_;
322
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 189 times.
189 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
323 {
324 int errn = errno;
325 ::close(timer_fd_);
326 ::close(event_fd_);
327 ::close(epoll_fd_);
328 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
329 }
330
331
1/1
✓ Branch 1 taken 189 times.
189 timer_svc_ = &get_timer_service(ctx, *this);
332
1/1
✓ Branch 3 taken 189 times.
189 timer_svc_->set_on_earliest_changed(
333 timer_service::callback(
334 this,
335 [](void* p) {
336 2908 auto* self = static_cast<epoll_scheduler*>(p);
337 2908 self->timerfd_stale_.store(true, std::memory_order_release);
338
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2908 times.
2908 if (self->task_running_.load(std::memory_order_acquire))
339 self->interrupt_reactor();
340 2908 }));
341
342 // Initialize resolver service
343
1/1
✓ Branch 1 taken 189 times.
189 get_resolver_service(ctx, *this);
344
345 // Initialize signal service
346
1/1
✓ Branch 1 taken 189 times.
189 get_signal_service(ctx, *this);
347
348 // Push task sentinel to interleave reactor runs with handler execution
349 189 completed_ops_.push(&task_op_);
350 189 }
351
352 378 epoll_scheduler::
353 189 ~epoll_scheduler()
354 {
355
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (timer_fd_ >= 0)
356 189 ::close(timer_fd_);
357
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (event_fd_ >= 0)
358 189 ::close(event_fd_);
359
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (epoll_fd_ >= 0)
360 189 ::close(epoll_fd_);
361 378 }
362
363 void
364 189 epoll_scheduler::
365 shutdown()
366 {
367 {
368
1/1
✓ Branch 1 taken 189 times.
189 std::unique_lock lock(mutex_);
369 189 shutdown_ = true;
370
371
2/2
✓ Branch 1 taken 189 times.
✓ Branch 2 taken 189 times.
378 while (auto* h = completed_ops_.pop())
372 {
373
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (h == &task_op_)
374 189 continue;
375 lock.unlock();
376 h->destroy();
377 lock.lock();
378 189 }
379
380 189 signal_all(lock);
381 189 }
382
383 189 outstanding_work_.store(0, std::memory_order_release);
384
385
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (event_fd_ >= 0)
386 189 interrupt_reactor();
387 189 }
388
389 void
390 4676 epoll_scheduler::
391 post(std::coroutine_handle<> h) const
392 {
393 struct post_handler final
394 : scheduler_op
395 {
396 std::coroutine_handle<> h_;
397
398 explicit
399 4676 post_handler(std::coroutine_handle<> h)
400 4676 : h_(h)
401 {
402 4676 }
403
404 9352 ~post_handler() = default;
405
406 4676 void operator()() override
407 {
408 4676 auto h = h_;
409
1/2
✓ Branch 0 taken 4676 times.
✗ Branch 1 not taken.
4676 delete this;
410
1/1
✓ Branch 1 taken 4676 times.
4676 h.resume();
411 4676 }
412
413 void destroy() override
414 {
415 delete this;
416 }
417 };
418
419
1/1
✓ Branch 1 taken 4676 times.
4676 auto ph = std::make_unique<post_handler>(h);
420
421 // Fast path: same thread posts to private queue
422 // Only count locally; work_cleanup batches to global counter
423
2/2
✓ Branch 1 taken 3053 times.
✓ Branch 2 taken 1623 times.
4676 if (auto* ctx = find_context(this))
424 {
425 3053 ++ctx->private_outstanding_work;
426 3053 ctx->private_queue.push(ph.release());
427 3053 return;
428 }
429
430 // Slow path: cross-thread post requires mutex
431 1623 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
432
433
1/1
✓ Branch 1 taken 1623 times.
1623 std::unique_lock lock(mutex_);
434 1623 completed_ops_.push(ph.release());
435
1/1
✓ Branch 1 taken 1623 times.
1623 wake_one_thread_and_unlock(lock);
436 4676 }
437
438 void
439 165033 epoll_scheduler::
440 post(scheduler_op* h) const
441 {
442 // Fast path: same thread posts to private queue
443 // Only count locally; work_cleanup batches to global counter
444
2/2
✓ Branch 1 taken 165007 times.
✓ Branch 2 taken 26 times.
165033 if (auto* ctx = find_context(this))
445 {
446 165007 ++ctx->private_outstanding_work;
447 165007 ctx->private_queue.push(h);
448 165007 return;
449 }
450
451 // Slow path: cross-thread post requires mutex
452 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
453
454
1/1
✓ Branch 1 taken 26 times.
26 std::unique_lock lock(mutex_);
455 26 completed_ops_.push(h);
456
1/1
✓ Branch 1 taken 26 times.
26 wake_one_thread_and_unlock(lock);
457 26 }
458
459 void
460 3526 epoll_scheduler::
461 on_work_started() noexcept
462 {
463 3526 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
464 3526 }
465
466 void
467 3494 epoll_scheduler::
468 on_work_finished() noexcept
469 {
470
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3494 times.
6988 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
471 stop();
472 3494 }
473
474 bool
475 600 epoll_scheduler::
476 running_in_this_thread() const noexcept
477 {
478
2/2
✓ Branch 1 taken 390 times.
✓ Branch 2 taken 210 times.
600 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
479
1/2
✓ Branch 0 taken 390 times.
✗ Branch 1 not taken.
390 if (c->key == this)
480 390 return true;
481 210 return false;
482 }
483
484 void
485 31 epoll_scheduler::
486 stop()
487 {
488
1/1
✓ Branch 1 taken 31 times.
31 std::unique_lock lock(mutex_);
489
2/2
✓ Branch 0 taken 19 times.
✓ Branch 1 taken 12 times.
31 if (!stopped_)
490 {
491 19 stopped_ = true;
492 19 signal_all(lock);
493
1/1
✓ Branch 1 taken 19 times.
19 interrupt_reactor();
494 }
495 31 }
496
497 bool
498 16 epoll_scheduler::
499 stopped() const noexcept
500 {
501 16 std::unique_lock lock(mutex_);
502 32 return stopped_;
503 16 }
504
505 void
506 49 epoll_scheduler::
507 restart()
508 {
509
1/1
✓ Branch 1 taken 49 times.
49 std::unique_lock lock(mutex_);
510 49 stopped_ = false;
511 49 }
512
513 std::size_t
514 175 epoll_scheduler::
515 run()
516 {
517
2/2
✓ Branch 1 taken 22 times.
✓ Branch 2 taken 153 times.
350 if (outstanding_work_.load(std::memory_order_acquire) == 0)
518 {
519
1/1
✓ Branch 1 taken 22 times.
22 stop();
520 22 return 0;
521 }
522
523 153 thread_context_guard ctx(this);
524
1/1
✓ Branch 1 taken 153 times.
153 std::unique_lock lock(mutex_);
525
526 153 std::size_t n = 0;
527 for (;;)
528 {
529
3/3
✓ Branch 1 taken 256356 times.
✓ Branch 3 taken 153 times.
✓ Branch 4 taken 256203 times.
256356 if (!do_one(lock, -1, &ctx.frame_))
530 153 break;
531
1/2
✓ Branch 1 taken 256203 times.
✗ Branch 2 not taken.
256203 if (n != (std::numeric_limits<std::size_t>::max)())
532 256203 ++n;
533
2/2
✓ Branch 1 taken 91224 times.
✓ Branch 2 taken 164979 times.
256203 if (!lock.owns_lock())
534
1/1
✓ Branch 1 taken 91224 times.
91224 lock.lock();
535 }
536 153 return n;
537 153 }
538
539 std::size_t
540 2 epoll_scheduler::
541 run_one()
542 {
543
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
544 {
545 stop();
546 return 0;
547 }
548
549 2 thread_context_guard ctx(this);
550
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
551
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, -1, &ctx.frame_);
552 2 }
553
554 std::size_t
555 14 epoll_scheduler::
556 wait_one(long usec)
557 {
558
2/2
✓ Branch 1 taken 5 times.
✓ Branch 2 taken 9 times.
28 if (outstanding_work_.load(std::memory_order_acquire) == 0)
559 {
560
1/1
✓ Branch 1 taken 5 times.
5 stop();
561 5 return 0;
562 }
563
564 9 thread_context_guard ctx(this);
565
1/1
✓ Branch 1 taken 9 times.
9 std::unique_lock lock(mutex_);
566
1/1
✓ Branch 1 taken 9 times.
9 return do_one(lock, usec, &ctx.frame_);
567 9 }
568
569 std::size_t
570 2 epoll_scheduler::
571 poll()
572 {
573
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
574 {
575
1/1
✓ Branch 1 taken 1 time.
1 stop();
576 1 return 0;
577 }
578
579 1 thread_context_guard ctx(this);
580
1/1
✓ Branch 1 taken 1 time.
1 std::unique_lock lock(mutex_);
581
582 1 std::size_t n = 0;
583 for (;;)
584 {
585
3/3
✓ Branch 1 taken 3 times.
✓ Branch 3 taken 1 time.
✓ Branch 4 taken 2 times.
3 if (!do_one(lock, 0, &ctx.frame_))
586 1 break;
587
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (n != (std::numeric_limits<std::size_t>::max)())
588 2 ++n;
589
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (!lock.owns_lock())
590
1/1
✓ Branch 1 taken 2 times.
2 lock.lock();
591 }
592 1 return n;
593 1 }
594
595 std::size_t
596 4 epoll_scheduler::
597 poll_one()
598 {
599
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
600 {
601
1/1
✓ Branch 1 taken 2 times.
2 stop();
602 2 return 0;
603 }
604
605 2 thread_context_guard ctx(this);
606
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
607
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, 0, &ctx.frame_);
608 2 }
609
610 void
611 5503 epoll_scheduler::
612 register_descriptor(int fd, descriptor_state* desc) const
613 {
614 5503 epoll_event ev{};
615 5503 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
616 5503 ev.data.ptr = desc;
617
618
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5503 times.
5503 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
619 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
620
621 5503 desc->registered_events = ev.events;
622 5503 desc->fd = fd;
623 5503 desc->scheduler_ = this;
624
625
1/1
✓ Branch 1 taken 5503 times.
5503 std::lock_guard lock(desc->mutex);
626 5503 desc->read_ready = false;
627 5503 desc->write_ready = false;
628 5503 }
629
630 void
631 5503 epoll_scheduler::
632 deregister_descriptor(int fd) const
633 {
634 5503 ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
635 5503 }
636
637 void
638 5688 epoll_scheduler::
639 work_started() const noexcept
640 {
641 5688 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
642 5688 }
643
644 void
645 10447 epoll_scheduler::
646 work_finished() const noexcept
647 {
648
2/2
✓ Branch 0 taken 148 times.
✓ Branch 1 taken 10299 times.
20894 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
649 {
650 // Last work item completed - wake all threads so they can exit.
651 // signal_all() wakes threads waiting on the condvar.
652 // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
653 // Both are needed because they target different blocking mechanisms.
654 148 std::unique_lock lock(mutex_);
655 148 signal_all(lock);
656
5/6
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 146 times.
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 2 times.
✓ Branch 6 taken 146 times.
148 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
657 {
658 2 task_interrupted_ = true;
659 2 lock.unlock();
660 2 interrupt_reactor();
661 }
662 148 }
663 10447 }
664
665 void
666 80988 epoll_scheduler::
667 compensating_work_started() const noexcept
668 {
669 80988 auto* ctx = find_context(this);
670
1/2
✓ Branch 0 taken 80988 times.
✗ Branch 1 not taken.
80988 if (ctx)
671 80988 ++ctx->private_outstanding_work;
672 80988 }
673
674 void
675 epoll_scheduler::
676 drain_thread_queue(op_queue& queue, long count) const
677 {
678 // Note: outstanding_work_ was already incremented when posting
679 std::unique_lock lock(mutex_);
680 completed_ops_.splice(queue);
681 if (count > 0)
682 maybe_unlock_and_signal_one(lock);
683 }
684
685 void
686 5521 epoll_scheduler::
687 post_deferred_completions(op_queue& ops) const
688 {
689
1/2
✓ Branch 1 taken 5521 times.
✗ Branch 2 not taken.
5521 if (ops.empty())
690 5521 return;
691
692 // Fast path: if on scheduler thread, use private queue
693 if (auto* ctx = find_context(this))
694 {
695 ctx->private_queue.splice(ops);
696 return;
697 }
698
699 // Slow path: add to global queue and wake a thread
700 std::unique_lock lock(mutex_);
701 completed_ops_.splice(ops);
702 wake_one_thread_and_unlock(lock);
703 }
704
705 void
706 236 epoll_scheduler::
707 interrupt_reactor() const
708 {
709 // Only write if not already armed to avoid redundant writes
710 236 bool expected = false;
711
2/2
✓ Branch 1 taken 223 times.
✓ Branch 2 taken 13 times.
236 if (eventfd_armed_.compare_exchange_strong(expected, true,
712 std::memory_order_release, std::memory_order_relaxed))
713 {
714 223 std::uint64_t val = 1;
715
1/1
✓ Branch 1 taken 223 times.
223 [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
716 }
717 236 }
718
719 void
720 356 epoll_scheduler::
721 signal_all(std::unique_lock<std::mutex>&) const
722 {
723 356 state_ |= 1;
724 356 cond_.notify_all();
725 356 }
726
727 bool
728 1649 epoll_scheduler::
729 maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
730 {
731 1649 state_ |= 1;
732
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1649 times.
1649 if (state_ > 1)
733 {
734 lock.unlock();
735 cond_.notify_one();
736 return true;
737 }
738 1649 return false;
739 }
740
741 void
742 333622 epoll_scheduler::
743 unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
744 {
745 333622 state_ |= 1;
746 333622 bool have_waiters = state_ > 1;
747 333622 lock.unlock();
748
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 333622 times.
333622 if (have_waiters)
749 cond_.notify_one();
750 333622 }
751
752 void
753 epoll_scheduler::
754 clear_signal() const
755 {
756 state_ &= ~std::size_t(1);
757 }
758
759 void
760 epoll_scheduler::
761 wait_for_signal(std::unique_lock<std::mutex>& lock) const
762 {
763 while ((state_ & 1) == 0)
764 {
765 state_ += 2;
766 cond_.wait(lock);
767 state_ -= 2;
768 }
769 }
770
771 void
772 epoll_scheduler::
773 wait_for_signal_for(
774 std::unique_lock<std::mutex>& lock,
775 long timeout_us) const
776 {
777 if ((state_ & 1) == 0)
778 {
779 state_ += 2;
780 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
781 state_ -= 2;
782 }
783 }
784
785 void
786 1649 epoll_scheduler::
787 wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
788 {
789
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1649 times.
1649 if (maybe_unlock_and_signal_one(lock))
790 return;
791
792
5/6
✓ Branch 1 taken 26 times.
✓ Branch 2 taken 1623 times.
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 26 times.
✓ Branch 6 taken 1623 times.
1649 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
793 {
794 26 task_interrupted_ = true;
795 26 lock.unlock();
796 26 interrupt_reactor();
797 }
798 else
799 {
800 1623 lock.unlock();
801 }
802 }
803
804 /** RAII guard for handler execution work accounting.
805
806 Handler consumes 1 work item, may produce N new items via fast-path posts.
807 Net change = N - 1:
808 - If N > 1: add (N-1) to global (more work produced than consumed)
809 - If N == 1: net zero, do nothing
810 - If N < 1: call work_finished() (work consumed, may trigger stop)
811
812 Also drains private queue to global for other threads to process.
813 */
814 struct work_cleanup
815 {
816 epoll_scheduler const* scheduler;
817 std::unique_lock<std::mutex>* lock;
818 scheduler_context* ctx;
819
820 256218 ~work_cleanup()
821 {
822
1/2
✓ Branch 0 taken 256218 times.
✗ Branch 1 not taken.
256218 if (ctx)
823 {
824 256218 long produced = ctx->private_outstanding_work;
825
2/2
✓ Branch 0 taken 169 times.
✓ Branch 1 taken 256049 times.
256218 if (produced > 1)
826 169 scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
827
2/2
✓ Branch 0 taken 10248 times.
✓ Branch 1 taken 245801 times.
256049 else if (produced < 1)
828 10248 scheduler->work_finished();
829 // produced == 1: net zero, handler consumed what it produced
830 256218 ctx->private_outstanding_work = 0;
831
832
2/2
✓ Branch 1 taken 164982 times.
✓ Branch 2 taken 91236 times.
256218 if (!ctx->private_queue.empty())
833 {
834 164982 lock->lock();
835 164982 scheduler->completed_ops_.splice(ctx->private_queue);
836 }
837 }
838 else
839 {
840 // No thread context - slow-path op was already counted globally
841 scheduler->work_finished();
842 }
843 256218 }
844 };
845
846 /** RAII guard for reactor work accounting.
847
848 Reactor only produces work via timer/signal callbacks posting handlers.
849 Unlike handler execution which consumes 1, the reactor consumes nothing.
850 All produced work must be flushed to global counter.
851 */
852 struct task_cleanup
853 {
854 epoll_scheduler const* scheduler;
855 std::unique_lock<std::mutex>* lock;
856 scheduler_context* ctx;
857
858 83050 ~task_cleanup()
859 83050 {
860
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 83050 times.
83050 if (!ctx)
861 return;
862
863
2/2
✓ Branch 0 taken 2907 times.
✓ Branch 1 taken 80143 times.
83050 if (ctx->private_outstanding_work > 0)
864 {
865 2907 scheduler->outstanding_work_.fetch_add(
866 2907 ctx->private_outstanding_work, std::memory_order_relaxed);
867 2907 ctx->private_outstanding_work = 0;
868 }
869
870
2/2
✓ Branch 1 taken 2907 times.
✓ Branch 2 taken 80143 times.
83050 if (!ctx->private_queue.empty())
871 {
872
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2907 times.
2907 if (!lock->owns_lock())
873 lock->lock();
874 2907 scheduler->completed_ops_.splice(ctx->private_queue);
875 }
876 83050 }
877 };
878
879 void
880 5811 epoll_scheduler::
881 update_timerfd() const
882 {
883 5811 auto nearest = timer_svc_->nearest_expiry();
884
885 5811 itimerspec ts{};
886 5811 int flags = 0;
887
888
3/3
✓ Branch 2 taken 5811 times.
✓ Branch 4 taken 5770 times.
✓ Branch 5 taken 41 times.
5811 if (nearest == timer_service::time_point::max())
889 {
890 // No timers - disarm by setting to 0 (relative)
891 }
892 else
893 {
894 5770 auto now = std::chrono::steady_clock::now();
895
3/3
✓ Branch 1 taken 5770 times.
✓ Branch 4 taken 34 times.
✓ Branch 5 taken 5736 times.
5770 if (nearest <= now)
896 {
897 // Use 1ns instead of 0 - zero disarms the timerfd
898 34 ts.it_value.tv_nsec = 1;
899 }
900 else
901 {
902 5736 auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
903
1/1
✓ Branch 1 taken 5736 times.
11472 nearest - now).count();
904 5736 ts.it_value.tv_sec = nsec / 1000000000;
905 5736 ts.it_value.tv_nsec = nsec % 1000000000;
906 // Ensure non-zero to avoid disarming if duration rounds to 0
907
3/4
✓ Branch 0 taken 5732 times.
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5732 times.
5736 if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
908 ts.it_value.tv_nsec = 1;
909 }
910 }
911
912
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5811 times.
5811 if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
913 detail::throw_system_error(make_err(errno), "timerfd_settime");
914 5811 }
915
/** Run one pass of the reactor.

    Releases the mutex if held and reprograms the timerfd if it was marked
    stale. It then waits in epoll_wait(), without blocking if
    task_interrupted_ is set and indefinitely otherwise, and afterwards:
    - drains the eventfd and re-enables interrupt_reactor() writes;
    - processes expired timers when the timerfd fired;
    - records events on ready descriptors and queues each one at most once.
    On normal return the mutex is held. Work produced during the pass is
    flushed by task_cleanup.

    Queued descriptors are not counted as work here. descriptor_state's
    operator() offsets work_cleanup's decrement when it completes nothing.

    @throws via detail::throw_system_error if epoll_wait fails with
    anything other than EINTR.
*/
void
epoll_scheduler::
run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
{
    // Non-blocking wait when do_one() requested a prompt return
    // (task_interrupted_ is set there before each pass).
    int timeout_ms = task_interrupted_ ? 0 : -1;

    // do_one() may already have released the mutex via
    // unlock_and_signal_one().
    if (lock.owns_lock())
        lock.unlock();

    // Flushes private work and queue when this function exits.
    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // Event loop runs without mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    // On EINTR nfds is negative, so the loop below is skipped.
    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        // A null data pointer identifies the eventfd (see the constructor).
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            // Allow the next interrupt_reactor() call to write again.
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        // &timer_fd_ identifies the timerfd; consume its expiration count.
        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue descriptor
        // No per-descriptor mutex locking in reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(expected, true,
                std::memory_order_release, std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
986
/** Execute at most one handler, running the reactor as needed.

    Expects `lock` to hold mutex_ on entry. Loops until one of:
    - stopped_ is set: returns 0;
    - a handler is dequeued: it runs with the mutex released and 1 is
      returned. The mutex may or may not be held on return, so callers
      check owns_lock();
    - no work is outstanding, or timeout_us == 0 and nothing is ready:
      returns 0.
    Otherwise the calling thread either becomes the reactor (when it pops
    the task_op_ sentinel) or waits on the condition variable. That wait
    is indefinite for timeout_us < 0 and up to timeout_us microseconds per
    wait otherwise.

    NOTE(review): with timeout_us == 0 and no queued handlers, the sentinel
    path returns 0 without calling run_task(). poll()/poll_one() therefore
    do not poll for I/O readiness unless other handlers are already queued,
    and the `timeout_us == 0` term in the task_interrupted_ assignment is
    redundant. Confirm this is intended.

    NOTE(review): for timeout_us > 0 with no other handlers queued, the
    reactor path sets task_interrupted_ to false, so run_task() calls
    epoll_wait() with no timeout. wait_one()'s timeout is then not honoured
    while this thread is the reactor.
*/
std::size_t
epoll_scheduler::
do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                    timeout_us == 0))
            {
                // Put the sentinel back so a later call can run the reactor.
                completed_ops_.push(&task_op_);
                return 0;
            }

            // With handlers already queued, the reactor must not block:
            // run_task() reads this to choose a zero epoll_wait timeout.
            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Wake another thread for the queued handlers while this one
            // polls (this also releases the mutex).
            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_.store(false, std::memory_order_relaxed);
            // Re-queue the sentinel so reactor passes stay interleaved with
            // handler execution.
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            // Release the mutex before running the handler; if more work
            // remains queued, wake another thread for it.
            if (!completed_ops_.empty())
                unlock_and_signal_one(lock);
            else
                lock.unlock();

            // Settles this handler's work accounting and private queue at
            // scope exit, including when the handler throws.
            work_cleanup on_exit{this, &lock, ctx};

            (*op)();
            return 1;
        }

        // No pending work to wait on, or caller requested non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // The queue is empty and the sentinel is held by another thread
        // running the reactor. Wait to be signaled (new work, stop, or
        // last work finished).
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
1052
1053 } // namespace boost::corosio::detail
1054
1055 #endif
1056