40template <
typename T,
size_t N = 2>
class Channel {
50 std::lock_guard<std::mutex> lock(mutex_);
68 requires std::is_same_v<std::decay_t<U>, T>
70 std::lock_guard<std::mutex> lock(mutex_);
74 if (try_push_inner_(std::forward<U>(item))) {
87 std::lock_guard<std::mutex> lock(mutex_);
88 auto item = try_pop_inner_();
89 if (item.has_value()) {
90 return {0, std::move(item.value())};
94 return {-EAGAIN, T()};
98 void force_push(T item)
noexcept {
99 std::lock_guard<std::mutex> lock(mutex_);
100 if (closed_) [[unlikely]] {
101 panic_on(
"Push to closed channel");
103 if (try_push_inner_(std::move(item))) [[likely]] {
110 new (std::nothrow) PushFinishHandle(std::move(item));
113 panic_on(
"Allocation failed for PushFinishHandle");
115 assert(pop_awaiters_.empty());
116 push_awaiters_.push_back(fake_handle);
135 struct [[nodiscard]] PopAwaiter;
150 size_t capacity() const noexcept {
return buffer_.capacity(); }
157 std::lock_guard<std::mutex> lock(mutex_);
166 std::lock_guard<std::mutex> lock(mutex_);
175 std::lock_guard<std::mutex> lock(mutex_);
188 std::lock_guard<std::mutex> lock(mutex_);
193 class PushFinishHandle;
194 class PopFinishHandle;
196 int request_push_(PushFinishHandle *finish_handle)
noexcept {
197 std::lock_guard<std::mutex> lock(mutex_);
201 if (try_push_inner_(std::move(finish_handle->get_item()))) {
204 assert(pop_awaiters_.empty());
205 push_awaiters_.push_back(finish_handle);
206 detail::Context::current().runtime()->pend_work();
210 bool cancel_push_(PushFinishHandle *finish_handle)
noexcept {
211 std::lock_guard<std::mutex> lock(mutex_);
212 return push_awaiters_.remove(finish_handle);
215 std::pair<int, T> request_pop_(PopFinishHandle *finish_handle)
noexcept {
216 std::lock_guard<std::mutex> lock(mutex_);
217 auto result = try_pop_inner_();
218 if (result.has_value()) {
219 return {0, std::move(result.value())};
221 assert(push_awaiters_.empty());
223 return {-EPIPE, T()};
225 pop_awaiters_.push_back(finish_handle);
226 detail::Context::current().runtime()->pend_work();
227 return {-EAGAIN, T()};
230 bool cancel_pop_(PopFinishHandle *finish_handle)
noexcept {
231 std::lock_guard<std::mutex> lock(mutex_);
232 return pop_awaiters_.remove(finish_handle);
236 template <
typename U>
237 requires std::is_same_v<std::decay_t<U>, T>
238 bool try_push_inner_(U &&item)
noexcept {
239 if (!pop_awaiters_.empty()) {
240 assert(empty_inner_());
241 auto *pop_handle = pop_awaiters_.pop_front();
242 pop_handle->set_result({0, std::forward<U>(item)});
243 pop_handle->schedule();
246 if (!full_inner_()) {
247 push_inner_(std::forward<U>(item));
253 std::optional<T> try_pop_inner_() noexcept {
254 if (!push_awaiters_.empty()) {
255 assert(full_inner_());
256 auto *push_handle = push_awaiters_.pop_front();
257 T item = std::move(push_handle->get_item());
258 push_handle->set_result(0);
259 push_handle->schedule();
260 return pop_and_push_(std::move(item));
262 if (!empty_inner_()) {
263 T result = pop_inner_();
269 T pop_and_push_(T item)
noexcept {
273 T result = pop_inner_();
274 push_inner_(std::move(item));
279 template <
typename U>
280 requires std::is_same_v<std::decay_t<U>, T>
281 void push_inner_(U &&item)
noexcept {
282 assert(!full_inner_());
283 auto mask = buffer_.capacity() - 1;
284 buffer_[tail_ & mask].construct(std::forward<U>(item));
289 T pop_inner_() noexcept {
290 assert(!empty_inner_());
291 auto mask = buffer_.capacity() - 1;
292 T item = std::move(buffer_[head_ & mask].get());
293 buffer_[head_ & mask].destroy();
299 bool no_buffer_() const noexcept {
return buffer_.capacity() == 0; }
301 bool empty_inner_() const noexcept {
308 bool full_inner_() const noexcept {
312 return size_ == buffer_.capacity();
315 void push_close_inner_() noexcept {
321 PopFinishHandle *pop_handle =
nullptr;
322 while ((pop_handle = pop_awaiters_.pop_front()) !=
nullptr) {
323 assert(empty_inner_());
324 pop_handle->set_result({-EPIPE, T()});
325 pop_handle->schedule();
328 PushFinishHandle *push_handle =
nullptr;
329 while ((push_handle = push_awaiters_.pop_front()) !=
nullptr) {
330 assert(full_inner_());
331 push_handle->set_result(-EPIPE);
332 push_handle->schedule();
336 void destruct_all_() noexcept {
337 while (!empty_inner_()) {
341 assert(head_ == tail_);
345 template <
typename Handle>
346 using HandleList = IntrusiveDoubleList<Handle, &Handle::link_entry_>;
348 mutable std::mutex mutex_;
349 HandleList<PushFinishHandle> push_awaiters_;
350 HandleList<PopFinishHandle> pop_awaiters_;
354 SmallArray<RawStorage<T>, N> buffer_;
355 bool closed_ =
false;
479template <
typename T,
size_t N>
struct Channel<T, N>::PushAwaiter {
481 using HandleType = PushFinishHandle;
483 PushAwaiter(
Channel &channel, T item)
484 : channel_(channel), finish_handle_(std::move(item)) {}
487 HandleType *get_handle()
noexcept {
return &finish_handle_; }
489 void init_finish_handle()
noexcept { }
491 void register_operation(
unsigned int )
noexcept {
492 auto *runtime = detail::Context::current().runtime();
493 finish_handle_.init(&channel_, runtime);
494 int r = channel_.request_push_(&finish_handle_);
497 finish_handle_.set_result(r);
498 runtime->schedule(&finish_handle_);
503 bool await_ready()
const noexcept {
return false; }
505 template <
typename PromiseType>
506 bool await_suspend(std::coroutine_handle<PromiseType> h)
noexcept {
507 init_finish_handle();
508 finish_handle_.set_invoker(&h.promise());
509 finish_handle_.init(&channel_, detail::Context::current().runtime());
510 int r = channel_.request_push_(&finish_handle_);
513 finish_handle_.set_result(r);
519 auto await_resume()
noexcept {
return finish_handle_.extract_result(); }
523 PushFinishHandle finish_handle_;
532template <
typename T,
size_t N>
struct Channel<T, N>::PopAwaiter {
534 using HandleType = PopFinishHandle;
536 PopAwaiter(
Channel &channel) : channel_(channel) {}
539 HandleType *get_handle()
noexcept {
return &finish_handle_; }
541 void init_finish_handle()
noexcept { }
543 void register_operation(
unsigned int )
noexcept {
544 auto *runtime = detail::Context::current().runtime();
545 finish_handle_.init(&channel_, runtime);
546 auto item = channel_.request_pop_(&finish_handle_);
549 finish_handle_.set_result(std::move(item));
550 runtime->schedule(&finish_handle_);
555 bool await_ready()
const noexcept {
return false; }
557 template <
typename PromiseType>
558 bool await_suspend(std::coroutine_handle<PromiseType> h)
noexcept {
559 init_finish_handle();
560 finish_handle_.set_invoker(&h.promise());
561 finish_handle_.init(&channel_, detail::Context::current().runtime());
562 auto item = channel_.request_pop_(&finish_handle_);
565 finish_handle_.set_result(std::move(item));
571 auto await_resume()
noexcept {
return finish_handle_.extract_result(); }
575 PopFinishHandle finish_handle_;