Condy v1.5.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
ring.hpp
Go to the documentation of this file.
1
7
8#pragma once
9
10#include "condy/condy_uring.hpp"
11#include "condy/utils.hpp"
12#include <cassert>
13#include <cerrno>
14#include <cstddef>
15#include <cstring>
16
17namespace condy {
18
24class FdTable {
25public:
26 FdTable(io_uring &ring) : ring_(ring) {}
27
28 FdTable(const FdTable &) = delete;
29 FdTable &operator=(const FdTable &) = delete;
30 FdTable(FdTable &&) = delete;
31 FdTable &operator=(FdTable &&) = delete;
32
33public:
39 int init(size_t capacity) noexcept {
40 return io_uring_register_files_sparse(&ring_, capacity);
41 }
42
50 int init(const int *fds, unsigned nr_fds) {
51 return io_uring_register_files(&ring_, fds, nr_fds);
52 }
53
58 int destroy() noexcept { return io_uring_unregister_files(&ring_); }
59
68 int update(unsigned index_base, const int *fds, unsigned nr_fds) noexcept {
69 return io_uring_register_files_update(&ring_, index_base, fds, nr_fds);
70 }
71
83 template <typename Func>
84 [[deprecated("set_fd_accepter is deprecated and will be removed in a "
85 "future version")]]
86 void set_fd_accepter(Func &&accepter) {
87 fd_accepter_ = std::forward<Func>(accepter);
88 }
89
96 int set_file_alloc_range(unsigned offset, unsigned size) noexcept {
97 return io_uring_register_file_alloc_range(&ring_, offset, size);
98 }
99
100private:
101 std::function<void(int32_t)> fd_accepter_ = nullptr;
102 io_uring &ring_;
103
104 friend class Runtime;
105 friend auto async_fixed_fd_send(FdTable &dst, int source_fd, int target_fd,
106 unsigned int flags);
107};
108
114class BufferTable {
115public:
116 BufferTable(io_uring &ring) : ring_(ring) {}
117
118 BufferTable(const BufferTable &) = delete;
119 BufferTable &operator=(const BufferTable &) = delete;
120 BufferTable(BufferTable &&) = delete;
121 BufferTable &operator=(BufferTable &&) = delete;
122
123public:
129 int init(size_t capacity) noexcept {
130 int r = io_uring_register_buffers_sparse(&ring_, capacity);
131 if (r < 0) {
132 return r;
133 }
134 initialized_ = true;
135 return r;
136 }
137
145 int init(const iovec *vecs, unsigned nr_vecs) {
146 int r = io_uring_register_buffers(&ring_, vecs, nr_vecs);
147 if (r < 0) {
148 return r;
149 }
150 initialized_ = true;
151 return r;
152 }
153
158 int destroy() noexcept {
159 initialized_ = false;
160 return io_uring_unregister_buffers(&ring_);
161 }
162
171 int update(unsigned index_base, const iovec *vecs,
172 unsigned nr_vecs) noexcept {
173 return io_uring_register_buffers_update_tag(&ring_, index_base, vecs,
174 nullptr, nr_vecs);
175 }
176
177#if !IO_URING_CHECK_VERSION(2, 10) // >= 2.10
178
187 int clone_buffers(BufferTable &src, unsigned int dst_off = 0,
188 unsigned int src_off = 0, unsigned int nr = 0) noexcept {
189 auto *src_ring = &src.ring_;
190 auto *dst_ring = &ring_;
191 unsigned int flags = 0;
192 if (initialized_) {
193 flags |= IORING_REGISTER_DST_REPLACE;
194 }
195 int r = __io_uring_clone_buffers_offset(dst_ring, src_ring, dst_off,
196 src_off, nr, flags);
197 if (r < 0) {
198 return r;
199 }
200 initialized_ = true;
201 return r;
202 }
203#endif
204
205private:
206 io_uring &ring_;
207 bool initialized_ = false;
208};
209
215class RingSettings {
216public:
217 RingSettings(io_uring &ring) : ring_(ring) {}
218
219 ~RingSettings() {
220 if (probe_) {
221 io_uring_free_probe(probe_);
222 probe_ = nullptr;
223 }
224 }
225
226 RingSettings(const RingSettings &) = delete;
227 RingSettings &operator=(const RingSettings &) = delete;
228 RingSettings(RingSettings &&) = delete;
229 RingSettings &operator=(RingSettings &&) = delete;
230
231public:
238 int apply_iowq_aff(size_t cpusz, const cpu_set_t *mask) noexcept {
239 return io_uring_register_iowq_aff(&ring_, cpusz, mask);
240 }
241
245 int remove_iowq_aff() noexcept {
246 return io_uring_unregister_iowq_aff(&ring_);
247 }
248
255 int set_iowq_max_workers(unsigned int *values) noexcept {
256 return io_uring_register_iowq_max_workers(&ring_, values);
257 }
258
264 io_uring_probe *get_probe() noexcept {
265 if (probe_) {
266 return probe_;
267 }
268 probe_ = io_uring_get_probe_ring(&ring_);
269 return probe_;
270 }
271
276 uint32_t get_features() const noexcept { return features_; }
277
278#if !IO_URING_CHECK_VERSION(2, 6) // >= 2.6
284 int apply_napi(io_uring_napi *napi) noexcept {
285 return io_uring_register_napi(&ring_, napi);
286 }
287
291 int remove_napi(io_uring_napi *napi = nullptr) noexcept {
292 return io_uring_unregister_napi(&ring_, napi);
293 }
294#endif
295
296#if !IO_URING_CHECK_VERSION(2, 8) // >= 2.8
302 int set_clock(io_uring_clock_register *clock_reg) noexcept {
303 return io_uring_register_clock(&ring_, clock_reg);
304 }
305#endif
306
307#if !IO_URING_CHECK_VERSION(2, 9) // >= 2.9
313 int set_rings_size(io_uring_params *params) noexcept {
314 return io_uring_resize_rings(&ring_, params);
315 }
316#endif
317
318#if !IO_URING_CHECK_VERSION(2, 10) // >= 2.10
324 int set_iowait(bool enable_iowait) noexcept {
325 return io_uring_set_iowait(&ring_, enable_iowait);
326 }
327#endif
328
329private:
330 io_uring &ring_;
331 io_uring_probe *probe_ = nullptr;
332 uint32_t features_ = 0;
333
334 friend class Ring;
335};
336
337class Ring {
338public:
339 Ring() = default;
340 ~Ring() { destroy(); }
341
342 Ring(const Ring &) = delete;
343 Ring &operator=(const Ring &) = delete;
344 Ring(Ring &&) = delete;
345 Ring &operator=(Ring &&) = delete;
346
347public:
348 int init(unsigned int entries, io_uring_params *params,
349 [[maybe_unused]] void *buf = nullptr,
350 [[maybe_unused]] size_t buf_size = 0) noexcept {
351 int r;
352 assert(!initialized_);
353#if !IO_URING_CHECK_VERSION(2, 5) // >= 2.5
354 if (params->flags & IORING_SETUP_NO_MMAP) {
355 r = io_uring_queue_init_mem(entries, &ring_, params, buf, buf_size);
356 } else
357#endif
358 r = io_uring_queue_init_params(entries, &ring_, params);
359 if (r < 0) {
360 return r;
361 }
362 settings_.features_ = params->features;
363 sqpoll_mode_ = (params->flags & IORING_SETUP_SQPOLL) != 0;
364 initialized_ = true;
365 return r;
366 }
367
368 void destroy() noexcept {
369 if (initialized_) {
370 io_uring_queue_exit(&ring_);
371 initialized_ = false;
372 }
373 }
374
375 void submit() noexcept { io_uring_submit(&ring_); }
376
377 template <typename Func>
378 size_t reap_completions_wait(Func &&process_func) noexcept {
379 unsigned head;
380 io_uring_cqe *cqe;
381 size_t reaped = 0;
382 do {
383 int r = io_uring_submit_and_wait(&ring_, 1);
384 if (r >= 0) [[likely]] {
385 break;
386 } else if (r == -EINTR) {
387 continue;
388 } else {
389 assert(false && "io_uring_submit_and_wait failed");
390 }
391 } while (true);
392
393 io_uring_for_each_cqe(&ring_, head, cqe) {
394 process_func(cqe);
395#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
396 reaped += io_uring_cqe_nr(cqe);
397#else
398 reaped++;
399#endif
400 }
401 io_uring_cq_advance(&ring_, reaped);
402 return reaped;
403 }
404
405 template <typename Func>
406 size_t reap_completions(Func &&process_func) noexcept {
407 unsigned head;
408 io_uring_cqe *cqe;
409 size_t reaped = 0;
410
411 if (io_uring_peek_cqe(&ring_, &cqe) == 0) {
412 io_uring_for_each_cqe(&ring_, head, cqe) {
413 process_func(cqe);
414#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
415 reaped += io_uring_cqe_nr(cqe);
416#else
417 reaped++;
418#endif
419 }
420 io_uring_cq_advance(&ring_, reaped);
421 }
422
423 return reaped;
424 }
425
426 void reserve_space(size_t n) noexcept {
427 size_t space_left;
428 do {
429 space_left = io_uring_sq_space_left(&ring_);
430 if (space_left >= n) {
431 return;
432 }
433 submit();
434 } while (true);
435 }
436
437 io_uring *ring() noexcept { return &ring_; }
438
439 FdTable &fd_table() noexcept { return fd_table_; }
440
441 BufferTable &buffer_table() noexcept { return buffer_table_; }
442
443 RingSettings &settings() noexcept { return settings_; }
444
445 io_uring_sqe *get_sqe() noexcept { return get_sqe_<io_uring_get_sqe>(); }
446
447#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
448 io_uring_sqe *get_sqe128() noexcept {
449 if (ring_.flags & (IORING_SETUP_SQE128 | IORING_SETUP_SQE_MIXED))
450 [[likely]] {
451 return get_sqe_<io_uring_get_sqe128>();
452 } else {
453 panic_on("SQE128 is not enabled for this io_uring ring");
454 }
455 }
456#endif
457
458private:
459 template <io_uring_sqe *(*get_sqe)(struct io_uring *)>
460 io_uring_sqe *get_sqe_() noexcept {
461 [[maybe_unused]] int r;
462 io_uring_sqe *sqe;
463 do {
464 sqe = get_sqe(&ring_);
465 if (sqe) {
466 break;
467 }
468 r = io_uring_submit(&ring_);
469 assert(r >= 0);
470 if (sqpoll_mode_) {
471 r = io_uring_sqring_wait(&ring_);
472 assert(r >= 0);
473 }
474 } while (true);
475 return sqe;
476 }
477
478private:
479 bool initialized_ = false;
480 io_uring ring_;
481 bool sqpoll_mode_ = false;
482
483 FdTable fd_table_{ring_};
484 BufferTable buffer_table_{ring_};
485 RingSettings settings_{ring_};
486};
487
488} // namespace condy
int clone_buffers(BufferTable &src, unsigned int dst_off=0, unsigned int src_off=0, unsigned int nr=0) noexcept
Clone buffers from another BufferTable into this one.
Definition ring.hpp:187
int update(unsigned index_base, const iovec *vecs, unsigned nr_vecs) noexcept
Update the buffer table starting from the given index.
Definition ring.hpp:171
int init(size_t capacity) noexcept
Initialize the buffer table with the given capacity.
Definition ring.hpp:129
int destroy() noexcept
Destroy the buffer table.
Definition ring.hpp:158
int init(const iovec *vecs, unsigned nr_vecs)
Initialize the buffer table with the given array of iovec structures.
Definition ring.hpp:145
File descriptor table for io_uring.
Definition ring.hpp:24
void set_fd_accepter(Func &&accepter)
Set the accepter function for incoming file descriptors.
Definition ring.hpp:86
int init(size_t capacity) noexcept
Initialize the file descriptor table with the given capacity.
Definition ring.hpp:39
int destroy() noexcept
Destroy the file descriptor table.
Definition ring.hpp:58
int init(const int *fds, unsigned nr_fds)
Initialize the file descriptor table with the given array of file descriptors.
Definition ring.hpp:50
friend auto async_fixed_fd_send(FdTable &dst, int source_fd, int target_fd, unsigned int flags)
See io_uring_prep_msg_ring_fd.
int set_file_alloc_range(unsigned offset, unsigned size) noexcept
Set the file allocation range for the fd table.
Definition ring.hpp:96
int update(unsigned index_base, const int *fds, unsigned nr_fds) noexcept
Update the file descriptor table starting from the given index.
Definition ring.hpp:68
int set_rings_size(io_uring_params *params) noexcept
Resize the rings of the io_uring instance.
Definition ring.hpp:313
int apply_napi(io_uring_napi *napi) noexcept
Apply NAPI settings to the io_uring instance.
Definition ring.hpp:284
io_uring_probe * get_probe() noexcept
Get the io_uring probe for the ring.
Definition ring.hpp:264
uint32_t get_features() const noexcept
Get the supported features of the ring.
Definition ring.hpp:276
int set_iowq_max_workers(unsigned int *values) noexcept
Set the maximum number of I/O workers.
Definition ring.hpp:255
int apply_iowq_aff(size_t cpusz, const cpu_set_t *mask) noexcept
Apply I/O worker queue affinity settings.
Definition ring.hpp:238
int remove_iowq_aff() noexcept
Remove I/O worker queue affinity settings.
Definition ring.hpp:245
int set_clock(io_uring_clock_register *clock_reg) noexcept
Set the clock registration for the io_uring instance.
Definition ring.hpp:302
int set_iowait(bool enable_iowait) noexcept
Enable or disable iowait for the io_uring instance.
Definition ring.hpp:324
int remove_napi(io_uring_napi *napi=nullptr) noexcept
Remove NAPI settings from the io_uring instance.
Definition ring.hpp:291
The event loop runtime for executing asynchronous.
Definition runtime.hpp:68
The main namespace for the Condy library.
Definition condy.hpp:28
Internal utility classes and functions used by Condy.