// NOTE(review): this region is a corrupted extraction — original source line
// numbers are fused into the text, statements are split mid-line, and several
// lines (closing braces, guard-cancellation calls) are missing. Code tokens
// are preserved as-is; comments below annotate the visible logic.
//
// A provided-buffer queue registered with io_uring as a buffer ring, with
// support for "bundled" completions (multiple buffers per CQE).
47class BundledProvidedBufferQueue {
// Constructor: capacity is rounded up to a power of two (ring requirement),
// and per-buffer length bookkeeping is sized to match.
 49 BundledProvidedBufferQueue(uint32_t capacity,
unsigned int flags)
 50 : capacity_(std::bit_ceil(capacity)), buf_lens_(capacity_, 0) {
 51 auto &context = detail::Context::current();
// The ring needs one io_uring_buf descriptor per slot; mapped anonymously.
 53 size_t data_size = capacity_ *
sizeof(io_uring_buf);
 54 void *data = mmap(
nullptr, data_size, PROT_READ | PROT_WRITE,
 55 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
 56 if (data == MAP_FAILED) [[unlikely]] {
 57 throw make_system_error(
"mmap");
// NOTE(review): closing brace of the error branch was lost in extraction.
// d1/d2 are scope guards that unmap / recycle the bgid if registration below
// throws; the lines that disarm them on success are missing from this view.
 59 auto d1 =
defer([&]() { munmap(data, data_size); });
 61 bgid_ = context.next_bgid();
 62 auto d2 =
defer([&]() { context.recycle_bgid(bgid_); });
 64 br_ =
reinterpret_cast<io_uring_buf_ring *
>(data);
 65 io_uring_buf_ring_init(br_);
 67 io_uring_buf_reg reg = {};
 68 reg.ring_addr =
reinterpret_cast<uint64_t
>(br_);
 69 reg.ring_entries = capacity_;
// NOTE(review): "®" below is an HTML-entity mangling of "&reg" (address of
// the io_uring_buf_reg above) — restore before compiling. A "reg.bgid = bgid_;"
// line is presumably also missing here — TODO confirm against the original.
 71 int r = io_uring_register_buf_ring(context.ring()->ring(), ®, flags);
 72 if (r != 0) [[unlikely]] {
// liburing returns -errno; negate so make_system_error sees a positive errno.
 73 throw make_system_error(
"io_uring_register_buf_ring", -r);
// Destructor: unmaps the ring memory, unregisters the buffer ring from the
// io_uring instance, and returns the buffer-group id for reuse.
 80 ~BundledProvidedBufferQueue() {
 81 assert(br_ !=
nullptr);
 82 size_t data_size = capacity_ *
sizeof(io_uring_buf);
 83 munmap(br_, data_size);
// Failure is deliberately ignored — destructors must not throw.
 84 [[maybe_unused]]
int r = io_uring_unregister_buf_ring(
 85 detail::Context::current().ring()->ring(), bgid_);
 88 detail::Context::current().recycle_bgid(bgid_);
// Non-copyable and non-movable: the object owns a kernel-registered ring
// whose registration is tied to this instance's address and bgid.
 92 BundledProvidedBufferQueue(
const BundledProvidedBufferQueue &) =
delete;
 93 BundledProvidedBufferQueue &
 94 operator=(
const BundledProvidedBufferQueue &) =
delete;
 95 BundledProvidedBufferQueue(BundledProvidedBufferQueue &&) =
delete;
 96 BundledProvidedBufferQueue &
 97 operator=(BundledProvidedBufferQueue &&) =
delete;
103 size_t size()
const noexcept {
return size_; }
108 size_t capacity()
const noexcept {
return capacity_; }
// NOTE(review): garbled extraction — the tail of this function (presumably
// "size_++;" bookkeeping, "return bid;" and closing braces) is missing from
// this view; code tokens are preserved as-is.
//
// Push one buffer into the provided-buffer ring so the kernel can use it for
// bundled receive operations. Returns the buffer id (bid) assigned to it.
// Throws std::logic_error when the ring is already full.
120 template <BufferLike Buffer> uint16_t push(
const Buffer &
buffer) {
121 if (size_ >= capacity_) [[unlikely]] {
122 throw std::logic_error(
"Capacity exceeded");
// bid is derived from the ring tail; capacity_ is a power of two, so the
// mask wraps it into [0, capacity_).
125 auto mask = io_uring_buf_ring_mask(capacity_);
126 uint16_t bid = br_->tail & mask;
127 io_uring_buf_ring_add(br_,
buffer.data(),
buffer.size(), bid, mask, 0);
// Remember each buffer's length so bundled completions can be split back
// into individual buffers in handle_finish().
128 buf_lens_[bid] =
buffer.size();
// Publish the single new entry to the kernel.
129 io_uring_buf_ring_advance(br_, 1);
136 uint16_t bgid()
const noexcept {
return bgid_; }
// NOTE(review): garbled extraction — large parts of this function (the
// no-buffer early return, the BufferInfo result initialization, the loop that
// walks consumed buffers, and the final return) are missing from this view;
// code tokens are preserved as-is.
//
// Translate a CQE from a bundled operation into a BufferInfo describing which
// buffer ids (starting at .bid) were consumed and how many.
138 BufferInfo handle_finish(io_uring_cqe *cqe)
noexcept {
139 assert(cqe !=
nullptr);
140 int32_t res = cqe->res;
141 uint32_t flags = cqe->flags;
// No buffer was selected by the kernel for this completion.
143 if (!(flags & IORING_CQE_F_BUFFER)) {
// First consumed buffer id is encoded in the CQE flags' upper bits.
150 .bid =
static_cast<uint16_t
>(flags >> IORING_CQE_BUFFER_SHIFT),
// Pre-liburing-2.8 path: IORING_CQE_F_BUF_MORE means the kernel will keep
// using this buffer, so only part of its recorded length was consumed.
154#if !IO_URING_CHECK_VERSION(2, 8)
155 if (flags & IORING_CQE_F_BUF_MORE) {
156 assert(buf_lens_[result.
bid] >
static_cast<uint32_t
>(res));
157 buf_lens_[result.
bid] -= res;
// Walk consecutive buffer ids, draining the recorded length of each fully
// consumed buffer (the loop header/body lines are partially missing here).
162 auto mask = io_uring_buf_ring_mask(capacity_);
163 uint16_t curr_bid = result.
bid;
166 uint32_t buf_len = std::exchange(buf_lens_[curr_bid], 0);
170 curr_bid = (curr_bid + 1) & mask;
// Data members (other members — size_, capacity_, bgid_ — are missing from
// this view of the extraction).
179 io_uring_buf_ring *br_ =
nullptr;
// Per-bid recorded buffer lengths, indexed by buffer id.
183 std::vector<uint32_t> buf_lens_;
// NOTE(review): fragment of the non-bundled queue wrapper (per the index
// below, presumably "ProvidedBufferQueue" — TODO confirm); its class header
// and surrounding lines were lost in extraction.
208 : BundledProvidedBufferQueue(capacity, flags) {}
// Non-bundled variant: each CQE consumes at most one buffer, so the bundled
// handler's result is checked and narrowed accordingly.
210 BufferInfo handle_finish(io_uring_cqe *cqe)
noexcept {
211 assert(cqe !=
nullptr);
212 auto result = BundledProvidedBufferQueue::handle_finish(cqe);
213 assert(result.num_buffers <= 1);
// Forward declaration: ProvidedBuffer returns itself to this pool on reset().
219class BundledProvidedBufferPool;
// NOTE(review): garbled extraction — the move-assignment's "return *this;",
// the size_ member declaration, and the closing brace are missing from this
// view; code tokens are preserved as-is.
//
// An RAII handle to one buffer owned by a BundledProvidedBufferPool. When the
// pool pointer is non-null, destruction/reset returns the buffer to the pool.
230struct ProvidedBuffer :
public BufferBase {
232 ProvidedBuffer() =
default;
// pool may be nullptr for views that do not own their buffer (see
// owns_buffer() in the index below).
233 ProvidedBuffer(
void *
data,
size_t size,
234 detail::BundledProvidedBufferPool *pool)
235 : data_(
data), size_(
size), pool_(pool) {}
// Move-only: ownership of the pooled buffer transfers; the source is left
// empty (nullptr data/pool, zero size).
236 ProvidedBuffer(ProvidedBuffer &&other) noexcept
237 : data_(std::exchange(other.data_,
nullptr)),
238 size_(std::exchange(other.size_, 0)),
239 pool_(std::exchange(other.pool_,
nullptr)) {}
240 ProvidedBuffer &operator=(ProvidedBuffer &&other)
noexcept {
241 if (
this != &other) {
// NOTE(review): a reset() call releasing the currently held buffer is
// presumably missing here (extraction gap) — TODO confirm.
243 data_ = std::exchange(other.data_,
nullptr);
244 size_ = std::exchange(other.size_, 0);
245 pool_ = std::exchange(other.pool_,
nullptr);
// Destructor returns an owned buffer to its pool via reset().
250 ~ProvidedBuffer() {
reset(); }
252 ProvidedBuffer(
const ProvidedBuffer &) =
delete;
253 ProvidedBuffer &operator=(
const ProvidedBuffer &) =
delete;
// Accessors for the underlying memory region.
259 void *
data() const noexcept {
return data_; }
264 size_t size() const noexcept {
return size_; }
// Defined out-of-line after the pool class (it calls into the pool).
269 void reset() noexcept;
277 void *data_ =
nullptr;
// Owning pool, or nullptr when this handle does not own a pooled buffer.
279 detail::BundledProvidedBufferPool *pool_ =
nullptr;
// NOTE(review): garbled extraction — the flags parameter line, several closing
// braces, and the guard-cancellation lines are missing from this view; code
// tokens are preserved as-is.
//
// A pool of num_buffers fixed-size buffers backed by one mmap: the
// io_uring_buf descriptor array followed by the buffer storage itself.
284class BundledProvidedBufferPool {
286 BundledProvidedBufferPool(uint32_t num_buffers,
size_t buffer_size,
288 : num_buffers_(std::bit_ceil(num_buffers)), buffer_size_(buffer_size) {
289 auto &context = detail::Context::current();
// One allocation: ring descriptors first, then num_buffers_ * buffer_size
// bytes of actual buffer memory.
291 size_t data_size = num_buffers_ * (
sizeof(io_uring_buf) + buffer_size);
292 void *data = mmap(
nullptr, data_size, PROT_READ | PROT_WRITE,
293 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
294 if (data == MAP_FAILED) [[unlikely]] {
295 throw make_system_error(
"mmap");
// Scope guards undo the mmap / bgid allocation if registration throws.
297 auto d1 =
defer([&]() { munmap(data, data_size); });
299 bgid_ = context.next_bgid();
300 auto d2 =
defer([&]() { context.recycle_bgid(bgid_); });
302 br_ =
reinterpret_cast<io_uring_buf_ring *
>(data);
303 io_uring_buf_ring_init(br_);
305 io_uring_buf_reg reg = {};
306 reg.ring_addr =
reinterpret_cast<uint64_t
>(br_);
307 reg.ring_entries = num_buffers_;
// NOTE(review): "®" below is an HTML-entity mangling of "&reg" — restore
// before compiling. A "reg.bgid = bgid_;" line is presumably also missing
// here — TODO confirm against the original.
309 int r = io_uring_register_buf_ring(context.ring()->ring(), ®, flags);
310 if (r != 0) [[unlikely]] {
311 throw make_system_error(
"io_uring_register_buf_ring", -r);
// Pre-populate the ring with every buffer; buffer storage begins right after
// the descriptor array (the "char *buffer_base =" lead-in was lost here).
315 static_cast<char *
>(data) +
sizeof(io_uring_buf) * num_buffers_;
316 auto mask = io_uring_buf_ring_mask(num_buffers_);
317 for (
size_t bid = 0; bid < num_buffers_; bid++) {
318 char *ptr = buffer_base + bid * buffer_size;
319 io_uring_buf_ring_add(br_, ptr, buffer_size, bid, mask,
320 static_cast<int>(bid));
// Publish all entries to the kernel at once.
322 io_uring_buf_ring_advance(br_,
static_cast<int>(num_buffers_));
// Destructor mirrors the queue's: unmap, unregister (errors ignored —
// destructors must not throw), recycle the buffer-group id.
328 ~BundledProvidedBufferPool() {
329 assert(br_ !=
nullptr);
330 size_t data_size = num_buffers_ * (
sizeof(io_uring_buf) + buffer_size_);
331 munmap(br_, data_size);
332 [[maybe_unused]]
int r = io_uring_unregister_buf_ring(
333 detail::Context::current().ring()->ring(), bgid_);
336 detail::Context::current().recycle_bgid(bgid_);
// Non-copyable, non-movable: owns a kernel-registered buffer ring.
340 BundledProvidedBufferPool(
const BundledProvidedBufferPool &) =
delete;
341 BundledProvidedBufferPool &
342 operator=(
const BundledProvidedBufferPool &) =
delete;
343 BundledProvidedBufferPool(BundledProvidedBufferPool &&) =
delete;
344 BundledProvidedBufferPool &operator=(BundledProvidedBufferPool &&) =
delete;
350 size_t capacity() const noexcept {
return num_buffers_; }
355 size_t buffer_size() const noexcept {
return buffer_size_; }
358 uint16_t bgid() const noexcept {
return bgid_; }
// NOTE(review): garbled extraction — the no-buffer early return, the "bytes"
// loop header, the final partial-buffer handling and the return statement are
// missing from this view; code tokens are preserved as-is.
//
// Split a bundled-completion CQE into one ProvidedBuffer per fully consumed
// pool buffer (plus partial-buffer tracking on pre-2.8 liburing).
360 std::vector<ProvidedBuffer> handle_finish(io_uring_cqe *cqe)
noexcept {
361 assert(cqe !=
nullptr);
362 int32_t res = cqe->res;
363 uint32_t flags = cqe->flags;
364 std::vector<ProvidedBuffer> buffers;
// No buffer selected for this completion.
366 if (!(flags & IORING_CQE_F_BUFFER)) {
372 uint16_t bid = flags >> IORING_CQE_BUFFER_SHIFT;
// Pre-liburing-2.8 path: BUF_MORE means this buffer stays with the kernel,
// so hand out a non-owning view (pool == nullptr) of the newly filled bytes
// and remember how much of the buffer is already used.
374#if !IO_URING_CHECK_VERSION(2, 8)
375 if (flags & IORING_CQE_F_BUF_MORE) {
376 char *data = get_buffer_(bid) + partial_size_;
377 buffers.emplace_back(data, res,
nullptr);
378 partial_size_ += res;
379 assert(partial_size_ < buffer_size_);
// The kernel consumes ring entries in order; our mirrored head must agree.
383 assert(bid == curr_io_uring_buf_()->bid);
387 auto *buf_ptr = curr_io_uring_buf_();
// Remaining capacity of the current buffer (accounting for a prior partial
// fill); emit it as an owning ProvidedBuffer and deduct from the byte count.
389 uint32_t curr_buffer_size = buffer_size_ - partial_size_;
390 char *data = get_buffer_(bid) + partial_size_;
391 buffers.emplace_back(data, curr_buffer_size,
this);
392 bytes -=
static_cast<int32_t
>(curr_buffer_size);
394 advance_io_uring_buf_();
400 void add_buffer_back(
void *ptr)
noexcept {
401 char *base = get_buffers_base_();
403 size_t offset =
static_cast<char *
>(ptr) - base;
404 size_t bid = offset / buffer_size_;
405 assert(bid < num_buffers_);
406 char *buffer_ptr = base + bid * buffer_size_;
407 auto mask = io_uring_buf_ring_mask(num_buffers_);
408 io_uring_buf_ring_add(br_, buffer_ptr, buffer_size_, bid, mask, 0);
409 io_uring_buf_ring_advance(br_, 1);
413 char *get_buffer_(uint16_t bid)
const noexcept {
414 return get_buffers_base_() +
static_cast<size_t>(bid) * buffer_size_;
417 char *get_buffers_base_() const noexcept {
418 return reinterpret_cast<char *
>(br_) +
419 sizeof(io_uring_buf) * num_buffers_;
422 io_uring_buf *curr_io_uring_buf_() noexcept {
423 auto mask = io_uring_buf_ring_mask(num_buffers_);
424 return &br_->bufs[br_head_ & mask];
427 void advance_io_uring_buf_() noexcept { br_head_++; }
// Data members (NOTE(review): the bgid_ member line is missing from this view
// of the extraction).
// Registered buffer ring; also the base of the single mmap'd allocation.
430 io_uring_buf_ring *br_ =
nullptr;
431 uint32_t num_buffers_;
432 uint32_t buffer_size_;
// Bytes of the current buffer already handed out via partial completions.
433 uint32_t partial_size_ = 0;
// Locally mirrored kernel-consumption head of the ring.
435 uint16_t br_head_ = 0;
// NOTE(review): fragment of the out-of-line ProvidedBuffer::reset() body
// (declared above at original line 269); its signature, closing braces, and
// the lines clearing data_/size_/pool_ are missing from this view.
// If this handle owns a pooled buffer, return it to the ring.
441 if (pool_ !=
nullptr) {
442 pool_->add_buffer_back(data_);
// NOTE(review): fragments of the non-bundled pool wrapper (per the index
// below, presumably "ProvidedBufferPool" — TODO confirm) and of the
// bundled(...) conversion helpers; class/function headers were lost in
// extraction.
471 unsigned int flags = 0)
472 : BundledProvidedBufferPool(num_buffers, buffer_size, flags) {}
// Non-bundled variant: at most one buffer per CQE; narrow the bundled
// handler's vector to a single ProvidedBuffer.
476 assert(cqe !=
nullptr);
477 auto buffers = BundledProvidedBufferPool::handle_finish(cqe);
478 if (buffers.empty()) {
481 assert(buffers.size() == 1);
482 return std::move(buffers[0]);
// bundled(ProvidedBufferPool &): downcast-style view exposing the bundled
// interface of a pool (see index below).
496 return static_cast<detail::BundledProvidedBufferPool &
>(
buffer);
// bundled(ProvidedBufferQueue &): same for a queue.
506 return static_cast<detail::BundledProvidedBufferQueue &
>(
buffer);
Basic buffer types and conversion utilities.
ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size, unsigned int flags=0)
Construct a new ProvidedBufferPool object in current Runtime.
ProvidedBufferQueue(uint32_t capacity, unsigned int flags=0)
Construct a new ProvidedBufferQueue object in current Runtime.
The main namespace for the Condy library.
auto & bundled(ProvidedBufferPool &buffer)
Get the bundled variant of a provided buffer pool. This will enable the buffer bundling feature of io_uring.
MutableBuffer buffer(void *data, size_t size) noexcept
Create a buffer object from various data sources.
auto defer(Func &&func)
Defer the execution of a function until the current scope ends.
Wrapper classes for liburing interfaces.
Information about buffers consumed from a provided buffer queue.
uint16_t bid
Buffer ID of the first buffer consumed.
uint16_t num_buffers
Number of buffers consumed.
bool owns_buffer() const noexcept
Check if the provided buffer owns a buffer from a pool.
void * data() const noexcept
Get the data pointer of the provided buffer.
size_t size() const noexcept
Get the size of the provided buffer.
void reset() noexcept
Reset the provided buffer, returning it to the pool if owned.
Internal utility classes and functions used by Condy.