libs/corosio/src/corosio/src/detail/select/scheduler.cpp

73.4% Lines (268/365) 87.9% Functions (29/33) 62.2% Branches (166/267)
libs/corosio/src/corosio/src/detail/select/scheduler.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_SELECT
13
14 #include "src/detail/select/scheduler.hpp"
15 #include "src/detail/select/op.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/posix/resolver_service.hpp"
18 #include "src/detail/posix/signals.hpp"
19
20 #include <boost/corosio/detail/except.hpp>
21 #include <boost/corosio/detail/thread_local_ptr.hpp>
22
23 #include <chrono>
24 #include <limits>
25
26 #include <errno.h>
27 #include <fcntl.h>
28 #include <sys/select.h>
29 #include <sys/socket.h>
30 #include <unistd.h>
31
32 /*
33 select Scheduler - Single Reactor Model
34 =======================================
35
36 This scheduler mirrors the epoll_scheduler design but uses select() instead
37 of epoll for I/O multiplexing. The thread coordination strategy is identical:
38 one thread becomes the "reactor" while others wait on a condition variable.
39
40 Thread Model
41 ------------
42 - ONE thread runs select() at a time (the reactor thread)
43 - OTHER threads wait on wakeup_event_ (condition variable) for handlers
44 - When work is posted, exactly one waiting thread wakes via notify_one()
45
46 Key Differences from epoll
47 --------------------------
48 - Uses self-pipe instead of eventfd for interruption (more portable)
49 - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
50 - FD_SETSIZE limit (~1024 fds on most systems)
51 - Level-triggered only (no edge-triggered mode)
52
53 Self-Pipe Pattern
54 -----------------
55 To interrupt a blocking select() call (e.g., when work is posted or a timer
56 expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
57 always in the read_fds set, so select() returns immediately. We drain the
58 pipe to clear the readable state.
59
60 fd-to-op Mapping
61 ----------------
62 We use an unordered_map<int, fd_state> to track which operations are
63 registered for each fd. This allows O(1) lookup when select() returns
64 ready fds. Each fd can have at most one read op and one write op registered.
65 */
66
67 namespace boost::corosio::detail {
68
69 namespace {
70
71 struct scheduler_context
72 {
73 select_scheduler const* key;
74 scheduler_context* next;
75 };
76
77 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
78
79 struct thread_context_guard
80 {
81 scheduler_context frame_;
82
83 95 explicit thread_context_guard(
84 select_scheduler const* ctx) noexcept
85 95 : frame_{ctx, context_stack.get()}
86 {
87 95 context_stack.set(&frame_);
88 95 }
89
90 95 ~thread_context_guard() noexcept
91 {
92 95 context_stack.set(frame_.next);
93 95 }
94 };
95
96 } // namespace
97
98 120 select_scheduler::
99 select_scheduler(
100 capy::execution_context& ctx,
101 120 int)
102 120 : pipe_fds_{-1, -1}
103 120 , outstanding_work_(0)
104 120 , stopped_(false)
105 120 , shutdown_(false)
106 120 , max_fd_(-1)
107 120 , reactor_running_(false)
108 120 , reactor_interrupted_(false)
109 240 , idle_thread_count_(0)
110 {
111 // Create self-pipe for interrupting select()
112
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 120 times.
120 if (::pipe(pipe_fds_) < 0)
113 detail::throw_system_error(make_err(errno), "pipe");
114
115 // Set both ends to non-blocking and close-on-exec
116
2/2
✓ Branch 0 taken 240 times.
✓ Branch 1 taken 120 times.
360 for (int i = 0; i < 2; ++i)
117 {
118
1/1
✓ Branch 1 taken 240 times.
240 int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
119
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
240 if (flags == -1)
120 {
121 int errn = errno;
122 ::close(pipe_fds_[0]);
123 ::close(pipe_fds_[1]);
124 detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
125 }
126
2/3
✓ Branch 1 taken 240 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 240 times.
240 if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
127 {
128 int errn = errno;
129 ::close(pipe_fds_[0]);
130 ::close(pipe_fds_[1]);
131 detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
132 }
133
2/3
✓ Branch 1 taken 240 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 240 times.
240 if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
134 {
135 int errn = errno;
136 ::close(pipe_fds_[0]);
137 ::close(pipe_fds_[1]);
138 detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
139 }
140 }
141
142
1/1
✓ Branch 1 taken 120 times.
120 timer_svc_ = &get_timer_service(ctx, *this);
143
1/1
✓ Branch 3 taken 120 times.
120 timer_svc_->set_on_earliest_changed(
144 timer_service::callback(
145 this,
146 2519 [](void* p) { static_cast<select_scheduler*>(p)->interrupt_reactor(); }));
147
148 // Initialize resolver service
149
1/1
✓ Branch 1 taken 120 times.
120 get_resolver_service(ctx, *this);
150
151 // Initialize signal service
152
1/1
✓ Branch 1 taken 120 times.
120 get_signal_service(ctx, *this);
153
154 // Push task sentinel to interleave reactor runs with handler execution
155 120 completed_ops_.push(&task_op_);
156 120 }
157
158 240 select_scheduler::
159 120 ~select_scheduler()
160 {
161
1/2
✓ Branch 0 taken 120 times.
✗ Branch 1 not taken.
120 if (pipe_fds_[0] >= 0)
162 120 ::close(pipe_fds_[0]);
163
1/2
✓ Branch 0 taken 120 times.
✗ Branch 1 not taken.
120 if (pipe_fds_[1] >= 0)
164 120 ::close(pipe_fds_[1]);
165 240 }
166
167 void
168 120 select_scheduler::
169 shutdown()
170 {
171 {
172
1/1
✓ Branch 1 taken 120 times.
120 std::unique_lock lock(mutex_);
173 120 shutdown_ = true;
174
175
2/2
✓ Branch 1 taken 120 times.
✓ Branch 2 taken 120 times.
240 while (auto* h = completed_ops_.pop())
176 {
177
1/2
✓ Branch 0 taken 120 times.
✗ Branch 1 not taken.
120 if (h == &task_op_)
178 120 continue;
179 lock.unlock();
180 h->destroy();
181 lock.lock();
182 120 }
183 120 }
184
185 120 outstanding_work_.store(0, std::memory_order_release);
186
187
1/2
✓ Branch 0 taken 120 times.
✗ Branch 1 not taken.
120 if (pipe_fds_[1] >= 0)
188 120 interrupt_reactor();
189
190 120 wakeup_event_.notify_all();
191 120 }
192
193 void
194 2816 select_scheduler::
195 post(std::coroutine_handle<> h) const
196 {
197 struct post_handler final
198 : scheduler_op
199 {
200 std::coroutine_handle<> h_;
201
202 explicit
203 2816 post_handler(std::coroutine_handle<> h)
204 2816 : h_(h)
205 {
206 2816 }
207
208 5632 ~post_handler() = default;
209
210 2816 void operator()() override
211 {
212 2816 auto h = h_;
213
1/2
✓ Branch 0 taken 2816 times.
✗ Branch 1 not taken.
2816 delete this;
214
1/1
✓ Branch 1 taken 2816 times.
2816 h.resume();
215 2816 }
216
217 void destroy() override
218 {
219 delete this;
220 }
221 };
222
223
1/1
✓ Branch 1 taken 2816 times.
2816 auto ph = std::make_unique<post_handler>(h);
224 2816 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
225
226
1/1
✓ Branch 1 taken 2816 times.
2816 std::unique_lock lock(mutex_);
227 2816 completed_ops_.push(ph.release());
228
1/1
✓ Branch 1 taken 2816 times.
2816 wake_one_thread_and_unlock(lock);
229 2816 }
230
231 void
232 167151 select_scheduler::
233 post(scheduler_op* h) const
234 {
235 167151 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
236
237
1/1
✓ Branch 1 taken 167151 times.
167151 std::unique_lock lock(mutex_);
238 167151 completed_ops_.push(h);
239
1/1
✓ Branch 1 taken 167151 times.
167151 wake_one_thread_and_unlock(lock);
240 167151 }
241
242 void
243 3051 select_scheduler::
244 on_work_started() noexcept
245 {
246 3051 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
247 3051 }
248
249 void
250 3045 select_scheduler::
251 on_work_finished() noexcept
252 {
253
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3045 times.
6090 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
254 stop();
255 3045 }
256
257 bool
258 520 select_scheduler::
259 running_in_this_thread() const noexcept
260 {
261
2/2
✓ Branch 1 taken 365 times.
✓ Branch 2 taken 155 times.
520 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
262
1/2
✓ Branch 0 taken 365 times.
✗ Branch 1 not taken.
365 if (c->key == this)
263 365 return true;
264 155 return false;
265 }
266
267 void
268 1 select_scheduler::
269 stop()
270 {
271 1 bool expected = false;
272
1/2
✓ Branch 1 taken 1 time.
✗ Branch 2 not taken.
1 if (stopped_.compare_exchange_strong(expected, true,
273 std::memory_order_release, std::memory_order_relaxed))
274 {
275 // Wake all threads so they notice stopped_ and exit
276 {
277
1/1
✓ Branch 1 taken 1 time.
1 std::lock_guard lock(mutex_);
278 1 wakeup_event_.notify_all();
279 1 }
280
1/1
✓ Branch 1 taken 1 time.
1 interrupt_reactor();
281 }
282 1 }
283
284 bool
285 1 select_scheduler::
286 stopped() const noexcept
287 {
288 1 return stopped_.load(std::memory_order_acquire);
289 }
290
291 void
292 34 select_scheduler::
293 restart()
294 {
295 34 stopped_.store(false, std::memory_order_release);
296 34 }
297
298 std::size_t
299 89 select_scheduler::
300 run()
301 {
302
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 89 times.
89 if (stopped_.load(std::memory_order_acquire))
303 return 0;
304
305
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 89 times.
178 if (outstanding_work_.load(std::memory_order_acquire) == 0)
306 {
307 stop();
308 return 0;
309 }
310
311 89 thread_context_guard ctx(this);
312
313 89 std::size_t n = 0;
314
3/3
✓ Branch 1 taken 174815 times.
✓ Branch 3 taken 174726 times.
✓ Branch 4 taken 89 times.
174815 while (do_one(-1))
315
1/2
✓ Branch 1 taken 174726 times.
✗ Branch 2 not taken.
174726 if (n != (std::numeric_limits<std::size_t>::max)())
316 174726 ++n;
317 89 return n;
318 89 }
319
320 std::size_t
321 select_scheduler::
322 run_one()
323 {
324 if (stopped_.load(std::memory_order_acquire))
325 return 0;
326
327 if (outstanding_work_.load(std::memory_order_acquire) == 0)
328 {
329 stop();
330 return 0;
331 }
332
333 thread_context_guard ctx(this);
334 return do_one(-1);
335 }
336
337 std::size_t
338 7 select_scheduler::
339 wait_one(long usec)
340 {
341
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 7 times.
7 if (stopped_.load(std::memory_order_acquire))
342 return 0;
343
344
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 6 times.
14 if (outstanding_work_.load(std::memory_order_acquire) == 0)
345 {
346
1/1
✓ Branch 1 taken 1 time.
1 stop();
347 1 return 0;
348 }
349
350 6 thread_context_guard ctx(this);
351
1/1
✓ Branch 1 taken 6 times.
6 return do_one(usec);
352 6 }
353
354 std::size_t
355 select_scheduler::
356 poll()
357 {
358 if (stopped_.load(std::memory_order_acquire))
359 return 0;
360
361 if (outstanding_work_.load(std::memory_order_acquire) == 0)
362 {
363 stop();
364 return 0;
365 }
366
367 thread_context_guard ctx(this);
368
369 std::size_t n = 0;
370 while (do_one(0))
371 if (n != (std::numeric_limits<std::size_t>::max)())
372 ++n;
373 return n;
374 }
375
376 std::size_t
377 select_scheduler::
378 poll_one()
379 {
380 if (stopped_.load(std::memory_order_acquire))
381 return 0;
382
383 if (outstanding_work_.load(std::memory_order_acquire) == 0)
384 {
385 stop();
386 return 0;
387 }
388
389 thread_context_guard ctx(this);
390 return do_one(0);
391 }
392
393 void
394 4926 select_scheduler::
395 register_fd(int fd, select_op* op, int events) const
396 {
397 // Validate fd is within select() limits
398
2/4
✓ Branch 0 taken 4926 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4926 times.
4926 if (fd < 0 || fd >= FD_SETSIZE)
399 detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
400
401 {
402
1/1
✓ Branch 1 taken 4926 times.
4926 std::lock_guard lock(mutex_);
403
404
1/1
✓ Branch 1 taken 4926 times.
4926 auto& state = registered_fds_[fd];
405
2/2
✓ Branch 0 taken 2603 times.
✓ Branch 1 taken 2323 times.
4926 if (events & event_read)
406 2603 state.read_op = op;
407
2/2
✓ Branch 0 taken 2323 times.
✓ Branch 1 taken 2603 times.
4926 if (events & event_write)
408 2323 state.write_op = op;
409
410
2/2
✓ Branch 0 taken 226 times.
✓ Branch 1 taken 4700 times.
4926 if (fd > max_fd_)
411 226 max_fd_ = fd;
412 4926 }
413
414 // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
415 // with the newly registered fd.
416 4926 interrupt_reactor();
417 4926 }
418
419 void
420 4859 select_scheduler::
421 deregister_fd(int fd, int events) const
422 {
423
1/1
✓ Branch 1 taken 4859 times.
4859 std::lock_guard lock(mutex_);
424
425
1/1
✓ Branch 1 taken 4859 times.
4859 auto it = registered_fds_.find(fd);
426
2/2
✓ Branch 2 taken 4698 times.
✓ Branch 3 taken 161 times.
4859 if (it == registered_fds_.end())
427 4698 return;
428
429
1/2
✓ Branch 0 taken 161 times.
✗ Branch 1 not taken.
161 if (events & event_read)
430 161 it->second.read_op = nullptr;
431
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 161 times.
161 if (events & event_write)
432 it->second.write_op = nullptr;
433
434 // Remove entry if both are null
435
3/6
✓ Branch 1 taken 161 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 161 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 161 times.
✗ Branch 7 not taken.
161 if (!it->second.read_op && !it->second.write_op)
436 {
437
1/1
✓ Branch 1 taken 161 times.
161 registered_fds_.erase(it);
438
439 // Recalculate max_fd_ if needed
440
2/2
✓ Branch 0 taken 160 times.
✓ Branch 1 taken 1 time.
161 if (fd == max_fd_)
441 {
442 160 max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
443
1/2
✗ Branch 7 not taken.
✓ Branch 8 taken 160 times.
160 for (auto& [registered_fd, state] : registered_fds_)
444 {
445 if (registered_fd > max_fd_)
446 max_fd_ = registered_fd;
447 }
448 }
449 }
450 4859 }
451
452 void
453 4926 select_scheduler::
454 work_started() const noexcept
455 {
456 4926 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
457 4926 }
458
459 void
460 174899 select_scheduler::
461 work_finished() const noexcept
462 {
463
2/2
✓ Branch 0 taken 90 times.
✓ Branch 1 taken 174809 times.
349798 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
464 {
465 // Last work item completed - wake all threads so they can exit.
466 90 std::unique_lock lock(mutex_);
467 90 wakeup_event_.notify_all();
468
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
90 if (reactor_running_ && !reactor_interrupted_)
469 {
470 reactor_interrupted_ = true;
471 lock.unlock();
472 interrupt_reactor();
473 }
474 90 }
475 174899 }
476
477 void
478 10083 select_scheduler::
479 interrupt_reactor() const
480 {
481 10083 char byte = 1;
482
1/1
✓ Branch 1 taken 10083 times.
10083 [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
483 10083 }
484
485 void
486 169967 select_scheduler::
487 wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
488 {
489
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 169967 times.
169967 if (idle_thread_count_ > 0)
490 {
491 // Idle worker exists - wake it via condvar
492 wakeup_event_.notify_one();
493 lock.unlock();
494 }
495
4/4
✓ Branch 0 taken 2520 times.
✓ Branch 1 taken 167447 times.
✓ Branch 2 taken 2517 times.
✓ Branch 3 taken 3 times.
169967 else if (reactor_running_ && !reactor_interrupted_)
496 {
497 // No idle workers but reactor is running - interrupt it
498 2517 reactor_interrupted_ = true;
499 2517 lock.unlock();
500 2517 interrupt_reactor();
501 }
502 else
503 {
504 // No one to wake
505 167450 lock.unlock();
506 }
507 169967 }
508
509 struct work_guard
510 {
511 select_scheduler const* self;
512 174732 ~work_guard() { self->work_finished(); }
513 };
514
515 long
516 7266 select_scheduler::
517 calculate_timeout(long requested_timeout_us) const
518 {
519
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7266 times.
7266 if (requested_timeout_us == 0)
520 return 0;
521
522 7266 auto nearest = timer_svc_->nearest_expiry();
523
3/3
✓ Branch 2 taken 7266 times.
✓ Branch 4 taken 35 times.
✓ Branch 5 taken 7231 times.
7266 if (nearest == timer_service::time_point::max())
524 35 return requested_timeout_us;
525
526 7231 auto now = std::chrono::steady_clock::now();
527
3/3
✓ Branch 1 taken 7231 times.
✓ Branch 4 taken 10 times.
✓ Branch 5 taken 7221 times.
7231 if (nearest <= now)
528 10 return 0;
529
530
1/1
✓ Branch 1 taken 7221 times.
7221 auto timer_timeout_us = std::chrono::duration_cast<std::chrono::microseconds>(
531
1/1
✓ Branch 1 taken 7221 times.
14442 nearest - now).count();
532
533
1/2
✓ Branch 0 taken 7221 times.
✗ Branch 1 not taken.
7221 if (requested_timeout_us < 0)
534 7221 return static_cast<long>(timer_timeout_us);
535
536 return static_cast<long>((std::min)(
537 static_cast<long long>(requested_timeout_us),
538 static_cast<long long>(timer_timeout_us)));
539 }
540
541 void
542 92455 select_scheduler::
543 run_reactor(std::unique_lock<std::mutex>& lock)
544 {
545 // Calculate timeout considering timers, use 0 if interrupted
546
3/3
✓ Branch 0 taken 85189 times.
✓ Branch 1 taken 7266 times.
✓ Branch 3 taken 7266 times.
92455 long effective_timeout_us = reactor_interrupted_ ? 0 : calculate_timeout(-1);
547
548 // Build fd_sets from registered_fds_
549 fd_set read_fds, write_fds, except_fds;
550
2/2
✓ Branch 0 taken 1479280 times.
✓ Branch 1 taken 92455 times.
1571735 FD_ZERO(&read_fds);
551
2/2
✓ Branch 0 taken 1479280 times.
✓ Branch 1 taken 92455 times.
1571735 FD_ZERO(&write_fds);
552
2/2
✓ Branch 0 taken 1479280 times.
✓ Branch 1 taken 92455 times.
1571735 FD_ZERO(&except_fds);
553
554 // Always include the interrupt pipe
555 92455 FD_SET(pipe_fds_[0], &read_fds);
556 92455 int nfds = pipe_fds_[0];
557
558 // Add registered fds
559
2/2
✓ Branch 7 taken 11666 times.
✓ Branch 8 taken 92455 times.
104121 for (auto& [fd, state] : registered_fds_)
560 {
561
2/2
✓ Branch 0 taken 9343 times.
✓ Branch 1 taken 2323 times.
11666 if (state.read_op)
562 9343 FD_SET(fd, &read_fds);
563
2/2
✓ Branch 0 taken 2323 times.
✓ Branch 1 taken 9343 times.
11666 if (state.write_op)
564 {
565 2323 FD_SET(fd, &write_fds);
566 // Also monitor for errors on connect operations
567 2323 FD_SET(fd, &except_fds);
568 }
569
2/2
✓ Branch 0 taken 9346 times.
✓ Branch 1 taken 2320 times.
11666 if (fd > nfds)
570 9346 nfds = fd;
571 }
572
573 // Convert timeout to timeval
574 struct timeval tv;
575 92455 struct timeval* tv_ptr = nullptr;
576
2/2
✓ Branch 0 taken 92420 times.
✓ Branch 1 taken 35 times.
92455 if (effective_timeout_us >= 0)
577 {
578 92420 tv.tv_sec = effective_timeout_us / 1000000;
579 92420 tv.tv_usec = effective_timeout_us % 1000000;
580 92420 tv_ptr = &tv;
581 }
582
583
1/1
✓ Branch 1 taken 92455 times.
92455 lock.unlock();
584
585
1/1
✓ Branch 1 taken 92455 times.
92455 int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
586 92455 int saved_errno = errno;
587
588 // Process timers outside the lock
589
1/1
✓ Branch 1 taken 92455 times.
92455 timer_svc_->process_expired();
590
591
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 92455 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
92455 if (ready < 0 && saved_errno != EINTR)
592 detail::throw_system_error(make_err(saved_errno), "select");
593
594 // Re-acquire lock before modifying completed_ops_
595
1/1
✓ Branch 1 taken 92455 times.
92455 lock.lock();
596
597 // Drain the interrupt pipe if readable
598
3/4
✓ Branch 0 taken 7593 times.
✓ Branch 1 taken 84862 times.
✓ Branch 2 taken 7593 times.
✗ Branch 3 not taken.
92455 if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
599 {
600 char buf[256];
601
3/3
✓ Branch 1 taken 15186 times.
✓ Branch 3 taken 7593 times.
✓ Branch 4 taken 7593 times.
15186 while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0) {}
602 }
603
604 // Process I/O completions
605 92455 int completions_queued = 0;
606
2/2
✓ Branch 0 taken 7593 times.
✓ Branch 1 taken 84862 times.
92455 if (ready > 0)
607 {
608 // Iterate over registered fds (copy keys to avoid iterator invalidation)
609 7593 std::vector<int> fds_to_check;
610
1/1
✓ Branch 2 taken 7593 times.
7593 fds_to_check.reserve(registered_fds_.size());
611
2/2
✓ Branch 7 taken 9377 times.
✓ Branch 8 taken 7593 times.
16970 for (auto& [fd, state] : registered_fds_)
612
1/1
✓ Branch 1 taken 9377 times.
9377 fds_to_check.push_back(fd);
613
614
2/2
✓ Branch 5 taken 9377 times.
✓ Branch 6 taken 7593 times.
16970 for (int fd : fds_to_check)
615 {
616
1/1
✓ Branch 1 taken 9377 times.
9377 auto it = registered_fds_.find(fd);
617
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 9377 times.
9377 if (it == registered_fds_.end())
618 continue;
619
620 9377 auto& state = it->second;
621
622 // Check for errors (especially for connect operations)
623 9377 bool has_error = FD_ISSET(fd, &except_fds);
624
625 // Process read readiness
626
5/6
✓ Branch 0 taken 7054 times.
✓ Branch 1 taken 2323 times.
✓ Branch 2 taken 4612 times.
✓ Branch 3 taken 2442 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 4612 times.
9377 if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
627 {
628 2442 auto* op = state.read_op;
629 // Claim the op by exchanging to unregistered. Both registering and
630 // registered states mean the op is ours to complete.
631 2442 auto prev = op->registered.exchange(
632 select_registration_state::unregistered, std::memory_order_acq_rel);
633
1/2
✓ Branch 0 taken 2442 times.
✗ Branch 1 not taken.
2442 if (prev != select_registration_state::unregistered)
634 {
635 2442 state.read_op = nullptr;
636
637
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2442 times.
2442 if (has_error)
638 {
639 int errn = 0;
640 socklen_t len = sizeof(errn);
641 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
642 errn = errno;
643 if (errn == 0)
644 errn = EIO;
645 op->complete(errn, 0);
646 }
647 else
648 {
649 2442 op->perform_io();
650 }
651
652 2442 completed_ops_.push(op);
653 2442 ++completions_queued;
654 }
655 }
656
657 // Process write readiness
658
3/6
✓ Branch 0 taken 2323 times.
✓ Branch 1 taken 7054 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2323 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
9377 if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
659 {
660 2323 auto* op = state.write_op;
661 // Claim the op by exchanging to unregistered. Both registering and
662 // registered states mean the op is ours to complete.
663 2323 auto prev = op->registered.exchange(
664 select_registration_state::unregistered, std::memory_order_acq_rel);
665
1/2
✓ Branch 0 taken 2323 times.
✗ Branch 1 not taken.
2323 if (prev != select_registration_state::unregistered)
666 {
667 2323 state.write_op = nullptr;
668
669
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2323 times.
2323 if (has_error)
670 {
671 int errn = 0;
672 socklen_t len = sizeof(errn);
673 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
674 errn = errno;
675 if (errn == 0)
676 errn = EIO;
677 op->complete(errn, 0);
678 }
679 else
680 {
681 2323 op->perform_io();
682 }
683
684 2323 completed_ops_.push(op);
685 2323 ++completions_queued;
686 }
687 }
688
689 // Clean up empty entries
690
3/4
✓ Branch 0 taken 4765 times.
✓ Branch 1 taken 4612 times.
✓ Branch 2 taken 4765 times.
✗ Branch 3 not taken.
9377 if (!state.read_op && !state.write_op)
691
1/1
✓ Branch 1 taken 4765 times.
4765 registered_fds_.erase(it);
692 }
693 7593 }
694
695
2/2
✓ Branch 0 taken 2445 times.
✓ Branch 1 taken 90010 times.
92455 if (completions_queued > 0)
696 {
697
2/2
✓ Branch 0 taken 125 times.
✓ Branch 1 taken 2320 times.
2445 if (completions_queued == 1)
698 125 wakeup_event_.notify_one();
699 else
700 2320 wakeup_event_.notify_all();
701 }
702 92455 }
703
704 std::size_t
705 174821 select_scheduler::
706 do_one(long timeout_us)
707 {
708
1/1
✓ Branch 1 taken 174821 times.
174821 std::unique_lock lock(mutex_);
709
710 for (;;)
711 {
712
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 267276 times.
267276 if (stopped_.load(std::memory_order_acquire))
713 return 0;
714
715 267276 scheduler_op* op = completed_ops_.pop();
716
717
2/2
✓ Branch 0 taken 92544 times.
✓ Branch 1 taken 174732 times.
267276 if (op == &task_op_)
718 {
719 92544 bool more_handlers = !completed_ops_.empty();
720
721
2/2
✓ Branch 0 taken 7355 times.
✓ Branch 1 taken 85189 times.
92544 if (!more_handlers)
722 {
723
2/2
✓ Branch 1 taken 89 times.
✓ Branch 2 taken 7266 times.
14710 if (outstanding_work_.load(std::memory_order_acquire) == 0)
724 {
725 89 completed_ops_.push(&task_op_);
726 89 return 0;
727 }
728
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7266 times.
7266 if (timeout_us == 0)
729 {
730 completed_ops_.push(&task_op_);
731 return 0;
732 }
733 }
734
735
3/4
✓ Branch 0 taken 7266 times.
✓ Branch 1 taken 85189 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 7266 times.
92455 reactor_interrupted_ = more_handlers || timeout_us == 0;
736 92455 reactor_running_ = true;
737
738
3/4
✓ Branch 0 taken 85189 times.
✓ Branch 1 taken 7266 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 85189 times.
92455 if (more_handlers && idle_thread_count_ > 0)
739 wakeup_event_.notify_one();
740
741
1/1
✓ Branch 1 taken 92455 times.
92455 run_reactor(lock);
742
743 92455 reactor_running_ = false;
744 92455 completed_ops_.push(&task_op_);
745 92455 continue;
746 92455 }
747
748
1/2
✓ Branch 0 taken 174732 times.
✗ Branch 1 not taken.
174732 if (op != nullptr)
749 {
750
1/1
✓ Branch 1 taken 174732 times.
174732 lock.unlock();
751 174732 work_guard g{this};
752
1/1
✓ Branch 1 taken 174732 times.
174732 (*op)();
753 174732 return 1;
754 174732 }
755
756 if (outstanding_work_.load(std::memory_order_acquire) == 0)
757 return 0;
758
759 if (timeout_us == 0)
760 return 0;
761
762 ++idle_thread_count_;
763 if (timeout_us < 0)
764 wakeup_event_.wait(lock);
765 else
766 wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
767 --idle_thread_count_;
768 92455 }
769 174821 }
770
771 } // namespace boost::corosio::detail
772
773 #endif // BOOST_COROSIO_HAS_SELECT
774