19 FutexMutex() : state_(false) {}
21 FutexMutex(
const FutexMutex &) =
delete;
22 FutexMutex &operator=(
const FutexMutex &) =
delete;
23 FutexMutex(FutexMutex &&) =
delete;
24 FutexMutex &operator=(FutexMutex &&) =
delete;
27 condy::Coro<void> lock() {
28 bool expected =
false;
29 while (!state_.compare_exchange_weak(expected,
true,
30 std::memory_order_acquire,
31 std::memory_order_relaxed)) {
33 co_await futex_.wait(
true);
38 state_.store(
false, std::memory_order_release);
43 std::atomic<bool> state_;
44 condy::Futex<bool> futex_{state_};
47class FutexConditionVariable {
49 FutexConditionVariable() : generation_(0) {}
51 FutexConditionVariable(
const FutexConditionVariable &) =
delete;
52 FutexConditionVariable &operator=(
const FutexConditionVariable &) =
delete;
53 FutexConditionVariable(FutexConditionVariable &&) =
delete;
54 FutexConditionVariable &operator=(FutexConditionVariable &&) =
delete;
57 condy::Coro<void> wait(FutexMutex &mutex) {
58 uint32_t gen = generation_.load(std::memory_order_acquire);
60 while (generation_.load(std::memory_order_acquire) == gen) {
61 co_await futex_.wait(gen);
63 co_await mutex.lock();
67 generation_.fetch_add(1, std::memory_order_release);
72 generation_.fetch_add(1, std::memory_order_release);
77 std::atomic<uint32_t> generation_;
78 condy::Futex<uint32_t> futex_{generation_};
81template <
typename T>
class Queue {
83 Queue(
size_t queue_size) : capacity_(queue_size) {}
85 Queue(
const Queue &) =
delete;
86 Queue &operator=(
const Queue &) =
delete;
87 Queue(Queue &&) =
delete;
88 Queue &operator=(Queue &&) =
delete;
91 condy::Coro<void> enqueue(
const T &item) {
92 co_await queue_mutex_.lock();
93 while (queue_.size() >= capacity_) {
94 co_await not_full_.wait(queue_mutex_);
97 queue_mutex_.unlock();
98 not_empty_.notify_one();
101 condy::Coro<T> dequeue() {
102 co_await queue_mutex_.lock();
103 while (queue_.empty()) {
104 co_await not_empty_.wait(queue_mutex_);
106 T item = queue_.front();
108 queue_mutex_.unlock();
109 not_full_.notify_one();
114 std::queue<T> queue_;
116 FutexMutex queue_mutex_;
117 FutexConditionVariable not_empty_, not_full_;
121 for (
size_t i = 0; i < produce_count; ++i) {
122 co_await q.enqueue(
static_cast<int>(i));
130 for (
size_t i = 0; i < consume_count; ++i) {
131 item =
co_await q.dequeue();
132 std::cout << std::format(
"Consumer {} consumed item {}\n",
id, item);
138void usage(
const char *prog_name) {
139 std::cerr << std::format(
140 "Usage: {} [-h] [-q queue_size] [-p num_producers] [-c num_consumers] "
141 "[-n items_per_producer]\n",
145static size_t queue_size = 32;
146static size_t num_producers = 8;
147static size_t num_consumers = 8;
148static size_t items_per_producer = 32;
150int main(
int argc,
char *argv[])
noexcept(
false) {
152 while ((opt = getopt(argc, argv,
"hq:p:c:n:")) != -1) {
155 queue_size = std::stoul(optarg);
158 num_producers = std::stoul(optarg);
161 num_consumers = std::stoul(optarg);
164 items_per_producer = std::stoul(optarg);
175 size_t total_items = num_producers * items_per_producer;
176 if (total_items % num_consumers != 0) {
177 std::cerr << std::format(
178 "Total items ({}) must be divisible by number of consumers ({})\n",
179 total_items, num_consumers);
182 size_t items_per_consumer = total_items / num_consumers;
185 Queue<int> queue(queue_size);
187 std::thread producer_thread([&]() {
188 for (
size_t i = 0; i < num_producers; ++i) {
195 std::thread consumer_thread([&]() {
196 for (
size_t i = 0; i < num_consumers; ++i) {
198 rt2, consumer(queue,
static_cast<int>(i), items_per_consumer))
205 producer_thread.join();
206 consumer_thread.join();
Coroutine type used to define a coroutine function.
The event loop runtime for executing asynchronous.
void run()
Run the runtime event loop in the current thread.
void allow_exit() noexcept
Allow the runtime to exit when there are no pending works.
Main include file for the Condy library.
Task< T, Allocator > co_spawn(Runtime &runtime, Coro< T, Allocator > coro) noexcept
Spawn a coroutine as a task in the given runtime.
auto & current_runtime() noexcept
Get the current runtime.
detail::SwitchAwaiter co_switch(Runtime &runtime) noexcept
Switch current coroutine task to the given runtime.