Condy v1.6.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
channel_legacy.hpp
Go to the documentation of this file.
1
8
9#pragma once
10
11#include "condy/context.hpp"
12#include "condy/intrusive.hpp"
13#include "condy/invoker.hpp"
14#include "condy/runtime.hpp"
15#include "condy/utils.hpp"
16#include <bit>
17#include <coroutine>
18#include <cstddef>
19#include <new>
20#include <optional>
21#include <type_traits>
22
23namespace condy {
24
25namespace legacy {
26
27#pragma GCC diagnostic push
28#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
29
45template <typename T, size_t N = 2>
46class [[deprecated("Use condy::Channel instead")]] Channel {
47public:
54 : buffer_(capacity ? std::bit_ceil(capacity) : 0) {}
55 ~Channel() {
56 std::lock_guard<std::mutex> lock(mutex_);
57 push_close_inner_();
58 destruct_all_();
59 }
60
61 Channel(const Channel &) = delete;
62 Channel &operator=(const Channel &) = delete;
63 Channel(Channel &&) = delete;
64 Channel &operator=(Channel &&) = delete;
65
66public:
73 template <typename U>
74 requires std::is_same_v<std::decay_t<U>, T>
75 bool try_push(U &&item) {
76 std::lock_guard<std::mutex> lock(mutex_);
77 if (closed_) [[unlikely]] {
78 throw std::logic_error("Push to closed channel");
79 }
80 return try_push_inner_(std::forward<U>(item));
81 }
82
89 std::optional<T> try_pop() noexcept {
90 std::lock_guard<std::mutex> lock(mutex_);
91 return try_pop_inner_();
92 }
93
94 void force_push(T item) noexcept {
95 std::lock_guard<std::mutex> lock(mutex_);
96 if (closed_) [[unlikely]] {
97 panic_on("Push to closed channel");
98 }
99 if (try_push_inner_(std::move(item))) [[likely]] {
100 return;
101 }
102 // This is safe because if try_push_inner_ returns false, the item has
103 // not been moved into the channel.
104 // NOLINTBEGIN(bugprone-use-after-move)
105 auto *fake_handle =
106 new (std::nothrow) PushFinishHandle(std::move(item));
107 // NOLINTEND(bugprone-use-after-move)
108 if (!fake_handle) {
109 panic_on("Allocation failed for PushFinishHandle");
110 }
111 assert(pop_awaiters_.empty());
112 push_awaiters_.push_back(fake_handle);
113 }
114
115 struct [[nodiscard]] PushAwaiter;
128 PushAwaiter push(T item) noexcept { return {*this, std::move(item)}; }
129
130 struct [[nodiscard]] PopAwaiter;
139 PopAwaiter pop() noexcept { return {*this}; }
140
144 size_t capacity() const noexcept { return buffer_.capacity(); }
145
150 size_t size() const noexcept {
151 std::lock_guard<std::mutex> lock(mutex_);
152 return size_;
153 }
154
159 bool empty() const noexcept {
160 std::lock_guard<std::mutex> lock(mutex_);
161 return size_ == 0;
162 }
163
168 bool is_closed() const noexcept {
169 std::lock_guard<std::mutex> lock(mutex_);
170 return closed_;
171 }
172
181 void push_close() noexcept {
182 std::lock_guard<std::mutex> lock(mutex_);
183 push_close_inner_();
184 }
185
186private:
187 class PushFinishHandle;
188 class PopFinishHandle;
189
190 std::optional<bool>
191 request_push_(PushFinishHandle *finish_handle) noexcept {
192 std::lock_guard<std::mutex> lock(mutex_);
193 if (closed_) [[unlikely]] {
194 return std::nullopt;
195 }
196 if (try_push_inner_(std::move(finish_handle->get_item()))) {
197 return true;
198 }
199 assert(pop_awaiters_.empty());
200 push_awaiters_.push_back(finish_handle);
201 detail::Context::current().runtime()->pend_work();
202 return false;
203 }
204
205 bool cancel_push_(PushFinishHandle *finish_handle) noexcept {
206 std::lock_guard<std::mutex> lock(mutex_);
207 return push_awaiters_.remove(finish_handle);
208 }
209
210 std::optional<T> request_pop_(PopFinishHandle *finish_handle) noexcept {
211 std::lock_guard<std::mutex> lock(mutex_);
212 auto result = try_pop_inner_();
213 if (result.has_value()) {
214 return result;
215 }
216 assert(push_awaiters_.empty());
217 pop_awaiters_.push_back(finish_handle);
218 detail::Context::current().runtime()->pend_work();
219 return std::nullopt;
220 }
221
222 bool cancel_pop_(PopFinishHandle *finish_handle) noexcept {
223 std::lock_guard<std::mutex> lock(mutex_);
224 return pop_awaiters_.remove(finish_handle);
225 }
226
227private:
228 template <typename U>
229 requires std::is_same_v<std::decay_t<U>, T>
230 bool try_push_inner_(U &&item) noexcept {
231 if (!pop_awaiters_.empty()) {
232 assert(empty_inner_());
233 auto *pop_handle = pop_awaiters_.pop_front();
234 pop_handle->set_result(std::forward<U>(item));
235 pop_handle->schedule();
236 return true;
237 }
238 if (!full_inner_()) {
239 push_inner_(std::forward<U>(item));
240 return true;
241 }
242 return false;
243 }
244
245 std::optional<T> try_pop_inner_() noexcept {
246 if (!push_awaiters_.empty()) {
247 assert(full_inner_());
248 auto *push_handle = push_awaiters_.pop_front();
249 T item = std::move(push_handle->get_item());
250 push_handle->schedule();
251 if (no_buffer_()) {
252 return item;
253 } else {
254 T result = pop_inner_();
255 push_inner_(std::move(item));
256 return result;
257 }
258 }
259 if (!empty_inner_()) {
260 T result = pop_inner_();
261 return result;
262 }
263 if (closed_) [[unlikely]] {
264 // Default indicates closed channel
265 T return_value = {};
266 return return_value;
267 }
268 return std::nullopt;
269 }
270
271 template <typename U>
272 requires std::is_same_v<std::decay_t<U>, T>
273 void push_inner_(U &&item) noexcept {
274 assert(!full_inner_());
275 auto mask = buffer_.capacity() - 1;
276 buffer_[tail_ & mask].construct(std::forward<U>(item));
277 tail_++;
278 size_++;
279 }
280
281 T pop_inner_() noexcept {
282 assert(!empty_inner_());
283 auto mask = buffer_.capacity() - 1;
284 T item = std::move(buffer_[head_ & mask].get());
285 buffer_[head_ & mask].destroy();
286 head_++;
287 size_--;
288 return item;
289 }
290
291 bool no_buffer_() const noexcept { return buffer_.capacity() == 0; }
292
293 bool empty_inner_() const noexcept {
294 if (no_buffer_()) {
295 return true;
296 }
297 return size_ == 0;
298 }
299
300 bool full_inner_() const noexcept {
301 if (no_buffer_()) {
302 return true;
303 }
304 return size_ == buffer_.capacity();
305 }
306
307 void push_close_inner_() noexcept {
308 if (closed_) {
309 return;
310 }
311 closed_ = true;
312 // Cancel all pending pop awaiters
313 PopFinishHandle *pop_handle = nullptr;
314 while ((pop_handle = pop_awaiters_.pop_front()) != nullptr) {
315 assert(empty_inner_());
316 pop_handle->schedule();
317 }
318 // Throw exception to all pending push awaiters
319 PushFinishHandle *push_handle = nullptr;
320 while ((push_handle = push_awaiters_.pop_front()) != nullptr) {
321 push_handle->enable_throw();
322 push_handle->schedule();
323 }
324 }
325
326 void destruct_all_() noexcept {
327 while (!empty_inner_()) {
328 pop_inner_();
329 }
330 assert(size_ == 0);
331 assert(head_ == tail_);
332 }
333
334private:
335 template <typename Handle>
336 using HandleList = IntrusiveDoubleList<Handle, &Handle::link_entry_>;
337
338 mutable std::mutex mutex_;
339 HandleList<PushFinishHandle> push_awaiters_;
340 HandleList<PopFinishHandle> pop_awaiters_;
341 size_t head_ = 0;
342 size_t tail_ = 0;
343 size_t size_ = 0;
344 SmallArray<RawStorage<T>, N> buffer_;
345 bool closed_ = false;
346};
347
348template <typename T, size_t N>
349class Channel<T, N>::PushFinishHandle
350 : public InvokerAdapter<PushFinishHandle, WorkInvoker> {
351public:
352 using ReturnType = bool;
353
354 PushFinishHandle(T item) : item_(std::move(item)) {}
355
356 void cancel([[maybe_unused]] Runtime *runtime) noexcept {
357 assert(runtime == runtime_);
358 if (channel_->cancel_push_(this)) {
359 // Successfully canceled
360 canceled_ = true;
361 runtime_->resume_work();
362 runtime_->schedule(this);
363 }
364 }
365
366 ReturnType extract_result() {
367 if (should_throw_) [[unlikely]] {
368 throw std::logic_error("Push to closed channel");
369 }
370 bool success = !canceled_;
371 return success;
372 }
373
374 void set_invoker(Invoker *invoker) noexcept { invoker_ = invoker; }
375
376 void invoke() noexcept {
377 if (need_resume_) {
378 runtime_->resume_work();
379 }
380 (*invoker_)();
381 }
382
383public:
384 void init(Channel *channel, Runtime *runtime) noexcept {
385 channel_ = channel;
386 runtime_ = runtime;
387 }
388
389 T &get_item() noexcept { return item_; }
390
391 void schedule() noexcept {
392 if (runtime_ == nullptr) [[unlikely]] {
393 // Fake handle, no need to schedule
394 delete this;
395 } else {
396 need_resume_ = true;
397 runtime_->schedule(this);
398 }
399 }
400
401 void enable_throw() noexcept { should_throw_ = true; }
402
403public:
404 DoubleLinkEntry link_entry_;
405
406private:
407 Invoker *invoker_ = nullptr;
408 Channel *channel_ = nullptr;
409 Runtime *runtime_ = nullptr;
410 T item_;
411 bool need_resume_ = false;
412 bool should_throw_ = false;
413 bool canceled_ = false;
414};
415
416template <typename T, size_t N>
417class Channel<T, N>::PopFinishHandle
418 : public InvokerAdapter<PopFinishHandle, WorkInvoker> {
419public:
420 using ReturnType = T;
421
422 void cancel([[maybe_unused]] Runtime *runtime) noexcept {
423 assert(runtime == runtime_);
424 if (channel_->cancel_pop_(this)) {
425 // Successfully canceled
426 runtime_->resume_work();
427 runtime_->schedule(this);
428 }
429 }
430
431 ReturnType extract_result() noexcept { return std::move(result_); }
432
433 void set_invoker(Invoker *invoker) noexcept { invoker_ = invoker; }
434
435 void invoke() noexcept {
436 if (need_resume_) {
437 runtime_->resume_work();
438 }
439 (*invoker_)();
440 }
441
442public:
443 void init(Channel *channel, Runtime *runtime) noexcept {
444 channel_ = channel;
445 runtime_ = runtime;
446 }
447
448 void set_result(T result) noexcept { result_ = std::move(result); }
449
450 void schedule() noexcept {
451 assert(runtime_ != nullptr);
452 need_resume_ = true;
453 runtime_->schedule(this);
454 }
455
456public:
457 DoubleLinkEntry link_entry_;
458
459private:
460 Invoker *invoker_ = nullptr;
461 Channel *channel_ = nullptr;
462 Runtime *runtime_ = nullptr;
463 T result_ = {};
464 bool need_resume_ = false;
465};
466
473template <typename T, size_t N> struct Channel<T, N>::PushAwaiter {
474public:
475 using HandleType = PushFinishHandle;
476
477 PushAwaiter(Channel &channel, T item)
478 : channel_(channel), finish_handle_(std::move(item)) {}
479
480public:
481 HandleType *get_handle() noexcept { return &finish_handle_; }
482
483 void init_finish_handle() noexcept { /* Leaf node, no-op */ }
484
485 void register_operation(unsigned int /*flags*/) noexcept {
486 auto *runtime = detail::Context::current().runtime();
487 finish_handle_.init(&channel_, runtime);
488 auto result = channel_.request_push_(&finish_handle_);
489 if (!result.has_value()) [[unlikely]] {
490 finish_handle_.enable_throw();
491 runtime->schedule(&finish_handle_);
492 return;
493 }
494 bool ok = *result;
495 if (ok) {
496 runtime->schedule(&finish_handle_);
497 }
498 }
499
500public:
501 bool await_ready() const noexcept { return false; }
502
503 template <typename PromiseType>
504 bool await_suspend(std::coroutine_handle<PromiseType> h) noexcept {
505 init_finish_handle();
506 finish_handle_.set_invoker(&h.promise());
507 finish_handle_.init(&channel_, detail::Context::current().runtime());
508 auto result = channel_.request_push_(&finish_handle_);
509 if (!result.has_value()) [[unlikely]] {
510 finish_handle_.enable_throw();
511 return false; // Do not suspend
512 }
513 bool ok = *result;
514 bool do_suspend = !ok;
515 return do_suspend;
516 }
517
518 auto await_resume() { return finish_handle_.extract_result(); }
519
520private:
521 Channel &channel_;
522 PushFinishHandle finish_handle_;
523};
524
530template <typename T, size_t N> struct Channel<T, N>::PopAwaiter {
531public:
532 using HandleType = PopFinishHandle;
533
534 PopAwaiter(Channel &channel) : channel_(channel) {}
535
536public:
537 HandleType *get_handle() noexcept { return &finish_handle_; }
538
539 void init_finish_handle() noexcept { /* Leaf node, no-op */ }
540
541 void register_operation(unsigned int /*flags*/) noexcept {
542 auto *runtime = detail::Context::current().runtime();
543 finish_handle_.init(&channel_, runtime);
544 auto item = channel_.request_pop_(&finish_handle_);
545 if (item.has_value()) {
546 finish_handle_.set_result(std::move(item.value()));
547 runtime->schedule(&finish_handle_);
548 }
549 }
550
551public:
552 bool await_ready() const noexcept { return false; }
553
554 template <typename PromiseType>
555 bool await_suspend(std::coroutine_handle<PromiseType> h) noexcept {
556 init_finish_handle();
557 finish_handle_.set_invoker(&h.promise());
558 finish_handle_.init(&channel_, detail::Context::current().runtime());
559 auto item = channel_.request_pop_(&finish_handle_);
560 if (item.has_value()) {
561 finish_handle_.set_result(std::move(item.value()));
562 return false; // Do not suspend
563 }
564 return true; // Suspend
565 }
566
567 auto await_resume() noexcept { return finish_handle_.extract_result(); }
568
569private:
570 Channel &channel_;
571 PopFinishHandle finish_handle_;
572};
573
574#pragma GCC diagnostic pop
575
576} // namespace legacy
577
578} // namespace condy
Thread-safe bounded channel for communication and synchronization.
Definition channel.hpp:40
[Deprecated] Thread-safe bounded channel for communication and synchronization.
bool empty() const noexcept
Check if the channel is empty.
std::optional< T > try_pop() noexcept
Try to pop an item from the channel.
bool is_closed() const noexcept
Check if the channel is closed.
size_t capacity() const noexcept
Get the capacity of the channel.
PushAwaiter push(T item) noexcept
Push an item into the channel, awaiting if necessary.
size_t size() const noexcept
Get the current size of the channel.
Channel(size_t capacity)
Construct a new Channel object.
bool try_push(U &&item)
Try to push an item into the channel.
PopAwaiter pop() noexcept
Pop an item from the channel, awaiting if necessary.
void push_close() noexcept
Close the channel.
Intrusive single-linked and double-linked list implementations.
Polymorphic invocation utilities.
The main namespace for the Condy library.
Definition condy.hpp:30
Runtime type for running the io_uring event loop.
Awaiter for popping an item from the channel.
Awaiter for pushing an item into the channel.
Internal utility classes and functions used by Condy.