Condy v1.6.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
futex.hpp
Go to the documentation of this file.
1
6
7#pragma once
8
9#include "condy/intrusive.hpp"
10#include "condy/invoker.hpp"
11#include "condy/runtime.hpp"
12#include "condy/type_traits.hpp"
13#include <atomic>
14#include <cerrno>
15#include <optional>
16
17namespace condy {
18
28template <typename T> class Futex {
29public:
34 Futex(std::atomic<T> &futex) : futex_(futex) {}
35
36 ~Futex() { notify_all_(-EIDRM); }
37
38 Futex(const Futex &) = delete;
39 Futex &operator=(const Futex &) = delete;
40 Futex(Futex &&) = delete;
41 Futex &operator=(Futex &&) = delete;
42
43public:
44 struct [[nodiscard]] WaitSender;
55 WaitSender wait(T old) noexcept { return {*this, old}; }
56
61 void notify_one() noexcept {
62 WaitFinishHandleBase *handle = nullptr;
63 {
64 std::lock_guard<std::mutex> lock(mutex_);
65 handle = wait_awaiters_.pop_front();
66 }
67 if (handle) {
68 handle->set_result(0);
69 handle->schedule();
70 }
71 }
72
77 void notify_all() noexcept { notify_all_(0); }
78
79private:
80 class WaitFinishHandleBase;
81 template <typename Receiver> class WaitFinishHandle;
82
83 bool cancel_wait_(WaitFinishHandleBase *handle) noexcept {
84 std::lock_guard<std::mutex> lock(mutex_);
85 return wait_awaiters_.remove(handle);
86 }
87
88 int32_t request_wait_(WaitFinishHandleBase *handle, T old) noexcept {
89 std::lock_guard<std::mutex> lock(mutex_);
90 auto val = futex_.load(std::memory_order_relaxed);
91 if (val != old) {
92 return 0; // No need to wait
93 }
94 wait_awaiters_.push_back(handle);
95 detail::Context::current().runtime()->pend_work();
96 return -EAGAIN; // Need to wait
97 }
98
99private:
100 void notify_all_(int32_t result) noexcept {
101 HandleList handles;
102 {
103 std::lock_guard<std::mutex> lock(mutex_);
104 handles = std::move(wait_awaiters_);
105 }
106 while (auto *handle = handles.pop_front()) {
107 handle->set_result(result);
108 handle->schedule();
109 }
110 }
111
112private:
113 using HandleList = IntrusiveDoubleList<WaitFinishHandleBase,
114 &WaitFinishHandleBase::link_entry_>;
115
116 mutable std::mutex mutex_;
117 HandleList wait_awaiters_;
118 std::atomic<T> &futex_;
119};
120
121template <typename T>
122class Futex<T>::WaitFinishHandleBase : public WorkInvoker {
123public:
124 void schedule() noexcept {
125 assert(runtime_ != nullptr);
126 runtime_->schedule(this);
127 }
128
129 void set_result(int32_t result) noexcept { result_ = result; }
130
131public:
132 DoubleLinkEntry link_entry_;
133
134protected:
135 Runtime *runtime_ = nullptr;
136 int32_t result_ = -ENOTRECOVERABLE; // Internal error if not set
137};
138
139template <typename T>
140template <typename Receiver>
141class Futex<T>::WaitFinishHandle
142 : public InvokerAdapter<WaitFinishHandle<Receiver>, WaitFinishHandleBase> {
143public:
144 using Base = InvokerAdapter<WaitFinishHandle, WaitFinishHandleBase>;
145
146 WaitFinishHandle(Futex &futex, Receiver receiver)
147 : futex_(futex), receiver_(std::move(receiver)) {}
148
149 void start(Runtime *runtime, T old) noexcept {
150 this->runtime_ = runtime;
151 int32_t r = futex_.request_wait_(this, old);
152 if (r != -EAGAIN) {
153 std::move(receiver_)(r);
154 return;
155 }
156
157 auto stop_token = receiver_.get_stop_token();
158 if (stop_token.stop_possible()) {
159 stop_callback_.emplace(std::move(stop_token), Cancellation{this});
160 }
161 }
162
163 void invoke() noexcept {
164 stop_callback_.reset();
165 assert(this->runtime_ != nullptr);
166 this->runtime_->resume_work();
167 std::move(receiver_)(this->result_);
168 }
169
170private:
171 void cancel_() noexcept {
172 if (futex_.cancel_wait_(this)) {
173 // Successfully canceled
174 this->result_ = -ECANCELED;
175 assert(this->runtime_ != nullptr);
176 this->runtime_->schedule(this);
177 }
178 }
179
180 struct Cancellation {
181 WaitFinishHandle *self;
182 void operator()() noexcept { self->cancel_(); }
183 };
184
185 using StopCallbackType =
186 stop_callback_t<stop_token_t<Receiver>, Cancellation>;
187
188private:
189 Futex &futex_;
190 Receiver receiver_;
191 std::optional<StopCallbackType> stop_callback_;
192};
193
194template <typename T> struct Futex<T>::WaitSender {
195public:
196 using ReturnType = int32_t;
197
198 WaitSender(Futex &futex, T old) : futex_(futex), old_(old) {}
199
200 template <typename Receiver> auto connect(Receiver receiver) noexcept {
201 return OperationState<Receiver>(futex_, old_, std::move(receiver));
202 }
203
204private:
205 template <typename Receiver>
206 class OperationState : public WaitFinishHandle<Receiver> {
207 public:
208 using Base = WaitFinishHandle<Receiver>;
209 OperationState(Futex &futex, T old, Receiver receiver)
210 : Base(futex, std::move(receiver)), old_(old) {}
211
212 void start(unsigned int /*flags*/) noexcept {
213 auto *runtime = detail::Context::current().runtime();
214 Base::start(runtime, old_);
215 }
216
217 private:
218 T old_;
219 };
220
221private:
222 Futex &futex_;
223 T old_;
224};
225
226} // namespace condy
User-space "futex" implementation for efficient synchronization between coroutines.
Definition futex.hpp:28
WaitSender wait(T old) noexcept
Wait if the futex value equals the specified old value. The awaiting coroutine will be suspended until it is notified.
Definition futex.hpp:55
Futex(std::atomic< T > &futex)
Construct a new Futex object.
Definition futex.hpp:34
void notify_all() noexcept
Notify all awaiting coroutines.
Definition futex.hpp:77
void notify_one() noexcept
Notify one awaiting coroutine, if any.
Definition futex.hpp:61
Intrusive single-linked and double-linked list implementations.
Polymorphic invocation utilities.
The main namespace for the Condy library.
Definition condy.hpp:31
Runtime type for running the io_uring event loop.