OSSIA
Open Scenario System for Interactive Application
jthread.hpp
1 #pragma once
2 #include <version>
3 
4 #if __has_include(<stop_token>)
5  #if __cpp_lib_jthread >= 201911L
6  #define OSSIA_HAS_STD_JTHREAD 1
7 
8  #elif ((_LIBCPP_VERSION >= 18100) && (_LIBCPP_VERSION < 99999)) || (_LIBCPP_VERSION >= 180100)
9  #if defined(_LIBCPP_HAS_NO_EXPERIMENTAL_STOP_TOKEN)
10  #error Rebuild with -fexperimental-library, clang 18 ships headers which are incompatible with this file but hides half of them behind that flag
11  #else
12  #define OSSIA_HAS_STD_JTHREAD 1
13  #endif
14 
15  #endif
16 #endif
17 
18 #if OSSIA_HAS_STD_JTHREAD
19 #include <stop_token>
20 #include <thread>
21 #else
22 #include <ossia/detail/audio_spin_mutex.hpp>
23 
24 // Polyfill until libc++ gets jthread.
25 // License: CC-4.0
26 // https://github.com/StirlingLabs/jthread/blob/main/LICENSE
27 
28 // <stop_token> header
29 
#include <atomic>
#include <cstdint>
#include <thread>
#include <type_traits>
#include <utility>
34 #ifdef SAFE
35 #include <iostream>
36 #endif
37 
38 namespace std
39 {
40 //-----------------------------------------------
41 // internal types for shared stop state
42 //-----------------------------------------------
43 
// Intrusive list node embedded (as a base class) in every stop_callback<>.
// It carries a type-erased entry point plus the links and flags that
// __stop_state uses to run and to unregister the callback.
struct __stop_callback_base
{
  // Type-erased entry point; it is handed this node as its only argument.
  // Keep it as the first member: stop_callback<> brace-initializes this
  // base with the entry point alone.
  void (*__callback_)(__stop_callback_base*) = nullptr;

  // Links of the doubly-linked callback list. __prev_ points at the pointer
  // that currently refers to this node; __stop_state sets it to nullptr once
  // the node has been dequeued for execution.
  __stop_callback_base* __next_ = nullptr;
  __stop_callback_base** __prev_ = nullptr;

  // While the callback runs, points at a flag owned by __request_stop();
  // setting that flag tells it the node was destroyed during the call.
  bool* __isRemoved_ = nullptr;

  // Set by __request_stop() after the callback returned, so that a
  // destructor running on another thread knows it may proceed.
  std::atomic<bool> __callbackFinishedExecuting_{false};

  // Runs the stored entry point on this node.
  void __execute() noexcept { (*__callback_)(this); }

protected:
  // Only derived classes may destroy a node
  // (workaround for virtual __execute() and destructor).
  ~__stop_callback_base() = default;
};
60 
61 struct __stop_state
62 {
63 public:
64  void __add_token_reference() noexcept
65  {
66  __state_.fetch_add(__token_ref_increment, std::memory_order_relaxed);
67  }
68 
69  void __remove_token_reference() noexcept
70  {
71  auto __oldState
72  = __state_.fetch_sub(__token_ref_increment, std::memory_order_acq_rel);
73  if(__oldState < (__token_ref_increment + __token_ref_increment))
74  {
75  delete this;
76  }
77  }
78 
79  void __add_source_reference() noexcept
80  {
81  __state_.fetch_add(__source_ref_increment, std::memory_order_relaxed);
82  }
83 
84  void __remove_source_reference() noexcept
85  {
86  auto __oldState
87  = __state_.fetch_sub(__source_ref_increment, std::memory_order_acq_rel);
88  if(__oldState < (__token_ref_increment + __source_ref_increment))
89  {
90  delete this;
91  }
92  }
93 
94  bool __request_stop() noexcept
95  {
96 
97  if(!__try_lock_and_signal_until_signalled())
98  {
99  // Stop has already been requested.
100  return false;
101  }
102 
103  // Set the 'stop_requested' signal and acquired the lock.
104 
105  __signallingThread_ = std::this_thread::get_id();
106 
107  while(__head_ != nullptr)
108  {
109  // Dequeue the head of the queue
110  auto* __cb = __head_;
111  __head_ = __cb->__next_;
112  const bool anyMore = __head_ != nullptr;
113  if(anyMore)
114  {
115  __head_->__prev_ = &__head_;
116  }
117  // Mark this item as removed from the list.
118  __cb->__prev_ = nullptr;
119 
120  // Don't hold lock while executing callback
121  // so we don't block other threads from deregistering callbacks.
122  __unlock();
123 
124  // TRICKY: Need to store a flag on the stack here that the callback
125  // can use to signal that the destructor was executed inline
126  // during the call. If the destructor was executed inline then
127  // it's not safe to dereference __cb after __execute() returns.
128  // If the destructor runs on some other thread then the other
129  // thread will block waiting for this thread to signal that the
130  // callback has finished executing.
131  bool __isRemoved = false;
132  __cb->__isRemoved_ = &__isRemoved;
133 
134  __cb->__execute();
135 
136  if(!__isRemoved)
137  {
138  __cb->__isRemoved_ = nullptr;
139  __cb->__callbackFinishedExecuting_.store(true, std::memory_order_release);
140  }
141 
142  if(!anyMore)
143  {
144  // This was the last item in the queue when we dequeued it.
145  // No more items should be added to the queue after we have
146  // marked the state as interrupted, only removed from the queue.
147  // Avoid acquring/releasing the lock in this case.
148  return true;
149  }
150 
151  __lock();
152  }
153 
154  __unlock();
155 
156  return true;
157  }
158 
159  bool __is_stop_requested() noexcept
160  {
161  return __is_stop_requested(__state_.load(std::memory_order_acquire));
162  }
163 
164  bool __is_stop_requestable() noexcept
165  {
166  return __is_stop_requestable(__state_.load(std::memory_order_acquire));
167  }
168 
169  bool __try_add_callback(
170  __stop_callback_base* __cb, bool __incrementRefCountIfSuccessful) noexcept
171  {
172  std::uint64_t __oldState;
173  goto __load_state;
174  do
175  {
176  goto __check_state;
177  do
178  {
179  ossia_rwlock_pause();
180  __load_state:
181  __oldState = __state_.load(std::memory_order_acquire);
182  __check_state:
183  if(__is_stop_requested(__oldState))
184  {
185  __cb->__execute();
186  return false;
187  }
188  else if(!__is_stop_requestable(__oldState))
189  {
190  return false;
191  }
192  } while(__is_locked(__oldState));
193  } while(!__state_.compare_exchange_weak(
194  __oldState, __oldState | __locked_flag, std::memory_order_acquire));
195 
196  // Push callback onto callback list.
197  __cb->__next_ = __head_;
198  if(__cb->__next_ != nullptr)
199  {
200  __cb->__next_->__prev_ = &__cb->__next_;
201  }
202  __cb->__prev_ = &__head_;
203  __head_ = __cb;
204 
205  if(__incrementRefCountIfSuccessful)
206  {
207  __unlock_and_increment_token_ref_count();
208  }
209  else
210  {
211  __unlock();
212  }
213 
214  // Successfully added the callback.
215  return true;
216  }
217 
218  void __remove_callback(__stop_callback_base* __cb) noexcept
219  {
220  __lock();
221 
222  if(__cb->__prev_ != nullptr)
223  {
224  // Still registered, not yet executed
225  // Just remove from the list.
226  *__cb->__prev_ = __cb->__next_;
227  if(__cb->__next_ != nullptr)
228  {
229  __cb->__next_->__prev_ = __cb->__prev_;
230  }
231 
232  __unlock_and_decrement_token_ref_count();
233 
234  return;
235  }
236 
237  __unlock();
238 
239  // Callback has either already executed or is executing
240  // concurrently on another thread.
241 
242  if(__signallingThread_ == std::this_thread::get_id())
243  {
244  // Callback executed on this thread or is still currently executing
245  // and is deregistering itself from within the callback.
246  if(__cb->__isRemoved_ != nullptr)
247  {
248  // Currently inside the callback, let the __request_stop() method
249  // know the object is about to be destructed and that it should
250  // not try to access the object when the callback returns.
251  *__cb->__isRemoved_ = true;
252  }
253  }
254  else
255  {
256  // Callback is currently executing on another thread,
257  // block until it finishes executing.
258  while(!__cb->__callbackFinishedExecuting_.load(std::memory_order_acquire))
259  {
260  ossia_rwlock_pause();
261  }
262  }
263 
264  __remove_token_reference();
265  }
266 
267 private:
268  static bool __is_locked(std::uint64_t __state) noexcept
269  {
270  return (__state & __locked_flag) != 0;
271  }
272 
273  static bool __is_stop_requested(std::uint64_t __state) noexcept
274  {
275  return (__state & __stop_requested_flag) != 0;
276  }
277 
278  static bool __is_stop_requestable(std::uint64_t __state) noexcept
279  {
280  // Interruptible if it has already been interrupted or if there are
281  // still interrupt_source instances in existence.
282  return __is_stop_requested(__state) || (__state >= __source_ref_increment);
283  }
284 
285  bool __try_lock_and_signal_until_signalled() noexcept
286  {
287  std::uint64_t __oldState = __state_.load(std::memory_order_acquire);
288  do
289  {
290  if(__is_stop_requested(__oldState))
291  return false;
292  while(__is_locked(__oldState))
293  {
294  ossia_rwlock_pause();
295  __oldState = __state_.load(std::memory_order_acquire);
296  if(__is_stop_requested(__oldState))
297  return false;
298  }
299  } while(!__state_.compare_exchange_weak(
300  __oldState, __oldState | __stop_requested_flag | __locked_flag,
301  std::memory_order_acq_rel, std::memory_order_acquire));
302  return true;
303  }
304 
305  void __lock() noexcept
306  {
307  auto __oldState = __state_.load(std::memory_order_relaxed);
308  do
309  {
310  while(__is_locked(__oldState))
311  {
312  ossia_rwlock_pause();
313  __oldState = __state_.load(std::memory_order_relaxed);
314  }
315  } while(!__state_.compare_exchange_weak(
316  __oldState, __oldState | __locked_flag, std::memory_order_acquire,
317  std::memory_order_relaxed));
318  }
319 
320  void __unlock() noexcept
321  {
322  __state_.fetch_sub(__locked_flag, std::memory_order_release);
323  }
324 
325  void __unlock_and_increment_token_ref_count() noexcept
326  {
327  __state_.fetch_sub(__locked_flag - __token_ref_increment, std::memory_order_release);
328  }
329 
330  void __unlock_and_decrement_token_ref_count() noexcept
331  {
332  auto __oldState = __state_.fetch_sub(
333  __locked_flag + __token_ref_increment, std::memory_order_acq_rel);
334  // Check if new state is less than __token_ref_increment which would
335  // indicate that this was the last reference.
336  if(__oldState < (__locked_flag + __token_ref_increment + __token_ref_increment))
337  {
338  delete this;
339  }
340  }
341 
342  static constexpr std::uint64_t __stop_requested_flag = 1u;
343  static constexpr std::uint64_t __locked_flag = 2u;
344  static constexpr std::uint64_t __token_ref_increment = 4u;
345  static constexpr std::uint64_t __source_ref_increment = static_cast<std::uint64_t>(1u)
346  << 33u;
347 
348  // bit 0 - stop-requested
349  // bit 1 - locked
350  // bits 2-32 - token ref count (31 bits)
351  // bits 33-63 - source ref count (31 bits)
352  std::atomic<std::uint64_t> __state_{__source_ref_increment};
353  __stop_callback_base* __head_ = nullptr;
354  std::thread::id __signallingThread_{};
355 };
356 
357 //-----------------------------------------------
358 // forward declarations
359 //-----------------------------------------------
360 
361 class stop_source;
362 template <typename _Callback>
363 class stop_callback;
364 
// std::nostopstate_t / std::nostopstate
// - tag type and tag object selecting the stop_source constructor that
//   creates no shared stop state
struct nostopstate_t
{
  // explicit: the tag cannot be produced by copy-list-initialization
  // from an empty brace pair.
  explicit nostopstate_t() = default;
};
inline constexpr nostopstate_t nostopstate = nostopstate_t{};
372 
373 //-----------------------------------------------
374 // stop_token
375 //-----------------------------------------------
376 
377 class stop_token
378 {
379 public:
380  // construct:
381  // - TODO: explicit?
382  stop_token() noexcept
383  : __state_(nullptr)
384  {
385  }
386 
387  // copy/move/assign/destroy:
388  stop_token(const stop_token& __it) noexcept
389  : __state_(__it.__state_)
390  {
391  if(__state_ != nullptr)
392  {
393  __state_->__add_token_reference();
394  }
395  }
396 
397  stop_token(stop_token&& __it) noexcept
398  : __state_(std::exchange(__it.__state_, nullptr))
399  {
400  }
401 
402  ~stop_token()
403  {
404  if(__state_ != nullptr)
405  {
406  __state_->__remove_token_reference();
407  }
408  }
409 
410  stop_token& operator=(const stop_token& __it) noexcept
411  {
412  if(__state_ != __it.__state_)
413  {
414  stop_token __tmp{__it};
415  swap(__tmp);
416  }
417  return *this;
418  }
419 
420  stop_token& operator=(stop_token&& __it) noexcept
421  {
422  stop_token __tmp{std::move(__it)};
423  swap(__tmp);
424  return *this;
425  }
426 
427  void swap(stop_token& __it) noexcept { std::swap(__state_, __it.__state_); }
428 
429  // stop handling:
430  [[nodiscard]] bool stop_requested() const noexcept
431  {
432  return __state_ != nullptr && __state_->__is_stop_requested();
433  }
434 
435  [[nodiscard]] bool stop_possible() const noexcept
436  {
437  return __state_ != nullptr && __state_->__is_stop_requestable();
438  }
439 
440  [[nodiscard]] friend bool
441  operator==(const stop_token& __a, const stop_token& __b) noexcept
442  {
443  return __a.__state_ == __b.__state_;
444  }
445  [[nodiscard]] friend bool
446  operator!=(const stop_token& __a, const stop_token& __b) noexcept
447  {
448  return __a.__state_ != __b.__state_;
449  }
450 
451 private:
452  friend class stop_source;
453  template <typename _Callback>
454  friend class stop_callback;
455 
456  explicit stop_token(__stop_state* __state) noexcept
457  : __state_(__state)
458  {
459  if(__state_ != nullptr)
460  {
461  __state_->__add_token_reference();
462  }
463  }
464 
465  __stop_state* __state_;
466 };
467 
468 //-----------------------------------------------
469 // stop_source
470 //-----------------------------------------------
471 
472 class stop_source
473 {
474 public:
475  stop_source()
476  : __state_(new __stop_state())
477  {
478  }
479 
480  explicit stop_source(nostopstate_t) noexcept
481  : __state_(nullptr)
482  {
483  }
484 
485  ~stop_source()
486  {
487  if(__state_ != nullptr)
488  {
489  __state_->__remove_source_reference();
490  }
491  }
492 
493  stop_source(const stop_source& __other) noexcept
494  : __state_(__other.__state_)
495  {
496  if(__state_ != nullptr)
497  {
498  __state_->__add_source_reference();
499  }
500  }
501 
502  stop_source(stop_source&& __other) noexcept
503  : __state_(std::exchange(__other.__state_, nullptr))
504  {
505  }
506 
507  stop_source& operator=(stop_source&& __other) noexcept
508  {
509  stop_source __tmp{std::move(__other)};
510  swap(__tmp);
511  return *this;
512  }
513 
514  stop_source& operator=(const stop_source& __other) noexcept
515  {
516  if(__state_ != __other.__state_)
517  {
518  stop_source __tmp{__other};
519  swap(__tmp);
520  }
521  return *this;
522  }
523 
524  [[nodiscard]] bool stop_requested() const noexcept
525  {
526  return __state_ != nullptr && __state_->__is_stop_requested();
527  }
528 
529  [[nodiscard]] bool stop_possible() const noexcept { return __state_ != nullptr; }
530 
531  bool request_stop() noexcept
532  {
533  if(__state_ != nullptr)
534  {
535  return __state_->__request_stop();
536  }
537  return false;
538  }
539 
540  [[nodiscard]] stop_token get_token() const noexcept { return stop_token{__state_}; }
541 
542  void swap(stop_source& __other) noexcept { std::swap(__state_, __other.__state_); }
543 
544  [[nodiscard]] friend bool
545  operator==(const stop_source& __a, const stop_source& __b) noexcept
546  {
547  return __a.__state_ == __b.__state_;
548  }
549  [[nodiscard]] friend bool
550  operator!=(const stop_source& __a, const stop_source& __b) noexcept
551  {
552  return __a.__state_ != __b.__state_;
553  }
554 
555 private:
556  __stop_state* __state_;
557 };
558 
559 //-----------------------------------------------
560 // stop_callback
561 //-----------------------------------------------
562 
563 template <typename _Callback>
564 // requires Destructible<_Callback> && Invocable<_Callback>
565 class [[nodiscard]] stop_callback : private __stop_callback_base
566 {
567 public:
568  using callback_type = _Callback;
569 
570  template <
571  typename _CB, std::enable_if_t<std::is_constructible_v<_Callback, _CB>, int> = 0>
572  // requires Constructible<Callback, C>
573  explicit stop_callback(const stop_token& __token, _CB&& __cb) noexcept(
574  std::is_nothrow_constructible_v<_Callback, _CB>)
575  : __stop_callback_base{[](__stop_callback_base* __that) noexcept {
576  static_cast<stop_callback*>(__that)->__execute();
577  }}
578  , __state_(nullptr)
579  , __cb_(static_cast<_CB&&>(__cb))
580  {
581  if(__token.__state_ != nullptr && __token.__state_->__try_add_callback(this, true))
582  {
583  __state_ = __token.__state_;
584  }
585  }
586 
587  template <
588  typename _CB, std::enable_if_t<std::is_constructible_v<_Callback, _CB>, int> = 0>
589  // requires Constructible<Callback, C>
590  explicit stop_callback(stop_token&& __token, _CB&& __cb) noexcept(
591  std::is_nothrow_constructible_v<_Callback, _CB>)
592  : __stop_callback_base{[](__stop_callback_base* __that) noexcept {
593  static_cast<stop_callback*>(__that)->__execute();
594  }}
595  , __state_(nullptr)
596  , __cb_(static_cast<_CB&&>(__cb))
597  {
598  if(__token.__state_ != nullptr && __token.__state_->__try_add_callback(this, false))
599  {
600  __state_ = std::exchange(__token.__state_, nullptr);
601  }
602  }
603 
604  ~stop_callback()
605  {
606 #ifdef SAFE
607  if(__inExecute_.load())
608  {
609  std::cerr << "*** OOPS: ~stop_callback() while callback executed\n";
610  }
611 #endif
612  if(__state_ != nullptr)
613  {
614  __state_->__remove_callback(this);
615  }
616  }
617 
618  stop_callback& operator=(const stop_callback&) = delete;
619  stop_callback& operator=(stop_callback&&) = delete;
620  stop_callback(const stop_callback&) = delete;
621  stop_callback(stop_callback&&) = delete;
622 
623 private:
624  void __execute() noexcept
625  {
626  // Executed in a noexcept context
627  // If it throws then we call std::terminate().
628 #ifdef SAFE
629  __inExecute_.store(true);
630  __cb_();
631  __inExecute_.store(false);
632 #else
633  __cb_();
634 #endif
635  }
636 
637  __stop_state* __state_;
638  _Callback __cb_;
639 #ifdef SAFE
640  std::atomic<bool> __inExecute_{false};
641 #endif
642 };
643 
644 template <typename _Callback>
645 stop_callback(stop_token, _Callback) -> stop_callback<_Callback>;
646 
647 } // namespace std
648 
649 #include <functional> // for invoke()
650 #include <future>
651 #include <iostream> // for debugging output
652 #include <thread>
653 #include <type_traits>
654 
655 namespace std
656 {
657 
658 //*****************************************
659 //* class jthread
660 //* - joining std::thread with signaling stop/end support
661 //*****************************************
662 class jthread
663 {
664 public:
665  //*****************************************
666  //* standardized API:
667  //*****************************************
668  // - cover full API of std::thread
669  // to be able to switch from std::thread to std::jthread
670 
671  // types are those from std::thread:
672  using id = ::std::thread::id;
673  using native_handle_type = ::std::thread::native_handle_type;
674 
675  // construct/copy/destroy:
676  jthread() noexcept;
677  //template <typename F, typename... Args> explicit jthread(F&& f, Args&&... args);
678  // THE constructor that starts the thread:
679  // - NOTE: does SFINAE out copy constructor semantics
680  template <
681  typename Callable, typename... Args,
682  typename
683  = ::std::enable_if_t<!::std::is_same_v<::std::decay_t<Callable>, jthread>>>
684  explicit jthread(Callable&& cb, Args&&... args);
685  ~jthread();
686 
687  jthread(const jthread&) = delete;
688  jthread(jthread&&) noexcept = default;
689  jthread& operator=(const jthread&) = delete;
690  jthread& operator=(jthread&&) noexcept;
691 
692  // members:
693  void swap(jthread&) noexcept;
694  bool joinable() const noexcept;
695  void join();
696  void detach();
697 
698  id get_id() const noexcept;
699  native_handle_type native_handle();
700 
701  // static members:
702  static unsigned hardware_concurrency() noexcept
703  {
704  return ::std::thread::hardware_concurrency();
705  };
706 
707  //*****************************************
708  // - supplementary API:
709  // - for the calling thread:
710  [[nodiscard]] stop_source get_stop_source() noexcept;
711  [[nodiscard]] stop_token get_stop_token() const noexcept;
712  bool request_stop() noexcept { return get_stop_source().request_stop(); }
713 
714  //*****************************************
715  //* implementation:
716  //*****************************************
717 
718 private:
719  //*** API for the starting thread:
720  stop_source _stopSource; // stop_source for started thread
721  ::std::thread _thread{}; // started thread (if any)
722 };
723 
724 //**********************************************************************
725 
726 //*****************************************
727 //* implementation of class jthread
728 //*****************************************
729 
730 // default constructor:
731 inline jthread::jthread() noexcept
732  : _stopSource{nostopstate}
733 {
734 }
735 
736 // THE constructor that starts the thread:
737 // - NOTE: declaration does SFINAE out copy constructor semantics
738 template <typename Callable, typename... Args, typename>
739 inline jthread::jthread(Callable&& cb, Args&&... args)
740  : _stopSource{}
741  , // initialize stop_source
742  _thread{
743  [](stop_token st, auto&& cb, auto&&... args) { // called lambda in the thread
744  // perform tasks of the thread:
745  if constexpr(std::is_invocable_v<Callable, stop_token, Args...>)
746  {
747  // pass the stop_token as first argument to the started thread:
748  ::std::invoke(
749  ::std::forward<decltype(cb)>(cb), std::move(st),
750  ::std::forward<decltype(args)>(args)...);
751  }
752  else
753  {
754  // started thread does not expect a stop token:
755  ::std::invoke(
756  ::std::forward<decltype(cb)>(cb), ::std::forward<decltype(args)>(args)...);
757  }
758  },
759  _stopSource.get_token(), // not captured due to possible races if immediately set
760  ::std::forward<Callable>(cb), // pass callable
761  ::std::forward<Args>(args)... // pass arguments for callable
762 }
763 {
764 }
765 
766 // move assignment operator:
767 inline jthread& jthread::operator=(jthread&& t) noexcept
768 {
769  if(joinable())
770  { // if not joined/detached, signal stop and wait for end:
771  request_stop();
772  join();
773  }
774 
775  _thread = std::move(t._thread);
776  _stopSource = std::move(t._stopSource);
777  return *this;
778 }
779 
780 // destructor:
781 inline jthread::~jthread()
782 {
783  if(joinable())
784  { // if not joined/detached, signal stop and wait for end:
785  request_stop();
786  join();
787  }
788 }
789 
790 // others:
791 inline bool jthread::joinable() const noexcept
792 {
793  return _thread.joinable();
794 }
795 inline void jthread::join()
796 {
797  _thread.join();
798 }
799 inline void jthread::detach()
800 {
801  _thread.detach();
802 }
803 inline typename jthread::id jthread::get_id() const noexcept
804 {
805  return _thread.get_id();
806 }
807 inline typename jthread::native_handle_type jthread::native_handle()
808 {
809  return _thread.native_handle();
810 }
811 
812 inline stop_source jthread::get_stop_source() noexcept
813 {
814  return _stopSource;
815 }
816 inline stop_token jthread::get_stop_token() const noexcept
817 {
818  return _stopSource.get_token();
819 }
820 
821 inline void jthread::swap(jthread& t) noexcept
822 {
823  std::swap(_stopSource, t._stopSource);
824  std::swap(_thread, t._thread);
825 }
826 
827 } // std
828 
829 #endif