// User-space "futex" for synchronizing coroutines, built on a caller-supplied
// std::atomic<T> that is held by reference (see futex_ below).
// NOTE(review): this listing carries embedded source line numbers with gaps;
// statements that fall inside the gaps are not visible here.
28template <
typename T>
class Futex {
// Binds this object to an existing atomic. Only a reference is stored, so the
// atomic presumably has to outlive the Futex -- confirm with callers.
34 Futex(std::atomic<T> &futex) : futex_(futex) {}
// Drains the wait list through notify_all_(), so every handle still queued
// gets -EIDRM as its result.
36 ~Futex() { notify_all_(-EIDRM); }
// Sender type returned by wait(); [[nodiscard]] so a dropped result is flagged.
44 struct [[nodiscard]] WaitSender;
// Builds a WaitSender from this futex and the expected value `old`.
// No list manipulation happens in this function itself.
55 WaitSender
wait(T old)
noexcept {
return {*
this, old}; }
// NOTE(review): the next lines look like the body of notify_one(); its
// signature and the lines between 65 and 68 are not shown -- confirm.
// The front handle is popped while mutex_ is held and its result is set to 0.
62 WaitFinishHandleBase *handle =
nullptr;
64 std::lock_guard<std::mutex> lock(mutex_);
65 handle = wait_awaiters_.pop_front();
68 handle->set_result(0);
// Forward declarations of the handle types stored in / created for the list.
80 class WaitFinishHandleBase;
81 template <
typename Receiver>
class WaitFinishHandle;
// Removes `handle` from the wait list under mutex_ and returns whatever
// wait_awaiters_.remove() reports (used as "was it still queued?" by cancel_()).
83 bool cancel_wait_(WaitFinishHandleBase *handle)
noexcept {
84 std::lock_guard<std::mutex> lock(mutex_);
85 return wait_awaiters_.remove(handle);
// Registers `handle` as a waiter. The futex value is read (relaxed) while
// mutex_ is held; lines 91-93 are not shown -- presumably they compare `val`
// with `old` and return early on mismatch (TODO confirm).
// On the visible path the handle is appended to the list and pend_work() is
// called on the current context's runtime.
88 int32_t request_wait_(WaitFinishHandleBase *handle, T old)
noexcept {
89 std::lock_guard<std::mutex> lock(mutex_);
90 auto val = futex_.load(std::memory_order_relaxed);
94 wait_awaiters_.push_back(handle);
95 detail::Context::current().runtime()->pend_work();
// Detaches the whole wait list under mutex_ by moving it into `handles`
// (declaration not shown), then pops each handle and stores `result` in it.
// NOTE(review): whatever follows set_result() inside the loop is not visible.
100 void notify_all_(int32_t result)
noexcept {
103 std::lock_guard<std::mutex> lock(mutex_);
104 handles = std::move(wait_awaiters_);
106 while (
auto *handle = handles.pop_front()) {
107 handle->set_result(result);
// Intrusive doubly linked list threaded through WaitFinishHandleBase::link_entry_.
113 using HandleList = IntrusiveDoubleList<WaitFinishHandleBase,
114 &WaitFinishHandleBase::link_entry_>;
// mutex_ is locked around every visible access to wait_awaiters_.
116 mutable std::mutex mutex_;
// Handles currently waiting on this futex.
117 HandleList wait_awaiters_;
// The watched atomic; referenced, not owned.
118 std::atomic<T> &futex_;
// Receiver-independent part of a wait handle: carries the list hook, the
// runtime to schedule on, and the result that will be delivered.
// NOTE(review): this listing has gaps; access specifiers and closing braces
// are not visible here.
122class Futex<T>::WaitFinishHandleBase :
public WorkInvoker {
// Submits this handle to runtime_; runtime_ must have been set beforehand
// (checked by the assert only, so unchecked when asserts are compiled out).
124 void schedule() noexcept {
125 assert(runtime_ !=
nullptr);
126 runtime_->schedule(
this);
// Stores the value that will later be read back as result_.
129 void set_result(int32_t result)
noexcept { result_ = result; }
// Hook used by Futex's HandleList (IntrusiveDoubleList over link_entry_).
132 DoubleLinkEntry link_entry_;
// Runtime to schedule on; null until assigned by the derived handle's start().
135 Runtime *runtime_ =
nullptr;
// Initial value is -ENOTRECOVERABLE until set_result() or cancel_() overwrites it.
136 int32_t result_ = -ENOTRECOVERABLE;
// Wait handle bound to a concrete Receiver: starts the wait, supports
// cancellation through the receiver's stop token, and delivers the result.
// NOTE(review): this listing has gaps (e.g. original lines 152, 154-156);
// conditionals and closing braces in those gaps are not visible here.
140template <
typename Receiver>
141class Futex<T>::WaitFinishHandle
142 :
public InvokerAdapter<WaitFinishHandle<Receiver>, WaitFinishHandleBase> {
144 using Base = InvokerAdapter<WaitFinishHandle, WaitFinishHandleBase>;
// Keeps a reference to the owning futex and takes the receiver by move.
146 WaitFinishHandle(
Futex &futex, Receiver receiver)
147 : futex_(futex), receiver_(std::move(receiver)) {}
// Records the runtime, then asks the futex to queue this handle.
// The receiver is invoked with `r` on a path whose condition (line 152) is
// not shown -- presumably when the wait was not queued; TODO confirm.
// Otherwise, if the receiver's stop token can ever be triggered, a stop
// callback wrapping Cancellation{this} is installed.
149 void start(Runtime *runtime, T old)
noexcept {
150 this->runtime_ = runtime;
151 int32_t r = futex_.request_wait_(
this, old);
153 std::move(receiver_)(r);
157 auto stop_token = receiver_.get_stop_token();
158 if (stop_token.stop_possible()) {
159 stop_callback_.emplace(std::move(stop_token), Cancellation{
this});
// Completion step: drops the stop callback first, calls resume_work() on the
// runtime (request_wait_ called pend_work() when queuing), then passes
// result_ to the receiver.
163 void invoke() noexcept {
164 stop_callback_.reset();
165 assert(this->runtime_ !=
nullptr);
166 this->runtime_->resume_work();
167 std::move(receiver_)(this->result_);
// Cancellation path: acts only if cancel_wait_() reports the handle was
// removed from the wait list; then the result becomes -ECANCELED and the
// handle is scheduled on its runtime.
171 void cancel_() noexcept {
172 if (futex_.cancel_wait_(
this)) {
174 this->result_ = -ECANCELED;
175 assert(this->runtime_ !=
nullptr);
176 this->runtime_->schedule(
this);
// Callable given to the stop callback; forwards to cancel_().
180 struct Cancellation {
181 WaitFinishHandle *self;
182 void operator()() noexcept { self->cancel_(); }
// Stop-callback type matching the receiver's stop token type
// (stop_callback_t / stop_token_t are declared elsewhere in the project).
185 using StopCallbackType =
186 stop_callback_t<stop_token_t<Receiver>, Cancellation>;
// Empty unless start() installed a callback; reset() in invoke() clears it.
191 std::optional<StopCallbackType> stop_callback_;
// Sender returned by Futex::wait(): remembers the futex and the expected
// value, and produces an OperationState once connected to a receiver.
// NOTE(review): this listing has gaps; member declarations and closing
// braces are not visible here.
194template <
typename T>
struct Futex<T>::WaitSender {
// Type of the value delivered to the receiver.
196 using ReturnType = int32_t;
// Intentionally non-explicit: Futex::wait() builds it with `return {*this, old};`.
198 WaitSender(
Futex &futex, T old) : futex_(futex), old_(old) {}
// Creates the operation state for `receiver`; the receiver is moved in.
200 template <
typename Receiver>
auto connect(Receiver receiver)
noexcept {
201 return OperationState<Receiver>(futex_, old_, std::move(receiver));
// Operation state: a WaitFinishHandle<Receiver> that additionally keeps the
// expected value until start() is called.
205 template <
typename Receiver>
206 class OperationState :
public WaitFinishHandle<Receiver> {
208 using Base = WaitFinishHandle<Receiver>;
209 OperationState(Futex &futex, T old, Receiver receiver)
210 : Base(futex, std::move(receiver)), old_(old) {}
// The unnamed unsigned argument is ignored; the wait is started on the
// runtime of the current detail::Context.
212 void start(
unsigned int )
noexcept {
213 auto *runtime = detail::Context::current().runtime();
214 Base::start(runtime, old_);
User-space "futex" implementation for efficient synchronization between coroutines.
WaitSender wait(T old) noexcept
Wait if the futex value equals the specified old value. The awaiting coroutine will be suspended u...
Futex(std::atomic< T > &futex)
Construct a new Futex object.
void notify_all() noexcept
Notify all awaiting coroutines.
void notify_one() noexcept
Notify one awaiting coroutine, if any.
Intrusive singly linked and doubly linked list implementations.
Polymorphic invocation utilities.
The main namespace for the Condy library.
Runtime type for running the io_uring event loop.