Condy v1.6.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
queue-kernel-futex.cpp
Go to the documentation of this file.
1
6
7#include <atomic>
8#include <cassert>
9#include <condy.hpp>
10#include <cstdio>
11#include <format>
12#include <iostream>
13#include <linux/futex.h>
14#include <queue>
15#include <thread>
16#include <unistd.h>
17
18#if !IO_URING_CHECK_VERSION(2, 6) // >= 2.6
19
20long futex_wait(void *uaddr, unsigned long val, unsigned long mask,
21 unsigned int flags, const struct timespec *timeout,
22 int clockid) {
23 return syscall(SYS_futex_wait, uaddr, val, mask, flags, timeout, clockid);
24}
25
26long futex_wake(void *uaddr, unsigned long mask, int nr, unsigned int flags) {
27 return syscall(SYS_futex_wake, uaddr, mask, nr, flags);
28}
29
30class FutexSemaphore {
31public:
32 FutexSemaphore(uint32_t initial_count = 0) : count_(initial_count) {}
33
34 FutexSemaphore(const FutexSemaphore &) = delete;
35 FutexSemaphore &operator=(const FutexSemaphore &) = delete;
36 FutexSemaphore(FutexSemaphore &&) = delete;
37 FutexSemaphore &operator=(FutexSemaphore &&) = delete;
38
39public:
40 condy::Coro<void> async_acquire() {
41 while (true) {
42 uint32_t c = count_.load(std::memory_order_relaxed);
43 if (c > 0) {
44 if (count_.compare_exchange_strong(c, c - 1,
45 std::memory_order_acquire,
46 std::memory_order_relaxed)) {
47 co_return;
48 }
49 continue;
50 }
51 [[maybe_unused]] int r = co_await condy::async_futex_wait(
52 raw_count_ptr_(), 0, FUTEX_BITSET_MATCH_ANY, FUTEX2_SIZE_U32,
53 0);
54 assert(r == 0 || r == -EAGAIN);
55 }
56 }
57
58 void acquire() {
59 while (true) {
60 uint32_t c = count_.load(std::memory_order_relaxed);
61 if (c > 0) {
62 if (count_.compare_exchange_strong(c, c - 1,
63 std::memory_order_acquire,
64 std::memory_order_relaxed)) {
65 return;
66 }
67 continue;
68 }
69 futex_wait(raw_count_ptr_(), 0, FUTEX_BITSET_MATCH_ANY,
70 FUTEX2_SIZE_U32, nullptr, 0);
71 }
72 }
73
74 condy::Coro<void> async_release(uint32_t n = 1) {
75 count_.fetch_add(n, std::memory_order_release);
76 [[maybe_unused]] int r = co_await condy::async_futex_wake(
77 raw_count_ptr_(), n, FUTEX_BITSET_MATCH_ANY, FUTEX2_SIZE_U32, 0);
78 assert(r >= 0);
79 }
80
81 void release(uint32_t n = 1) {
82 count_.fetch_add(n, std::memory_order_release);
83 [[maybe_unused]] long r =
84 futex_wake(raw_count_ptr_(), FUTEX_BITSET_MATCH_ANY,
85 static_cast<int>(n), FUTEX2_SIZE_U32);
86 assert(r >= 0);
87 }
88
89private:
90 uint32_t *raw_count_ptr_() { return reinterpret_cast<uint32_t *>(&count_); }
91
92private:
93 std::atomic<uint32_t> count_;
94};
95
96class FutexMutex {
97public:
98 FutexMutex() = default;
99
100 FutexMutex(const FutexMutex &) = delete;
101 FutexMutex &operator=(const FutexMutex &) = delete;
102 FutexMutex(FutexMutex &&) = delete;
103 FutexMutex &operator=(FutexMutex &&) = delete;
104
105public:
106 auto async_lock() { return sem_.async_acquire(); }
107
108 auto async_unlock() { return sem_.async_release(); }
109
110 void lock() { sem_.acquire(); }
111
112 void unlock() { sem_.release(); }
113
114private:
115 FutexSemaphore sem_{1};
116};
117
118struct State {
119 State(size_t queue_size) : empty(queue_size), full(0) {}
120
121 std::queue<int> queue;
122 FutexMutex queue_mutex;
123 FutexSemaphore empty, full;
124};
125
126void producer(State &share, [[maybe_unused]] int id, size_t produce_count) {
127 for (size_t i = 0; i < produce_count; ++i) {
128 share.empty.acquire();
129 {
130 share.queue_mutex.lock();
131 share.queue.push(static_cast<int>(i));
132 share.queue_mutex.unlock();
133 }
134 share.full.release();
135 }
136}
137
138condy::Coro<void> async_consumer(State &share, int id, size_t consume_count) {
139 int item;
140 for (size_t i = 0; i < consume_count; ++i) {
141 co_await share.full.async_acquire();
142 {
143 co_await share.queue_mutex.async_lock();
144 item = share.queue.front();
145 share.queue.pop();
146 co_await share.queue_mutex.async_unlock();
147 }
148 co_await share.empty.async_release();
149
150 std::cout << std::format("Consumer {} consumed item {}\n", id, item);
151 }
152}
153
154void usage(const char *prog_name) {
155 std::cerr << std::format(
156 "Usage: {} [-h] [-q queue_size] [-p num_producers] [-c num_consumers] "
157 "[-n items_per_producer]\n",
158 prog_name);
159}
160
161static size_t queue_size = 32;
162static size_t num_producers = 8;
163static size_t num_consumers = 8;
164static size_t items_per_producer = 32;
165
166int main(int argc, char *argv[]) noexcept(false) {
167 int opt;
168 while ((opt = getopt(argc, argv, "hq:p:c:n:")) != -1) {
169 switch (opt) {
170 case 'q':
171 queue_size = std::stoul(optarg);
172 break;
173 case 'p':
174 num_producers = std::stoul(optarg);
175 break;
176 case 'c':
177 num_consumers = std::stoul(optarg);
178 break;
179 case 'n':
180 items_per_producer = std::stoul(optarg);
181 break;
182 case 'h':
183 usage(argv[0]);
184 return 0;
185 default:
186 usage(argv[0]);
187 return 1;
188 }
189 }
190
191 size_t total_items = num_producers * items_per_producer;
192 if (total_items % num_consumers != 0) {
193 std::cerr << std::format(
194 "Total items ({}) must be divisible by number of consumers ({})\n",
195 total_items, num_consumers);
196 return 1;
197 }
198 size_t items_per_consumer = total_items / num_consumers;
199
200 condy::Runtime rt1, rt2;
201 State share(queue_size);
202
203 std::vector<std::thread> producers;
204 producers.reserve(num_producers);
205 for (size_t i = 0; i < num_producers; ++i) {
206 producers.emplace_back(producer, std::ref(share), static_cast<int>(i),
207 items_per_producer);
208 }
209
210 std::thread consumer_thread([&]() {
211 for (size_t i = 0; i < num_consumers; ++i) {
212 condy::co_spawn(rt2, async_consumer(share, static_cast<int>(i),
213 items_per_consumer))
214 .detach();
215 }
216 rt2.allow_exit();
217 rt2.run();
218 });
219
220 for (auto &producer_thread : producers) {
221 producer_thread.join();
222 }
223 consumer_thread.join();
224}
225
226#else
227
228int main() {
229 std::cerr << std::format("Futex-based semaphore and mutex require io_uring "
230 "version 2.6 or higher.\n");
231 return 1;
232}
233
234#endif
Coroutine type used to define a coroutine function.
Definition coro.hpp:26
The event loop runtime for executing asynchronous.
Definition runtime.hpp:111
void run()
Run the runtime event loop in the current thread.
Definition runtime.hpp:301
void allow_exit() noexcept
Allow the runtime to exit when there are no pending works.
Definition runtime.hpp:225
Main include file for the Condy library.
auto async_futex_wake(uint32_t *futex, uint64_t val, uint64_t mask, uint32_t futex_flags, unsigned int flags)
See io_uring_prep_futex_wake.
Task< T, Allocator > co_spawn(Runtime &runtime, Coro< T, Allocator > coro) noexcept
Spawn a coroutine as a task in the given runtime.
Definition task.hpp:266
auto async_futex_wait(uint32_t *futex, uint64_t val, uint64_t mask, uint32_t futex_flags, unsigned int flags)
See io_uring_prep_futex_wait.