Condy v1.1.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
finish_handles.hpp
Go to the documentation of this file.
1
8
9#pragma once
10
11#include "condy/concepts.hpp"
12#include "condy/condy_uring.hpp"
13#include "condy/context.hpp"
14#include "condy/invoker.hpp"
15#include "condy/ring.hpp"
16#include "condy/work_type.hpp"
17#include <array>
18#include <cerrno>
19#include <cstddef>
20#include <limits>
21#include <tuple>
22#include <variant>
23#include <vector>
24
// Expands to a `cancel()` member function for a finish handle class.
//
// The generated method takes an SQE from the current Context's ring, prepares
// an io_uring cancel request whose target user_data is
// `encode_work(this, work_type)`, tags the cancel request's own user_data as
// `WorkType::Ignore`, and sets IOSQE_CQE_SKIP_SUCCESS on the cancel SQE.
//
// `work_type` is resolved in the class where the macro is expanded, so a class
// that redefines `work_type` has to expand the macro again to target its own
// value.
#define DEFINE_CANCEL_METHOD_()                                                \
    void cancel() {                                                            \
        io_uring_sqe *cancel_sqe = Context::current().ring()->get_sqe();       \
        io_uring_prep_cancel(cancel_sqe, encode_work(this, work_type), 0);     \
        io_uring_sqe_set_data(cancel_sqe,                                      \
                              encode_work(nullptr, WorkType::Ignore));         \
        io_uring_sqe_set_flags(cancel_sqe, IOSQE_CQE_SKIP_SUCCESS);            \
    }
