Condy v1.3.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
futex-semaphore.cpp
Go to the documentation of this file.
1
5
6#include <atomic>
7#include <cassert>
8#include <condy.hpp>
9#include <cstdio>
10#include <linux/futex.h>
11#include <queue>
12#include <thread>
13#include <unistd.h>
14
15#if !IO_URING_CHECK_VERSION(2, 6) // >= 2.6
16
17long futex_wait(void *uaddr, unsigned long val, unsigned long mask,
18 unsigned int flags, const struct timespec *timeout,
19 int clockid) {
20 return syscall(SYS_futex_wait, uaddr, val, mask, flags, timeout, clockid);
21}
22
23long futex_wake(void *uaddr, unsigned long mask, int nr, unsigned int flags) {
24 return syscall(SYS_futex_wake, uaddr, mask, nr, flags);
25}
26
27class FutexSemaphore {
28public:
29 FutexSemaphore(uint32_t initial_count = 0) : count(initial_count) {}
30
31 FutexSemaphore(const FutexSemaphore &) = delete;
32 FutexSemaphore &operator=(const FutexSemaphore &) = delete;
33 FutexSemaphore(FutexSemaphore &&) = delete;
34 FutexSemaphore &operator=(FutexSemaphore &&) = delete;
35
36public:
37 condy::Coro<void> async_acquire() {
38 uint32_t c;
39 while (true) {
40 size_t retries = 0;
41 while (retries++ < MAX_RETRIES) {
42 c = count.load(std::memory_order_relaxed);
43 if (c > 0 && count.compare_exchange_weak(
44 c, c - 1, std::memory_order_acquire,
45 std::memory_order_relaxed)) {
46 co_return;
47 }
48 }
49 [[maybe_unused]] int r = co_await condy::async_futex_wait(
50 raw_count_ptr_(), c, FUTEX_BITSET_MATCH_ANY, FUTEX2_SIZE_U32,
51 0);
52 assert(r == 0 || r == -EAGAIN);
53 }
54 }
55
56 void acquire() {
57 uint32_t c;
58 while (true) {
59 size_t retries = 0;
60 while (retries++ < MAX_RETRIES) {
61 c = count.load(std::memory_order_relaxed);
62 if (c > 0 && count.compare_exchange_weak(
63 c, c - 1, std::memory_order_acquire,
64 std::memory_order_relaxed)) {
65 return;
66 }
67 }
68 futex_wait(raw_count_ptr_(), c, FUTEX_BITSET_MATCH_ANY,
69 FUTEX2_SIZE_U32, nullptr, 0);
70 }
71 }
72
73 condy::Coro<void> async_release(uint32_t n = 1) {
74 count.fetch_add(n, std::memory_order_release);
75 [[maybe_unused]] int r = co_await condy::async_futex_wake(
76 raw_count_ptr_(), n, FUTEX_BITSET_MATCH_ANY, FUTEX2_SIZE_U32, 0);
77 assert(r >= 0);
78 }
79
80 void release(uint32_t n = 1) {
81 count.fetch_add(n, std::memory_order_release);
82 [[maybe_unused]] long r =
83 futex_wake(raw_count_ptr_(), FUTEX_BITSET_MATCH_ANY,
84 static_cast<int>(n), FUTEX2_SIZE_U32);
85 assert(r >= 0);
86 }
87
88private:
89 uint32_t *raw_count_ptr_() { return reinterpret_cast<uint32_t *>(&count); }
90
91private:
92 static constexpr size_t MAX_RETRIES = 32;
93
94 std::atomic<uint32_t> count;
95};
96
97class FutexMutex {
98public:
99 FutexMutex() = default;
100
101 FutexMutex(const FutexMutex &) = delete;
102 FutexMutex &operator=(const FutexMutex &) = delete;
103 FutexMutex(FutexMutex &&) = delete;
104 FutexMutex &operator=(FutexMutex &&) = delete;
105
106public:
107 auto async_lock() { return sem.async_acquire(); }
108
109 auto async_unlock() { return sem.async_release(); }
110
111 void lock() { sem.acquire(); }
112
113 void unlock() { sem.release(); }
114
115private:
116 FutexSemaphore sem{1};
117};
118
119struct State {
120 State(size_t queue_size) : empty(queue_size), full(0) {}
121
122 std::queue<int> queue;
123 FutexMutex queue_mutex;
124 FutexSemaphore empty, full;
125};
126
127void producer(State &share, [[maybe_unused]] int id, size_t produce_count) {
128 for (size_t i = 0; i < produce_count; ++i) {
129 share.empty.acquire();
130 {
131 share.queue_mutex.lock();
132 share.queue.push(static_cast<int>(i));
133 share.queue_mutex.unlock();
134 }
135 share.full.release();
136 }
137}
138
139condy::Coro<void> async_consumer(State &share, int id, size_t consume_count) {
140 int item;
141 for (size_t i = 0; i < consume_count; ++i) {
142 co_await share.full.async_acquire();
143 {
144 co_await share.queue_mutex.async_lock();
145 item = share.queue.front();
146 share.queue.pop();
147 co_await share.queue_mutex.async_unlock();
148 }
149 co_await share.empty.async_release();
150
151 std::printf("Consumer %d consumed item %d\n", id, item);
152 }
153}
154
155void usage(const char *prog_name) {
156 std::printf("Usage: %s [-h] [-q queue_size] [-p num_producers] [-c "
157 "num_consumers] [-n items_per_producer]\n",
158 prog_name);
159}
160
161static size_t queue_size = 32;
162static size_t num_producers = 8;
163static size_t num_consumers = 8;
164static size_t items_per_producer = 32;
165
166int main(int argc, char *argv[]) noexcept(false) {
167 int opt;
168 while ((opt = getopt(argc, argv, "hq:p:c:n:")) != -1) {
169 switch (opt) {
170 case 'q':
171 queue_size = std::stoul(optarg);
172 break;
173 case 'p':
174 num_producers = std::stoul(optarg);
175 break;
176 case 'c':
177 num_consumers = std::stoul(optarg);
178 break;
179 case 'n':
180 items_per_producer = std::stoul(optarg);
181 break;
182 case 'h':
183 usage(argv[0]);
184 return 0;
185 default:
186 usage(argv[0]);
187 return 1;
188 }
189 }
190
191 size_t total_items = num_producers * items_per_producer;
192 if (total_items % num_consumers != 0) {
193 std::fprintf(stderr,
194 "Total items (%zu) must be divisible by number of "
195 "consumers (%zu)\n",
196 total_items, num_consumers);
197 return 1;
198 }
199 size_t items_per_consumer = total_items / num_consumers;
200
201 condy::Runtime rt1, rt2;
202 State share(queue_size);
203
204 std::vector<std::thread> producers;
205 producers.reserve(num_producers);
206 for (size_t i = 0; i < num_producers; ++i) {
207 producers.emplace_back(producer, std::ref(share), static_cast<int>(i),
208 items_per_producer);
209 }
210
211 std::thread consumer_thread([&]() {
212 for (size_t i = 0; i < num_consumers; ++i) {
213 condy::co_spawn(rt2, async_consumer(share, static_cast<int>(i),
214 items_per_consumer))
215 .detach();
216 }
217 rt2.allow_exit();
218 rt2.run();
219 });
220
221 for (auto &producer_thread : producers) {
222 producer_thread.join();
223 }
224 consumer_thread.join();
225}
226
227#else
228
229int main() {
230 std::fprintf(stderr,
231 "Futex-based semaphore and mutex require io_uring version "
232 "2.6 or higher.\n");
233 return 1;
234}
235
236#endif
Coroutine type used to define a coroutine function.
Definition coro.hpp:26
The event loop runtime for executing asynchronous.
Definition runtime.hpp:76
void run()
Run the runtime event loop in the current thread.
Definition runtime.hpp:240
void allow_exit()
Allow the runtime to exit when there are no pending works.
Definition runtime.hpp:184
Main include file for the Condy library.
auto async_futex_wake(uint32_t *futex, uint64_t val, uint64_t mask, uint32_t futex_flags, unsigned int flags)
See io_uring_prep_futex_wake.
auto async_futex_wait(uint32_t *futex, uint64_t val, uint64_t mask, uint32_t futex_flags, unsigned int flags)
See io_uring_prep_futex_wait.
Task< T, Allocator > co_spawn(Runtime &runtime, Coro< T, Allocator > coro)
Spawn a coroutine as a task in the given runtime.
Definition task.hpp:240