Condy v1.5.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
awaiters.hpp
Go to the documentation of this file.
1
9
10#pragma once
11
12#include "condy/concepts.hpp"
13#include "condy/condy_uring.hpp"
14#include "condy/context.hpp"
16#include "condy/ring.hpp"
17#include "condy/runtime.hpp"
18#include "condy/work_type.hpp"
19#include <coroutine>
20#include <cstddef>
21#include <cstdint>
22#include <memory>
23#include <tuple>
24
25namespace condy {
26
27template <OpFinishHandleLike Handle> class HandleBox {
28public:
29 HandleBox(Handle h) : handle_(std::move(h)) {}
30
31 Handle &get() { return handle_; }
32
33 void maybe_release() { /* No-op */ }
34
35private:
36 Handle handle_;
37};
38
39template <typename Func, OpFinishHandleLike HandleBase>
40class HandleBox<ZeroCopyMixin<Func, HandleBase>> {
41public:
42 using Handle = ZeroCopyMixin<Func, HandleBase>;
43 HandleBox(Handle h) : handle_ptr_(std::make_unique<Handle>(std::move(h))) {}
44 HandleBox(const HandleBox &other) // Deep copy
45 : handle_ptr_(std::make_unique<Handle>(*other.handle_ptr_)) {}
46
47 Handle &get() { return *handle_ptr_; }
48
49 void maybe_release() { handle_ptr_.release(); }
50
51private:
52 std::unique_ptr<Handle> handle_ptr_;
53};
54
55template <OpFinishHandleLike Handle, PrepFuncLike Func> class OpAwaiterBase {
56public:
57 using HandleType = Handle;
58
59 OpAwaiterBase(HandleBox<Handle> handle, Func func)
60 : prep_func_(func), finish_handle_(std::move(handle)) {}
61
62public:
63 HandleType *get_handle() { return &finish_handle_.get(); }
64
65 void init_finish_handle() { /* Leaf node, no-op */ }
66
67 void register_operation(unsigned int flags) noexcept {
68 auto &context = detail::Context::current();
69 auto *ring = context.ring();
70
71 context.runtime()->pend_work();
72
73 io_uring_sqe *sqe = prep_func_(ring);
74 assert(sqe && "prep_func must return a valid sqe");
75 sqe->flags |= static_cast<uint8_t>(flags);
76 io_uring_sqe_set_data(
77 sqe, encode_work(&finish_handle_.get(), WorkType::Common));
78 }
79
80public:
81 bool await_ready() const noexcept { return false; }
82
83 template <typename PromiseType>
84 void await_suspend(std::coroutine_handle<PromiseType> h) noexcept {
85 init_finish_handle();
86 finish_handle_.get().set_invoker(&h.promise());
87 register_operation(0);
88 }
89
90 auto await_resume() {
91 auto result = finish_handle_.get().extract_result();
92 finish_handle_.maybe_release();
93 return result;
94 }
95
96protected:
97 Func prep_func_;
98 HandleBox<Handle> finish_handle_;
99};
100
101template <PrepFuncLike PrepFunc, CQEHandlerLike CQEHandler>
102class [[nodiscard]] OpAwaiter
103 : public OpAwaiterBase<OpFinishHandle<CQEHandler>, PrepFunc> {
104public:
105 using Base = OpAwaiterBase<OpFinishHandle<CQEHandler>, PrepFunc>;
106 template <typename... Args>
107 OpAwaiter(PrepFunc func, Args &&...args)
108 : Base(HandleBox(
109 OpFinishHandle<CQEHandler>(std::forward<Args>(args)...)),
110 std::move(func)) {}
111};
112
113template <PrepFuncLike PrepFunc, CQEHandlerLike CQEHandler,
114 typename MultiShotFunc>
115class [[nodiscard]] MultiShotOpAwaiter
116 : public OpAwaiterBase<MultiShotOpFinishHandle<CQEHandler, MultiShotFunc>,
117 PrepFunc> {
118public:
119 using Base =
120 OpAwaiterBase<MultiShotOpFinishHandle<CQEHandler, MultiShotFunc>,
121 PrepFunc>;
122 template <typename... Args>
123 MultiShotOpAwaiter(PrepFunc func, MultiShotFunc multishot_func,
124 Args &&...args)
125 : Base(HandleBox(MultiShotOpFinishHandle<CQEHandler, MultiShotFunc>(
126 std::move(multishot_func), std::forward<Args>(args)...)),
127 std::move(func)) {}
128};
129
130template <PrepFuncLike PrepFunc, CQEHandlerLike CQEHandler, typename FreeFunc>
131class [[nodiscard]] ZeroCopyOpAwaiter
132 : public OpAwaiterBase<ZeroCopyOpFinishHandle<CQEHandler, FreeFunc>,
133 PrepFunc> {
134public:
135 using Base =
136 OpAwaiterBase<ZeroCopyOpFinishHandle<CQEHandler, FreeFunc>, PrepFunc>;
137 template <typename... Args>
138 ZeroCopyOpAwaiter(PrepFunc func, FreeFunc free_func, Args &&...args)
139 : Base(HandleBox(ZeroCopyOpFinishHandle<CQEHandler, FreeFunc>(
140 std::move(free_func), std::forward<Args>(args)...)),
141 std::move(func)) {}
142};
143
144template <unsigned int Flags, AwaiterLike Awaiter>
145class [[nodiscard]] FlaggedOpAwaiter : public Awaiter {
146public:
147 using Base = Awaiter;
148 FlaggedOpAwaiter(Awaiter awaiter) : Base(std::move(awaiter)) {}
149
150 void register_operation(unsigned int flags) {
151 Base::register_operation(flags | Flags);
152 }
153
154 template <typename PromiseType>
155 void await_suspend(std::coroutine_handle<PromiseType> h) {
156 Base::init_finish_handle();
157 Base::get_handle()->set_invoker(&h.promise());
158 register_operation(0);
159 }
160};
161
162template <HandleLike Handle, AwaiterLike Awaiter>
163class [[nodiscard]] RangedParallelAwaiterBase {
164public:
165 using HandleType = Handle;
166
167 RangedParallelAwaiterBase(std::vector<Awaiter> awaiters)
168 : awaiters_(std::move(awaiters)) {}
169
170public:
171 HandleType *get_handle() { return &finish_handle_; }
172
173 void init_finish_handle() {
174 using ChildHandle = typename Awaiter::HandleType;
175 std::vector<ChildHandle *> handles;
176 handles.reserve(awaiters_.size());
177 for (auto &awaiter : awaiters_) {
178 awaiter.init_finish_handle();
179 handles.push_back(awaiter.get_handle());
180 }
181 finish_handle_.init(std::move(handles));
182 }
183
184 void register_operation(unsigned int flags) {
185 for (auto &awaiter : awaiters_) {
186 awaiter.register_operation(flags);
187 }
188 }
189
190public:
191 bool await_ready() const noexcept { return awaiters_.empty(); }
192
193 template <typename PromiseType>
194 void await_suspend(std::coroutine_handle<PromiseType> h) {
195 init_finish_handle();
196 finish_handle_.set_invoker(&h.promise());
197 register_operation(0);
198 }
199
200 typename Handle::ReturnType await_resume() {
201 return finish_handle_.extract_result();
202 }
203
204public:
205 void push(Awaiter awaiter) { awaiters_.push_back(std::move(awaiter)); }
206
207protected:
208 HandleType finish_handle_;
209 std::vector<Awaiter> awaiters_;
210};
211
221template <typename Awaiter>
222using RangedParallelAllAwaiter = RangedParallelAwaiterBase<
223 RangedParallelAllFinishHandle<typename Awaiter::HandleType>, Awaiter>;
224
234template <typename Awaiter>
235using RangedParallelAnyAwaiter = RangedParallelAwaiterBase<
236 RangedParallelAnyFinishHandle<typename Awaiter::HandleType>, Awaiter>;
237
244template <typename Awaiter>
245using RangedWhenAllAwaiter = RangedParallelAwaiterBase<
246 RangedWhenAllFinishHandle<typename Awaiter::HandleType>, Awaiter>;
247
256template <typename Awaiter>
257using RangedWhenAnyAwaiter = RangedParallelAwaiterBase<
258 RangedWhenAnyFinishHandle<typename Awaiter::HandleType>, Awaiter>;
259
260template <unsigned int Flags, AwaiterLike Awaiter>
261class [[nodiscard]] RangedLinkAwaiterBase
262 : public RangedWhenAllAwaiter<Awaiter> {
263public:
265 using Base::Base;
266
267 void register_operation(unsigned int flags) {
268 auto *ring = detail::Context::current().ring();
269 ring->reserve_space(Base::awaiters_.size());
270 for (int i = 0; i < Base::awaiters_.size() - 1; ++i) {
271 Base::awaiters_[i].register_operation(flags | Flags);
272 }
273 Base::awaiters_.back().register_operation(flags);
274 }
275
276 template <typename PromiseType>
277 void await_suspend(std::coroutine_handle<PromiseType> h) {
278 Base::init_finish_handle();
279 Base::finish_handle_.set_invoker(&h.promise());
280 register_operation(0);
281 }
282};
283
290template <typename Awaiter>
291using RangedLinkAwaiter = RangedLinkAwaiterBase<IOSQE_IO_LINK, Awaiter>;
292
300template <typename Awaiter>
301using RangedHardLinkAwaiter = RangedLinkAwaiterBase<IOSQE_IO_HARDLINK, Awaiter>;
302
303template <HandleLike Handle, AwaiterLike... Awaiters>
304class [[nodiscard]] ParallelAwaiterBase {
305public:
306 using HandleType = Handle;
307
308 ParallelAwaiterBase(Awaiters... awaiters)
309 : awaiters_(std::move(awaiters)...) {}
310 template <typename ParallelAwaiter, AwaiterLike New>
311 ParallelAwaiterBase(ParallelAwaiter &&aws, New new_awaiter)
312 : awaiters_(std::tuple_cat(std::move(aws.awaiters_),
313 std::make_tuple(std::move(new_awaiter)))) {}
314
315public:
316 HandleType *get_handle() { return &finish_handle_; }
317
318 void init_finish_handle() {
319 auto handles = foreach_init_finish_handle_();
320 static_assert(std::tuple_size<decltype(handles)>::value ==
321 sizeof...(Awaiters),
322 "Number of handles must match number of awaiters");
323 std::apply(
324 [this](auto &&...handle_ptrs) {
325 finish_handle_.init(handle_ptrs...);
326 },
327 handles);
328 }
329
330 void register_operation(unsigned int flags) {
331 foreach_register_operation_(flags);
332 }
333
334public:
335 bool await_ready() const noexcept { return false; }
336
337 template <typename PromiseType>
338 void await_suspend(std::coroutine_handle<PromiseType> h) {
339 init_finish_handle();
340 finish_handle_.set_invoker(&h.promise());
341 register_operation(0);
342 }
343
344 typename Handle::ReturnType await_resume() {
345 return finish_handle_.extract_result();
346 }
347
348private:
349 template <size_t Idx = 0> auto foreach_init_finish_handle_() {
350 if constexpr (Idx < sizeof...(Awaiters)) {
351 std::get<Idx>(awaiters_).init_finish_handle();
352 return std::tuple_cat(
353 std::make_tuple(std::get<Idx>(awaiters_).get_handle()),
354 foreach_init_finish_handle_<Idx + 1>());
355 } else {
356 return std::tuple<>();
357 }
358 }
359
360 template <size_t Idx = 0>
361 void foreach_register_operation_(unsigned int flags) {
362 if constexpr (Idx < sizeof...(Awaiters)) {
363 std::get<Idx>(awaiters_).register_operation(flags);
364 foreach_register_operation_<Idx + 1>(flags);
365 }
366 }
367
368protected:
369 HandleType finish_handle_;
370 std::tuple<Awaiters...> awaiters_;
371
372 // Make awaiters_ accessible to all template instantiations
373 template <HandleLike, AwaiterLike...> friend class ParallelAwaiterBase;
374};
375
385template <typename... Awaiter>
386using ParallelAllAwaiter = ParallelAwaiterBase<
387 ParallelAllFinishHandle<typename Awaiter::HandleType...>, Awaiter...>;
388
398template <typename... Awaiter>
399using ParallelAnyAwaiter = ParallelAwaiterBase<
400 ParallelAnyFinishHandle<typename Awaiter::HandleType...>, Awaiter...>;
401
408template <typename... Awaiter>
410 ParallelAwaiterBase<WhenAllFinishHandle<typename Awaiter::HandleType...>,
411 Awaiter...>;
412
420template <typename... Awaiter>
422 ParallelAwaiterBase<WhenAnyFinishHandle<typename Awaiter::HandleType...>,
423 Awaiter...>;
424
425template <unsigned int Flags, AwaiterLike... Awaiter>
426class [[nodiscard]] LinkAwaiterBase : public WhenAllAwaiter<Awaiter...> {
427public:
428 using Base = WhenAllAwaiter<Awaiter...>;
429 using Base::Base;
430
431 void register_operation(unsigned int flags) {
432 auto *ring = detail::Context::current().ring();
433 ring->reserve_space(sizeof...(Awaiter));
434 foreach_register_operation_(flags);
435 }
436
437 template <typename PromiseType>
438 void await_suspend(std::coroutine_handle<PromiseType> h) {
439 Base::init_finish_handle();
440 Base::finish_handle_.set_invoker(&h.promise());
441 register_operation(0);
442 }
443
444private:
445 template <size_t Idx = 0>
446 void foreach_register_operation_(unsigned int flags) {
447 if constexpr (Idx < sizeof...(Awaiter)) {
448 std::get<Idx>(Base::awaiters_)
449 .register_operation(Idx < sizeof...(Awaiter) - 1 ? flags | Flags
450 : flags);
451 foreach_register_operation_<Idx + 1>(flags);
452 }
453 }
454};
455
462template <typename... Awaiter>
463using LinkAwaiter = LinkAwaiterBase<IOSQE_IO_LINK, Awaiter...>;
464
471template <typename... Awaiter>
472using HardLinkAwaiter = LinkAwaiterBase<IOSQE_IO_HARDLINK, Awaiter...>;
473
474} // namespace condy
Definitions of finish handle types for asynchronous operations.
The main namespace for the Condy library.
Definition condy.hpp:28
RangedParallelAwaiterBase< RangedParallelAllFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedParallelAllAwaiter
Awaiter to wait for all operations in a range to complete.
Definition awaiters.hpp:222
ParallelAwaiterBase< ParallelAllFinishHandle< typename Awaiter::HandleType... >, Awaiter... > ParallelAllAwaiter
Awaiter to wait for all operations to complete in parallel.
Definition awaiters.hpp:386
RangedLinkAwaiterBase< IOSQE_IO_LINK, Awaiter > RangedLinkAwaiter
Awaiter that links multiple operations in a range using IO_LINK.
Definition awaiters.hpp:291
RangedParallelAwaiterBase< RangedWhenAnyFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedWhenAnyAwaiter
Awaiter to wait for any operation in a range to complete.
Definition awaiters.hpp:257
RangedParallelAwaiterBase< RangedParallelAnyFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedParallelAnyAwaiter
Awaiter to wait for any operation in a range to complete.
Definition awaiters.hpp:235
LinkAwaiterBase< IOSQE_IO_LINK, Awaiter... > LinkAwaiter
Awaiter that links multiple operations using IO_LINK.
Definition awaiters.hpp:463
RangedParallelAwaiterBase< RangedWhenAllFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedWhenAllAwaiter
Awaiter to wait for all operations in a range to complete.
Definition awaiters.hpp:245
ParallelAwaiterBase< WhenAllFinishHandle< typename Awaiter::HandleType... >, Awaiter... > WhenAllAwaiter
Awaiter that waits for all operations to complete in parallel.
Definition awaiters.hpp:409
ParallelAwaiterBase< WhenAnyFinishHandle< typename Awaiter::HandleType... >, Awaiter... > WhenAnyAwaiter
Awaiter that waits for any operation to complete in parallel.
Definition awaiters.hpp:421
ParallelAwaiterBase< ParallelAnyFinishHandle< typename Awaiter::HandleType... >, Awaiter... > ParallelAnyAwaiter
Awaiter to wait for any operation to complete in parallel.
Definition awaiters.hpp:399
LinkAwaiterBase< IOSQE_IO_HARDLINK, Awaiter... > HardLinkAwaiter
Awaiter that links multiple operations using IO_HARDLINK.
Definition awaiters.hpp:472
RangedLinkAwaiterBase< IOSQE_IO_HARDLINK, Awaiter > RangedHardLinkAwaiter
Awaiter that links multiple operations in a range using IO_HARDLINK.
Definition awaiters.hpp:301
Wrapper classes for liburing interfaces.
Runtime type for running the io_uring event loop.