34
35namespace condy {
36
37class Ring;
38
39class OpFinishHandle : public InvokerAdapter<OpFinishHandle, WorkInvoker> {
40public:
41 static constexpr WorkType work_type = WorkType::Common;
42 using ReturnType = int32_t;
43
44 DEFINE_CANCEL_METHOD_();
45
46 void set_result(int32_t res, uint32_t flags) {
47 res_ = res;
48 flags_ = flags;
49 }
50
51 void invoke() {
52 assert(invoker_ != nullptr);
53 (*invoker_)();
54 }
55
56 ReturnType extract_result() { return res_; }
57
58 void set_invoker(Invoker *invoker) { invoker_ = invoker; }
59
60protected:
61 int32_t res_ = -ENOTRECOVERABLE; // Internal error if not set
62 uint32_t flags_ = 0;
63 Invoker *invoker_ = nullptr;
64};
65
66class ExtendOpFinishHandle : public OpFinishHandle {
67public:
68 using ExtendFunc = void (*)(void *, int32_t);
69
70 void invoke_extend(int32_t res) {
71 assert(extend_func_ != nullptr);
72 extend_func_(this, res);
73 }
74
75protected:
76 ExtendFunc extend_func_ = nullptr;
77};
78
79template <typename Func, OpFinishHandleLike HandleBase>
80class MultiShotMixin : public HandleBase {
81public:
82 static constexpr WorkType work_type = WorkType::MultiShot;
83
84 template <typename... Args>
85 MultiShotMixin(Func func, Args &&...args)
86 : HandleBase(std::forward<Args>(args)...), func_(std::move(func)) {
87 this->extend_func_ = [](void *data, int32_t) {
88 auto *self = static_cast<MultiShotMixin<Func, HandleBase> *>(data);
89 self->invoke_multishot_();
90 };
91 }
92
93 DEFINE_CANCEL_METHOD_();
94
95private:
96 void invoke_multishot_() { func_(HandleBase::extract_result()); }
97
98private:
99 Func func_;
100};
101
102template <typename MultiShotFunc>
103using MultiShotOpFinishHandle =
104 MultiShotMixin<MultiShotFunc, ExtendOpFinishHandle>;
105
106template <typename Func, OpFinishHandleLike HandleBase>
107class ZeroCopyMixin : public HandleBase {
108public:
109 static constexpr WorkType work_type = WorkType::ZeroCopy;
110
111 template <typename... Args>
112 ZeroCopyMixin(Func func, Args &&...args)
113 : HandleBase(std::forward<Args>(args)...), free_func_(std::move(func)) {
114 this->func_ = [](void *data) {
115 auto *self = static_cast<ZeroCopyMixin<Func, HandleBase> *>(data);
116 self->invoke();
117 };
118 this->extend_func_ = [](void *data, int32_t res) {
119 auto *self = static_cast<ZeroCopyMixin<Func, HandleBase> *>(data);
120 self->invoke_notify_(res);
121 };
122 }
123
124 void invoke() /* fake override */ {
125 assert(this->invoker_ != nullptr);
126 (*this->invoker_)();
127 resumed_ = true;
128 if (notified_) {
129 free_func_(notify_res_);
130 delete this;
131 }
132 }
133
134 DEFINE_CANCEL_METHOD_();
135
136private:
137 void invoke_notify_(int32_t res) {
138 notify_res_ = res;
139 notified_ = true;
140 if (resumed_) {
141 free_func_(notify_res_);
142 delete this;
143 }
144 }
145
146private:
147 Func free_func_;
148 int32_t notify_res_ = -ENOTRECOVERABLE;
149 // Use these flags to handle race between invoke and notify
150 bool resumed_ = false;
151 bool notified_ = false;
152};
153
154template <typename FreeFunc>
155using ZeroCopyOpFinishHandle = ZeroCopyMixin<FreeFunc, ExtendOpFinishHandle>;
156
157template <BufferRingLike Br, OpFinishHandleLike HandleBase>
158class SelectBufferMixin : public HandleBase {
159public:
160 using ReturnType = std::pair<int, typename Br::ReturnType>;
161
162 template <typename... Args>
163 SelectBufferMixin(Br *buffers, Args &&...args)
164 : HandleBase(std::forward<Args>(args)...), buffers_(buffers) {}
165
166 ReturnType extract_result() {
167 int res = this->res_;
168 return std::make_pair(
169 res, buffers_->handle_finish(this->res_, this->flags_));
170 }
171
172private:
173 Br *buffers_;
174};
175
176template <BufferRingLike Br>
177using SelectBufferOpFinishHandle = SelectBufferMixin<Br, OpFinishHandle>;
178
179template <typename MultiShotFunc, BufferRingLike Br>
180using MultiShotSelectBufferOpFinishHandle =
181 MultiShotMixin<MultiShotFunc, SelectBufferMixin<Br, ExtendOpFinishHandle>>;
182
183template <bool Cancel, HandleLike Handle> class RangedParallelFinishHandle {
184public:
185 using ChildReturnType = typename Handle::ReturnType;
186 using ReturnType =
187 std::pair<std::vector<size_t>, std::vector<ChildReturnType>>;
188
189 void init(std::vector<Handle *> handles) {
190 handles_ = std::move(handles);
191 child_invokers_.resize(handles_.size());
192 for (size_t i = 0; i < handles_.size(); i++) {
193 auto *handle = handles_[i];
194 auto &invoker = child_invokers_[i];
195 invoker.self_ = this;
196 invoker.no_ = i;
197 handle->set_invoker(&invoker);
198 }
199 order_.resize(handles_.size());
200 }
201
202 void cancel() {
203 if (!canceled_) {
204 canceled_ = true;
205 for (auto &handle : handles_) {
206 handle->cancel();
207 }
208 }
209 }
210
211 ReturnType extract_result() {
212 std::vector<ChildReturnType> result;
213 result.reserve(handles_.size());
214 for (size_t i = 0; i < handles_.size(); i++) {
215 result.push_back(handles_[i]->extract_result());
216 }
217 return std::make_pair(std::move(order_), std::move(result));
218 }
219
220 void set_invoker(Invoker *invoker) { invoker_ = invoker; }
221
222private:
223 void finish_(size_t idx) {
224 size_t no = finished_count_++;
225 order_[no] = idx;
226
227 if constexpr (Cancel) {
228 if (!canceled_) {
229 canceled_ = true;
230 for (size_t i = 0; i < handles_.size(); i++) {
231 if (i != idx) {
232 handles_[i]->cancel();
233 }
234 }
235 }
236 }
237
238 if (no == handles_.size() - 1) {
239 // All finished or canceled
240 (*invoker_)();
241 return;
242 }
243 }
244
245private:
246 struct FinishInvoker : public InvokerAdapter<FinishInvoker> {
247 void invoke() { self_->finish_(no_); }
248 RangedParallelFinishHandle *self_;
249 size_t no_;
250 };
251
252 size_t finished_count_ = 0;
253 bool canceled_ = false;
254 std::vector<Handle *> handles_ = {};
255 std::vector<FinishInvoker> child_invokers_;
256 std::vector<size_t> order_;
257 Invoker *invoker_ = nullptr;
258};
259
260template <HandleLike Handle>
261using RangedParallelAllFinishHandle = RangedParallelFinishHandle<false, Handle>;
262
263template <HandleLike Handle>
264using RangedParallelAnyFinishHandle = RangedParallelFinishHandle<true, Handle>;
265
266template <HandleLike Handle>
267class RangedWhenAllFinishHandle : public RangedParallelAllFinishHandle<Handle> {
268public:
269 using Base = RangedParallelAllFinishHandle<Handle>;
270 using ReturnType = std::vector<typename Handle::ReturnType>;
271
272 ReturnType extract_result() {
273 auto r = Base::extract_result();
274 return std::move(r.second);
275 }
276};
277
278template <HandleLike Handle>
279class RangedWhenAnyFinishHandle : public RangedParallelAnyFinishHandle<Handle> {
280public:
281 using Base = RangedParallelAnyFinishHandle<Handle>;
282 using ChildReturnType = typename Handle::ReturnType;
283 using ReturnType = std::pair<size_t, ChildReturnType>;
284
285 ReturnType extract_result() {
286 auto r = Base::extract_result();
287 auto &[order, results] = r;
288 return std::make_pair(order[0], std::move(results[order[0]]));
289 }
290};
291
292template <bool Cancel, HandleLike... Handles> class ParallelFinishHandle {
293public:
294 using ReturnType = std::pair<std::array<size_t, sizeof...(Handles)>,
295 std::tuple<typename Handles::ReturnType...>>;
296
297 template <typename... HandlePtr> void init(HandlePtr... handles) {
298 handles_ = std::make_tuple(handles...);
299 foreach_set_invoker_();
300 }
301
302 void cancel() {
303 if (!canceled_) {
304 canceled_ = true;
305 constexpr size_t SkipIdx = std::numeric_limits<size_t>::max();
306 foreach_call_cancel_<SkipIdx>();
307 }
308 }
309
310 ReturnType extract_result() {
311 auto result = std::apply(
312 [](auto *...handle_ptrs) {
313 return std::make_tuple(handle_ptrs->extract_result()...);
314 },
315 handles_);
316 return std::make_pair(std::move(order_), std::move(result));
317 }
318
319 void set_invoker(Invoker *invoker) { invoker_ = invoker; }
320
321private:
322 template <size_t I = 0> void foreach_set_invoker_() {
323 if constexpr (I < sizeof...(Handles)) {
324 auto *handle = std::get<I>(handles_);
325 auto &invoker = std::get<I>(child_invokers_);
326 invoker.self_ = this;
327 handle->set_invoker(&invoker);
328 foreach_set_invoker_<I + 1>();
329 }
330 }
331
332 template <size_t SkipIdx, size_t I = 0> void foreach_call_cancel_() {
333 if constexpr (I < sizeof...(Handles)) {
334 auto handle = std::get<I>(handles_);
335 if constexpr (I != SkipIdx) {
336 handle->cancel();
337 }
338 foreach_call_cancel_<SkipIdx, I + 1>();
339 }
340 }
341
342 template <size_t Idx> void finish_() {
343 size_t no = finished_count_++;
344 order_[no] = Idx;
345
346 if constexpr (Cancel) {
347 if (!canceled_) {
348 canceled_ = true;
349 foreach_call_cancel_<Idx>();
350 }
351 }
352
353 if (no == sizeof...(Handles) - 1) {
354 // All finished or canceled
355 (*invoker_)();
356 }
357 }
358
359private:
360 template <size_t I>
361 struct FinishInvoker : public InvokerAdapter<FinishInvoker<I>> {
362 void invoke() { self_->template finish_<I>(); }
363 ParallelFinishHandle *self_;
364 };
365
366 template <typename... input_t>
367 using tuple_cat_t = decltype(std::tuple_cat(std::declval<input_t>()...));
368
369 template <size_t I, typename Arg, typename... Args> struct helper {
370 using type = tuple_cat_t<std::tuple<FinishInvoker<I>>,
371 typename helper<I + 1, Args...>::type>;
372 };
373
374 template <size_t I, typename Arg> struct helper<I, Arg> {
375 using type = std::tuple<FinishInvoker<I>>;
376 };
377
378 using InvokerTupleType = typename helper<0, Handles...>::type;
379
380 size_t finished_count_ = 0;
381 bool canceled_ = false;
382 std::tuple<Handles *...> handles_;
383 InvokerTupleType child_invokers_;
384 std::array<size_t, sizeof...(Handles)> order_;
385 Invoker *invoker_ = nullptr;
386};
387
388template <HandleLike... Handles>
389using ParallelAllFinishHandle = ParallelFinishHandle<false, Handles...>;
390
391template <HandleLike... Handles>
392using ParallelAnyFinishHandle = ParallelFinishHandle<true, Handles...>;
393
394template <HandleLike... Handles>
395class WhenAllFinishHandle : public ParallelAllFinishHandle<Handles...> {
396public:
397 using Base = ParallelAllFinishHandle<Handles...>;
398 using ReturnType = std::tuple<typename Handles::ReturnType...>;
399
400 ReturnType extract_result() {
401 auto r = Base::extract_result();
402 return std::move(r.second);
403 }
404};
405
406template <HandleLike... Handles>
407class WhenAnyFinishHandle : public ParallelAnyFinishHandle<Handles...> {
408public:
409 using Base = ParallelAnyFinishHandle<Handles...>;
410 using ReturnType = std::variant<typename Handles::ReturnType...>;
411
412 ReturnType extract_result() {
413 auto r = Base::extract_result();
414 auto &[order, results] = r;
415 return tuple_at_(std::move(results), order[0]);
416 }
417
418private:
419 template <size_t Idx = 0>
420 static std::variant<typename Handles::ReturnType...>
421 tuple_at_(std::tuple<typename Handles::ReturnType...> results, size_t idx) {
422 if constexpr (Idx < sizeof...(Handles)) {
423 if (idx == Idx) {
424 return std::variant<typename Handles::ReturnType...>{
425 std::in_place_index<Idx>,
426 std::move(std::get<Idx>(results))};
427 } else {
428 return tuple_at_<Idx + 1>(std::move(results), idx);
429 }
430 } else {
431 // Should not reach here, but we need to make compiler happy.
432 // Throwing an exception will lead to wrong optimization.
433 assert(false && "Index out of bounds");
434 return std::variant<typename Handles::ReturnType...>{
435 std::in_place_index<0>, std::move(std::get<0>(results))};
436 }
437 }
438};
439
440#undef DEFINE_CANCEL_METHOD_
441
442} // namespace condy
Polymorphic invocation utilities.
The main namespace for the Condy library.
Definition condy.hpp:28
Wrapper classes for liburing interfaces.