38class OpFinishHandleBase
39 :
public InvokerAdapter<OpFinishHandleBase, WorkInvoker> {
41 using HandleCQEFunc = detail::Action (*)(
void *, io_uring_cqe *);
44 auto *ring = detail::Context::current().ring();
45 io_uring_sqe *sqe = ring->get_sqe();
46 io_uring_prep_cancel(sqe,
this, 0);
47 io_uring_sqe_set_data(sqe, encode_work(
nullptr, WorkType::Ignore));
48 io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);
51 detail::Action handle_cqe(io_uring_cqe *cqe) {
52 assert(handle_func_ !=
nullptr);
53 return handle_func_(
this, cqe);
57 assert(invoker_ !=
nullptr);
61 void set_invoker(Invoker *invoker) { invoker_ = invoker; }
64 OpFinishHandleBase() =
default;
67 HandleCQEFunc handle_func_ =
nullptr;
68 Invoker *invoker_ =
nullptr;
71template <CQEHandlerLike CQEHandler>
72class OpFinishHandle :
public OpFinishHandleBase {
74 using ReturnType =
typename CQEHandler::ReturnType;
76 template <
typename... Args>
77 OpFinishHandle(Args &&...args) : cqe_handler_(std::forward<Args>(args)...) {
78 this->handle_func_ = handle_cqe_static_;
81 detail::Action handle_cqe_impl(io_uring_cqe *cqe) {
82 cqe_handler_.handle_cqe(cqe);
83 return {.queue_work =
true, .op_finish =
true};
86 ReturnType extract_result() {
return cqe_handler_.extract_result(); }
89 static detail::Action handle_cqe_static_(
void *data, io_uring_cqe *cqe) {
90 auto *self =
static_cast<OpFinishHandle *
>(data);
91 return self->handle_cqe_impl(cqe);
95 CQEHandler cqe_handler_;
98template <
typename Func, OpFinishHandleLike HandleBase>
99class MultiShotMixin :
public HandleBase {
101 template <
typename... Args>
102 MultiShotMixin(Func func, Args &&...args)
103 : HandleBase(std::forward<Args>(args)...), func_(std::move(func)) {
104 this->handle_func_ = handle_cqe_static_;
107 detail::Action handle_cqe_impl(io_uring_cqe *cqe) {
108 if (cqe->flags & IORING_CQE_F_MORE) {
109 HandleBase::handle_cqe_impl(cqe);
110 func_(HandleBase::extract_result());
111 return {.queue_work =
false, .op_finish =
false};
113 HandleBase::handle_cqe_impl(cqe);
114 return {.queue_work =
true, .op_finish =
true};
119 static detail::Action handle_cqe_static_(
void *data, io_uring_cqe *cqe) {
120 auto *self =
static_cast<MultiShotMixin *
>(data);
121 return self->handle_cqe_impl(cqe);
128template <CQEHandlerLike CQEHandler,
typename MultiShotFunc>
129using MultiShotOpFinishHandle =
130 MultiShotMixin<MultiShotFunc, OpFinishHandle<CQEHandler>>;
132template <
typename Func, OpFinishHandleLike HandleBase>
133class ZeroCopyMixin :
public HandleBase {
135 template <
typename... Args>
136 ZeroCopyMixin(Func func, Args &&...args)
137 : HandleBase(std::forward<Args>(args)...), free_func_(std::move(func)) {
138 this->func_ = invoke_static_;
139 this->handle_func_ = handle_cqe_static_;
143 assert(this->invoker_ !=
nullptr);
152 detail::Action handle_cqe_impl(io_uring_cqe *cqe) {
153 if (cqe->flags & IORING_CQE_F_MORE) {
154 HandleBase::handle_cqe_impl(cqe);
155 return {.queue_work =
true, .op_finish =
false};
157 if (cqe->flags & IORING_CQE_F_NOTIF) {
159 return {.queue_work =
false, .op_finish =
true};
165 HandleBase::handle_cqe_impl(cqe);
166 return {.queue_work =
true, .op_finish =
true};
173 if (resumed_ && notified_) {
174 free_func_(notify_res_);
179 void notify_(int32_t res) {
180 assert(res != -ENOTRECOVERABLE);
186 static void invoke_static_(
void *data) {
187 auto *self =
static_cast<ZeroCopyMixin *
>(data);
191 static detail::Action handle_cqe_static_(
void *data, io_uring_cqe *cqe) {
192 auto *self =
static_cast<ZeroCopyMixin *
>(data);
193 return self->handle_cqe_impl(cqe);
198 int32_t notify_res_ = -ENOTRECOVERABLE;
200 bool resumed_ =
false;
201 bool notified_ =
false;
204template <CQEHandlerLike CQEHandler,
typename FreeFunc>
205using ZeroCopyOpFinishHandle =
206 ZeroCopyMixin<FreeFunc, OpFinishHandle<CQEHandler>>;
208template <
bool Cancel, HandleLike Handle>
class RangedParallelFinishHandle {
210 using ChildReturnType =
typename Handle::ReturnType;
212 std::pair<std::vector<size_t>, std::vector<ChildReturnType>>;
214 void init(std::vector<Handle *> handles) {
215 handles_ = std::move(handles);
216 child_invokers_.resize(handles_.size());
217 for (
size_t i = 0; i < handles_.size(); i++) {
218 auto *handle = handles_[i];
219 auto &invoker = child_invokers_[i];
220 invoker.self_ =
this;
222 handle->set_invoker(&invoker);
224 order_.resize(handles_.size());
230 for (
auto &handle : handles_) {
236 ReturnType extract_result() {
237 std::vector<ChildReturnType> result;
238 result.reserve(handles_.size());
239 for (
size_t i = 0; i < handles_.size(); i++) {
240 result.push_back(handles_[i]->extract_result());
242 return std::make_pair(std::move(order_), std::move(result));
245 void set_invoker(Invoker *invoker) { invoker_ = invoker; }
248 void finish_(
size_t idx) {
249 size_t no = finished_count_++;
252 if constexpr (Cancel) {
255 for (
size_t i = 0; i < handles_.size(); i++) {
257 handles_[i]->cancel();
263 if (no == handles_.size() - 1) {
271 struct FinishInvoker :
public InvokerAdapter<FinishInvoker> {
272 void invoke() { self_->finish_(no_); }
273 RangedParallelFinishHandle *self_;
277 size_t finished_count_ = 0;
278 bool canceled_ =
false;
279 std::vector<Handle *> handles_ = {};
280 std::vector<FinishInvoker> child_invokers_;
281 std::vector<size_t> order_;
282 Invoker *invoker_ =
nullptr;
285template <HandleLike Handle>
286using RangedParallelAllFinishHandle = RangedParallelFinishHandle<false, Handle>;
288template <HandleLike Handle>
289using RangedParallelAnyFinishHandle = RangedParallelFinishHandle<true, Handle>;
291template <HandleLike Handle>
292class RangedWhenAllFinishHandle :
public RangedParallelAllFinishHandle<Handle> {
294 using Base = RangedParallelAllFinishHandle<Handle>;
295 using ReturnType = std::vector<typename Handle::ReturnType>;
297 ReturnType extract_result() {
298 auto r = Base::extract_result();
299 return std::move(r.second);
303template <HandleLike Handle>
304class RangedWhenAnyFinishHandle :
public RangedParallelAnyFinishHandle<Handle> {
306 using Base = RangedParallelAnyFinishHandle<Handle>;
307 using ChildReturnType =
typename Handle::ReturnType;
308 using ReturnType = std::pair<size_t, ChildReturnType>;
310 ReturnType extract_result() {
311 auto r = Base::extract_result();
312 auto &[order, results] = r;
313 return std::make_pair(order[0], std::move(results[order[0]]));
317template <
bool Cancel, HandleLike... Handles>
class ParallelFinishHandle {
319 using ReturnType = std::pair<std::array<size_t,
sizeof...(Handles)>,
320 std::tuple<typename Handles::ReturnType...>>;
322 template <
typename... HandlePtr>
void init(HandlePtr... handles) {
323 handles_ = std::make_tuple(handles...);
324 foreach_set_invoker_();
330 constexpr size_t SkipIdx = std::numeric_limits<size_t>::max();
331 foreach_call_cancel_<SkipIdx>();
335 ReturnType extract_result() {
336 auto result = std::apply(
337 [](
auto *...handle_ptrs) {
338 return std::make_tuple(handle_ptrs->extract_result()...);
341 return std::make_pair(std::move(order_), std::move(result));
344 void set_invoker(Invoker *invoker) { invoker_ = invoker; }
347 template <
size_t I = 0>
void foreach_set_invoker_() {
348 if constexpr (I <
sizeof...(Handles)) {
349 auto *handle = std::get<I>(handles_);
350 auto &invoker = std::get<I>(child_invokers_);
351 invoker.self_ =
this;
352 handle->set_invoker(&invoker);
353 foreach_set_invoker_<I + 1>();
357 template <
size_t SkipIdx,
size_t I = 0>
void foreach_call_cancel_() {
358 if constexpr (I <
sizeof...(Handles)) {
359 auto handle = std::get<I>(handles_);
360 if constexpr (I != SkipIdx) {
363 foreach_call_cancel_<SkipIdx, I + 1>();
367 template <
size_t Idx>
void finish_() {
368 size_t no = finished_count_++;
371 if constexpr (Cancel) {
374 foreach_call_cancel_<Idx>();
378 if (no ==
sizeof...(Handles) - 1) {
386 struct FinishInvoker :
public InvokerAdapter<FinishInvoker<I>> {
387 void invoke() { self_->template finish_<I>(); }
388 ParallelFinishHandle *self_;
391 template <
typename... input_t>
392 using tuple_cat_t =
decltype(std::tuple_cat(std::declval<input_t>()...));
394 template <
size_t I,
typename Arg,
typename... Args>
struct helper {
395 using type = tuple_cat_t<std::tuple<FinishInvoker<I>>,
396 typename helper<I + 1, Args...>::type>;
399 template <
size_t I,
typename Arg>
struct helper<I, Arg> {
400 using type = std::tuple<FinishInvoker<I>>;
403 using InvokerTupleType =
typename helper<0, Handles...>::type;
405 size_t finished_count_ = 0;
406 bool canceled_ =
false;
407 std::tuple<Handles *...> handles_;
408 InvokerTupleType child_invokers_;
409 std::array<size_t,
sizeof...(Handles)> order_;
410 Invoker *invoker_ =
nullptr;
413template <HandleLike... Handles>
414using ParallelAllFinishHandle = ParallelFinishHandle<
false, Handles...>;
416template <HandleLike... Handles>
417using ParallelAnyFinishHandle = ParallelFinishHandle<
true, Handles...>;
419template <HandleLike... Handles>
420class WhenAllFinishHandle :
public ParallelAllFinishHandle<Handles...> {
422 using Base = ParallelAllFinishHandle<Handles...>;
423 using ReturnType = std::tuple<
typename Handles::ReturnType...>;
425 ReturnType extract_result() {
426 auto r = Base::extract_result();
427 return std::move(r.second);
431template <HandleLike... Handles>
432class WhenAnyFinishHandle :
public ParallelAnyFinishHandle<Handles...> {
434 using Base = ParallelAnyFinishHandle<Handles...>;
435 using ReturnType = std::variant<
typename Handles::ReturnType...>;
437 ReturnType extract_result() {
438 auto r = Base::extract_result();
439 auto &[order, results] = r;
440 return tuple_at_(std::move(results), order[0]);
444 template <
size_t Idx = 0>
445 static std::variant<
typename Handles::ReturnType...>
446 tuple_at_(std::tuple<typename Handles::ReturnType...> results,
size_t idx) {
447 if constexpr (Idx <
sizeof...(Handles)) {
449 return std::variant<
typename Handles::ReturnType...>{
450 std::in_place_index<Idx>,
451 std::move(std::get<Idx>(results))};
453 return tuple_at_<Idx + 1>(std::move(results), idx);
458 assert(
false &&
"Index out of bounds");
459 return std::variant<
typename Handles::ReturnType...>{
460 std::in_place_index<0>, std::move(std::get<0>(results))};
Polymorphic invocation utilities.
The main namespace for the Condy library.
Wrapper classes for liburing interfaces.