TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/boostorg/capy
9 : //
10 :
11 : #include <boost/capy/ex/thread_pool.hpp>
12 : #include <boost/capy/continuation.hpp>
13 : #include <boost/capy/detail/thread_local_ptr.hpp>
14 : #include <boost/capy/ex/frame_allocator.hpp>
15 : #include <boost/capy/test/thread_name.hpp>
16 : #include <algorithm>
17 : #include <atomic>
18 : #include <condition_variable>
19 : #include <cstdio>
20 : #include <mutex>
21 : #include <thread>
22 : #include <vector>
23 :
24 : /*
25 : Thread pool implementation using a shared work queue.
26 :
27 : Work items are continuations linked via their intrusive next pointer,
28 : stored in a single queue protected by a mutex. No per-post heap
29 : allocation: the continuation is owned by the caller and linked
30 : directly. Worker threads wait on a condition_variable until work
31 : is available or stop is requested.
32 :
33 : Threads are started lazily on first post() via std::call_once to avoid
34 : spawning threads for pools that are constructed but never used. Each
35 : thread is named with a configurable prefix plus index for debugger
36 : visibility.
37 :
38 : Work tracking: on_work_started/on_work_finished maintain an atomic
39 : outstanding_work_ counter. join() blocks until this counter reaches
40 : zero, then signals workers to stop and joins threads.
41 :
42 : Two shutdown paths:
43 : - join(): waits for outstanding work to drain, then stops workers.
44 : - stop(): immediately signals workers to exit; queued work is abandoned.
45 : - Destructor: stop() then join() (abandon + wait for threads).
46 : */
47 :
48 : namespace boost {
49 : namespace capy {
50 :
51 : //------------------------------------------------------------------------------
52 :
class thread_pool::impl
{
    // Identifies the pool owning the current worker thread, or
    // nullptr if the calling thread is not a pool worker. Checked
    // by dispatch() to decide between symmetric transfer (inline
    // resume) and post.
    static inline detail::thread_local_ptr<impl const> current_;

    // Intrusive queue of continuations via continuation::next.
    // No per-post allocation: the continuation is owned by the caller.
    // head_/tail_ are protected by mutex_ except in drain_abandoned(),
    // which runs after all workers have been joined.
    continuation* head_ = nullptr;
    continuation* tail_ = nullptr;

    // Append c to the tail of the queue. Caller must hold mutex_.
    void push(continuation* c) noexcept
    {
        c->next = nullptr;
        if(tail_)
            tail_->next = c;
        else
            head_ = c;
        tail_ = c;
    }

    // Remove and return the head of the queue, or nullptr if empty.
    // Caller must hold mutex_ (or have exclusive access post-join).
    continuation* pop() noexcept
    {
        if(!head_)
            return nullptr;
        continuation* c = head_;
        head_ = head_->next;
        if(!head_)
            tail_ = nullptr;
        return c;
    }

    // True if no work is queued. Caller must hold mutex_.
    bool empty() const noexcept
    {
        return head_ == nullptr;
    }

    std::mutex mutex_;                          // guards queue, stop_, joined_
    std::condition_variable work_cv_;           // signaled when work arrives or stop is requested
    std::condition_variable done_cv_;           // signaled when outstanding_work_ drains to zero
    std::vector<std::thread> threads_;          // worker threads; populated once by ensure_started()
    std::atomic<std::size_t> outstanding_work_{0};  // work-guard count, see on_work_started/finished
    bool stop_{false};                          // workers exit when set (under mutex_)
    bool joined_{false};                        // join() was entered; makes join() idempotent
    std::size_t num_threads_;                   // resolved thread count (never 0 after ctor)
    char thread_name_prefix_[13]{}; // 12 chars max + null terminator
    std::once_flag start_flag_;                 // ensures threads start exactly once

public:
    ~impl() = default;

    // True when the calling thread is one of this pool's workers.
    bool
    running_in_this_thread() const noexcept
    {
        return current_.get() == this;
    }

    // Destroy abandoned coroutine frames. Must be called
    // before execution_context::shutdown()/destroy() so
    // that suspended-frame destructors (e.g. delay_awaitable
    // calling timer_service::cancel()) run while services
    // are still valid.
    void
    drain_abandoned() noexcept
    {
        while(auto* c = pop())
        {
            auto h = c->h;
            if(h && h != std::noop_coroutine())
                h.destroy();
        }
    }

    // Construct with the requested thread count; 0 means "use
    // hardware concurrency" (minimum 1). Threads are not started
    // here — see ensure_started().
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
        : num_threads_(num_threads)
    {
        if(num_threads_ == 0)
            num_threads_ = std::max(
                std::thread::hardware_concurrency(), 1u);

        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
        thread_name_prefix_[n] = '\0';
    }

    // Enqueue a continuation for execution on a worker thread.
    // Starts the worker threads lazily on first use. The caller
    // retains ownership of c; only its intrusive next pointer is used.
    void
    post(continuation& c)
    {
        ensure_started();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            push(&c);
        }
        // Notify outside the lock to avoid waking a waiter that
        // immediately blocks on mutex_.
        work_cv_.notify_one();
    }

    // Increment the outstanding-work count; pairs with on_work_finished().
    void
    on_work_started() noexcept
    {
        outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
    }

    // Decrement the outstanding-work count. When it reaches zero and
    // join() is already waiting (joined_), request worker shutdown and
    // wake both the joiner (done_cv_) and the workers (work_cv_).
    void
    on_work_finished() noexcept
    {
        if(outstanding_work_.fetch_sub(
            1, std::memory_order_acq_rel) == 1)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if(joined_ && !stop_)
                stop_ = true;
            done_cv_.notify_all();
            work_cv_.notify_all();
        }
    }

    // Wait for outstanding work to drain to zero, then stop the
    // workers and join all threads. Idempotent: a second call
    // returns immediately without waiting (joined_ guard).
    void
    join() noexcept
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            if(joined_)
                return;
            joined_ = true;

            if(outstanding_work_.load(
                std::memory_order_acquire) == 0)
            {
                // No pending work guards: stop workers right away.
                stop_ = true;
                work_cv_.notify_all();
            }
            else
            {
                // on_work_finished() sets stop_ when the count hits
                // zero (it sees joined_ == true) and signals done_cv_.
                done_cv_.wait(lock, [this]{
                    return stop_;
                });
            }
        }

        for(auto& t : threads_)
            if(t.joinable())
                t.join();
    }

    // Immediately signal workers to exit; queued work is abandoned
    // (later destroyed by drain_abandoned()). Does not join threads.
    void
    stop() noexcept
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        work_cv_.notify_all();
        done_cv_.notify_all();
    }

