86.67% Lines (143/165) 100.00% Functions (10/10)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2026 Steve Gerbino 2   // Copyright (c) 2026 Steve Gerbino
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/corosio 7   // Official repository: https://github.com/cppalliance/corosio
8   // 8   //
9   9  
10   #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP 10   #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11   #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP 11   #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12   12  
13   #include <boost/corosio/detail/platform.hpp> 13   #include <boost/corosio/detail/platform.hpp>
14   14  
15   #if BOOST_COROSIO_HAS_SELECT 15   #if BOOST_COROSIO_HAS_SELECT
16   16  
17   #include <boost/corosio/detail/config.hpp> 17   #include <boost/corosio/detail/config.hpp>
18   #include <boost/capy/ex/execution_context.hpp> 18   #include <boost/capy/ex/execution_context.hpp>
19   19  
20   #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp> 20   #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21   21  
22   #include <boost/corosio/native/detail/select/select_traits.hpp> 22   #include <boost/corosio/native/detail/select/select_traits.hpp>
23   #include <boost/corosio/detail/timer_service.hpp> 23   #include <boost/corosio/detail/timer_service.hpp>
24   #include <boost/corosio/native/detail/make_err.hpp> 24   #include <boost/corosio/native/detail/make_err.hpp>
25   #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp> 25   #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26   #include <boost/corosio/native/detail/posix/posix_signal_service.hpp> 26   #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27   #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp> 27   #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28   #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp> 28   #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29   29  
30   #include <boost/corosio/detail/except.hpp> 30   #include <boost/corosio/detail/except.hpp>
31   31  
32   #include <sys/select.h> 32   #include <sys/select.h>
33   #include <unistd.h> 33   #include <unistd.h>
34   #include <errno.h> 34   #include <errno.h>
35   #include <fcntl.h> 35   #include <fcntl.h>
36   36  
37   #include <atomic> 37   #include <atomic>
38   #include <chrono> 38   #include <chrono>
39   #include <cstdint> 39   #include <cstdint>
40   #include <limits> 40   #include <limits>
41   #include <mutex> 41   #include <mutex>
42   #include <unordered_map> 42   #include <unordered_map>
43   43  
44   namespace boost::corosio::detail { 44   namespace boost::corosio::detail {
45   45  
46   struct select_op; 46   struct select_op;
47   47  
48   /** POSIX scheduler using select() for I/O multiplexing. 48   /** POSIX scheduler using select() for I/O multiplexing.
49   49  
50   This scheduler implements the scheduler interface using the POSIX select() 50   This scheduler implements the scheduler interface using the POSIX select()
51   call for I/O event notification. It inherits the shared reactor threading 51   call for I/O event notification. It inherits the shared reactor threading
52   model from reactor_scheduler: signal state machine, inline completion 52   model from reactor_scheduler: signal state machine, inline completion
53   budget, work counting, and the do_one event loop. 53   budget, work counting, and the do_one event loop.
54   54  
55   The design mirrors epoll_scheduler for behavioral consistency: 55   The design mirrors epoll_scheduler for behavioral consistency:
56   - Same single-reactor thread coordination model 56   - Same single-reactor thread coordination model
57   - Same deferred I/O pattern (reactor marks ready; workers do I/O) 57   - Same deferred I/O pattern (reactor marks ready; workers do I/O)
58   - Same timer integration pattern 58   - Same timer integration pattern
59   59  
60   Known Limitations: 60   Known Limitations:
61   - FD_SETSIZE (~1024) limits maximum concurrent connections 61   - FD_SETSIZE (~1024) limits maximum concurrent connections
62   - O(n) scanning: rebuilds fd_sets each iteration 62   - O(n) scanning: rebuilds fd_sets each iteration
63   - Level-triggered only (no edge-triggered mode) 63   - Level-triggered only (no edge-triggered mode)
64   64  
65   @par Thread Safety 65   @par Thread Safety
66   All public member functions are thread-safe. 66   All public member functions are thread-safe.
67   */ 67   */
68   class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler 68   class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler
69   { 69   {
70   public: 70   public:
71   /** Construct the scheduler. 71   /** Construct the scheduler.
72   72  
73   Creates a self-pipe for reactor interruption. 73   Creates a self-pipe for reactor interruption.
74   74  
75   @param ctx Reference to the owning execution_context. 75   @param ctx Reference to the owning execution_context.
76   @param concurrency_hint Hint for expected thread count (unused). 76   @param concurrency_hint Hint for expected thread count (unused).
77   */ 77   */
78   select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1); 78   select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
79   79  
80   /// Destroy the scheduler. 80   /// Destroy the scheduler.
81   ~select_scheduler() override; 81   ~select_scheduler() override;
82   82  
83   select_scheduler(select_scheduler const&) = delete; 83   select_scheduler(select_scheduler const&) = delete;
84   select_scheduler& operator=(select_scheduler const&) = delete; 84   select_scheduler& operator=(select_scheduler const&) = delete;
85   85  
86   /// Shut down the scheduler, draining pending operations. 86   /// Shut down the scheduler, draining pending operations.
87   void shutdown() override; 87   void shutdown() override;
88   88  
89   /** Return the maximum file descriptor value supported. 89   /** Return the maximum file descriptor value supported.
90   90  
91   Returns FD_SETSIZE - 1, the maximum fd value that can be 91   Returns FD_SETSIZE - 1, the maximum fd value that can be
92   monitored by select(). Operations with fd >= FD_SETSIZE 92   monitored by select(). Operations with fd >= FD_SETSIZE
93   will fail with EINVAL. 93   will fail with EINVAL.
94   94  
95   @return The maximum supported file descriptor value. 95   @return The maximum supported file descriptor value.
96   */ 96   */
97   static constexpr int max_fd() noexcept 97   static constexpr int max_fd() noexcept
98   { 98   {
99   return FD_SETSIZE - 1; 99   return FD_SETSIZE - 1;
100   } 100   }
101   101  
102   /** Register a descriptor for persistent monitoring. 102   /** Register a descriptor for persistent monitoring.
103   103  
104   The fd is added to the registered_descs_ map and will be 104   The fd is added to the registered_descs_ map and will be
105   included in subsequent select() calls. The reactor is 105   included in subsequent select() calls. The reactor is
106   interrupted so a blocked select() rebuilds its fd_sets. 106   interrupted so a blocked select() rebuilds its fd_sets.
107   107  
108   @param fd The file descriptor to register. 108   @param fd The file descriptor to register.
109   @param desc Pointer to descriptor state for this fd. 109   @param desc Pointer to descriptor state for this fd.
110   */ 110   */
111   void register_descriptor(int fd, reactor_descriptor_state* desc) const; 111   void register_descriptor(int fd, reactor_descriptor_state* desc) const;
112   112  
113   /** Deregister a persistently registered descriptor. 113   /** Deregister a persistently registered descriptor.
114   114  
115   @param fd The file descriptor to deregister. 115   @param fd The file descriptor to deregister.
116   */ 116   */
117   void deregister_descriptor(int fd) const; 117   void deregister_descriptor(int fd) const;
118   118  
119   /** Interrupt the reactor so it rebuilds its fd_sets. 119   /** Interrupt the reactor so it rebuilds its fd_sets.
120   120  
121   Called when a write or connect op is registered after 121   Called when a write or connect op is registered after
122   the reactor's snapshot was taken. Without this, select() 122   the reactor's snapshot was taken. Without this, select()
123   may block not watching for writability on the fd. 123   may block not watching for writability on the fd.
124   */ 124   */
125   void notify_reactor() const; 125   void notify_reactor() const;
126   126  
127   private: 127   private:
128   void 128   void
129   run_task(lock_type& lock, context_type* ctx, 129   run_task(lock_type& lock, context_type* ctx,
130   long timeout_us) override; 130   long timeout_us) override;
131   void interrupt_reactor() const override; 131   void interrupt_reactor() const override;
132   long calculate_timeout(long requested_timeout_us) const; 132   long calculate_timeout(long requested_timeout_us) const;
133   133  
134   // Self-pipe for interrupting select() 134   // Self-pipe for interrupting select()
135   int pipe_fds_[2]; // [0]=read, [1]=write 135   int pipe_fds_[2]; // [0]=read, [1]=write
136   136  
137   // Per-fd tracking for fd_set building 137   // Per-fd tracking for fd_set building
138   mutable std::unordered_map<int, reactor_descriptor_state*> registered_descs_; 138   mutable std::unordered_map<int, reactor_descriptor_state*> registered_descs_;
139   mutable int max_fd_ = -1; 139   mutable int max_fd_ = -1;
140   }; 140   };
141   141  
HITCBC 142   590 inline select_scheduler::select_scheduler(capy::execution_context& ctx, int) 142   594 inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
HITCBC 143   590 : pipe_fds_{-1, -1} 143   594 : pipe_fds_{-1, -1}
HITCBC 144   590 , max_fd_(-1) 144   594 , max_fd_(-1)
145   { 145   {
HITCBC 146   590 if (::pipe(pipe_fds_) < 0) 146   594 if (::pipe(pipe_fds_) < 0)
MISUBC 147   detail::throw_system_error(make_err(errno), "pipe"); 147   detail::throw_system_error(make_err(errno), "pipe");
148   148  
HITCBC 149   1770 for (int i = 0; i < 2; ++i) 149   1782 for (int i = 0; i < 2; ++i)
150   { 150   {
HITCBC 151   1180 int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0); 151   1188 int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
HITCBC 152   1180 if (flags == -1) 152   1188 if (flags == -1)
153   { 153   {
MISUBC 154   int errn = errno; 154   int errn = errno;
MISUBC 155   ::close(pipe_fds_[0]); 155   ::close(pipe_fds_[0]);
MISUBC 156   ::close(pipe_fds_[1]); 156   ::close(pipe_fds_[1]);
MISUBC 157   detail::throw_system_error(make_err(errn), "fcntl F_GETFL"); 157   detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
158   } 158   }
HITCBC 159   1180 if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1) 159   1188 if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
160   { 160   {
MISUBC 161   int errn = errno; 161   int errn = errno;
MISUBC 162   ::close(pipe_fds_[0]); 162   ::close(pipe_fds_[0]);
MISUBC 163   ::close(pipe_fds_[1]); 163   ::close(pipe_fds_[1]);
MISUBC 164   detail::throw_system_error(make_err(errn), "fcntl F_SETFL"); 164   detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
165   } 165   }
HITCBC 166   1180 if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1) 166   1188 if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
167   { 167   {
MISUBC 168   int errn = errno; 168   int errn = errno;
MISUBC 169   ::close(pipe_fds_[0]); 169   ::close(pipe_fds_[0]);
MISUBC 170   ::close(pipe_fds_[1]); 170   ::close(pipe_fds_[1]);
MISUBC 171   detail::throw_system_error(make_err(errn), "fcntl F_SETFD"); 171   detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
172   } 172   }
173   } 173   }
174   174  
HITCBC 175   590 timer_svc_ = &get_timer_service(ctx, *this); 175   594 timer_svc_ = &get_timer_service(ctx, *this);
HITCBC 176   590 timer_svc_->set_on_earliest_changed( 176   594 timer_svc_->set_on_earliest_changed(
HITCBC 177   4512 timer_service::callback(this, [](void* p) { 177   4814 timer_service::callback(this, [](void* p) {
HITCBC 178   3922 static_cast<select_scheduler*>(p)->interrupt_reactor(); 178   4220 static_cast<select_scheduler*>(p)->interrupt_reactor();
HITCBC 179   3922 })); 179   4220 }));
180   180  
HITCBC 181   590 get_resolver_service(ctx, *this); 181   594 get_resolver_service(ctx, *this);
HITCBC 182   590 get_signal_service(ctx, *this); 182   594 get_signal_service(ctx, *this);
HITCBC 183   590 get_stream_file_service(ctx, *this); 183   594 get_stream_file_service(ctx, *this);
HITCBC 184   590 get_random_access_file_service(ctx, *this); 184   594 get_random_access_file_service(ctx, *this);
185   185  
HITCBC 186   590 completed_ops_.push(&task_op_); 186   594 completed_ops_.push(&task_op_);
HITCBC 187   590 } 187   594 }
188   188  
HITCBC 189   1180 inline select_scheduler::~select_scheduler() 189   1188 inline select_scheduler::~select_scheduler()
190   { 190   {
HITCBC 191   590 if (pipe_fds_[0] >= 0) 191   594 if (pipe_fds_[0] >= 0)
HITCBC 192   590 ::close(pipe_fds_[0]); 192   594 ::close(pipe_fds_[0]);
HITCBC 193   590 if (pipe_fds_[1] >= 0) 193   594 if (pipe_fds_[1] >= 0)
HITCBC 194   590 ::close(pipe_fds_[1]); 194   594 ::close(pipe_fds_[1]);
HITCBC 195   1180 } 195   1188 }
196   196  
197   inline void 197   inline void
HITCBC 198   590 select_scheduler::shutdown() 198   594 select_scheduler::shutdown()
199   { 199   {
HITCBC 200   590 shutdown_drain(); 200   594 shutdown_drain();
201   201  
HITCBC 202   590 if (pipe_fds_[1] >= 0) 202   594 if (pipe_fds_[1] >= 0)
HITCBC 203   590 interrupt_reactor(); 203   594 interrupt_reactor();
HITCBC 204   590 } 204   594 }
205   205  
206   inline void 206   inline void
HITCBC 207   7755 select_scheduler::register_descriptor( 207   8351 select_scheduler::register_descriptor(
208   int fd, reactor_descriptor_state* desc) const 208   int fd, reactor_descriptor_state* desc) const
209   { 209   {
HITCBC 210   7755 if (fd < 0 || fd >= FD_SETSIZE) 210   8351 if (fd < 0 || fd >= FD_SETSIZE)
MISUBC 211   detail::throw_system_error(make_err(EINVAL), "select: fd out of range"); 211   detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
212   212  
HITCBC 213   7755 desc->registered_events = reactor_event_read | reactor_event_write; 213   8351 desc->registered_events = reactor_event_read | reactor_event_write;
HITCBC 214   7755 desc->fd = fd; 214   8351 desc->fd = fd;
HITCBC 215   7755 desc->scheduler_ = this; 215   8351 desc->scheduler_ = this;
HITCBC 216   7755 desc->mutex.set_enabled(!single_threaded_); 216   8351 desc->mutex.set_enabled(!single_threaded_);
HITCBC 217   7755 desc->ready_events_.store(0, std::memory_order_relaxed); 217   8351 desc->ready_events_.store(0, std::memory_order_relaxed);
218   218  
219   { 219   {
HITCBC 220   7755 conditionally_enabled_mutex::scoped_lock lock(desc->mutex); 220   8351 conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
HITCBC 221   7755 desc->impl_ref_.reset(); 221   8351 desc->impl_ref_.reset();
HITCBC 222   7755 desc->read_ready = false; 222   8351 desc->read_ready = false;
HITCBC 223   7755 desc->write_ready = false; 223   8351 desc->write_ready = false;
HITCBC 224   7755 } 224   8351 }
225   225  
226   { 226   {
HITCBC 227   7755 mutex_type::scoped_lock lock(mutex_); 227   8351 mutex_type::scoped_lock lock(mutex_);
HITCBC 228   7755 registered_descs_[fd] = desc; 228   8351 registered_descs_[fd] = desc;
HITCBC 229   7755 if (fd > max_fd_) 229   8351 if (fd > max_fd_)
HITCBC 230   7750 max_fd_ = fd; 230   8346 max_fd_ = fd;
HITCBC 231   7755 } 231   8351 }
232   232  
HITCBC 233   7755 interrupt_reactor(); 233   8351 interrupt_reactor();
HITCBC 234   7755 } 234   8351 }
235   235  
236   inline void 236   inline void
HITCBC 237   7755 select_scheduler::deregister_descriptor(int fd) const 237   8351 select_scheduler::deregister_descriptor(int fd) const
238   { 238   {
HITCBC 239   7755 mutex_type::scoped_lock lock(mutex_); 239   8351 mutex_type::scoped_lock lock(mutex_);
240   240  
HITCBC 241   7755 auto it = registered_descs_.find(fd); 241   8351 auto it = registered_descs_.find(fd);
HITCBC 242   7755 if (it == registered_descs_.end()) 242   8351 if (it == registered_descs_.end())
MISUBC 243   return; 243   return;
244   244  
HITCBC 245   7755 registered_descs_.erase(it); 245   8351 registered_descs_.erase(it);
246   246  
HITCBC 247   7755 if (fd == max_fd_) 247   8351 if (fd == max_fd_)
248   { 248   {
HITCBC 249   7642 max_fd_ = pipe_fds_[0]; 249   8238 max_fd_ = pipe_fds_[0];
HITCBC 250   15036 for (auto& [registered_fd, state] : registered_descs_) 250   16228 for (auto& [registered_fd, state] : registered_descs_)
251   { 251   {
HITCBC 252   7394 if (registered_fd > max_fd_) 252   7990 if (registered_fd > max_fd_)
HITCBC 253   7347 max_fd_ = registered_fd; 253   7943 max_fd_ = registered_fd;
254   } 254   }
255   } 255   }
HITCBC 256   7755 } 256   8351 }
257   257  
258   inline void 258   inline void
HITCBC 259   3782 select_scheduler::notify_reactor() const 259   4080 select_scheduler::notify_reactor() const
260   { 260   {
HITCBC 261   3782 interrupt_reactor(); 261   4080 interrupt_reactor();
HITCBC 262   3782 } 262   4080 }
263   263  
264   inline void 264   inline void
HITCBC 265   16498 select_scheduler::interrupt_reactor() const 265   17679 select_scheduler::interrupt_reactor() const
266   { 266   {
HITCBC 267   16498 char byte = 1; 267   17679 char byte = 1;
HITCBC 268   16498 [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1); 268   17679 [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
HITCBC 269   16498 } 269   17679 }
270   270  
271   inline long 271   inline long
HITCBC 272   222670 select_scheduler::calculate_timeout(long requested_timeout_us) const 272   234581 select_scheduler::calculate_timeout(long requested_timeout_us) const
273   { 273   {
HITCBC 274   222670 if (requested_timeout_us == 0) 274   234581 if (requested_timeout_us == 0)
MISUBC 275   return 0; 275   return 0;
276   276  
HITCBC 277   222670 auto nearest = timer_svc_->nearest_expiry(); 277   234581 auto nearest = timer_svc_->nearest_expiry();
HITCBC 278   222670 if (nearest == timer_service::time_point::max()) 278   234581 if (nearest == timer_service::time_point::max())
HITCBC 279   418 return requested_timeout_us; 279   403 return requested_timeout_us;
280   280  
HITCBC 281   222252 auto now = std::chrono::steady_clock::now(); 281   234178 auto now = std::chrono::steady_clock::now();
HITCBC 282   222252 if (nearest <= now) 282   234178 if (nearest <= now)
HITCBC 283   1255 return 0; 283   1746 return 0;
284   284  
285   auto timer_timeout_us = 285   auto timer_timeout_us =
HITCBC 286   220997 std::chrono::duration_cast<std::chrono::microseconds>(nearest - now) 286   232432 std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
HITCBC 287   220997 .count(); 287   232432 .count();
288   288  
HITCBC 289   220997 constexpr auto long_max = 289   232432 constexpr auto long_max =
290   static_cast<long long>((std::numeric_limits<long>::max)()); 290   static_cast<long long>((std::numeric_limits<long>::max)());
291   auto capped_timer_us = 291   auto capped_timer_us =
HITCBC 292   220997 (std::min)((std::max)(static_cast<long long>(timer_timeout_us), 292   232432 (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
HITCBC 293   220997 static_cast<long long>(0)), 293   232432 static_cast<long long>(0)),
HITCBC 294   220997 long_max); 294   232432 long_max);
295   295  
HITCBC 296   220997 if (requested_timeout_us < 0) 296   232432 if (requested_timeout_us < 0)
HITCBC 297   220997 return static_cast<long>(capped_timer_us); 297   232432 return static_cast<long>(capped_timer_us);
298   298  
299   return static_cast<long>( 299   return static_cast<long>(
MISUBC 300   (std::min)(static_cast<long long>(requested_timeout_us), 300   (std::min)(static_cast<long long>(requested_timeout_us),
MISUBC 301   capped_timer_us)); 301   capped_timer_us));
302   } 302   }
303   303  
304   inline void 304   inline void
HITCBC 305   248392 select_scheduler::run_task( 305   259802 select_scheduler::run_task(
306   lock_type& lock, context_type* ctx, long timeout_us) 306   lock_type& lock, context_type* ctx, long timeout_us)
307   { 307   {
308   long effective_timeout_us = 308   long effective_timeout_us =
HITCBC 309   248392 task_interrupted_ ? 0 : calculate_timeout(timeout_us); 309   259802 task_interrupted_ ? 0 : calculate_timeout(timeout_us);
310   310  
311   // Snapshot registered descriptors while holding lock. 311   // Snapshot registered descriptors while holding lock.
312   // Record which fds need write monitoring to avoid a hot loop: 312   // Record which fds need write monitoring to avoid a hot loop:
313   // select is level-triggered so writable sockets (nearly always 313   // select is level-triggered so writable sockets (nearly always
314   // writable) would cause select() to return immediately every 314   // writable) would cause select() to return immediately every
315   // iteration if unconditionally added to write_fds. 315   // iteration if unconditionally added to write_fds.
316   struct fd_entry 316   struct fd_entry
317   { 317   {
318   int fd; 318   int fd;
319   reactor_descriptor_state* desc; 319   reactor_descriptor_state* desc;
320   bool needs_write; 320   bool needs_write;
321   }; 321   };
322   fd_entry snapshot[FD_SETSIZE]; 322   fd_entry snapshot[FD_SETSIZE];
HITCBC 323   248392 int snapshot_count = 0; 323   259802 int snapshot_count = 0;
324   324  
HITCBC 325   658752 for (auto& [fd, desc] : registered_descs_) 325   680339 for (auto& [fd, desc] : registered_descs_)
326   { 326   {
HITCBC 327   410360 if (snapshot_count < FD_SETSIZE) 327   420537 if (snapshot_count < FD_SETSIZE)
328   { 328   {
HITCBC 329   410360 conditionally_enabled_mutex::scoped_lock desc_lock(desc->mutex); 329   420537 conditionally_enabled_mutex::scoped_lock desc_lock(desc->mutex);
HITCBC 330   410360 snapshot[snapshot_count].fd = fd; 330   420537 snapshot[snapshot_count].fd = fd;
HITCBC 331   410360 snapshot[snapshot_count].desc = desc; 331   420537 snapshot[snapshot_count].desc = desc;
HITCBC 332   410360 snapshot[snapshot_count].needs_write = 332   420537 snapshot[snapshot_count].needs_write =
HITCBC 333   410360 (desc->write_op || desc->connect_op); 333   420537 (desc->write_op || desc->connect_op);
HITCBC 334   410360 ++snapshot_count; 334   420537 ++snapshot_count;
HITCBC 335   410360 } 335   420537 }
336   } 336   }
337   337  
HITCBC 338   248392 if (lock.owns_lock()) 338   259802 if (lock.owns_lock())
HITCBC 339   222670 lock.unlock(); 339   234581 lock.unlock();
340   340  
HITCBC 341   248392 task_cleanup on_exit{this, &lock, ctx}; 341   259802 task_cleanup on_exit{this, &lock, ctx};
342   342  
343   fd_set read_fds, write_fds, except_fds; 343   fd_set read_fds, write_fds, except_fds;
HITCBC 344   4222664 FD_ZERO(&read_fds); 344   4416634 FD_ZERO(&read_fds);
HITCBC 345   4222664 FD_ZERO(&write_fds); 345   4416634 FD_ZERO(&write_fds);
HITCBC 346   4222664 FD_ZERO(&except_fds); 346   4416634 FD_ZERO(&except_fds);
347   347  
HITCBC 348   248392 FD_SET(pipe_fds_[0], &read_fds); 348   259802 FD_SET(pipe_fds_[0], &read_fds);
HITCBC 349   248392 int nfds = pipe_fds_[0]; 349   259802 int nfds = pipe_fds_[0];
350   350  
HITCBC 351   658752 for (int i = 0; i < snapshot_count; ++i) 351   680339 for (int i = 0; i < snapshot_count; ++i)
352   { 352   {
HITCBC 353   410360 int fd = snapshot[i].fd; 353   420537 int fd = snapshot[i].fd;
HITCBC 354   410360 FD_SET(fd, &read_fds); 354   420537 FD_SET(fd, &read_fds);
HITCBC 355   410360 if (snapshot[i].needs_write) 355   420537 if (snapshot[i].needs_write)
HITCBC 356   12751 FD_SET(fd, &write_fds); 356   14733 FD_SET(fd, &write_fds);
HITCBC 357   410360 FD_SET(fd, &except_fds); 357   420537 FD_SET(fd, &except_fds);
HITCBC 358   410360 if (fd > nfds) 358   420537 if (fd > nfds)
HITCBC 359   247932 nfds = fd; 359   259356 nfds = fd;
360   } 360   }
361   361  
362   struct timeval tv; 362   struct timeval tv;
HITCBC 363   248392 struct timeval* tv_ptr = nullptr; 363   259802 struct timeval* tv_ptr = nullptr;
HITCBC 364   248392 if (effective_timeout_us >= 0) 364   259802 if (effective_timeout_us >= 0)
365   { 365   {
HITCBC 366   247978 tv.tv_sec = effective_timeout_us / 1000000; 366   259403 tv.tv_sec = effective_timeout_us / 1000000;
HITCBC 367   247978 tv.tv_usec = effective_timeout_us % 1000000; 367   259403 tv.tv_usec = effective_timeout_us % 1000000;
HITCBC 368   247978 tv_ptr = &tv; 368   259403 tv_ptr = &tv;
369   } 369   }
370   370  
HITCBC 371   248392 int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr); 371   259802 int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
372   372  
373   // EINTR: signal interrupted select(), just retry. 373   // EINTR: signal interrupted select(), just retry.
374   // EBADF: an fd was closed between snapshot and select(); retry 374   // EBADF: an fd was closed between snapshot and select(); retry
375   // with a fresh snapshot from registered_descs_. 375   // with a fresh snapshot from registered_descs_.
HITCBC 376   248392 if (ready < 0) 376   259802 if (ready < 0)
377   { 377   {
MISUBC 378   if (errno == EINTR || errno == EBADF) 378   if (errno == EINTR || errno == EBADF)
MISUBC 379   return; 379   return;
MISUBC 380   detail::throw_system_error(make_err(errno), "select"); 380   detail::throw_system_error(make_err(errno), "select");
381   } 381   }
382   382  
383   // Process timers outside the lock 383   // Process timers outside the lock
HITCBC 384   248392 timer_svc_->process_expired(); 384   259802 timer_svc_->process_expired();
385   385  
HITCBC 386   248392 op_queue local_ops; 386   259802 op_queue local_ops;
387   387  
HITCBC 388   248392 if (ready > 0) 388   259802 if (ready > 0)
389   { 389   {
HITCBC 390   229639 if (FD_ISSET(pipe_fds_[0], &read_fds)) 390   241169 if (FD_ISSET(pipe_fds_[0], &read_fds))
391   { 391   {
392   char buf[256]; 392   char buf[256];
HITCBC 393   16024 while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0) 393   17182 while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
394   { 394   {
395   } 395   }
396   } 396   }
397   397  
HITCBC 398   589449 for (int i = 0; i < snapshot_count; ++i) 398   609570 for (int i = 0; i < snapshot_count; ++i)
399   { 399   {
HITCBC 400   359810 int fd = snapshot[i].fd; 400   368401 int fd = snapshot[i].fd;
HITCBC 401   359810 reactor_descriptor_state* desc = snapshot[i].desc; 401   368401 reactor_descriptor_state* desc = snapshot[i].desc;
402   402  
HITCBC 403   359810 std::uint32_t flags = 0; 403   368401 std::uint32_t flags = 0;
HITCBC 404   359810 if (FD_ISSET(fd, &read_fds)) 404   368401 if (FD_ISSET(fd, &read_fds))
HITCBC 405   225551 flags |= reactor_event_read; 405   236799 flags |= reactor_event_read;
HITCBC 406   359810 if (FD_ISSET(fd, &write_fds)) 406   368401 if (FD_ISSET(fd, &write_fds))
HITCBC 407   3779 flags |= reactor_event_write; 407   4077 flags |= reactor_event_write;
HITCBC 408   359810 if (FD_ISSET(fd, &except_fds)) 408   368401 if (FD_ISSET(fd, &except_fds))
MISUBC 409   flags |= reactor_event_error; 409   flags |= reactor_event_error;
410   410  
HITCBC 411   359810 if (flags == 0) 411   368401 if (flags == 0)
HITCBC 412   130489 continue; 412   127534 continue;
413   413  
HITCBC 414   229321 desc->add_ready_events(flags); 414   240867 desc->add_ready_events(flags);
415   415  
HITCBC 416   229321 bool expected = false; 416   240867 bool expected = false;
HITCBC 417   229321 if (desc->is_enqueued_.compare_exchange_strong( 417   240867 if (desc->is_enqueued_.compare_exchange_strong(
418   expected, true, std::memory_order_release, 418   expected, true, std::memory_order_release,
419   std::memory_order_relaxed)) 419   std::memory_order_relaxed))
420   { 420   {
HITCBC 421   229321 local_ops.push(desc); 421   240867 local_ops.push(desc);
422   } 422   }
423   } 423   }
424   } 424   }
425   425  
HITCBC 426   248392 lock.lock(); 426   259802 lock.lock();
427   427  
HITCBC 428   248392 if (!local_ops.empty()) 428   259802 if (!local_ops.empty())
HITCBC 429   225633 completed_ops_.splice(local_ops); 429   236881 completed_ops_.splice(local_ops);
HITCBC 430   248392 } 430   259802 }
431   431  
432   } // namespace boost::corosio::detail 432   } // namespace boost::corosio::detail
433   433  
434   #endif // BOOST_COROSIO_HAS_SELECT 434   #endif // BOOST_COROSIO_HAS_SELECT
435   435  
436   #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP 436   #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP