Condy v1.6.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
provided_buffers.hpp
Go to the documentation of this file.
1
7
8#pragma once
9
10#include "condy/buffers.hpp"
11#include "condy/concepts.hpp"
12#include "condy/condy_uring.hpp"
13#include "condy/context.hpp"
14#include "condy/ring.hpp"
15#include "condy/utils.hpp"
16#include <bit>
17#include <cstddef>
18#include <cstdint>
19#include <stdexcept>
20#include <sys/mman.h>
21#include <sys/types.h>
22
23namespace condy {
24
// Result of consuming buffers from a provided-buffer queue after a CQE.
// Returned by BundledProvidedBufferQueue::handle_finish().
struct BufferInfo {
  // Buffer ID (bid) of the first buffer consumed from the ring.
  uint16_t bid;
  // Number of buffers consumed by the completed operation.
  // 0 means the CQE carried no buffer (IORING_CQE_F_BUFFER not set).
  uint16_t num_buffers;
};
44
45namespace detail {
46
47class BundledProvidedBufferQueue {
48public:
49 using ReturnType = BufferInfo;
50
51 BundledProvidedBufferQueue(uint32_t capacity, unsigned int flags)
52 : capacity_(std::bit_ceil(capacity)), buf_lens_(capacity_, 0) {
53 auto &context = detail::Context::current();
54
55 size_t data_size = capacity_ * sizeof(io_uring_buf);
56 void *data = mmap(nullptr, data_size, PROT_READ | PROT_WRITE,
57 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
58 if (data == MAP_FAILED) [[unlikely]] {
59 throw make_system_error("mmap");
60 }
61 auto d1 = defer([&]() { munmap(data, data_size); });
62
63 bgid_ = context.next_bgid();
64 auto d2 = defer([&]() { context.recycle_bgid(bgid_); });
65
66 br_ = reinterpret_cast<io_uring_buf_ring *>(data);
67 io_uring_buf_ring_init(br_);
68
69 io_uring_buf_reg reg = {};
70 reg.ring_addr = reinterpret_cast<uint64_t>(br_);
71 reg.ring_entries = capacity_;
72 reg.bgid = bgid_;
73 int r = io_uring_register_buf_ring(context.ring()->ring(), &reg, flags);
74 if (r != 0) [[unlikely]] {
75 throw make_system_error("io_uring_register_buf_ring", -r);
76 }
77
78 d1.dismiss();
79 d2.dismiss();
80 }
81
82 ~BundledProvidedBufferQueue() {
83 assert(br_ != nullptr);
84 size_t data_size = capacity_ * sizeof(io_uring_buf);
85 munmap(br_, data_size);
86 [[maybe_unused]] int r = io_uring_unregister_buf_ring(
87 detail::Context::current().ring()->ring(), bgid_);
88 assert(r == 0);
89 if (r == 0) {
90 detail::Context::current().recycle_bgid(bgid_);
91 }
92 }
93
94 BundledProvidedBufferQueue(const BundledProvidedBufferQueue &) = delete;
95 BundledProvidedBufferQueue &
96 operator=(const BundledProvidedBufferQueue &) = delete;
97 BundledProvidedBufferQueue(BundledProvidedBufferQueue &&) = delete;
98 BundledProvidedBufferQueue &
99 operator=(BundledProvidedBufferQueue &&) = delete;
100
101public:
105 size_t size() const noexcept { return size_; }
106
110 size_t capacity() const noexcept { return capacity_; }
111
122 template <BufferLike Buffer> uint16_t push(const Buffer &buffer) {
123 if (size_ >= capacity_) [[unlikely]] {
124 throw std::logic_error("Capacity exceeded");
125 }
126
127 auto mask = io_uring_buf_ring_mask(capacity_);
128 uint16_t bid = br_->tail & mask;
129 io_uring_buf_ring_add(br_, buffer.data(), buffer.size(), bid, mask, 0);
130 buf_lens_[bid] = buffer.size();
131 io_uring_buf_ring_advance(br_, 1);
132 size_++;
133
134 return bid;
135 }
136
137public:
138 uint16_t bgid() const noexcept { return bgid_; }
139
140 ReturnType handle_finish(io_uring_cqe *cqe) noexcept {
141 assert(cqe != nullptr);
142 int32_t res = cqe->res;
143 uint32_t flags = cqe->flags;
144
145 if (!(flags & IORING_CQE_F_BUFFER)) {
146 return ReturnType{0, 0};
147 }
148
149 assert(res > 0);
150
151 ReturnType result = {
152 .bid = static_cast<uint16_t>(flags >> IORING_CQE_BUFFER_SHIFT),
153 .num_buffers = 0,
154 };
155
156#if !IO_URING_CHECK_VERSION(2, 8) // >= 2.8
157 if (flags & IORING_CQE_F_BUF_MORE) {
158 assert(buf_lens_[result.bid] > static_cast<uint32_t>(res));
159 buf_lens_[result.bid] -= res;
160 return result;
161 }
162#endif
163
164 auto mask = io_uring_buf_ring_mask(capacity_);
165 uint16_t curr_bid = result.bid;
166 int64_t bytes = res;
167 while (bytes > 0) {
168 uint32_t buf_len = std::exchange(buf_lens_[curr_bid], 0);
169 assert(buf_len > 0);
170 bytes -= buf_len;
171 result.num_buffers++;
172 curr_bid = (curr_bid + 1) & mask;
173 }
174 assert(size_ >= result.num_buffers);
175 size_ -= result.num_buffers;
176
177 return result;
178 }
179
180private:
181 io_uring_buf_ring *br_ = nullptr;
182 uint32_t size_ = 0;
183 uint32_t capacity_;
184 uint16_t bgid_;
185 std::vector<uint32_t> buf_lens_;
186};
187
188} // namespace detail
189
201class ProvidedBufferQueue : public detail::BundledProvidedBufferQueue {
202public:
209 ProvidedBufferQueue(uint32_t capacity, unsigned int flags = 0)
210 : BundledProvidedBufferQueue(capacity, flags) {}
211
212 ReturnType handle_finish(io_uring_cqe *cqe) noexcept {
213 assert(cqe != nullptr);
214 auto result = BundledProvidedBufferQueue::handle_finish(cqe);
215 assert(result.num_buffers <= 1);
216 return result;
217 }
218};
219
220namespace detail {
221class BundledProvidedBufferPool;
222}
223
232struct ProvidedBuffer : public BufferBase {
233public:
234 ProvidedBuffer() = default;
235 ProvidedBuffer(void *data, size_t size,
236 detail::BundledProvidedBufferPool *pool)
237 : data_(data), size_(size), pool_(pool) {}
238 ProvidedBuffer(ProvidedBuffer &&other) noexcept
239 : data_(std::exchange(other.data_, nullptr)),
240 size_(std::exchange(other.size_, 0)),
241 pool_(std::exchange(other.pool_, nullptr)) {}
242 ProvidedBuffer &operator=(ProvidedBuffer &&other) noexcept {
243 if (this != &other) {
244 reset();
245 data_ = std::exchange(other.data_, nullptr);
246 size_ = std::exchange(other.size_, 0);
247 pool_ = std::exchange(other.pool_, nullptr);
248 }
249 return *this;
250 }
251
252 ~ProvidedBuffer() { reset(); }
253
254 ProvidedBuffer(const ProvidedBuffer &) = delete;
255 ProvidedBuffer &operator=(const ProvidedBuffer &) = delete;
256
257public:
261 void *data() const noexcept { return data_; }
262
266 size_t size() const noexcept { return size_; }
267
271 void reset() noexcept;
272
276 bool owns_buffer() const noexcept { return pool_ != nullptr; }
277
278private:
279 void *data_ = nullptr;
280 size_t size_ = 0;
281 detail::BundledProvidedBufferPool *pool_ = nullptr;
282};
283
284namespace detail {
285
286class BundledProvidedBufferPool {
287public:
288 using ReturnType = std::vector<ProvidedBuffer>;
289
290 BundledProvidedBufferPool(uint32_t num_buffers, size_t buffer_size,
291 unsigned int flags)
292 : num_buffers_(std::bit_ceil(num_buffers)), buffer_size_(buffer_size) {
293 auto &context = detail::Context::current();
294
295 size_t data_size = num_buffers_ * (sizeof(io_uring_buf) + buffer_size);
296 void *data = mmap(nullptr, data_size, PROT_READ | PROT_WRITE,
297 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
298 if (data == MAP_FAILED) [[unlikely]] {
299 throw make_system_error("mmap");
300 }
301 auto d1 = defer([&]() { munmap(data, data_size); });
302
303 bgid_ = context.next_bgid();
304 auto d2 = defer([&]() { context.recycle_bgid(bgid_); });
305
306 br_ = reinterpret_cast<io_uring_buf_ring *>(data);
307 io_uring_buf_ring_init(br_);
308
309 io_uring_buf_reg reg = {};
310 reg.ring_addr = reinterpret_cast<uint64_t>(br_);
311 reg.ring_entries = num_buffers_;
312 reg.bgid = bgid_;
313 int r = io_uring_register_buf_ring(context.ring()->ring(), &reg, flags);
314 if (r != 0) [[unlikely]] {
315 throw make_system_error("io_uring_register_buf_ring", -r);
316 }
317
318 char *buffer_base =
319 static_cast<char *>(data) + sizeof(io_uring_buf) * num_buffers_;
320 auto mask = io_uring_buf_ring_mask(num_buffers_);
321 for (size_t bid = 0; bid < num_buffers_; bid++) {
322 char *ptr = buffer_base + bid * buffer_size;
323 io_uring_buf_ring_add(br_, ptr, buffer_size, bid, mask,
324 static_cast<int>(bid));
325 }
326 io_uring_buf_ring_advance(br_, static_cast<int>(num_buffers_));
327
328 d1.dismiss();
329 d2.dismiss();
330 }
331
332 ~BundledProvidedBufferPool() {
333 assert(br_ != nullptr);
334 size_t data_size = num_buffers_ * (sizeof(io_uring_buf) + buffer_size_);
335 munmap(br_, data_size);
336 [[maybe_unused]] int r = io_uring_unregister_buf_ring(
337 detail::Context::current().ring()->ring(), bgid_);
338 assert(r == 0);
339 if (r == 0) {
340 detail::Context::current().recycle_bgid(bgid_);
341 }
342 }
343
344 BundledProvidedBufferPool(const BundledProvidedBufferPool &) = delete;
345 BundledProvidedBufferPool &
346 operator=(const BundledProvidedBufferPool &) = delete;
347 BundledProvidedBufferPool(BundledProvidedBufferPool &&) = delete;
348 BundledProvidedBufferPool &operator=(BundledProvidedBufferPool &&) = delete;
349
350public:
354 size_t capacity() const noexcept { return num_buffers_; }
355
359 size_t buffer_size() const noexcept { return buffer_size_; }
360
361public:
362 uint16_t bgid() const noexcept { return bgid_; }
363
364 ReturnType handle_finish(io_uring_cqe *cqe) noexcept {
365 assert(cqe != nullptr);
366 int32_t res = cqe->res;
367 uint32_t flags = cqe->flags;
368 std::vector<ProvidedBuffer> buffers;
369
370 if (!(flags & IORING_CQE_F_BUFFER)) {
371 return buffers;
372 }
373
374 assert(res > 0);
375
376 uint16_t bid = flags >> IORING_CQE_BUFFER_SHIFT;
377
378#if !IO_URING_CHECK_VERSION(2, 8) // >= 2.8
379 if (flags & IORING_CQE_F_BUF_MORE) {
380 char *data = get_buffer_(bid) + partial_size_;
381 buffers.emplace_back(data, res, nullptr);
382 partial_size_ += res;
383 assert(partial_size_ < buffer_size_);
384 return buffers;
385 }
386#endif
387 assert(bid == curr_io_uring_buf_()->bid);
388
389 int32_t bytes = res;
390 while (bytes > 0) {
391 auto *buf_ptr = curr_io_uring_buf_();
392 bid = buf_ptr->bid;
393 uint32_t curr_buffer_size = buffer_size_ - partial_size_;
394 char *data = get_buffer_(bid) + partial_size_;
395 buffers.emplace_back(data, curr_buffer_size, this);
396 bytes -= static_cast<int32_t>(curr_buffer_size);
397 partial_size_ = 0;
398 advance_io_uring_buf_();
399 }
400
401 return buffers;
402 }
403
404 void add_buffer_back(void *ptr) noexcept {
405 char *base = get_buffers_base_();
406 assert(ptr >= base);
407 size_t offset = static_cast<char *>(ptr) - base;
408 size_t bid = offset / buffer_size_;
409 assert(bid < num_buffers_);
410 char *buffer_ptr = base + bid * buffer_size_;
411 auto mask = io_uring_buf_ring_mask(num_buffers_);
412 io_uring_buf_ring_add(br_, buffer_ptr, buffer_size_, bid, mask, 0);
413 io_uring_buf_ring_advance(br_, 1);
414 }
415
416private:
417 char *get_buffer_(uint16_t bid) const noexcept {
418 return get_buffers_base_() + static_cast<size_t>(bid) * buffer_size_;
419 }
420
421 char *get_buffers_base_() const noexcept {
422 return reinterpret_cast<char *>(br_) +
423 sizeof(io_uring_buf) * num_buffers_;
424 }
425
426 io_uring_buf *curr_io_uring_buf_() noexcept {
427 auto mask = io_uring_buf_ring_mask(num_buffers_);
428 return &br_->bufs[br_head_ & mask];
429 }
430
431 void advance_io_uring_buf_() noexcept { br_head_++; }
432
433private:
434 io_uring_buf_ring *br_ = nullptr;
435 uint32_t num_buffers_;
436 uint32_t buffer_size_;
437 uint32_t partial_size_ = 0;
438 uint16_t bgid_;
439 uint16_t br_head_ = 0;
440};
441
442} // namespace detail
443
444inline void ProvidedBuffer::reset() noexcept {
445 if (pool_ != nullptr) {
446 pool_->add_buffer_back(data_);
447 }
448 data_ = nullptr;
449 size_ = 0;
450 pool_ = nullptr;
451}
452
465class ProvidedBufferPool : public detail::BundledProvidedBufferPool {
466public:
467 using ReturnType = ProvidedBuffer;
468
476 ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size,
477 unsigned int flags = 0)
478 : BundledProvidedBufferPool(num_buffers, buffer_size, flags) {}
479
480public:
481 ReturnType handle_finish(io_uring_cqe *cqe) noexcept {
482 assert(cqe != nullptr);
483 auto buffers = BundledProvidedBufferPool::handle_finish(cqe);
484 if (buffers.empty()) {
485 return ReturnType();
486 }
487 assert(buffers.size() == 1);
488 return std::move(buffers[0]);
489 }
490};
491
502 return static_cast<detail::BundledProvidedBufferPool &>(buffer);
503}
504
512 return static_cast<detail::BundledProvidedBufferQueue &>(buffer);
513}
514
515} // namespace condy
Basic buffer types and conversion utilities.
ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size, unsigned int flags=0)
Construct a new ProvidedBufferPool object in current Runtime.
ProvidedBufferQueue(uint32_t capacity, unsigned int flags=0)
Construct a new ProvidedBufferQueue object in current Runtime.
The main namespace for the Condy library.
Definition condy.hpp:30
auto & bundled(ProvidedBufferPool &buffer)
Get the bundled variant of a provided buffer pool. This will enable the buffer bundling feature of io_uring.
MutableBuffer buffer(void *data, size_t size) noexcept
Create a buffer object from various data sources.
Definition buffers.hpp:85
auto defer(Func &&func)
Defer the execution of a function until the current scope ends.
Definition utils.hpp:69
Wrapper classes for liburing interfaces.
Information about buffers consumed from a provided buffer queue.
uint16_t bid
Buffer ID of the first buffer consumed.
uint16_t num_buffers
Number of buffers consumed.
bool owns_buffer() const noexcept
Check if the provided buffer owns a buffer from a pool.
void * data() const noexcept
Get the data pointer of the provided buffer.
size_t size() const noexcept
Get the size of the provided buffer.
void reset() noexcept
Reset the provided buffer, returning it to the pool if owned.
Internal utility classes and functions used by Condy.