Condy v1.6.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
op_states.hpp
Go to the documentation of this file.
1
5
6#pragma once
7
8#include "condy/concepts.hpp"
11#include "condy/type_traits.hpp"
12#include "condy/utils.hpp"
13#include <array>
14#include <cstddef>
15#include <optional>
16#include <stop_token>
17#include <tuple>
18#include <vector>
19
20namespace condy {
21namespace detail {
22
23template <typename Handle, PrepFuncLike Func> class OpSenderOperationState {
24public:
25 template <typename... HandleArgs>
26 OpSenderOperationState(Func prep_func, HandleArgs &&...handle_args)
27 : prep_func_(std::move(prep_func)),
28 finish_handle_(std::forward<HandleArgs>(handle_args)...) {}
29
30 OpSenderOperationState(OpSenderOperationState &&) = delete;
31 OpSenderOperationState &operator=(OpSenderOperationState &&) = delete;
32 OpSenderOperationState(const OpSenderOperationState &) = delete;
33 OpSenderOperationState &operator=(const OpSenderOperationState &) = delete;
34
35public:
36 void start(unsigned int flags) noexcept {
37 auto &context = detail::Context::current();
38 auto *ring = context.ring();
39 context.runtime()->pend_work();
40 io_uring_sqe *sqe = prep_func_(ring);
41 assert(sqe && "prep_func must return a valid sqe");
42 io_uring_sqe_set_flags(sqe, sqe->flags | flags);
43 auto work = encode_work(&finish_handle_.get(), WorkType::Common);
44 io_uring_sqe_set_data64(sqe, work);
45
46 finish_handle_.get().maybe_set_cancel(context.runtime());
47 }
48
49private:
50 Func prep_func_;
51 HandleBox<Handle> finish_handle_;
52};
53
54template <unsigned int Flags, typename Sender, typename Receiver>
55class FlaggedOpState {
56public:
57 FlaggedOpState(Sender sender, Receiver receiver)
58 : op_state_(sender.connect(std::move(receiver))) {}
59
60 FlaggedOpState(FlaggedOpState &&) = delete;
61 FlaggedOpState &operator=(FlaggedOpState &&) = delete;
62 FlaggedOpState(const FlaggedOpState &) = delete;
63 FlaggedOpState &operator=(const FlaggedOpState &) = delete;
64
65 void start(unsigned int flags) noexcept { op_state_.start(flags | Flags); }
66
67private:
68 using OperationState = operation_state_t<Sender, Receiver>;
69 OperationState op_state_;
70};
71
72template <typename TokenType> class WhenAnyCanceller {
73public:
74 auto chain_token(TokenType token) noexcept {
75 if (token.stop_possible()) {
76 stop_callback_.emplace(std::move(token), Cancellation{this});
77 }
78 return stop_source_.get_token();
79 }
80
81 void maybe_request_stop() noexcept { stop_source_.request_stop(); }
82
83 void maybe_reset() noexcept { stop_callback_.reset(); }
84
85private:
86 struct Cancellation {
87 WhenAnyCanceller *self;
88 void operator()() noexcept { self->cancel_(); }
89 };
90 void cancel_() noexcept { stop_source_.request_stop(); }
91
92 using StopCallbackType = stop_callback_t<TokenType, Cancellation>;
93
94 std::stop_source stop_source_;
95 std::optional<StopCallbackType> stop_callback_;
96};
97
98template <typename TokenType> class WhenAllCanceller {
99public:
100 auto chain_token(TokenType token) noexcept { return token; }
101
102 void maybe_request_stop() noexcept {}
103
104 void maybe_reset() noexcept {}
105};
106
107template <typename Receiver, typename Canceller, typename... Senders>
108class ParallelOperationState {
109public:
110 ParallelOperationState(std::tuple<Senders...> senders, Receiver receiver)
111 : receiver_(std::move(receiver)) {
112 auto next_token = canceller_.chain_token(receiver_.get_stop_token());
113 connect_senders_(senders, next_token);
114 }
115
116 ParallelOperationState(ParallelOperationState &&) = delete;
117 ParallelOperationState &operator=(ParallelOperationState &&) = delete;
118 ParallelOperationState(const ParallelOperationState &) = delete;
119 ParallelOperationState &operator=(const ParallelOperationState &) = delete;
120
121 ~ParallelOperationState() {
122 std::apply([](auto &&...states) { (states.destroy(), ...); },
123 op_states_);
124 }
125
126 void start(unsigned int flags) noexcept {
127 if constexpr (sizeof...(Senders) == 0) {
128 std::move(receiver_)(
129 std::make_pair(std::move(order_), std::move(results_)));
130 } else {
131 std::apply(
132 [&](auto &&...states) { (states.get().start(flags), ...); },
133 op_states_);
134 }
135 }
136
137private:
138 using TokenType =
139 std::remove_cvref_t<decltype(std::declval<Canceller &>().chain_token(
140 std::declval<stop_token_t<Receiver>>()))>;
141
142 template <size_t I = 0>
143 void connect_senders_(std::tuple<Senders...> &senders,
144 const TokenType &token) noexcept {
145 if constexpr (I < sizeof...(Senders)) {
146 std::get<I>(op_states_).accept([&] {
147 return std::move(std::get<I>(senders))
148 .connect(ChildReceiver<I>{this, token});
149 });
150 connect_senders_<I + 1>(senders, token);
151 }
152 }
153
154 template <size_t I, typename R> void receive_(R &&result) noexcept {
155 canceller_.maybe_request_stop();
156 auto no = completed_count_++;
157 order_[no] = I;
158 std::get<I>(results_) = std::forward<R>(result);
159 if (no + 1 == sizeof...(Senders)) {
160 canceller_.maybe_reset();
161 std::move(receiver_)(
162 std::make_pair(std::move(order_), std::move(results_)));
163 }
164 }
165
166 template <size_t I> struct ChildReceiver {
167 ParallelOperationState *self;
168 TokenType stop_token;
169 template <typename R> void operator()(R &&result) noexcept {
170 self->receive_<I>(std::forward<R>(result));
171 }
172 auto get_stop_token() const noexcept { return stop_token; }
173 };
174
175 template <typename T> struct operation_state_traits;
176 template <size_t... Is>
177 struct operation_state_traits<std::index_sequence<Is...>> {
178 using type = std::tuple<
179 RawStorage<operation_state_t<Senders, ChildReceiver<Is>>>...>;
180 };
181 using OperationStates = typename operation_state_traits<
182 std::make_index_sequence<sizeof...(Senders)>>::type;
183
184protected:
185 OperationStates op_states_;
186 std::array<size_t, sizeof...(Senders)> order_;
187 std::tuple<typename Senders::ReturnType...> results_;
188 size_t completed_count_ = 0;
189 Receiver receiver_;
190 Canceller canceller_;
191};
192
193template <typename Receiver, typename... Senders>
194using ParallelAnyOperationState =
195 ParallelOperationState<Receiver, WhenAnyCanceller<stop_token_t<Receiver>>,
196 Senders...>;
197
198template <typename Receiver, typename... Senders>
199using ParallelAllOperationState =
200 ParallelOperationState<Receiver, WhenAllCanceller<stop_token_t<Receiver>>,
201 Senders...>;
202
203template <typename Receiver> struct ReceiverAllWrapper {
204 Receiver receiver;
205 ReceiverAllWrapper(Receiver receiver) : receiver(std::move(receiver)) {}
206 template <typename R> void operator()(R &&result) noexcept {
207 auto &[order, results] = result;
208 std::move(receiver)(std::move(results));
209 }
210 auto get_stop_token() const noexcept { return receiver.get_stop_token(); }
211};
212
213template <typename Receiver> struct ReceiverAnyWrapper {
214 Receiver receiver;
215 ReceiverAnyWrapper(Receiver receiver) : receiver(std::move(receiver)) {}
216 template <typename R> void operator()(R &&result) noexcept {
217 auto &[order, results] = result;
218 size_t index = order[0];
219 std::move(receiver)(tuple_at(results, index));
220 }
221 auto get_stop_token() const noexcept { return receiver.get_stop_token(); }
222};
223
224template <typename Receiver, typename... Senders>
225using WhenAnyOperationState =
226 ParallelAnyOperationState<ReceiverAnyWrapper<Receiver>, Senders...>;
227
228template <typename Receiver, typename... Senders>
229using WhenAllOperationState =
230 ParallelAllOperationState<ReceiverAllWrapper<Receiver>, Senders...>;
231
232template <typename Receiver, unsigned int Flags, typename... Senders>
233class LinkOperationState : public WhenAllOperationState<Receiver, Senders...> {
234public:
235 using Base = WhenAllOperationState<Receiver, Senders...>;
236 using Base::Base;
237
238 void start(unsigned int flags) noexcept {
239 auto *ring = detail::Context::current().ring();
240 ring->reserve_space(sizeof...(Senders));
241 start_linked_operations_(flags);
242 }
243
244private:
245 template <size_t I = 0>
246 void start_linked_operations_(unsigned int flags) noexcept {
247 if constexpr (I < sizeof...(Senders)) {
248 auto &state = std::get<I>(Base::op_states_);
249 if constexpr (I < sizeof...(Senders) - 1) {
250 state.get().start(flags | Flags);
251 } else {
252 state.get().start(flags);
253 }
254 start_linked_operations_<I + 1>(flags);
255 }
256 }
257};
258
259template <typename Receiver, typename Canceller, typename Sender>
260class RangedParallelOperationState {
261public:
262 RangedParallelOperationState(std::vector<Sender> senders, Receiver receiver)
263 : op_states_(senders.size()), order_(senders.size()),
264 results_(senders.size()), receiver_(std::move(receiver)) {
265 auto next_token = canceller_.chain_token(receiver_.get_stop_token());
266 for (size_t i = 0; i < senders.size(); ++i) {
267 op_states_[i].accept([&] {
268 return std::move(senders[i])
269 .connect(ChildReceiver{this, i, next_token});
270 });
271 }
272 }
273
274 RangedParallelOperationState(RangedParallelOperationState &&) = delete;
275 RangedParallelOperationState &
276 operator=(RangedParallelOperationState &&) = delete;
277 RangedParallelOperationState(const RangedParallelOperationState &) = delete;
278 RangedParallelOperationState &
279 operator=(const RangedParallelOperationState &) = delete;
280
281 ~RangedParallelOperationState() {
282 for (auto &op_state : op_states_) {
283 op_state.destroy();
284 }
285 }
286
287 void start(unsigned int flags) noexcept {
288 if (op_states_.empty()) {
289 std::move(receiver_)(
290 std::make_pair(std::move(order_), std::move(results_)));
291 } else {
292 for (auto &op_state : op_states_) {
293 op_state.get().start(flags);
294 }
295 }
296 }
297
298private:
299 using TokenType =
300 std::remove_cvref_t<decltype(std::declval<Canceller &>().chain_token(
301 std::declval<stop_token_t<Receiver>>()))>;
302
303 template <typename R> void receive_(size_t index, R &&result) noexcept {
304 canceller_.maybe_request_stop();
305 size_t no = completed_count_++;
306 order_[no] = index;
307 results_[index] = std::forward<R>(result);
308 if (no + 1 == op_states_.size()) {
309 std::move(receiver_)(
310 std::make_pair(std::move(order_), std::move(results_)));
311 }
312 }
313
314 struct ChildReceiver {
315 RangedParallelOperationState *self;
316 size_t index;
317 TokenType stop_token;
318 template <typename R> void operator()(R &&result) noexcept {
319 self->receive_(index, std::forward<R>(result));
320 }
321 auto get_stop_token() const noexcept { return stop_token; }
322 };
323
324 using OperationStates =
325 std::vector<RawStorage<operation_state_t<Sender, ChildReceiver>>>;
326
327protected:
328 OperationStates op_states_;
329 std::vector<size_t> order_;
330 std::vector<typename Sender::ReturnType> results_;
331 size_t completed_count_ = 0;
332 Receiver receiver_;
333 Canceller canceller_;
334};
335
336template <typename Receiver, typename Sender>
337using RangedParallelAllOperationState = RangedParallelOperationState<
338 Receiver, WhenAllCanceller<stop_token_t<Receiver>>, Sender>;
339
340template <typename Receiver, typename Sender>
341using RangedParallelAnyOperationState = RangedParallelOperationState<
342 Receiver, WhenAnyCanceller<stop_token_t<Receiver>>, Sender>;
343
344template <typename Receiver>
345using ReceiverRangedAllWrapper = ReceiverAllWrapper<Receiver>;
346
347template <typename Receiver> struct ReceiverRangedAnyWrapper {
348 Receiver receiver;
349 ReceiverRangedAnyWrapper(Receiver receiver)
350 : receiver(std::move(receiver)) {}
351 template <typename R> void operator()(R &&result) noexcept {
352 auto &[order, results] = result;
353 size_t index = order[0];
354 std::move(receiver)(std::make_pair(index, std::move(results[index])));
355 }
356 auto get_stop_token() const noexcept { return receiver.get_stop_token(); }
357};
358
359template <typename Receiver, typename Sender>
360using WhenAllRangeOperationState =
361 RangedParallelAllOperationState<ReceiverRangedAllWrapper<Receiver>, Sender>;
362
363template <typename Receiver, typename Sender>
364using WhenAnyRangeOperationState =
365 RangedParallelAnyOperationState<ReceiverRangedAnyWrapper<Receiver>, Sender>;
366
367template <typename Receiver, unsigned int Flags, typename Sender>
368class RangedLinkOperationState
369 : public WhenAllRangeOperationState<Receiver, Sender> {
370public:
371 using Base = WhenAllRangeOperationState<Receiver, Sender>;
372 using Base::Base;
373
374 void start(unsigned int flags) noexcept {
375 auto *ring = detail::Context::current().ring();
376 ring->reserve_space(Base::op_states_.size());
377 for (size_t i = 0; i < Base::op_states_.size(); ++i) {
378 auto &op_state = Base::op_states_[i];
379 if (i < Base::op_states_.size() - 1) {
380 op_state.get().start(flags | Flags);
381 } else {
382 op_state.get().start(flags);
383 }
384 }
385 }
386};
387
388} // namespace detail
389
390} // namespace condy
Definitions of finish handle types for asynchronous operations.
The main namespace for the Condy library.
Definition condy.hpp:30
Internal utility classes and functions used by Condy.