// NOTE(review): this listing is an extraction artifact — the original file's
// line numbers are fused onto the code and several original lines (closing
// braces, loop framing, return statements) are elided, so the text below is
// not compilable as-is. Code is kept byte-identical; only comments are added.
//
// Provided-buffer queue registered with io_uring as a buffer ring
// (IORING_REGISTER_PBUF_RING). Backing storage for the ring entries is an
// anonymous mmap of capacity_ io_uring_buf slots; capacity is rounded up to
// a power of two, as required by io_uring_buf_ring_mask().
47class BundledProvidedBufferQueue {
// Constructor: maps the ring, registers it with the current Context's
// io_uring, and acquires a buffer-group id (bgid) from the Context.
51 BundledProvidedBufferQueue(uint32_t capacity,
unsigned int flags)
// bit_ceil rounds capacity up to a power of two so the tail can be
// masked into a slot index.
52 : capacity_(std::bit_ceil(capacity)), buf_lens_(capacity_, 0) {
53 auto &context = detail::Context::current();
// One io_uring_buf ring slot per buffer.
55 size_t data_size = capacity_ *
sizeof(io_uring_buf);
56 void *data = mmap(
nullptr, data_size, PROT_READ | PROT_WRITE,
// NOTE(review): the documented convention for MAP_ANONYMOUS is fd = -1,
// not 0 — Linux tolerates it, but worth confirming against the original.
57 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
58 if (data == MAP_FAILED) [[unlikely]] {
59 throw make_system_error(
"mmap");
// Scope guards: undo the mmap / bgid acquisition if registration below
// throws. Presumably dismissed on success — the dismissal lines are
// elided from this listing; verify against the original file.
61 auto d1 =
defer([&]() { munmap(data, data_size); });
63 bgid_ = context.next_bgid();
64 auto d2 =
defer([&]() { context.recycle_bgid(bgid_); });
66 br_ =
reinterpret_cast<io_uring_buf_ring *
>(data);
67 io_uring_buf_ring_init(br_);
69 io_uring_buf_reg reg = {};
70 reg.ring_addr =
reinterpret_cast<uint64_t
>(br_);
71 reg.ring_entries = capacity_;
// NOTE(review): "®" is an HTML-entity mangling of "&reg" introduced by
// the extraction; the real call passes the address of `reg`.
73 int r = io_uring_register_buf_ring(context.ring()->ring(), ®, flags);
74 if (r != 0) [[unlikely]] {
// liburing returns -errno on failure; negate so the error code is positive.
75 throw make_system_error(
"io_uring_register_buf_ring", -r);
// Destructor: unmaps the ring, best-effort unregisters the buffer group,
// and returns the bgid to the Context for reuse.
82 ~BundledProvidedBufferQueue() {
83 assert(br_ !=
nullptr);
84 size_t data_size = capacity_ *
sizeof(io_uring_buf);
85 munmap(br_, data_size);
// Failure is deliberately ignored — destructors must not throw.
86 [[maybe_unused]]
int r = io_uring_unregister_buf_ring(
87 detail::Context::current().ring()->ring(), bgid_);
90 detail::Context::current().recycle_bgid(bgid_);
// Non-copyable, non-movable: the ring memory (br_) is registered with the
// kernel by address, so the object must not relocate or duplicate it.
94 BundledProvidedBufferQueue(
const BundledProvidedBufferQueue &) =
delete;
95 BundledProvidedBufferQueue &
96 operator=(
const BundledProvidedBufferQueue &) =
delete;
97 BundledProvidedBufferQueue(BundledProvidedBufferQueue &&) =
delete;
98 BundledProvidedBufferQueue &
99 operator=(BundledProvidedBufferQueue &&) =
delete;
// Number of buffers currently provided (tracked in size_).
105 size_t size()
const noexcept {
return size_; }
// Ring capacity (rounded-up power of two).
110 size_t capacity()
const noexcept {
return capacity_; }
// Provide one caller-owned buffer to the kernel. The buffer id is derived
// from the ring tail; the length is recorded in buf_lens_ so handle_finish
// can later split a bundled completion back into per-buffer sizes.
// NOTE(review): declared to return uint16_t but no return statement is
// visible in this listing — presumably `return bid;` was elided.
122 template <BufferLike Buffer> uint16_t push(
const Buffer &
buffer) {
123 if (size_ >= capacity_) [[unlikely]] {
124 throw std::logic_error(
"Capacity exceeded");
127 auto mask = io_uring_buf_ring_mask(capacity_);
128 uint16_t bid = br_->tail & mask;
129 io_uring_buf_ring_add(br_,
buffer.data(),
buffer.size(), bid, mask, 0);
130 buf_lens_[bid] =
buffer.size();
131 io_uring_buf_ring_advance(br_, 1);
// Buffer-group id this ring is registered under.
138 uint16_t bgid()
const noexcept {
return bgid_; }
// Translate a CQE into {first bid, number of buffers consumed}.
140 ReturnType handle_finish(io_uring_cqe *cqe)
noexcept {
141 assert(cqe !=
nullptr);
142 int32_t res = cqe->res;
143 uint32_t flags = cqe->flags;
// No buffer was consumed for this completion.
145 if (!(flags & IORING_CQE_F_BUFFER)) {
146 return ReturnType{0, 0};
// The first consumed buffer id is carried in the upper CQE flag bits.
151 ReturnType result = {
152 .bid =
static_cast<uint16_t
>(flags >> IORING_CQE_BUFFER_SHIFT),
// Pre-liburing-2.8 path: IORING_CQE_F_BUF_MORE means the last buffer was
// only partially consumed, so deduct what was used and keep the rest.
156#if !IO_URING_CHECK_VERSION(2, 8)
157 if (flags & IORING_CQE_F_BUF_MORE) {
158 assert(buf_lens_[result.bid] >
static_cast<uint32_t
>(res));
159 buf_lens_[result.bid] -= res;
// Walk consecutive bids, consuming recorded lengths to count how many
// buffers this completion covered. NOTE(review): the loop framing (its
// do/while condition, and how `res`/`buf_len` drive termination) is
// elided from this listing — verify against the original file.
164 auto mask = io_uring_buf_ring_mask(capacity_);
165 uint16_t curr_bid = result.bid;
168 uint32_t buf_len = std::exchange(buf_lens_[curr_bid], 0);
171 result.num_buffers++;
172 curr_bid = (curr_bid + 1) & mask;
174 assert(size_ >= result.num_buffers);
175 size_ -= result.num_buffers;
// Kernel-visible ring; owned via mmap/munmap.
181 io_uring_buf_ring *br_ =
nullptr;
// Recorded length of each pushed buffer, keyed by buffer id.
185 std::vector<uint32_t> buf_lens_;
// Interior of a derived (non-bundled) queue type whose class header lies
// outside this view; the constructor simply delegates to the bundled base.
210 : BundledProvidedBufferQueue(capacity, flags) {}
// Non-bundled completion handling: without bundling, a CQE consumes at most
// one buffer, which the assert below enforces. NOTE(review): the return
// statement is elided from this listing.
212 ReturnType handle_finish(io_uring_cqe *cqe)
noexcept {
213 assert(cqe !=
nullptr);
214 auto result = BundledProvidedBufferQueue::handle_finish(cqe);
215 assert(result.num_buffers <= 1);
// Forward declaration: ProvidedBuffer holds a non-owning back-pointer to
// the pool it came from.
221class BundledProvidedBufferPool;
// Move-only RAII view of one buffer handed out by a BundledProvidedBufferPool.
// On destruction (via reset(), defined out of line) the buffer is returned to
// its pool when pool_ is non-null.
232struct ProvidedBuffer :
public BufferBase {
234 ProvidedBuffer() =
default;
// pool may be nullptr, in which case the buffer is not returned anywhere
// on reset (see the BUF_MORE path in the pool's handle_finish).
235 ProvidedBuffer(
void *
data,
size_t size,
236 detail::BundledProvidedBufferPool *pool)
237 : data_(
data), size_(
size), pool_(pool) {}
// Move transfers ownership and leaves the source empty (null/0), so the
// source's reset() becomes a no-op.
238 ProvidedBuffer(ProvidedBuffer &&other) noexcept
239 : data_(std::exchange(other.data_,
nullptr)),
240 size_(std::exchange(other.size_, 0)),
241 pool_(std::exchange(other.pool_,
nullptr)) {}
242 ProvidedBuffer &operator=(ProvidedBuffer &&other)
noexcept {
// Self-assignment guard; presumably reset() is called on *this before
// adopting other's state (that line is elided from this listing).
243 if (
this != &other) {
245 data_ = std::exchange(other.data_,
nullptr);
246 size_ = std::exchange(other.size_, 0);
247 pool_ = std::exchange(other.pool_,
nullptr);
// Destructor returns the buffer to its pool (if owned).
252 ~ProvidedBuffer() {
reset(); }
// Copying is forbidden: two owners would double-return the buffer.
254 ProvidedBuffer(
const ProvidedBuffer &) =
delete;
255 ProvidedBuffer &operator=(
const ProvidedBuffer &) =
delete;
// Raw data pointer (null when empty).
261 void *
data() const noexcept {
return data_; }
// Usable byte count of this buffer slice.
266 size_t size() const noexcept {
return size_; }
// Return the buffer to its pool (if owned) and clear this object.
271 void reset() noexcept;
279 void *data_ =
nullptr;
// NOTE(review): the size_ member declaration is elided from this listing
// (original lines ~280); pool_ below is non-owning.
281 detail::BundledProvidedBufferPool *pool_ =
nullptr;
// Pool variant: unlike the queue (caller-owned buffers), this allocates one
// anonymous mmap holding the io_uring_buf ring entries followed by the
// buffer payload area, and pre-provides every buffer to the kernel.
// NOTE(review): as with the rest of this listing, closing braces and some
// statements are elided; code is kept byte-identical, comments only.
286class BundledProvidedBufferPool {
// A bundled completion can span several buffers, returned as a vector.
288 using ReturnType = std::vector<ProvidedBuffer>;
290 BundledProvidedBufferPool(uint32_t num_buffers,
size_t buffer_size,
// num_buffers rounded to a power of two for io_uring_buf_ring_mask().
292 : num_buffers_(std::bit_ceil(num_buffers)), buffer_size_(buffer_size) {
293 auto &context = detail::Context::current();
// Single mapping: ring entries first, then num_buffers_ payload slots.
295 size_t data_size = num_buffers_ * (
sizeof(io_uring_buf) + buffer_size);
296 void *data = mmap(
nullptr, data_size, PROT_READ | PROT_WRITE,
// NOTE(review): MAP_ANONYMOUS conventionally takes fd = -1, not 0.
297 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
298 if (data == MAP_FAILED) [[unlikely]] {
299 throw make_system_error(
"mmap");
// Scope guards undo the mmap / bgid on a failed registration below.
301 auto d1 =
defer([&]() { munmap(data, data_size); });
303 bgid_ = context.next_bgid();
304 auto d2 =
defer([&]() { context.recycle_bgid(bgid_); });
306 br_ =
reinterpret_cast<io_uring_buf_ring *
>(data);
307 io_uring_buf_ring_init(br_);
309 io_uring_buf_reg reg = {};
310 reg.ring_addr =
reinterpret_cast<uint64_t
>(br_);
311 reg.ring_entries = num_buffers_;
// NOTE(review): "®" is an HTML-entity mangling of "&reg".
313 int r = io_uring_register_buf_ring(context.ring()->ring(), ®, flags);
314 if (r != 0) [[unlikely]] {
315 throw make_system_error(
"io_uring_register_buf_ring", -r);
// Payload area starts right after the ring entries. NOTE(review): the
// `char *buffer_base =` prefix of this statement is elided here.
319 static_cast<char *
>(data) +
sizeof(io_uring_buf) * num_buffers_;
320 auto mask = io_uring_buf_ring_mask(num_buffers_);
// Pre-provide every buffer; the last argument advances the write slot
// per bid so each entry lands in its own ring position.
321 for (
size_t bid = 0; bid < num_buffers_; bid++) {
322 char *ptr = buffer_base + bid * buffer_size;
323 io_uring_buf_ring_add(br_, ptr, buffer_size, bid, mask,
324 static_cast<int>(bid));
// Publish all entries to the kernel in one tail update.
326 io_uring_buf_ring_advance(br_,
static_cast<int>(num_buffers_));
// Destructor mirrors the queue's: unmap, best-effort unregister, recycle bgid.
332 ~BundledProvidedBufferPool() {
333 assert(br_ !=
nullptr);
334 size_t data_size = num_buffers_ * (
sizeof(io_uring_buf) + buffer_size_);
335 munmap(br_, data_size);
336 [[maybe_unused]]
int r = io_uring_unregister_buf_ring(
337 detail::Context::current().ring()->ring(), bgid_);
340 detail::Context::current().recycle_bgid(bgid_);
// Non-copyable, non-movable: kernel holds the ring by address, and handed-out
// ProvidedBuffers hold back-pointers to this object.
344 BundledProvidedBufferPool(
const BundledProvidedBufferPool &) =
delete;
345 BundledProvidedBufferPool &
346 operator=(
const BundledProvidedBufferPool &) =
delete;
347 BundledProvidedBufferPool(BundledProvidedBufferPool &&) =
delete;
348 BundledProvidedBufferPool &operator=(BundledProvidedBufferPool &&) =
delete;
// Total number of pooled buffers (power of two).
354 size_t capacity() const noexcept {
return num_buffers_; }
// Size in bytes of each pooled buffer.
359 size_t buffer_size() const noexcept {
return buffer_size_; }
// Buffer-group id this pool is registered under.
362 uint16_t bgid() const noexcept {
return bgid_; }
// Translate a CQE into the list of ProvidedBuffers it consumed.
364 ReturnType handle_finish(io_uring_cqe *cqe)
noexcept {
365 assert(cqe !=
nullptr);
366 int32_t res = cqe->res;
367 uint32_t flags = cqe->flags;
368 std::vector<ProvidedBuffer> buffers;
// No buffer consumed: presumably returns the empty vector (elided).
370 if (!(flags & IORING_CQE_F_BUFFER)) {
376 uint16_t bid = flags >> IORING_CQE_BUFFER_SHIFT;
// Pre-liburing-2.8: BUF_MORE means this buffer is only partially filled
// and stays with the kernel; hand out a pool_-less (non-returning) view
// and remember the consumed prefix in partial_size_.
378#if !IO_URING_CHECK_VERSION(2, 8)
379 if (flags & IORING_CQE_F_BUF_MORE) {
380 char *data = get_buffer_(bid) + partial_size_;
381 buffers.emplace_back(data, res,
nullptr);
382 partial_size_ += res;
383 assert(partial_size_ < buffer_size_);
// Consumption order must match our shadow head (br_head_).
387 assert(bid == curr_io_uring_buf_()->bid);
391 auto *buf_ptr = curr_io_uring_buf_();
// First buffer of the bundle may start after a previous partial fill.
393 uint32_t curr_buffer_size = buffer_size_ - partial_size_;
394 char *data = get_buffer_(bid) + partial_size_;
395 buffers.emplace_back(data, curr_buffer_size,
this);
// NOTE(review): `bytes` is used here but its declaration (presumably a
// running copy of res) and the enclosing loop framing are elided from
// this listing — verify against the original file.
396 bytes -=
static_cast<int32_t
>(curr_buffer_size);
398 advance_io_uring_buf_();
// Re-provide a buffer (identified by any pointer into it) to the kernel.
// Called from ProvidedBuffer::reset() when an owned buffer is released.
404 void add_buffer_back(
void *ptr)
noexcept {
405 char *base = get_buffers_base_();
// Recover the bid from the pointer's offset in the payload area.
407 size_t offset =
static_cast<char *
>(ptr) - base;
408 size_t bid = offset / buffer_size_;
409 assert(bid < num_buffers_);
410 char *buffer_ptr = base + bid * buffer_size_;
411 auto mask = io_uring_buf_ring_mask(num_buffers_);
412 io_uring_buf_ring_add(br_, buffer_ptr, buffer_size_, bid, mask, 0);
413 io_uring_buf_ring_advance(br_, 1);
// Payload address of buffer `bid`.
417 char *get_buffer_(uint16_t bid)
const noexcept {
418 return get_buffers_base_() +
static_cast<size_t>(bid) * buffer_size_;
// Start of the payload area (immediately after the ring entries).
421 char *get_buffers_base_() const noexcept {
422 return reinterpret_cast<char *
>(br_) +
423 sizeof(io_uring_buf) * num_buffers_;
// Ring entry at our shadow head — the next buffer the kernel will consume.
426 io_uring_buf *curr_io_uring_buf_() noexcept {
427 auto mask = io_uring_buf_ring_mask(num_buffers_);
428 return &br_->bufs[br_head_ & mask];
431 void advance_io_uring_buf_() noexcept { br_head_++; }
// Kernel-visible ring + payload mapping; owned via mmap/munmap.
434 io_uring_buf_ring *br_ =
nullptr;
435 uint32_t num_buffers_;
436 uint32_t buffer_size_;
// Bytes already consumed from the current (BUF_MORE) partial buffer.
437 uint32_t partial_size_ = 0;
// Shadow of the kernel's consumption head, used to pair bids in order.
439 uint16_t br_head_ = 0;
// Fragment — appears to be the out-of-line body of ProvidedBuffer::reset()
// (declared earlier at original line 271): an owned buffer (pool_ != nullptr)
// is handed back to its pool. The surrounding definition and the member
// clearing that presumably follows are elided from this listing.
445 if (pool_ !=
nullptr) {
446 pool_->add_buffer_back(data_);
// Interior of a derived (non-bundled) pool type whose class header lies
// outside this view; the constructor delegates to the bundled base.
477 unsigned int flags = 0)
478 : BundledProvidedBufferPool(num_buffers, buffer_size, flags) {}
// Non-bundled completion: at most one buffer per CQE, so unwrap the vector
// into a single ProvidedBuffer. NOTE(review): the empty-case return inside
// the `if` is elided from this listing.
481 ReturnType handle_finish(io_uring_cqe *cqe)
noexcept {
482 assert(cqe !=
nullptr);
483 auto buffers = BundledProvidedBufferPool::handle_finish(cqe);
484 if (buffers.empty()) {
487 assert(buffers.size() == 1);
// std::move is required here: ProvidedBuffer is move-only.
488 return std::move(buffers[0]);
// Fragments of the two bundled() conversion helpers (signatures elided):
// per the tooltip text further below, each exposes the bundled variant of a
// provided buffer pool/queue. Safe as a static_cast only because the
// non-bundled types derive from the bundled ones (see the delegating
// constructors above).
502 return static_cast<detail::BundledProvidedBufferPool &
>(
buffer);
512 return static_cast<detail::BundledProvidedBufferQueue &
>(
buffer);
Basic buffer types and conversion utilities.
ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size, unsigned int flags=0)
Construct a new ProvidedBufferPool object in current Runtime.
ProvidedBufferQueue(uint32_t capacity, unsigned int flags=0)
Construct a new ProvidedBufferQueue object in current Runtime.
The main namespace for the Condy library.
auto & bundled(ProvidedBufferPool &buffer)
Get the bundled variant of a provided buffer pool. This will enable the buffer bundling feature of io_uring.
MutableBuffer buffer(void *data, size_t size) noexcept
Create a buffer object from various data sources.
auto defer(Func &&func)
Defer the execution of a function until the current scope ends.
Wrapper classes for liburing interfaces.
Information about buffers consumed from a provided buffer queue.
uint16_t bid
Buffer ID of the first buffer consumed.
uint16_t num_buffers
Number of buffers consumed.
bool owns_buffer() const noexcept
Check if the provided buffer owns a buffer from a pool.
void * data() const noexcept
Get the data pointer of the provided buffer.
size_t size() const noexcept
Get the size of the provided buffer.
void reset() noexcept
Reset the provided buffer, returning it to the pool if owned.
Internal utility classes and functions used by Condy.