Condy v1.5.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
futex-semaphore.cpp
Go to the documentation of this file.
1
5
#include <atomic>
#include <cassert>
#include <cerrno>
#include <condy.hpp>
#include <cstdint>
#include <cstdio>
#include <format>
#include <functional>
#include <iostream>
#include <limits>
#include <linux/futex.h>
#include <queue>
#include <stdexcept>
#include <string>
#include <sys/syscall.h>
#include <thread>
#include <unistd.h>
#include <vector>
16
17#if !IO_URING_CHECK_VERSION(2, 6) // >= 2.6
18
19long futex_wait(void *uaddr, unsigned long val, unsigned long mask,
20 unsigned int flags, const struct timespec *timeout,
21 int clockid) {
22 return syscall(SYS_futex_wait, uaddr, val, mask, flags, timeout, clockid);
23}
24
25long futex_wake(void *uaddr, unsigned long mask, int nr, unsigned int flags) {
26 return syscall(SYS_futex_wake, uaddr, mask, nr, flags);
27}
28
29class FutexSemaphore {
30public:
31 FutexSemaphore(uint32_t initial_count = 0) : count(initial_count) {}
32
33 FutexSemaphore(const FutexSemaphore &) = delete;
34 FutexSemaphore &operator=(const FutexSemaphore &) = delete;
35 FutexSemaphore(FutexSemaphore &&) = delete;
36 FutexSemaphore &operator=(FutexSemaphore &&) = delete;
37
38public:
39 condy::Coro<void> async_acquire() {
40 uint32_t c;
41 while (true) {
42 size_t retries = 0;
43 while (retries++ < MAX_RETRIES) {
44 c = count.load(std::memory_order_relaxed);
45 if (c > 0 && count.compare_exchange_weak(
46 c, c - 1, std::memory_order_acquire,
47 std::memory_order_relaxed)) {
48 co_return;
49 }
50 }
51 [[maybe_unused]] int r = co_await condy::async_futex_wait(
52 raw_count_ptr_(), c, FUTEX_BITSET_MATCH_ANY, FUTEX2_SIZE_U32,
53 0);
54 assert(r == 0 || r == -EAGAIN);
55 }
56 }
57
58 void acquire() {
59 uint32_t c;
60 while (true) {
61 size_t retries = 0;
62 while (retries++ < MAX_RETRIES) {
63 c = count.load(std::memory_order_relaxed);
64 if (c > 0 && count.compare_exchange_weak(
65 c, c - 1, std::memory_order_acquire,
66 std::memory_order_relaxed)) {
67 return;
68 }
69 }
70 futex_wait(raw_count_ptr_(), c, FUTEX_BITSET_MATCH_ANY,
71 FUTEX2_SIZE_U32, nullptr, 0);
72 }
73 }
74
75 condy::Coro<void> async_release(uint32_t n = 1) {
76 count.fetch_add(n, std::memory_order_release);
77 [[maybe_unused]] int r = co_await condy::async_futex_wake(
78 raw_count_ptr_(), n, FUTEX_BITSET_MATCH_ANY, FUTEX2_SIZE_U32, 0);
79 assert(r >= 0);
80 }
81
82 void release(uint32_t n = 1) {
83 count.fetch_add(n, std::memory_order_release);
84 [[maybe_unused]] long r =
85 futex_wake(raw_count_ptr_(), FUTEX_BITSET_MATCH_ANY,
86 static_cast<int>(n), FUTEX2_SIZE_U32);
87 assert(r >= 0);
88 }
89
90private:
91 uint32_t *raw_count_ptr_() { return reinterpret_cast<uint32_t *>(&count); }
92
93private:
94 static constexpr size_t MAX_RETRIES = 32;
95
96 std::atomic<uint32_t> count;
97};
98
99class FutexMutex {
100public:
101 FutexMutex() = default;
102
103 FutexMutex(const FutexMutex &) = delete;
104 FutexMutex &operator=(const FutexMutex &) = delete;
105 FutexMutex(FutexMutex &&) = delete;
106 FutexMutex &operator=(FutexMutex &&) = delete;
107
108public:
109 auto async_lock() { return sem.async_acquire(); }
110
111 auto async_unlock() { return sem.async_release(); }
112
113 void lock() { sem.acquire(); }
114
115 void unlock() { sem.release(); }
116
117private:
118 FutexSemaphore sem{1};
119};
120
121struct State {
122 State(size_t queue_size) : empty(queue_size), full(0) {}
123
124 std::queue<int> queue;
125 FutexMutex queue_mutex;
126 FutexSemaphore empty, full;
127};
128
129void producer(State &share, [[maybe_unused]] int id, size_t produce_count) {
130 for (size_t i = 0; i < produce_count; ++i) {
131 share.empty.acquire();
132 {
133 share.queue_mutex.lock();
134 share.queue.push(static_cast<int>(i));
135 share.queue_mutex.unlock();
136 }
137 share.full.release();
138 }
139}
140
/// Consumer coroutine: removes `consume_count` items from the shared queue
/// and prints each one. It suspends on the futex-backed semaphores and
/// mutex instead of blocking its thread.
condy::Coro<void> async_consumer(State &share, int id, size_t consume_count) {
    for (size_t remaining = consume_count; remaining != 0; --remaining) {
        co_await share.full.async_acquire(); // wait for an item

        co_await share.queue_mutex.async_lock();
        const int item = share.queue.front();
        share.queue.pop();
        co_await share.queue_mutex.async_unlock();

        co_await share.empty.async_release(); // free the slot again

        std::cout << std::format("Consumer {} consumed item {}\n", id, item);
    }
}
156
/// Print the command-line synopsis for `prog_name` to stderr.
void usage(const char *prog_name) {
    std::cerr << "Usage: " << prog_name
              << " [-h] [-q queue_size] [-p num_producers] [-c num_consumers]"
                 " [-n items_per_producer]\n";
}
163
// Tunables, overridable from the command line in main(); internal linkage.
namespace {
size_t queue_size = 32;         // shared queue capacity (-q)
size_t num_producers = 8;       // number of producer threads (-p)
size_t num_consumers = 8;       // number of consumer coroutines (-c)
size_t items_per_producer = 32; // items pushed by each producer (-n)
} // namespace
168
/// Parse a non-negative decimal command-line count into `out`.
///
/// Unlike a bare std::stoul, this rejects:
///   - empty strings, and leading signs or whitespace (stoul silently
///     wraps "-1" to ULONG_MAX);
///   - trailing garbage such as "12abc";
///   - values that do not fit in unsigned long.
/// Returns true on success. On failure, `out` is left untouched.
static bool parse_count(const char *text, size_t &out) {
    if (text == nullptr || *text < '0' || *text > '9') {
        return false;
    }
    try {
        size_t consumed = 0;
        const unsigned long value = std::stoul(text, &consumed);
        if (text[consumed] != '\0') {
            return false;
        }
        out = value;
        return true;
    } catch (const std::logic_error &) {
        // std::stoul signals bad input with std::invalid_argument and
        // overflow with std::out_of_range; both derive from logic_error.
        return false;
    }
}

