Condy v1.6.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
channel.hpp
Go to the documentation of this file.
1
7
8#pragma once
9
#include "condy/context.hpp"
#include "condy/intrusive.hpp"
#include "condy/invoker.hpp"
#include "condy/runtime.hpp"
#include "condy/type_traits.hpp"
#include "condy/utils.hpp"
#include <bit>
#include <cassert>
#include <cerrno>
#include <concepts>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <new>
#include <optional>
#include <type_traits>
#include <utility>
23
24namespace condy {
25
38template <typename T, size_t N = 2> class Channel {
39public:
46 : buffer_(capacity ? std::bit_ceil(capacity) : 0) {}
47 ~Channel() {
48 std::lock_guard<std::mutex> lock(mutex_);
49 push_close_inner_();
50 destruct_all_();
51 }
52
53 Channel(const Channel &) = delete;
54 Channel &operator=(const Channel &) = delete;
55 Channel(Channel &&) = delete;
56 Channel &operator=(Channel &&) = delete;
57
58public:
65 template <typename U>
66 requires std::is_same_v<std::decay_t<U>, T>
67 int32_t try_push(U &&item) noexcept {
68 std::lock_guard<std::mutex> lock(mutex_);
69 if (closed_) {
70 return -EPIPE;
71 }
72 if (try_push_inner_(std::forward<U>(item))) {
73 return 0;
74 }
75 return -EAGAIN;
76 }
77
84 std::pair<int32_t, T> try_pop() noexcept {
85 std::lock_guard<std::mutex> lock(mutex_);
86 auto item = try_pop_inner_();
87 if (item.has_value()) {
88 return {0, std::move(item.value())};
89 } else if (closed_) {
90 return {-EPIPE, T()};
91 } else {
92 return {-EAGAIN, T()};
93 }
94 }
95
96 void force_push(T item) noexcept {
97 std::lock_guard<std::mutex> lock(mutex_);
98 if (closed_) [[unlikely]] {
99 panic_on("Push to closed channel");
100 }
101 if (try_push_inner_(std::move(item))) [[likely]] {
102 return;
103 }
104 // This is safe because if try_push_inner_ returns false, the item has
105 // not been moved into the channel.
106 // NOLINTBEGIN(bugprone-use-after-move)
107 auto *fake_handle =
108 new (std::nothrow) FakePushFinishHandle(std::move(item));
109 // NOLINTEND(bugprone-use-after-move)
110 if (!fake_handle) {
111 panic_on("Allocation failed for PushFinishHandle");
112 }
113 assert(pop_awaiters_.empty());
114 push_awaiters_.push_back(fake_handle);
115 }
116
117 class [[nodiscard]] MovePushSender;
130 MovePushSender push(T &&item) noexcept { return {*this, std::move(item)}; }
131
132 class [[nodiscard]] CopyPushSender;
140 CopyPushSender push(const T &item) noexcept
141 requires std::copy_constructible<T>
142 {
143 return {*this, item};
144 }
145
146 class [[nodiscard]] PopSender;
153 PopSender pop() noexcept { return {*this}; }
154
158 size_t capacity() const noexcept { return buffer_.capacity(); }
159
164 size_t size() const noexcept {
165 std::lock_guard<std::mutex> lock(mutex_);
166 return size_inner_();
167 }
168
173 bool empty() const noexcept {
174 std::lock_guard<std::mutex> lock(mutex_);
175 return empty_inner_();
176 }
177
182 bool is_closed() const noexcept {
183 std::lock_guard<std::mutex> lock(mutex_);
184 return closed_;
185 }
186
195 void push_close() noexcept {
196 std::lock_guard<std::mutex> lock(mutex_);
197 push_close_inner_();
198 }
199
200private:
201 class PushFinishHandleBase;
202 template <typename Receiver> class PushFinishHandle;
203 class FakePushFinishHandle;
204
205 class PopFinishHandleBase;
206 template <typename Receiver> class PopFinishHandle;
207
208 int32_t request_push_(PushFinishHandleBase *finish_handle) noexcept {
209 std::lock_guard<std::mutex> lock(mutex_);
210 if (closed_) {
211 return -EPIPE;
212 }
213 if (try_push_inner_(std::move(finish_handle->get_item()))) {
214 return 0;
215 }
216 assert(pop_awaiters_.empty());
217 push_awaiters_.push_back(finish_handle);
218 detail::Context::current().runtime()->pend_work();
219 return -EAGAIN;
220 }
221
222 bool cancel_push_(PushFinishHandleBase *finish_handle) noexcept {
223 std::lock_guard<std::mutex> lock(mutex_);
224 return push_awaiters_.remove(finish_handle);
225 }
226
227 std::pair<int32_t, T>
228 request_pop_(PopFinishHandleBase *finish_handle) noexcept {
229 std::lock_guard<std::mutex> lock(mutex_);
230 auto result = try_pop_inner_();
231 if (result.has_value()) {
232 return {0, std::move(result.value())};
233 }
234 assert(push_awaiters_.empty());
235 if (closed_) {
236 return {-EPIPE, T()};
237 }
238 pop_awaiters_.push_back(finish_handle);
239 detail::Context::current().runtime()->pend_work();
240 return {-EAGAIN, T()};
241 }
242
243 bool cancel_pop_(PopFinishHandleBase *finish_handle) noexcept {
244 std::lock_guard<std::mutex> lock(mutex_);
245 return pop_awaiters_.remove(finish_handle);
246 }
247
248private:
249 template <typename U>
250 requires std::is_same_v<std::decay_t<U>, T>
251 bool try_push_inner_(U &&item) noexcept {
252 if (!pop_awaiters_.empty()) {
253 assert(empty_inner_());
254 auto *pop_handle = pop_awaiters_.pop_front();
255 pop_handle->set_result({0, std::forward<U>(item)});
256 pop_handle->schedule();
257 return true;
258 }
259 if (!full_inner_()) {
260 push_inner_(std::forward<U>(item));
261 return true;
262 }
263 return false;
264 }
265
266 std::optional<T> try_pop_inner_() noexcept {
267 if (!push_awaiters_.empty()) {
268 assert(full_inner_());
269 auto *push_handle = push_awaiters_.pop_front();
270 T item = std::move(push_handle->get_item());
271 push_handle->set_result(0);
272 push_handle->schedule();
273 return pop_and_push_(std::move(item));
274 }
275 if (!empty_inner_()) {
276 T result = pop_inner_();
277 return result;
278 }
279 return std::nullopt;
280 }
281
282 T pop_and_push_(T item) noexcept {
283 if (no_buffer_()) {
284 return item;
285 } else {
286 T result = pop_inner_();
287 push_inner_(std::move(item));
288 return result;
289 }
290 }
291
292 template <typename U>
293 requires std::is_same_v<std::decay_t<U>, T>
294 void push_inner_(U &&item) noexcept {
295 assert(!full_inner_());
296 auto mask = buffer_.capacity() - 1;
297 buffer_[tail_ & mask].construct(std::forward<U>(item));
298 tail_++;
299 }
300
301 T pop_inner_() noexcept {
302 assert(!empty_inner_());
303 auto mask = buffer_.capacity() - 1;
304 T item = std::move(buffer_[head_ & mask].get());
305 buffer_[head_ & mask].destroy();
306 head_++;
307 return item;
308 }
309
310 bool no_buffer_() const noexcept { return buffer_.capacity() == 0; }
311
312 bool empty_inner_() const noexcept { return size_inner_() == 0; }
313
314 bool full_inner_() const noexcept {
315 return size_inner_() == buffer_.capacity();
316 }
317
318 void push_close_inner_() noexcept {
319 if (closed_) {
320 return;
321 }
322 closed_ = true;
323 // Cancel all pending pop awaiters
324 PopFinishHandleBase *pop_handle = nullptr;
325 while ((pop_handle = pop_awaiters_.pop_front()) != nullptr) {
326 assert(empty_inner_());
327 pop_handle->set_result({-EPIPE, T()});
328 pop_handle->schedule();
329 }
330 // Cancel all pending push awaiters
331 PushFinishHandleBase *push_handle = nullptr;
332 while ((push_handle = push_awaiters_.pop_front()) != nullptr) {
333 assert(full_inner_());
334 push_handle->set_result(-EPIPE);
335 push_handle->schedule();
336 }
337 }
338
339 void destruct_all_() noexcept {
340 while (!empty_inner_()) {
341 pop_inner_();
342 }
343 assert(head_ == tail_);
344 }
345
346 size_t size_inner_() const noexcept { return tail_ - head_; }
347
348private:
349 template <typename Handle>
350 using HandleList = IntrusiveDoubleList<Handle, &Handle::link_entry_>;
351
352 mutable std::mutex mutex_;
353 HandleList<PushFinishHandleBase> push_awaiters_;
354 HandleList<PopFinishHandleBase> pop_awaiters_;
355 size_t head_ = 0;
356 size_t tail_ = 0;
357 SmallArray<RawStorage<T>, N> buffer_;
358 bool closed_ = false;
359};
360
361template <typename T, size_t N>
362class Channel<T, N>::PushFinishHandleBase : public WorkInvoker {
363public:
364 PushFinishHandleBase(T &item) : item_(item) {}
365
366 void schedule() noexcept {
367 if (runtime_ == nullptr) [[unlikely]] {
368 // Fake handle, no need to schedule
369 auto *this_fake = static_cast<FakePushFinishHandle *>(this);
370 delete this_fake;
371 } else {
372 runtime_->schedule(this);
373 }
374 }
375
376 T &get_item() noexcept { return item_; }
377
378 void set_result(int32_t result) noexcept { result_ = result; }
379
380public:
381 DoubleLinkEntry link_entry_;
382
383public:
384 Runtime *runtime_ = nullptr;
385 T &item_;
386 int32_t result_ = -ENOTRECOVERABLE; // Internal error if not set
387};
388
389template <typename T, size_t N>
390template <typename Receiver>
391class Channel<T, N>::PushFinishHandle
392 : public InvokerAdapter<PushFinishHandle<Receiver>, PushFinishHandleBase> {
393public:
394 using Base =
395 InvokerAdapter<PushFinishHandle<Receiver>, PushFinishHandleBase>;
396
397 PushFinishHandle(Channel &channel, T &item, Receiver receiver)
398 : Base(item), channel_(channel), receiver_(std::move(receiver)) {}
399
400 void start(Runtime *runtime) noexcept {
401 this->runtime_ = runtime;
402 int32_t r = channel_.request_push_(this);
403 if (r != -EAGAIN) {
404 std::move(receiver_)(r);
405 return;
406 }
407
408 auto stop_token = receiver_.get_stop_token();
409 if (stop_token.stop_possible()) {
410 stop_callback_.emplace(std::move(stop_token), Cancellation{this});
411 }
412 }
413
414 void invoke() noexcept {
415 stop_callback_.reset();
416 assert(this->runtime_ != nullptr);
417 this->runtime_->resume_work();
418 std::move(receiver_)(this->result_);
419 }
420
421private:
422 void cancel_() noexcept {
423 if (channel_.cancel_push_(this)) {
424 // Successfully canceled
425 assert(this->result_ == -ENOTRECOVERABLE);
426 this->result_ = -ECANCELED;
427 assert(this->runtime_ != nullptr);
428 this->runtime_->schedule(this);
429 }
430 }
431
432 struct Cancellation {
433 PushFinishHandle *self;
434 void operator()() noexcept { self->cancel_(); }
435 };
436
437 using StopCallbackType =
438 stop_callback_t<stop_token_t<Receiver>, Cancellation>;
439
440private:
441 Channel &channel_;
442 Receiver receiver_;
443 std::optional<StopCallbackType> stop_callback_;
444};
445
446template <typename T, size_t N>
447class Channel<T, N>::FakePushFinishHandle : public PushFinishHandleBase {
448public:
449 FakePushFinishHandle(T &&item)
450 : PushFinishHandleBase(item_copy_), item_copy_(std::move(item)) {}
451
452private:
453 T item_copy_;
454};
455
456template <typename T, size_t N>
457class Channel<T, N>::PopFinishHandleBase : public WorkInvoker {
458public:
459 void schedule() noexcept {
460 assert(runtime_ != nullptr);
461 runtime_->schedule(this);
462 }
463
464 void set_result(std::pair<int32_t, T> result) noexcept {
465 result_ = std::move(result);
466 }
467
468public:
469 DoubleLinkEntry link_entry_;
470
471protected:
472 Runtime *runtime_ = nullptr;
473 // Internal error if not set
474 std::pair<int32_t, T> result_ = {-ENOTRECOVERABLE, T()};
475};
476
477template <typename T, size_t N>
478template <typename Receiver>
479class Channel<T, N>::PopFinishHandle
480 : public InvokerAdapter<PopFinishHandle<Receiver>, PopFinishHandleBase> {
481public:
482 PopFinishHandle(Channel &channel, Receiver receiver)
483 : channel_(channel), receiver_(std::move(receiver)) {}
484
485 void start(Runtime *runtime) noexcept {
486 this->runtime_ = runtime;
487 auto item = channel_.request_pop_(this);
488 auto r = item.first;
489 if (r != -EAGAIN) {
490 std::move(receiver_)(std::move(item));
491 return;
492 }
493
494 auto stop_token = receiver_.get_stop_token();
495 if (stop_token.stop_possible()) {
496 stop_callback_.emplace(std::move(stop_token), Cancellation{this});
497 }
498 }
499
500 void invoke() noexcept {
501 stop_callback_.reset();
502 assert(this->runtime_ != nullptr);
503 this->runtime_->resume_work();
504 std::move(receiver_)(std::move(this->result_));
505 }
506
507private:
508 void cancel_() noexcept {
509 if (channel_.cancel_pop_(this)) {
510 // Successfully canceled
511 assert(this->result_.first == -ENOTRECOVERABLE);
512 this->result_.first = -ECANCELED;
513 assert(this->runtime_ != nullptr);
514 this->runtime_->schedule(this);
515 }
516 }
517
518 struct Cancellation {
519 PopFinishHandle *self;
520 void operator()() noexcept { self->cancel_(); }
521 };
522
523 using StopCallbackType =
524 stop_callback_t<stop_token_t<Receiver>, Cancellation>;
525
526private:
527 Channel &channel_;
528 Receiver receiver_;
529 std::optional<StopCallbackType> stop_callback_;
530};
531
532template <typename T, size_t N> class Channel<T, N>::MovePushSender {
533public:
534 using ReturnType = int32_t;
535
536 MovePushSender(Channel &channel, T &&item)
537 : channel_(channel), item_(std::move(item)) {}
538
539 template <typename Receiver> auto connect(Receiver receiver) noexcept {
540 return OperationState<Receiver>(channel_, std::move(item_),
541 std::move(receiver));
542 }
543
544private:
545 template <typename Receiver>
546 class OperationState
547 : public Channel<T, N>::template PushFinishHandle<Receiver> {
548 public:
549 using Base =
550 typename Channel<T, N>::template PushFinishHandle<Receiver>;
551 OperationState(Channel &channel, T &&item, Receiver receiver)
552 : Base(channel, item, std::move(receiver)) {}
553
554 void start(unsigned int /*flags*/) noexcept {
555 auto *runtime = detail::Context::current().runtime();
556 Base::start(runtime);
557 }
558 };
559
560 Channel &channel_;
561 T &&item_;
562};
563
564template <typename T, size_t N> class Channel<T, N>::CopyPushSender {
565public:
566 using ReturnType = int32_t;
567
568 CopyPushSender(Channel &channel, const T &item)
569 : channel_(channel), item_(item) {}
570
571 template <typename Receiver> auto connect(Receiver receiver) noexcept {
572 return OperationState<Receiver>(channel_, item_, std::move(receiver));
573 }
574
575private:
576 template <typename Receiver>
577 class OperationState
578 : public Channel<T, N>::template PushFinishHandle<Receiver> {
579 public:
580 using Base =
581 typename Channel<T, N>::template PushFinishHandle<Receiver>;
582 OperationState(Channel &channel, const T &item, Receiver receiver)
583 : Base(channel, item_copy_, std::move(receiver)), item_copy_(item) {
584 }
585
586 void start(unsigned int /*flags*/) noexcept {
587 auto *runtime = detail::Context::current().runtime();
588 Base::start(runtime);
589 }
590
591 private:
592 T item_copy_;
593 };
594
595 Channel &channel_;
596 const T &item_;
597};
598
599template <typename T, size_t N> class Channel<T, N>::PopSender {
600public:
601 using ReturnType = std::pair<int32_t, T>;
602
603 PopSender(Channel &channel) : channel_(channel) {}
604
605 template <typename Receiver> auto connect(Receiver receiver) noexcept {
606 return OperationState<Receiver>(channel_, std::move(receiver));
607 }
608
609private:
610 template <typename Receiver>
611 class OperationState
612 : public Channel<T, N>::template PopFinishHandle<Receiver> {
613 public:
614 using Base = typename Channel<T, N>::template PopFinishHandle<Receiver>;
615 using Base::Base;
616
617 void start(unsigned int /*flags*/) noexcept {
618 auto *runtime = detail::Context::current().runtime();
619 Base::start(runtime);
620 }
621 };
622
623 Channel &channel_;
624};
625
626} // namespace condy
Thread-safe bounded channel for communication and synchronization.
Definition channel.hpp:38
void push_close() noexcept
Close the channel.
Definition channel.hpp:195
MovePushSender push(T &&item) noexcept
Push an item into the channel, awaiting if necessary.
Definition channel.hpp:130
std::pair< int32_t, T > try_pop() noexcept
Try to pop an item from the channel.
Definition channel.hpp:84
bool empty() const noexcept
Check if the channel is empty.
Definition channel.hpp:173
CopyPushSender push(const T &item) noexcept
Push an item into the channel, awaiting if necessary.
Definition channel.hpp:140
PopSender pop() noexcept
Pop an item from the channel, awaiting if necessary.
Definition channel.hpp:153
size_t size() const noexcept
Get the current size of the channel.
Definition channel.hpp:164
bool is_closed() const noexcept
Check if the channel is closed.
Definition channel.hpp:182
int32_t try_push(U &&item) noexcept
Try to push an item into the channel.
Definition channel.hpp:67
size_t capacity() const noexcept
Get the capacity of the channel.
Definition channel.hpp:158
Channel(size_t capacity)
Construct a new Channel object.
Definition channel.hpp:45
Intrusive single-linked and double-linked list implementations.
Polymorphic invocation utilities.
The main namespace for the Condy library.
Definition condy.hpp:30
Runtime type for running the io_uring event loop.
Internal utility classes and functions used by Condy.