76.56% Lines (49/64) 92.31% Functions (12/13)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP 10   #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
11   #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP 11   #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
12   12  
13   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/ex/frame_allocator.hpp> 14   #include <boost/capy/ex/frame_allocator.hpp>
15   15  
16   #include <coroutine> 16   #include <coroutine>
17   #include <cstddef> 17   #include <cstddef>
18   #include <exception> 18   #include <exception>
19   19  
20   namespace boost { 20   namespace boost {
21   namespace capy { 21   namespace capy {
22   namespace detail { 22   namespace detail {
23   23  
24   class strand_queue; 24   class strand_queue;
25   25  
26   //---------------------------------------------------------- 26   //----------------------------------------------------------
27   27  
28   // Metadata stored before the coroutine frame 28   // Metadata stored before the coroutine frame
29   struct frame_prefix 29   struct frame_prefix
30   { 30   {
31   frame_prefix* next; 31   frame_prefix* next;
32   strand_queue* queue; 32   strand_queue* queue;
33   std::size_t alloc_size; 33   std::size_t alloc_size;
34   }; 34   };
35   35  
36   //---------------------------------------------------------- 36   //----------------------------------------------------------
37   37  
38   /** Wrapper coroutine for strand queue dispatch operations. 38   /** Wrapper coroutine for strand queue dispatch operations.
39   39  
40   This coroutine wraps a target coroutine handle and resumes 40   This coroutine wraps a target coroutine handle and resumes
41   it when dispatched. The wrapper ensures control returns to 41   it when dispatched. The wrapper ensures control returns to
42   the dispatch loop after the target suspends or completes. 42   the dispatch loop after the target suspends or completes.
43   43  
44   The promise contains an intrusive list node for queue 44   The promise contains an intrusive list node for queue
45   storage and supports a custom allocator that recycles 45   storage and supports a custom allocator that recycles
46   coroutine frames via a free list. 46   coroutine frames via a free list.
47   */ 47   */
48   struct strand_op 48   struct strand_op
49   { 49   {
50   struct promise_type 50   struct promise_type
51   { 51   {
52   promise_type* next = nullptr; 52   promise_type* next = nullptr;
53   53  
54   void* 54   void*
55   operator new( 55   operator new(
56   std::size_t size, 56   std::size_t size,
57   strand_queue& q, 57   strand_queue& q,
58   std::coroutine_handle<void>); 58   std::coroutine_handle<void>);
59   59  
60   void 60   void
61   operator delete(void* p, std::size_t); 61   operator delete(void* p, std::size_t);
62   62  
63   strand_op 63   strand_op
HITCBC 64   30340 get_return_object() noexcept 64   30340 get_return_object() noexcept
65   { 65   {
HITCBC 66   30340 return {std::coroutine_handle<promise_type>::from_promise(*this)}; 66   30340 return {std::coroutine_handle<promise_type>::from_promise(*this)};
67   } 67   }
68   68  
69   std::suspend_always 69   std::suspend_always
HITCBC 70   30340 initial_suspend() noexcept 70   30340 initial_suspend() noexcept
71   { 71   {
HITCBC 72   30340 return {}; 72   30340 return {};
73   } 73   }
74   74  
75   std::suspend_always 75   std::suspend_always
HITCBC 76   30340 final_suspend() noexcept 76   30340 final_suspend() noexcept
77   { 77   {
HITCBC 78   30340 return {}; 78   30340 return {};
79   } 79   }
80   80  
81   void 81   void
HITCBC 82   30340 return_void() noexcept 82   30340 return_void() noexcept
83   { 83   {
HITCBC 84   30340 } 84   30340 }
85   85  
86   void 86   void
87   unhandled_exception() 87   unhandled_exception()
88   { 88   {
89   std::terminate(); 89   std::terminate();
90   } 90   }
91   }; 91   };
92   92  
93   std::coroutine_handle<promise_type> h_; 93   std::coroutine_handle<promise_type> h_;
94   }; 94   };
95   95  
96   //---------------------------------------------------------- 96   //----------------------------------------------------------
97   97  
98   /** Single-threaded dispatch queue for coroutine handles. 98   /** Single-threaded dispatch queue for coroutine handles.
99   99  
100   This queue stores coroutine handles and resumes them 100   This queue stores coroutine handles and resumes them
101   sequentially when dispatch() is called. Each pushed 101   sequentially when dispatch() is called. Each pushed
102   handle is wrapped in a strand_op coroutine that ensures 102   handle is wrapped in a strand_op coroutine that ensures
103   control returns to the dispatch loop after the target 103   control returns to the dispatch loop after the target
104   suspends or completes. 104   suspends or completes.
105   105  
106   The queue uses an intrusive singly-linked list through 106   The queue uses an intrusive singly-linked list through
107   the promise type to avoid separate node allocations. 107   the promise type to avoid separate node allocations.
108   A free list recycles wrapper coroutine frames to reduce 108   A free list recycles wrapper coroutine frames to reduce
109   allocation overhead during repeated push/dispatch cycles. 109   allocation overhead during repeated push/dispatch cycles.
110   110  
111   @par Thread Safety 111   @par Thread Safety
112   This class is not thread-safe. All operations must be 112   This class is not thread-safe. All operations must be
113   called from a single thread. 113   called from a single thread.
114   */ 114   */
115   class strand_queue 115   class strand_queue
116   { 116   {
117   using promise_type = strand_op::promise_type; 117   using promise_type = strand_op::promise_type;
118   118  
119   promise_type* head_ = nullptr; 119   promise_type* head_ = nullptr;
120   promise_type* tail_ = nullptr; 120   promise_type* tail_ = nullptr;
121   frame_prefix* free_list_ = nullptr; 121   frame_prefix* free_list_ = nullptr;
122   122  
123   friend struct strand_op::promise_type; 123   friend struct strand_op::promise_type;
124   124  
125   static 125   static
126   strand_op 126   strand_op
HITCBC 127   30340 make_strand_op( 127   30340 make_strand_op(
128   strand_queue& q, 128   strand_queue& q,
129   std::coroutine_handle<void> target) 129   std::coroutine_handle<void> target)
130   { 130   {
131   (void)q; 131   (void)q;
132   safe_resume(target); 132   safe_resume(target);
133   co_return; 133   co_return;
HITCBC 134   60680 } 134   60680 }
135   135  
136   public: 136   public:
HITCBC 137   11442 strand_queue() = default; 137   11442 strand_queue() = default;
138   138  
139   strand_queue(strand_queue const&) = delete; 139   strand_queue(strand_queue const&) = delete;
140   strand_queue& operator=(strand_queue const&) = delete; 140   strand_queue& operator=(strand_queue const&) = delete;
141   141  
142   /** Destructor. 142   /** Destructor.
143   143  
144   Destroys any pending wrappers without resuming them, 144   Destroys any pending wrappers without resuming them,
145   then frees all memory in the free list. 145   then frees all memory in the free list.
146   */ 146   */
HITCBC 147   11442 ~strand_queue() 147   11442 ~strand_queue()
148   { 148   {
149   // Destroy pending wrappers 149   // Destroy pending wrappers
HITCBC 150   11442 while(head_) 150   11442 while(head_)
151   { 151   {
MISUBC 152   promise_type* p = head_; 152   promise_type* p = head_;
MISUBC 153   head_ = p->next; 153   head_ = p->next;
154   154  
MISUBC 155   auto h = std::coroutine_handle<promise_type>::from_promise(*p); 155   auto h = std::coroutine_handle<promise_type>::from_promise(*p);
MISUBC 156   h.destroy(); 156   h.destroy();
157   } 157   }
158   158  
159   // Free the free list memory 159   // Free the free list memory
HITCBC 160   11442 while(free_list_) 160   11442 while(free_list_)
161   { 161   {
MISUBC 162   frame_prefix* prefix = free_list_; 162   frame_prefix* prefix = free_list_;
MISUBC 163   free_list_ = prefix->next; 163   free_list_ = prefix->next;
MISUBC 164   ::operator delete(prefix); 164   ::operator delete(prefix);
165   } 165   }
HITCBC 166   11442 } 166   11442 }
167   167  
168   /** Returns true if there are no pending operations. 168   /** Returns true if there are no pending operations.
169   */ 169   */
170   bool 170   bool
HITCBC 171   20512 empty() const noexcept 171   17600 empty() const noexcept
172   { 172   {
HITCBC 173   20512 return head_ == nullptr; 173   17600 return head_ == nullptr;
174   } 174   }
175   175  
176   /** Push a coroutine handle to the queue. 176   /** Push a coroutine handle to the queue.
177   177  
178   Creates a wrapper coroutine and appends it to the 178   Creates a wrapper coroutine and appends it to the
179   queue. The wrapper will resume the target handle 179   queue. The wrapper will resume the target handle
180   when dispatch() processes it. 180   when dispatch() processes it.
181   181  
182   @param h The coroutine handle to dispatch. 182   @param h The coroutine handle to dispatch.
183   */ 183   */
184   void 184   void
HITCBC 185   30340 push(std::coroutine_handle<void> h) 185   30340 push(std::coroutine_handle<void> h)
186   { 186   {
HITCBC 187   30340 strand_op op = make_strand_op(*this, h); 187   30340 strand_op op = make_strand_op(*this, h);
188   188  
HITCBC 189   30340 promise_type* p = &op.h_.promise(); 189   30340 promise_type* p = &op.h_.promise();
HITCBC 190   30340 p->next = nullptr; 190   30340 p->next = nullptr;
191   191  
HITCBC 192   30340 if(tail_) 192   30340 if(tail_)
HITCBC 193   9828 tail_->next = p; 193   12740 tail_->next = p;
194   else 194   else
HITCBC 195   20512 head_ = p; 195   17600 head_ = p;
HITCBC 196   30340 tail_ = p; 196   30340 tail_ = p;
HITCBC 197   30340 } 197   30340 }
198   198  
199   /** Resume all queued coroutines in sequence. 199   /** Resume all queued coroutines in sequence.
200   200  
201   Processes each wrapper in FIFO order, resuming its 201   Processes each wrapper in FIFO order, resuming its
202   target coroutine. After each target suspends or 202   target coroutine. After each target suspends or
203   completes, the wrapper is destroyed and its frame 203   completes, the wrapper is destroyed and its frame
204   is added to the free list for reuse. 204   is added to the free list for reuse.
205   205  
206   Coroutines resumed during dispatch may push new 206   Coroutines resumed during dispatch may push new
207   handles, which will also be processed in the same 207   handles, which will also be processed in the same
208   dispatch call. 208   dispatch call.
209   209  
210   @warning Not thread-safe. Do not call while another 210   @warning Not thread-safe. Do not call while another
211   thread may be calling push(). 211   thread may be calling push().
212   */ 212   */
213   void 213   void
214   dispatch() 214   dispatch()
215   { 215   {
216   while(head_) 216   while(head_)
217   { 217   {
218   promise_type* p = head_; 218   promise_type* p = head_;
219   head_ = p->next; 219   head_ = p->next;
220   if(!head_) 220   if(!head_)
221   tail_ = nullptr; 221   tail_ = nullptr;
222   222  
223   auto h = std::coroutine_handle<promise_type>::from_promise(*p); 223   auto h = std::coroutine_handle<promise_type>::from_promise(*p);
224   safe_resume(h); 224   safe_resume(h);
225   h.destroy(); 225   h.destroy();
226   } 226   }
227   } 227   }
228   228  
229   /** Batch of taken items for thread-safe dispatch. */ 229   /** Batch of taken items for thread-safe dispatch. */
230   struct taken_batch 230   struct taken_batch
231   { 231   {
232   promise_type* head = nullptr; 232   promise_type* head = nullptr;
233   promise_type* tail = nullptr; 233   promise_type* tail = nullptr;
234   }; 234   };
235   235  
236   /** Take all pending items atomically. 236   /** Take all pending items atomically.
237   237  
238   Removes all items from the queue and returns them 238   Removes all items from the queue and returns them
239   as a batch. The queue is left empty. 239   as a batch. The queue is left empty.
240   240  
241   @return The batch of taken items. 241   @return The batch of taken items.
242   */ 242   */
243   taken_batch 243   taken_batch
HITCBC 244   20512 take_all() noexcept 244   17600 take_all() noexcept
245   { 245   {
HITCBC 246   20512 taken_batch batch{head_, tail_}; 246   17600 taken_batch batch{head_, tail_};
HITCBC 247   20512 head_ = tail_ = nullptr; 247   17600 head_ = tail_ = nullptr;
HITCBC 248   20512 return batch; 248   17600 return batch;
249   } 249   }
250   250  
251   /** Dispatch a batch of taken items. 251   /** Dispatch a batch of taken items.
252   252  
253   @param batch The batch to dispatch. 253   @param batch The batch to dispatch.
254   254  
255   @note This is thread-safe w.r.t. push() because it doesn't 255   @note This is thread-safe w.r.t. push() because it doesn't
256   access the queue's free_list_. Frames are deleted directly 256   access the queue's free_list_. Frames are deleted directly
257   rather than recycled. 257   rather than recycled.
258   */ 258   */
259   static 259   static
260   void 260   void
HITCBC 261   20512 dispatch_batch(taken_batch& batch) 261   17600 dispatch_batch(taken_batch& batch)
262   { 262   {
HITCBC 263   50852 while(batch.head) 263   47940 while(batch.head)
264   { 264   {
HITCBC 265   30340 promise_type* p = batch.head; 265   30340 promise_type* p = batch.head;
HITCBC 266   30340 batch.head = p->next; 266   30340 batch.head = p->next;
267   267  
HITCBC 268   30340 auto h = std::coroutine_handle<promise_type>::from_promise(*p); 268   30340 auto h = std::coroutine_handle<promise_type>::from_promise(*p);
HITCBC 269   30340 safe_resume(h); 269   30340 safe_resume(h);
270   // Don't use h.destroy() - it would call operator delete which 270   // Don't use h.destroy() - it would call operator delete which
271   // accesses the queue's free_list_ (race with push). 271   // accesses the queue's free_list_ (race with push).
272   // Instead, manually free the frame without recycling. 272   // Instead, manually free the frame without recycling.
273   // h.address() returns the frame base (what operator new returned). 273   // h.address() returns the frame base (what operator new returned).
HITCBC 274   30340 frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1; 274   30340 frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;
HITCBC 275   30340 ::operator delete(prefix); 275   30340 ::operator delete(prefix);
276   } 276   }
HITCBC 277   20512 batch.tail = nullptr; 277   17600 batch.tail = nullptr;
HITCBC 278   20512 } 278   17600 }
279   }; 279   };
280   280  
281   //---------------------------------------------------------- 281   //----------------------------------------------------------
282   282  
283   inline 283   inline
284   void* 284   void*
HITCBC 285   30340 strand_op::promise_type::operator new( 285   30340 strand_op::promise_type::operator new(
286   std::size_t size, 286   std::size_t size,
287   strand_queue& q, 287   strand_queue& q,
288   std::coroutine_handle<void>) 288   std::coroutine_handle<void>)
289   { 289   {
290   // Total size includes prefix 290   // Total size includes prefix
HITCBC 291   30340 std::size_t alloc_size = size + sizeof(frame_prefix); 291   30340 std::size_t alloc_size = size + sizeof(frame_prefix);
292   void* raw; 292   void* raw;
293   293  
294   // Try to reuse from free list 294   // Try to reuse from free list
HITCBC 295   30340 if(q.free_list_) 295   30340 if(q.free_list_)
296   { 296   {
MISUBC 297   frame_prefix* prefix = q.free_list_; 297   frame_prefix* prefix = q.free_list_;
MISUBC 298   q.free_list_ = prefix->next; 298   q.free_list_ = prefix->next;
MISUBC 299   raw = prefix; 299   raw = prefix;
300   } 300   }
301   else 301   else
302   { 302   {
HITCBC 303   30340 raw = ::operator new(alloc_size); 303   30340 raw = ::operator new(alloc_size);
304   } 304   }
305   305  
306   // Initialize prefix 306   // Initialize prefix
HITCBC 307   30340 frame_prefix* prefix = static_cast<frame_prefix*>(raw); 307   30340 frame_prefix* prefix = static_cast<frame_prefix*>(raw);
HITCBC 308   30340 prefix->next = nullptr; 308   30340 prefix->next = nullptr;
HITCBC 309   30340 prefix->queue = &q; 309   30340 prefix->queue = &q;
HITCBC 310   30340 prefix->alloc_size = alloc_size; 310   30340 prefix->alloc_size = alloc_size;
311   311  
312   // Return pointer AFTER the prefix (this is where coroutine frame goes) 312   // Return pointer AFTER the prefix (this is where coroutine frame goes)
HITCBC 313   30340 return prefix + 1; 313   30340 return prefix + 1;
314   } 314   }
315   315  
316   inline 316   inline
317   void 317   void
MISUBC 318   strand_op::promise_type::operator delete(void* p, std::size_t) 318   strand_op::promise_type::operator delete(void* p, std::size_t)
319   { 319   {
320   // Calculate back to get the prefix 320   // Calculate back to get the prefix
MISUBC 321   frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1; 321   frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;
322   322  
323   // Add to free list 323   // Add to free list
MISUBC 324   prefix->next = prefix->queue->free_list_; 324   prefix->next = prefix->queue->free_list_;
MISUBC 325   prefix->queue->free_list_ = prefix; 325   prefix->queue->free_list_ = prefix;
MISUBC 326   } 326   }
327   327  
328   } // namespace detail 328   } // namespace detail
329   } // namespace capy 329   } // namespace capy
330   } // namespace boost 330   } // namespace boost
331   331  
332   #endif 332   #endif