12#include <linux/futex.h>
17#if !IO_URING_CHECK_VERSION(2, 6)
19long futex_wait(
void *uaddr,
unsigned long val,
unsigned long mask,
20 unsigned int flags,
const struct timespec *timeout,
22 return syscall(SYS_futex_wait, uaddr, val, mask, flags, timeout, clockid);
25long futex_wake(
void *uaddr,
unsigned long mask,
int nr,
unsigned int flags) {
26 return syscall(SYS_futex_wake, uaddr, mask, nr, flags);
31 FutexSemaphore(uint32_t initial_count = 0) : count(initial_count) {}
33 FutexSemaphore(
const FutexSemaphore &) =
delete;
34 FutexSemaphore &operator=(
const FutexSemaphore &) =
delete;
35 FutexSemaphore(FutexSemaphore &&) =
delete;
36 FutexSemaphore &operator=(FutexSemaphore &&) =
delete;
39 condy::Coro<void> async_acquire() {
43 while (retries++ < MAX_RETRIES) {
44 c = count.load(std::memory_order_relaxed);
45 if (c > 0 && count.compare_exchange_weak(
46 c, c - 1, std::memory_order_acquire,
47 std::memory_order_relaxed)) {
52 raw_count_ptr_(), c, FUTEX_BITSET_MATCH_ANY, FUTEX2_SIZE_U32,
54 assert(r == 0 || r == -EAGAIN);
62 while (retries++ < MAX_RETRIES) {
63 c = count.load(std::memory_order_relaxed);
64 if (c > 0 && count.compare_exchange_weak(
65 c, c - 1, std::memory_order_acquire,
66 std::memory_order_relaxed)) {
70 futex_wait(raw_count_ptr_(), c, FUTEX_BITSET_MATCH_ANY,
71 FUTEX2_SIZE_U32,
nullptr, 0);
75 condy::Coro<void> async_release(uint32_t n = 1) {
76 count.fetch_add(n, std::memory_order_release);
78 raw_count_ptr_(), n, FUTEX_BITSET_MATCH_ANY, FUTEX2_SIZE_U32, 0);
82 void release(uint32_t n = 1) {
83 count.fetch_add(n, std::memory_order_release);
84 [[maybe_unused]]
long r =
85 futex_wake(raw_count_ptr_(), FUTEX_BITSET_MATCH_ANY,
86 static_cast<int>(n), FUTEX2_SIZE_U32);
91 uint32_t *raw_count_ptr_() {
return reinterpret_cast<uint32_t *
>(&count); }
94 static constexpr size_t MAX_RETRIES = 32;
96 std::atomic<uint32_t> count;
101 FutexMutex() =
default;
103 FutexMutex(
const FutexMutex &) =
delete;
104 FutexMutex &operator=(
const FutexMutex &) =
delete;
105 FutexMutex(FutexMutex &&) =
delete;
106 FutexMutex &operator=(FutexMutex &&) =
delete;
109 auto async_lock() {
return sem.async_acquire(); }
111 auto async_unlock() {
return sem.async_release(); }
113 void lock() { sem.acquire(); }
115 void unlock() { sem.release(); }
118 FutexSemaphore sem{1};
122 State(
size_t queue_size) : empty(queue_size), full(0) {}
124 std::queue<int> queue;
125 FutexMutex queue_mutex;
126 FutexSemaphore empty, full;
129void producer(State &share, [[maybe_unused]]
int id,
size_t produce_count) {
130 for (
size_t i = 0; i < produce_count; ++i) {
131 share.empty.acquire();
133 share.queue_mutex.lock();
134 share.queue.push(
static_cast<int>(i));
135 share.queue_mutex.unlock();
137 share.full.release();
143 for (
size_t i = 0; i < consume_count; ++i) {
144 co_await share.full.async_acquire();
146 co_await share.queue_mutex.async_lock();
147 item = share.queue.front();
149 co_await share.queue_mutex.async_unlock();
151 co_await share.empty.async_release();
153 std::cout << std::format(
"Consumer {} consumed item {}\n",
id, item);
157void usage(
const char *prog_name) {
158 std::cerr << std::format(
159 "Usage: {} [-h] [-q queue_size] [-p num_producers] [-c num_consumers] "
160 "[-n items_per_producer]\n",
164static size_t queue_size = 32;
165static size_t num_producers = 8;
166static size_t num_consumers = 8;
167static size_t items_per_producer = 32;
169int main(
int argc,
char *argv[])
noexcept(
false) {
171 while ((opt = getopt(argc, argv,
"hq:p:c:n:")) != -1) {
174 queue_size = std::stoul(optarg);
177 num_producers = std::stoul(optarg);
180 num_consumers = std::stoul(optarg);
183 items_per_producer = std::stoul(optarg);
194 size_t total_items = num_producers * items_per_producer;
195 if (total_items % num_consumers != 0) {
196 std::cerr << std::format(
197 "Total items ({}) must be divisible by number of consumers ({})\n",
198 total_items, num_consumers);
201 size_t items_per_consumer = total_items / num_consumers;
204 State share(queue_size);
206 std::vector<std::thread> producers;
207 producers.reserve(num_producers);
208 for (
size_t i = 0; i < num_producers; ++i) {
209 producers.emplace_back(producer, std::ref(share),
static_cast<int>(i),
213 std::thread consumer_thread([&]() {
214 for (
size_t i = 0; i < num_consumers; ++i) {
223 for (
auto &producer_thread : producers) {
224 producer_thread.join();
226 consumer_thread.join();
232 std::cerr << std::format(
"Futex-based semaphore and mutex require io_uring "
233 "version 2.6 or higher.\n");
Coroutine type used to define a coroutine function.
The event loop runtime for executing asynchronous tasks.
void run()
Run the runtime event loop in the current thread.
void allow_exit() noexcept
Allow the runtime to exit when there is no pending work.
Main include file for the Condy library.
auto async_futex_wake(uint32_t *futex, uint64_t val, uint64_t mask, uint32_t futex_flags, unsigned int flags)
See io_uring_prep_futex_wake.
Task< T, Allocator > co_spawn(Runtime &runtime, Coro< T, Allocator > coro) noexcept
Spawn a coroutine as a task in the given runtime.
auto async_futex_wait(uint32_t *futex, uint64_t val, uint64_t mask, uint32_t futex_flags, unsigned int flags)
See io_uring_prep_futex_wait.