LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/libs/capy/src/ex/detail - strand_service.cpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 95.6 % 114 109 5
Test Date: 2026-04-30 19:05:39 Functions: 92.0 % 25 23 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #include "src/ex/detail/strand_impl.hpp"
      11                 : #include <boost/capy/ex/detail/strand_service.hpp>
      12                 : #include <boost/capy/continuation.hpp>
      13                 : #include <coroutine>
      14                 : #include <memory>
      15                 : #include <utility>
      16                 : 
      17                 : namespace boost {
      18                 : namespace capy {
      19                 : namespace detail {
      20                 : 
      // Marker parked in invoker_frame_cache_ by shutdown(). It is neither
      // null ("slot empty") nor a real allocation ("spare frame"), which
      // lets code inspecting the slot recognise that the cache has been
      // closed and must not be treated as holding a reusable frame.
      inline void* const kCacheClosed{reinterpret_cast<void*>(1)};
      24                 : 
      25                 : /** Concrete strand_service.
      26                 : 
      27                 :     Holds a shared mutex pool (193 entries), a linked list of live
      28                 :     impls (for shutdown traversal), and a single-slot invoker
      29                 :     coroutine frame cache shared across all strands of this service.
      30                 : 
      31                 :     The dispatch helpers (`enqueue`, `dispatch_pending`, etc.) are
      32                 :     public so the namespace-scope `make_invoker` coroutine and the
      33                 :     `strand_service` static methods can call them without friendship.
      34                 : */
      35                 : class strand_service_impl : public strand_service
      36                 : {
      37                 : public:
      38                 :     static constexpr std::size_t num_mutexes = 193;
      39                 : 
      40                 :     std::mutex mutex_;
      41                 :     std::size_t salt_ = 0;
      42                 :     std::shared_ptr<std::mutex> mutexes_[num_mutexes];
      43                 :     intrusive_list<strand_impl> impl_list_;
      44                 :     std::atomic<void*> invoker_frame_cache_{nullptr};
      45                 : 
      46                 :     explicit
      47 HIT          30 :     strand_service_impl(execution_context&)
      48              30 :     {
      49              30 :     }
      50                 : 
      51                 :     std::shared_ptr<strand_impl>
      52           11442 :     create_implementation() override
      53                 :     {
      54           11442 :         auto new_impl = std::make_shared<strand_impl>();
      55                 : 
      56           11442 :         std::lock_guard<std::mutex> lock(mutex_);
      57                 : 
      58           11442 :         std::size_t s = salt_++;
      59           11442 :         std::size_t idx = reinterpret_cast<std::size_t>(new_impl.get());
      60           11442 :         idx += idx >> 3;
      61           11442 :         idx ^= s + 0x9e3779b9 + (idx << 6) + (idx >> 2);
      62           11442 :         idx %= num_mutexes;
      63           11442 :         if(!mutexes_[idx])
      64             663 :             mutexes_[idx] = std::make_shared<std::mutex>();
      65           11442 :         new_impl->mutex_ = mutexes_[idx].get();
      66                 : 
      67           11442 :         impl_list_.push_back(new_impl.get());
      68           11442 :         new_impl->service_.store(this, std::memory_order_release);
      69                 : 
      70           22884 :         return new_impl;
      71           11442 :     }
      72                 : 
      73                 :     static bool
      74           30340 :     enqueue(strand_impl& impl, std::coroutine_handle<> h)
      75                 :     {
      76           30340 :         std::lock_guard<std::mutex> lock(*impl.mutex_);
      77           30340 :         impl.pending_.push(h);
      78           30340 :         if(!impl.locked_)
      79                 :         {
      80           17026 :             impl.locked_ = true;
      81           17026 :             return true;
      82                 :         }
      83           13314 :         return false;
      84           30340 :     }
      85                 : 
      86                 :     static void
      87           17600 :     dispatch_pending(strand_impl& impl)
      88                 :     {
      89           17600 :         strand_queue::taken_batch batch;
      90                 :         {
      91           17600 :             std::lock_guard<std::mutex> lock(*impl.mutex_);
      92           17600 :             batch = impl.pending_.take_all();
      93           17600 :         }
      94           17600 :         impl.pending_.dispatch_batch(batch);
      95           17600 :     }
      96                 : 
      97                 :     static bool
      98           17600 :     try_unlock(strand_impl& impl)
      99                 :     {
     100           17600 :         std::lock_guard<std::mutex> lock(*impl.mutex_);
     101           17600 :         if(impl.pending_.empty())
     102                 :         {
     103           17026 :             impl.locked_ = false;
     104           17026 :             return true;
     105                 :         }
     106             574 :         return false;
     107           17600 :     }
     108                 : 
     109                 :     static void
     110           17600 :     set_dispatch_thread(strand_impl& impl) noexcept
     111                 :     {
     112           17600 :         impl.dispatch_thread_.store(std::this_thread::get_id());
     113           17600 :     }
     114                 : 
     115                 :     static void
     116           17026 :     clear_dispatch_thread(strand_impl& impl) noexcept
     117                 :     {
     118           17026 :         impl.dispatch_thread_.store(std::thread::id{});
     119           17026 :     }
     120                 : 
     121                 :     // Defined below; needs strand_invoker complete.
     122                 :     static void
     123                 :     post_invoker(std::shared_ptr<strand_impl> impl, executor_ref ex);
     124                 : 
     125                 : protected:
     126                 :     void
     127              30 :     shutdown() override
     128                 :     {
     129              30 :         std::lock_guard<std::mutex> lock(mutex_);
     130              30 :         while(auto* p = impl_list_.pop_front())
     131                 :         {
     132 MIS           0 :             std::lock_guard<std::mutex> impl_lock(*p->mutex_);
     133               0 :             p->locked_ = true;
     134               0 :             p->service_.store(nullptr, std::memory_order_release);
     135               0 :         }
     136                 : 
     137 HIT          30 :         void* fp = invoker_frame_cache_.exchange(
     138                 :             kCacheClosed, std::memory_order_acq_rel);
     139              30 :         if(fp) ::operator delete(fp);
     140              30 :     }
     141                 : };
     142                 : 
     143                 : /** Invoker coroutine that drains a strand's pending queue.
     144                 : 
     145                 :     Runs once the strand transitions from unlocked to locked. Holds
     146                 :     the impl alive via the coroutine parameter (a shared_ptr in the
     147                 :     coroutine frame), so user code may drop its strand handle while
     148                 :     the invoker is mid-flight.
     149                 : 
     150                 :     The frame's allocator recycles a single per-service slot. The
     151                 :     trailer points at the service (lifetime: execution_context),
     152                 :     NOT the impl (lifetime: per-strand), so operator delete is
     153                 :     safe even after the impl has been destroyed.
     154                 : */
     155                 : struct strand_invoker
     156                 : {
     157                 :     struct promise_type
     158                 :     {
     159                 :         // Stored in the coroutine frame so its address is stable for
     160                 :         // posting to the inner executor.
     161                 :         continuation self_;
     162                 : 
     163                 :         void*
     164           17026 :         operator new(
     165                 :             std::size_t n,
     166                 :             std::shared_ptr<strand_impl> const& impl)
     167                 :         {
     168           17026 :             auto* svc = impl->service_.load(std::memory_order_acquire);
     169           17026 :             constexpr auto A = alignof(strand_service_impl*);
     170           17026 :             std::size_t padded = (n + A - 1) & ~(A - 1);
     171           17026 :             std::size_t total = padded + sizeof(strand_service_impl*);
     172                 : 
     173           17026 :             void* p = svc->invoker_frame_cache_.exchange(
     174                 :                 nullptr, std::memory_order_acquire);
     175           17026 :             if(!p || p == kCacheClosed)
     176            7674 :                 p = ::operator new(total);
     177                 : 
     178           17026 :             *reinterpret_cast<strand_service_impl**>(
     179           17026 :                 static_cast<char*>(p) + padded) = svc;
     180           17026 :             return p;
     181                 :         }
     182                 : 
     183                 :         void
     184           17026 :         operator delete(void* p, std::size_t n) noexcept
     185                 :         {
     186           17026 :             constexpr auto A = alignof(strand_service_impl*);
     187           17026 :             std::size_t padded = (n + A - 1) & ~(A - 1);
     188           17026 :             auto* svc = *reinterpret_cast<strand_service_impl**>(
     189                 :                 static_cast<char*>(p) + padded);
     190                 : 
     191           17026 :             void* expected = nullptr;
     192           17026 :             if(!svc->invoker_frame_cache_.compare_exchange_strong(
     193                 :                     expected, p, std::memory_order_release))
     194            7657 :                 ::operator delete(p);
     195           17026 :         }
     196                 : 
     197                 :         strand_invoker
     198           17026 :         get_return_object() noexcept
     199                 :         {
     200           17026 :             return {std::coroutine_handle<promise_type>::from_promise(*this)};
     201                 :         }
     202                 : 
     203           17026 :         std::suspend_always initial_suspend() noexcept { return {}; }
     204           17026 :         std::suspend_never  final_suspend()   noexcept { return {}; }
     205           17026 :         void return_void() noexcept {}
     206 MIS           0 :         void unhandled_exception() { std::terminate(); }
     207                 :     };
     208                 : 
     209                 :     std::coroutine_handle<promise_type> h_;
     210                 : };
     211                 : 
     212                 : // The by-value parameter lives in the coroutine frame for the
     213                 : // invoker's lifetime, keeping the impl alive past any user-side
     214                 : // strand drop.
     215                 : static
     216                 : strand_invoker
     217 HIT       17026 : make_invoker(std::shared_ptr<strand_impl> impl)
     218                 : {
     219                 :     auto* p = impl.get();
     220                 :     for(;;)
     221                 :     {
     222                 :         strand_service_impl::set_dispatch_thread(*p);
     223                 :         strand_service_impl::dispatch_pending(*p);
     224                 :         if(strand_service_impl::try_unlock(*p))
     225                 :         {
     226                 :             strand_service_impl::clear_dispatch_thread(*p);
     227                 :             co_return;
     228                 :         }
     229                 :     }
     230           34052 : }
     231                 : 
     232                 : void
     233           17026 : strand_service_impl::post_invoker(
     234                 :     std::shared_ptr<strand_impl> impl,
     235                 :     executor_ref ex)
     236                 : {
     237           17026 :     auto invoker = make_invoker(std::move(impl));
     238           17026 :     auto& self = invoker.h_.promise().self_;
     239           17026 :     self.h = invoker.h_;
     240           17026 :     ex.post(self);
     241           17026 : }
     242                 : 
     243           22884 : strand_impl::~strand_impl()
     244                 : {
     245           11442 :     auto* svc = service_.load(std::memory_order_acquire);
     246           11442 :     if(!svc) return;
     247           11442 :     std::lock_guard<std::mutex> lock(svc->mutex_);
     248           11442 :     svc->impl_list_.remove(this);
     249           11442 : }
     250                 : 
     251              30 : strand_service::
     252              30 : strand_service()
     253              30 :     : service()
     254                 : {
     255              30 : }
     256                 : 
     257              30 : strand_service::
     258                 : ~strand_service() = default;
     259                 : 
     260                 : bool
     261              12 : strand_service::
     262                 : running_in_this_thread(strand_impl& impl) noexcept
     263                 : {
     264              12 :     return impl.dispatch_thread_.load() == std::this_thread::get_id();
     265                 : }
     266                 : 
     267                 : std::coroutine_handle<>
     268               8 : strand_service::
     269                 : dispatch(
     270                 :     std::shared_ptr<strand_impl> const& impl,
     271                 :     executor_ref ex,
     272                 :     std::coroutine_handle<> h)
     273                 : {
     274               8 :     if(running_in_this_thread(*impl))
     275               3 :         return h;
     276                 : 
     277               5 :     if(strand_service_impl::enqueue(*impl, h))
     278               5 :         strand_service_impl::post_invoker(impl, ex);
     279               5 :     return std::noop_coroutine();
     280                 : }
     281                 : 
     282                 : void
     283           30335 : strand_service::
     284                 : post(
     285                 :     std::shared_ptr<strand_impl> const& impl,
     286                 :     executor_ref ex,
     287                 :     std::coroutine_handle<> h)
     288                 : {
     289           30335 :     if(strand_service_impl::enqueue(*impl, h))
     290           17021 :         strand_service_impl::post_invoker(impl, ex);
     291           30335 : }
     292                 : 
     293                 : strand_service&
     294           11442 : get_strand_service(execution_context& ctx)
     295                 : {
     296           11442 :     return ctx.use_service<strand_service_impl>();
     297                 : }
     298                 : 
     299                 : } // namespace detail
     300                 : } // namespace capy
     301                 : } // namespace boost
        

Generated by: LCOV version 2.3