33template <
typename T,
size_t N = 2>
class Channel {
43 std::lock_guard<std::mutex> lock(mutex_);
// Attempts to push `item` into the channel.
// Holds mutex_ for the whole call and forwards `item` unchanged to
// try_push_inner_(), returning that helper's bool result.
// NOTE(review): try_push_inner_() throws std::logic_error when closed_ is
// set, so callers of try_push() should expect the same exception.
62 template <
typename U>
bool try_push(U &&item) {
63 std::lock_guard<std::mutex> lock(mutex_);
64 return try_push_inner_(std::forward<U>(item));
73 std::lock_guard<std::mutex> lock(mutex_);
74 return try_pop_inner_();
77 template <
typename U>
void force_push(U &&item) {
78 std::lock_guard<std::mutex> lock(mutex_);
79 if (try_push_inner_(std::forward<U>(item))) [[likely]] {
82 auto *fake_handle =
new PushFinishHandle(std::forward<U>(item));
83 assert(pop_awaiters_.empty());
84 push_awaiters_.push_back(fake_handle);
102 return {*
this, std::forward<U>(item)};
105 struct [[nodiscard]] PopAwaiter;
119 size_t capacity() const noexcept {
return buffer_.capacity(); }
126 std::lock_guard<std::mutex> lock(mutex_);
135 std::lock_guard<std::mutex> lock(mutex_);
144 std::lock_guard<std::mutex> lock(mutex_);
157 std::lock_guard<std::mutex> lock(mutex_);
162 class PushFinishHandle;
163 class PopFinishHandle;
165 bool request_push_(PushFinishHandle *finish_handle) {
166 std::lock_guard<std::mutex> lock(mutex_);
167 if (try_push_inner_(std::move(finish_handle->get_item()))) {
170 assert(pop_awaiters_.empty());
171 push_awaiters_.push_back(finish_handle);
172 Context::current().runtime()->pend_work();
// Withdraws a pending push: while holding mutex_, asks push_awaiters_ to
// remove `finish_handle` and returns whatever remove() reports.
// NOTE(review): presumably true means the handle was still queued — confirm
// against the intrusive list's remove() contract.
176 bool cancel_push_(PushFinishHandle *finish_handle) {
177 std::lock_guard<std::mutex> lock(mutex_);
178 return push_awaiters_.remove(finish_handle);
181 std::optional<T> request_pop_(PopFinishHandle *finish_handle) {
182 std::lock_guard<std::mutex> lock(mutex_);
183 auto result = try_pop_inner_();
184 if (result.has_value()) {
187 assert(push_awaiters_.empty());
188 pop_awaiters_.push_back(finish_handle);
189 Context::current().runtime()->pend_work();
// Withdraws a pending pop: while holding mutex_, asks pop_awaiters_ to
// remove `finish_handle` and returns whatever remove() reports.
// NOTE(review): presumably true means the handle was still queued — confirm
// against the intrusive list's remove() contract.
193 bool cancel_pop_(PopFinishHandle *finish_handle) {
194 std::lock_guard<std::mutex> lock(mutex_);
195 return pop_awaiters_.remove(finish_handle);
199 template <
typename U>
bool try_push_inner_(U &&item) {
200 if (closed_) [[unlikely]] {
201 throw std::logic_error(
"Push to closed channel");
203 if (!pop_awaiters_.empty()) {
204 assert(empty_inner_());
205 auto *pop_handle = pop_awaiters_.pop_front();
206 pop_handle->set_result(std::forward<U>(item));
207 pop_handle->schedule();
210 if (!full_inner_()) {
211 push_inner_(std::forward<U>(item));
217 std::optional<T> try_pop_inner_() {
218 if (!push_awaiters_.empty()) {
219 assert(full_inner_());
220 auto *push_handle = push_awaiters_.pop_front();
221 T item = std::move(push_handle->get_item());
222 push_handle->schedule();
226 T result = pop_inner_();
227 push_inner_(std::move(item));
231 if (!empty_inner_()) {
232 T result = pop_inner_();
235 if (closed_) [[unlikely]] {
243 template <
typename U>
void push_inner_(U &&item) {
244 assert(!full_inner_());
245 auto mask = buffer_.capacity() - 1;
246 buffer_[tail_ & mask].construct(std::forward<U>(item));
252 assert(!empty_inner_());
253 auto mask = buffer_.capacity() - 1;
254 T item = std::move(buffer_[head_ & mask].get());
255 buffer_[head_ & mask].destroy();
261 bool no_buffer_() const noexcept {
return buffer_.capacity() == 0; }
263 bool empty_inner_() const noexcept {
270 bool full_inner_() const noexcept {
274 return size_ == buffer_.capacity();
277 void push_close_inner_() {
283 PopFinishHandle *pop_handle =
nullptr;
284 while ((pop_handle = pop_awaiters_.pop_front()) !=
nullptr) {
285 assert(empty_inner_());
286 pop_handle->schedule();
289 PushFinishHandle *push_handle =
nullptr;
290 while ((push_handle = push_awaiters_.pop_front()) !=
nullptr) {
291 push_handle->enable_throw();
292 push_handle->schedule();
296 void destruct_all_() {
297 while (!empty_inner_()) {
301 assert(head_ == tail_);
// Alias for the IntrusiveDoubleList instantiation that links `Handle`
// objects through their link_entry_ member. Used below as the type of
// push_awaiters_ and pop_awaiters_.
305 template <
typename Handle>
306 using HandleList = IntrusiveDoubleList<Handle, &Handle::link_entry_>;
308 mutable std::mutex mutex_;
309 HandleList<PushFinishHandle> push_awaiters_;
310 HandleList<PopFinishHandle> pop_awaiters_;
314 SmallArray<RawStorage<T>, N> buffer_;
315 bool closed_ =
false;
441template <
typename T,
size_t N>
struct Channel<T, N>::PushAwaiter {
443 using HandleType = PushFinishHandle;
445 PushAwaiter(
Channel &channel, T item)
446 : channel_(channel), finish_handle_(std::move(item)) {}
// Special members: the awaiter can be move-constructed, but copy
// construction, copy assignment and move assignment are all deleted.
447 PushAwaiter(PushAwaiter &&) =
default;
449 PushAwaiter(
const PushAwaiter &) =
delete;
450 PushAwaiter &operator=(
const PushAwaiter &) =
delete;
451 PushAwaiter &operator=(PushAwaiter &&) =
delete;
454 HandleType *get_handle() {
return &finish_handle_; }
// No-op: the body is empty. await_suspend() calls this first, before it
// configures finish_handle_.
456 void init_finish_handle() { }
458 void register_operation(
unsigned int ) {
459 auto *runtime = Context::current().runtime();
460 finish_handle_.init(&channel_, runtime);
461 bool ok = channel_.request_push_(&finish_handle_);
463 runtime->schedule(&finish_handle_);
468 bool await_ready()
const noexcept {
return false; }
470 template <
typename PromiseType>
471 bool await_suspend(std::coroutine_handle<PromiseType> h) {
472 init_finish_handle();
473 finish_handle_.set_invoker(&h.promise());
474 finish_handle_.init(&channel_, Context::current().runtime());
475 bool ok = channel_.request_push_(&finish_handle_);
476 bool do_suspend = !ok;
480 auto await_resume() {
return finish_handle_.extract_result(); }
484 PushFinishHandle finish_handle_;
492template <
typename T,
size_t N>
struct Channel<T, N>::PopAwaiter {
494 using HandleType = PopFinishHandle;
496 PopAwaiter(
Channel &channel) : channel_(channel) {}
// Special members: the awaiter can be move-constructed, but copy
// construction, copy assignment and move assignment are all deleted.
497 PopAwaiter(PopAwaiter &&) =
default;
499 PopAwaiter(
const PopAwaiter &) =
delete;
500 PopAwaiter &operator=(
const PopAwaiter &) =
delete;
501 PopAwaiter &operator=(PopAwaiter &&) =
delete;
504 HandleType *get_handle() {
return &finish_handle_; }
// No-op: the body is empty. await_suspend() calls this first, before it
// configures finish_handle_.
506 void init_finish_handle() { }
508 void register_operation(
unsigned int ) {
509 auto *runtime = Context::current().runtime();
510 finish_handle_.init(&channel_, runtime);
511 auto item = channel_.request_pop_(&finish_handle_);
512 if (item.has_value()) {
513 finish_handle_.set_result(std::move(item.value()));
514 runtime->schedule(&finish_handle_);
519 bool await_ready()
const noexcept {
return false; }
521 template <
typename PromiseType>
522 bool await_suspend(std::coroutine_handle<PromiseType> h) {
523 init_finish_handle();
524 finish_handle_.set_invoker(&h.promise());
525 finish_handle_.init(&channel_, Context::current().runtime());
526 auto item = channel_.request_pop_(&finish_handle_);
527 if (item.has_value()) {
528 finish_handle_.set_result(std::move(item.value()));
534 auto await_resume() {
return finish_handle_.extract_result(); }
538 PopFinishHandle finish_handle_;