Condy v1.3.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
finish_handles.hpp
Go to the documentation of this file.
1
8
9#pragma once
10
11#include "condy/concepts.hpp"
12#include "condy/condy_uring.hpp"
13#include "condy/context.hpp"
14#include "condy/invoker.hpp"
15#include "condy/ring.hpp"
16#include "condy/work_type.hpp"
17#include <array>
18#include <cerrno>
19#include <cstddef>
20#include <limits>
21#include <tuple>
22#include <variant>
23#include <vector>
24
25namespace condy {
26
27class Ring;
28
29namespace detail {
30
/// Outcome of handling one completion queue entry, returned by the
/// handle_cqe functions of the finish handles.
struct Action {
    /// Set by the handlers when the handle should be queued as work.
    bool queue_work;
    /// Set by the handlers when the operation has finished.
    bool op_finish;
};
35
36} // namespace detail
37
38class OpFinishHandleBase
39 : public InvokerAdapter<OpFinishHandleBase, WorkInvoker> {
40public:
41 using HandleCQEFunc = detail::Action (*)(void *, io_uring_cqe *);
42
43 void cancel() {
44 auto *ring = detail::Context::current().ring();
45 io_uring_sqe *sqe = ring->get_sqe();
46 io_uring_prep_cancel(sqe, this, 0);
47 io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Ignore));
48 io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);
49 }
50
51 detail::Action handle_cqe(io_uring_cqe *cqe) {
52 assert(handle_func_ != nullptr);
53 return handle_func_(this, cqe);
54 }
55
56 void invoke() {
57 assert(invoker_ != nullptr);
58 (*invoker_)();
59 }
60
61 void set_invoker(Invoker *invoker) { invoker_ = invoker; }
62
63protected:
64 OpFinishHandleBase() = default;
65
66protected:
67 HandleCQEFunc handle_func_ = nullptr;
68 Invoker *invoker_ = nullptr;
69};
70
71template <CQEHandlerLike CQEHandler>
72class OpFinishHandle : public OpFinishHandleBase {
73public:
74 using ReturnType = typename CQEHandler::ReturnType;
75
76 template <typename... Args>
77 OpFinishHandle(Args &&...args) : cqe_handler_(std::forward<Args>(args)...) {
78 this->handle_func_ = handle_cqe_static_;
79 }
80
81 detail::Action handle_cqe_impl(io_uring_cqe *cqe) {
82 cqe_handler_.handle_cqe(cqe);
83 return {.queue_work = true, .op_finish = true};
84 }
85
86 ReturnType extract_result() { return cqe_handler_.extract_result(); }
87
88private:
89 static detail::Action handle_cqe_static_(void *data, io_uring_cqe *cqe) {
90 auto *self = static_cast<OpFinishHandle *>(data);
91 return self->handle_cqe_impl(cqe);
92 }
93
94protected:
95 CQEHandler cqe_handler_;
96};
97
98template <typename Func, OpFinishHandleLike HandleBase>
99class MultiShotMixin : public HandleBase {
100public:
101 template <typename... Args>
102 MultiShotMixin(Func func, Args &&...args)
103 : HandleBase(std::forward<Args>(args)...), func_(std::move(func)) {
104 this->handle_func_ = handle_cqe_static_;
105 }
106
107 detail::Action handle_cqe_impl(io_uring_cqe *cqe) /* fake override */ {
108 if (cqe->flags & IORING_CQE_F_MORE) {
109 HandleBase::handle_cqe_impl(cqe);
110 func_(HandleBase::extract_result());
111 return {.queue_work = false, .op_finish = false};
112 } else {
113 HandleBase::handle_cqe_impl(cqe);
114 return {.queue_work = true, .op_finish = true};
115 }
116 }
117
118private:
119 static detail::Action handle_cqe_static_(void *data, io_uring_cqe *cqe) {
120 auto *self = static_cast<MultiShotMixin *>(data);
121 return self->handle_cqe_impl(cqe);
122 }
123
124protected:
125 Func func_;
126};
127
128template <CQEHandlerLike CQEHandler, typename MultiShotFunc>
129using MultiShotOpFinishHandle =
130 MultiShotMixin<MultiShotFunc, OpFinishHandle<CQEHandler>>;
131
132template <typename Func, OpFinishHandleLike HandleBase>
133class ZeroCopyMixin : public HandleBase {
134public:
135 template <typename... Args>
136 ZeroCopyMixin(Func func, Args &&...args)
137 : HandleBase(std::forward<Args>(args)...), free_func_(std::move(func)) {
138 this->func_ = invoke_static_;
139 this->handle_func_ = handle_cqe_static_;
140 }
141
142 void invoke() /* fake override */ {
143 assert(this->invoker_ != nullptr);
144 (*this->invoker_)();
145 resumed_ = true;
146 // Invocation of free_func_ should be delayed until the operation is
147 // finished since user may adjust the behavior of free_func_ based on
148 // the result of the operation.
149 maybe_free_();
150 }
151
152 detail::Action handle_cqe_impl(io_uring_cqe *cqe) /* fake override */ {
153 if (cqe->flags & IORING_CQE_F_MORE) {
154 HandleBase::handle_cqe_impl(cqe);
155 return {.queue_work = true, .op_finish = false};
156 } else {
157 if (cqe->flags & IORING_CQE_F_NOTIF) {
158 notify_(cqe->res);
159 return {.queue_work = false, .op_finish = true};
160 } else {
161 // Only one cqe means the operation is finished without
162 // notification. This is rare but possible.
163 // https://github.com/axboe/liburing/issues/1462
164 notify_(0);
165 HandleBase::handle_cqe_impl(cqe);
166 return {.queue_work = true, .op_finish = true};
167 }
168 }
169 }
170
171private:
172 void maybe_free_() {
173 if (resumed_ && notified_) {
174 free_func_(notify_res_);
175 delete this;
176 }
177 }
178
179 void notify_(int32_t res) {
180 assert(res != -ENOTRECOVERABLE);
181 notify_res_ = res;
182 notified_ = true;
183 maybe_free_();
184 }
185
186 static void invoke_static_(void *data) {
187 auto *self = static_cast<ZeroCopyMixin *>(data);
188 self->invoke();
189 }
190
191 static detail::Action handle_cqe_static_(void *data, io_uring_cqe *cqe) {
192 auto *self = static_cast<ZeroCopyMixin *>(data);
193 return self->handle_cqe_impl(cqe);
194 }
195
196protected:
197 Func free_func_;
198 int32_t notify_res_ = -ENOTRECOVERABLE;
199 // Use these flags to handle race between invoke and notify
200 bool resumed_ = false;
201 bool notified_ = false;
202};
203
204template <CQEHandlerLike CQEHandler, typename FreeFunc>
205using ZeroCopyOpFinishHandle =
206 ZeroCopyMixin<FreeFunc, OpFinishHandle<CQEHandler>>;
207
208template <bool Cancel, HandleLike Handle> class RangedParallelFinishHandle {
209public:
210 using ChildReturnType = typename Handle::ReturnType;
211 using ReturnType =
212 std::pair<std::vector<size_t>, std::vector<ChildReturnType>>;
213
214 void init(std::vector<Handle *> handles) {
215 handles_ = std::move(handles);
216 child_invokers_.resize(handles_.size());
217 for (size_t i = 0; i < handles_.size(); i++) {
218 auto *handle = handles_[i];
219 auto &invoker = child_invokers_[i];
220 invoker.self_ = this;
221 invoker.no_ = i;
222 handle->set_invoker(&invoker);
223 }
224 order_.resize(handles_.size());
225 }
226
227 void cancel() {
228 if (!canceled_) {
229 canceled_ = true;
230 for (auto &handle : handles_) {
231 handle->cancel();
232 }
233 }
234 }
235
236 ReturnType extract_result() {
237 std::vector<ChildReturnType> result;
238 result.reserve(handles_.size());
239 for (size_t i = 0; i < handles_.size(); i++) {
240 result.push_back(handles_[i]->extract_result());
241 }
242 return std::make_pair(std::move(order_), std::move(result));
243 }
244
245 void set_invoker(Invoker *invoker) { invoker_ = invoker; }
246
247private:
248 void finish_(size_t idx) {
249 size_t no = finished_count_++;
250 order_[no] = idx;
251
252 if constexpr (Cancel) {
253 if (!canceled_) {
254 canceled_ = true;
255 for (size_t i = 0; i < handles_.size(); i++) {
256 if (i != idx) {
257 handles_[i]->cancel();
258 }
259 }
260 }
261 }
262
263 if (no == handles_.size() - 1) {
264 // All finished or canceled
265 (*invoker_)();
266 return;
267 }
268 }
269
270private:
271 struct FinishInvoker : public InvokerAdapter<FinishInvoker> {
272 void invoke() { self_->finish_(no_); }
273 RangedParallelFinishHandle *self_;
274 size_t no_;
275 };
276
277 size_t finished_count_ = 0;
278 bool canceled_ = false;
279 std::vector<Handle *> handles_ = {};
280 std::vector<FinishInvoker> child_invokers_;
281 std::vector<size_t> order_;
282 Invoker *invoker_ = nullptr;
283};
284
285template <HandleLike Handle>
286using RangedParallelAllFinishHandle = RangedParallelFinishHandle<false, Handle>;
287
288template <HandleLike Handle>
289using RangedParallelAnyFinishHandle = RangedParallelFinishHandle<true, Handle>;
290
291template <HandleLike Handle>
292class RangedWhenAllFinishHandle : public RangedParallelAllFinishHandle<Handle> {
293public:
294 using Base = RangedParallelAllFinishHandle<Handle>;
295 using ReturnType = std::vector<typename Handle::ReturnType>;
296
297 ReturnType extract_result() {
298 auto r = Base::extract_result();
299 return std::move(r.second);
300 }
301};
302
303template <HandleLike Handle>
304class RangedWhenAnyFinishHandle : public RangedParallelAnyFinishHandle<Handle> {
305public:
306 using Base = RangedParallelAnyFinishHandle<Handle>;
307 using ChildReturnType = typename Handle::ReturnType;
308 using ReturnType = std::pair<size_t, ChildReturnType>;
309
310 ReturnType extract_result() {
311 auto r = Base::extract_result();
312 auto &[order, results] = r;
313 return std::make_pair(order[0], std::move(results[order[0]]));
314 }
315};
316
317template <bool Cancel, HandleLike... Handles> class ParallelFinishHandle {
318public:
319 using ReturnType = std::pair<std::array<size_t, sizeof...(Handles)>,
320 std::tuple<typename Handles::ReturnType...>>;
321
322 template <typename... HandlePtr> void init(HandlePtr... handles) {
323 handles_ = std::make_tuple(handles...);
324 foreach_set_invoker_();
325 }
326
327 void cancel() {
328 if (!canceled_) {
329 canceled_ = true;
330 constexpr size_t SkipIdx = std::numeric_limits<size_t>::max();
331 foreach_call_cancel_<SkipIdx>();
332 }
333 }
334
335 ReturnType extract_result() {
336 auto result = std::apply(
337 [](auto *...handle_ptrs) {
338 return std::make_tuple(handle_ptrs->extract_result()...);
339 },
340 handles_);
341 return std::make_pair(std::move(order_), std::move(result));
342 }
343
344 void set_invoker(Invoker *invoker) { invoker_ = invoker; }
345
346private:
347 template <size_t I = 0> void foreach_set_invoker_() {
348 if constexpr (I < sizeof...(Handles)) {
349 auto *handle = std::get<I>(handles_);
350 auto &invoker = std::get<I>(child_invokers_);
351 invoker.self_ = this;
352 handle->set_invoker(&invoker);
353 foreach_set_invoker_<I + 1>();
354 }
355 }
356
357 template <size_t SkipIdx, size_t I = 0> void foreach_call_cancel_() {
358 if constexpr (I < sizeof...(Handles)) {
359 auto handle = std::get<I>(handles_);
360 if constexpr (I != SkipIdx) {
361 handle->cancel();
362 }
363 foreach_call_cancel_<SkipIdx, I + 1>();
364 }
365 }
366
367 template <size_t Idx> void finish_() {
368 size_t no = finished_count_++;
369 order_[no] = Idx;
370
371 if constexpr (Cancel) {
372 if (!canceled_) {
373 canceled_ = true;
374 foreach_call_cancel_<Idx>();
375 }
376 }
377
378 if (no == sizeof...(Handles) - 1) {
379 // All finished or canceled
380 (*invoker_)();
381 }
382 }
383
384private:
385 template <size_t I>
386 struct FinishInvoker : public InvokerAdapter<FinishInvoker<I>> {
387 void invoke() { self_->template finish_<I>(); }
388 ParallelFinishHandle *self_;
389 };
390
391 template <typename... input_t>
392 using tuple_cat_t = decltype(std::tuple_cat(std::declval<input_t>()...));
393
394 template <size_t I, typename Arg, typename... Args> struct helper {
395 using type = tuple_cat_t<std::tuple<FinishInvoker<I>>,
396 typename helper<I + 1, Args...>::type>;
397 };
398
399 template <size_t I, typename Arg> struct helper<I, Arg> {
400 using type = std::tuple<FinishInvoker<I>>;
401 };
402
403 using InvokerTupleType = typename helper<0, Handles...>::type;
404
405 size_t finished_count_ = 0;
406 bool canceled_ = false;
407 std::tuple<Handles *...> handles_;
408 InvokerTupleType child_invokers_;
409 std::array<size_t, sizeof...(Handles)> order_;
410 Invoker *invoker_ = nullptr;
411};
412
413template <HandleLike... Handles>
414using ParallelAllFinishHandle = ParallelFinishHandle<false, Handles...>;
415
416template <HandleLike... Handles>
417using ParallelAnyFinishHandle = ParallelFinishHandle<true, Handles...>;
418
419template <HandleLike... Handles>
420class WhenAllFinishHandle : public ParallelAllFinishHandle<Handles...> {
421public:
422 using Base = ParallelAllFinishHandle<Handles...>;
423 using ReturnType = std::tuple<typename Handles::ReturnType...>;
424
425 ReturnType extract_result() {
426 auto r = Base::extract_result();
427 return std::move(r.second);
428 }
429};
430
431template <HandleLike... Handles>
432class WhenAnyFinishHandle : public ParallelAnyFinishHandle<Handles...> {
433public:
434 using Base = ParallelAnyFinishHandle<Handles...>;
435 using ReturnType = std::variant<typename Handles::ReturnType...>;
436
437 ReturnType extract_result() {
438 auto r = Base::extract_result();
439 auto &[order, results] = r;
440 return tuple_at_(std::move(results), order[0]);
441 }
442
443private:
444 template <size_t Idx = 0>
445 static std::variant<typename Handles::ReturnType...>
446 tuple_at_(std::tuple<typename Handles::ReturnType...> results, size_t idx) {
447 if constexpr (Idx < sizeof...(Handles)) {
448 if (idx == Idx) {
449 return std::variant<typename Handles::ReturnType...>{
450 std::in_place_index<Idx>,
451 std::move(std::get<Idx>(results))};
452 } else {
453 return tuple_at_<Idx + 1>(std::move(results), idx);
454 }
455 } else {
456 // Should not reach here, but we need to make compiler happy.
457 // Throwing an exception will lead to wrong optimization.
458 assert(false && "Index out of bounds");
459 return std::variant<typename Handles::ReturnType...>{
460 std::in_place_index<0>, std::move(std::get<0>(results))};
461 }
462 }
463};
464
465} // namespace condy
Polymorphic invocation utilities.
The main namespace for the Condy library.
Definition condy.hpp:28
Wrapper classes for liburing interfaces.