Condy v1.1.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
awaiters.hpp
Go to the documentation of this file.
1
9
10#pragma once
11
#include "condy/concepts.hpp"
#include "condy/condy_uring.hpp"
#include "condy/context.hpp"
// NOTE(review): this listing skips one original line between the context and
// ring includes; presumably it is the header providing the finish-handle
// types used below -- confirm against the real file.
#include "condy/ring.hpp"
#include "condy/runtime.hpp"
#include "condy/work_type.hpp"
#include <coroutine>
#include <cstddef>
#include <cstdint>
#include <tuple>
#include <utility>
#include <vector>
23
24namespace condy {
25
26template <OpFinishHandleLike Handle> class HandleBox {
27public:
28 HandleBox(Handle h) : handle_(std::move(h)) {}
29
30 Handle &get() { return handle_; }
31
32private:
33 Handle handle_;
34};
35
36template <typename Func, OpFinishHandleLike HandleBase>
37class HandleBox<ZeroCopyMixin<Func, HandleBase>> {
38public:
39 using Handle = ZeroCopyMixin<Func, HandleBase>;
40 HandleBox(Handle h) : handle_ptr_(new Handle(std::move(h))) {}
41
42 Handle &get() { return *handle_ptr_; }
43
44private:
45 Handle *handle_ptr_;
46};
47
48template <OpFinishHandleLike Handle, typename Func> class OpAwaiterBase {
49public:
50 using HandleType = Handle;
51
52 OpAwaiterBase(HandleBox<Handle> handle, Func func)
53 : prep_func_(func), finish_handle_(std::move(handle)) {}
54 OpAwaiterBase(OpAwaiterBase &&) = default;
55
56 OpAwaiterBase(const OpAwaiterBase &) = delete;
57 OpAwaiterBase &operator=(const OpAwaiterBase &) = delete;
58 OpAwaiterBase &operator=(OpAwaiterBase &&) = delete;
59
60public:
61 HandleType *get_handle() { return &finish_handle_.get(); }
62
63 void init_finish_handle() { /* Leaf node, no-op */ }
64
65 void register_operation(unsigned int flags) {
66 auto &context = Context::current();
67 auto *ring = context.ring();
68
69 context.runtime()->pend_work();
70 io_uring_sqe *sqe = ring->get_sqe();
71 prep_op_(sqe, flags);
72 }
73
74public:
75 bool await_ready() { return false; }
76
77 template <typename PromiseType>
78 void await_suspend(std::coroutine_handle<PromiseType> h) {
79 init_finish_handle();
80 finish_handle_.get().set_invoker(&h.promise());
81 register_operation(0);
82 }
83
84 auto await_resume() { return finish_handle_.get().extract_result(); }
85
86private:
87 void prep_op_(io_uring_sqe *sqe, unsigned int flags) {
88 prep_func_(sqe);
89 sqe->flags |= static_cast<uint8_t>(flags);
90 io_uring_sqe_set_data(
91 sqe, encode_work(&finish_handle_.get(), Handle::work_type));
92 sqe->personality = Context::current().cred_id();
93 }
94
95protected:
96 Func prep_func_;
97 HandleBox<Handle> finish_handle_;
98};
99
100template <typename Func>
101class [[nodiscard]] OpAwaiter : public OpAwaiterBase<OpFinishHandle, Func> {
102public:
103 using Base = OpAwaiterBase<OpFinishHandle, Func>;
104 OpAwaiter(Func func) : Base(OpFinishHandle(), func) {}
105};
106
107template <typename MultiShotFunc, typename Func>
108class [[nodiscard]] MultiShotOpAwaiter
109 : public OpAwaiterBase<MultiShotOpFinishHandle<MultiShotFunc>, Func> {
110public:
111 using Base = OpAwaiterBase<MultiShotOpFinishHandle<MultiShotFunc>, Func>;
112 MultiShotOpAwaiter(MultiShotFunc multishot_func, Func func)
113 : Base(
114 MultiShotOpFinishHandle<MultiShotFunc>(std::move(multishot_func)),
115 func) {}
116};
117
118template <typename FreeFunc, typename Func>
119class [[nodiscard]] ZeroCopyOpAwaiter
120 : public OpAwaiterBase<ZeroCopyOpFinishHandle<FreeFunc>, Func> {
121public:
122 using Base = OpAwaiterBase<ZeroCopyOpFinishHandle<FreeFunc>, Func>;
123 ZeroCopyOpAwaiter(FreeFunc free_func, Func func)
124 : Base(ZeroCopyOpFinishHandle<FreeFunc>(std::move(free_func)), func) {}
125};
126
127template <BufferRingLike Br, typename Func>
128class [[nodiscard]] SelectBufferOpAwaiter
129 : public OpAwaiterBase<SelectBufferOpFinishHandle<Br>, Func> {
130public:
131 using Base = OpAwaiterBase<SelectBufferOpFinishHandle<Br>, Func>;
132 SelectBufferOpAwaiter(Br *buffers, Func func)
133 : Base(SelectBufferOpFinishHandle<Br>(buffers), func) {}
134};
135
136template <typename MultiShotFunc, BufferRingLike Br, typename Func>
137class [[nodiscard]] MultiShotSelectBufferOpAwaiter
138 : public OpAwaiterBase<
139 MultiShotSelectBufferOpFinishHandle<MultiShotFunc, Br>, Func> {
140public:
141 using Base =
142 OpAwaiterBase<MultiShotSelectBufferOpFinishHandle<MultiShotFunc, Br>,
143 Func>;
144 MultiShotSelectBufferOpAwaiter(MultiShotFunc multishot_func, Br *buffers,
145 Func func)
146 : Base(MultiShotSelectBufferOpFinishHandle<MultiShotFunc, Br>(
147 std::move(multishot_func), buffers),
148 func) {}
149};
150
151template <unsigned int Flags, AwaiterLike Awaiter>
152class [[nodiscard]] FlaggedOpAwaiter : public Awaiter {
153public:
154 using Base = Awaiter;
155 FlaggedOpAwaiter(Awaiter awaiter) : Base(std::move(awaiter)) {}
156
157 void register_operation(unsigned int flags) {
158#if IO_URING_CHECK_VERSION(2, 12) // < 2.12
159 if constexpr (Flags & IOSQE_IO_DRAIN) {
160 auto *runtime = Context::current().runtime();
161 // Ensure every operation before drain will complete
162 runtime->notify();
163 }
164#endif
165 Base::register_operation(flags | Flags);
166 }
167
168 template <typename PromiseType>
169 void await_suspend(std::coroutine_handle<PromiseType> h) {
170 Base::init_finish_handle();
171 Base::get_handle()->set_invoker(&h.promise());
172 register_operation(0);
173 }
174};
175
176template <HandleLike Handle, AwaiterLike Awaiter>
177class [[nodiscard]] RangedParallelAwaiterBase {
178public:
179 using HandleType = Handle;
180
181 RangedParallelAwaiterBase(std::vector<Awaiter> awaiters)
182 : awaiters_(std::move(awaiters)) {}
183 RangedParallelAwaiterBase(RangedParallelAwaiterBase &&) = default;
184
185 RangedParallelAwaiterBase(const RangedParallelAwaiterBase &) = delete;
186 RangedParallelAwaiterBase &
187 operator=(const RangedParallelAwaiterBase &) = delete;
188 RangedParallelAwaiterBase &operator=(RangedParallelAwaiterBase &&) = delete;
189
190public:
191 HandleType *get_handle() { return &finish_handle_; }
192
193 void init_finish_handle() {
194 using ChildHandle = typename Awaiter::HandleType;
195 std::vector<ChildHandle *> handles;
196 handles.reserve(awaiters_.size());
197 for (auto &awaiter : awaiters_) {
198 awaiter.init_finish_handle();
199 handles.push_back(awaiter.get_handle());
200 }
201 finish_handle_.init(std::move(handles));
202 }
203
204 void register_operation(unsigned int flags) {
205 for (auto &awaiter : awaiters_) {
206 awaiter.register_operation(flags);
207 }
208 }
209
210public:
211 bool await_ready() const noexcept { return false; }
212
213 template <typename PromiseType>
214 void await_suspend(std::coroutine_handle<PromiseType> h) {
215 init_finish_handle();
216 finish_handle_.set_invoker(&h.promise());
217 register_operation(0);
218 }
219
220 typename Handle::ReturnType await_resume() {
221 return finish_handle_.extract_result();
222 }
223
224public:
225 void push(Awaiter awaiter) { awaiters_.push_back(std::move(awaiter)); }
226
227protected:
228 HandleType finish_handle_;
229 std::vector<Awaiter> awaiters_;
230};
231
241template <typename Awaiter>
242using RangedParallelAllAwaiter = RangedParallelAwaiterBase<
243 RangedParallelAllFinishHandle<typename Awaiter::HandleType>, Awaiter>;
244
254template <typename Awaiter>
255using RangedParallelAnyAwaiter = RangedParallelAwaiterBase<
256 RangedParallelAnyFinishHandle<typename Awaiter::HandleType>, Awaiter>;
257
264template <typename Awaiter>
265using RangedWhenAllAwaiter = RangedParallelAwaiterBase<
266 RangedWhenAllFinishHandle<typename Awaiter::HandleType>, Awaiter>;
267
275template <typename Awaiter>
276using RangedWhenAnyAwaiter = RangedParallelAwaiterBase<
277 RangedWhenAnyFinishHandle<typename Awaiter::HandleType>, Awaiter>;
278
279template <unsigned int Flags, AwaiterLike Awaiter>
280class [[nodiscard]] RangedLinkAwaiterBase
281 : public RangedWhenAllAwaiter<Awaiter> {
282public:
284 using Base::Base;
285
286 void register_operation(unsigned int flags) {
287 auto *ring = Context::current().ring();
288 ring->reserve_space(Base::awaiters_.size());
289 for (int i = 0; i < Base::awaiters_.size() - 1; ++i) {
290 Base::awaiters_[i].register_operation(flags | Flags);
291 }
292 Base::awaiters_.back().register_operation(flags);
293 }
294
295 template <typename PromiseType>
296 void await_suspend(std::coroutine_handle<PromiseType> h) {
297 Base::init_finish_handle();
298 Base::finish_handle_.set_invoker(&h.promise());
299 register_operation(0);
300 }
301};
302
309template <typename Awaiter>
310using RangedLinkAwaiter = RangedLinkAwaiterBase<IOSQE_IO_LINK, Awaiter>;
311
319template <typename Awaiter>
320using RangedHardLinkAwaiter = RangedLinkAwaiterBase<IOSQE_IO_HARDLINK, Awaiter>;
321
322template <HandleLike Handle, AwaiterLike... Awaiters>
323class [[nodiscard]] ParallelAwaiterBase {
324public:
325 using HandleType = Handle;
326
327 ParallelAwaiterBase(Awaiters... awaiters)
328 : awaiters_(std::move(awaiters)...) {}
329 ParallelAwaiterBase(ParallelAwaiterBase &&) = default;
330 template <typename ParallelAwaiter, AwaiterLike New>
331 ParallelAwaiterBase(ParallelAwaiter &&aws, New new_awaiter)
332 : awaiters_(std::tuple_cat(std::move(aws.awaiters_),
333 std::make_tuple(std::move(new_awaiter)))) {}
334
335 ParallelAwaiterBase(const ParallelAwaiterBase &) = delete;
336 ParallelAwaiterBase &operator=(const ParallelAwaiterBase &) = delete;
337 ParallelAwaiterBase &operator=(ParallelAwaiterBase &&) = delete;
338
339public:
340 HandleType *get_handle() { return &finish_handle_; }
341
342 void init_finish_handle() {
343 auto handles = foreach_init_finish_handle_();
344 static_assert(std::tuple_size<decltype(handles)>::value ==
345 sizeof...(Awaiters),
346 "Number of handles must match number of awaiters");
347 std::apply(
348 [this](auto &&...handle_ptrs) {
349 finish_handle_.init(handle_ptrs...);
350 },
351 handles);
352 }
353
354 void register_operation(unsigned int flags) {
355 foreach_register_operation_(flags);
356 }
357
358public:
359 bool await_ready() const noexcept { return false; }
360
361 template <typename PromiseType>
362 void await_suspend(std::coroutine_handle<PromiseType> h) {
363 init_finish_handle();
364 finish_handle_.set_invoker(&h.promise());
365 register_operation(0);
366 }
367
368 typename Handle::ReturnType await_resume() {
369 return finish_handle_.extract_result();
370 }
371
372private:
373 template <size_t Idx = 0> auto foreach_init_finish_handle_() {
374 if constexpr (Idx < sizeof...(Awaiters)) {
375 std::get<Idx>(awaiters_).init_finish_handle();
376 return std::tuple_cat(
377 std::make_tuple(std::get<Idx>(awaiters_).get_handle()),
378 foreach_init_finish_handle_<Idx + 1>());
379 } else {
380 return std::tuple<>();
381 }
382 }
383
384 template <size_t Idx = 0>
385 void foreach_register_operation_(unsigned int flags) {
386 if constexpr (Idx < sizeof...(Awaiters)) {
387 std::get<Idx>(awaiters_).register_operation(flags);
388 foreach_register_operation_<Idx + 1>(flags);
389 }
390 }
391
392protected:
393 HandleType finish_handle_;
394 std::tuple<Awaiters...> awaiters_;
395
396 // Make awaiters_ accessible to all template instantiations
397 template <HandleLike, AwaiterLike...> friend class ParallelAwaiterBase;
398};
399
409template <typename... Awaiter>
410using ParallelAllAwaiter = ParallelAwaiterBase<
411 ParallelAllFinishHandle<typename Awaiter::HandleType...>, Awaiter...>;
412
422template <typename... Awaiter>
423using ParallelAnyAwaiter = ParallelAwaiterBase<
424 ParallelAnyFinishHandle<typename Awaiter::HandleType...>, Awaiter...>;
425
432template <typename... Awaiter>
434 ParallelAwaiterBase<WhenAllFinishHandle<typename Awaiter::HandleType...>,
435 Awaiter...>;
436
444template <typename... Awaiter>
446 ParallelAwaiterBase<WhenAnyFinishHandle<typename Awaiter::HandleType...>,
447 Awaiter...>;
448
449template <unsigned int Flags, AwaiterLike... Awaiter>
450class [[nodiscard]] LinkAwaiterBase : public WhenAllAwaiter<Awaiter...> {
451public:
452 using Base = WhenAllAwaiter<Awaiter...>;
453 using Base::Base;
454
455 void register_operation(unsigned int flags) {
456 auto *ring = Context::current().ring();
457 ring->reserve_space(sizeof...(Awaiter));
458 foreach_register_operation_(flags);
459 }
460
461 template <typename PromiseType>
462 void await_suspend(std::coroutine_handle<PromiseType> h) {
463 Base::init_finish_handle();
464 Base::finish_handle_.set_invoker(&h.promise());
465 register_operation(0);
466 }
467
468private:
469 template <size_t Idx = 0>
470 void foreach_register_operation_(unsigned int flags) {
471 if constexpr (Idx < sizeof...(Awaiter)) {
472 std::get<Idx>(Base::awaiters_)
473 .register_operation(Idx < sizeof...(Awaiter) - 1 ? flags | Flags
474 : flags);
475 foreach_register_operation_<Idx + 1>(flags);
476 }
477 }
478};
479
486template <typename... Awaiter>
487using LinkAwaiter = LinkAwaiterBase<IOSQE_IO_LINK, Awaiter...>;
488
495template <typename... Awaiter>
496using HardLinkAwaiter = LinkAwaiterBase<IOSQE_IO_HARDLINK, Awaiter...>;
497
498} // namespace condy
Definitions of finish handle types for asynchronous operations.
The main namespace for the Condy library.
Definition condy.hpp:28
RangedParallelAwaiterBase< RangedParallelAllFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedParallelAllAwaiter
Awaiter to wait for all operations in a range to complete.
Definition awaiters.hpp:242
ParallelAwaiterBase< ParallelAllFinishHandle< typename Awaiter::HandleType... >, Awaiter... > ParallelAllAwaiter
Awaiter to wait for all operations to complete in parallel.
Definition awaiters.hpp:410
RangedLinkAwaiterBase< IOSQE_IO_LINK, Awaiter > RangedLinkAwaiter
Awaiter that links multiple operations in a range using IO_LINK.
Definition awaiters.hpp:310
RangedParallelAwaiterBase< RangedWhenAnyFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedWhenAnyAwaiter
Awaiter to wait for any operation in a range to complete.
Definition awaiters.hpp:276
RangedParallelAwaiterBase< RangedParallelAnyFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedParallelAnyAwaiter
Awaiter to wait for any operation in a range to complete.
Definition awaiters.hpp:255
LinkAwaiterBase< IOSQE_IO_LINK, Awaiter... > LinkAwaiter
Awaiter that links multiple operations using IO_LINK.
Definition awaiters.hpp:487
RangedParallelAwaiterBase< RangedWhenAllFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedWhenAllAwaiter
Awaiter to wait for all operations in a range to complete.
Definition awaiters.hpp:265
ParallelAwaiterBase< WhenAllFinishHandle< typename Awaiter::HandleType... >, Awaiter... > WhenAllAwaiter
Awaiter that waits for all operations to complete in parallel.
Definition awaiters.hpp:433
ParallelAwaiterBase< WhenAnyFinishHandle< typename Awaiter::HandleType... >, Awaiter... > WhenAnyAwaiter
Awaiter that waits for any operation to complete in parallel.
Definition awaiters.hpp:445
ParallelAwaiterBase< ParallelAnyFinishHandle< typename Awaiter::HandleType... >, Awaiter... > ParallelAnyAwaiter
Awaiter to wait for any operation to complete in parallel.
Definition awaiters.hpp:423
LinkAwaiterBase< IOSQE_IO_HARDLINK, Awaiter... > HardLinkAwaiter
Awaiter that links multiple operations using IO_HARDLINK.
Definition awaiters.hpp:496
RangedLinkAwaiterBase< IOSQE_IO_HARDLINK, Awaiter > RangedHardLinkAwaiter
Awaiter that links multiple operations in a range using IO_HARDLINK.
Definition awaiters.hpp:320
Wrapper classes for liburing interfaces.
Runtime type for running the io_uring event loop.