10#include <linux/futex.h>
15#if !IO_URING_CHECK_VERSION(2, 6)
17long futex_wait(
void *uaddr,
unsigned long val,
unsigned long mask,
18 unsigned int flags,
const struct timespec *timeout,
20 return syscall(SYS_futex_wait, uaddr, val, mask, flags, timeout, clockid);
23long futex_wake(
void *uaddr,
unsigned long mask,
int nr,
unsigned int flags) {
24 return syscall(SYS_futex_wake, uaddr, mask, nr, flags);
29 FutexSemaphore(uint32_t initial_count = 0) : count(initial_count) {}
31 FutexSemaphore(
const FutexSemaphore &) =
delete;
32 FutexSemaphore &operator=(
const FutexSemaphore &) =
delete;
33 FutexSemaphore(FutexSemaphore &&) =
delete;
34 FutexSemaphore &operator=(FutexSemaphore &&) =
delete;
37 condy::Coro<void> async_acquire() {
41 while (retries++ < MAX_RETRIES) {
42 c = count.load(std::memory_order_relaxed);
43 if (c > 0 && count.compare_exchange_weak(
44 c, c - 1, std::memory_order_acquire,
45 std::memory_order_relaxed)) {
50 raw_count_ptr_(), c, FUTEX_BITSET_MATCH_ANY, FUTEX2_SIZE_U32,
52 assert(r == 0 || r == -EAGAIN);
60 while (retries++ < MAX_RETRIES) {
61 c = count.load(std::memory_order_relaxed);
62 if (c > 0 && count.compare_exchange_weak(
63 c, c - 1, std::memory_order_acquire,
64 std::memory_order_relaxed)) {
68 futex_wait(raw_count_ptr_(), c, FUTEX_BITSET_MATCH_ANY,
69 FUTEX2_SIZE_U32,
nullptr, 0);
73 condy::Coro<void> async_release(uint32_t n = 1) {
74 count.fetch_add(n, std::memory_order_release);
76 raw_count_ptr_(), n, FUTEX_BITSET_MATCH_ANY, FUTEX2_SIZE_U32, 0);
80 void release(uint32_t n = 1) {
81 count.fetch_add(n, std::memory_order_release);
82 [[maybe_unused]]
long r =
83 futex_wake(raw_count_ptr_(), FUTEX_BITSET_MATCH_ANY,
84 static_cast<int>(n), FUTEX2_SIZE_U32);
89 uint32_t *raw_count_ptr_() {
return reinterpret_cast<uint32_t *
>(&count); }
/// Bound used by the `while (retries++ < MAX_RETRIES)` loops that attempt the
/// compare-exchange on `count`.
static constexpr size_t MAX_RETRIES = 32;

/// Semaphore counter; its address is also what the futex calls operate on.
std::atomic<uint32_t> count;
99 FutexMutex() =
default;
101 FutexMutex(
const FutexMutex &) =
delete;
102 FutexMutex &operator=(
const FutexMutex &) =
delete;
103 FutexMutex(FutexMutex &&) =
delete;
104 FutexMutex &operator=(FutexMutex &&) =
delete;
107 auto async_lock() {
return sem.async_acquire(); }
109 auto async_unlock() {
return sem.async_release(); }
111 void lock() { sem.acquire(); }
113 void unlock() { sem.release(); }
116 FutexSemaphore sem{1};
120 State(
size_t queue_size) : empty(queue_size), full(0) {}
122 std::queue<int> queue;
123 FutexMutex queue_mutex;
124 FutexSemaphore empty, full;
127void producer(State &share, [[maybe_unused]]
int id,
size_t produce_count) {
128 for (
size_t i = 0; i < produce_count; ++i) {
129 share.empty.acquire();
131 share.queue_mutex.lock();
132 share.queue.push(
static_cast<int>(i));
133 share.queue_mutex.unlock();
135 share.full.release();
141 for (
size_t i = 0; i < consume_count; ++i) {
142 co_await share.full.async_acquire();
144 co_await share.queue_mutex.async_lock();
145 item = share.queue.front();
147 co_await share.queue_mutex.async_unlock();
149 co_await share.empty.async_release();
151 std::printf(
"Consumer %d consumed item %d\n",
id, item);
155void usage(
const char *prog_name) {
156 std::printf(
"Usage: %s [-h] [-q queue_size] [-p num_producers] [-c "
157 "num_consumers] [-n items_per_producer]\n",
// Defaults for the command-line options; main() overwrites them from
// -q, -p, -c and -n respectively.
static size_t queue_size = 32;
static size_t num_producers = 8;
static size_t num_consumers = 8;
static size_t items_per_producer = 32;
166int main(
int argc,
char *argv[])
noexcept(
false) {
168 while ((opt = getopt(argc, argv,
"hq:p:c:n:")) != -1) {
171 queue_size = std::stoul(optarg);
174 num_producers = std::stoul(optarg);
177 num_consumers = std::stoul(optarg);
180 items_per_producer = std::stoul(optarg);
191 size_t total_items = num_producers * items_per_producer;
192 if (total_items % num_consumers != 0) {
194 "Total items (%zu) must be divisible by number of "
196 total_items, num_consumers);
199 size_t items_per_consumer = total_items / num_consumers;
202 State share(queue_size);
204 std::vector<std::thread> producers;
205 producers.reserve(num_producers);
206 for (
size_t i = 0; i < num_producers; ++i) {
207 producers.emplace_back(producer, std::ref(share),
static_cast<int>(i),
211 std::thread consumer_thread([&]() {
212 for (
size_t i = 0; i < num_consumers; ++i) {
221 for (
auto &producer_thread : producers) {
222 producer_thread.join();
224 consumer_thread.join();
231 "Futex-based semaphore and mutex require io_uring version "
Coroutine type used to define a coroutine function.
The event loop runtime for executing asynchronous operations.
void run()
Run the runtime event loop in the current thread.
void allow_exit()
Allow the runtime to exit when there are no pending works.
Main include file for the Condy library.
auto async_futex_wake(uint32_t *futex, uint64_t val, uint64_t mask, uint32_t futex_flags, unsigned int flags)
See io_uring_prep_futex_wake.
auto async_futex_wait(uint32_t *futex, uint64_t val, uint64_t mask, uint32_t futex_flags, unsigned int flags)
See io_uring_prep_futex_wait.
Task< T, Allocator > co_spawn(Runtime &runtime, Coro< T, Allocator > coro)
Spawn a coroutine as a task in the given runtime.