100.00% Lines (28/28) 100.00% Functions (13/13)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #ifndef BOOST_CAPY_EX_STRAND_HPP 10   #ifndef BOOST_CAPY_EX_STRAND_HPP
11   #define BOOST_CAPY_EX_STRAND_HPP 11   #define BOOST_CAPY_EX_STRAND_HPP
12   12  
13   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/continuation.hpp> 14   #include <boost/capy/continuation.hpp>
15   #include <coroutine> 15   #include <coroutine>
16   #include <boost/capy/ex/detail/strand_service.hpp> 16   #include <boost/capy/ex/detail/strand_service.hpp>
17   17  
18   #include <type_traits> 18   #include <type_traits>
19   19  
20   namespace boost { 20   namespace boost {
21   namespace capy { 21   namespace capy {
22   22  
23   /** Provides serialized coroutine execution for any executor type. 23   /** Provides serialized coroutine execution for any executor type.
24   24  
25   A strand wraps an inner executor and ensures that coroutines 25   A strand wraps an inner executor and ensures that coroutines
26   dispatched through it never run concurrently. At most one 26   dispatched through it never run concurrently. At most one
27   coroutine executes at a time within a strand, even when the 27   coroutine executes at a time within a strand, even when the
28   underlying executor runs on multiple threads. 28   underlying executor runs on multiple threads.
29   29  
30   Strands are lightweight handles that can be copied freely. 30   Strands are lightweight handles that can be copied freely.
31   Copies share the same internal serialization state, so 31   Copies share the same internal serialization state, so
32   coroutines dispatched through any copy are serialized with 32   coroutines dispatched through any copy are serialized with
33   respect to all other copies. 33   respect to all other copies.
34   34  
35   @par Invariant 35   @par Invariant
36   Coroutines resumed through a strand shall not run concurrently. 36   Coroutines resumed through a strand shall not run concurrently.
37   37  
38   @par Implementation 38   @par Implementation
39   Each strand allocates a private serialization state. Strands 39   Each strand allocates a private serialization state. Strands
40   constructed from the same execution context share a small pool 40   constructed from the same execution context share a small pool
41   of mutexes (193 entries) selected by hash; mutex sharing causes 41   of mutexes (193 entries) selected by hash; mutex sharing causes
42   only brief contention on the push/pop critical section, never 42   only brief contention on the push/pop critical section, never
43   cross-strand state sharing. Construction cost: one 43   cross-strand state sharing. Construction cost: one
44   `std::make_shared` per strand. 44   `std::make_shared` per strand.
45   45  
46   @par Executor Concept 46   @par Executor Concept
47   This class satisfies the `Executor` concept, providing: 47   This class satisfies the `Executor` concept, providing:
48   - `context()` - Returns the underlying execution context 48   - `context()` - Returns the underlying execution context
49   - `on_work_started()` / `on_work_finished()` - Work tracking 49   - `on_work_started()` / `on_work_finished()` - Work tracking
50   - `dispatch(continuation&)` - May run immediately if strand is idle 50   - `dispatch(continuation&)` - May run immediately if strand is idle
51   - `post(continuation&)` - Always queues for later execution 51   - `post(continuation&)` - Always queues for later execution
52   52  
53   @par Thread Safety 53   @par Thread Safety
54   Distinct objects: Safe. 54   Distinct objects: Safe.
55   Shared objects: Safe. 55   Shared objects: Safe.
56   56  
57   @par Example 57   @par Example
58   @code 58   @code
59   thread_pool pool(4); 59   thread_pool pool(4);
60   auto strand = make_strand(pool.get_executor()); 60   auto strand = make_strand(pool.get_executor());
61   61  
62   // These continuations will never run concurrently 62   // These continuations will never run concurrently
63   continuation c1{h1}, c2{h2}, c3{h3}; 63   continuation c1{h1}, c2{h2}, c3{h3};
64   strand.post(c1); 64   strand.post(c1);
65   strand.post(c2); 65   strand.post(c2);
66   strand.post(c3); 66   strand.post(c3);
67   @endcode 67   @endcode
68   68  
69   @tparam E The type of the underlying executor. Must 69   @tparam E The type of the underlying executor. Must
70   satisfy the `Executor` concept. 70   satisfy the `Executor` concept.
71   71  
72   @see make_strand, Executor 72   @see make_strand, Executor
73   */ 73   */
74   template<typename Ex> 74   template<typename Ex>
75   class strand 75   class strand
76   { 76   {
77   std::shared_ptr<detail::strand_impl> impl_; 77   std::shared_ptr<detail::strand_impl> impl_;
78   Ex ex_; 78   Ex ex_;
79   79  
80   friend struct strand_test; 80   friend struct strand_test;
81   81  
82   public: 82   public:
83   /** The type of the underlying executor. 83   /** The type of the underlying executor.
84   */ 84   */
85   using inner_executor_type = Ex; 85   using inner_executor_type = Ex;
86   86  
87   /** Construct a strand for the specified executor. 87   /** Construct a strand for the specified executor.
88   88  
89   Allocates a fresh strand implementation from the service 89   Allocates a fresh strand implementation from the service
90   associated with the executor's context. 90   associated with the executor's context.
91   91  
92   @param ex The inner executor to wrap. Coroutines will 92   @param ex The inner executor to wrap. Coroutines will
93   ultimately be dispatched through this executor. 93   ultimately be dispatched through this executor.
94   94  
95   @note This constructor is disabled if the argument is a 95   @note This constructor is disabled if the argument is a
96   strand type, to prevent strand-of-strand wrapping. 96   strand type, to prevent strand-of-strand wrapping.
97   */ 97   */
98   template<typename Ex1, 98   template<typename Ex1,
99   typename = std::enable_if_t< 99   typename = std::enable_if_t<
100   !std::is_same_v<std::decay_t<Ex1>, strand> && 100   !std::is_same_v<std::decay_t<Ex1>, strand> &&
101   !detail::is_strand<std::decay_t<Ex1>>::value && 101   !detail::is_strand<std::decay_t<Ex1>>::value &&
102   std::is_convertible_v<Ex1, Ex>>> 102   std::is_convertible_v<Ex1, Ex>>>
103   explicit 103   explicit
HITCBC 104   11442 strand(Ex1&& ex) 104   11442 strand(Ex1&& ex)
HITCBC 105   11442 : impl_(detail::get_strand_service(ex.context()) 105   11442 : impl_(detail::get_strand_service(ex.context())
HITCBC 106   11442 .create_implementation()) 106   11442 .create_implementation())
HITCBC 107   11442 , ex_(std::forward<Ex1>(ex)) 107   11442 , ex_(std::forward<Ex1>(ex))
108   { 108   {
HITCBC 109   11442 } 109   11442 }
110   110  
111   /** Construct a copy. 111   /** Construct a copy.
112   112  
113   Creates a strand that shares serialization state with 113   Creates a strand that shares serialization state with
114   the original. Coroutines dispatched through either strand 114   the original. Coroutines dispatched through either strand
115   will be serialized with respect to each other. 115   will be serialized with respect to each other.
116   */ 116   */
HITCBC 117   9 strand(strand const&) = default; 117   9 strand(strand const&) = default;
118   118  
119   /** Construct by moving. 119   /** Construct by moving.
120   120  
121   @note A moved-from strand is only safe to destroy 121   @note A moved-from strand is only safe to destroy
122   or reassign. 122   or reassign.
123   */ 123   */
HITCBC 124   11443 strand(strand&&) = default; 124   11443 strand(strand&&) = default;
125   125  
126   /** Assign by copying. 126   /** Assign by copying.
127   */ 127   */
HITCBC 128   1 strand& operator=(strand const&) = default; 128   1 strand& operator=(strand const&) = default;
129   129  
130   /** Assign by moving. 130   /** Assign by moving.
131   131  
132   @note A moved-from strand is only safe to destroy 132   @note A moved-from strand is only safe to destroy
133   or reassign. 133   or reassign.
134   */ 134   */
HITCBC 135   1 strand& operator=(strand&&) = default; 135   1 strand& operator=(strand&&) = default;
136   136  
137   /** Return the underlying executor. 137   /** Return the underlying executor.
138   138  
139   @return A const reference to the inner executor. 139   @return A const reference to the inner executor.
140   */ 140   */
141   Ex const& 141   Ex const&
HITCBC 142   1 get_inner_executor() const noexcept 142   1 get_inner_executor() const noexcept
143   { 143   {
HITCBC 144   1 return ex_; 144   1 return ex_;
145   } 145   }
146   146  
147   /** Return the underlying execution context. 147   /** Return the underlying execution context.
148   148  
149   @return A reference to the execution context associated 149   @return A reference to the execution context associated
150   with the inner executor. 150   with the inner executor.
151   */ 151   */
152   auto& 152   auto&
HITCBC 153   5 context() const noexcept 153   5 context() const noexcept
154   { 154   {
HITCBC 155   5 return ex_.context(); 155   5 return ex_.context();
156   } 156   }
157   157  
158   /** Notify that work has started. 158   /** Notify that work has started.
159   159  
160   Delegates to the inner executor's `on_work_started()`. 160   Delegates to the inner executor's `on_work_started()`.
161   This is a no-op for most executor types. 161   This is a no-op for most executor types.
162   */ 162   */
163   void 163   void
HITCBC 164   6 on_work_started() const noexcept 164   6 on_work_started() const noexcept
165   { 165   {
HITCBC 166   6 ex_.on_work_started(); 166   6 ex_.on_work_started();
HITCBC 167   6 } 167   6 }
168   168  
169   /** Notify that work has finished. 169   /** Notify that work has finished.
170   170  
171   Delegates to the inner executor's `on_work_finished()`. 171   Delegates to the inner executor's `on_work_finished()`.
172   This is a no-op for most executor types. 172   This is a no-op for most executor types.
173   */ 173   */
174   void 174   void
HITCBC 175   6 on_work_finished() const noexcept 175   6 on_work_finished() const noexcept
176   { 176   {
HITCBC 177   6 ex_.on_work_finished(); 177   6 ex_.on_work_finished();
HITCBC 178   6 } 178   6 }
179   179  
180   /** Determine whether the strand is running in the current thread. 180   /** Determine whether the strand is running in the current thread.
181   181  
182   @return true if the current thread is executing a coroutine 182   @return true if the current thread is executing a coroutine
183   within this strand's dispatch loop. 183   within this strand's dispatch loop.
184   */ 184   */
185   bool 185   bool
HITCBC 186   4 running_in_this_thread() const noexcept 186   4 running_in_this_thread() const noexcept
187   { 187   {
HITCBC 188   4 return detail::strand_service::running_in_this_thread(*impl_); 188   4 return detail::strand_service::running_in_this_thread(*impl_);
189   } 189   }
190   190  
191   /** Compare two strands for equality. 191   /** Compare two strands for equality.
192   192  
193   Two strands are equal if they share the same internal 193   Two strands are equal if they share the same internal
194   serialization state. Equal strands serialize coroutines 194   serialization state. Equal strands serialize coroutines
195   with respect to each other. 195   with respect to each other.
196   196  
197   @param other The strand to compare against. 197   @param other The strand to compare against.
198   @return true if both strands share the same implementation. 198   @return true if both strands share the same implementation.
199   */ 199   */
200   bool 200   bool
HITCBC 201   499505 operator==(strand const& other) const noexcept 201   499505 operator==(strand const& other) const noexcept
202   { 202   {
HITCBC 203   499505 return impl_.get() == other.impl_.get(); 203   499505 return impl_.get() == other.impl_.get();
204   } 204   }
205   205  
206   /** Post a continuation to the strand. 206   /** Post a continuation to the strand.
207   207  
208   The continuation is always queued for execution, never resumed 208   The continuation is always queued for execution, never resumed
209   immediately. When the strand becomes available, queued 209   immediately. When the strand becomes available, queued
210   work executes in FIFO order on the underlying executor. 210   work executes in FIFO order on the underlying executor.
211   211  
212   @par Ordering 212   @par Ordering
213   Guarantees strict FIFO ordering relative to other post() calls. 213   Guarantees strict FIFO ordering relative to other post() calls.
214   Use this instead of dispatch() when ordering matters. 214   Use this instead of dispatch() when ordering matters.
215   215  
216   @param c The continuation to post. The caller retains 216   @param c The continuation to post. The caller retains
217   ownership; the continuation must remain valid until 217   ownership; the continuation must remain valid until
218   it is dequeued and resumed. 218   it is dequeued and resumed.
219   */ 219   */
220   void 220   void
HITCBC 221   30335 post(continuation& c) const 221   30335 post(continuation& c) const
222   { 222   {
HITCBC 223   30335 detail::strand_service::post(impl_, executor_ref(ex_), c.h); 223   30335 detail::strand_service::post(impl_, executor_ref(ex_), c.h);
HITCBC 224   30335 } 224   30335 }
225   225  
226   /** Dispatch a continuation through the strand. 226   /** Dispatch a continuation through the strand.
227   227  
228   Returns a handle for symmetric transfer. If the calling 228   Returns a handle for symmetric transfer. If the calling
229   thread is already executing within this strand, returns `c.h`. 229   thread is already executing within this strand, returns `c.h`.
230   Otherwise, the continuation is queued and 230   Otherwise, the continuation is queued and
231   `std::noop_coroutine()` is returned. 231   `std::noop_coroutine()` is returned.
232   232  
233   @par Ordering 233   @par Ordering
234   Callers requiring strict FIFO ordering should use post() 234   Callers requiring strict FIFO ordering should use post()
235   instead, which always queues the continuation. 235   instead, which always queues the continuation.
236   236  
237   @param c The continuation to dispatch. The caller retains 237   @param c The continuation to dispatch. The caller retains
238   ownership; the continuation must remain valid until 238   ownership; the continuation must remain valid until
239   it is dequeued and resumed. 239   it is dequeued and resumed.
240   240  
241   @return A handle for symmetric transfer or `std::noop_coroutine()`. 241   @return A handle for symmetric transfer or `std::noop_coroutine()`.
242   */ 242   */
243   std::coroutine_handle<> 243   std::coroutine_handle<>
HITCBC 244   8 dispatch(continuation& c) const 244   8 dispatch(continuation& c) const
245   { 245   {
HITCBC 246   8 return detail::strand_service::dispatch(impl_, executor_ref(ex_), c.h); 246   8 return detail::strand_service::dispatch(impl_, executor_ref(ex_), c.h);
247   } 247   }
248   }; 248   };
249   249  
250   // Deduction guide 250   // Deduction guide
251   template<typename Ex> 251   template<typename Ex>
252   strand(Ex) -> strand<Ex>; 252   strand(Ex) -> strand<Ex>;
253   253  
254   } // namespace capy 254   } // namespace capy
255   } // namespace boost 255   } // namespace boost
256   256  
257   #endif 257   #endif