src/ex/thread_pool.cpp

100.0% Lines (139/139) 100.0% List of functions (29/29)
thread_pool.cpp
f(x) Functions (29)
Function Calls Lines Blocks
boost::capy::thread_pool::impl::push(boost::capy::continuation*) :66 17831x 100.0% 100.0% boost::capy::thread_pool::impl::pop() :76 17997x 100.0% 100.0% boost::capy::thread_pool::impl::empty() const :87 32603x 100.0% 100.0% boost::capy::thread_pool::impl::~impl() :104 166x 100.0% 100.0% boost::capy::thread_pool::impl::running_in_this_thread() const :107 355x 100.0% 100.0% boost::capy::thread_pool::impl::drain_abandoned() :118 166x 100.0% 100.0% boost::capy::thread_pool::impl::impl(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :128 166x 100.0% 72.0% boost::capy::thread_pool::impl::post(boost::capy::continuation&) :141 17831x 100.0% 100.0% boost::capy::thread_pool::impl::on_work_started() :152 347x 100.0% 100.0% boost::capy::thread_pool::impl::on_work_finished() :158 347x 100.0% 100.0% boost::capy::thread_pool::impl::join() :172 178x 100.0% 85.0% boost::capy::thread_pool::impl::join()::{lambda()#1}::operator()() const :188 61x 100.0% 100.0% boost::capy::thread_pool::impl::stop() :200 168x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started() :212 17831x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const :214 109x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const::{lambda()#1}::operator()() const :217 193x 100.0% 100.0% boost::capy::thread_pool::impl::run(unsigned long) :222 193x 100.0% 78.0% boost::capy::thread_pool::impl::run(unsigned long)::scoped_pool::scoped_pool(boost::capy::thread_pool::impl const*) :233 193x 100.0% 100.0% boost::capy::thread_pool::impl::run(unsigned long)::scoped_pool::~scoped_pool() :234 193x 100.0% 100.0% boost::capy::thread_pool::impl::run(unsigned long)::{lambda()#1}::operator()() const :242 32603x 100.0% 100.0% boost::capy::thread_pool::~thread_pool() :258 166x 100.0% 100.0% boost::capy::thread_pool::thread_pool(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :269 166x 100.0% 55.0% 
boost::capy::thread_pool::join() :277 12x 100.0% 100.0% boost::capy::thread_pool::stop() :284 2x 100.0% 100.0% boost::capy::thread_pool::get_executor() const :293 11580x 100.0% 100.0% boost::capy::thread_pool::executor_type::on_work_started() const :301 347x 100.0% 100.0% boost::capy::thread_pool::executor_type::on_work_finished() const :308 347x 100.0% 100.0% boost::capy::thread_pool::executor_type::post(boost::capy::continuation&) const :315 17489x 100.0% 100.0% boost::capy::thread_pool::executor_type::dispatch(boost::capy::continuation&) const :322 355x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/boostorg/capy
9 //
10
11 #include <boost/capy/ex/thread_pool.hpp>
12 #include <boost/capy/continuation.hpp>
13 #include <boost/capy/detail/thread_local_ptr.hpp>
14 #include <boost/capy/ex/frame_allocator.hpp>
15 #include <boost/capy/test/thread_name.hpp>
16 #include <algorithm>
17 #include <atomic>
18 #include <condition_variable>
19 #include <cstdio>
20 #include <mutex>
21 #include <thread>
22 #include <vector>
23
24 /*
25 Thread pool implementation using a shared work queue.
26
27 Work items are continuations linked via their intrusive next pointer,
28 stored in a single queue protected by a mutex. No per-post heap
29 allocation: the continuation is owned by the caller and linked
30 directly. Worker threads wait on a condition_variable until work
31 is available or stop is requested.
32
33 Threads are started lazily on first post() via std::call_once to avoid
34 spawning threads for pools that are constructed but never used. Each
35 thread is named with a configurable prefix plus index for debugger
36 visibility.
37
38 Work tracking: on_work_started/on_work_finished maintain an atomic
39 outstanding_work_ counter. join() blocks until this counter reaches
40 zero, then signals workers to stop and joins threads.
41
42 Three shutdown paths:
43 - join(): waits for outstanding work to drain, then stops workers and joins them.
44 - stop(): immediately signals workers to exit; queued work is abandoned.
45 - Destructor: stop() then join() (abandon + wait for threads), then destroys any abandoned coroutine frames still in the queue.
46 */
47
48 namespace boost {
49 namespace capy {
50
51 //------------------------------------------------------------------------------
52
53 class thread_pool::impl
54 {
55 // Identifies the pool owning the current worker thread, or
56 // nullptr if the calling thread is not a pool worker. Checked
57 // by dispatch() to decide between symmetric transfer (inline
58 // resume) and post.
59 static inline detail::thread_local_ptr<impl const> current_;
60
61 // Intrusive queue of continuations via continuation::next.
62 // No per-post allocation: the continuation is owned by the caller.
63 continuation* head_ = nullptr;
64 continuation* tail_ = nullptr;
65
66 17831x void push(continuation* c) noexcept
67 {
68 17831x c->next = nullptr;
69 17831x if(tail_)
70 6986x tail_->next = c;
71 else
72 10845x head_ = c;
73 17831x tail_ = c;
74 17831x }
75
76 17997x continuation* pop() noexcept
77 {
78 17997x if(!head_)
79 166x return nullptr;
80 17831x continuation* c = head_;
81 17831x head_ = head_->next;
82 17831x if(!head_)
83 10845x tail_ = nullptr;
84 17831x return c;
85 }
86
87 32603x bool empty() const noexcept
88 {
89 32603x return head_ == nullptr;
90 }
91
92 std::mutex mutex_;
93 std::condition_variable work_cv_;
94 std::condition_variable done_cv_;
95 std::vector<std::thread> threads_;
96 std::atomic<std::size_t> outstanding_work_{0};
97 bool stop_{false};
98 bool joined_{false};
99 std::size_t num_threads_;
100 char thread_name_prefix_[13]{}; // 12 chars max + null terminator
101 std::once_flag start_flag_;
102
103 public:
104 166x ~impl() = default;
105
106 bool
107 355x running_in_this_thread() const noexcept
108 {
109 355x return current_.get() == this;
110 }
111
112 // Destroy abandoned coroutine frames. Must be called
113 // before execution_context::shutdown()/destroy() so
114 // that suspended-frame destructors (e.g. delay_awaitable
115 // calling timer_service::cancel()) run while services
116 // are still valid.
117 void
118 166x drain_abandoned() noexcept
119 {
120 373x while(auto* c = pop())
121 {
122 207x auto h = c->h;
123 207x if(h && h != std::noop_coroutine())
124 156x h.destroy();
125 207x }
126 166x }
127
128 166x impl(std::size_t num_threads, std::string_view thread_name_prefix)
129 166x : num_threads_(num_threads)
130 {
131 166x if(num_threads_ == 0)
132 4x num_threads_ = std::max(
133 2x std::thread::hardware_concurrency(), 1u);
134
135 // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
136 166x auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
137 166x thread_name_prefix_[n] = '\0';
138 166x }
139
140 void
141 17831x post(continuation& c)
142 {
143 17831x ensure_started();
144 {
145 17831x std::lock_guard<std::mutex> lock(mutex_);
146 17831x push(&c);
147 17831x }
148 17831x work_cv_.notify_one();
149 17831x }
150
151 void
152 347x on_work_started() noexcept
153 {
154 347x outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
155 347x }
156
157 void
158 347x on_work_finished() noexcept
159 {
160 347x if(outstanding_work_.fetch_sub(
161 347x 1, std::memory_order_acq_rel) == 1)
162 {
163 87x std::lock_guard<std::mutex> lock(mutex_);
164 87x if(joined_ && !stop_)
165 4x stop_ = true;
166 87x done_cv_.notify_all();
167 87x work_cv_.notify_all();
168 87x }
169 347x }
170
171 void
172 178x join() noexcept
173 {
174 {
175 178x std::unique_lock<std::mutex> lock(mutex_);
176 178x if(joined_)
177 12x return;
178 166x joined_ = true;
179
180 166x if(outstanding_work_.load(
181 166x std::memory_order_acquire) == 0)
182 {
183 110x stop_ = true;
184 110x work_cv_.notify_all();
185 }
186 else
187 {
188 56x done_cv_.wait(lock, [this]{
189 61x return stop_;
190 });
191 }
192 178x }
193
194 359x for(auto& t : threads_)
195 193x if(t.joinable())
196 193x t.join();
197 }
198
199 void
200 168x stop() noexcept
201 {
202 {
203 168x std::lock_guard<std::mutex> lock(mutex_);
204 168x stop_ = true;
205 168x }
206 168x work_cv_.notify_all();
207 168x done_cv_.notify_all();
208 168x }
209
210 private:
211 void
212 17831x ensure_started()
213 {
214 17831x std::call_once(start_flag_, [this]{
215 109x threads_.reserve(num_threads_);
216 302x for(std::size_t i = 0; i < num_threads_; ++i)
217 386x threads_.emplace_back([this, i]{ run(i); });
218 109x });
219 17831x }
220
221 void
222 193x run(std::size_t index)
223 {
224 // Build name; set_current_thread_name truncates to platform limits.
225 char name[16];
226 193x std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
227 193x set_current_thread_name(name);
228
229 // Mark this thread as a worker of this pool so dispatch()
230 // can symmetric-transfer when called from within pool work.
231 struct scoped_pool
232 {
233 193x scoped_pool(impl const* p) noexcept { current_.set(p); }
234 193x ~scoped_pool() noexcept { current_.set(nullptr); }
235 193x } guard(this);
236
237 for(;;)
238 {
239 17817x continuation* c = nullptr;
240 {
241 17817x std::unique_lock<std::mutex> lock(mutex_);
242 17817x work_cv_.wait(lock, [this]{
243 47487x return !empty() ||
244 47487x stop_;
245 });
246 17817x if(stop_)
247 386x return;
248 17624x c = pop();
249 17817x }
250 17624x if(c)
251 17624x safe_resume(c->h);
252 17624x }
253 193x }
254 };
255
256 //------------------------------------------------------------------------------
257
258 166x thread_pool::
259 ~thread_pool()
260 {
261 166x impl_->stop();
262 166x impl_->join();
263 166x impl_->drain_abandoned();
264 166x shutdown();
265 166x destroy();
266 166x delete impl_;
267 166x }
268
269 166x thread_pool::
270 166x thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
271 166x : impl_(new impl(num_threads, thread_name_prefix))
272 {
273 166x this->set_frame_allocator(std::allocator<void>{});
274 166x }
275
276 void
277 12x thread_pool::
278 join() noexcept
279 {
280 12x impl_->join();
281 12x }
282
283 void
284 2x thread_pool::
285 stop() noexcept
286 {
287 2x impl_->stop();
288 2x }
289
290 //------------------------------------------------------------------------------
291
292 thread_pool::executor_type
293 11580x thread_pool::
294 get_executor() const noexcept
295 {
296 11580x return executor_type(
297 11580x const_cast<thread_pool&>(*this));
298 }
299
300 void
301 347x thread_pool::executor_type::
302 on_work_started() const noexcept
303 {
304 347x pool_->impl_->on_work_started();
305 347x }
306
307 void
308 347x thread_pool::executor_type::
309 on_work_finished() const noexcept
310 {
311 347x pool_->impl_->on_work_finished();
312 347x }
313
314 void
315 17489x thread_pool::executor_type::
316 post(continuation& c) const
317 {
318 17489x pool_->impl_->post(c);
319 17489x }
320
321 std::coroutine_handle<>
322 355x thread_pool::executor_type::
323 dispatch(continuation& c) const
324 {
325 355x if(pool_->impl_->running_in_this_thread())
326 13x return c.h;
327 342x pool_->impl_->post(c);
328 342x return std::noop_coroutine();
329 }
330
331 } // capy
332 } // boost
333