// Condy v1.6.0 - C++ Asynchronous System Call Layer for Linux
// Source listing: queue-condy-futex.cpp
1
6
7#include <atomic>
8#include <cassert>
9#include <condy.hpp>
10#include <cstdio>
11#include <format>
12#include <iostream>
13#include <queue>
14#include <thread>
15#include <unistd.h>
16
17class FutexMutex {
18public:
19 FutexMutex() : state_(false) {}
20
21 FutexMutex(const FutexMutex &) = delete;
22 FutexMutex &operator=(const FutexMutex &) = delete;
23 FutexMutex(FutexMutex &&) = delete;
24 FutexMutex &operator=(FutexMutex &&) = delete;
25
26public:
27 condy::Coro<void> lock() {
28 bool expected = false;
29 while (!state_.compare_exchange_weak(expected, true,
30 std::memory_order_acquire,
31 std::memory_order_relaxed)) {
32 expected = false;
33 co_await futex_.wait(true);
34 }
35 }
36
37 void unlock() {
38 state_.store(false, std::memory_order_release);
39 futex_.notify_one();
40 }
41
42private:
43 std::atomic<bool> state_;
44 condy::Futex<bool> futex_{state_};
45};
46
47class FutexConditionVariable {
48public:
49 FutexConditionVariable() : generation_(0) {}
50
51 FutexConditionVariable(const FutexConditionVariable &) = delete;
52 FutexConditionVariable &operator=(const FutexConditionVariable &) = delete;
53 FutexConditionVariable(FutexConditionVariable &&) = delete;
54 FutexConditionVariable &operator=(FutexConditionVariable &&) = delete;
55
56public:
57 condy::Coro<void> wait(FutexMutex &mutex) {
58 uint32_t gen = generation_.load(std::memory_order_acquire);
59 mutex.unlock();
60 while (generation_.load(std::memory_order_acquire) == gen) {
61 co_await futex_.wait(gen);
62 }
63 co_await mutex.lock();
64 }
65
66 void notify_one() {
67 generation_.fetch_add(1, std::memory_order_release);
68 futex_.notify_one();
69 }
70
71 void notify_all() {
72 generation_.fetch_add(1, std::memory_order_release);
73 futex_.notify_all();
74 }
75
76private:
77 std::atomic<uint32_t> generation_;
78 condy::Futex<uint32_t> futex_{generation_};
79};
80
81template <typename T> class Queue {
82public:
83 Queue(size_t queue_size) : capacity_(queue_size) {}
84
85 Queue(const Queue &) = delete;
86 Queue &operator=(const Queue &) = delete;
87 Queue(Queue &&) = delete;
88 Queue &operator=(Queue &&) = delete;
89
90public:
91 condy::Coro<void> enqueue(const T &item) {
92 co_await queue_mutex_.lock();
93 while (queue_.size() >= capacity_) {
94 co_await not_full_.wait(queue_mutex_);
95 }
96 queue_.push(item);
97 queue_mutex_.unlock();
98 not_empty_.notify_one();
99 }
100
101 condy::Coro<T> dequeue() {
102 co_await queue_mutex_.lock();
103 while (queue_.empty()) {
104 co_await not_empty_.wait(queue_mutex_);
105 }
106 T item = queue_.front();
107 queue_.pop();
108 queue_mutex_.unlock();
109 not_full_.notify_one();
110 co_return item;
111 }
112
113private:
114 std::queue<T> queue_;
115 size_t capacity_;
116 FutexMutex queue_mutex_;
117 FutexConditionVariable not_empty_, not_full_;
118};
119
120condy::Coro<void> producer(Queue<int> &q, size_t produce_count) {
121 for (size_t i = 0; i < produce_count; ++i) {
122 co_await q.enqueue(static_cast<int>(i));
123 // Yield to improve fairness
125 }
126}
127
/// Consumer task: dequeues `consume_count` items from `q` and prints each.
///
/// @param q             Queue shared with the producers.
/// @param id            Consumer index, used only in the printed message.
/// @param consume_count Number of items this consumer dequeues.
condy::Coro<void> consumer(Queue<int> &q, int id, size_t consume_count) {
    for (size_t i = 0; i < consume_count; ++i) {
        const int item = co_await q.dequeue();
        std::cout << std::format("Consumer {} consumed item {}\n", id, item);
        // Yield to improve fairness: switch this task back onto its own
        // runtime so other tasks get a turn between items.
        co_await condy::co_switch(condy::current_runtime());
    }
}
137
/// Writes the command-line synopsis to standard error.
///
/// @param prog_name Program name substituted into the synopsis.
void usage(const char *prog_name) {
    const auto synopsis = std::format(
        "Usage: {} [-h] [-q queue_size] [-p num_producers] [-c num_consumers] "
        "[-n items_per_producer]\n",
        prog_name);
    std::cerr << synopsis;
}
144
// Run parameters with their defaults; main() overwrites them from the
// -q, -p, -c and -n command-line options respectively.
static size_t queue_size{32};         // capacity of the shared queue
static size_t num_producers{8};       // number of producer tasks
static size_t num_consumers{8};       // number of consumer tasks
static size_t items_per_producer{32}; // items enqueued by each producer
149
/// Parses the command line, then runs producers and consumers on two
/// runtimes, each driven by its own thread.
///
/// Options: -q queue size, -p producer count, -c consumer count,
/// -n items per producer, -h help.
///
/// @return 0 on success or after -h; 1 on an unknown option, a zero queue
///         size, a zero consumer count, or when the total item count is not
///         divisible by the consumer count.
/// @throws Whatever std::stoul throws for a malformed numeric argument
///         (hence the explicit noexcept(false)).
int main(int argc, char *argv[]) noexcept(false) {
    int opt;
    while ((opt = getopt(argc, argv, "hq:p:c:n:")) != -1) {
        switch (opt) {
        case 'q':
            queue_size = std::stoul(optarg);
            break;
        case 'p':
            num_producers = std::stoul(optarg);
            break;
        case 'c':
            num_consumers = std::stoul(optarg);
            break;
        case 'n':
            items_per_producer = std::stoul(optarg);
            break;
        case 'h':
            usage(argv[0]);
            return 0;
        default:
            usage(argv[0]);
            return 1;
        }
    }

    // A zero-capacity queue would make every enqueue wait forever.
    if (queue_size == 0) {
        std::cerr << "Queue size must be greater than zero\n";
        return 1;
    }
    // Guard the modulo/division below against a zero divisor.
    if (num_consumers == 0) {
        std::cerr << "Number of consumers must be greater than zero\n";
        return 1;
    }

    size_t total_items = num_producers * items_per_producer;
    if (total_items % num_consumers != 0) {
        std::cerr << std::format(
            "Total items ({}) must be divisible by number of consumers ({})\n",
            total_items, num_consumers);
        return 1;
    }
    size_t items_per_consumer = total_items / num_consumers;

    condy::Runtime rt1, rt2;
    Queue<int> queue(queue_size);

    // Both lambdas capture by reference; that is safe because the threads
    // are joined before any captured object leaves scope.
    std::thread producer_thread([&]() {
        for (size_t i = 0; i < num_producers; ++i) {
            condy::co_spawn(rt1, producer(queue, items_per_producer)).detach();
        }
        // Let run() return once no work is pending.
        rt1.allow_exit();
        rt1.run();
    });

    std::thread consumer_thread([&]() {
        for (size_t i = 0; i < num_consumers; ++i) {
            condy::co_spawn(
                rt2, consumer(queue, static_cast<int>(i), items_per_consumer))
                .detach();
        }
        // Let run() return once no work is pending.
        rt2.allow_exit();
        rt2.run();
    });

    producer_thread.join();
    consumer_thread.join();
}
Coroutine type used to define a coroutine function.
Definition coro.hpp:26
The event loop runtime for executing asynchronous operations.
Definition runtime.hpp:111
void run()
Run the runtime event loop in the current thread.
Definition runtime.hpp:301
void allow_exit() noexcept
Allow the runtime to exit when there is no pending work.
Definition runtime.hpp:225
Main include file for the Condy library.
Task< T, Allocator > co_spawn(Runtime &runtime, Coro< T, Allocator > coro) noexcept
Spawn a coroutine as a task in the given runtime.
Definition task.hpp:266
auto & current_runtime() noexcept
Get the current runtime.
Definition runtime.hpp:504
detail::SwitchAwaiter co_switch(Runtime &runtime) noexcept
Switch current coroutine task to the given runtime.
Definition task.hpp:318