Condy v1.6.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
channel.hpp
Go to the documentation of this file.
1
7
8#pragma once
9
10#include "condy/context.hpp"
11#include "condy/intrusive.hpp"
12#include "condy/invoker.hpp"
13#include "condy/runtime.hpp"
14#include "condy/utils.hpp"
15#include <bit>
16#include <cerrno>
17#include <coroutine>
18#include <cstddef>
19#include <new>
20#include <optional>
21#include <type_traits>
22
23namespace condy {
24
40template <typename T, size_t N = 2> class Channel {
41public:
48 : buffer_(capacity ? std::bit_ceil(capacity) : 0) {}
49 ~Channel() {
50 std::lock_guard<std::mutex> lock(mutex_);
51 push_close_inner_();
52 destruct_all_();
53 }
54
55 Channel(const Channel &) = delete;
56 Channel &operator=(const Channel &) = delete;
57 Channel(Channel &&) = delete;
58 Channel &operator=(Channel &&) = delete;
59
60public:
67 template <typename U>
68 requires std::is_same_v<std::decay_t<U>, T>
69 int try_push(U &&item) noexcept {
70 std::lock_guard<std::mutex> lock(mutex_);
71 if (closed_) {
72 return -EPIPE;
73 }
74 if (try_push_inner_(std::forward<U>(item))) {
75 return 0;
76 }
77 return -EAGAIN;
78 }
79
86 std::pair<int, T> try_pop() noexcept {
87 std::lock_guard<std::mutex> lock(mutex_);
88 auto item = try_pop_inner_();
89 if (item.has_value()) {
90 return {0, std::move(item.value())};
91 } else if (closed_) {
92 return {-EPIPE, T()};
93 } else {
94 return {-EAGAIN, T()};
95 }
96 }
97
98 void force_push(T item) noexcept {
99 std::lock_guard<std::mutex> lock(mutex_);
100 if (closed_) [[unlikely]] {
101 panic_on("Push to closed channel");
102 }
103 if (try_push_inner_(std::move(item))) [[likely]] {
104 return;
105 }
106 // This is safe because if try_push_inner_ returns false, the item has
107 // not been moved into the channel.
108 // NOLINTBEGIN(bugprone-use-after-move)
109 auto *fake_handle =
110 new (std::nothrow) PushFinishHandle(std::move(item));
111 // NOLINTEND(bugprone-use-after-move)
112 if (!fake_handle) {
113 panic_on("Allocation failed for PushFinishHandle");
114 }
115 assert(pop_awaiters_.empty());
116 push_awaiters_.push_back(fake_handle);
117 }
118
119 struct [[nodiscard]] PushAwaiter;
133 PushAwaiter push(T item) noexcept { return {*this, std::move(item)}; }
134
135 struct [[nodiscard]] PopAwaiter;
145 PopAwaiter pop() noexcept { return {*this}; }
146
150 size_t capacity() const noexcept { return buffer_.capacity(); }
151
156 size_t size() const noexcept {
157 std::lock_guard<std::mutex> lock(mutex_);
158 return size_;
159 }
160
165 bool empty() const noexcept {
166 std::lock_guard<std::mutex> lock(mutex_);
167 return size_ == 0;
168 }
169
174 bool is_closed() const noexcept {
175 std::lock_guard<std::mutex> lock(mutex_);
176 return closed_;
177 }
178
187 void push_close() noexcept {
188 std::lock_guard<std::mutex> lock(mutex_);
189 push_close_inner_();
190 }
191
192private:
193 class PushFinishHandle;
194 class PopFinishHandle;
195
196 int request_push_(PushFinishHandle *finish_handle) noexcept {
197 std::lock_guard<std::mutex> lock(mutex_);
198 if (closed_) {
199 return -EPIPE;
200 }
201 if (try_push_inner_(std::move(finish_handle->get_item()))) {
202 return 0;
203 }
204 assert(pop_awaiters_.empty());
205 push_awaiters_.push_back(finish_handle);
206 detail::Context::current().runtime()->pend_work();
207 return -EAGAIN;
208 }
209
210 bool cancel_push_(PushFinishHandle *finish_handle) noexcept {
211 std::lock_guard<std::mutex> lock(mutex_);
212 return push_awaiters_.remove(finish_handle);
213 }
214
215 std::pair<int, T> request_pop_(PopFinishHandle *finish_handle) noexcept {
216 std::lock_guard<std::mutex> lock(mutex_);
217 auto result = try_pop_inner_();
218 if (result.has_value()) {
219 return {0, std::move(result.value())};
220 }
221 assert(push_awaiters_.empty());
222 if (closed_) {
223 return {-EPIPE, T()};
224 }
225 pop_awaiters_.push_back(finish_handle);
226 detail::Context::current().runtime()->pend_work();
227 return {-EAGAIN, T()};
228 }
229
230 bool cancel_pop_(PopFinishHandle *finish_handle) noexcept {
231 std::lock_guard<std::mutex> lock(mutex_);
232 return pop_awaiters_.remove(finish_handle);
233 }
234
235private:
236 template <typename U>
237 requires std::is_same_v<std::decay_t<U>, T>
238 bool try_push_inner_(U &&item) noexcept {
239 if (!pop_awaiters_.empty()) {
240 assert(empty_inner_());
241 auto *pop_handle = pop_awaiters_.pop_front();
242 pop_handle->set_result({0, std::forward<U>(item)});
243 pop_handle->schedule();
244 return true;
245 }
246 if (!full_inner_()) {
247 push_inner_(std::forward<U>(item));
248 return true;
249 }
250 return false;
251 }
252
253 std::optional<T> try_pop_inner_() noexcept {
254 if (!push_awaiters_.empty()) {
255 assert(full_inner_());
256 auto *push_handle = push_awaiters_.pop_front();
257 T item = std::move(push_handle->get_item());
258 push_handle->set_result(0);
259 push_handle->schedule();
260 return pop_and_push_(std::move(item));
261 }
262 if (!empty_inner_()) {
263 T result = pop_inner_();
264 return result;
265 }
266 return std::nullopt;
267 }
268
269 T pop_and_push_(T item) noexcept {
270 if (no_buffer_()) {
271 return item;
272 } else {
273 T result = pop_inner_();
274 push_inner_(std::move(item));
275 return result;
276 }
277 }
278
279 template <typename U>
280 requires std::is_same_v<std::decay_t<U>, T>
281 void push_inner_(U &&item) noexcept {
282 assert(!full_inner_());
283 auto mask = buffer_.capacity() - 1;
284 buffer_[tail_ & mask].construct(std::forward<U>(item));
285 tail_++;
286 size_++;
287 }
288
289 T pop_inner_() noexcept {
290 assert(!empty_inner_());
291 auto mask = buffer_.capacity() - 1;
292 T item = std::move(buffer_[head_ & mask].get());
293 buffer_[head_ & mask].destroy();
294 head_++;
295 size_--;
296 return item;
297 }
298
299 bool no_buffer_() const noexcept { return buffer_.capacity() == 0; }
300
301 bool empty_inner_() const noexcept {
302 if (no_buffer_()) {
303 return true;
304 }
305 return size_ == 0;
306 }
307
308 bool full_inner_() const noexcept {
309 if (no_buffer_()) {
310 return true;
311 }
312 return size_ == buffer_.capacity();
313 }
314
315 void push_close_inner_() noexcept {
316 if (closed_) {
317 return;
318 }
319 closed_ = true;
320 // Cancel all pending pop awaiters
321 PopFinishHandle *pop_handle = nullptr;
322 while ((pop_handle = pop_awaiters_.pop_front()) != nullptr) {
323 assert(empty_inner_());
324 pop_handle->set_result({-EPIPE, T()});
325 pop_handle->schedule();
326 }
327 // Cancel all pending push awaiters
328 PushFinishHandle *push_handle = nullptr;
329 while ((push_handle = push_awaiters_.pop_front()) != nullptr) {
330 assert(full_inner_());
331 push_handle->set_result(-EPIPE);
332 push_handle->schedule();
333 }
334 }
335
336 void destruct_all_() noexcept {
337 while (!empty_inner_()) {
338 pop_inner_();
339 }
340 assert(size_ == 0);
341 assert(head_ == tail_);
342 }
343
344private:
345 template <typename Handle>
346 using HandleList = IntrusiveDoubleList<Handle, &Handle::link_entry_>;
347
348 mutable std::mutex mutex_;
349 HandleList<PushFinishHandle> push_awaiters_;
350 HandleList<PopFinishHandle> pop_awaiters_;
351 size_t head_ = 0;
352 size_t tail_ = 0;
353 size_t size_ = 0;
354 SmallArray<RawStorage<T>, N> buffer_;
355 bool closed_ = false;
356};
357
358template <typename T, size_t N>
359class Channel<T, N>::PushFinishHandle
360 : public InvokerAdapter<PushFinishHandle, WorkInvoker> {
361public:
362 using ReturnType = int;
363
364 PushFinishHandle(T item) : item_(std::move(item)) {}
365
366 void cancel([[maybe_unused]] Runtime *runtime) noexcept {
367 assert(runtime == runtime_);
368 if (channel_->cancel_push_(this)) {
369 // Successfully canceled
370 assert(result_ == -ENOTRECOVERABLE);
371 result_ = -ECANCELED;
372 need_resume_ = true;
373 runtime_->schedule(this);
374 }
375 }
376
377 ReturnType extract_result() noexcept { return result_; }
378
379 void set_invoker(Invoker *invoker) noexcept { invoker_ = invoker; }
380
381 void invoke() noexcept {
382 if (need_resume_) {
383 runtime_->resume_work();
384 }
385 (*invoker_)();
386 }
387
388public:
389 void init(Channel *channel, Runtime *runtime) noexcept {
390 channel_ = channel;
391 runtime_ = runtime;
392 }
393
394 T &get_item() noexcept { return item_; }
395
396 void schedule() noexcept {
397 if (runtime_ == nullptr) [[unlikely]] {
398 // Fake handle, no need to schedule
399 delete this;
400 } else {
401 need_resume_ = true;
402 runtime_->schedule(this);
403 }
404 }
405
406 void set_result(int result) noexcept { result_ = result; }
407
408public:
409 DoubleLinkEntry link_entry_;
410
411private:
412 Invoker *invoker_ = nullptr;
413 Channel *channel_ = nullptr;
414 Runtime *runtime_ = nullptr;
415 T item_;
416 bool need_resume_ = false;
417 int result_ = -ENOTRECOVERABLE; // Internal error if not set
418};
419
420template <typename T, size_t N>
421class Channel<T, N>::PopFinishHandle
422 : public InvokerAdapter<PopFinishHandle, WorkInvoker> {
423public:
424 using ReturnType = std::pair<int, T>;
425
426 void cancel([[maybe_unused]] Runtime *runtime) noexcept {
427 assert(runtime == runtime_);
428 if (channel_->cancel_pop_(this)) {
429 // Successfully canceled
430 assert(result_.first == -ENOTRECOVERABLE);
431 result_.first = -ECANCELED;
432 need_resume_ = true;
433 runtime_->schedule(this);
434 }
435 }
436
437 ReturnType extract_result() noexcept { return std::move(result_); }
438
439 void set_invoker(Invoker *invoker) noexcept { invoker_ = invoker; }
440
441 void invoke() noexcept {
442 if (need_resume_) {
443 runtime_->resume_work();
444 }
445 (*invoker_)();
446 }
447
448public:
449 void init(Channel *channel, Runtime *runtime) noexcept {
450 channel_ = channel;
451 runtime_ = runtime;
452 }
453
454 void set_result(ReturnType result) noexcept { result_ = std::move(result); }
455
456 void schedule() noexcept {
457 assert(runtime_ != nullptr);
458 need_resume_ = true;
459 runtime_->schedule(this);
460 }
461
462public:
463 DoubleLinkEntry link_entry_;
464
465private:
466 Invoker *invoker_ = nullptr;
467 Channel *channel_ = nullptr;
468 Runtime *runtime_ = nullptr;
469 ReturnType result_ = {-ENOTRECOVERABLE, T()}; // Internal error if not set
470 bool need_resume_ = false;
471};
472
479template <typename T, size_t N> struct Channel<T, N>::PushAwaiter {
480public:
481 using HandleType = PushFinishHandle;
482
483 PushAwaiter(Channel &channel, T item)
484 : channel_(channel), finish_handle_(std::move(item)) {}
485
486public:
487 HandleType *get_handle() noexcept { return &finish_handle_; }
488
489 void init_finish_handle() noexcept { /* Leaf node, no-op */ }
490
491 void register_operation(unsigned int /*flags*/) noexcept {
492 auto *runtime = detail::Context::current().runtime();
493 finish_handle_.init(&channel_, runtime);
494 int r = channel_.request_push_(&finish_handle_);
495 if (r != -EAGAIN) {
496 // Operation completed immediately
497 finish_handle_.set_result(r);
498 runtime->schedule(&finish_handle_);
499 }
500 }
501
502public:
503 bool await_ready() const noexcept { return false; }
504
505 template <typename PromiseType>
506 bool await_suspend(std::coroutine_handle<PromiseType> h) noexcept {
507 init_finish_handle();
508 finish_handle_.set_invoker(&h.promise());
509 finish_handle_.init(&channel_, detail::Context::current().runtime());
510 int r = channel_.request_push_(&finish_handle_);
511 if (r != -EAGAIN) {
512 // Operation completed immediately
513 finish_handle_.set_result(r);
514 return false; // Do not suspend
515 }
516 return true; // Suspend
517 }
518
519 auto await_resume() noexcept { return finish_handle_.extract_result(); }
520
521private:
522 Channel &channel_;
523 PushFinishHandle finish_handle_;
524};
525
532template <typename T, size_t N> struct Channel<T, N>::PopAwaiter {
533public:
534 using HandleType = PopFinishHandle;
535
536 PopAwaiter(Channel &channel) : channel_(channel) {}
537
538public:
539 HandleType *get_handle() noexcept { return &finish_handle_; }
540
541 void init_finish_handle() noexcept { /* Leaf node, no-op */ }
542
543 void register_operation(unsigned int /*flags*/) noexcept {
544 auto *runtime = detail::Context::current().runtime();
545 finish_handle_.init(&channel_, runtime);
546 auto item = channel_.request_pop_(&finish_handle_);
547 auto r = item.first;
548 if (r != -EAGAIN) {
549 finish_handle_.set_result(std::move(item));
550 runtime->schedule(&finish_handle_);
551 }
552 }
553
554public:
555 bool await_ready() const noexcept { return false; }
556
557 template <typename PromiseType>
558 bool await_suspend(std::coroutine_handle<PromiseType> h) noexcept {
559 init_finish_handle();
560 finish_handle_.set_invoker(&h.promise());
561 finish_handle_.init(&channel_, detail::Context::current().runtime());
562 auto item = channel_.request_pop_(&finish_handle_);
563 auto r = item.first;
564 if (r != -EAGAIN) {
565 finish_handle_.set_result(std::move(item));
566 return false; // Do not suspend
567 }
568 return true; // Suspend
569 }
570
571 auto await_resume() noexcept { return finish_handle_.extract_result(); }
572
573private:
574 Channel &channel_;
575 PopFinishHandle finish_handle_;
576};
577
578} // namespace condy
Thread-safe bounded channel for communication and synchronization.
Definition channel.hpp:40
void push_close() noexcept
Close the channel.
Definition channel.hpp:187
int try_push(U &&item) noexcept
Try to push an item into the channel.
Definition channel.hpp:69
bool empty() const noexcept
Check if the channel is empty.
Definition channel.hpp:165
PushAwaiter push(T item) noexcept
Push an item into the channel, awaiting if necessary.
Definition channel.hpp:133
size_t size() const noexcept
Get the current size of the channel.
Definition channel.hpp:156
std::pair< int, T > try_pop() noexcept
Try to pop an item from the channel.
Definition channel.hpp:86
bool is_closed() const noexcept
Check if the channel is closed.
Definition channel.hpp:174
size_t capacity() const noexcept
Get the capacity of the channel.
Definition channel.hpp:150
PopAwaiter pop() noexcept
Pop an item from the channel, awaiting if necessary.
Definition channel.hpp:145
Channel(size_t capacity)
Construct a new Channel object.
Definition channel.hpp:47
Intrusive singly-linked and doubly-linked list implementations.
Polymorphic invocation utilities.
The main namespace for the Condy library.
Definition condy.hpp:30
Runtime type for running the io_uring event loop.
Awaiter for popping an item from the channel.
Definition channel.hpp:532
Awaiter for pushing an item into the channel.
Definition channel.hpp:479
Internal utility classes and functions used by Condy.