Condy v1.6.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
ring.hpp
Go to the documentation of this file.
1
7
8#pragma once
9
10#include "condy/condy_uring.hpp"
11#include <cassert>
12#include <cerrno>
13#include <cstddef>
14#include <cstring>
15
16namespace condy {
17
23class FdTable {
24public:
25 FdTable(io_uring &ring) : ring_(ring) {}
26
27 FdTable(const FdTable &) = delete;
28 FdTable &operator=(const FdTable &) = delete;
29 FdTable(FdTable &&) = delete;
30 FdTable &operator=(FdTable &&) = delete;
31
32public:
38 int init(size_t capacity) noexcept {
39 return io_uring_register_files_sparse(&ring_, capacity);
40 }
41
49 int init(const int *fds, unsigned nr_fds) {
50 return io_uring_register_files(&ring_, fds, nr_fds);
51 }
52
57 int destroy() noexcept { return io_uring_unregister_files(&ring_); }
58
67 int update(unsigned index_base, const int *fds, unsigned nr_fds) noexcept {
68 return io_uring_register_files_update(&ring_, index_base, fds, nr_fds);
69 }
70
77 int set_file_alloc_range(unsigned offset, unsigned size) noexcept {
78 return io_uring_register_file_alloc_range(&ring_, offset, size);
79 }
80
81private:
82 io_uring &ring_;
83
84 friend class Runtime;
85};
86
92class BufferTable {
93public:
94 BufferTable(io_uring &ring) : ring_(ring) {}
95
96 BufferTable(const BufferTable &) = delete;
97 BufferTable &operator=(const BufferTable &) = delete;
98 BufferTable(BufferTable &&) = delete;
99 BufferTable &operator=(BufferTable &&) = delete;
100
101public:
107 int init(size_t capacity) noexcept {
108 int r = io_uring_register_buffers_sparse(&ring_, capacity);
109 if (r < 0) {
110 return r;
111 }
112 initialized_ = true;
113 return r;
114 }
115
123 int init(const iovec *vecs, unsigned nr_vecs) {
124 int r = io_uring_register_buffers(&ring_, vecs, nr_vecs);
125 if (r < 0) {
126 return r;
127 }
128 initialized_ = true;
129 return r;
130 }
131
136 int destroy() noexcept {
137 initialized_ = false;
138 return io_uring_unregister_buffers(&ring_);
139 }
140
149 int update(unsigned index_base, const iovec *vecs,
150 unsigned nr_vecs) noexcept {
151 return io_uring_register_buffers_update_tag(&ring_, index_base, vecs,
152 nullptr, nr_vecs);
153 }
154
155#if !IO_URING_CHECK_VERSION(2, 10) // >= 2.10
156
165 int clone_buffers(BufferTable &src, unsigned int dst_off = 0,
166 unsigned int src_off = 0, unsigned int nr = 0) noexcept {
167 auto *src_ring = &src.ring_;
168 auto *dst_ring = &ring_;
169 unsigned int flags = 0;
170 if (initialized_) {
171 flags |= IORING_REGISTER_DST_REPLACE;
172 }
173 int r = __io_uring_clone_buffers_offset(dst_ring, src_ring, dst_off,
174 src_off, nr, flags);
175 if (r < 0) {
176 return r;
177 }
178 initialized_ = true;
179 return r;
180 }
181#endif
182
183private:
184 io_uring &ring_;
185 bool initialized_ = false;
186};
187
193class RingSettings {
194public:
195 RingSettings(io_uring &ring) : ring_(ring) {}
196
197 ~RingSettings() {
198 if (probe_) {
199 io_uring_free_probe(probe_);
200 probe_ = nullptr;
201 }
202 }
203
204 RingSettings(const RingSettings &) = delete;
205 RingSettings &operator=(const RingSettings &) = delete;
206 RingSettings(RingSettings &&) = delete;
207 RingSettings &operator=(RingSettings &&) = delete;
208
209public:
216 int apply_iowq_aff(size_t cpusz, const cpu_set_t *mask) noexcept {
217 return io_uring_register_iowq_aff(&ring_, cpusz, mask);
218 }
219
223 int remove_iowq_aff() noexcept {
224 return io_uring_unregister_iowq_aff(&ring_);
225 }
226
233 int set_iowq_max_workers(unsigned int *values) noexcept {
234 return io_uring_register_iowq_max_workers(&ring_, values);
235 }
236
242 io_uring_probe *get_probe() noexcept {
243 if (probe_) {
244 return probe_;
245 }
246 probe_ = io_uring_get_probe_ring(&ring_);
247 return probe_;
248 }
249
254 uint32_t get_features() const noexcept { return features_; }
255
256#if !IO_URING_CHECK_VERSION(2, 6) // >= 2.6
262 int apply_napi(io_uring_napi *napi) noexcept {
263 return io_uring_register_napi(&ring_, napi);
264 }
265
269 int remove_napi(io_uring_napi *napi = nullptr) noexcept {
270 return io_uring_unregister_napi(&ring_, napi);
271 }
272#endif
273
274#if !IO_URING_CHECK_VERSION(2, 8) // >= 2.8
280 int set_clock(io_uring_clock_register *clock_reg) noexcept {
281 return io_uring_register_clock(&ring_, clock_reg);
282 }
283#endif
284
285#if !IO_URING_CHECK_VERSION(2, 9) // >= 2.9
291 int set_rings_size(io_uring_params *params) noexcept {
292 return io_uring_resize_rings(&ring_, params);
293 }
294#endif
295
296#if !IO_URING_CHECK_VERSION(2, 10) // >= 2.10
302 int set_iowait(bool enable_iowait) noexcept {
303 return io_uring_set_iowait(&ring_, enable_iowait);
304 }
305#endif
306
307private:
308 io_uring &ring_;
309 io_uring_probe *probe_ = nullptr;
310 uint32_t features_ = 0;
311
312 friend class Ring;
313};
314
315class Ring {
316public:
317 Ring() = default;
318 ~Ring() { destroy(); }
319
320 Ring(const Ring &) = delete;
321 Ring &operator=(const Ring &) = delete;
322 Ring(Ring &&) = delete;
323 Ring &operator=(Ring &&) = delete;
324
325public:
326 int init(unsigned int entries, io_uring_params *params,
327 [[maybe_unused]] void *buf = nullptr,
328 [[maybe_unused]] size_t buf_size = 0) noexcept {
329 int r;
330 if (initialized_) {
331 return -EBUSY;
332 }
333#if !IO_URING_CHECK_VERSION(2, 5) // >= 2.5
334 if (params->flags & IORING_SETUP_NO_MMAP) {
335 r = io_uring_queue_init_mem(entries, &ring_, params, buf, buf_size);
336 } else
337#endif
338 r = io_uring_queue_init_params(entries, &ring_, params);
339 if (r < 0) {
340 return r;
341 }
342 settings_.features_ = params->features;
343 sqpoll_mode_ = (params->flags & IORING_SETUP_SQPOLL) != 0;
344 initialized_ = true;
345 return r;
346 }
347
348 void destroy() noexcept {
349 if (initialized_) {
350 io_uring_queue_exit(&ring_);
351 initialized_ = false;
352 }
353 }
354
355 void submit() noexcept { io_uring_submit(&ring_); }
356
357 template <typename Func>
358 ssize_t reap_completions_wait(Func &&process_func) noexcept {
359 unsigned head;
360 io_uring_cqe *cqe;
361 ssize_t reaped = 0;
362 do {
363 int r = io_uring_submit_and_wait(&ring_, 1);
364 if (r >= 0) [[likely]] {
365 break;
366 } else if (r == -EINTR) {
367 continue;
368 } else {
369 return r;
370 }
371 } while (true);
372
373 io_uring_for_each_cqe(&ring_, head, cqe) {
374 process_func(cqe);
375#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
376 reaped += io_uring_cqe_nr(cqe);
377#else
378 reaped++;
379#endif
380 }
381 io_uring_cq_advance(&ring_, reaped);
382 return reaped;
383 }
384
385 template <typename Func>
386 ssize_t reap_completions(Func &&process_func) noexcept {
387 io_uring_cqe *cqe;
388 int r = io_uring_peek_cqe(&ring_, &cqe);
389 if (r == -EAGAIN) {
390 return 0;
391 } else if (r < 0) {
392 return r;
393 }
394
395 unsigned head;
396 ssize_t reaped = 0;
397 io_uring_for_each_cqe(&ring_, head, cqe) {
398 process_func(cqe);
399#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
400 reaped += io_uring_cqe_nr(cqe);
401#else
402 reaped++;
403#endif
404 }
405 io_uring_cq_advance(&ring_, reaped);
406 return reaped;
407 }
408
409 void reserve_space(size_t n) noexcept {
410 size_t space_left;
411 do {
412 space_left = io_uring_sq_space_left(&ring_);
413 if (space_left >= n) {
414 return;
415 }
416 submit();
417 } while (true);
418 }
419
420 io_uring *ring() noexcept { return &ring_; }
421
422 FdTable &fd_table() noexcept { return fd_table_; }
423
424 BufferTable &buffer_table() noexcept { return buffer_table_; }
425
426 RingSettings &settings() noexcept { return settings_; }
427
428 io_uring_sqe *get_sqe() noexcept { return get_sqe_<io_uring_get_sqe>(); }
429
430#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
431 io_uring_sqe *get_sqe128() noexcept {
432 if (ring_.flags & (IORING_SETUP_SQE128 | IORING_SETUP_SQE_MIXED))
433 [[likely]] {
434 return get_sqe_<io_uring_get_sqe128>();
435 }
436 return nullptr;
437 }
438#endif
439
440private:
441 template <io_uring_sqe *(*get_sqe)(struct io_uring *)>
442 io_uring_sqe *get_sqe_() noexcept {
443 [[maybe_unused]] int r;
444 io_uring_sqe *sqe;
445 do {
446 sqe = get_sqe(&ring_);
447 if (sqe) {
448 break;
449 }
450 r = io_uring_submit(&ring_);
451 assert(r >= 0);
452 if (sqpoll_mode_) {
453 r = io_uring_sqring_wait(&ring_);
454 assert(r >= 0);
455 }
456 } while (true);
457 return sqe;
458 }
459
460private:
461 bool initialized_ = false;
462 io_uring ring_;
463 bool sqpoll_mode_ = false;
464
465 FdTable fd_table_{ring_};
466 BufferTable buffer_table_{ring_};
467 RingSettings settings_{ring_};
468};
469
470} // namespace condy
int clone_buffers(BufferTable &src, unsigned int dst_off=0, unsigned int src_off=0, unsigned int nr=0) noexcept
Clone buffers from another BufferTable into this one.
Definition ring.hpp:165
int update(unsigned index_base, const iovec *vecs, unsigned nr_vecs) noexcept
Update the buffer table starting from the given index.
Definition ring.hpp:149
int init(size_t capacity) noexcept
Initialize the buffer table with the given capacity.
Definition ring.hpp:107
int destroy() noexcept
Destroy the buffer table.
Definition ring.hpp:136
int init(const iovec *vecs, unsigned nr_vecs)
Initialize the buffer table with the given array of iovec structures.
Definition ring.hpp:123
int init(size_t capacity) noexcept
Initialize the file descriptor table with the given capacity.
Definition ring.hpp:38
int destroy() noexcept
Destroy the file descriptor table.
Definition ring.hpp:57
int init(const int *fds, unsigned nr_fds)
Initialize the file descriptor table with the given array of file descriptors.
Definition ring.hpp:49
int set_file_alloc_range(unsigned offset, unsigned size) noexcept
Set the file allocation range for the fd table.
Definition ring.hpp:77
int update(unsigned index_base, const int *fds, unsigned nr_fds) noexcept
Update the file descriptor table starting from the given index.
Definition ring.hpp:67
int set_rings_size(io_uring_params *params) noexcept
Resize the rings of the io_uring instance.
Definition ring.hpp:291
int apply_napi(io_uring_napi *napi) noexcept
Apply NAPI settings to the io_uring instance.
Definition ring.hpp:262
io_uring_probe * get_probe() noexcept
Get the io_uring probe for the ring.
Definition ring.hpp:242
uint32_t get_features() const noexcept
Get the supported features of the ring.
Definition ring.hpp:254
int set_iowq_max_workers(unsigned int *values) noexcept
Set the maximum number of I/O workers.
Definition ring.hpp:233
int apply_iowq_aff(size_t cpusz, const cpu_set_t *mask) noexcept
Apply I/O worker queue affinity settings.
Definition ring.hpp:216
int remove_iowq_aff() noexcept
Remove I/O worker queue affinity settings.
Definition ring.hpp:223
int set_clock(io_uring_clock_register *clock_reg) noexcept
Set the clock registration for the io_uring instance.
Definition ring.hpp:280
int set_iowait(bool enable_iowait) noexcept
Enable or disable iowait for the io_uring instance.
Definition ring.hpp:302
int remove_napi(io_uring_napi *napi=nullptr) noexcept
Remove NAPI settings from the io_uring instance.
Definition ring.hpp:269
The event loop runtime for executing asynchronous tasks.
Definition runtime.hpp:91
The main namespace for the Condy library.
Definition condy.hpp:30