Condy v1.6.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
coro.inl
Go to the documentation of this file.
1
5
6#pragma once
7
8#include "condy/coro.hpp"
9#include "condy/invoker.hpp"
11#include "condy/utils.hpp"
12#include <atomic>
13#include <coroutine>
14#include <exception>
15#include <new>
16#include <optional>
17
18namespace condy {
19
20template <typename...> struct always_false {
21 static constexpr bool value = false;
22};
23
24template <typename Allocator, typename... Args>
25struct first_is_not_allocator : public std::true_type {};
26
27template <typename Allocator, typename Arg, typename... Args>
28struct first_is_not_allocator<Allocator, Arg, Args...> {
29 static constexpr bool value =
30 !std::is_same_v<std::remove_cvref_t<Arg>, Allocator>;
31};
32
33template <typename Promise, typename Allocator>
34class BindAllocator : public Promise {
35public:
36#ifdef __clang__
37 template <typename... Args>
38 requires(first_is_not_allocator<Allocator, Args...>::value)
39 static void *operator new(size_t, Args &&...) {
40 // If user didn't provide a signature like (Allocator&, ...), clang will
41 // fall back to ::new, we don't want that.
42 // https://github.com/llvm/llvm-project/issues/54881
43 static_assert(always_false<Args...>::value,
44 "Invalid arguments for allocator-bound coroutine");
45 }
46#endif
47
48 template <typename... Args>
49 static void *operator new(size_t size, Allocator &alloc, const Args &...) {
50 size_t allocator_offset =
51 (size + alignof(Allocator) - 1) & ~(alignof(Allocator) - 1);
52 size_t total_size = allocator_offset + sizeof(Allocator);
53
54 Pointer mem = alloc.allocate(total_size);
55 try {
56 new (mem + allocator_offset) Allocator(alloc);
57 } catch (...) {
58 alloc.deallocate(mem, total_size);
59 throw;
60 }
61 return mem;
62 }
63
64 void operator delete(void *ptr, size_t size) noexcept {
65 size_t allocator_offset =
66 (size + alignof(Allocator) - 1) & ~(alignof(Allocator) - 1);
67 size_t total_size = allocator_offset + sizeof(Allocator);
68 Pointer mem = static_cast<Pointer>(ptr);
69 Allocator &alloc = *std::launder(
70 reinterpret_cast<Allocator *>(mem + allocator_offset));
71 Allocator alloc_copy = std::move(alloc);
72 alloc.~Allocator();
73 alloc_copy.deallocate(mem, total_size);
74 }
75
76private:
77 using Pointer = typename std::allocator_traits<Allocator>::pointer;
78 using T = std::remove_pointer_t<Pointer>;
79 static_assert(sizeof(T) == 1, "Allocator pointer must point to byte type");
80};
81
82template <typename Promise>
83class BindAllocator<Promise, void> : public Promise {};
84
85template <typename Coro>
86class PromiseBase : public InvokerAdapter<PromiseBase<Coro>, WorkInvoker> {
87public:
88 using PromiseType = typename Coro::promise_type;
89
90 ~PromiseBase() {
91 if (exception_) [[unlikely]] {
92 try {
93 std::rethrow_exception(exception_);
94 } catch (const std::exception &e) {
95 panic_on(std::format(
96 "Unhandled exception in detached coroutine: {}", e.what()));
97 } catch (...) {
98 panic_on("Unhandled unknown exception in detached coroutine");
99 }
100 }
101 }
102
103 Coro get_return_object() noexcept {
104 return Coro{std::coroutine_handle<PromiseType>::from_promise(
105 static_cast<PromiseType &>(*this))};
106 }
107
108 std::suspend_always initial_suspend() const noexcept { return {}; }
109
110 void unhandled_exception() noexcept {
111 exception_ = std::current_exception();
112 }
113
114 struct FinalAwaiter {
115 bool await_ready() const noexcept { return false; }
116
117 std::coroutine_handle<>
118 await_suspend(std::coroutine_handle<PromiseType> handle) noexcept {
119 auto &self = handle.promise();
120
121 State expected = self.state_.load(std::memory_order_acquire);
122 State desired;
123 do {
124 if (expected == State::Idle) {
125 return self.caller_handle_;
126 } else if (expected == State::RunningJoinable) {
127 desired = State::Zombie;
128 } else if (expected == State::RunningDetached ||
129 expected == State::RunningJoining) {
130 desired = State::Finished;
131 } else [[unlikely]] {
132 panic_on(std::format(
133 "Invalid coroutine state in final_suspend: {}",
134 static_cast<int>(expected)));
135 }
136 } while (!self.state_.compare_exchange_weak(
137 expected, desired, std::memory_order_acq_rel,
138 std::memory_order_acquire));
139
140 State prev = expected;
141 if (prev == State::RunningDetached) {
142 handle.destroy();
143 return std::noop_coroutine();
144 } else if (prev == State::RunningJoining) {
145 auto *callback = self.callback_;
146 (*callback)();
147 return std::noop_coroutine();
148 } else {
149 assert(prev == State::RunningJoinable);
150 return std::noop_coroutine();
151 }
152 }
153
154 void await_resume() const noexcept {}
155 };
156
157 FinalAwaiter final_suspend() const noexcept { return {}; }
158
159 template <typename T>
160 requires(requires { typename std::decay_t<T>::ReturnType; })
161 auto await_transform(T &&value) {
162 return detail::as_awaiter(std::forward<T>(value));
163 }
164
165 template <typename T> T &&await_transform(T &&value) {
166 return std::forward<T>(value);
167 }
168
169public:
170 // Should be called before task is scheduled
171 void mark_running() noexcept {
172 state_.store(State::RunningJoinable, std::memory_order_relaxed);
173 }
174
175 void set_caller_handle(std::coroutine_handle<> handle) noexcept {
176 caller_handle_ = handle;
177 }
178
179 void request_detach() noexcept {
180 State expected = state_.load(std::memory_order_acquire);
181 State desired;
182 do {
183 if (expected == State::RunningJoinable) {
184 desired = State::RunningDetached;
185 } else if (expected == State::Zombie) {
186 desired = State::Finished;
187 } else [[unlikely]] {
188 panic_on(
189 std::format("Invalid coroutine state in request_detach: {}",
190 static_cast<int>(expected)));
191 }
192 } while (!state_.compare_exchange_weak(expected, desired,
193 std::memory_order_acq_rel,
194 std::memory_order_acquire));
195
196 State prev = expected;
197 if (prev == State::Zombie) {
198 auto h = std::coroutine_handle<PromiseType>::from_promise(
199 static_cast<PromiseType &>(*this));
200 h.destroy();
201 }
202 }
203
204 bool request_join(Invoker *remote_callback) noexcept {
205 State expected = state_.load(std::memory_order_acquire);
206 State desired;
207 do {
208 if (expected == State::RunningJoinable) {
209 desired = State::RunningJoining;
210 callback_ = remote_callback;
211 } else if (expected == State::Zombie) {
212 desired = State::Finished;
213 } else [[unlikely]] {
214 panic_on(
215 std::format("Invalid coroutine state in request_join: {}",
216 static_cast<int>(expected)));
217 }
218 } while (!state_.compare_exchange_weak(expected, desired,
219 std::memory_order_acq_rel,
220 std::memory_order_acquire));
221
222 State prev = expected;
223 if (prev == State::Zombie) {
224 return false; // ready to resume immediately
225 } else {
226 assert(prev == State::RunningJoinable);
227 return true;
228 }
229 }
230
231 std::exception_ptr exception() noexcept { return std::move(exception_); }
232
233 void invoke() noexcept {
234 auto h = std::coroutine_handle<PromiseType>::from_promise(
235 static_cast<PromiseType &>(*this));
236 h.resume();
237 }
238
239protected:
240 // Promise lifecycle state machine:
241 //
242 // 1) co_spawn():
243 // - Idle -> RunningJoinable:
244 // Task has been scheduled and is joinable.
245 //
246 // 2) final_suspend():
247 // - RunningJoinable -> Zombie:
248 // Coroutine completed, but no join/detach consumer has claimed
249 // final ownership yet.
250 // - RunningDetached -> Finished:
251 // Detach path, performs destroy() immediately.
252 // - RunningJoining -> Finished:
253 // Join path, invokes the registered callback to wake the
254 // waiter/continuation.
255 //
256 // 3) request_detach():
257 // - RunningJoinable -> RunningDetached:
258 // Detach requested before completion; final_suspend will destroy
259 // later.
260 // - Zombie -> Finished:
261 // Detach requested after completion; requester destroys coroutine
262 // immediately.
263 //
264 // 4) request_join():
265 // - RunningJoinable -> RunningJoining:
266 // Join requested before completion; stores callback and waits for
267 // final_suspend callback.
268 // - Zombie -> Finished:
269 // Join requested after completion; caller can resume/wait
270 // immediately (no callback path needed).
271 enum class State : uint8_t {
272 Idle,
273 RunningJoinable,
274 RunningDetached,
275 RunningJoining,
276 Zombie,
277 Finished,
278 };
279 static_assert(std::atomic<State>::is_always_lock_free);
280
281 std::atomic<State> state_ = State::Idle;
282 union {
283 std::coroutine_handle<> caller_handle_ = std::noop_coroutine();
284 Invoker *callback_;
285 };
286 std::exception_ptr exception_;
287};
288
289template <typename Allocator>
290class Promise<void, Allocator>
291 : public BindAllocator<PromiseBase<Coro<void, Allocator>>, Allocator> {
292public:
293 void return_void() const noexcept {}
294};
295
296template <typename T, typename Allocator>
297class Promise
298 : public BindAllocator<PromiseBase<Coro<T, Allocator>>, Allocator> {
299public:
300 void return_value(T value) { value_ = std::move(value); }
301
302 T value() { return std::move(value_.value()); }
303
304private:
305 std::optional<T> value_;
306};
307
308template <typename PromiseType> struct CoroAwaiterBase {
309 bool await_ready() const noexcept { return false; }
310
311 std::coroutine_handle<PromiseType>
312 await_suspend(std::coroutine_handle<> caller_handle) noexcept {
313 handle_.promise().set_caller_handle(caller_handle);
314 return handle_;
315 }
316
317 std::coroutine_handle<PromiseType> handle_;
318};
319
320template <typename T, typename Allocator>
321struct CoroAwaiter
322 : public CoroAwaiterBase<typename Coro<T, Allocator>::promise_type> {
323 using Base = CoroAwaiterBase<typename Coro<T, Allocator>::promise_type>;
324 T await_resume() {
325 auto exception = Base::handle_.promise().exception();
326 if (exception) [[unlikely]] {
327 Base::handle_.destroy();
328 std::rethrow_exception(exception);
329 }
330 T value = Base::handle_.promise().value();
331 Base::handle_.destroy();
332 return value;
333 }
334};
335
336template <typename Allocator>
337struct CoroAwaiter<void, Allocator>
338 : public CoroAwaiterBase<typename Coro<void, Allocator>::promise_type> {
339 using Base = CoroAwaiterBase<typename Coro<void, Allocator>::promise_type>;
340 void await_resume() {
341 auto exception = Base::handle_.promise().exception();
342 Base::handle_.destroy();
343 if (exception) [[unlikely]] {
344 std::rethrow_exception(exception);
345 }
346 }
347};
348
349template <typename T, typename Allocator>
350inline auto Coro<T, Allocator>::operator co_await() noexcept {
351 return CoroAwaiter<T, Allocator>{release()};
352}
353
354} // namespace condy
Coroutine definitions.
Polymorphic invocation utilities.
condy::Coro< T, std::pmr::polymorphic_allocator< std::byte > > Coro
Coroutine type using polymorphic allocator.
Definition pmr.hpp:26
The main namespace for the Condy library.
Definition condy.hpp:30
Helper functions for composing asynchronous operations.
Internal utility classes and functions used by Condy.