23template <
typename Handle, PrepFuncLike Func>
class OpSenderOperationState {
25 template <
typename... HandleArgs>
26 OpSenderOperationState(Func prep_func, HandleArgs &&...handle_args)
27 : prep_func_(std::move(prep_func)),
28 finish_handle_(std::forward<HandleArgs>(handle_args)...) {}
30 OpSenderOperationState(OpSenderOperationState &&) =
delete;
31 OpSenderOperationState &operator=(OpSenderOperationState &&) =
delete;
32 OpSenderOperationState(
const OpSenderOperationState &) =
delete;
33 OpSenderOperationState &operator=(
const OpSenderOperationState &) =
delete;
36 void start(
unsigned int flags)
noexcept {
37 auto &context = detail::Context::current();
38 auto *ring = context.ring();
39 context.runtime()->pend_work();
40 io_uring_sqe *sqe = prep_func_(ring);
41 assert(sqe &&
"prep_func must return a valid sqe");
42 io_uring_sqe_set_flags(sqe, sqe->flags | flags);
43 auto work = encode_work(&finish_handle_.get(), WorkType::Common);
44 io_uring_sqe_set_data64(sqe, work);
46 finish_handle_.get().maybe_set_cancel(context.runtime());
51 HandleBox<Handle> finish_handle_;
54template <
unsigned int Flags,
typename Sender,
typename Receiver>
57 FlaggedOpState(Sender sender, Receiver receiver)
58 : op_state_(sender.connect(std::move(receiver))) {}
60 FlaggedOpState(FlaggedOpState &&) =
delete;
61 FlaggedOpState &operator=(FlaggedOpState &&) =
delete;
62 FlaggedOpState(
const FlaggedOpState &) =
delete;
63 FlaggedOpState &operator=(
const FlaggedOpState &) =
delete;
65 void start(
unsigned int flags)
noexcept { op_state_.start(flags | Flags); }
68 using OperationState = operation_state_t<Sender, Receiver>;
69 OperationState op_state_;
72template <
typename TokenType>
class WhenAnyCanceller {
74 auto chain_token(TokenType token)
noexcept {
75 if (token.stop_possible()) {
76 stop_callback_.emplace(std::move(token), Cancellation{
this});
78 return stop_source_.get_token();
81 void maybe_request_stop() noexcept { stop_source_.request_stop(); }
83 void maybe_reset() noexcept { stop_callback_.reset(); }
87 WhenAnyCanceller *self;
88 void operator()() noexcept { self->cancel_(); }
90 void cancel_() noexcept { stop_source_.request_stop(); }
92 using StopCallbackType = stop_callback_t<TokenType, Cancellation>;
94 std::stop_source stop_source_;
95 std::optional<StopCallbackType> stop_callback_;
98template <
typename TokenType>
class WhenAllCanceller {
100 auto chain_token(TokenType token)
noexcept {
return token; }
102 void maybe_request_stop() noexcept {}
104 void maybe_reset() noexcept {}
107template <
typename Receiver,
typename Canceller,
typename... Senders>
108class ParallelOperationState {
110 ParallelOperationState(std::tuple<Senders...> senders, Receiver receiver)
111 : receiver_(std::move(receiver)) {
112 auto next_token = canceller_.chain_token(receiver_.get_stop_token());
113 connect_senders_(senders, next_token);
116 ParallelOperationState(ParallelOperationState &&) =
delete;
117 ParallelOperationState &operator=(ParallelOperationState &&) =
delete;
118 ParallelOperationState(
const ParallelOperationState &) =
delete;
119 ParallelOperationState &operator=(
const ParallelOperationState &) =
delete;
121 ~ParallelOperationState() {
122 std::apply([](
auto &&...states) { (states.destroy(), ...); },
126 void start(
unsigned int flags)
noexcept {
127 if constexpr (
sizeof...(Senders) == 0) {
128 std::move(receiver_)(
129 std::make_pair(std::move(order_), std::move(results_)));
132 [&](
auto &&...states) { (states.get().start(flags), ...); },
139 std::remove_cvref_t<decltype(std::declval<Canceller &>().chain_token(
140 std::declval<stop_token_t<Receiver>>()))>;
142 template <
size_t I = 0>
143 void connect_senders_(std::tuple<Senders...> &senders,
144 const TokenType &token)
noexcept {
145 if constexpr (I <
sizeof...(Senders)) {
146 std::get<I>(op_states_).accept([&] {
147 return std::move(std::get<I>(senders))
148 .connect(ChildReceiver<I>{
this, token});
150 connect_senders_<I + 1>(senders, token);
154 template <
size_t I,
typename R>
void receive_(R &&result)
noexcept {
155 canceller_.maybe_request_stop();
156 auto no = completed_count_++;
158 std::get<I>(results_) = std::forward<R>(result);
159 if (no + 1 ==
sizeof...(Senders)) {
160 canceller_.maybe_reset();
161 std::move(receiver_)(
162 std::make_pair(std::move(order_), std::move(results_)));
166 template <
size_t I>
struct ChildReceiver {
167 ParallelOperationState *self;
168 TokenType stop_token;
169 template <
typename R>
void operator()(R &&result)
noexcept {
170 self->receive_<I>(std::forward<R>(result));
172 auto get_stop_token() const noexcept {
return stop_token; }
175 template <
typename T>
struct operation_state_traits;
176 template <
size_t... Is>
177 struct operation_state_traits<std::index_sequence<Is...>> {
178 using type = std::tuple<
179 RawStorage<operation_state_t<Senders, ChildReceiver<Is>>>...>;
181 using OperationStates =
typename operation_state_traits<
182 std::make_index_sequence<
sizeof...(Senders)>>::type;
185 OperationStates op_states_;
186 std::array<size_t,
sizeof...(Senders)> order_;
187 std::tuple<
typename Senders::ReturnType...> results_;
188 size_t completed_count_ = 0;
190 Canceller canceller_;
193template <
typename Receiver,
typename... Senders>
194using ParallelAnyOperationState =
195 ParallelOperationState<Receiver, WhenAnyCanceller<stop_token_t<Receiver>>,
198template <
typename Receiver,
typename... Senders>
199using ParallelAllOperationState =
200 ParallelOperationState<Receiver, WhenAllCanceller<stop_token_t<Receiver>>,
203template <
typename Receiver>
struct ReceiverAllWrapper {
205 ReceiverAllWrapper(Receiver receiver) : receiver(std::move(receiver)) {}
206 template <
typename R>
void operator()(R &&result)
noexcept {
207 auto &[order, results] = result;
208 std::move(receiver)(std::move(results));
210 auto get_stop_token() const noexcept {
return receiver.get_stop_token(); }
213template <
typename Receiver>
struct ReceiverAnyWrapper {
215 ReceiverAnyWrapper(Receiver receiver) : receiver(std::move(receiver)) {}
216 template <
typename R>
void operator()(R &&result)
noexcept {
217 auto &[order, results] = result;
218 size_t index = order[0];
219 std::move(receiver)(tuple_at(results, index));
221 auto get_stop_token() const noexcept {
return receiver.get_stop_token(); }
224template <
typename Receiver,
typename... Senders>
225using WhenAnyOperationState =
226 ParallelAnyOperationState<ReceiverAnyWrapper<Receiver>, Senders...>;
228template <
typename Receiver,
typename... Senders>
229using WhenAllOperationState =
230 ParallelAllOperationState<ReceiverAllWrapper<Receiver>, Senders...>;
232template <
typename Receiver,
unsigned int Flags,
typename... Senders>
233class LinkOperationState :
public WhenAllOperationState<Receiver, Senders...> {
235 using Base = WhenAllOperationState<Receiver, Senders...>;
238 void start(
unsigned int flags)
noexcept {
239 auto *ring = detail::Context::current().ring();
240 ring->reserve_space(
sizeof...(Senders));
241 start_linked_operations_(flags);
245 template <
size_t I = 0>
246 void start_linked_operations_(
unsigned int flags)
noexcept {
247 if constexpr (I <
sizeof...(Senders)) {
248 auto &state = std::get<I>(Base::op_states_);
249 if constexpr (I <
sizeof...(Senders) - 1) {
250 state.get().start(flags | Flags);
252 state.get().start(flags);
254 start_linked_operations_<I + 1>(flags);
259template <
typename Receiver,
typename Canceller,
typename Sender>
260class RangedParallelOperationState {
262 RangedParallelOperationState(std::vector<Sender> senders, Receiver receiver)
263 : op_states_(senders.size()), order_(senders.size()),
264 results_(senders.size()), receiver_(std::move(receiver)) {
265 auto next_token = canceller_.chain_token(receiver_.get_stop_token());
266 for (
size_t i = 0; i < senders.size(); ++i) {
267 op_states_[i].accept([&] {
268 return std::move(senders[i])
269 .connect(ChildReceiver{
this, i, next_token});
274 RangedParallelOperationState(RangedParallelOperationState &&) =
delete;
275 RangedParallelOperationState &
276 operator=(RangedParallelOperationState &&) =
delete;
277 RangedParallelOperationState(
const RangedParallelOperationState &) =
delete;
278 RangedParallelOperationState &
279 operator=(
const RangedParallelOperationState &) =
delete;
281 ~RangedParallelOperationState() {
282 for (
auto &op_state : op_states_) {
287 void start(
unsigned int flags)
noexcept {
288 if (op_states_.empty()) {
289 std::move(receiver_)(
290 std::make_pair(std::move(order_), std::move(results_)));
292 for (
auto &op_state : op_states_) {
293 op_state.get().start(flags);
300 std::remove_cvref_t<decltype(std::declval<Canceller &>().chain_token(
301 std::declval<stop_token_t<Receiver>>()))>;
303 template <
typename R>
void receive_(
size_t index, R &&result)
noexcept {
304 canceller_.maybe_request_stop();
305 size_t no = completed_count_++;
307 results_[index] = std::forward<R>(result);
308 if (no + 1 == op_states_.size()) {
309 std::move(receiver_)(
310 std::make_pair(std::move(order_), std::move(results_)));
314 struct ChildReceiver {
315 RangedParallelOperationState *self;
317 TokenType stop_token;
318 template <
typename R>
void operator()(R &&result)
noexcept {
319 self->receive_(index, std::forward<R>(result));
321 auto get_stop_token() const noexcept {
return stop_token; }
324 using OperationStates =
325 std::vector<RawStorage<operation_state_t<Sender, ChildReceiver>>>;
328 OperationStates op_states_;
329 std::vector<size_t> order_;
330 std::vector<typename Sender::ReturnType> results_;
331 size_t completed_count_ = 0;
333 Canceller canceller_;
336template <
typename Receiver,
typename Sender>
337using RangedParallelAllOperationState = RangedParallelOperationState<
338 Receiver, WhenAllCanceller<stop_token_t<Receiver>>, Sender>;
340template <
typename Receiver,
typename Sender>
341using RangedParallelAnyOperationState = RangedParallelOperationState<
342 Receiver, WhenAnyCanceller<stop_token_t<Receiver>>, Sender>;
344template <
typename Receiver>
345using ReceiverRangedAllWrapper = ReceiverAllWrapper<Receiver>;
347template <
typename Receiver>
struct ReceiverRangedAnyWrapper {
349 ReceiverRangedAnyWrapper(Receiver receiver)
350 : receiver(std::move(receiver)) {}
351 template <
typename R>
void operator()(R &&result)
noexcept {
352 auto &[order, results] = result;
353 size_t index = order[0];
354 std::move(receiver)(std::make_pair(index, std::move(results[index])));
356 auto get_stop_token() const noexcept {
return receiver.get_stop_token(); }
359template <
typename Receiver,
typename Sender>
360using WhenAllRangeOperationState =
361 RangedParallelAllOperationState<ReceiverRangedAllWrapper<Receiver>, Sender>;
363template <
typename Receiver,
typename Sender>
364using WhenAnyRangeOperationState =
365 RangedParallelAnyOperationState<ReceiverRangedAnyWrapper<Receiver>, Sender>;
367template <
typename Receiver,
unsigned int Flags,
typename Sender>
368class RangedLinkOperationState
369 :
public WhenAllRangeOperationState<Receiver, Sender> {
371 using Base = WhenAllRangeOperationState<Receiver, Sender>;
374 void start(
unsigned int flags)
noexcept {
375 auto *ring = detail::Context::current().ring();
376 ring->reserve_space(Base::op_states_.size());
377 for (
size_t i = 0; i < Base::op_states_.size(); ++i) {
378 auto &op_state = Base::op_states_[i];
379 if (i < Base::op_states_.size() - 1) {
380 op_state.get().start(flags | Flags);
382 op_state.get().start(flags);
Definitions of finish handle types for asynchronous operations.
The main namespace for the Condy library.
Internal utility classes and functions used by Condy.