TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #include "src/ex/detail/strand_impl.hpp"
11 : #include <boost/capy/ex/detail/strand_service.hpp>
12 : #include <boost/capy/continuation.hpp>
13 : #include <coroutine>
14 : #include <memory>
15 : #include <utility>
16 :
17 : namespace boost {
18 : namespace capy {
19 : namespace detail {
20 :
21 : // Sentinel stored in invoker_frame_cache_ after shutdown to prevent
22 : // in-flight invokers from repopulating a freed cache slot.
23 : inline void* const kCacheClosed = reinterpret_cast<void*>(1);
24 :
25 : /** Concrete strand_service.
26 :
27 : Holds a shared mutex pool (193 entries), a linked list of live
28 : impls (for shutdown traversal), and a single-slot invoker
29 : coroutine frame cache shared across all strands of this service.
30 :
31 : The dispatch helpers (`enqueue`, `dispatch_pending`, etc.) are
32 : public so the namespace-scope `make_invoker` coroutine and the
33 : `strand_service` static methods can call them without friendship.
34 : */
35 : class strand_service_impl : public strand_service
36 : {
37 : public:
38 : static constexpr std::size_t num_mutexes = 193;
39 :
40 : std::mutex mutex_;
41 : std::size_t salt_ = 0;
42 : std::shared_ptr<std::mutex> mutexes_[num_mutexes];
43 : intrusive_list<strand_impl> impl_list_;
44 : std::atomic<void*> invoker_frame_cache_{nullptr};
45 :
46 : explicit
47 HIT 30 : strand_service_impl(execution_context&)
48 30 : {
49 30 : }
50 :
51 : std::shared_ptr<strand_impl>
52 11442 : create_implementation() override
53 : {
54 11442 : auto new_impl = std::make_shared<strand_impl>();
55 :
56 11442 : std::lock_guard<std::mutex> lock(mutex_);
57 :
58 11442 : std::size_t s = salt_++;
59 11442 : std::size_t idx = reinterpret_cast<std::size_t>(new_impl.get());
60 11442 : idx += idx >> 3;
61 11442 : idx ^= s + 0x9e3779b9 + (idx << 6) + (idx >> 2);
62 11442 : idx %= num_mutexes;
63 11442 : if(!mutexes_[idx])
64 663 : mutexes_[idx] = std::make_shared<std::mutex>();
65 11442 : new_impl->mutex_ = mutexes_[idx].get();
66 :
67 11442 : impl_list_.push_back(new_impl.get());
68 11442 : new_impl->service_.store(this, std::memory_order_release);
69 :
70 22884 : return new_impl;
71 11442 : }
72 :
73 : static bool
74 30340 : enqueue(strand_impl& impl, std::coroutine_handle<> h)
75 : {
76 30340 : std::lock_guard<std::mutex> lock(*impl.mutex_);
77 30340 : impl.pending_.push(h);
78 30340 : if(!impl.locked_)
79 : {
80 17026 : impl.locked_ = true;
81 17026 : return true;
82 : }
83 13314 : return false;
84 30340 : }
85 :
86 : static void
87 17600 : dispatch_pending(strand_impl& impl)
88 : {
89 17600 : strand_queue::taken_batch batch;
90 : {
91 17600 : std::lock_guard<std::mutex> lock(*impl.mutex_);
92 17600 : batch = impl.pending_.take_all();
93 17600 : }
94 17600 : impl.pending_.dispatch_batch(batch);
95 17600 : }
96 :
97 : static bool
98 17600 : try_unlock(strand_impl& impl)
99 : {
100 17600 : std::lock_guard<std::mutex> lock(*impl.mutex_);
101 17600 : if(impl.pending_.empty())
102 : {
103 17026 : impl.locked_ = false;
104 17026 : return true;
105 : }
106 574 : return false;
107 17600 : }
108 :
109 : static void
110 17600 : set_dispatch_thread(strand_impl& impl) noexcept
111 : {
112 17600 : impl.dispatch_thread_.store(std::this_thread::get_id());
113 17600 : }
114 :
115 : static void
116 17026 : clear_dispatch_thread(strand_impl& impl) noexcept
117 : {
118 17026 : impl.dispatch_thread_.store(std::thread::id{});
119 17026 : }
120 :
121 : // Defined below; needs strand_invoker complete.
122 : static void
123 : post_invoker(std::shared_ptr<strand_impl> impl, executor_ref ex);
124 :
125 : protected:
126 : void
127 30 : shutdown() override
128 : {
129 30 : std::lock_guard<std::mutex> lock(mutex_);
130 30 : while(auto* p = impl_list_.pop_front())
131 : {
132 MIS 0 : std::lock_guard<std::mutex> impl_lock(*p->mutex_);
133 0 : p->locked_ = true;
134 0 : p->service_.store(nullptr, std::memory_order_release);
135 0 : }
136 :
137 HIT 30 : void* fp = invoker_frame_cache_.exchange(
138 : kCacheClosed, std::memory_order_acq_rel);
139 30 : if(fp) ::operator delete(fp);
140 30 : }
141 : };
142 :
143 : /** Invoker coroutine that drains a strand's pending queue.
144 :
145 : Runs once the strand transitions from unlocked to locked. Holds
146 : the impl alive via the coroutine parameter (a shared_ptr in the
147 : coroutine frame), so user code may drop its strand handle while
148 : the invoker is mid-flight.
149 :
150 : The frame's allocator recycles a single per-service slot. The
151 : trailer points at the service (lifetime: execution_context),
152 : NOT the impl (lifetime: per-strand), so operator delete is
153 : safe even after the impl has been destroyed.
154 : */
155 : struct strand_invoker
156 : {
157 : struct promise_type
158 : {
159 : // Stored in the coroutine frame so its address is stable for
160 : // posting to the inner executor.
161 : continuation self_;
162 :
163 : void*
164 17026 : operator new(
165 : std::size_t n,
166 : std::shared_ptr<strand_impl> const& impl)
167 : {
168 17026 : auto* svc = impl->service_.load(std::memory_order_acquire);
169 17026 : constexpr auto A = alignof(strand_service_impl*);
170 17026 : std::size_t padded = (n + A - 1) & ~(A - 1);
171 17026 : std::size_t total = padded + sizeof(strand_service_impl*);
172 :
173 17026 : void* p = svc->invoker_frame_cache_.exchange(
174 : nullptr, std::memory_order_acquire);
175 17026 : if(!p || p == kCacheClosed)
176 7674 : p = ::operator new(total);
177 :
178 17026 : *reinterpret_cast<strand_service_impl**>(
179 17026 : static_cast<char*>(p) + padded) = svc;
180 17026 : return p;
181 : }
182 :
183 : void
184 17026 : operator delete(void* p, std::size_t n) noexcept
185 : {
186 17026 : constexpr auto A = alignof(strand_service_impl*);
187 17026 : std::size_t padded = (n + A - 1) & ~(A - 1);
188 17026 : auto* svc = *reinterpret_cast<strand_service_impl**>(
189 : static_cast<char*>(p) + padded);
190 :
191 17026 : void* expected = nullptr;
192 17026 : if(!svc->invoker_frame_cache_.compare_exchange_strong(
193 : expected, p, std::memory_order_release))
194 7657 : ::operator delete(p);
195 17026 : }
196 :
197 : strand_invoker
198 17026 : get_return_object() noexcept
199 : {
200 17026 : return {std::coroutine_handle<promise_type>::from_promise(*this)};
201 : }
202 :
203 17026 : std::suspend_always initial_suspend() noexcept { return {}; }
204 17026 : std::suspend_never final_suspend() noexcept { return {}; }
205 17026 : void return_void() noexcept {}
206 MIS 0 : void unhandled_exception() { std::terminate(); }
207 : };
208 :
209 : std::coroutine_handle<promise_type> h_;
210 : };
211 :
212 : // The by-value parameter lives in the coroutine frame for the
213 : // invoker's lifetime, keeping the impl alive past any user-side
214 : // strand drop.
215 : static
216 : strand_invoker
217 HIT 17026 : make_invoker(std::shared_ptr<strand_impl> impl)
218 : {
219 : auto* p = impl.get();
220 : for(;;)
221 : {
222 : strand_service_impl::set_dispatch_thread(*p);
223 : strand_service_impl::dispatch_pending(*p);
224 : if(strand_service_impl::try_unlock(*p))
225 : {
226 : strand_service_impl::clear_dispatch_thread(*p);
227 : co_return;
228 : }
229 : }
230 34052 : }
231 :
232 : void
233 17026 : strand_service_impl::post_invoker(
234 : std::shared_ptr<strand_impl> impl,
235 : executor_ref ex)
236 : {
237 17026 : auto invoker = make_invoker(std::move(impl));
238 17026 : auto& self = invoker.h_.promise().self_;
239 17026 : self.h = invoker.h_;
240 17026 : ex.post(self);
241 17026 : }
242 :
243 22884 : strand_impl::~strand_impl()
244 : {
245 11442 : auto* svc = service_.load(std::memory_order_acquire);
246 11442 : if(!svc) return;
247 11442 : std::lock_guard<std::mutex> lock(svc->mutex_);
248 11442 : svc->impl_list_.remove(this);
249 11442 : }
250 :
251 30 : strand_service::
252 30 : strand_service()
253 30 : : service()
254 : {
255 30 : }
256 :
257 30 : strand_service::
258 : ~strand_service() = default;
259 :
260 : bool
261 12 : strand_service::
262 : running_in_this_thread(strand_impl& impl) noexcept
263 : {
264 12 : return impl.dispatch_thread_.load() == std::this_thread::get_id();
265 : }
266 :
267 : std::coroutine_handle<>
268 8 : strand_service::
269 : dispatch(
270 : std::shared_ptr<strand_impl> const& impl,
271 : executor_ref ex,
272 : std::coroutine_handle<> h)
273 : {
274 8 : if(running_in_this_thread(*impl))
275 3 : return h;
276 :
277 5 : if(strand_service_impl::enqueue(*impl, h))
278 5 : strand_service_impl::post_invoker(impl, ex);
279 5 : return std::noop_coroutine();
280 : }
281 :
282 : void
283 30335 : strand_service::
284 : post(
285 : std::shared_ptr<strand_impl> const& impl,
286 : executor_ref ex,
287 : std::coroutine_handle<> h)
288 : {
289 30335 : if(strand_service_impl::enqueue(*impl, h))
290 17021 : strand_service_impl::post_invoker(impl, ex);
291 30335 : }
292 :
293 : strand_service&
294 11442 : get_strand_service(execution_context& ctx)
295 : {
296 11442 : return ctx.use_service<strand_service_impl>();
297 : }
298 :
299 : } // namespace detail
300 : } // namespace capy
301 : } // namespace boost
|