13#include <linux/futex.h>
18#if !IO_URING_CHECK_VERSION(2, 6)
20long futex_wait(
void *uaddr,
unsigned long val,
unsigned long mask,
21 unsigned int flags,
const struct timespec *timeout,
23 return syscall(SYS_futex_wait, uaddr, val, mask, flags, timeout, clockid);
26long futex_wake(
void *uaddr,
unsigned long mask,
int nr,
unsigned int flags) {
27 return syscall(SYS_futex_wake, uaddr, mask, nr, flags);
32 FutexSemaphore(uint32_t initial_count = 0) : count_(initial_count) {}
34 FutexSemaphore(
const FutexSemaphore &) =
delete;
35 FutexSemaphore &operator=(
const FutexSemaphore &) =
delete;
36 FutexSemaphore(FutexSemaphore &&) =
delete;
37 FutexSemaphore &operator=(FutexSemaphore &&) =
delete;
40 condy::Coro<void> async_acquire() {
42 uint32_t c = count_.load(std::memory_order_relaxed);
44 if (count_.compare_exchange_strong(c, c - 1,
45 std::memory_order_acquire,
46 std::memory_order_relaxed)) {
52 raw_count_ptr_(), 0, FUTEX_BITSET_MATCH_ANY, FUTEX2_SIZE_U32,
54 assert(r == 0 || r == -EAGAIN);
60 uint32_t c = count_.load(std::memory_order_relaxed);
62 if (count_.compare_exchange_strong(c, c - 1,
63 std::memory_order_acquire,
64 std::memory_order_relaxed)) {
69 futex_wait(raw_count_ptr_(), 0, FUTEX_BITSET_MATCH_ANY,
70 FUTEX2_SIZE_U32,
nullptr, 0);
74 condy::Coro<void> async_release(uint32_t n = 1) {
75 count_.fetch_add(n, std::memory_order_release);
77 raw_count_ptr_(), n, FUTEX_BITSET_MATCH_ANY, FUTEX2_SIZE_U32, 0);
81 void release(uint32_t n = 1) {
82 count_.fetch_add(n, std::memory_order_release);
83 [[maybe_unused]]
long r =
84 futex_wake(raw_count_ptr_(), FUTEX_BITSET_MATCH_ANY,
85 static_cast<int>(n), FUTEX2_SIZE_U32);
90 uint32_t *raw_count_ptr_() {
return reinterpret_cast<uint32_t *
>(&count_); }
93 std::atomic<uint32_t> count_;
98 FutexMutex() =
default;
100 FutexMutex(
const FutexMutex &) =
delete;
101 FutexMutex &operator=(
const FutexMutex &) =
delete;
102 FutexMutex(FutexMutex &&) =
delete;
103 FutexMutex &operator=(FutexMutex &&) =
delete;
106 auto async_lock() {
return sem_.async_acquire(); }
108 auto async_unlock() {
return sem_.async_release(); }
110 void lock() { sem_.acquire(); }
112 void unlock() { sem_.release(); }
115 FutexSemaphore sem_{1};
119 State(
size_t queue_size) : empty(queue_size), full(0) {}
121 std::queue<int> queue;
122 FutexMutex queue_mutex;
123 FutexSemaphore empty, full;
126void producer(State &share, [[maybe_unused]]
int id,
size_t produce_count) {
127 for (
size_t i = 0; i < produce_count; ++i) {
128 share.empty.acquire();
130 share.queue_mutex.lock();
131 share.queue.push(
static_cast<int>(i));
132 share.queue_mutex.unlock();
134 share.full.release();
140 for (
size_t i = 0; i < consume_count; ++i) {
141 co_await share.full.async_acquire();
143 co_await share.queue_mutex.async_lock();
144 item = share.queue.front();
146 co_await share.queue_mutex.async_unlock();
148 co_await share.empty.async_release();
150 std::cout << std::format(
"Consumer {} consumed item {}\n",
id, item);
154void usage(
const char *prog_name) {
155 std::cerr << std::format(
156 "Usage: {} [-h] [-q queue_size] [-p num_producers] [-c num_consumers] "
157 "[-n items_per_producer]\n",
161static size_t queue_size = 32;
162static size_t num_producers = 8;
163static size_t num_consumers = 8;
164static size_t items_per_producer = 32;
166int main(
int argc,
char *argv[])
noexcept(
false) {
168 while ((opt = getopt(argc, argv,
"hq:p:c:n:")) != -1) {
171 queue_size = std::stoul(optarg);
174 num_producers = std::stoul(optarg);
177 num_consumers = std::stoul(optarg);
180 items_per_producer = std::stoul(optarg);
191 size_t total_items = num_producers * items_per_producer;
192 if (total_items % num_consumers != 0) {
193 std::cerr << std::format(
194 "Total items ({}) must be divisible by number of consumers ({})\n",
195 total_items, num_consumers);
198 size_t items_per_consumer = total_items / num_consumers;
201 State share(queue_size);
203 std::vector<std::thread> producers;
204 producers.reserve(num_producers);
205 for (
size_t i = 0; i < num_producers; ++i) {
206 producers.emplace_back(producer, std::ref(share),
static_cast<int>(i),
210 std::thread consumer_thread([&]() {
211 for (
size_t i = 0; i < num_consumers; ++i) {
220 for (
auto &producer_thread : producers) {
221 producer_thread.join();
223 consumer_thread.join();
229 std::cerr << std::format(
"Futex-based semaphore and mutex require io_uring "
230 "version 2.6 or higher.\n");
Coroutine type used to define a coroutine function.
The event loop runtime for executing asynchronous.
void run()
Run the runtime event loop in the current thread.
void allow_exit() noexcept
Allow the runtime to exit when there are no pending works.
Main include file for the Condy library.
auto async_futex_wake(uint32_t *futex, uint64_t val, uint64_t mask, uint32_t futex_flags, unsigned int flags)
See io_uring_prep_futex_wake.
Task< T, Allocator > co_spawn(Runtime &runtime, Coro< T, Allocator > coro) noexcept
Spawn a coroutine as a task in the given runtime.
auto async_futex_wait(uint32_t *futex, uint64_t val, uint64_t mask, uint32_t futex_flags, unsigned int flags)
See io_uring_prep_futex_wait.