private:
    // Lazily start the worker threads exactly once (thread-safe via
    // std::call_once). Called from post() so pools that are never
    // used spawn no threads.
    void
    ensure_started()
    {
        std::call_once(start_flag_, [this]{
            threads_.reserve(num_threads_);
            for(std::size_t i = 0; i < num_threads_; ++i)
                threads_.emplace_back([this, i]{ run(i); });
        });
    }

    // Worker thread loop: name the thread, mark it as belonging to
    // this pool, then repeatedly dequeue and resume continuations
    // until stop_ is observed.
    void
    run(std::size_t index)
    {
        // Build name; set_current_thread_name truncates to platform limits.
        char name[16];
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
        set_current_thread_name(name);

        // Mark this thread as a worker of this pool so dispatch()
        // can symmetric-transfer when called from within pool work.
        struct scoped_pool
        {
            scoped_pool(impl const* p) noexcept { current_.set(p); }
            ~scoped_pool() noexcept { current_.set(nullptr); }
        } guard(this);

        for(;;)
        {
            continuation* c = nullptr;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                work_cv_.wait(lock, [this]{
                    return !empty() ||
                        stop_;
                });
                // stop_ takes precedence: remaining queued work is
                // abandoned (stop() semantics) and later drained.
                if(stop_)
                    return;
                c = pop();
            }
            // Resume outside the lock so the coroutine can post
            // more work without deadlocking.
            if(c)
                safe_resume(c->h);
        }
    }
};
255 :
256 : //------------------------------------------------------------------------------
257 :
thread_pool::
~thread_pool()
{
    // Teardown order is deliberate:
    // 1. stop(): signal workers to exit; queued work is abandoned.
    // 2. join(): wait for all worker threads to finish.
    // 3. drain_abandoned(): destroy abandoned coroutine frames while
    //    execution_context services are still valid (their destructors
    //    may call into services, e.g. timer cancellation).
    // 4. shutdown()/destroy(): tear down the execution_context.
    // 5. delete impl_: release pool state last.
    impl_->stop();
    impl_->join();
    impl_->drain_abandoned();
    shutdown();
    destroy();
    delete impl_;
}
268 :
269 166 : thread_pool::
270 166 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
271 166 : : impl_(new impl(num_threads, thread_name_prefix))
272 : {
273 166 : this->set_frame_allocator(std::allocator<void>{});
274 166 : }
275 :
276 : void
277 12 : thread_pool::
278 : join() noexcept
279 : {
280 12 : impl_->join();
281 12 : }
282 :
283 : void
284 2 : thread_pool::
285 : stop() noexcept
286 : {
287 2 : impl_->stop();
288 2 : }
289 :
290 : //------------------------------------------------------------------------------
291 :
292 : thread_pool::executor_type
293 11580 : thread_pool::
294 : get_executor() const noexcept
295 : {
296 11580 : return executor_type(
297 11580 : const_cast<thread_pool&>(*this));
298 : }
299 :
300 : void
301 347 : thread_pool::executor_type::
302 : on_work_started() const noexcept
303 : {
304 347 : pool_->impl_->on_work_started();
305 347 : }
306 :
307 : void
308 347 : thread_pool::executor_type::
309 : on_work_finished() const noexcept
310 : {
311 347 : pool_->impl_->on_work_finished();
312 347 : }
313 :
314 : void
315 17489 : thread_pool::executor_type::
316 : post(continuation& c) const
317 : {
318 17489 : pool_->impl_->post(c);
319 17489 : }
320 :
321 : std::coroutine_handle<>
322 355 : thread_pool::executor_type::
323 : dispatch(continuation& c) const
324 : {
325 355 : if(pool_->impl_->running_in_this_thread())
326 13 : return c.h;
327 342 : pool_->impl_->post(c);
328 342 : return std::noop_coroutine();
329 : }
330 :
331 : } // capy
332 : } // boost
|