38template <
typename T,
size_t N = 2>
class Channel {
48 std::lock_guard<std::mutex> lock(mutex_);
// A Channel cannot be copied: copy construction and copy assignment are deleted.
53 Channel(
const Channel &) =
delete;
54 Channel &operator=(
const Channel &) =
delete;
// Move assignment is deleted as well.
// NOTE(review): listing line 55 is not visible here; presumably it deletes the
// move constructor too — confirm in the full file.
56 Channel &operator=(Channel &&) =
delete;
66 requires std::is_same_v<std::decay_t<U>, T>
68 std::lock_guard<std::mutex> lock(mutex_);
72 if (try_push_inner_(std::forward<U>(item))) {
// Non-blocking pop: takes mutex_ and asks try_pop_inner_() for an item.
// Returns {0, item} when one was available. The visible fallback returns
// {-EAGAIN, T()}, which requires T to be default-constructible.
// NOTE(review): listing lines 89-91 are not visible here; there may be an
// additional return path (e.g. for a closed channel) — confirm in the full file.
84 std::pair<int32_t, T>
try_pop() noexcept {
85 std::lock_guard<std::mutex> lock(mutex_);
86 auto item = try_pop_inner_();
87 if (item.has_value()) {
88 return {0, std::move(item.value())};
92 return {-EAGAIN, T()};
// Push that does not suspend the caller: `item` is taken by value and is
// either accepted by try_push_inner_() or parked on push_awaiters_ inside a
// heap-allocated FakePushFinishHandle.
96 void force_push(T item)
noexcept {
97 std::lock_guard<std::mutex> lock(mutex_);
// Pushing after the channel was closed is treated as a fatal error.
98 if (closed_) [[unlikely]] {
99 panic_on(
"Push to closed channel");
// Fast path, taken when try_push_inner_() reports success.
101 if (try_push_inner_(std::move(item))) [[likely]] {
// Slow path: nothrow new reports failure through a null pointer rather than
// an exception, which keeps this function noexcept.
// NOTE(review): the declaration of `fake_handle` and the null check guarding
// the panic below are on listing lines not visible here.
108 new (std::nothrow) FakePushFinishHandle(std::move(item));
111 panic_on(
"Allocation failed for PushFinishHandle");
// The push could not complete, so no popper may be waiting at this point.
113 assert(pop_awaiters_.empty());
// NOTE(review): ownership of fake_handle passes to the wait list; it is
// presumably released in PushFinishHandleBase::schedule() — confirm.
114 push_awaiters_.push_back(fake_handle);
// Sender type returned by the rvalue push() overload; [[nodiscard]] so that a
// discarded sender is diagnosed.
117 class [[nodiscard]] MovePushSender;
// Builds a MovePushSender from this channel and `item` (moved). Nothing is
// pushed here; the push is requested by the operation state the sender creates.
130 MovePushSender
push(T &&item)
noexcept {
return {*
this, std::move(item)}; }
// Sender type returned by the const-reference push() overload.
132 class [[nodiscard]] CopyPushSender;
// Builds a CopyPushSender from this channel and `item`. Only available when T
// satisfies std::copy_constructible. Nothing is pushed here; the push is
// requested by the operation state the sender creates.
140 CopyPushSender
push(
const T &item)
noexcept
141 requires std::copy_constructible<T>
143 return {*
this, item};
// Sender type returned by pop().
146 class [[nodiscard]] PopSender;
// Builds a PopSender bound to this channel. Nothing is popped here; the pop is
// requested by the operation state the sender creates.
153 PopSender
pop() noexcept {
return {*
this}; }
// Returns buffer_.capacity(). Reads buffer_ without taking mutex_.
// NOTE(review): presumably safe because the capacity never changes after
// construction — confirm.
158 size_t capacity() const noexcept {
return buffer_.capacity(); }
165 std::lock_guard<std::mutex> lock(mutex_);
166 return size_inner_();
174 std::lock_guard<std::mutex> lock(mutex_);
175 return empty_inner_();
183 std::lock_guard<std::mutex> lock(mutex_);
196 std::lock_guard<std::mutex> lock(mutex_);
// Forward declarations of the nested wait-list node types; their definitions
// follow the Channel class body.
// Push side: common base, receiver-carrying handle, and the handle used by
// force_push().
201 class PushFinishHandleBase;
202 template <
typename Receiver>
class PushFinishHandle;
203 class FakePushFinishHandle;
// Pop side: common base and receiver-carrying handle.
205 class PopFinishHandleBase;
206 template <
typename Receiver>
class PopFinishHandle;
// Called by PushFinishHandle::start(): under mutex_, tries to push the item
// referenced by `finish_handle`; if that does not succeed, the handle is
// appended to push_awaiters_ and the current runtime is told about the
// outstanding work via pend_work().
// NOTE(review): listing lines 210-212 and all return statements are not visible
// here, so the exact return codes cannot be confirmed from this view.
208 int32_t request_push_(PushFinishHandleBase *finish_handle)
noexcept {
209 std::lock_guard<std::mutex> lock(mutex_);
213 if (try_push_inner_(std::move(finish_handle->get_item()))) {
// The push did not complete, so no popper may be waiting at this point.
216 assert(pop_awaiters_.empty());
217 push_awaiters_.push_back(finish_handle);
// Matched by the resume_work() call in PushFinishHandle::invoke().
218 detail::Context::current().runtime()->pend_work();
// Under mutex_, removes `finish_handle` from push_awaiters_ and returns the
// result of remove(). PushFinishHandle::cancel_() treats `true` as "the handle
// was still queued and is now ours to complete".
222 bool cancel_push_(PushFinishHandleBase *finish_handle)
noexcept {
223 std::lock_guard<std::mutex> lock(mutex_);
224 return push_awaiters_.remove(finish_handle);
// Called by PopFinishHandle::start(): under mutex_, tries to pop immediately.
// Visible outcomes:
//   {0, item}        an item was available;
//   {-EPIPE, T()}    see note below;
//   {-EAGAIN, T()}   the handle was queued on pop_awaiters_ and will be
//                    completed later.
227 std::pair<int32_t, T>
228 request_pop_(PopFinishHandleBase *finish_handle)
noexcept {
229 std::lock_guard<std::mutex> lock(mutex_);
230 auto result = try_pop_inner_();
231 if (result.has_value()) {
232 return {0, std::move(result.value())};
// Nothing could be popped, so no pusher may be waiting at this point.
234 assert(push_awaiters_.empty());
// NOTE(review): the condition guarding this return (listing line 235) is not
// visible here; presumably it checks closed_ — confirm.
236 return {-EPIPE, T()};
238 pop_awaiters_.push_back(finish_handle);
// Matched by the resume_work() call in PopFinishHandle::invoke().
239 detail::Context::current().runtime()->pend_work();
240 return {-EAGAIN, T()};
// Under mutex_, removes `finish_handle` from pop_awaiters_ and returns the
// result of remove(). PopFinishHandle::cancel_() treats `true` as "the handle
// was still queued and is now ours to complete".
243 bool cancel_pop_(PopFinishHandleBase *finish_handle)
noexcept {
244 std::lock_guard<std::mutex> lock(mutex_);
245 return pop_awaiters_.remove(finish_handle);
// Push attempt without locking; every visible caller holds mutex_.
// U is constrained so that only T (by value, lvalue or rvalue) is accepted.
// NOTE(review): the return statements are on listing lines not visible here;
// callers treat `true` as "item accepted".
249 template <
typename U>
250 requires std::is_same_v<std::decay_t<U>, T>
251 bool try_push_inner_(U &&item)
noexcept {
// A waiting popper gets the item directly, bypassing the buffer; poppers only
// wait while the buffer is empty, which the assert checks.
252 if (!pop_awaiters_.empty()) {
253 assert(empty_inner_());
254 auto *pop_handle = pop_awaiters_.pop_front();
255 pop_handle->set_result({0, std::forward<U>(item)});
256 pop_handle->schedule();
// Otherwise store the item in the buffer if there is room.
259 if (!full_inner_()) {
260 push_inner_(std::forward<U>(item));
// Pop attempt without locking; every visible caller holds mutex_.
// Yields an engaged optional when an item could be obtained.
// NOTE(review): the trailing return statements are on listing lines not
// visible here.
266 std::optional<T> try_pop_inner_() noexcept {
// A waiting pusher implies a full buffer (asserted). Take that pusher's item,
// complete the pusher with result 0, and let pop_and_push_() produce the
// popped value while storing the pusher's item.
267 if (!push_awaiters_.empty()) {
268 assert(full_inner_());
269 auto *push_handle = push_awaiters_.pop_front();
270 T item = std::move(push_handle->get_item());
271 push_handle->set_result(0);
272 push_handle->schedule();
273 return pop_and_push_(std::move(item));
// No waiting pusher: take from the buffer if it holds anything.
275 if (!empty_inner_()) {
276 T result = pop_inner_();
// Removes the oldest buffered element and then stores `item` in the buffer.
// NOTE(review): listing lines 283-285 are not visible here; presumably they
// handle the zero-capacity case (see no_buffer_()), where there is nothing to
// pop — confirm. The return of `result` is also outside this view.
282 T pop_and_push_(T item)
noexcept {
286 T result = pop_inner_();
287 push_inner_(std::move(item));
// Constructs `item` in the buffer slot selected by tail_. Requires the buffer
// not to be full (asserted).
292 template <
typename U>
293 requires std::is_same_v<std::decay_t<U>, T>
294 void push_inner_(U &&item)
noexcept {
295 assert(!full_inner_());
// The slot index is tail_ masked with capacity - 1.
// NOTE(review): masking only wraps correctly when the capacity is a power of
// two; presumably enforced where buffer_ is sized — confirm.
296 auto mask = buffer_.capacity() - 1;
297 buffer_[tail_ & mask].construct(std::forward<U>(item));
// NOTE(review): the update of tail_ is on a listing line not visible here.
// Moves the element out of the buffer slot selected by head_ and destroys the
// moved-from object in that slot. Requires the buffer not to be empty
// (asserted).
301 T pop_inner_() noexcept {
302 assert(!empty_inner_());
// Same capacity - 1 masking scheme as push_inner_().
303 auto mask = buffer_.capacity() - 1;
304 T item = std::move(buffer_[head_ & mask].get());
305 buffer_[head_ & mask].destroy();
// NOTE(review): the update of head_ and the return of `item` are on listing
// lines not visible here.
// True when the channel has zero buffer capacity.
310 bool no_buffer_() const noexcept {
return buffer_.capacity() == 0; }
// True when no elements are buffered. Does not lock.
312 bool empty_inner_() const noexcept {
return size_inner_() == 0; }
// True when the number of buffered elements equals the capacity. A
// zero-capacity channel is therefore both empty and full. Does not lock.
314 bool full_inner_() const noexcept {
315 return size_inner_() == buffer_.capacity();
// Close helper that completes every queued waiter with -EPIPE. Does not lock.
// NOTE(review): listing lines 319-323 are not visible here; presumably they set
// closed_ (and may return early if already closed) — confirm.
318 void push_close_inner_() noexcept {
// Wake every waiting popper with {-EPIPE, T()}; pop_front() yields nullptr
// once the list is drained.
324 PopFinishHandleBase *pop_handle =
nullptr;
325 while ((pop_handle = pop_awaiters_.pop_front()) !=
nullptr) {
326 assert(empty_inner_());
327 pop_handle->set_result({-EPIPE, T()});
328 pop_handle->schedule();
// Wake every waiting pusher with -EPIPE; their items are not stored in the
// buffer by this loop.
331 PushFinishHandleBase *push_handle =
nullptr;
332 while ((push_handle = push_awaiters_.pop_front()) !=
nullptr) {
333 assert(full_inner_());
334 push_handle->set_result(-EPIPE);
335 push_handle->schedule();
// Loops until the buffer is empty, after which head_ must equal tail_.
// NOTE(review): the loop body (listing lines 341-342) is not visible here;
// presumably it destroys one buffered element per iteration — confirm.
339 void destruct_all_() noexcept {
340 while (!empty_inner_()) {
343 assert(head_ == tail_);
// Number of buffered elements, computed as tail_ - head_. Does not lock.
346 size_t size_inner_() const noexcept {
return tail_ - head_; }
// Intrusive doubly-linked list of handles, linked through each handle's
// link_entry_ member.
349 template <
typename Handle>
350 using HandleList = IntrusiveDoubleList<Handle, &Handle::link_entry_>;
// mutable so that const member functions can lock it.
352 mutable std::mutex mutex_;
// Pending pushes that could not complete yet.
353 HandleList<PushFinishHandleBase> push_awaiters_;
// Pending pops that could not complete yet.
354 HandleList<PopFinishHandleBase> pop_awaiters_;
// Element storage; slots are constructed and destroyed explicitly by
// push_inner_() / pop_inner_().
// NOTE(review): the head_ and tail_ declarations are on listing lines not
// visible here.
357 SmallArray<RawStorage<T>, N> buffer_;
// Checked by force_push(); where it is set is not visible in this listing.
358 bool closed_ =
false;
// Wait-list node for a pending push. Holds a reference to the item to be
// pushed (not the item itself), the runtime to resume on, and the result code
// delivered on completion.
361template <
typename T,
size_t N>
362class Channel<T, N>::PushFinishHandleBase :
public WorkInvoker {
// Only binds the reference; `item` must outlive this handle.
364 PushFinishHandleBase(T &item) : item_(item) {}
// Completes the handle. A null runtime_ identifies a FakePushFinishHandle
// created by force_push(), which has no receiver to resume.
// NOTE(review): what is done with `this_fake` is on a listing line not visible
// here; presumably it is deleted — confirm.
366 void schedule() noexcept {
367 if (runtime_ ==
nullptr) [[unlikely]] {
369 auto *this_fake =
static_cast<FakePushFinishHandle *
>(
this);
// Regular handle: hand it to its runtime for invocation.
372 runtime_->schedule(
this);
// Access to the referenced item so the channel can move it out.
376 T &get_item() noexcept {
return item_; }
378 void set_result(int32_t result)
noexcept { result_ = result; }
// Hook used by the channel's intrusive wait list (see HandleList).
381 DoubleLinkEntry link_entry_;
// Stays nullptr for fake handles; set by PushFinishHandle::start() otherwise.
384 Runtime *runtime_ =
nullptr;
// Sentinel meaning "no result set yet"; cancel_() asserts on this value.
386 int32_t result_ = -ENOTRECOVERABLE;
// Push wait-list node that carries a Receiver: it requests the push, delivers
// the int32_t result to the receiver, and supports cancellation through the
// receiver's stop token.
389template <
typename T,
size_t N>
390template <
typename Receiver>
391class Channel<T, N>::PushFinishHandle
392 :
public InvokerAdapter<PushFinishHandle<Receiver>, PushFinishHandleBase> {
395 InvokerAdapter<PushFinishHandle<Receiver>, PushFinishHandleBase>;
// `item` is forwarded to the base as a reference; it is not copied here.
397 PushFinishHandle(
Channel &channel, T &item, Receiver receiver)
398 : Base(item), channel_(channel), receiver_(std::move(receiver)) {}
// Records the runtime and asks the channel to push.
// NOTE(review): the condition guarding the immediate receiver call (listing
// line 403) is not visible here; presumably it distinguishes "completed now"
// from "queued" — confirm.
400 void start(Runtime *runtime)
noexcept {
401 this->runtime_ = runtime;
402 int32_t r = channel_.request_push_(
this);
404 std::move(receiver_)(r);
// Register a cancellation callback only if a stop can actually be requested.
408 auto stop_token = receiver_.get_stop_token();
409 if (stop_token.stop_possible()) {
410 stop_callback_.emplace(std::move(stop_token), Cancellation{
this});
// Completion path: drop the stop callback first, balance the pend_work() made
// in Channel::request_push_(), then hand result_ to the receiver.
414 void invoke() noexcept {
415 stop_callback_.reset();
416 assert(this->runtime_ !=
nullptr);
417 this->runtime_->resume_work();
418 std::move(receiver_)(this->result_);
// Stop-request path: if the handle was still queued (cancel_push_ returned
// true), mark it -ECANCELED and schedule it so invoke() reports the
// cancellation. If it was already dequeued, nothing is done here.
422 void cancel_() noexcept {
423 if (channel_.cancel_push_(
this)) {
425 assert(this->result_ == -ENOTRECOVERABLE);
426 this->result_ = -ECANCELED;
427 assert(this->runtime_ !=
nullptr);
428 this->runtime_->schedule(
this);
// Callable stored in the stop callback; forwards to cancel_().
432 struct Cancellation {
433 PushFinishHandle *self;
434 void operator()() noexcept { self->cancel_(); }
437 using StopCallbackType =
438 stop_callback_t<stop_token_t<Receiver>, Cancellation>;
// Engaged only while a cancellable push is pending.
443 std::optional<StopCallbackType> stop_callback_;
// Push wait-list node that owns its item; created by Channel::force_push().
446template <
typename T,
size_t N>
447class Channel<T, N>::FakePushFinishHandle :
public PushFinishHandleBase {
// The base is handed a reference to item_copy_ before item_copy_ is
// initialised. The base constructor only binds the reference, and item_copy_
// is initialised right after.
449 FakePushFinishHandle(T &&item)
450 : PushFinishHandleBase(item_copy_), item_copy_(std::move(item)) {}
// Wait-list node for a pending pop. Holds the runtime to resume on and the
// {code, item} pair delivered on completion.
456template <
typename T,
size_t N>
457class Channel<T, N>::PopFinishHandleBase :
public WorkInvoker {
// Hands the handle to its runtime; unlike the push side, a pop handle always
// has a runtime (asserted).
459 void schedule() noexcept {
460 assert(runtime_ !=
nullptr);
461 runtime_->schedule(
this);
464 void set_result(std::pair<int32_t, T> result)
noexcept {
465 result_ = std::move(result);
// Hook used by the channel's intrusive wait list (see HandleList).
469 DoubleLinkEntry link_entry_;
// Set by PopFinishHandle::start().
472 Runtime *runtime_ =
nullptr;
// Sentinel code meaning "no result set yet"; requires T to be
// default-constructible.
474 std::pair<int32_t, T> result_ = {-ENOTRECOVERABLE, T()};
// Pop wait-list node that carries a Receiver: it requests the pop, delivers
// the {code, item} pair to the receiver, and supports cancellation through the
// receiver's stop token.
477template <
typename T,
size_t N>
478template <
typename Receiver>
479class Channel<T, N>::PopFinishHandle
480 :
public InvokerAdapter<PopFinishHandle<Receiver>, PopFinishHandleBase> {
482 PopFinishHandle(
Channel &channel, Receiver receiver)
483 : channel_(channel), receiver_(std::move(receiver)) {}
// Records the runtime and asks the channel to pop.
// NOTE(review): the condition guarding the immediate receiver call (listing
// lines 488-489) is not visible here; presumably it skips the call when
// request_pop_() reported -EAGAIN (handle queued) — confirm.
485 void start(Runtime *runtime)
noexcept {
486 this->runtime_ = runtime;
487 auto item = channel_.request_pop_(
this);
490 std::move(receiver_)(std::move(item));
// Register a cancellation callback only if a stop can actually be requested.
494 auto stop_token = receiver_.get_stop_token();
495 if (stop_token.stop_possible()) {
496 stop_callback_.emplace(std::move(stop_token), Cancellation{
this});
// Completion path: drop the stop callback first, balance the pend_work() made
// in Channel::request_pop_(), then hand result_ to the receiver.
500 void invoke() noexcept {
501 stop_callback_.reset();
502 assert(this->runtime_ !=
nullptr);
503 this->runtime_->resume_work();
504 std::move(receiver_)(std::move(this->result_));
// Stop-request path: if the handle was still queued (cancel_pop_ returned
// true), mark it -ECANCELED and schedule it so invoke() reports the
// cancellation. If it was already dequeued, nothing is done here.
508 void cancel_() noexcept {
509 if (channel_.cancel_pop_(
this)) {
511 assert(this->result_.first == -ENOTRECOVERABLE);
512 this->result_.first = -ECANCELED;
513 assert(this->runtime_ !=
nullptr);
514 this->runtime_->schedule(
this);
// Callable stored in the stop callback; forwards to cancel_().
518 struct Cancellation {
519 PopFinishHandle *self;
520 void operator()() noexcept { self->cancel_(); }
523 using StopCallbackType =
524 stop_callback_t<stop_token_t<Receiver>, Cancellation>;
// Engaged only while a cancellable pop is pending.
529 std::optional<StopCallbackType> stop_callback_;
// Sender returned by Channel::push(T &&). Completes with an int32_t result.
532template <
typename T,
size_t N>
class Channel<T, N>::MovePushSender {
534 using ReturnType = int32_t;
536 MovePushSender(
Channel &channel, T &&item)
537 : channel_(channel), item_(std::move(item)) {}
// Creates the operation state for `receiver`, passing item_ as an rvalue.
539 template <
typename Receiver>
auto connect(Receiver receiver)
noexcept {
540 return OperationState<Receiver>(channel_, std::move(item_),
541 std::move(receiver));
// Operation state: a PushFinishHandle started on the current context's runtime.
545 template <
typename Receiver>
547 :
public Channel<T, N>::template PushFinishHandle<Receiver> {
550 typename Channel<T, N>::template PushFinishHandle<Receiver>;
// NOTE(review): `item` is forwarded to Base as an lvalue reference and the
// constructor body is empty, so the handle refers to the object passed to
// connect() (the sender's item_) rather than to a copy owned by this state —
// unlike CopyPushSender's OperationState, which keeps item_copy_. The sender
// must therefore outlive the operation; confirm callers guarantee this.
551 OperationState(Channel &channel, T &&item, Receiver receiver)
552 : Base(channel, item, std::move(receiver)) {}
// The unsigned argument is unused; the runtime is taken from the current
// context.
554 void start(
unsigned int )
noexcept {
555 auto *runtime = detail::Context::current().runtime();
556 Base::start(runtime);
// Sender returned by Channel::push(const T &). Completes with an int32_t
// result.
564template <
typename T,
size_t N>
class Channel<T, N>::CopyPushSender {
566 using ReturnType = int32_t;
// NOTE(review): the declared type of item_ is not visible here, so whether the
// sender copies `item` or only references it cannot be confirmed from this
// view.
568 CopyPushSender(
Channel &channel,
const T &item)
569 : channel_(channel), item_(item) {}
// Creates the operation state for `receiver`, passing item_ along.
571 template <
typename Receiver>
auto connect(Receiver receiver)
noexcept {
572 return OperationState<Receiver>(channel_, item_, std::move(receiver));
// Operation state: a PushFinishHandle started on the current context's runtime.
576 template <
typename Receiver>
578 :
public Channel<T, N>::template PushFinishHandle<Receiver> {
581 typename Channel<T, N>::template PushFinishHandle<Receiver>;
// The state keeps its own copy (item_copy_) and hands Base a reference to it.
// Base is initialised before item_copy_, but it only binds the reference.
582 OperationState(Channel &channel,
const T &item, Receiver receiver)
583 : Base(channel, item_copy_, std::move(receiver)), item_copy_(item) {
// The unsigned argument is unused; the runtime is taken from the current
// context.
586 void start(
unsigned int )
noexcept {
587 auto *runtime = detail::Context::current().runtime();
588 Base::start(runtime);
// Sender returned by Channel::pop(). Completes with a {code, item} pair.
599template <
typename T,
size_t N>
class Channel<T, N>::PopSender {
601 using ReturnType = std::pair<int32_t, T>;
603 PopSender(
Channel &channel) : channel_(channel) {}
// Creates the operation state for `receiver`.
605 template <
typename Receiver>
auto connect(Receiver receiver)
noexcept {
606 return OperationState<Receiver>(channel_, std::move(receiver));
// Operation state: a PopFinishHandle started on the current context's runtime.
610 template <
typename Receiver>
612 :
public Channel<T, N>::template PopFinishHandle<Receiver> {
614 using Base =
typename Channel<T, N>::template PopFinishHandle<Receiver>;
// The unsigned argument is unused; the runtime is taken from the current
// context.
617 void start(
unsigned int )
noexcept {
618 auto *runtime = detail::Context::current().runtime();
619 Base::start(runtime);
Thread-safe bounded channel for communication and synchronization.
void push_close() noexcept
Close the channel.
MovePushSender push(T &&item) noexcept
Push an item into the channel, awaiting if necessary.
std::pair< int32_t, T > try_pop() noexcept
Try to pop an item from the channel.
bool empty() const noexcept
Check if the channel is empty.
CopyPushSender push(const T &item) noexcept
Push an item into the channel, awaiting if necessary.
PopSender pop() noexcept
Pop an item from the channel, awaiting if necessary.
size_t size() const noexcept
Get the current size of the channel.
bool is_closed() const noexcept
Check if the channel is closed.
int32_t try_push(U &&item) noexcept
Try to push an item into the channel.
size_t capacity() const noexcept
Get the capacity of the channel.
Channel(size_t capacity)
Construct a new Channel object.
Intrusive single-linked and double-linked list implementations.
Polymorphic invocation utilities.
The main namespace for the Condy library.
Runtime type for running the io_uring event loop.
Internal utility classes and functions used by Condy.