Condy v1.3.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
awaiters.hpp
Go to the documentation of this file.
1
9
10#pragma once
11
12#include "condy/concepts.hpp"
13#include "condy/condy_uring.hpp"
14#include "condy/context.hpp"
16#include "condy/ring.hpp"
17#include "condy/runtime.hpp"
18#include "condy/work_type.hpp"
19#include <coroutine>
20#include <cstddef>
21#include <cstdint>
22#include <tuple>
23
24namespace condy {
25
26template <OpFinishHandleLike Handle> class HandleBox {
27public:
28 HandleBox(Handle h) : handle_(std::move(h)) {}
29
30 Handle &get() { return handle_; }
31
32private:
33 Handle handle_;
34};
35
36template <typename Func, OpFinishHandleLike HandleBase>
37class HandleBox<ZeroCopyMixin<Func, HandleBase>> {
38public:
39 using Handle = ZeroCopyMixin<Func, HandleBase>;
40 HandleBox(Handle h) : handle_ptr_(new Handle(std::move(h))) {}
41
42 Handle &get() { return *handle_ptr_; }
43
44private:
45 Handle *handle_ptr_;
46};
47
48template <OpFinishHandleLike Handle, PrepFuncLike Func> class OpAwaiterBase {
49public:
50 using HandleType = Handle;
51
52 OpAwaiterBase(HandleBox<Handle> handle, Func func)
53 : prep_func_(func), finish_handle_(std::move(handle)) {}
54 OpAwaiterBase(OpAwaiterBase &&) = default;
55
56 OpAwaiterBase(const OpAwaiterBase &) = delete;
57 OpAwaiterBase &operator=(const OpAwaiterBase &) = delete;
58 OpAwaiterBase &operator=(OpAwaiterBase &&) = delete;
59
60public:
61 HandleType *get_handle() { return &finish_handle_.get(); }
62
63 void init_finish_handle() { /* Leaf node, no-op */ }
64
65 void register_operation(unsigned int flags) {
66 auto &context = detail::Context::current();
67 auto *ring = context.ring();
68
69 context.runtime()->pend_work();
70
71 io_uring_sqe *sqe = prep_func_(ring);
72 assert(sqe && "prep_func must return a valid sqe");
73 sqe->flags |= static_cast<uint8_t>(flags);
74 io_uring_sqe_set_data(
75 sqe, encode_work(&finish_handle_.get(), WorkType::Common));
76 }
77
78public:
79 bool await_ready() { return false; }
80
81 template <typename PromiseType>
82 void await_suspend(std::coroutine_handle<PromiseType> h) {
83 init_finish_handle();
84 finish_handle_.get().set_invoker(&h.promise());
85 register_operation(0);
86 }
87
88 auto await_resume() { return finish_handle_.get().extract_result(); }
89
90protected:
91 Func prep_func_;
92 HandleBox<Handle> finish_handle_;
93};
94
95template <PrepFuncLike PrepFunc, CQEHandlerLike CQEHandler>
96class [[nodiscard]] OpAwaiter
97 : public OpAwaiterBase<OpFinishHandle<CQEHandler>, PrepFunc> {
98public:
99 using Base = OpAwaiterBase<OpFinishHandle<CQEHandler>, PrepFunc>;
100 template <typename... Args>
101 OpAwaiter(PrepFunc func, Args &&...args)
102 : Base(HandleBox(
103 OpFinishHandle<CQEHandler>(std::forward<Args>(args)...)),
104 std::move(func)) {}
105};
106
107template <PrepFuncLike PrepFunc, CQEHandlerLike CQEHandler,
108 typename MultiShotFunc>
109class [[nodiscard]] MultiShotOpAwaiter
110 : public OpAwaiterBase<MultiShotOpFinishHandle<CQEHandler, MultiShotFunc>,
111 PrepFunc> {
112public:
113 using Base =
114 OpAwaiterBase<MultiShotOpFinishHandle<CQEHandler, MultiShotFunc>,
115 PrepFunc>;
116 template <typename... Args>
117 MultiShotOpAwaiter(PrepFunc func, MultiShotFunc multishot_func,
118 Args &&...args)
119 : Base(HandleBox(MultiShotOpFinishHandle<CQEHandler, MultiShotFunc>(
120 std::move(multishot_func), std::forward<Args>(args)...)),
121 std::move(func)) {}
122};
123
124template <PrepFuncLike PrepFunc, CQEHandlerLike CQEHandler, typename FreeFunc>
125class [[nodiscard]] ZeroCopyOpAwaiter
126 : public OpAwaiterBase<ZeroCopyOpFinishHandle<CQEHandler, FreeFunc>,
127 PrepFunc> {
128public:
129 using Base =
130 OpAwaiterBase<ZeroCopyOpFinishHandle<CQEHandler, FreeFunc>, PrepFunc>;
131 template <typename... Args>
132 ZeroCopyOpAwaiter(PrepFunc func, FreeFunc free_func, Args &&...args)
133 : Base(HandleBox(ZeroCopyOpFinishHandle<CQEHandler, FreeFunc>(
134 std::move(free_func), std::forward<Args>(args)...)),
135 std::move(func)) {}
136};
137
138template <unsigned int Flags, AwaiterLike Awaiter>
139class [[nodiscard]] FlaggedOpAwaiter : public Awaiter {
140public:
141 using Base = Awaiter;
142 FlaggedOpAwaiter(Awaiter awaiter) : Base(std::move(awaiter)) {}
143
144 void register_operation(unsigned int flags) {
145#if IO_URING_CHECK_VERSION(2, 12) // < 2.12
146 if constexpr (Flags & IOSQE_IO_DRAIN) {
147 auto *runtime = detail::Context::current().runtime();
148 // Ensure every operation before drain will complete
149 runtime->notify();
150 }
151#endif
152 Base::register_operation(flags | Flags);
153 }
154
155 template <typename PromiseType>
156 void await_suspend(std::coroutine_handle<PromiseType> h) {
157 Base::init_finish_handle();
158 Base::get_handle()->set_invoker(&h.promise());
159 register_operation(0);
160 }
161};
162
163template <HandleLike Handle, AwaiterLike Awaiter>
164class [[nodiscard]] RangedParallelAwaiterBase {
165public:
166 using HandleType = Handle;
167
168 RangedParallelAwaiterBase(std::vector<Awaiter> awaiters)
169 : awaiters_(std::move(awaiters)) {}
170 RangedParallelAwaiterBase(RangedParallelAwaiterBase &&) = default;
171
172 RangedParallelAwaiterBase(const RangedParallelAwaiterBase &) = delete;
173 RangedParallelAwaiterBase &
174 operator=(const RangedParallelAwaiterBase &) = delete;
175 RangedParallelAwaiterBase &operator=(RangedParallelAwaiterBase &&) = delete;
176
177public:
178 HandleType *get_handle() { return &finish_handle_; }
179
180 void init_finish_handle() {
181 using ChildHandle = typename Awaiter::HandleType;
182 std::vector<ChildHandle *> handles;
183 handles.reserve(awaiters_.size());
184 for (auto &awaiter : awaiters_) {
185 awaiter.init_finish_handle();
186 handles.push_back(awaiter.get_handle());
187 }
188 finish_handle_.init(std::move(handles));
189 }
190
191 void register_operation(unsigned int flags) {
192 for (auto &awaiter : awaiters_) {
193 awaiter.register_operation(flags);
194 }
195 }
196
197public:
198 bool await_ready() const noexcept { return false; }
199
200 template <typename PromiseType>
201 void await_suspend(std::coroutine_handle<PromiseType> h) {
202 init_finish_handle();
203 finish_handle_.set_invoker(&h.promise());
204 register_operation(0);
205 }
206
207 typename Handle::ReturnType await_resume() {
208 return finish_handle_.extract_result();
209 }
210
211public:
212 void push(Awaiter awaiter) { awaiters_.push_back(std::move(awaiter)); }
213
214protected:
215 HandleType finish_handle_;
216 std::vector<Awaiter> awaiters_;
217};
218
228template <typename Awaiter>
229using RangedParallelAllAwaiter = RangedParallelAwaiterBase<
230 RangedParallelAllFinishHandle<typename Awaiter::HandleType>, Awaiter>;
231
241template <typename Awaiter>
242using RangedParallelAnyAwaiter = RangedParallelAwaiterBase<
243 RangedParallelAnyFinishHandle<typename Awaiter::HandleType>, Awaiter>;
244
251template <typename Awaiter>
252using RangedWhenAllAwaiter = RangedParallelAwaiterBase<
253 RangedWhenAllFinishHandle<typename Awaiter::HandleType>, Awaiter>;
254
262template <typename Awaiter>
263using RangedWhenAnyAwaiter = RangedParallelAwaiterBase<
264 RangedWhenAnyFinishHandle<typename Awaiter::HandleType>, Awaiter>;
265
266template <unsigned int Flags, AwaiterLike Awaiter>
267class [[nodiscard]] RangedLinkAwaiterBase
268 : public RangedWhenAllAwaiter<Awaiter> {
269public:
271 using Base::Base;
272
273 void register_operation(unsigned int flags) {
274 auto *ring = detail::Context::current().ring();
275 ring->reserve_space(Base::awaiters_.size());
276 for (int i = 0; i < Base::awaiters_.size() - 1; ++i) {
277 Base::awaiters_[i].register_operation(flags | Flags);
278 }
279 Base::awaiters_.back().register_operation(flags);
280 }
281
282 template <typename PromiseType>
283 void await_suspend(std::coroutine_handle<PromiseType> h) {
284 Base::init_finish_handle();
285 Base::finish_handle_.set_invoker(&h.promise());
286 register_operation(0);
287 }
288};
289
296template <typename Awaiter>
297using RangedLinkAwaiter = RangedLinkAwaiterBase<IOSQE_IO_LINK, Awaiter>;
298
306template <typename Awaiter>
307using RangedHardLinkAwaiter = RangedLinkAwaiterBase<IOSQE_IO_HARDLINK, Awaiter>;
308
309template <HandleLike Handle, AwaiterLike... Awaiters>
310class [[nodiscard]] ParallelAwaiterBase {
311public:
312 using HandleType = Handle;
313
314 ParallelAwaiterBase(Awaiters... awaiters)
315 : awaiters_(std::move(awaiters)...) {}
316 ParallelAwaiterBase(ParallelAwaiterBase &&) = default;
317 template <typename ParallelAwaiter, AwaiterLike New>
318 ParallelAwaiterBase(ParallelAwaiter &&aws, New new_awaiter)
319 : awaiters_(std::tuple_cat(std::move(aws.awaiters_),
320 std::make_tuple(std::move(new_awaiter)))) {}
321
322 ParallelAwaiterBase(const ParallelAwaiterBase &) = delete;
323 ParallelAwaiterBase &operator=(const ParallelAwaiterBase &) = delete;
324 ParallelAwaiterBase &operator=(ParallelAwaiterBase &&) = delete;
325
326public:
327 HandleType *get_handle() { return &finish_handle_; }
328
329 void init_finish_handle() {
330 auto handles = foreach_init_finish_handle_();
331 static_assert(std::tuple_size<decltype(handles)>::value ==
332 sizeof...(Awaiters),
333 "Number of handles must match number of awaiters");
334 std::apply(
335 [this](auto &&...handle_ptrs) {
336 finish_handle_.init(handle_ptrs...);
337 },
338 handles);
339 }
340
341 void register_operation(unsigned int flags) {
342 foreach_register_operation_(flags);
343 }
344
345public:
346 bool await_ready() const noexcept { return false; }
347
348 template <typename PromiseType>
349 void await_suspend(std::coroutine_handle<PromiseType> h) {
350 init_finish_handle();
351 finish_handle_.set_invoker(&h.promise());
352 register_operation(0);
353 }
354
355 typename Handle::ReturnType await_resume() {
356 return finish_handle_.extract_result();
357 }
358
359private:
360 template <size_t Idx = 0> auto foreach_init_finish_handle_() {
361 if constexpr (Idx < sizeof...(Awaiters)) {
362 std::get<Idx>(awaiters_).init_finish_handle();
363 return std::tuple_cat(
364 std::make_tuple(std::get<Idx>(awaiters_).get_handle()),
365 foreach_init_finish_handle_<Idx + 1>());
366 } else {
367 return std::tuple<>();
368 }
369 }
370
371 template <size_t Idx = 0>
372 void foreach_register_operation_(unsigned int flags) {
373 if constexpr (Idx < sizeof...(Awaiters)) {
374 std::get<Idx>(awaiters_).register_operation(flags);
375 foreach_register_operation_<Idx + 1>(flags);
376 }
377 }
378
379protected:
380 HandleType finish_handle_;
381 std::tuple<Awaiters...> awaiters_;
382
383 // Make awaiters_ accessible to all template instantiations
384 template <HandleLike, AwaiterLike...> friend class ParallelAwaiterBase;
385};
386
396template <typename... Awaiter>
397using ParallelAllAwaiter = ParallelAwaiterBase<
398 ParallelAllFinishHandle<typename Awaiter::HandleType...>, Awaiter...>;
399
409template <typename... Awaiter>
410using ParallelAnyAwaiter = ParallelAwaiterBase<
411 ParallelAnyFinishHandle<typename Awaiter::HandleType...>, Awaiter...>;
412
419template <typename... Awaiter>
421 ParallelAwaiterBase<WhenAllFinishHandle<typename Awaiter::HandleType...>,
422 Awaiter...>;
423
431template <typename... Awaiter>
433 ParallelAwaiterBase<WhenAnyFinishHandle<typename Awaiter::HandleType...>,
434 Awaiter...>;
435
436template <unsigned int Flags, AwaiterLike... Awaiter>
437class [[nodiscard]] LinkAwaiterBase : public WhenAllAwaiter<Awaiter...> {
438public:
439 using Base = WhenAllAwaiter<Awaiter...>;
440 using Base::Base;
441
442 void register_operation(unsigned int flags) {
443 auto *ring = detail::Context::current().ring();
444 ring->reserve_space(sizeof...(Awaiter));
445 foreach_register_operation_(flags);
446 }
447
448 template <typename PromiseType>
449 void await_suspend(std::coroutine_handle<PromiseType> h) {
450 Base::init_finish_handle();
451 Base::finish_handle_.set_invoker(&h.promise());
452 register_operation(0);
453 }
454
455private:
456 template <size_t Idx = 0>
457 void foreach_register_operation_(unsigned int flags) {
458 if constexpr (Idx < sizeof...(Awaiter)) {
459 std::get<Idx>(Base::awaiters_)
460 .register_operation(Idx < sizeof...(Awaiter) - 1 ? flags | Flags
461 : flags);
462 foreach_register_operation_<Idx + 1>(flags);
463 }
464 }
465};
466
473template <typename... Awaiter>
474using LinkAwaiter = LinkAwaiterBase<IOSQE_IO_LINK, Awaiter...>;
475
482template <typename... Awaiter>
483using HardLinkAwaiter = LinkAwaiterBase<IOSQE_IO_HARDLINK, Awaiter...>;
484
485} // namespace condy
Definitions of finish handle types for asynchronous operations.
The main namespace for the Condy library.
Definition condy.hpp:28
RangedParallelAwaiterBase< RangedParallelAllFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedParallelAllAwaiter
Awaiter to wait for all operations in a range to complete.
Definition awaiters.hpp:229
ParallelAwaiterBase< ParallelAllFinishHandle< typename Awaiter::HandleType... >, Awaiter... > ParallelAllAwaiter
Awaiter to wait for all operations to complete in parallel.
Definition awaiters.hpp:397
RangedLinkAwaiterBase< IOSQE_IO_LINK, Awaiter > RangedLinkAwaiter
Awaiter that links multiple operations in a range using IO_LINK.
Definition awaiters.hpp:297
RangedParallelAwaiterBase< RangedWhenAnyFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedWhenAnyAwaiter
Awaiter to wait for any operation in a range to complete.
Definition awaiters.hpp:263
RangedParallelAwaiterBase< RangedParallelAnyFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedParallelAnyAwaiter
Awaiter to wait for any operation in a range to complete.
Definition awaiters.hpp:242
LinkAwaiterBase< IOSQE_IO_LINK, Awaiter... > LinkAwaiter
Awaiter that links multiple operations using IO_LINK.
Definition awaiters.hpp:474
RangedParallelAwaiterBase< RangedWhenAllFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedWhenAllAwaiter
Awaiter to wait for all operations in a range to complete.
Definition awaiters.hpp:252
ParallelAwaiterBase< WhenAllFinishHandle< typename Awaiter::HandleType... >, Awaiter... > WhenAllAwaiter
Awaiter that waits for all operations to complete in parallel.
Definition awaiters.hpp:420
ParallelAwaiterBase< WhenAnyFinishHandle< typename Awaiter::HandleType... >, Awaiter... > WhenAnyAwaiter
Awaiter that waits for any operation to complete in parallel.
Definition awaiters.hpp:432
ParallelAwaiterBase< ParallelAnyFinishHandle< typename Awaiter::HandleType... >, Awaiter... > ParallelAnyAwaiter
Awaiter to wait for any operation to complete in parallel.
Definition awaiters.hpp:410
LinkAwaiterBase< IOSQE_IO_HARDLINK, Awaiter... > HardLinkAwaiter
Awaiter that links multiple operations using IO_HARDLINK.
Definition awaiters.hpp:483
RangedLinkAwaiterBase< IOSQE_IO_HARDLINK, Awaiter > RangedHardLinkAwaiter
Awaiter that links multiple operations in a range using IO_HARDLINK.
Definition awaiters.hpp:307
Wrapper classes for liburing interfaces.
Runtime type for running the io_uring event loop.