18#include "condy/type_traits.hpp"
28template <OpFinishHandleLike Handle>
class HandleBox {
30 HandleBox(Handle h) : handle_(std::move(h)) {}
32 Handle &get() noexcept {
return handle_; }
34 void maybe_release() noexcept { }
40template <
typename Func, OpFinishHandleLike HandleBase>
41class HandleBox<ZeroCopyMixin<Func, HandleBase>> {
43 using Handle = ZeroCopyMixin<Func, HandleBase>;
44 HandleBox(Handle h) : handle_ptr_(std::make_unique<Handle>(std::move(h))) {}
45 HandleBox(
const HandleBox &other)
46 : handle_ptr_(std::make_unique<Handle>(*other.handle_ptr_)) {}
48 Handle &get() noexcept {
return *handle_ptr_; }
50 void maybe_release() noexcept { handle_ptr_.release(); }
53 std::unique_ptr<Handle> handle_ptr_;
56template <OpFinishHandleLike Handle, PrepFuncLike Func>
class OpAwaiterBase {
58 using HandleType = Handle;
60 OpAwaiterBase(HandleBox<Handle> handle, Func func)
61 : prep_func_(func), finish_handle_(std::move(handle)) {}
64 HandleType *get_handle() noexcept {
return &finish_handle_.get(); }
66 void init_finish_handle() noexcept { }
68 void register_operation(
unsigned int flags)
noexcept {
69 auto &context = detail::Context::current();
70 auto *ring = context.ring();
72 context.runtime()->pend_work();
74 io_uring_sqe *sqe = prep_func_(ring);
75 assert(sqe &&
"prep_func must return a valid sqe");
76 sqe->flags |=
static_cast<uint8_t
>(flags);
77 io_uring_sqe_set_data(
78 sqe, encode_work(&finish_handle_.get(), WorkType::Common));
82 bool await_ready() const noexcept {
return false; }
84 template <
typename PromiseType>
85 void await_suspend(std::coroutine_handle<PromiseType> h)
noexcept {
87 finish_handle_.get().set_invoker(&h.promise());
88 register_operation(0);
91 auto await_resume() noexcept {
92 auto result = finish_handle_.get().extract_result();
93 finish_handle_.maybe_release();
99 HandleBox<Handle> finish_handle_;
/// Awaiter built on OpAwaiterBase whose finish handle type is
/// OpFinishHandle<CQEHandler>. Marked [[nodiscard]], so discarding a freshly
/// built awaiter produces a compiler warning.
102template <PrepFuncLike PrepFunc, CQEHandlerLike CQEHandler>
103class [[nodiscard]] OpAwaiter
104 :
public OpAwaiterBase<OpFinishHandle<CQEHandler>, PrepFunc> {
106 using Base = OpAwaiterBase<OpFinishHandle<CQEHandler>, PrepFunc>;
// Constructor: takes the prep function plus arbitrary extra arguments, which
// are perfectly forwarded into the OpFinishHandle<CQEHandler> constructor.
// NOTE(review): the start and end of the initializer list are not visible in
// this listing, so how `func` reaches the base class cannot be confirmed here.
107 template <
typename... Args>
108 OpAwaiter(PrepFunc func, Args &&...args)
110 OpFinishHandle<CQEHandler>(std::forward<Args>(args)...)),
/// Awaiter built on OpAwaiterBase whose finish handle type is
/// MultiShotOpFinishHandle<CQEHandler, MultiShotFunc>. Marked [[nodiscard]].
114template <PrepFuncLike PrepFunc, CQEHandlerLike CQEHandler,
115 typename MultiShotFunc>
116class [[nodiscard]] MultiShotOpAwaiter
117 :
public OpAwaiterBase<MultiShotOpFinishHandle<CQEHandler, MultiShotFunc>,
121 OpAwaiterBase<MultiShotOpFinishHandle<CQEHandler, MultiShotFunc>,
// Constructor: builds the finish handle from the moved multishot callable
// followed by the perfectly forwarded extra arguments, wraps it in a HandleBox
// and passes that to Base.
// NOTE(review): the rest of the parameter list and of the Base(...) call is
// not visible in this listing.
123 template <
typename... Args>
124 MultiShotOpAwaiter(PrepFunc func, MultiShotFunc multishot_func,
126 : Base(HandleBox(MultiShotOpFinishHandle<CQEHandler, MultiShotFunc>(
127 std::move(multishot_func), std::forward<Args>(args)...)),
/// Awaiter built on OpAwaiterBase whose finish handle type is
/// ZeroCopyOpFinishHandle<CQEHandler, FreeFunc>. Marked [[nodiscard]].
131template <PrepFuncLike PrepFunc, CQEHandlerLike CQEHandler,
typename FreeFunc>
132class [[nodiscard]] ZeroCopyOpAwaiter
133 :
public OpAwaiterBase<ZeroCopyOpFinishHandle<CQEHandler, FreeFunc>,
137 OpAwaiterBase<ZeroCopyOpFinishHandle<CQEHandler, FreeFunc>, PrepFunc>;
// Constructor: builds the finish handle from the moved free_func followed by
// the perfectly forwarded extra arguments, wraps it in a HandleBox and passes
// that to Base.
// NOTE(review): the tail of the Base(...) call is not visible in this listing.
138 template <
typename... Args>
139 ZeroCopyOpAwaiter(PrepFunc func, FreeFunc free_func, Args &&...args)
140 : Base(HandleBox(ZeroCopyOpFinishHandle<CQEHandler, FreeFunc>(
141 std::move(free_func), std::forward<Args>(args)...)),
145template <
unsigned int Flags, AwaiterLike Awaiter>
146class [[nodiscard]] FlaggedOpAwaiter :
public Awaiter {
148 using Base = Awaiter;
149 FlaggedOpAwaiter(Awaiter awaiter) : Base(std::move(awaiter)) {}
151 void register_operation(
unsigned int flags)
noexcept(
152 is_nothrow_suspendible_v<Awaiter>) {
153 Base::register_operation(flags | Flags);
156 template <
typename PromiseType>
157 void await_suspend(std::coroutine_handle<PromiseType> h)
noexcept(
158 is_nothrow_suspendible_v<Awaiter>) {
159 Base::init_finish_handle();
160 Base::get_handle()->set_invoker(&h.promise());
161 register_operation(0);
165template <HandleLike Handle, AwaiterLike Awaiter>
166class [[nodiscard]] RangedParallelAwaiterBase {
168 using HandleType = Handle;
170 RangedParallelAwaiterBase(std::vector<Awaiter> awaiters)
171 : awaiters_(std::move(awaiters)) {}
174 HandleType *get_handle() noexcept {
return &finish_handle_; }
176 void init_finish_handle() noexcept {
177 using ChildHandle =
typename Awaiter::HandleType;
178 std::vector<ChildHandle *> handles;
179 handles.reserve(awaiters_.size());
180 for (
auto &awaiter : awaiters_) {
181 awaiter.init_finish_handle();
182 handles.push_back(awaiter.get_handle());
184 finish_handle_.init(std::move(handles));
187 void register_operation(
unsigned int flags)
noexcept(
188 is_nothrow_suspendible_v<Awaiter>) {
189 for (
auto &awaiter : awaiters_) {
190 awaiter.register_operation(flags);
195 bool await_ready() const noexcept {
return awaiters_.empty(); }
197 template <
typename PromiseType>
198 void await_suspend(std::coroutine_handle<PromiseType> h)
noexcept(
199 is_nothrow_suspendible_v<Awaiter>) {
200 init_finish_handle();
201 finish_handle_.set_invoker(&h.promise());
202 register_operation(0);
205 typename Handle::ReturnType
206 await_resume() noexcept(is_nothrow_extract_result_v<Handle>) {
207 return finish_handle_.extract_result();
211 void push(Awaiter awaiter) { awaiters_.push_back(std::move(awaiter)); }
214 HandleType finish_handle_;
215 std::vector<Awaiter> awaiters_;
227template <
typename Awaiter>
229 RangedParallelAllFinishHandle<typename Awaiter::HandleType>, Awaiter>;
240template <
typename Awaiter>
242 RangedParallelAnyFinishHandle<typename Awaiter::HandleType>, Awaiter>;
250template <
typename Awaiter>
252 RangedWhenAllFinishHandle<typename Awaiter::HandleType>, Awaiter>;
262template <
typename Awaiter>
264 RangedWhenAnyFinishHandle<typename Awaiter::HandleType>, Awaiter>;
266template <
unsigned int Flags, AwaiterLike Awaiter>
267class [[nodiscard]] RangedLinkAwaiterBase
273 void register_operation(
unsigned int flags)
noexcept(
274 is_nothrow_suspendible_v<Awaiter>) {
275 auto *ring = detail::Context::current().ring();
276 ring->reserve_space(Base::awaiters_.size());
277 for (
int i = 0; i < Base::awaiters_.size() - 1; ++i) {
278 Base::awaiters_[i].register_operation(flags | Flags);
280 Base::awaiters_.back().register_operation(flags);
283 template <
typename PromiseType>
284 void await_suspend(std::coroutine_handle<PromiseType> h)
noexcept(
285 is_nothrow_suspendible_v<Awaiter>) {
286 Base::init_finish_handle();
287 Base::finish_handle_.set_invoker(&h.promise());
288 register_operation(0);
298template <
typename Awaiter>
308template <
typename Awaiter>
311template <HandleLike Handle, AwaiterLike... Awaiters>
312class [[nodiscard]] ParallelAwaiterBase {
314 using HandleType = Handle;
316 ParallelAwaiterBase(Awaiters... awaiters)
317 : awaiters_(std::move(awaiters)...) {}
318 template <
typename ParallelAwaiter, AwaiterLike New>
319 ParallelAwaiterBase(ParallelAwaiter &&aws, New new_awaiter)
320 : awaiters_(std::tuple_cat(std::move(aws.awaiters_),
321 std::make_tuple(std::move(new_awaiter)))) {}
324 HandleType *get_handle()
noexcept {
return &finish_handle_; }
326 void init_finish_handle()
noexcept {
327 auto handles = foreach_init_finish_handle_();
328 static_assert(std::tuple_size<
decltype(handles)>::value ==
330 "Number of handles must match number of awaiters");
332 [
this](
auto &&...handle_ptrs) {
333 finish_handle_.init(handle_ptrs...);
338 void register_operation(
unsigned int flags)
noexcept(
339 (is_nothrow_suspendible_v<Awaiters> && ...)) {
340 foreach_register_operation_(flags);
344 bool await_ready()
const noexcept {
return false; }
346 template <
typename PromiseType>
347 void await_suspend(std::coroutine_handle<PromiseType> h)
noexcept(
348 (is_nothrow_suspendible_v<Awaiters> && ...)) {
349 init_finish_handle();
350 finish_handle_.set_invoker(&h.promise());
351 register_operation(0);
354 typename Handle::ReturnType
355 await_resume()
noexcept(is_nothrow_extract_result_v<Handle>) {
356 return finish_handle_.extract_result();
360 template <
size_t Idx = 0>
auto foreach_init_finish_handle_()
noexcept {
361 if constexpr (Idx <
sizeof...(Awaiters)) {
362 std::get<Idx>(awaiters_).init_finish_handle();
363 return std::tuple_cat(
364 std::make_tuple(std::get<Idx>(awaiters_).get_handle()),
365 foreach_init_finish_handle_<Idx + 1>());
367 return std::tuple<>();
371 template <
size_t Idx = 0>
372 void foreach_register_operation_(
unsigned int flags)
noexcept(
373 (is_nothrow_suspendible_v<Awaiters> && ...)) {
374 if constexpr (Idx <
sizeof...(Awaiters)) {
375 std::get<Idx>(awaiters_).register_operation(flags);
376 foreach_register_operation_<Idx + 1>(flags);
381 HandleType finish_handle_;
382 std::tuple<Awaiters...> awaiters_;
385 template <HandleLike, AwaiterLike...>
friend class ParallelAwaiterBase;
397template <
typename... Awaiter>
399 ParallelAllFinishHandle<
typename Awaiter::HandleType...>, Awaiter...>;
410template <
typename... Awaiter>
412 ParallelAnyFinishHandle<
typename Awaiter::HandleType...>, Awaiter...>;
420template <
typename... Awaiter>
422 ParallelAwaiterBase<WhenAllFinishHandle<
typename Awaiter::HandleType...>,
432template <
typename... Awaiter>
434 ParallelAwaiterBase<WhenAnyFinishHandle<
typename Awaiter::HandleType...>,
437template <
unsigned int Flags, AwaiterLike... Awaiter>
438class [[nodiscard]] LinkAwaiterBase :
public WhenAllAwaiter<Awaiter...> {
443 void register_operation(
unsigned int flags)
noexcept(
444 (is_nothrow_suspendible_v<Awaiter> && ...)) {
445 auto *ring = detail::Context::current().ring();
446 ring->reserve_space(
sizeof...(Awaiter));
447 foreach_register_operation_(flags);
450 template <
typename PromiseType>
451 void await_suspend(std::coroutine_handle<PromiseType> h)
noexcept(
452 (is_nothrow_suspendible_v<Awaiter> && ...)) {
453 Base::init_finish_handle();
454 Base::finish_handle_.set_invoker(&h.promise());
455 register_operation(0);
// Compile-time recursion over the awaiter pack, starting at index Idx.
// Registers awaiter Idx and then recurses with Idx + 1; the recursion stops
// once Idx reaches sizeof...(Awaiter).
459 template <
size_t Idx = 0>
460 void foreach_register_operation_(
unsigned int flags)
noexcept(
461 (is_nothrow_suspendible_v<Awaiter> && ...)) {
462 if constexpr (Idx <
sizeof...(Awaiter)) {
// Every awaiter before the last one is registered with `flags | Flags`.
// NOTE(review): the else arm of this conditional (the value used for the last
// awaiter) is on a line that is not visible in this listing - confirm.
463 std::get<Idx>(Base::awaiters_)
464 .register_operation(Idx <
sizeof...(Awaiter) - 1 ? flags | Flags
466 foreach_register_operation_<Idx + 1>(flags);
477template <
typename... Awaiter>
486template <
typename... Awaiter>
Definitions of finish handle types for asynchronous operations.
The main namespace for the Condy library.
RangedParallelAwaiterBase< RangedParallelAllFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedParallelAllAwaiter
Awaiter to wait for all operations in a range to complete.
ParallelAwaiterBase< ParallelAllFinishHandle< typename Awaiter::HandleType... >, Awaiter... > ParallelAllAwaiter
Awaiter to wait for all operations to complete in parallel.
RangedLinkAwaiterBase< IOSQE_IO_LINK, Awaiter > RangedLinkAwaiter
Awaiter that links multiple operations in a range using IO_LINK.
RangedParallelAwaiterBase< RangedWhenAnyFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedWhenAnyAwaiter
Awaiter to wait for any operation in a range to complete.
RangedParallelAwaiterBase< RangedParallelAnyFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedParallelAnyAwaiter
Awaiter to wait for any operation in a range to complete.
LinkAwaiterBase< IOSQE_IO_LINK, Awaiter... > LinkAwaiter
Awaiter that links multiple operations using IO_LINK.
RangedParallelAwaiterBase< RangedWhenAllFinishHandle< typename Awaiter::HandleType >, Awaiter > RangedWhenAllAwaiter
Awaiter to wait for all operations in a range to complete.
ParallelAwaiterBase< WhenAllFinishHandle< typename Awaiter::HandleType... >, Awaiter... > WhenAllAwaiter
Awaiter that waits for all operations to complete in parallel.
ParallelAwaiterBase< WhenAnyFinishHandle< typename Awaiter::HandleType... >, Awaiter... > WhenAnyAwaiter
Awaiter that waits for any operation to complete in parallel.
ParallelAwaiterBase< ParallelAnyFinishHandle< typename Awaiter::HandleType... >, Awaiter... > ParallelAnyAwaiter
Awaiter to wait for any operation to complete in parallel.
LinkAwaiterBase< IOSQE_IO_HARDLINK, Awaiter... > HardLinkAwaiter
Awaiter that links multiple operations using IO_HARDLINK.
RangedLinkAwaiterBase< IOSQE_IO_HARDLINK, Awaiter > RangedHardLinkAwaiter
Awaiter that links multiple operations in a range using IO_HARDLINK.
Wrapper classes for liburing interfaces.
Runtime type for running the io_uring event loop.