Condy v1.5.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
task.hpp
Go to the documentation of this file.
1
7
8#pragma once
9
10#include "condy/context.hpp"
11#include "condy/coro.hpp"
12#include "condy/invoker.hpp"
13#include "condy/runtime.hpp"
14#include <coroutine>
15#include <exception>
16#include <future>
17#include <utility>
18
19namespace condy {
20
21template <typename T = void, typename Allocator = void> class TaskBase {
22public:
23 using PromiseType = typename Coro<T, Allocator>::promise_type;
24
25 TaskBase() : TaskBase(nullptr) {}
26 TaskBase(std::coroutine_handle<PromiseType> h) : handle_(h) {}
27 TaskBase(TaskBase &&other) noexcept
28 : handle_(std::exchange(other.handle_, nullptr)) {}
29 TaskBase &operator=(TaskBase &&other) noexcept {
30 if (this != &other) {
31 if (handle_) {
32 panic_on("Task destroyed without being awaited");
33 }
34 handle_ = std::exchange(other.handle_, nullptr);
35 }
36 return *this;
37 }
38
39 TaskBase(const TaskBase &) = delete;
40 TaskBase &operator=(const TaskBase &) = delete;
41
42 ~TaskBase() {
43 if (handle_) {
44 panic_on("Task destroyed without being awaited");
45 }
46 }
47
48public:
56 void detach() noexcept {
57 handle_.promise().request_detach();
58 handle_ = nullptr;
59 }
60
65 bool joinable() const noexcept { return handle_ != nullptr; }
66
77 auto operator co_await() noexcept;
78
79protected:
80 static void wait_inner_(std::coroutine_handle<PromiseType> handle);
81
82protected:
83 std::coroutine_handle<PromiseType> handle_;
84};
85
86template <typename T, typename Allocator>
87void TaskBase<T, Allocator>::wait_inner_(
88 std::coroutine_handle<PromiseType> handle) {
89 if (detail::Context::current().runtime() != nullptr) [[unlikely]] {
90 throw std::logic_error("Sync wait inside runtime");
91 }
92 if (handle == nullptr) [[unlikely]] {
93 throw std::invalid_argument("Task not joinable");
94 }
95 std::promise<void> prom;
96 auto fut = prom.get_future();
97 struct TaskWaiter : public InvokerAdapter<TaskWaiter> {
98 TaskWaiter(std::promise<void> &p) : prom_(p) {}
99
100 void invoke() { prom_.set_value(); }
101
102 std::promise<void> &prom_;
103 };
104
105 TaskWaiter waiter(prom);
106 if (handle.promise().register_task_await(&waiter)) {
107 // Still not finished, wait
108 fut.get();
109 }
110}
111
125template <typename T = void, typename Allocator = void>
126class [[nodiscard]] Task : public TaskBase<T, Allocator> {
127public:
128 using Base = TaskBase<T, Allocator>;
129 using Base::Base;
130
141 T wait() {
142 auto handle = std::exchange(Base::handle_, nullptr);
143 Base::wait_inner_(handle);
144 auto exception = std::move(handle.promise()).exception();
145 if (exception) [[unlikely]] {
146 handle.destroy();
147 std::rethrow_exception(exception);
148 }
149 T value = std::move(handle.promise()).value();
150 handle.destroy();
151 return std::move(value);
152 }
153};
154
155template <typename Allocator>
156class [[nodiscard]] Task<void, Allocator> : public TaskBase<void, Allocator> {
157public:
158 using Base = TaskBase<void, Allocator>;
159 using Base::Base;
160
169 void wait() {
170 auto handle = std::exchange(Base::handle_, nullptr);
171 Base::wait_inner_(handle);
172 auto exception = std::move(handle.promise()).exception();
173 handle.destroy();
174 if (exception) [[unlikely]] {
175 std::rethrow_exception(exception);
176 }
177 }
178};
179
180template <typename T, typename Allocator>
181struct TaskAwaiterBase : public InvokerAdapter<TaskAwaiterBase<T, Allocator>> {
182 TaskAwaiterBase(
183 std::coroutine_handle<typename Coro<T, Allocator>::promise_type>
184 task_handle,
185 Runtime *runtime)
186 : task_handle_(task_handle), runtime_(runtime) {}
187
188 bool await_ready() const {
189 if (task_handle_ == nullptr) {
190 throw std::invalid_argument("Task not joinable");
191 }
192 return false;
193 }
194
195 template <typename PromiseType>
196 bool
197 await_suspend(std::coroutine_handle<PromiseType> caller_handle) noexcept {
198 detail::Context::current().runtime()->pend_work();
199 assert(runtime_ != nullptr);
200 caller_promise_ = &caller_handle.promise();
201 return task_handle_.promise().register_task_await(this);
202 }
203
204 void invoke() {
205 assert(caller_promise_ != nullptr);
206 runtime_->schedule(caller_promise_);
207 }
208
209 std::coroutine_handle<typename Coro<T, Allocator>::promise_type>
210 task_handle_;
211 Runtime *runtime_ = nullptr;
212 WorkInvoker *caller_promise_ = nullptr;
213};
214
215template <typename T, typename Allocator>
216struct TaskAwaiter : public TaskAwaiterBase<T, Allocator> {
217 using Base = TaskAwaiterBase<T, Allocator>;
218 using Base::Base;
219
220 T await_resume() {
221 detail::Context::current().runtime()->resume_work();
222 auto exception = std::move(Base::task_handle_.promise()).exception();
223 if (exception) [[unlikely]] {
224 Base::task_handle_.destroy();
225 std::rethrow_exception(exception);
226 }
227 T value = std::move(Base::task_handle_.promise()).value();
228 Base::task_handle_.destroy();
229 return value;
230 }
231};
232
233template <typename Allocator>
234struct TaskAwaiter<void, Allocator> : public TaskAwaiterBase<void, Allocator> {
235 using Base = TaskAwaiterBase<void, Allocator>;
236 using Base::Base;
237
238 void await_resume() {
239 detail::Context::current().runtime()->resume_work();
240 auto exception = std::move(Base::task_handle_.promise()).exception();
241 Base::task_handle_.destroy();
242 if (exception) [[unlikely]] {
243 std::rethrow_exception(exception);
244 }
245 }
246};
247
248template <typename T, typename Allocator>
249inline auto TaskBase<T, Allocator>::operator co_await() noexcept {
250 return TaskAwaiter<T, Allocator>(std::exchange(handle_, nullptr),
251 detail::Context::current().runtime());
252}
253
265template <typename T, typename Allocator>
267 auto handle = coro.release();
268 auto &promise = handle.promise();
269 promise.set_auto_destroy(false);
270
271 runtime.schedule(&promise);
272 return {handle};
273}
274
283template <typename T, typename Allocator>
285 auto *runtime = detail::Context::current().runtime();
286 if (runtime == nullptr) [[unlikely]] {
287 throw std::logic_error("No runtime to spawn coroutine task");
288 }
289 return co_spawn(*runtime, std::move(coro));
290}
291
292namespace detail {
293
294struct [[nodiscard]] SwitchAwaiter {
295 bool await_ready() const noexcept { return false; }
296
297 template <typename PromiseType>
298 void await_suspend(std::coroutine_handle<PromiseType> handle) noexcept {
299 runtime_->schedule(&handle.promise());
300 }
301
302 void await_resume() const noexcept {}
303
304 Runtime *runtime_;
305};
306
307} // namespace detail
308
317inline detail::SwitchAwaiter co_switch(Runtime &runtime) { return {&runtime}; }
318
319} // namespace condy
Coroutine type used to define a coroutine function.
Definition coro.hpp:26
The event loop runtime for executing asynchronous.
Definition runtime.hpp:68
Coroutine task that runs concurrently in the runtime.
Definition task.hpp:126
T wait()
Wait synchronously for the task to complete and get the result.
Definition task.hpp:141
Coroutine definitions.
Polymorphic invocation utilities.
The main namespace for the Condy library.
Definition condy.hpp:28
detail::SwitchAwaiter co_switch(Runtime &runtime)
Switch current coroutine task to the given runtime.
Definition task.hpp:317
Task< T, Allocator > co_spawn(Runtime &runtime, Coro< T, Allocator > coro)
Spawn a coroutine as a task in the given runtime.
Definition task.hpp:266
Runtime type for running the io_uring event loop.