Condy v1.6.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
sender_operations.hpp
Go to the documentation of this file.
1
5
6#pragma once
7
8#include "condy/concepts.hpp"
9#include "condy/senders.hpp"
10#include <coroutine>
11
12namespace condy {
13
14template <CQEHandlerLike CQEHandler, PrepFuncLike PrepFunc, typename... Args>
15auto build_op_sender(PrepFunc &&prep_func, Args &&...args) {
16 return OpSender<std::decay_t<PrepFunc>, CQEHandler>(
17 std::forward<PrepFunc>(prep_func),
18 CQEHandler(std::forward<Args>(args)...));
19}
20
21template <CQEHandlerLike CQEHandler, PrepFuncLike PrepFunc,
22 typename MultiShotFunc, typename... Args>
23auto build_multishot_op_sender(PrepFunc &&func, MultiShotFunc &&multishot_func,
24 Args &&...handler_args) {
25 return MultiShotOpSender<std::decay_t<PrepFunc>, CQEHandler,
26 std::decay_t<MultiShotFunc>>(
27 std::forward<PrepFunc>(func),
28 CQEHandler(std::forward<Args>(handler_args)...),
29 std::forward<MultiShotFunc>(multishot_func));
30}
31
32template <CQEHandlerLike CQEHandler, PrepFuncLike PrepFunc, typename FreeFunc,
33 typename... Args>
34auto build_zero_copy_op_sender(PrepFunc &&func, FreeFunc &&free_func,
35 Args &&...handler_args) {
36 return ZeroCopyOpSender<std::decay_t<PrepFunc>, CQEHandler,
37 std::decay_t<FreeFunc>>(
38 std::forward<PrepFunc>(func),
39 CQEHandler(std::forward<Args>(handler_args)...),
40 std::forward<FreeFunc>(free_func));
41}
42
43namespace detail {
44
45struct NeverStopToken {
46public:
47 template <typename> struct callback_type {
48 constexpr explicit callback_type(NeverStopToken, auto &&) noexcept {}
49 };
50
51 static constexpr bool stop_requested() noexcept { return false; }
52
53 static constexpr bool stop_possible() noexcept { return false; }
54
55 constexpr bool operator==(NeverStopToken const &) const noexcept = default;
56};
57
58template <typename Sender> class SenderAwaiter {
59public:
60 SenderAwaiter(Sender sender)
61 : operation_state_(std::move(sender).connect(Receiver{this})) {}
62
63 SenderAwaiter(const SenderAwaiter &) = delete;
64 SenderAwaiter &operator=(const SenderAwaiter &) = delete;
65 SenderAwaiter(SenderAwaiter &&) = delete;
66 SenderAwaiter &operator=(SenderAwaiter &&) = delete;
67
68public:
69 bool await_ready() const noexcept { return false; }
70
71 template <typename Promise>
72 bool await_suspend(std::coroutine_handle<Promise> h) noexcept {
73 operation_state_.start(0);
74 if (handle_ == std::noop_coroutine()) {
75 // The operation completed synchronously, no need to suspend
76 return false;
77 } else {
78 handle_ = h;
79 return true;
80 }
81 }
82
83 auto await_resume() noexcept { return std::move(result_); }
84
85private:
86 struct Receiver {
87 SenderAwaiter *self;
88 template <typename R> void operator()(R &&result) noexcept {
89 self->handle_result_(std::forward<R>(result));
90 }
91 NeverStopToken get_stop_token() const noexcept { return {}; }
92 };
93
94 template <typename R> void handle_result_(R &&result) {
95 result_ = std::forward<R>(result);
96 if (handle_) {
97 handle_.resume();
98 } else {
99 handle_ = std::noop_coroutine();
100 }
101 }
102
103 using OperationState = operation_state_t<Sender, Receiver>;
104 // Await/complete path is serialized, so atomic is not needed here.
105 std::coroutine_handle<> handle_ = nullptr;
106 OperationState operation_state_;
107 typename Sender::ReturnType result_;
108};
109
110template <typename Sender> auto as_awaiter(Sender &&sender) {
111 return detail::SenderAwaiter<std::decay_t<Sender>>(
112 std::forward<Sender>(sender));
113}
114
115} // namespace detail
116
122template <unsigned int Flags, typename Sender> auto flag(Sender &&sender) {
123 return FlaggedOpSender<Flags, std::decay_t<Sender>>(
124 std::forward<Sender>(sender));
125}
126
131template <typename Sender> auto drain(Sender &&sender) {
132 return flag<IOSQE_IO_DRAIN>(std::forward<Sender>(sender));
133}
134
139template <typename Sender> auto always_async(Sender &&sender) {
140 return flag<IOSQE_ASYNC>(std::forward<Sender>(sender));
141}
142
/**
 * @brief Compose multiple operations into a single sender of the given
 * variadic sender template.
 * @tparam SenderType Variadic template instantiated with the decayed types of
 * all arguments.
 * @param senders Senders to compose; each is forwarded to the constructor.
 * @return SenderType<std::decay_t<Senders>...> built from @p senders.
 */
template <template <typename... Senders> typename SenderType,
          typename... Senders>
auto parallel(Senders &&...senders) {
    using Composite = SenderType<std::decay_t<Senders>...>;
    return Composite(std::forward<Senders>(senders)...);
}
155
162template <template <typename Sender> typename RangedSenderType,
163 std::ranges::range Range>
164auto parallel(Range &&range) {
165 using SenderType = typename std::decay_t<Range>::value_type;
166 auto begin = std::make_move_iterator(std::begin(range));
167 auto end = std::make_move_iterator(std::end(range));
168 std::vector<SenderType> senders(begin, end);
169 return RangedSenderType<SenderType>(std::move(senders));
170}
171
177template <typename... Senders> auto when_all(Senders &&...senders) {
178 return parallel<WhenAllSender>(std::forward<Senders>(senders)...);
179}
180
186template <std::ranges::range Range> auto when_all(Range &&range) {
187 return parallel<RangedWhenAllSender>(std::forward<Range>(range));
188}
189
195template <typename... Senders> auto when_any(Senders &&...senders) {
196 static_assert(sizeof...(Senders) > 0,
197 "when_any requires at least one sender");
198 return parallel<WhenAnySender>(std::forward<Senders>(senders)...);
199}
200
206template <std::ranges::range Range> auto when_any(Range &&range) {
207 return parallel<RangedWhenAnySender>(std::forward<Range>(range));
208}
209
215template <typename... Senders> auto link(Senders &&...senders) {
216 return parallel<LinkSender>(std::forward<Senders>(senders)...);
217}
218
224template <std::ranges::range Range> auto link(Range &&range) {
225 return parallel<RangedLinkSender>(std::forward<Range>(range));
226}
227
233template <typename... Senders> auto hard_link(Senders &&...senders) {
234 return parallel<HardLinkSender>(std::forward<Senders>(senders)...);
235}
236
242template <std::ranges::range Range> auto hard_link(Range &&range) {
243 return parallel<RangedHardLinkSender>(std::forward<Range>(range));
244}
245
249namespace operators {
250
/**
 * @brief Operator form of condy::when_all for two senders.
 * @param s1 Left-hand sender (taken by value, then moved).
 * @param s2 Right-hand sender (taken by value, then moved).
 * @return The result of when_all(s1, s2).
 */
template <typename Sender1, typename Sender2>
auto operator&&(Sender1 s1, Sender2 s2) {
    return when_all(std::move(s1),
                    std::move(s2));
}
258
262template <typename S, typename... Ss>
263auto operator&&(WhenAllSender<Ss...> aws, S sender) {
264 return WhenAllSender<Ss..., std::decay_t<S>>(std::move(aws),
265 std::move(sender));
266}
267
/**
 * @brief Operator form of condy::when_any for two senders.
 * @param s1 Left-hand sender (taken by value, then moved).
 * @param s2 Right-hand sender (taken by value, then moved).
 * @return The result of when_any(s1, s2).
 */
template <typename Sender1, typename Sender2>
auto operator||(Sender1 s1, Sender2 s2) {
    return when_any(std::move(s1),
                    std::move(s2));
}
275
279template <typename S, typename... Ss>
280auto operator||(WhenAnySender<Ss...> aws, S sender) {
281 return WhenAnySender<Ss..., std::decay_t<S>>(std::move(aws),
282 std::move(sender));
283}
284
/**
 * @brief Operator form of condy::link for two senders.
 * @param s1 Left-hand sender (taken by value, then moved).
 * @param s2 Right-hand sender (taken by value, then moved).
 * @return The result of link(s1, s2).
 */
template <typename Sender1, typename Sender2>
auto operator>>(Sender1 s1, Sender2 s2) {
    return link(std::move(s1),
                std::move(s2));
}
292
296template <typename S, typename... Ss>
297auto operator>>(LinkSender<Ss...> aws, S sender) {
298 return LinkSender<Ss..., std::decay_t<S>>(std::move(aws),
299 std::move(sender));
300}
301
302} // namespace operators
303
304} // namespace condy
Operators for composing operations.
auto operator&&(Sender1 s1, Sender2 s2)
Operator overloads version of condy::when_all.
auto operator>>(Sender1 s1, Sender2 s2)
Operator overloads version of condy::link.
auto operator||(Sender1 s1, Sender2 s2)
Operator overloads version of condy::when_any.
The main namespace for the Condy library.
Definition condy.hpp:30
auto drain(Sender &&sender)
Mark an operation as drain operation.
auto link(Senders &&...senders)
Compose multiple operations into a single operation that executes them in sequence.
auto parallel(Senders &&...senders)
Compose multiple operations into a single sender that executes them in parallel.
auto hard_link(Senders &&...senders)
Compose multiple operations into a single operation that executes them in sequence and continues even...
auto when_any(Senders &&...senders)
Compose multiple operations into a single operation that completes when any of them complete.
auto when_all(Senders &&...senders)
Compose multiple operations into a single operation that completes when all of them complete.
auto flag(Sender &&sender)
Decorates an operation with specific io_uring sqe flags.
auto always_async(Sender &&sender)
Mark an operation to always execute asynchronously.
Sender types for composing asynchronous operations.