TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Steve Gerbino
4 : // Copyright (c) 2026 Michael Vandeberg
5 : //
6 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
7 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8 : //
9 : // Official repository: https://github.com/cppalliance/corosio
10 : //
11 :
12 : #ifndef BOOST_COROSIO_IO_CONTEXT_HPP
13 : #define BOOST_COROSIO_IO_CONTEXT_HPP
14 :
15 : #include <boost/corosio/detail/config.hpp>
16 : #include <boost/corosio/detail/continuation_op.hpp>
17 : #include <boost/corosio/detail/platform.hpp>
18 : #include <boost/corosio/detail/scheduler.hpp>
19 : #include <boost/capy/continuation.hpp>
20 : #include <boost/capy/ex/execution_context.hpp>
21 :
22 : #include <chrono>
23 : #include <coroutine>
24 : #include <cstddef>
25 : #include <limits>
26 : #include <thread>
27 :
28 : namespace boost::corosio {
29 :
30 : /** Runtime tuning options for @ref io_context.
31 :
32 : All fields have defaults that match the library's built-in
33 : values, so constructing a default `io_context_options` produces
34 : identical behavior to an unconfigured context.
35 :
36 : Options that apply only to a specific backend family are
37 : silently ignored when the active backend does not support them.
38 :
39 : @par Example
40 : @code
41 : io_context_options opts;
42 : opts.max_events_per_poll = 256; // larger batch per syscall
43 : opts.inline_budget_max = 32; // more speculative completions
44 : opts.thread_pool_size = 4; // more file-I/O workers
45 :
46 : io_context ioc(opts);
47 : @endcode
48 :
49 : @see io_context, native_io_context
50 : */
51 : struct io_context_options
52 : {
53 : /** Maximum events fetched per reactor poll call.
54 :
55 : Controls the buffer size passed to `epoll_wait()` or
56 : `kevent()`. Larger values reduce syscall frequency under
57 : high load; smaller values improve fairness between
58 : connections. Ignored on IOCP and select backends.
59 : */
60 : unsigned max_events_per_poll = 128;
61 :
62 : /** Starting inline completion budget per handler chain.
63 :
64 : After a posted handler executes, the reactor grants this
65 : many speculative inline completions before forcing a
66 : re-queue. Applies to reactor backends only.
67 :
68 : @note Constructing an `io_context` with `concurrency_hint > 1`
69 : and all three budget fields at their defaults overrides
70 : them to disable inline completion (post-everything mode),
71 : since multi-thread workloads benefit from cross-thread
72 : work-stealing. Setting any budget field to a non-default
73 : value disables the override.
74 : */
75 : unsigned inline_budget_initial = 2;
76 :
77 : /** Hard ceiling on adaptive inline budget ramp-up.
78 :
79 : The budget doubles each cycle it is fully consumed, up to
80 : this limit. Applies to reactor backends only.
81 : */
82 : unsigned inline_budget_max = 16;
83 :
84 : /** Inline budget when no other thread assists the reactor.
85 :
86 : When only one thread is running the event loop, this
87 : value caps the inline budget to preserve fairness.
88 : Applies to reactor backends only.
89 : */
90 : unsigned unassisted_budget = 4;
91 :
92 : /** Maximum `GetQueuedCompletionStatus` timeout in milliseconds.
93 :
94 : Bounds how long the IOCP scheduler blocks between timer
95 : rechecks. Lower values improve timer responsiveness at the
96 : cost of more syscalls. Applies to IOCP only.
97 : */
98 : unsigned gqcs_timeout_ms = 500;
99 :
100 : /** Thread pool size for blocking I/O (file I/O, DNS resolution).
101 :
102 : Sets the number of worker threads in the shared thread pool
103 : used by POSIX file services and DNS resolution. Must be at
104 : least 1. Applies to POSIX backends only; ignored on IOCP
105 : where file I/O uses native overlapped I/O.
106 : */
107 : unsigned thread_pool_size = 1;
108 :
109 : /** Enable single-threaded mode (disable scheduler locking).
110 :
111 : When true, the scheduler skips all mutex lock/unlock and
112 : condition variable operations on the hot path. This
113 : eliminates synchronization overhead when only one thread
114 : calls `run()`.
115 :
116 : @par Restrictions
117 : - Only one thread may call `run()` (or any run variant).
118 : - Posting work from another thread is undefined behavior.
119 : - DNS resolution returns `operation_not_supported`.
120 : - POSIX file I/O returns `operation_not_supported`.
121 : - Signal sets should not be shared across contexts.
122 :
123 : @note Constructing an `io_context` with `concurrency_hint == 1`
124 : automatically enables single-threaded mode regardless of
125 : this field's value, matching asio's convention. To opt out,
126 : pass `concurrency_hint > 1`.
127 : */
128 : bool single_threaded = false;
129 :
130 : /** Enable IORING_SETUP_SQPOLL on the io_uring backend.
131 :
132 : With SQPOLL, the kernel forks a thread that busy-polls the
133 : submission ring; submission becomes a userspace-only memory
134 : store, eliminating the io_uring_enter syscall on the submit
135 : path. Most useful for sustained traffic. Idle thread parks
136 : after `sq_thread_idle_ms` of no activity.
137 :
138 : Independent of `single_threaded`. Default: off.
139 :
140 : Ignored on non-io_uring backends.
141 : */
142 : bool enable_sqpoll = false;
143 :
144 : /** SQ-poll idle timeout in milliseconds.
145 :
146 : After this many ms of no submissions, the kernel polling
147 : thread sleeps; next submit re-wakes it via SQ_WAKEUP. 0
148 : means use the kernel default (1ms). Recommended for bursty
149 : workloads: 100-1000ms (avoids park/unpark thrash).
150 :
151 : Ignored unless `enable_sqpoll` is true. Ignored on
152 : non-io_uring backends.
153 : */
154 : unsigned sq_thread_idle_ms = 0;
155 :
156 : /** Pin the SQ-poll kernel thread to this CPU.
157 :
158 : -1 means do not pin (kernel scheduler picks). Pinning off
159 : the dispatch core is recommended on latency-sensitive
160 : deployments to avoid cache contention.
161 :
162 : Ignored unless `enable_sqpoll` is true. Ignored on
163 : non-io_uring backends.
164 : */
165 : int sq_thread_cpu = -1;
166 : };
167 :
168 : namespace detail {
169 : class timer_service;
170 : } // namespace detail
171 :
172 : /** An I/O context for running asynchronous operations.
173 :
174 : The io_context provides an execution environment for async
175 : operations. It maintains a queue of pending work items and
176 : processes them when `run()` is called.
177 :
178 : The default and unsigned constructors select the platform's
179 : native backend:
180 : - Windows: IOCP
181 : - Linux: epoll
182 : - BSD/macOS: kqueue
183 : - Other POSIX: select
184 :
185 : The template constructor accepts a backend tag value to
186 : choose a specific backend at compile time:
187 :
188 : @par Example
189 : @code
190 : io_context ioc; // platform default
191 : io_context ioc2(corosio::epoll); // explicit backend
192 : @endcode
193 :
194 : @par Thread Safety
195 : Distinct objects: Safe.@n
196 : Shared objects: Safe, if using a concurrency hint greater
197 : than 1.
198 :
199 : @see epoll_t, select_t, kqueue_t, iocp_t
200 : */
201 : class BOOST_COROSIO_DECL io_context : public capy::execution_context
202 : {
203 : /// Pre-create services that depend on options (before construct).
204 : void apply_options_pre_(io_context_options const& opts);
205 :
206 : /// Apply runtime tuning to the scheduler (after construct).
207 : void apply_options_post_(
208 : io_context_options const& opts,
209 : unsigned concurrency_hint);
210 :
211 : /// Switch the scheduler to single-threaded (lockless) mode.
212 : void configure_single_threaded_();
213 :
214 : protected:
215 : detail::scheduler* sched_;
216 :
217 : public:
218 : /** The executor type for this context. */
219 : class executor_type;
220 :
221 : /** Construct with default concurrency and platform backend.
222 :
223 : Uses `std::thread::hardware_concurrency()` clamped to a minimum
224 : of 2 as the concurrency hint, so the default constructor never
225 : silently engages single-threaded mode (see
226 : @ref io_context_options::single_threaded). Pass an explicit
227 : `concurrency_hint == 1` to opt into single-threaded mode.
228 : */
229 : io_context();
230 :
231 : /** Construct with a concurrency hint and platform backend.
232 :
233 : @param concurrency_hint Hint for the number of threads
234 : that will call `run()`.
235 : */
236 : explicit io_context(unsigned concurrency_hint);
237 :
238 : /** Construct with runtime tuning options and platform backend.
239 :
240 : @param opts Runtime options controlling scheduler and
241 : service behavior.
242 : @param concurrency_hint Hint for the number of threads
243 : that will call `run()`.
244 : */
245 : explicit io_context(
246 : io_context_options const& opts,
247 : unsigned concurrency_hint = std::thread::hardware_concurrency());
248 :
249 : /** Construct with an explicit backend tag.
250 :
251 : @param backend The backend tag value selecting the I/O
252 : multiplexer (e.g. `corosio::epoll`).
253 : @param concurrency_hint Hint for the number of threads
254 : that will call `run()`.
255 : */
256 : template<class Backend>
257 : requires requires { Backend::construct; }
258 HIT 1176 : explicit io_context(
259 : Backend backend,
260 : unsigned concurrency_hint = std::thread::hardware_concurrency())
261 : : capy::execution_context(this)
262 1176 : , sched_(nullptr)
263 : {
264 : (void)backend;
265 1176 : sched_ = &Backend::construct(*this, concurrency_hint);
266 1176 : if (concurrency_hint == 1)
267 4 : configure_single_threaded_();
268 1176 : }
269 :
270 : /** Construct with an explicit backend tag and runtime options.
271 :
272 : @param backend The backend tag value selecting the I/O
273 : multiplexer (e.g. `corosio::epoll`).
274 : @param opts Runtime options controlling scheduler and
275 : service behavior.
276 : @param concurrency_hint Hint for the number of threads
277 : that will call `run()`.
278 : */
279 : template<class Backend>
280 : requires requires { Backend::construct; }
281 12 : explicit io_context(
282 : Backend backend,
283 : io_context_options const& opts,
284 : unsigned concurrency_hint = std::thread::hardware_concurrency())
285 : : capy::execution_context(this)
286 12 : , sched_(nullptr)
287 : {
288 : (void)backend;
289 12 : apply_options_pre_(opts);
290 12 : sched_ = &Backend::construct(*this, concurrency_hint);
291 12 : apply_options_post_(opts, concurrency_hint);
292 12 : }
293 :
294 : ~io_context();
295 :
296 : io_context(io_context const&) = delete;
297 : io_context& operator=(io_context const&) = delete;
298 :
299 : /** Return an executor for this context.
300 :
301 : The returned executor can be used to dispatch coroutines
302 : and post work items to this context.
303 :
304 : @return An executor associated with this context.
305 : */
306 : executor_type get_executor() const noexcept;
307 :
308 : /** Signal the context to stop processing.
309 :
310 : This causes `run()` to return as soon as possible. Any pending
311 : work items remain queued.
312 : */
313 6 : void stop()
314 : {
315 6 : sched_->stop();
316 6 : }
317 :
318 : /** Return whether the context has been stopped.
319 :
320 : @return `true` if `stop()` has been called and `restart()`
321 : has not been called since.
322 : */
323 67 : bool stopped() const noexcept
324 : {
325 67 : return sched_->stopped();
326 : }
327 :
328 : /** Restart the context after being stopped.
329 :
330 : This function must be called before `run()` can be called
331 : again after `stop()` has been called.
332 : */
333 175 : void restart()
334 : {
335 175 : sched_->restart();
336 175 : }
337 :
338 : /** Process all pending work items.
339 :
340 : This function blocks until all pending work items have been
341 : executed or `stop()` is called. The context is stopped
342 : when there is no more outstanding work.
343 :
344 : @note The context must be restarted with `restart()` before
345 : calling this function again after it returns.
346 :
347 : @return The number of handlers executed.
348 : */
349 833 : std::size_t run()
350 : {
351 833 : return sched_->run();
352 : }
353 :
354 : /** Process at most one pending work item.
355 :
356 : This function blocks until one work item has been executed
357 : or `stop()` is called. The context is stopped when there
358 : is no more outstanding work.
359 :
360 : @note The context must be restarted with `restart()` before
361 : calling this function again after it returns.
362 :
363 : @return The number of handlers executed (0 or 1).
364 : */
365 16 : std::size_t run_one()
366 : {
367 16 : return sched_->run_one();
368 : }
369 :
370 : /** Process work items for the specified duration.
371 :
372 : This function blocks until work items have been executed for
373 : the specified duration, or `stop()` is called. The context
374 : is stopped when there is no more outstanding work.
375 :
376 : @note The context must be restarted with `restart()` before
377 : calling this function again after it returns.
378 :
379 : @param rel_time The duration for which to process work.
380 :
381 : @return The number of handlers executed.
382 : */
383 : template<class Rep, class Period>
384 10 : std::size_t run_for(std::chrono::duration<Rep, Period> const& rel_time)
385 : {
386 10 : return run_until(std::chrono::steady_clock::now() + rel_time);
387 : }
388 :
389 : /** Process work items until the specified time.
390 :
391 : This function blocks until the specified time is reached
392 : or `stop()` is called. The context is stopped when there
393 : is no more outstanding work.
394 :
395 : @note The context must be restarted with `restart()` before
396 : calling this function again after it returns.
397 :
398 : @param abs_time The time point until which to process work.
399 :
400 : @return The number of handlers executed.
401 : */
402 : template<class Clock, class Duration>
403 : std::size_t
404 10 : run_until(std::chrono::time_point<Clock, Duration> const& abs_time)
405 : {
406 10 : std::size_t n = 0;
407 28 : while (run_one_until(abs_time))
408 18 : if (n != (std::numeric_limits<std::size_t>::max)())
409 18 : ++n;
410 10 : return n;
411 : }
412 :
413 : /** Process at most one work item for the specified duration.
414 :
415 : This function blocks until one work item has been executed,
416 : the specified duration has elapsed, or `stop()` is called.
417 : The context is stopped when there is no more outstanding work.
418 :
419 : @note The context must be restarted with `restart()` before
420 : calling this function again after it returns.
421 :
422 : @param rel_time The duration for which the call may block.
423 :
424 : @return The number of handlers executed (0 or 1).
425 : */
426 : template<class Rep, class Period>
427 6 : std::size_t run_one_for(std::chrono::duration<Rep, Period> const& rel_time)
428 : {
429 6 : return run_one_until(std::chrono::steady_clock::now() + rel_time);
430 : }
431 :
432 : /** Process at most one work item until the specified time.
433 :
434 : This function blocks until one work item has been executed,
435 : the specified time is reached, or `stop()` is called.
436 : The context is stopped when there is no more outstanding work.
437 :
438 : @note The context must be restarted with `restart()` before
439 : calling this function again after it returns.
440 :
441 : @param abs_time The time point until which the call may block.
442 :
443 : @return The number of handlers executed (0 or 1).
444 : */
445 : template<class Clock, class Duration>
446 : std::size_t
447 42 : run_one_until(std::chrono::time_point<Clock, Duration> const& abs_time)
448 : {
449 42 : typename Clock::time_point now = Clock::now();
450 9 : for (;;)
451 : {
452 51 : auto rel_time = abs_time - now;
453 : using rel_type = decltype(rel_time);
454 51 : if (rel_time < rel_type::zero())
455 4 : rel_time = rel_type::zero();
456 47 : else if (rel_time > std::chrono::seconds(1))
457 23 : rel_time = std::chrono::seconds(1);
458 :
459 51 : std::size_t s = sched_->wait_one(
460 : static_cast<long>(
461 51 : std::chrono::duration_cast<std::chrono::microseconds>(
462 : rel_time)
463 51 : .count()));
464 :
465 51 : if (s || stopped())
466 42 : return s;
467 :
468 13 : now = Clock::now();
469 13 : if (now >= abs_time)
470 4 : return 0;
471 : }
472 : }
473 :
474 : /** Process all ready work items without blocking.
475 :
476 : This function executes all work items that are ready to run
477 : without blocking for more work. The context is stopped
478 : when there is no more outstanding work.
479 :
480 : @note The context must be restarted with `restart()` before
481 : calling this function again after it returns.
482 :
483 : @return The number of handlers executed.
484 : */
485 26 : std::size_t poll()
486 : {
487 26 : return sched_->poll();
488 : }
489 :
490 : /** Process at most one ready work item without blocking.
491 :
492 : This function executes at most one work item that is ready
493 : to run without blocking for more work. The context is
494 : stopped when there is no more outstanding work.
495 :
496 : @note The context must be restarted with `restart()` before
497 : calling this function again after it returns.
498 :
499 : @return The number of handlers executed (0 or 1).
500 : */
501 8 : std::size_t poll_one()
502 : {
503 8 : return sched_->poll_one();
504 : }
505 : };
506 :
507 : /** An executor for dispatching work to an I/O context.
508 :
509 : The executor provides the interface for posting work items and
510 : dispatching coroutines to the associated context. It satisfies
511 : the `capy::Executor` concept.
512 :
513 : Executors are lightweight handles that can be copied and compared
514 : for equality. Two executors compare equal if they refer to the
515 : same context.
516 :
517 : @par Thread Safety
518 : Distinct objects: Safe.@n
519 : Shared objects: Safe.
520 : */
521 : class io_context::executor_type
522 : {
523 : io_context* ctx_ = nullptr;
524 :
525 : public:
526 : /** Default constructor.
527 :
528 : Constructs an executor not associated with any context.
529 : */
530 : executor_type() = default;
531 :
532 : /** Construct an executor from a context.
533 :
534 : @param ctx The context to associate with this executor.
535 : */
536 1248 : explicit executor_type(io_context& ctx) noexcept : ctx_(&ctx) {}
537 :
538 : /** Return a reference to the associated execution context.
539 :
540 : @return Reference to the context.
541 : */
542 2099 : io_context& context() const noexcept
543 : {
544 2099 : return *ctx_;
545 : }
546 :
547 : /** Check if the current thread is running this executor's context.
548 :
549 : @return `true` if `run()` is being called on this thread.
550 : */
551 2154 : bool running_in_this_thread() const noexcept
552 : {
553 2154 : return ctx_->sched_->running_in_this_thread();
554 : }
555 :
556 : /** Informs the executor that work is beginning.
557 :
558 : Must be paired with `on_work_finished()`.
559 : */
560 2435 : void on_work_started() const noexcept
561 : {
562 2435 : ctx_->sched_->work_started();
563 2435 : }
564 :
565 : /** Informs the executor that work has completed.
566 :
567 : @par Preconditions
568 : A preceding call to `on_work_started()` on an equal executor.
569 : */
570 2390 : void on_work_finished() const noexcept
571 : {
572 2390 : ctx_->sched_->work_finished();
573 2390 : }
574 :
575 : /** Dispatch a continuation.
576 :
577 : Returns a handle for symmetric transfer. If called from
578 : within `run()`, returns `c.h`. Otherwise posts the
579 : enclosing continuation_op as a scheduler_op for later
580 : execution and returns `std::noop_coroutine()`.
581 :
582 : @param c The continuation to dispatch. Must be the `cont`
583 : member of a `detail::continuation_op`.
584 :
585 : @return A handle for symmetric transfer or `std::noop_coroutine()`.
586 : */
587 2150 : std::coroutine_handle<> dispatch(capy::continuation& c) const
588 : {
589 2150 : if (running_in_this_thread())
590 677 : return c.h;
591 1473 : post(c);
592 1473 : return std::noop_coroutine();
593 : }
594 :
595 : /** Post a continuation for deferred execution.
596 :
597 : If the continuation is backed by a continuation_op
598 : (tagged), posts it directly as a scheduler_op — zero
599 : heap allocation. Otherwise falls back to the
600 : heap-allocating post(coroutine_handle<>) path.
601 : */
602 10602 : void post(capy::continuation& c) const
603 : {
604 10602 : auto* op = detail::continuation_op::try_from_continuation(c);
605 10602 : if (op)
606 9123 : ctx_->sched_->post(op);
607 : else
608 1479 : ctx_->sched_->post(c.h);
609 10602 : }
610 :
611 : /** Post a bare coroutine handle for deferred execution.
612 :
613 : Heap-allocates a scheduler_op to wrap the handle. Prefer
614 : posting through a continuation_op-backed continuation when
615 : the continuation has suitable lifetime.
616 :
617 : @param h The coroutine handle to post.
618 : */
619 2876 : void post(std::coroutine_handle<> h) const
620 : {
621 2876 : ctx_->sched_->post(h);
622 2876 : }
623 :
624 : /** Compare two executors for equality.
625 :
626 : @return `true` if both executors refer to the same context.
627 : */
628 2 : bool operator==(executor_type const& other) const noexcept
629 : {
630 2 : return ctx_ == other.ctx_;
631 : }
632 :
633 : /** Compare two executors for inequality.
634 :
635 : @return `true` if the executors refer to different contexts.
636 : */
637 : bool operator!=(executor_type const& other) const noexcept
638 : {
639 : return ctx_ != other.ctx_;
640 : }
641 : };
642 :
643 : inline io_context::executor_type
644 1248 : io_context::get_executor() const noexcept
645 : {
646 1248 : return executor_type(const_cast<io_context&>(*this));
647 : }
648 :
649 : } // namespace boost::corosio
650 :
651 : #endif // BOOST_COROSIO_IO_CONTEXT_HPP
|