Condy v1.6.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
provided_buffers.hpp
Go to the documentation of this file.
1
7
8#pragma once
9
#include "condy/buffers.hpp"
#include "condy/concepts.hpp"
#include "condy/condy_uring.hpp"
#include "condy/context.hpp"
#include "condy/ring.hpp"
#include "condy/utils.hpp"
#include <bit>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <utility>
#include <vector>
#include <sys/mman.h>
#include <sys/types.h>
22
23namespace condy {
24
/// Information about buffers consumed from a provided buffer queue,
/// as reported by a single io_uring completion.
struct BufferInfo {
  /// Buffer ID of the first buffer consumed.
  uint16_t bid;
  /// Number of buffers consumed. Zero when the CQE carried no buffer
  /// (or when the kernel keeps filling the same buffer).
  uint16_t num_buffers;
};
44
45namespace detail {
46
47class BundledProvidedBufferQueue {
48public:
49 BundledProvidedBufferQueue(uint32_t capacity, unsigned int flags)
50 : capacity_(std::bit_ceil(capacity)), buf_lens_(capacity_, 0) {
51 auto &context = detail::Context::current();
52
53 size_t data_size = capacity_ * sizeof(io_uring_buf);
54 void *data = mmap(nullptr, data_size, PROT_READ | PROT_WRITE,
55 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
56 if (data == MAP_FAILED) [[unlikely]] {
57 throw make_system_error("mmap");
58 }
59 auto d1 = defer([&]() { munmap(data, data_size); });
60
61 bgid_ = context.next_bgid();
62 auto d2 = defer([&]() { context.recycle_bgid(bgid_); });
63
64 br_ = reinterpret_cast<io_uring_buf_ring *>(data);
65 io_uring_buf_ring_init(br_);
66
67 io_uring_buf_reg reg = {};
68 reg.ring_addr = reinterpret_cast<uint64_t>(br_);
69 reg.ring_entries = capacity_;
70 reg.bgid = bgid_;
71 int r = io_uring_register_buf_ring(context.ring()->ring(), &reg, flags);
72 if (r != 0) [[unlikely]] {
73 throw make_system_error("io_uring_register_buf_ring", -r);
74 }
75
76 d1.dismiss();
77 d2.dismiss();
78 }
79
80 ~BundledProvidedBufferQueue() {
81 assert(br_ != nullptr);
82 size_t data_size = capacity_ * sizeof(io_uring_buf);
83 munmap(br_, data_size);
84 [[maybe_unused]] int r = io_uring_unregister_buf_ring(
85 detail::Context::current().ring()->ring(), bgid_);
86 assert(r == 0);
87 if (r == 0) {
88 detail::Context::current().recycle_bgid(bgid_);
89 }
90 }
91
92 BundledProvidedBufferQueue(const BundledProvidedBufferQueue &) = delete;
93 BundledProvidedBufferQueue &
94 operator=(const BundledProvidedBufferQueue &) = delete;
95 BundledProvidedBufferQueue(BundledProvidedBufferQueue &&) = delete;
96 BundledProvidedBufferQueue &
97 operator=(BundledProvidedBufferQueue &&) = delete;
98
99public:
103 size_t size() const noexcept { return size_; }
104
108 size_t capacity() const noexcept { return capacity_; }
109
120 template <BufferLike Buffer> uint16_t push(const Buffer &buffer) {
121 if (size_ >= capacity_) [[unlikely]] {
122 throw std::logic_error("Capacity exceeded");
123 }
124
125 auto mask = io_uring_buf_ring_mask(capacity_);
126 uint16_t bid = br_->tail & mask;
127 io_uring_buf_ring_add(br_, buffer.data(), buffer.size(), bid, mask, 0);
128 buf_lens_[bid] = buffer.size();
129 io_uring_buf_ring_advance(br_, 1);
130 size_++;
131
132 return bid;
133 }
134
135public:
136 uint16_t bgid() const noexcept { return bgid_; }
137
138 BufferInfo handle_finish(io_uring_cqe *cqe) noexcept {
139 assert(cqe != nullptr);
140 int32_t res = cqe->res;
141 uint32_t flags = cqe->flags;
142
143 if (!(flags & IORING_CQE_F_BUFFER)) {
144 return BufferInfo{0, 0};
145 }
146
147 assert(res > 0);
148
149 BufferInfo result = {
150 .bid = static_cast<uint16_t>(flags >> IORING_CQE_BUFFER_SHIFT),
151 .num_buffers = 0,
152 };
153
154#if !IO_URING_CHECK_VERSION(2, 8) // >= 2.8
155 if (flags & IORING_CQE_F_BUF_MORE) {
156 assert(buf_lens_[result.bid] > static_cast<uint32_t>(res));
157 buf_lens_[result.bid] -= res;
158 return result;
159 }
160#endif
161
162 auto mask = io_uring_buf_ring_mask(capacity_);
163 uint16_t curr_bid = result.bid;
164 int64_t bytes = res;
165 while (bytes > 0) {
166 uint32_t buf_len = std::exchange(buf_lens_[curr_bid], 0);
167 assert(buf_len > 0);
168 bytes -= buf_len;
169 result.num_buffers++;
170 curr_bid = (curr_bid + 1) & mask;
171 }
172 assert(size_ >= result.num_buffers);
173 size_ -= result.num_buffers;
174
175 return result;
176 }
177
178private:
179 io_uring_buf_ring *br_ = nullptr;
180 uint32_t size_ = 0;
181 uint32_t capacity_;
182 uint16_t bgid_;
183 std::vector<uint32_t> buf_lens_;
184};
185
186} // namespace detail
187
199class ProvidedBufferQueue : public detail::BundledProvidedBufferQueue {
200public:
207 ProvidedBufferQueue(uint32_t capacity, unsigned int flags = 0)
208 : BundledProvidedBufferQueue(capacity, flags) {}
209
210 BufferInfo handle_finish(io_uring_cqe *cqe) noexcept {
211 assert(cqe != nullptr);
212 auto result = BundledProvidedBufferQueue::handle_finish(cqe);
213 assert(result.num_buffers <= 1);
214 return result;
215 }
216};
217
218namespace detail {
219class BundledProvidedBufferPool;
220}
221
230struct ProvidedBuffer : public BufferBase {
231public:
232 ProvidedBuffer() = default;
233 ProvidedBuffer(void *data, size_t size,
234 detail::BundledProvidedBufferPool *pool)
235 : data_(data), size_(size), pool_(pool) {}
236 ProvidedBuffer(ProvidedBuffer &&other) noexcept
237 : data_(std::exchange(other.data_, nullptr)),
238 size_(std::exchange(other.size_, 0)),
239 pool_(std::exchange(other.pool_, nullptr)) {}
240 ProvidedBuffer &operator=(ProvidedBuffer &&other) noexcept {
241 if (this != &other) {
242 reset();
243 data_ = std::exchange(other.data_, nullptr);
244 size_ = std::exchange(other.size_, 0);
245 pool_ = std::exchange(other.pool_, nullptr);
246 }
247 return *this;
248 }
249
250 ~ProvidedBuffer() { reset(); }
251
252 ProvidedBuffer(const ProvidedBuffer &) = delete;
253 ProvidedBuffer &operator=(const ProvidedBuffer &) = delete;
254
255public:
259 void *data() const noexcept { return data_; }
260
264 size_t size() const noexcept { return size_; }
265
269 void reset() noexcept;
270
274 bool owns_buffer() const noexcept { return pool_ != nullptr; }
275
276private:
277 void *data_ = nullptr;
278 size_t size_ = 0;
279 detail::BundledProvidedBufferPool *pool_ = nullptr;
280};
281
282namespace detail {
283
284class BundledProvidedBufferPool {
285public:
286 BundledProvidedBufferPool(uint32_t num_buffers, size_t buffer_size,
287 unsigned int flags)
288 : num_buffers_(std::bit_ceil(num_buffers)), buffer_size_(buffer_size) {
289 auto &context = detail::Context::current();
290
291 size_t data_size = num_buffers_ * (sizeof(io_uring_buf) + buffer_size);
292 void *data = mmap(nullptr, data_size, PROT_READ | PROT_WRITE,
293 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
294 if (data == MAP_FAILED) [[unlikely]] {
295 throw make_system_error("mmap");
296 }
297 auto d1 = defer([&]() { munmap(data, data_size); });
298
299 bgid_ = context.next_bgid();
300 auto d2 = defer([&]() { context.recycle_bgid(bgid_); });
301
302 br_ = reinterpret_cast<io_uring_buf_ring *>(data);
303 io_uring_buf_ring_init(br_);
304
305 io_uring_buf_reg reg = {};
306 reg.ring_addr = reinterpret_cast<uint64_t>(br_);
307 reg.ring_entries = num_buffers_;
308 reg.bgid = bgid_;
309 int r = io_uring_register_buf_ring(context.ring()->ring(), &reg, flags);
310 if (r != 0) [[unlikely]] {
311 throw make_system_error("io_uring_register_buf_ring", -r);
312 }
313
314 char *buffer_base =
315 static_cast<char *>(data) + sizeof(io_uring_buf) * num_buffers_;
316 auto mask = io_uring_buf_ring_mask(num_buffers_);
317 for (size_t bid = 0; bid < num_buffers_; bid++) {
318 char *ptr = buffer_base + bid * buffer_size;
319 io_uring_buf_ring_add(br_, ptr, buffer_size, bid, mask,
320 static_cast<int>(bid));
321 }
322 io_uring_buf_ring_advance(br_, static_cast<int>(num_buffers_));
323
324 d1.dismiss();
325 d2.dismiss();
326 }
327
328 ~BundledProvidedBufferPool() {
329 assert(br_ != nullptr);
330 size_t data_size = num_buffers_ * (sizeof(io_uring_buf) + buffer_size_);
331 munmap(br_, data_size);
332 [[maybe_unused]] int r = io_uring_unregister_buf_ring(
333 detail::Context::current().ring()->ring(), bgid_);
334 assert(r == 0);
335 if (r == 0) {
336 detail::Context::current().recycle_bgid(bgid_);
337 }
338 }
339
340 BundledProvidedBufferPool(const BundledProvidedBufferPool &) = delete;
341 BundledProvidedBufferPool &
342 operator=(const BundledProvidedBufferPool &) = delete;
343 BundledProvidedBufferPool(BundledProvidedBufferPool &&) = delete;
344 BundledProvidedBufferPool &operator=(BundledProvidedBufferPool &&) = delete;
345
346public:
350 size_t capacity() const noexcept { return num_buffers_; }
351
355 size_t buffer_size() const noexcept { return buffer_size_; }
356
357public:
358 uint16_t bgid() const noexcept { return bgid_; }
359
360 std::vector<ProvidedBuffer> handle_finish(io_uring_cqe *cqe) noexcept {
361 assert(cqe != nullptr);
362 int32_t res = cqe->res;
363 uint32_t flags = cqe->flags;
364 std::vector<ProvidedBuffer> buffers;
365
366 if (!(flags & IORING_CQE_F_BUFFER)) {
367 return buffers;
368 }
369
370 assert(res > 0);
371
372 uint16_t bid = flags >> IORING_CQE_BUFFER_SHIFT;
373
374#if !IO_URING_CHECK_VERSION(2, 8) // >= 2.8
375 if (flags & IORING_CQE_F_BUF_MORE) {
376 char *data = get_buffer_(bid) + partial_size_;
377 buffers.emplace_back(data, res, nullptr);
378 partial_size_ += res;
379 assert(partial_size_ < buffer_size_);
380 return buffers;
381 }
382#endif
383 assert(bid == curr_io_uring_buf_()->bid);
384
385 int32_t bytes = res;
386 while (bytes > 0) {
387 auto *buf_ptr = curr_io_uring_buf_();
388 bid = buf_ptr->bid;
389 uint32_t curr_buffer_size = buffer_size_ - partial_size_;
390 char *data = get_buffer_(bid) + partial_size_;
391 buffers.emplace_back(data, curr_buffer_size, this);
392 bytes -= static_cast<int32_t>(curr_buffer_size);
393 partial_size_ = 0;
394 advance_io_uring_buf_();
395 }
396
397 return buffers;
398 }
399
400 void add_buffer_back(void *ptr) noexcept {
401 char *base = get_buffers_base_();
402 assert(ptr >= base);
403 size_t offset = static_cast<char *>(ptr) - base;
404 size_t bid = offset / buffer_size_;
405 assert(bid < num_buffers_);
406 char *buffer_ptr = base + bid * buffer_size_;
407 auto mask = io_uring_buf_ring_mask(num_buffers_);
408 io_uring_buf_ring_add(br_, buffer_ptr, buffer_size_, bid, mask, 0);
409 io_uring_buf_ring_advance(br_, 1);
410 }
411
412private:
413 char *get_buffer_(uint16_t bid) const noexcept {
414 return get_buffers_base_() + static_cast<size_t>(bid) * buffer_size_;
415 }
416
417 char *get_buffers_base_() const noexcept {
418 return reinterpret_cast<char *>(br_) +
419 sizeof(io_uring_buf) * num_buffers_;
420 }
421
422 io_uring_buf *curr_io_uring_buf_() noexcept {
423 auto mask = io_uring_buf_ring_mask(num_buffers_);
424 return &br_->bufs[br_head_ & mask];
425 }
426
427 void advance_io_uring_buf_() noexcept { br_head_++; }
428
429private:
430 io_uring_buf_ring *br_ = nullptr;
431 uint32_t num_buffers_;
432 uint32_t buffer_size_;
433 uint32_t partial_size_ = 0;
434 uint16_t bgid_;
435 uint16_t br_head_ = 0;
436};
437
438} // namespace detail
439
440inline void ProvidedBuffer::reset() noexcept {
441 if (pool_ != nullptr) {
442 pool_->add_buffer_back(data_);
443 }
444 data_ = nullptr;
445 size_ = 0;
446 pool_ = nullptr;
447}
448
461class ProvidedBufferPool : public detail::BundledProvidedBufferPool {
462public:
470 ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size,
471 unsigned int flags = 0)
472 : BundledProvidedBufferPool(num_buffers, buffer_size, flags) {}
473
474public:
475 ProvidedBuffer handle_finish(io_uring_cqe *cqe) noexcept {
476 assert(cqe != nullptr);
477 auto buffers = BundledProvidedBufferPool::handle_finish(cqe);
478 if (buffers.empty()) {
479 return ProvidedBuffer();
480 }
481 assert(buffers.size() == 1);
482 return std::move(buffers[0]);
483 }
484};
485
496 return static_cast<detail::BundledProvidedBufferPool &>(buffer);
497}
498
506 return static_cast<detail::BundledProvidedBufferQueue &>(buffer);
507}
508
509} // namespace condy
Basic buffer types and conversion utilities.
ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size, unsigned int flags=0)
Construct a new ProvidedBufferPool object in current Runtime.
ProvidedBufferQueue(uint32_t capacity, unsigned int flags=0)
Construct a new ProvidedBufferQueue object in current Runtime.
The main namespace for the Condy library.
Definition condy.hpp:30
auto & bundled(ProvidedBufferPool &buffer)
Get the bundled variant of a provided buffer pool. This will enable the buffer bundling feature of io_uring.
MutableBuffer buffer(void *data, size_t size) noexcept
Create a buffer object from various data sources.
Definition buffers.hpp:85
auto defer(Func &&func)
Defer the execution of a function until the current scope ends.
Definition utils.hpp:92
Wrapper classes for liburing interfaces.
Information about buffers consumed from a provided buffer queue.
uint16_t bid
Buffer ID of the first buffer consumed.
uint16_t num_buffers
Number of buffers consumed.
bool owns_buffer() const noexcept
Check if the provided buffer owns a buffer from a pool.
void * data() const noexcept
Get the data pointer of the provided buffer.
size_t size() const noexcept
Get the size of the provided buffer.
void reset() noexcept
Reset the provided buffer, returning it to the pool if owned.
Internal utility classes and functions used by Condy.