Condy v1.5.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
awaiters.hpp
Go to the documentation of this file.
1
9
10#pragma once
11
12#include "condy/concepts.hpp"
13#include "condy/condy_uring.hpp"
14#include "condy/context.hpp"
16#include "condy/ring.hpp"
17#include "condy/runtime.hpp"
18#include "condy/type_traits.hpp"
19#include "condy/work_type.hpp"
20#include <coroutine>
21#include <cstddef>
22#include <cstdint>
23#include <memory>
24#include <tuple>
25
26namespace condy {
27
28template <OpFinishHandleLike Handle> class HandleBox {
29public:
30 HandleBox(Handle h) : handle_(std::move(h)) {}
31
32 Handle &get() noexcept { return handle_; }
33
34 void maybe_release() noexcept { /* No-op */ }
35
36private:
37 Handle handle_;
38};
39
40template <typename Func, OpFinishHandleLike HandleBase>
41class HandleBox<ZeroCopyMixin<Func, HandleBase>> {
42public:
43 using Handle = ZeroCopyMixin<Func, HandleBase>;
44 HandleBox(Handle h) : handle_ptr_(std::make_unique<Handle>(std::move(h))) {}
45 HandleBox(const HandleBox &other) // Deep copy
46 : handle_ptr_(std::make_unique<Handle>(*other.handle_ptr_)) {}
47
48 Handle &get() noexcept { return *handle_ptr_; }
49
50 void maybe_release() noexcept { handle_ptr_.release(); }
51
52private:
53 std::unique_ptr<Handle> handle_ptr_;
54};
55
56template <OpFinishHandleLike Handle, PrepFuncLike Func> class OpAwaiterBase {
57public:
58 using HandleType = Handle;
59
60 OpAwaiterBase(HandleBox<Handle> handle, Func func)
61 : prep_func_(func), finish_handle_(std::move(handle)) {}
62
63public:
64 HandleType *get_handle() noexcept { return &finish_handle_.get(); }
65
66 void init_finish_handle() noexcept { /* Leaf node, no-op */ }
67
68 void register_operation(unsigned int flags) noexcept {
69 auto &context = detail::Context::current();
70 auto *ring = context.ring();
71
72 context.runtime()->pend_work();
73
74 io_uring_sqe *sqe = prep_func_(ring);
75 assert(sqe && "prep_func must return a valid sqe");
76 sqe->flags |= static_cast<uint8_t>(flags);
77 io_uring_sqe_set_data(
78 sqe, encode_work(&finish_handle_.get(), WorkType::Common));
79 }
80
81public:
82 bool await_ready() const noexcept { return false; }
83
84 template <typename PromiseType>
85 void await_suspend(std::coroutine_handle<PromiseType> h) noexcept {
86 init_finish_handle();
87 finish_handle_.get().set_invoker(&h.promise());
88 register_operation(0);
89 }
90
91 auto await_resume() noexcept {
92 auto result = finish_handle_.get().extract_result();
93 finish_handle_.maybe_release();
94 return result;
95 }
96
97protected:
98 Func prep_func_;
99 HandleBox<Handle> finish_handle_;
100};
101
102template <PrepFuncLike PrepFunc, CQEHandlerLike CQEHandler>
103class [[nodiscard]] OpAwaiter
104 : public OpAwaiterBase<OpFinishHandle<CQEHandler>, PrepFunc> {
105public:
106 using Base = OpAwaiterBase<OpFinishHandle<CQEHandler>, PrepFunc>;
107 template <typename... Args>
108 OpAwaiter(PrepFunc func, Args &&...args)
109 : Base(HandleBox(
110 OpFinishHandle<CQEHandler>(std::forward<Args>(args)...)),
111 std::move(func)) {}
112};
113
114template <PrepFuncLike PrepFunc, CQEHandlerLike CQEHandler,
115 typename MultiShotFunc>
116class [[nodiscard]] MultiShotOpAwaiter
117 : public OpAwaiterBase<MultiShotOpFinishHandle<CQEHandler, MultiShotFunc>,
118 PrepFunc> {
119public:
120 using Base =
121 OpAwaiterBase<MultiShotOpFinishHandle<CQEHandler, MultiShotFunc>,
122 PrepFunc>;
123 template <typename... Args>
124 MultiShotOpAwaiter(PrepFunc func, MultiShotFunc multishot_func,
125 Args &&...args)
126 : Base(HandleBox(MultiShotOpFinishHandle<CQEHandler, MultiShotFunc>(
127 std::move(multishot_func), std::forward<Args>(args)...)),
128 std::move(func)) {}
129};
130
131template <PrepFuncLike PrepFunc, CQEHandlerLike CQEHandler, typename FreeFunc>
132class [[nodiscard]] ZeroCopyOpAwaiter
133 : public OpAwaiterBase<ZeroCopyOpFinishHandle<CQEHandler, FreeFunc>,
134 PrepFunc> {
135public:
136 using Base =
137 OpAwaiterBase<ZeroCopyOpFinishHandle<CQEHandler, FreeFunc>, PrepFunc>;
138 template <typename... Args>
139 ZeroCopyOpAwaiter(PrepFunc func, FreeFunc free_func, Args &&...args)
140 : Base(HandleBox(ZeroCopyOpFinishHandle<CQEHandler, FreeFunc>(
141 std::move(free_func), std::forward<Args>(args)...)),
142 std::move(func)) {}
143};
144
145template <unsigned int Flags, AwaiterLike Awaiter>
146class [[nodiscard]] FlaggedOpAwaiter : public Awaiter {
147public:
148 using Base = Awaiter;
149 FlaggedOpAwaiter(Awaiter awaiter) : Base(std::move(awaiter)) {}
150
151 void register_operation(unsigned int flags) noexcept(
152 is_nothrow_suspendible_v<Awaiter>) {
153 Base::register_operation(flags | Flags);
154 }
155
156 template <typename PromiseType>
157 void await_suspend(std::coroutine_handle<PromiseType> h) noexcept(
158 is_nothrow_suspendible_v<Awaiter>) {
159 Base::init_finish_handle();
160 Base::get_handle()->set_invoker(&h.promise());
161 register_operation(0);
162 }
163};
164
165template <HandleLike Handle, AwaiterLike Awaiter>
166class [[nodiscard]] RangedParallelAwaiterBase {
167public:
168 using HandleType = Handle;
169
170 RangedParallelAwaiterBase(std::vector<Awaiter> awaiters)
171 : awaiters_(std::move(awaiters)) {}
172
173public:
174 HandleType *get_handle() noexcept { return &finish_handle_; }
175
176 void init_finish_handle() noexcept {
177 using ChildHandle = typename Awaiter::HandleType;
178 std::vector<ChildHandle *> handles;
179 handles.reserve(awaiters_.size());
180 for (auto &awaiter : awaiters_) {
181 awaiter.init_finish_handle();
182 handles.push_back(awaiter.get_handle());
183 }
184 finish_handle_.init(std::move(handles));
185 }
186
187 void register_operation(unsigned int flags) noexcept(
188 is_nothrow_suspendible_v<Awaiter>) {
189 for (auto &awaiter : awaiters_) {
190 awaiter.register_operation(flags);
191 }
192 }
193
194public:
195 bool await_ready() const noexcept { return awaiters_.empty(); }
196
197 template <typename PromiseType>
198 void await_suspend(std::coroutine_handle<PromiseType> h) noexcept(
199 is_nothrow_suspendible_v<Awaiter>) {
200 init_finish_handle();
201 finish_handle_.set_invoker(&h.promise());
202 register_operation(0);
203 }
204
205 typename Handle::ReturnType
206 await_resume() noexcept(is_nothrow_extract_result_v<Handle>) {
207 return finish_handle_.extract_result();
208 }
209
210public:
211 void push(Awaiter awaiter) { awaiters_.push_back(std::move(awaiter)); }
212
213protected:
214 HandleType finish_handle_;
215 std::vector<Awaiter> awaiters_;
216};
217
227template <typename Awaiter>
228using RangedParallelAllAwaiter = RangedParallelAwaiterBase<
229 RangedParallelAllFinishHandle<typename Awaiter::HandleType>, Awaiter>;
230
240template <typename Awaiter>
241using RangedParallelAnyAwaiter = RangedParallelAwaiterBase<
242 RangedParallelAnyFinishHandle<typename Awaiter::HandleType>, Awaiter>;
243
250template <typename Awaiter>
251using RangedWhenAllAwaiter = RangedParallelAwaiterBase<
252 RangedWhenAllFinishHandle<typename Awaiter::HandleType>, Awaiter>;
253
262template <typename Awaiter>
263using RangedWhenAnyAwaiter = RangedParallelAwaiterBase<
264 RangedWhenAnyFinishHandle<typename Awaiter::HandleType>, Awaiter>;
265
266template <unsigned int Flags, AwaiterLike Awaiter>
267class [[nodiscard]] RangedLinkAwaiterBase
268 : public RangedWhenAllAwaiter<Awaiter> {
269public:
271 using Base::Base;
272
273 void register_operation(unsigned int flags) noexcept(
274 is_nothrow_suspendible_v<Awaiter>) {
275 auto *ring = detail::Context::current().ring();
276 ring->reserve_space(Base::awaiters_.size());
277 for (int i = 0; i < Base::awaiters_.size() - 1; ++i) {
278 Base::awaiters_[i].register_operation(flags | Flags);
279 }
280 Base::awaiters_.back().register_operation(flags);
281 }
282
283 template <typename PromiseType>
284 void await_suspend(std::coroutine_handle<PromiseType> h) noexcept(
285 is_nothrow_suspendible_v<Awaiter>) {
286 Base::init_finish_handle();
287 Base::finish_handle_.set_invoker(&h.promise());
288 register_operation(0);
289 }
290};
291
298template <typename Awaiter>
299using RangedLinkAwaiter = RangedLinkAwaiterBase<IOSQE_IO_LINK, Awaiter>;
300
308template <typename Awaiter>
309using RangedHardLinkAwaiter = RangedLinkAwaiterBase<IOSQE_IO_HARDLINK, Awaiter>;
310
311template <HandleLike Handle, AwaiterLike... Awaiters>
312class [[nodiscard]] ParallelAwaiterBase {
313public:
314 using HandleType = Handle;
315
316 ParallelAwaiterBase(Awaiters... awaiters)
317 : awaiters_(std::move(awaiters)...) {}
318 template <typename ParallelAwaiter, AwaiterLike New>
319 ParallelAwaiterBase(ParallelAwaiter &&aws, New new_awaiter)
320 : awaiters_(std::tuple_cat(std::move(aws.awaiters_),
321 std::make_tuple(std::move(new_awaiter)))) {}
322
323public:
324 HandleType *get_handle() noexcept { return &finish_handle_; }
325
326 void init_finish_handle() noexcept {
327 auto handles = foreach_init_finish_handle_();
328 static_assert(std::tuple_size<decltype(handles)>::value ==
329 sizeof...(Awaiters),
330 "Number of handles must match number of awaiters");
331 std::apply(
332 [this](auto &&...handle_ptrs) {
333 finish_handle_.init(handle_ptrs...);
334 },
335 handles);
336 }
337
338 void register_operation(unsigned int flags) noexcept(
339 (is_nothrow_suspendible_v<Awaiters> && ...)) {
340 foreach_register_operation_(flags);
341 }
342
343public:
344 bool await_ready() const noexcept { return false; }
345
346 template <typename PromiseType>
347 void await_suspend(std::coroutine_handle<PromiseType> h) noexcept(
348 (is_nothrow_suspendible_v<Awaiters> && ...)) {
349 init_finish_handle();
350 finish_handle_.set_invoker(&h.promise());
351 register_operation(0);
352 }
353
354 typename Handle::ReturnType
355 await_resume() noexcept(is_nothrow_extract_result_v<Handle>) {
356 return finish_handle_.extract_result();
357 }
358
359private:
360 template <size_t Idx = 0> auto foreach_init_finish_handle_() noexcept {
361 if constexpr (Idx < sizeof...(Awaiters)) {
362 std::get<Idx>(awaiters_).init_finish_handle();
363 return std::tuple_cat(
364 std::make_tuple(std::get<Idx>(awaiters_).get_handle()),
365 foreach_init_finish_handle_<Idx + 1>());
366 } else {
367 return std::tuple<>();
368 }
369 }
370
371 template <size_t Idx = 0>
372 void foreach_register_operation_(unsigned int flags) noexcept(
373 (is_nothrow_suspendible_v<Awaiters> && ...)) {
374 if constexpr (Idx < sizeof...(Awaiters)) {
375 std::get<Idx>(awaiters_).register_operation(flags);
376 foreach_register_operation_<Idx + 1>(flags);
377 }
378 }
379
380protected:
381 HandleType finish_handle_;
382 std::tuple<Awaiters...> awaiters_;
383
384 // Make awaiters_ accessible to all template instantiations
385 template <HandleLike, AwaiterLike...> friend class ParallelAwaiterBase;
386};
387
397template <typename... Awaiter>
398using ParallelAllAwaiter = ParallelAwaiterBase<
399 ParallelAllFinishHandle<typename Awaiter::HandleType...>, Awaiter...>;
400
410template <typename... Awaiter>
411using ParallelAnyAwaiter = ParallelAwaiterBase<
412 ParallelAnyFinishHandle<typename Awaiter::HandleType...>, Awaiter...>;
413
420template <typename... Awaiter>
422 ParallelAwaiterBase<WhenAllFinishHandle<typename Awaiter::HandleType...>,
423 Awaiter...>;
424
432template <typename... Awaiter>
434 ParallelAwaiterBase<WhenAnyFinishHandle<typename Awaiter::HandleType...>,
435 Awaiter...>;
436
437template <unsigned int Flags, AwaiterLike... Awaiter>
438class [[nodiscard]] LinkAwaiterBase : public WhenAllAwaiter<Awaiter...> {
439public:
440 using Base = WhenAllAwaiter<Awaiter...>;
441 using Base::Base;
442
443 void register_operation(unsigned int flags) noexcept(
444 (is_nothrow_suspendible_v<Awaiter> && ...)) {
445 auto *ring = detail::Context::current().ring();
446 ring->reserve_space(sizeof...(Awaiter));
447 foreach_register_operation_(flags);
448 }
449
450 template <typename PromiseType>
451 void await_suspend(std::coroutine_handle<PromiseType> h) noexcept(
452 (is_nothrow_suspendible_v<Awaiter> && ...)) {
453 Base::init_finish_handle();
454 Base::finish_handle_.set_invoker(&h.promise());
455 register_operation(0);
456 }
457
458private:
459 template <size_t Idx = 0>
460 void foreach_register_operation_(unsigned int flags) noexcept(
461 (is_nothrow_suspendible_v<Awaiter> && ...)) {
462 if constexpr (Idx < sizeof...(Awaiter)) {
463 std::get<Idx>(Base::awaiters_)
464 .register_operation(Idx < sizeof...(Awaiter) - 1 ? flags | Flags
465 : flags);
466 foreach_register_operation_<Idx + 1>(flags);
467 }
468 }
469};
470
477template <typename... Awaiter>
478using LinkAwaiter = LinkAwaiterBase<IOSQE_IO_LINK, Awaiter...>;
479
486template <typename... Awaiter>
487using HardLinkAwaiter = LinkAwaiterBase<IOSQE_IO_HARDLINK, Awaiter...>;
488
489} // namespace condy
Definitions of finish handle types for asynchronous operations.
The main namespace for the Condy library.
Definition condy.hpp:28
RangedParallelAwaiterBase< RangedParallelAllFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedParallelAllAwaiter
Awaiter to wait for all operations in a range to complete.
Definition awaiters.hpp:228
ParallelAwaiterBase< ParallelAllFinishHandle< typename Awaiter::HandleType... >, Awaiter... > ParallelAllAwaiter
Awaiter to wait for all operations to complete in parallel.
Definition awaiters.hpp:398
RangedLinkAwaiterBase< IOSQE_IO_LINK, Awaiter > RangedLinkAwaiter
Awaiter that links multiple operations in a range using IO_LINK.
Definition awaiters.hpp:299
RangedParallelAwaiterBase< RangedWhenAnyFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedWhenAnyAwaiter
Awaiter to wait for any operation in a range to complete.
Definition awaiters.hpp:263
RangedParallelAwaiterBase< RangedParallelAnyFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedParallelAnyAwaiter
Awaiter to wait for any operation in a range to complete.
Definition awaiters.hpp:241
LinkAwaiterBase< IOSQE_IO_LINK, Awaiter... > LinkAwaiter
Awaiter that links multiple operations using IO_LINK.
Definition awaiters.hpp:478
RangedParallelAwaiterBase< RangedWhenAllFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedWhenAllAwaiter
Awaiter to wait for all operations in a range to complete.
Definition awaiters.hpp:251
ParallelAwaiterBase< WhenAllFinishHandle< typename Awaiter::HandleType... >, Awaiter... > WhenAllAwaiter
Awaiter that waits for all operations to complete in parallel.
Definition awaiters.hpp:421
ParallelAwaiterBase< WhenAnyFinishHandle< typename Awaiter::HandleType... >, Awaiter... > WhenAnyAwaiter
Awaiter that waits for any operation to complete in parallel.
Definition awaiters.hpp:433
ParallelAwaiterBase< ParallelAnyFinishHandle< typename Awaiter::HandleType... >, Awaiter... > ParallelAnyAwaiter
Awaiter to wait for any operation to complete in parallel.
Definition awaiters.hpp:411
LinkAwaiterBase< IOSQE_IO_HARDLINK, Awaiter... > HardLinkAwaiter
Awaiter that links multiple operations using IO_HARDLINK.
Definition awaiters.hpp:487
RangedLinkAwaiterBase< IOSQE_IO_HARDLINK, Awaiter > RangedHardLinkAwaiter
Awaiter that links multiple operations in a range using IO_HARDLINK.
Definition awaiters.hpp:309
Wrapper classes for liburing interfaces.
Runtime type for running the io_uring event loop.