12#include <linux/futex.h>
17#if !IO_URING_CHECK_VERSION(2, 6)
// Thin wrapper around the raw SYS_futex_wait system call: forwards uaddr, val,
// mask, flags, timeout and clockid to syscall() and returns its result as-is.
// NOTE(review): the line declaring `clockid` is not visible in this excerpt.
19long futex_wait(
void *uaddr,
unsigned long val,
unsigned long mask,
20 unsigned int flags,
const struct timespec *timeout,
22 return syscall(SYS_futex_wait, uaddr, val, mask, flags, timeout, clockid);
// Thin wrapper around the raw SYS_futex_wake system call: forwards uaddr, mask,
// nr and flags to syscall() and returns its result as-is.
// Note the argument order here is (uaddr, mask, nr, flags).
25long futex_wake(
void *uaddr,
unsigned long mask,
int nr,
unsigned int flags) {
26 return syscall(SYS_futex_wake, uaddr, mask, nr, flags);
// Creates the semaphore with `count` set to initial_count (0 when omitted).
31 FutexSemaphore(uint32_t initial_count = 0) : count(initial_count) {}
// Copy and move construction/assignment are all deleted, so a FutexSemaphore
// can neither be copied nor relocated.
// NOTE(review): presumably because the futex calls operate on the address of
// `count`, which must stay stable — confirm.
33 FutexSemaphore(
const FutexSemaphore &) =
delete;
34 FutexSemaphore &operator=(
const FutexSemaphore &) =
delete;
35 FutexSemaphore(FutexSemaphore &&) =
delete;
36 FutexSemaphore &operator=(FutexSemaphore &&) =
delete;
// Coroutine variant of acquire.
// The visible code retries up to MAX_RETRIES times: it loads `count` (relaxed)
// and, if positive, attempts a weak CAS to `c - 1` (acquire on success,
// relaxed on failure).
// NOTE(review): several lines of this function are missing from this excerpt.
// The argument line starting with raw_count_ptr_() presumably belongs to an
// asynchronous futex wait on the last observed value `c` — confirm against
// the full source. Its result `r` is asserted to be 0 or -EAGAIN.
39 condy::Coro<void> async_acquire() {
43 while (retries++ < MAX_RETRIES) {
44 c = count.load(std::memory_order_relaxed);
45 if (c > 0 && count.compare_exchange_weak(
46 c, c - 1, std::memory_order_acquire,
47 std::memory_order_relaxed)) {
52 raw_count_ptr_(), c, FUTEX_BITSET_MATCH_ANY, FUTEX2_SIZE_U32,
54 assert(r == 0 || r == -EAGAIN);
62 while (retries++ < MAX_RETRIES) {
63 c = count.load(std::memory_order_relaxed);
64 if (c > 0 && count.compare_exchange_weak(
65 c, c - 1, std::memory_order_acquire,
66 std::memory_order_relaxed)) {
70 futex_wait(raw_count_ptr_(), c, FUTEX_BITSET_MATCH_ANY,
71 FUTEX2_SIZE_U32,
nullptr, 0);
// Coroutine variant of release: adds `n` (default 1) to `count` with release
// ordering.
// NOTE(review): the head of the call that receives the arguments on the last
// line is not visible here; presumably an asynchronous futex wake for up to
// `n` waiters on the counter's address — confirm against the full source.
75 condy::Coro<void> async_release(uint32_t n = 1) {
76 count.fetch_add(n, std::memory_order_release);
78 raw_count_ptr_(), n, FUTEX_BITSET_MATCH_ANY, FUTEX2_SIZE_U32, 0);
// Blocking-API release: adds `n` (default 1) to `count` with release ordering,
// then calls futex_wake on the counter's address with FUTEX_BITSET_MATCH_ANY
// as the mask and `n` (cast to int) as the number of waiters to wake.
// The wake result is kept in `r`, which is marked [[maybe_unused]].
82 void release(uint32_t n = 1) {
83 count.fetch_add(n, std::memory_order_release);
84 [[maybe_unused]]
long r =
85 futex_wake(raw_count_ptr_(), FUTEX_BITSET_MATCH_ANY,
86 static_cast<int>(n), FUTEX2_SIZE_U32);
// Returns the address of the atomic `count` reinterpreted as a plain
// uint32_t*, which is the pointer handed to the futex wait/wake calls.
91 uint32_t *raw_count_ptr_() {
return reinterpret_cast<uint32_t *
>(&count); }
// Upper bound on the number of CAS attempts made by each retry loop.
94 static constexpr size_t MAX_RETRIES = 32;
// The semaphore counter; its address is also what the futex calls operate on.
96 std::atomic<uint32_t> count;
// Default-constructible; copy and move construction/assignment are deleted.
101 FutexMutex() =
default;
103 FutexMutex(
const FutexMutex &) =
delete;
104 FutexMutex &operator=(
const FutexMutex &) =
delete;
105 FutexMutex(FutexMutex &&) =
delete;
106 FutexMutex &operator=(FutexMutex &&) =
delete;
// Coroutine lock: returns the coroutine produced by sem.async_acquire().
109 auto async_lock() {
return sem.async_acquire(); }
// Coroutine unlock: returns the coroutine produced by sem.async_release()
// (called with its default argument).
111 auto async_unlock() {
return sem.async_release(); }
// Blocking lock: delegates to sem.acquire().
113 void lock() { sem.acquire(); }
// Blocking unlock: delegates to sem.release() (default argument).
115 void unlock() { sem.release(); }
// Underlying semaphore, initialized to 1; lock/unlock acquire and release it.
118 FutexSemaphore sem{1};
// Initializes `empty` with queue_size and `full` with 0.
122 State(
size_t queue_size) : empty(queue_size), full(0) {}
// Queue of ints shared between producers and consumers.
124 std::queue<int> queue;
// Taken around every access to `queue`.
125 FutexMutex queue_mutex;
// `empty` is acquired before a push and released after a pop; `full` is
// released after a push and acquired before a pop.
126 FutexSemaphore empty, full;
// Producer thread body. For each of produce_count iterations it acquires
// `empty`, pushes the loop index (cast to int) onto the shared queue while
// holding queue_mutex, then releases `full`. All calls here use the blocking
// (non-coroutine) semaphore/mutex API. `id` is marked [[maybe_unused]].
129void producer(State &share, [[maybe_unused]]
int id,
size_t produce_count) {
130 for (
size_t i = 0; i < produce_count; ++i) {
131 share.empty.acquire();
133 share.queue_mutex.lock();
134 share.queue.push(
static_cast<int>(i));
135 share.queue_mutex.unlock();
137 share.full.release();
143 for (
size_t i = 0; i < consume_count; ++i) {
144 co_await share.full.async_acquire();
146 co_await share.queue_mutex.async_lock();
147 item = share.queue.front();
149 co_await share.queue_mutex.async_unlock();
151 co_await share.empty.async_release();
153 std::cout << std::format(
"Consumer {} consumed item {}\n",
id, item);
// Writes the command-line usage line to std::cerr.
// NOTE(review): the format argument(s) and the end of the statement are not
// visible in this excerpt; presumably prog_name fills the `{}` — confirm.
157void usage(
const char *prog_name) {
158 std::cerr << std::format(
159 "Usage: {} [-h] [-q queue_size] [-p num_producers] [-c num_consumers] "
160 "[-n items_per_producer]\n",
// Default run parameters; main overwrites them from parsed option arguments.
164static size_t queue_size = 32;
165static size_t num_producers = 8;
166static size_t num_consumers = 8;
167static size_t items_per_producer = 32;
// Program entry point (many lines are missing from this excerpt).
// Visible behavior: parses options with getopt using "hq:p:c:n:", storing
// std::stoul(optarg) into queue_size / num_producers / num_consumers /
// items_per_producer; reports an error when the total item count is not
// divisible by the number of consumers; builds the shared State; starts one
// std::thread per producer plus a single consumer thread; joins them all.
// Declared noexcept(false): std::stoul may throw on malformed option values.
169int main(
int argc,
char *argv[])
noexcept(
false) {
171 while ((opt = getopt(argc, argv,
"hq:p:c:n:")) != -1) {
174 queue_size = std::stoul(optarg);
177 num_producers = std::stoul(optarg);
180 num_consumers = std::stoul(optarg);
183 items_per_producer = std::stoul(optarg);
194 size_t total_items = num_producers * items_per_producer;
// NOTE(review): no guard against num_consumers == 0 is visible in this
// excerpt; `-c 0` would make the modulo and division below undefined.
195 if (total_items % num_consumers != 0) {
196 std::cerr << std::format(
197 "Total items ({}) must be divisible by number of consumers ({})\n",
198 total_items, num_consumers);
201 size_t items_per_consumer = total_items / num_consumers;
204 State share(queue_size);
206 std::vector<std::thread> producers;
207 producers.reserve(num_producers);
208 for (
size_t i = 0; i < num_producers; ++i) {
209 producers.emplace_back(producer, std::ref(share),
static_cast<int>(i),
// A single thread is created for the consumer side; its loop iterates once per consumer.
213 std::thread consumer_thread([&]() {
214 for (
size_t i = 0; i < num_consumers; ++i) {
223 for (
auto &producer_thread : producers) {
224 producer_thread.join();
226 consumer_thread.join();
232 std::cerr << std::format(
"Futex-based semaphore and mutex require io_uring "
233 "version 2.6 or higher.\n");
Coroutine type used to define a coroutine function.
The event loop runtime for executing asynchronous operations.
void allow_exit() noexcept
Allow the runtime to exit when there is no pending work.
void run() noexcept
Run the runtime event loop in the current thread.
Main include file for the Condy library.
auto async_futex_wake(uint32_t *futex, uint64_t val, uint64_t mask, uint32_t futex_flags, unsigned int flags)
See io_uring_prep_futex_wake.
auto async_futex_wait(uint32_t *futex, uint64_t val, uint64_t mask, uint32_t futex_flags, unsigned int flags)
See io_uring_prep_futex_wait.
Task< T, Allocator > co_spawn(Runtime &runtime, Coro< T, Allocator > coro)
Spawn a coroutine as a task in the given runtime.