Condy v1.3.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
ring.hpp
Go to the documentation of this file.
1
7
8#pragma once
9
10#include "condy/condy_uring.hpp"
11#include "condy/utils.hpp"
12#include <cassert>
13#include <cerrno>
14#include <cstddef>
15#include <cstring>
16
17namespace condy {
18
24class FdTable {
25public:
26 FdTable(io_uring &ring) : ring_(ring) {}
27
28 FdTable(const FdTable &) = delete;
29 FdTable &operator=(const FdTable &) = delete;
30 FdTable(FdTable &&) = delete;
31 FdTable &operator=(FdTable &&) = delete;
32
33public:
39 int init(size_t capacity) {
40 return io_uring_register_files_sparse(&ring_, capacity);
41 }
42
47 int destroy() { return io_uring_unregister_files(&ring_); }
48
56 int update(unsigned index_base, const int *fds, unsigned nr_fds) {
57 return io_uring_register_files_update(&ring_, index_base, fds, nr_fds);
58 }
59
69 template <typename Func> void set_fd_accepter(Func &&accepter) {
70 fd_accepter_ = std::forward<Func>(accepter);
71 }
72
79 int set_file_alloc_range(unsigned offset, unsigned size) {
80 return io_uring_register_file_alloc_range(&ring_, offset, size);
81 }
82
83private:
84 std::function<void(int32_t)> fd_accepter_ = nullptr;
85 io_uring &ring_;
86
87 friend class Runtime;
88 friend auto async_fixed_fd_send(FdTable &dst, int source_fd, int target_fd,
89 unsigned int flags);
90};
91
97class BufferTable {
98public:
99 BufferTable(io_uring &ring) : ring_(ring) {}
100
101 BufferTable(const BufferTable &) = delete;
102 BufferTable &operator=(const BufferTable &) = delete;
103 BufferTable(BufferTable &&) = delete;
104 BufferTable &operator=(BufferTable &&) = delete;
105
106public:
112 int init(size_t capacity) {
113 int r = io_uring_register_buffers_sparse(&ring_, capacity);
114 if (r < 0) {
115 return r;
116 }
117 initialized_ = true;
118 return r;
119 }
120
125 int destroy() {
126 initialized_ = false;
127 return io_uring_unregister_buffers(&ring_);
128 }
129
137 int update(unsigned index_base, const iovec *vecs, unsigned nr_vecs) {
138 return io_uring_register_buffers_update_tag(&ring_, index_base, vecs,
139 nullptr, nr_vecs);
140 }
141
142#if !IO_URING_CHECK_VERSION(2, 10) // >= 2.10
143
152 int clone_buffers(BufferTable &src, unsigned int dst_off = 0,
153 unsigned int src_off = 0, unsigned int nr = 0) {
154 auto *src_ring = &src.ring_;
155 auto *dst_ring = &ring_;
156 unsigned int flags = 0;
157 if (initialized_) {
158 flags |= IORING_REGISTER_DST_REPLACE;
159 }
160 int r = __io_uring_clone_buffers_offset(dst_ring, src_ring, dst_off,
161 src_off, nr, flags);
162 if (r < 0) {
163 return r;
164 }
165 initialized_ = true;
166 return r;
167 }
168#endif
169
170private:
171 io_uring &ring_;
172 bool initialized_ = false;
173};
174
180class RingSettings {
181public:
182 RingSettings(io_uring &ring) : ring_(ring) {}
183
184 ~RingSettings() {
185 if (probe_) {
186 io_uring_free_probe(probe_);
187 probe_ = nullptr;
188 }
189 }
190
191 RingSettings(const RingSettings &) = delete;
192 RingSettings &operator=(const RingSettings &) = delete;
193 RingSettings(RingSettings &&) = delete;
194 RingSettings &operator=(RingSettings &&) = delete;
195
196public:
203 int apply_iowq_aff(size_t cpusz, const cpu_set_t *mask) {
204 return io_uring_register_iowq_aff(&ring_, cpusz, mask);
205 }
206
210 int remove_iowq_aff() { return io_uring_unregister_iowq_aff(&ring_); }
211
218 int set_iowq_max_workers(unsigned int *values) {
219 return io_uring_register_iowq_max_workers(&ring_, values);
220 }
221
227 io_uring_probe *get_probe() {
228 if (probe_) {
229 return probe_;
230 }
231 probe_ = io_uring_get_probe_ring(&ring_);
232 return probe_;
233 }
234
239 uint32_t get_features() const { return features_; }
240
241#if !IO_URING_CHECK_VERSION(2, 6) // >= 2.6
247 int apply_napi(io_uring_napi *napi) {
248 return io_uring_register_napi(&ring_, napi);
249 }
250
254 int remove_napi(io_uring_napi *napi = nullptr) {
255 return io_uring_unregister_napi(&ring_, napi);
256 }
257#endif
258
259#if !IO_URING_CHECK_VERSION(2, 8) // >= 2.8
265 int set_clock(io_uring_clock_register *clock_reg) {
266 return io_uring_register_clock(&ring_, clock_reg);
267 }
268#endif
269
270#if !IO_URING_CHECK_VERSION(2, 9) // >= 2.9
276 int set_rings_size(io_uring_params *params) {
277 return io_uring_resize_rings(&ring_, params);
278 }
279#endif
280
281#if !IO_URING_CHECK_VERSION(2, 10) // >= 2.10
287 int set_iowait(bool enable_iowait) {
288 return io_uring_set_iowait(&ring_, enable_iowait);
289 }
290#endif
291
292private:
293 io_uring &ring_;
294 io_uring_probe *probe_ = nullptr;
295 uint32_t features_ = 0;
296
297 friend class Ring;
298};
299
300class Ring {
301public:
302 Ring() = default;
303 ~Ring() { destroy(); }
304
305 Ring(const Ring &) = delete;
306 Ring &operator=(const Ring &) = delete;
307 Ring(Ring &&) = delete;
308 Ring &operator=(Ring &&) = delete;
309
310public:
311 int init(unsigned int entries, io_uring_params *params,
312 [[maybe_unused]] void *buf = nullptr,
313 [[maybe_unused]] size_t buf_size = 0) {
314 int r;
315 assert(!initialized_);
316#if !IO_URING_CHECK_VERSION(2, 5) // >= 2.5
317 if (params->flags & IORING_SETUP_NO_MMAP) {
318 r = io_uring_queue_init_mem(entries, &ring_, params, buf, buf_size);
319 } else
320#endif
321 r = io_uring_queue_init_params(entries, &ring_, params);
322 if (r < 0) {
323 return r;
324 }
325 settings_.features_ = params->features;
326 sqpoll_mode_ = (params->flags & IORING_SETUP_SQPOLL) != 0;
327 initialized_ = true;
328 return r;
329 }
330
331 void destroy() {
332 if (initialized_) {
333 io_uring_queue_exit(&ring_);
334 initialized_ = false;
335 }
336 }
337
338 void submit() { io_uring_submit(&ring_); }
339
340 template <typename Func> size_t reap_completions_wait(Func &&process_func) {
341 unsigned head;
342 io_uring_cqe *cqe;
343 size_t reaped = 0;
344 do {
345 int r = io_uring_submit_and_wait(&ring_, 1);
346 if (r >= 0) [[likely]] {
347 break;
348 } else if (r == -EINTR) {
349 continue;
350 } else {
351 throw make_system_error("io_uring_submit_and_wait", -r);
352 }
353 } while (true);
354
355 io_uring_for_each_cqe(&ring_, head, cqe) {
356 process_func(cqe);
357#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
358 reaped += io_uring_cqe_nr(cqe);
359#else
360 reaped++;
361#endif
362 }
363 io_uring_cq_advance(&ring_, reaped);
364 return reaped;
365 }
366
367 template <typename Func> size_t reap_completions(Func &&process_func) {
368 unsigned head;
369 io_uring_cqe *cqe;
370 size_t reaped = 0;
371
372 if (io_uring_peek_cqe(&ring_, &cqe) == 0) {
373 io_uring_for_each_cqe(&ring_, head, cqe) {
374 process_func(cqe);
375#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
376 reaped += io_uring_cqe_nr(cqe);
377#else
378 reaped++;
379#endif
380 }
381 io_uring_cq_advance(&ring_, reaped);
382 }
383
384 return reaped;
385 }
386
387 void reserve_space(size_t n) {
388 size_t space_left;
389 do {
390 space_left = io_uring_sq_space_left(&ring_);
391 if (space_left >= n) {
392 return;
393 }
394 submit();
395 } while (true);
396 }
397
398 io_uring *ring() { return &ring_; }
399
400 FdTable &fd_table() { return fd_table_; }
401
402 BufferTable &buffer_table() { return buffer_table_; }
403
404 RingSettings &settings() { return settings_; }
405
406 io_uring_sqe *get_sqe() { return get_sqe_<io_uring_get_sqe>(); }
407
408#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
409 io_uring_sqe *get_sqe128() {
410 if (ring_.flags & (IORING_SETUP_SQE128 | IORING_SETUP_SQE_MIXED))
411 [[likely]] {
412 return get_sqe_<io_uring_get_sqe128>();
413 } else {
414 panic_on("SQE128 is not enabled for this io_uring ring");
415 }
416 }
417#endif
418
419private:
420 template <io_uring_sqe *(*get_sqe)(struct io_uring *)>
421 io_uring_sqe *get_sqe_() {
422 [[maybe_unused]] int r;
423 io_uring_sqe *sqe;
424 do {
425 sqe = get_sqe(&ring_);
426 if (sqe) {
427 break;
428 }
429 r = io_uring_submit(&ring_);
430 assert(r >= 0);
431 if (sqpoll_mode_) {
432 r = io_uring_sqring_wait(&ring_);
433 assert(r >= 0);
434 }
435 } while (true);
436 return sqe;
437 }
438
439private:
440 bool initialized_ = false;
441 io_uring ring_;
442 bool sqpoll_mode_ = false;
443
444 FdTable fd_table_{ring_};
445 BufferTable buffer_table_{ring_};
446 RingSettings settings_{ring_};
447};
448
449} // namespace condy
int destroy()
Destroy the buffer table.
Definition ring.hpp:125
int update(unsigned index_base, const iovec *vecs, unsigned nr_vecs)
Update the buffer table starting from the given index.
Definition ring.hpp:137
int init(size_t capacity)
Initialize the buffer table with the given capacity.
Definition ring.hpp:112
int clone_buffers(BufferTable &src, unsigned int dst_off=0, unsigned int src_off=0, unsigned int nr=0)
Clone buffers from another BufferTable into this one.
Definition ring.hpp:152
File descriptor table for io_uring.
Definition ring.hpp:24
void set_fd_accepter(Func &&accepter)
Set the accepter function for incoming file descriptors.
Definition ring.hpp:69
int update(unsigned index_base, const int *fds, unsigned nr_fds)
Update the file descriptor table starting from the given index.
Definition ring.hpp:56
int init(size_t capacity)
Initialize the file descriptor table with the given capacity.
Definition ring.hpp:39
int destroy()
Destroy the file descriptor table.
Definition ring.hpp:47
int set_file_alloc_range(unsigned offset, unsigned size)
Set the file allocation range for the fd table.
Definition ring.hpp:79
friend auto async_fixed_fd_send(FdTable &dst, int source_fd, int target_fd, unsigned int flags)
See io_uring_prep_msg_ring_fd.
int set_clock(io_uring_clock_register *clock_reg)
Set the clock registration for the io_uring instance.
Definition ring.hpp:265
uint32_t get_features() const
Get the supported features of the ring.
Definition ring.hpp:239
int remove_iowq_aff()
Remove I/O worker queue affinity settings.
Definition ring.hpp:210
int set_iowait(bool enable_iowait)
Enable or disable iowait for the io_uring instance.
Definition ring.hpp:287
int apply_iowq_aff(size_t cpusz, const cpu_set_t *mask)
Apply I/O worker queue affinity settings.
Definition ring.hpp:203
io_uring_probe * get_probe()
Get the io_uring probe for the ring.
Definition ring.hpp:227
int set_iowq_max_workers(unsigned int *values)
Set the maximum number of I/O workers.
Definition ring.hpp:218
int remove_napi(io_uring_napi *napi=nullptr)
Remove NAPI settings from the io_uring instance.
Definition ring.hpp:254
int apply_napi(io_uring_napi *napi)
Apply NAPI settings to the io_uring instance.
Definition ring.hpp:247
int set_rings_size(io_uring_params *params)
Resize the rings of the io_uring instance.
Definition ring.hpp:276
The event loop runtime for executing asynchronous.
Definition runtime.hpp:76
The main namespace for the Condy library.
Definition condy.hpp:28
Internal utility classes and functions used by Condy.