Condy v1.6.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
finish_handles.hpp
Go to the documentation of this file.
1
8
9#pragma once
10
11#include "condy/concepts.hpp"
12#include "condy/invoker.hpp"
13#include "condy/runtime.hpp"
14#include <array>
15#include <cassert>
16#include <cerrno>
17#include <cstddef>
18#include <tuple>
19#include <utility>
20#include <variant>
21#include <vector>
22
23namespace condy {
24
25template <CQEHandlerLike CQEHandler>
26class OpFinishHandle : public OpFinishHandleBase {
27public:
28 using ReturnType = typename CQEHandler::ReturnType;
29
30 template <typename... Args>
31 OpFinishHandle(Args &&...args) : cqe_handler_(std::forward<Args>(args)...) {
32 this->handle_func_ = handle_static_;
33 }
34
35 ReturnType extract_result() noexcept {
36 return cqe_handler_.extract_result();
37 }
38
39 void cancel(Runtime *runtime) noexcept {
40 assert(runtime != nullptr);
41 runtime->cancel(this);
42 }
43
44private:
45 static bool handle_static_(void *data, io_uring_cqe *cqe) noexcept {
46 auto *self = static_cast<OpFinishHandle *>(data);
47 return self->handle_impl_(cqe);
48 }
49
50 bool handle_impl_(io_uring_cqe *cqe) noexcept {
51 cqe_handler_.handle_cqe(cqe);
52 assert(invoker_ != nullptr);
53 (*invoker_)();
54 return true;
55 }
56
57protected:
58 CQEHandler cqe_handler_;
59};
60
61template <CQEHandlerLike CQEHandler, typename Func>
62class MultiShotOpFinishHandle : public OpFinishHandle<CQEHandler> {
63public:
64 template <typename... Args>
65 MultiShotOpFinishHandle(Func func, Args &&...args)
66 : OpFinishHandle<CQEHandler>(std::forward<Args>(args)...),
67 func_(std::move(func)) {
68 this->handle_func_ = handle_static_;
69 }
70
71private:
72 static bool handle_static_(void *data, io_uring_cqe *cqe) noexcept {
73 auto *self = static_cast<MultiShotOpFinishHandle *>(data);
74 return self->handle_impl_(cqe);
75 }
76
77 bool handle_impl_(io_uring_cqe *cqe) noexcept
78 /* fake override */ {
79 if (cqe->flags & IORING_CQE_F_MORE) {
80 this->cqe_handler_.handle_cqe(cqe);
81 func_(this->cqe_handler_.extract_result());
82 return false;
83 } else {
84 this->cqe_handler_.handle_cqe(cqe);
85 assert(this->invoker_ != nullptr);
86 (*this->invoker_)();
87 return true;
88 }
89 }
90
91protected:
92 Func func_;
93};
94
95template <CQEHandlerLike CQEHandler, typename Func>
96class ZeroCopyOpFinishHandle : public OpFinishHandle<CQEHandler> {
97public:
98 template <typename... Args>
99 ZeroCopyOpFinishHandle(Func func, Args &&...args)
100 : OpFinishHandle<CQEHandler>(std::forward<Args>(args)...),
101 free_func_(std::move(func)) {
102 this->handle_func_ = handle_static_;
103 }
104
105private:
106 static bool handle_static_(void *data, io_uring_cqe *cqe) noexcept {
107 auto *self = static_cast<ZeroCopyOpFinishHandle *>(data);
108 return self->handle_impl_(cqe);
109 }
110
111 bool handle_impl_(io_uring_cqe *cqe) noexcept
112 /* fake override */ {
113 if (cqe->flags & IORING_CQE_F_MORE) {
114 this->cqe_handler_.handle_cqe(cqe);
115 assert(this->invoker_ != nullptr);
116 (*this->invoker_)();
117 return false;
118 } else {
119 if (cqe->flags & IORING_CQE_F_NOTIF) {
120 notify_(cqe->res);
121 return true;
122 } else {
123 // Only one cqe means the operation is finished without
124 // notification. This is rare but possible.
125 // https://github.com/axboe/liburing/issues/1462
126 this->cqe_handler_.handle_cqe(cqe);
127 assert(this->invoker_ != nullptr);
128 (*this->invoker_)();
129 notify_(0);
130 return true;
131 }
132 }
133 }
134
135 void notify_(int32_t res) noexcept {
136 free_func_(res);
137 delete this;
138 }
139
140protected:
141 Func free_func_;
142};
143
144template <typename T>
145constexpr bool is_nothrow_extract_result_v =
146 noexcept(std::declval<T>().extract_result());
147
148template <bool Cancel, HandleLike Handle> class RangedParallelFinishHandle {
149public:
150 using ChildReturnType = typename Handle::ReturnType;
151 using ReturnType =
152 std::pair<std::vector<size_t>, std::vector<ChildReturnType>>;
153
154 void init(std::vector<Handle *> handles) noexcept {
155 handles_ = std::move(handles);
156 child_invokers_.resize(handles_.size());
157 for (size_t i = 0; i < handles_.size(); i++) {
158 auto *handle = handles_[i];
159 auto &invoker = child_invokers_[i];
160 invoker.self_ = this;
161 invoker.no_ = i;
162 handle->set_invoker(&invoker);
163 }
164 order_.resize(handles_.size());
165 }
166
167 void cancel(Runtime *runtime) noexcept {
168 if (!canceled_) {
169 canceled_ = true;
170 for (auto &handle : handles_) {
171 handle->cancel(runtime);
172 }
173 }
174 }
175
176 ReturnType extract_result() noexcept(is_nothrow_extract_result_v<Handle>) {
177 std::vector<ChildReturnType> result;
178 result.reserve(handles_.size());
179 for (size_t i = 0; i < handles_.size(); i++) {
180 result.push_back(handles_[i]->extract_result());
181 }
182 return std::make_pair(std::move(order_), std::move(result));
183 }
184
185 void set_invoker(Invoker *invoker) noexcept { invoker_ = invoker; }
186
187private:
188 void finish_(size_t idx) noexcept {
189 size_t no = finished_count_++;
190 order_[no] = idx;
191
192 if constexpr (Cancel) {
193 if (!canceled_) {
194 canceled_ = true;
195 auto *runtime = detail::Context::current().runtime();
196 for (size_t i = 0; i < handles_.size(); i++) {
197 if (i != idx) {
198 handles_[i]->cancel(runtime);
199 }
200 }
201 }
202 }
203
204 if (no == handles_.size() - 1) {
205 // All finished or canceled
206 assert(invoker_ != nullptr);
207 (*invoker_)();
208 return;
209 }
210 }
211
212private:
213 struct FinishInvoker : public InvokerAdapter<FinishInvoker> {
214 void invoke() noexcept { self_->finish_(no_); }
215 RangedParallelFinishHandle *self_;
216 size_t no_;
217 };
218
219 size_t finished_count_ = 0;
220 bool canceled_ = false;
221 std::vector<Handle *> handles_ = {};
222 std::vector<FinishInvoker> child_invokers_;
223 std::vector<size_t> order_;
224 Invoker *invoker_ = nullptr;
225};
226
227template <HandleLike Handle>
228using RangedParallelAllFinishHandle = RangedParallelFinishHandle<false, Handle>;
229
230template <HandleLike Handle>
231using RangedParallelAnyFinishHandle = RangedParallelFinishHandle<true, Handle>;
232
233template <HandleLike Handle>
234class RangedWhenAllFinishHandle : public RangedParallelAllFinishHandle<Handle> {
235public:
236 using Base = RangedParallelAllFinishHandle<Handle>;
237 using ReturnType = std::vector<typename Handle::ReturnType>;
238
239 ReturnType extract_result() noexcept(noexcept(Base::extract_result())) {
240 auto r = Base::extract_result();
241 return std::move(r.second);
242 }
243};
244
245template <HandleLike Handle>
246class RangedWhenAnyFinishHandle : public RangedParallelAnyFinishHandle<Handle> {
247public:
248 using Base = RangedParallelAnyFinishHandle<Handle>;
249 using ChildReturnType = typename Handle::ReturnType;
250 using ReturnType = std::pair<size_t, ChildReturnType>;
251
252 ReturnType extract_result() {
253 auto r = Base::extract_result();
254 auto &[order, results] = r;
255 // May throw out_of_range if the range is empty, which is expected and
256 // should be handled by caller.
257 auto idx = order.at(0);
258 return std::make_pair(idx, std::move(results[idx]));
259 }
260};
261
262template <bool Cancel, HandleLike... Handles> class ParallelFinishHandle {
263public:
264 using ReturnType = std::pair<std::array<size_t, sizeof...(Handles)>,
265 std::tuple<typename Handles::ReturnType...>>;
266
267 template <typename... HandlePtr> void init(HandlePtr... handles) noexcept {
268 handles_ = std::make_tuple(handles...);
269 foreach_set_invoker_();
270 }
271
272 void cancel(Runtime *runtime) noexcept {
273 if (!canceled_) {
274 canceled_ = true;
275 std::apply(
276 [runtime](auto *...handles) {
277 (handles->cancel(runtime), ...);
278 },
279 handles_);
280 }
281 }
282
283 ReturnType
284 extract_result() noexcept((is_nothrow_extract_result_v<Handles> && ...)) {
285 auto result = std::apply(
286 [](auto *...handle_ptrs) {
287 return std::make_tuple(handle_ptrs->extract_result()...);
288 },
289 handles_);
290 return std::make_pair(std::move(order_), std::move(result));
291 }
292
293 void set_invoker(Invoker *invoker) noexcept { invoker_ = invoker; }
294
295private:
296 template <size_t I = 0> void foreach_set_invoker_() noexcept {
297 if constexpr (I < sizeof...(Handles)) {
298 auto *handle = std::get<I>(handles_);
299 auto &invoker = std::get<I>(child_invokers_);
300 invoker.self_ = this;
301 handle->set_invoker(&invoker);
302 foreach_set_invoker_<I + 1>();
303 }
304 }
305
306 template <size_t SkipIdx, size_t I = 0>
307 void foreach_call_cancel_(Runtime *runtime) noexcept {
308 if constexpr (I < sizeof...(Handles)) {
309 auto handle = std::get<I>(handles_);
310 if constexpr (I != SkipIdx) {
311 handle->cancel(runtime);
312 }
313 foreach_call_cancel_<SkipIdx, I + 1>(runtime);
314 }
315 }
316
317 template <size_t Idx> void finish_() noexcept {
318 size_t no = finished_count_++;
319 order_[no] = Idx;
320
321 if constexpr (Cancel) {
322 if (!canceled_) {
323 canceled_ = true;
324 auto *runtime = detail::Context::current().runtime();
325 foreach_call_cancel_<Idx>(runtime);
326 }
327 }
328
329 if (no == sizeof...(Handles) - 1) {
330 // All finished or canceled
331 assert(invoker_ != nullptr);
332 (*invoker_)();
333 }
334 }
335
336private:
337 template <size_t I>
338 struct FinishInvoker : public InvokerAdapter<FinishInvoker<I>> {
339 void invoke() noexcept { self_->template finish_<I>(); }
340 ParallelFinishHandle *self_;
341 };
342
343 template <typename... input_t>
344 using tuple_cat_t = decltype(std::tuple_cat(std::declval<input_t>()...));
345
346 template <size_t I, typename Arg, typename... Args> struct helper {
347 using type = tuple_cat_t<std::tuple<FinishInvoker<I>>,
348 typename helper<I + 1, Args...>::type>;
349 };
350
351 template <size_t I, typename Arg> struct helper<I, Arg> {
352 using type = std::tuple<FinishInvoker<I>>;
353 };
354
355 using InvokerTupleType = typename helper<0, Handles...>::type;
356
357 size_t finished_count_ = 0;
358 bool canceled_ = false;
359 std::tuple<Handles *...> handles_;
360 InvokerTupleType child_invokers_;
361 std::array<size_t, sizeof...(Handles)> order_;
362 Invoker *invoker_ = nullptr;
363};
364
365template <HandleLike... Handles>
366using ParallelAllFinishHandle = ParallelFinishHandle<false, Handles...>;
367
368template <HandleLike... Handles>
369using ParallelAnyFinishHandle = ParallelFinishHandle<true, Handles...>;
370
371template <HandleLike... Handles>
372class WhenAllFinishHandle : public ParallelAllFinishHandle<Handles...> {
373public:
374 using Base = ParallelAllFinishHandle<Handles...>;
375 using ReturnType = std::tuple<typename Handles::ReturnType...>;
376
377 ReturnType extract_result() noexcept(noexcept(Base::extract_result())) {
378 auto r = Base::extract_result();
379 return std::move(r.second);
380 }
381};
382
383template <HandleLike... Handles>
384class WhenAnyFinishHandle : public ParallelAnyFinishHandle<Handles...> {
385public:
386 using Base = ParallelAnyFinishHandle<Handles...>;
387 using ReturnType = std::variant<typename Handles::ReturnType...>;
388
389 ReturnType extract_result() noexcept(noexcept(Base::extract_result())) {
390 auto r = Base::extract_result();
391 auto &[order, results] = r;
392 return tuple_at_(results, order[0]);
393 }
394
395private:
396 template <size_t Idx = 0>
397 static auto tuple_at_(std::tuple<typename Handles::ReturnType...> &results,
398 size_t idx) {
399 if constexpr (Idx < sizeof...(Handles)) {
400 if (idx == Idx) {
401 return ReturnType{std::in_place_index<Idx>,
402 std::move(std::get<Idx>(results))};
403 } else {
404 return tuple_at_<Idx + 1>(results, idx);
405 }
406 } else {
407 // Should not reach here, but we need to make compiler happy.
408 // Throwing an exception will lead to wrong optimization.
409 assert(false && "Index out of bounds");
410 return ReturnType{std::in_place_index<0>,
411 std::move(std::get<0>(results))};
412 }
413 }
414};
415
416} // namespace condy
Polymorphic invocation utilities.
The main namespace for the Condy library.
Definition condy.hpp:30
Runtime type for running the io_uring event loop.