Condy v1.1.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
channel.hpp
Go to the documentation of this file.
1
7
8#pragma once
9
10#include "condy/context.hpp"
11#include "condy/intrusive.hpp"
12#include "condy/invoker.hpp"
13#include "condy/runtime.hpp"
14#include "condy/utils.hpp"
15#include <coroutine>
16#include <cstddef>
17#include <optional>
18
19namespace condy {
20
33template <typename T, size_t N = 2> class Channel {
34public:
41 : buffer_(capacity ? std::bit_ceil(capacity) : 0) {}
42 ~Channel() {
43 std::lock_guard<std::mutex> lock(mutex_);
44 push_close_inner_();
45 destruct_all_();
46 }
47
48 Channel(const Channel &) = delete;
49 Channel &operator=(const Channel &) = delete;
50 Channel(Channel &&) = delete;
51 Channel &operator=(Channel &&) = delete;
52
53public:
62 template <typename U> bool try_push(U &&item) {
63 std::lock_guard<std::mutex> lock(mutex_);
64 return try_push_inner_(std::forward<U>(item));
65 }
66
72 std::optional<T> try_pop() {
73 std::lock_guard<std::mutex> lock(mutex_);
74 return try_pop_inner_();
75 }
76
77 template <typename U> void force_push(U &&item) {
78 std::lock_guard<std::mutex> lock(mutex_);
79 if (try_push_inner_(std::forward<U>(item))) [[likely]] {
80 return;
81 }
82 auto *fake_handle = new PushFinishHandle(std::forward<U>(item));
83 assert(pop_awaiters_.empty());
84 push_awaiters_.push_back(fake_handle);
85 }
86
87 struct [[nodiscard]] PushAwaiter;
101 template <typename U> PushAwaiter push(U &&item) {
102 return {*this, std::forward<U>(item)};
103 }
104
105 struct [[nodiscard]] PopAwaiter;
114 PopAwaiter pop() { return {*this}; }
115
119 size_t capacity() const noexcept { return buffer_.capacity(); }
120
125 size_t size() const noexcept {
126 std::lock_guard<std::mutex> lock(mutex_);
127 return size_;
128 }
129
134 bool empty() const noexcept {
135 std::lock_guard<std::mutex> lock(mutex_);
136 return size_ == 0;
137 }
138
143 bool is_closed() const noexcept {
144 std::lock_guard<std::mutex> lock(mutex_);
145 return closed_;
146 }
147
156 void push_close() {
157 std::lock_guard<std::mutex> lock(mutex_);
158 push_close_inner_();
159 }
160
161private:
162 class PushFinishHandle;
163 class PopFinishHandle;
164
165 bool request_push_(PushFinishHandle *finish_handle) {
166 std::lock_guard<std::mutex> lock(mutex_);
167 if (try_push_inner_(std::move(finish_handle->get_item()))) {
168 return true;
169 }
170 assert(pop_awaiters_.empty());
171 push_awaiters_.push_back(finish_handle);
172 Context::current().runtime()->pend_work();
173 return false;
174 }
175
176 bool cancel_push_(PushFinishHandle *finish_handle) {
177 std::lock_guard<std::mutex> lock(mutex_);
178 return push_awaiters_.remove(finish_handle);
179 }
180
181 std::optional<T> request_pop_(PopFinishHandle *finish_handle) {
182 std::lock_guard<std::mutex> lock(mutex_);
183 auto result = try_pop_inner_();
184 if (result.has_value()) {
185 return result;
186 }
187 assert(push_awaiters_.empty());
188 pop_awaiters_.push_back(finish_handle);
189 Context::current().runtime()->pend_work();
190 return std::nullopt;
191 }
192
193 bool cancel_pop_(PopFinishHandle *finish_handle) {
194 std::lock_guard<std::mutex> lock(mutex_);
195 return pop_awaiters_.remove(finish_handle);
196 }
197
198private:
199 template <typename U> bool try_push_inner_(U &&item) {
200 if (closed_) [[unlikely]] {
201 throw std::logic_error("Push to closed channel");
202 }
203 if (!pop_awaiters_.empty()) {
204 assert(empty_inner_());
205 auto *pop_handle = pop_awaiters_.pop_front();
206 pop_handle->set_result(std::forward<U>(item));
207 pop_handle->schedule();
208 return true;
209 }
210 if (!full_inner_()) {
211 push_inner_(std::forward<U>(item));
212 return true;
213 }
214 return false;
215 }
216
217 std::optional<T> try_pop_inner_() {
218 if (!push_awaiters_.empty()) {
219 assert(full_inner_());
220 auto *push_handle = push_awaiters_.pop_front();
221 T item = std::move(push_handle->get_item());
222 push_handle->schedule();
223 if (no_buffer_()) {
224 return item;
225 } else {
226 T result = pop_inner_();
227 push_inner_(std::move(item));
228 return result;
229 }
230 }
231 if (!empty_inner_()) {
232 T result = pop_inner_();
233 return result;
234 }
235 if (closed_) [[unlikely]] {
236 // Default indicates closed channel
237 T return_value = {};
238 return return_value;
239 }
240 return std::nullopt;
241 }
242
243 template <typename U> void push_inner_(U &&item) {
244 assert(!full_inner_());
245 auto mask = buffer_.capacity() - 1;
246 buffer_[tail_ & mask].construct(std::forward<U>(item));
247 tail_++;
248 size_++;
249 }
250
251 T pop_inner_() {
252 assert(!empty_inner_());
253 auto mask = buffer_.capacity() - 1;
254 T item = std::move(buffer_[head_ & mask].get());
255 buffer_[head_ & mask].destroy();
256 head_++;
257 size_--;
258 return item;
259 }
260
261 bool no_buffer_() const noexcept { return buffer_.capacity() == 0; }
262
263 bool empty_inner_() const noexcept {
264 if (no_buffer_()) {
265 return true;
266 }
267 return size_ == 0;
268 }
269
270 bool full_inner_() const noexcept {
271 if (no_buffer_()) {
272 return true;
273 }
274 return size_ == buffer_.capacity();
275 }
276
277 void push_close_inner_() {
278 if (closed_) {
279 return;
280 }
281 closed_ = true;
282 // Cancel all pending pop awaiters
283 PopFinishHandle *pop_handle = nullptr;
284 while ((pop_handle = pop_awaiters_.pop_front()) != nullptr) {
285 assert(empty_inner_());
286 pop_handle->schedule();
287 }
288 // Throw exception to all pending push awaiters
289 PushFinishHandle *push_handle = nullptr;
290 while ((push_handle = push_awaiters_.pop_front()) != nullptr) {
291 push_handle->enable_throw();
292 push_handle->schedule();
293 }
294 }
295
296 void destruct_all_() {
297 while (!empty_inner_()) {
298 pop_inner_();
299 }
300 assert(size_ == 0);
301 assert(head_ == tail_);
302 }
303
304private:
305 template <typename Handle>
306 using HandleList = IntrusiveDoubleList<Handle, &Handle::link_entry_>;
307
308 mutable std::mutex mutex_;
309 HandleList<PushFinishHandle> push_awaiters_;
310 HandleList<PopFinishHandle> pop_awaiters_;
311 size_t head_ = 0;
312 size_t tail_ = 0;
313 size_t size_ = 0;
314 SmallArray<RawStorage<T>, N> buffer_;
315 bool closed_ = false;
316};
317
318template <typename T, size_t N>
319class Channel<T, N>::PushFinishHandle
320 : public InvokerAdapter<PushFinishHandle, WorkInvoker> {
321public:
322 using ReturnType = bool;
323
324 PushFinishHandle(T item) : item_(std::move(item)) {}
325
326 void cancel() {
327 if (channel_->cancel_push_(this)) {
328 // Successfully canceled
329 canceled_ = true;
330 runtime_->resume_work();
331 runtime_->schedule(this);
332 }
333 }
334
335 ReturnType extract_result() {
336 if (should_throw_) [[unlikely]] {
337 throw std::logic_error("Push to closed channel");
338 }
339 bool success = !canceled_;
340 return success;
341 }
342
343 void set_invoker(Invoker *invoker) { invoker_ = invoker; }
344
345 void invoke() {
346 if (need_resume_) {
347 runtime_->resume_work();
348 }
349 (*invoker_)();
350 }
351
352public:
353 void init(Channel *channel, Runtime *runtime) {
354 channel_ = channel;
355 runtime_ = runtime;
356 }
357
358 T &get_item() { return item_; }
359
360 void schedule() {
361 if (runtime_ == nullptr) [[unlikely]] {
362 // Fake handle, no need to schedule
363 delete this;
364 } else {
365 need_resume_ = true;
366 runtime_->schedule(this);
367 }
368 }
369
370 void enable_throw() { should_throw_ = true; }
371
372public:
373 DoubleLinkEntry link_entry_;
374
375private:
376 Invoker *invoker_ = nullptr;
377 Channel *channel_ = nullptr;
378 Runtime *runtime_ = nullptr;
379 T item_;
380 bool need_resume_ = false;
381 bool should_throw_ = false;
382 bool canceled_ = false;
383};
384
385template <typename T, size_t N>
386class Channel<T, N>::PopFinishHandle
387 : public InvokerAdapter<PopFinishHandle, WorkInvoker> {
388public:
389 using ReturnType = T;
390
391 void cancel() {
392 if (channel_->cancel_pop_(this)) {
393 // Successfully canceled
394 runtime_->resume_work();
395 runtime_->schedule(this);
396 }
397 }
398
399 ReturnType extract_result() { return std::move(result_); }
400
401 void set_invoker(Invoker *invoker) { invoker_ = invoker; }
402
403 void invoke() {
404 if (need_resume_) {
405 runtime_->resume_work();
406 }
407 (*invoker_)();
408 }
409
410public:
411 void init(Channel *channel, Runtime *runtime) {
412 channel_ = channel;
413 runtime_ = runtime;
414 }
415
416 void set_result(T result) { result_ = std::move(result); }
417
418 void schedule() {
419 assert(runtime_ != nullptr);
420 need_resume_ = true;
421 runtime_->schedule(this);
422 }
423
424public:
425 DoubleLinkEntry link_entry_;
426
427private:
428 Invoker *invoker_ = nullptr;
429 Channel *channel_ = nullptr;
430 Runtime *runtime_ = nullptr;
431 T result_ = {};
432 bool need_resume_ = false;
433};
434
441template <typename T, size_t N> struct Channel<T, N>::PushAwaiter {
442public:
443 using HandleType = PushFinishHandle;
444
445 PushAwaiter(Channel &channel, T item)
446 : channel_(channel), finish_handle_(std::move(item)) {}
447 PushAwaiter(PushAwaiter &&) = default;
448
449 PushAwaiter(const PushAwaiter &) = delete;
450 PushAwaiter &operator=(const PushAwaiter &) = delete;
451 PushAwaiter &operator=(PushAwaiter &&) = delete;
452
453public:
454 HandleType *get_handle() { return &finish_handle_; }
455
456 void init_finish_handle() { /* Leaf node, no-op */ }
457
458 void register_operation(unsigned int /*flags*/) {
459 auto *runtime = Context::current().runtime();
460 finish_handle_.init(&channel_, runtime);
461 bool ok = channel_.request_push_(&finish_handle_);
462 if (ok) {
463 runtime->schedule(&finish_handle_);
464 }
465 }
466
467public:
468 bool await_ready() const noexcept { return false; }
469
470 template <typename PromiseType>
471 bool await_suspend(std::coroutine_handle<PromiseType> h) {
472 init_finish_handle();
473 finish_handle_.set_invoker(&h.promise());
474 finish_handle_.init(&channel_, Context::current().runtime());
475 bool ok = channel_.request_push_(&finish_handle_);
476 bool do_suspend = !ok;
477 return do_suspend;
478 }
479
480 auto await_resume() { return finish_handle_.extract_result(); }
481
482private:
483 Channel &channel_;
484 PushFinishHandle finish_handle_;
485};
486
492template <typename T, size_t N> struct Channel<T, N>::PopAwaiter {
493public:
494 using HandleType = PopFinishHandle;
495
496 PopAwaiter(Channel &channel) : channel_(channel) {}
497 PopAwaiter(PopAwaiter &&) = default;
498
499 PopAwaiter(const PopAwaiter &) = delete;
500 PopAwaiter &operator=(const PopAwaiter &) = delete;
501 PopAwaiter &operator=(PopAwaiter &&) = delete;
502
503public:
504 HandleType *get_handle() { return &finish_handle_; }
505
506 void init_finish_handle() { /* Leaf node, no-op */ }
507
508 void register_operation(unsigned int /*flags*/) {
509 auto *runtime = Context::current().runtime();
510 finish_handle_.init(&channel_, runtime);
511 auto item = channel_.request_pop_(&finish_handle_);
512 if (item.has_value()) {
513 finish_handle_.set_result(std::move(item.value()));
514 runtime->schedule(&finish_handle_);
515 }
516 }
517
518public:
519 bool await_ready() const noexcept { return false; }
520
521 template <typename PromiseType>
522 bool await_suspend(std::coroutine_handle<PromiseType> h) {
523 init_finish_handle();
524 finish_handle_.set_invoker(&h.promise());
525 finish_handle_.init(&channel_, Context::current().runtime());
526 auto item = channel_.request_pop_(&finish_handle_);
527 if (item.has_value()) {
528 finish_handle_.set_result(std::move(item.value()));
529 return false; // Do not suspend
530 }
531 return true; // Suspend
532 }
533
534 auto await_resume() { return finish_handle_.extract_result(); }
535
536private:
537 Channel &channel_;
538 PopFinishHandle finish_handle_;
539};
540
541} // namespace condy
Thread-safe bounded channel for communication and synchronization.
Definition channel.hpp:33
void push_close()
Close the channel.
Definition channel.hpp:156
PushAwaiter push(U &&item)
Push an item into the channel, awaiting if necessary.
Definition channel.hpp:101
bool empty() const noexcept
Check if the channel is empty.
Definition channel.hpp:134
PopAwaiter pop()
Pop an item from the channel, awaiting if necessary.
Definition channel.hpp:114
std::optional< T > try_pop()
Try to pop an item from the channel.
Definition channel.hpp:72
size_t size() const noexcept
Get the current size of the channel.
Definition channel.hpp:125
bool is_closed() const noexcept
Check if the channel is closed.
Definition channel.hpp:143
size_t capacity() const noexcept
Get the capacity of the channel.
Definition channel.hpp:119
Channel(size_t capacity)
Construct a new Channel object.
Definition channel.hpp:40
bool try_push(U &&item)
Try to push an item into the channel.
Definition channel.hpp:62
Intrusive single-linked and double-linked list implementations.
Polymorphic invocation utilities.
The main namespace for the Condy library.
Definition condy.hpp:28
Runtime type for running the io_uring event loop.
Awaiter for popping an item from the channel.
Definition channel.hpp:492
Awaiter for pushing an item into the channel.
Definition channel.hpp:441
Internal utility classes and functions used by Condy.