/// Entry point. Parses options, starts `num_producers` blocking producer
/// threads, and runs `num_consumers` consumer coroutines on one condy
/// runtime hosted by a dedicated thread. Returns 0 on success (or -h) and
/// 1 on invalid arguments.
int main(int argc, char *argv[]) noexcept(false) {
    int opt;
    while ((opt = getopt(argc, argv, "hq:p:c:n:")) != -1) {
        size_t *target = nullptr;
        switch (opt) {
        case 'q':
            target = &queue_size;
            break;
        case 'p':
            target = &num_producers;
            break;
        case 'c':
            target = &num_consumers;
            break;
        case 'n':
            target = &items_per_producer;
            break;
        case 'h':
            usage(argv[0]);
            return 0;
        default:
            usage(argv[0]);
            return 1;
        }
        if (!parse_count(optarg, *target)) {
            std::cerr << std::format("Invalid value for -{}: '{}'\n",
                                     static_cast<char>(opt),
                                     optarg != nullptr ? optarg : "");
            usage(argv[0]);
            return 1;
        }
    }

    // Reject configurations that would divide by zero, overflow, or leave
    // producers blocked forever on an empty semaphore.
    if (num_consumers == 0) {
        std::cerr << "Number of consumers (-c) must be at least 1\n";
        return 1;
    }
    // The semaphore counts are 32-bit futex words, so larger queue sizes
    // would be silently truncated.
    constexpr size_t max_queue_size = std::numeric_limits<uint32_t>::max();
    if (queue_size == 0 || queue_size > max_queue_size) {
        std::cerr << std::format("Queue size (-q) must be between 1 and {}\n",
                                 max_queue_size);
        return 1;
    }
    if (items_per_producer != 0 &&
        num_producers >
            std::numeric_limits<size_t>::max() / items_per_producer) {
        std::cerr << "Total number of items (-p * -n) is too large\n";
        return 1;
    }

    const size_t total_items = num_producers * items_per_producer;
    if (total_items % num_consumers != 0) {
        std::cerr << std::format(
            "Total items ({}) must be divisible by number of consumers ({})\n",
            total_items, num_consumers);
        return 1;
    }
    const size_t items_per_consumer = total_items / num_consumers;

    // Declared before `share` so the shared state is destroyed first.
    condy::Runtime consumer_rt;
    State share(queue_size);

    std::vector<std::thread> producers;
    producers.reserve(num_producers);
    for (size_t i = 0; i < num_producers; ++i) {
        producers.emplace_back(producer, std::ref(share), static_cast<int>(i),
                               items_per_producer);
    }

    // All consumer coroutines share one runtime driven by this thread.
    std::thread consumer_thread([&]() {
        for (size_t i = 0; i < num_consumers; ++i) {
            condy::co_spawn(consumer_rt,
                            async_consumer(share, static_cast<int>(i),
                                           items_per_consumer))
                .detach();
        }
        consumer_rt.allow_exit();
        consumer_rt.run();
    });

    for (auto &producer_thread : producers) {
        producer_thread.join();
    }
    consumer_thread.join();
    return 0;
}
228
229#else
230
int main() {
    // This build lacks the io_uring >= 2.6 support the example needs, so
    // report that and exit with failure.
    std::cerr << "Futex-based semaphore and mutex require io_uring "
                 "version 2.6 or higher.\n";
    return 1;
}
236
237#endif
Coroutine type used to define a coroutine function.
Definition coro.hpp:26
The event loop runtime for executing asynchronous tasks.
Definition runtime.hpp:68
void allow_exit() noexcept
Allow the runtime to exit when there is no pending work.
Definition runtime.hpp:176
void run() noexcept
Run the runtime event loop in the current thread.
Definition runtime.hpp:220
Main include file for the Condy library.
auto async_futex_wake(uint32_t *futex, uint64_t val, uint64_t mask, uint32_t futex_flags, unsigned int flags)
See io_uring_prep_futex_wake.
auto async_futex_wait(uint32_t *futex, uint64_t val, uint64_t mask, uint32_t futex_flags, unsigned int flags)
See io_uring_prep_futex_wait.
Task< T, Allocator > co_spawn(Runtime &runtime, Coro< T, Allocator > coro)
Spawn a coroutine as a task in the given runtime.
Definition task.hpp:266