46class [[deprecated(
"Use condy::Channel instead")]]
Channel {
56 std::lock_guard<std::mutex> lock(mutex_);
74 requires std::is_same_v<std::decay_t<U>, T>
76 std::lock_guard<std::mutex> lock(mutex_);
77 if (closed_) [[unlikely]] {
78 throw std::logic_error(
"Push to closed channel");
80 return try_push_inner_(std::forward<U>(item));
90 std::lock_guard<std::mutex> lock(mutex_);
91 return try_pop_inner_();
94 void force_push(T item)
noexcept {
95 std::lock_guard<std::mutex> lock(mutex_);
96 if (closed_) [[unlikely]] {
97 panic_on(
"Push to closed channel");
99 if (try_push_inner_(std::move(item))) [[likely]] {
106 new (std::nothrow) PushFinishHandle(std::move(item));
109 panic_on(
"Allocation failed for PushFinishHandle");
111 assert(pop_awaiters_.empty());
112 push_awaiters_.push_back(fake_handle);
115 struct [[nodiscard]] PushAwaiter;
130 struct [[nodiscard]] PopAwaiter;
144 size_t capacity() const noexcept {
return buffer_.capacity(); }
151 std::lock_guard<std::mutex> lock(mutex_);
160 std::lock_guard<std::mutex> lock(mutex_);
169 std::lock_guard<std::mutex> lock(mutex_);
182 std::lock_guard<std::mutex> lock(mutex_);
187 class PushFinishHandle;
188 class PopFinishHandle;
191 request_push_(PushFinishHandle *finish_handle)
noexcept {
192 std::lock_guard<std::mutex> lock(mutex_);
193 if (closed_) [[unlikely]] {
196 if (try_push_inner_(std::move(finish_handle->get_item()))) {
199 assert(pop_awaiters_.empty());
200 push_awaiters_.push_back(finish_handle);
201 detail::Context::current().runtime()->pend_work();
205 bool cancel_push_(PushFinishHandle *finish_handle)
noexcept {
206 std::lock_guard<std::mutex> lock(mutex_);
207 return push_awaiters_.remove(finish_handle);
210 std::optional<T> request_pop_(PopFinishHandle *finish_handle)
noexcept {
211 std::lock_guard<std::mutex> lock(mutex_);
212 auto result = try_pop_inner_();
213 if (result.has_value()) {
216 assert(push_awaiters_.empty());
217 pop_awaiters_.push_back(finish_handle);
218 detail::Context::current().runtime()->pend_work();
222 bool cancel_pop_(PopFinishHandle *finish_handle)
noexcept {
223 std::lock_guard<std::mutex> lock(mutex_);
224 return pop_awaiters_.remove(finish_handle);
228 template <
typename U>
229 requires std::is_same_v<std::decay_t<U>, T>
230 bool try_push_inner_(U &&item)
noexcept {
231 if (!pop_awaiters_.empty()) {
232 assert(empty_inner_());
233 auto *pop_handle = pop_awaiters_.pop_front();
234 pop_handle->set_result(std::forward<U>(item));
235 pop_handle->schedule();
238 if (!full_inner_()) {
239 push_inner_(std::forward<U>(item));
245 std::optional<T> try_pop_inner_() noexcept {
246 if (!push_awaiters_.empty()) {
247 assert(full_inner_());
248 auto *push_handle = push_awaiters_.pop_front();
249 T item = std::move(push_handle->get_item());
250 push_handle->schedule();
254 T result = pop_inner_();
255 push_inner_(std::move(item));
259 if (!empty_inner_()) {
260 T result = pop_inner_();
263 if (closed_) [[unlikely]] {
271 template <
typename U>
272 requires std::is_same_v<std::decay_t<U>, T>
273 void push_inner_(U &&item)
noexcept {
274 assert(!full_inner_());
275 auto mask = buffer_.capacity() - 1;
276 buffer_[tail_ & mask].construct(std::forward<U>(item));
281 T pop_inner_() noexcept {
282 assert(!empty_inner_());
283 auto mask = buffer_.capacity() - 1;
284 T item = std::move(buffer_[head_ & mask].get());
285 buffer_[head_ & mask].destroy();
291 bool no_buffer_() const noexcept {
return buffer_.capacity() == 0; }
293 bool empty_inner_() const noexcept {
300 bool full_inner_() const noexcept {
304 return size_ == buffer_.capacity();
307 void push_close_inner_() noexcept {
313 PopFinishHandle *pop_handle =
nullptr;
314 while ((pop_handle = pop_awaiters_.pop_front()) !=
nullptr) {
315 assert(empty_inner_());
316 pop_handle->schedule();
319 PushFinishHandle *push_handle =
nullptr;
320 while ((push_handle = push_awaiters_.pop_front()) !=
nullptr) {
321 push_handle->enable_throw();
322 push_handle->schedule();
326 void destruct_all_() noexcept {
327 while (!empty_inner_()) {
331 assert(head_ == tail_);
335 template <
typename Handle>
336 using HandleList = IntrusiveDoubleList<Handle, &Handle::link_entry_>;
338 mutable std::mutex mutex_;
339 HandleList<PushFinishHandle> push_awaiters_;
340 HandleList<PopFinishHandle> pop_awaiters_;
344 SmallArray<RawStorage<T>, N> buffer_;
345 bool closed_ =
false;
473template <
typename T,
size_t N>
struct Channel<T, N>::PushAwaiter {
475 using HandleType = PushFinishHandle;
477 PushAwaiter(
Channel &channel, T item)
478 : channel_(channel), finish_handle_(std::move(item)) {}
481 HandleType *get_handle()
noexcept {
return &finish_handle_; }
483 void init_finish_handle()
noexcept { }
485 void register_operation(
unsigned int )
noexcept {
486 auto *runtime = detail::Context::current().runtime();
487 finish_handle_.init(&channel_, runtime);
488 auto result = channel_.request_push_(&finish_handle_);
489 if (!result.has_value()) [[unlikely]] {
490 finish_handle_.enable_throw();
491 runtime->schedule(&finish_handle_);
496 runtime->schedule(&finish_handle_);
501 bool await_ready()
const noexcept {
return false; }
503 template <
typename PromiseType>
504 bool await_suspend(std::coroutine_handle<PromiseType> h)
noexcept {
505 init_finish_handle();
506 finish_handle_.set_invoker(&h.promise());
507 finish_handle_.init(&channel_, detail::Context::current().runtime());
508 auto result = channel_.request_push_(&finish_handle_);
509 if (!result.has_value()) [[unlikely]] {
510 finish_handle_.enable_throw();
514 bool do_suspend = !ok;
518 auto await_resume() {
return finish_handle_.extract_result(); }
522 PushFinishHandle finish_handle_;
530template <
typename T,
size_t N>
struct Channel<T, N>::PopAwaiter {
532 using HandleType = PopFinishHandle;
534 PopAwaiter(
Channel &channel) : channel_(channel) {}
537 HandleType *get_handle()
noexcept {
return &finish_handle_; }
539 void init_finish_handle()
noexcept { }
541 void register_operation(
unsigned int )
noexcept {
542 auto *runtime = detail::Context::current().runtime();
543 finish_handle_.init(&channel_, runtime);
544 auto item = channel_.request_pop_(&finish_handle_);
545 if (item.has_value()) {
546 finish_handle_.set_result(std::move(item.value()));
547 runtime->schedule(&finish_handle_);
552 bool await_ready()
const noexcept {
return false; }
554 template <
typename PromiseType>
555 bool await_suspend(std::coroutine_handle<PromiseType> h)
noexcept {
556 init_finish_handle();
557 finish_handle_.set_invoker(&h.promise());
558 finish_handle_.init(&channel_, detail::Context::current().runtime());
559 auto item = channel_.request_pop_(&finish_handle_);
560 if (item.has_value()) {
561 finish_handle_.set_result(std::move(item.value()));
567 auto await_resume()
noexcept {
return finish_handle_.extract_result(); }
571 PopFinishHandle finish_handle_;