// NOTE(review): this chunk is a mangled extraction of a source listing —
// the leading digits on each line are fused original line numbers, several
// original lines are missing (gaps in that numbering), and "®" below is a
// mis-encoded "&reg" (address-of the registration struct).  Code tokens are
// preserved exactly; only comments are added.
//
// A provided-buffer queue backed by an io_uring buffer ring
// (io_uring_register_buf_ring).  Capacity is rounded up to a power of two,
// as buffer rings require power-of-two entry counts.
46class BundledProvidedBufferQueue {
// Construct and register a buffer ring of `capacity` entries (rounded up
// via std::bit_ceil) with the current Context's ring.
// @throws std::bad_alloc   if the backing mmap fails
// @throws system_error     if io_uring_register_buf_ring fails
50 BundledProvidedBufferQueue(uint32_t capacity, unsigned int flags)
51 : capacity_(std::bit_ceil(capacity)) {
52 auto &context = detail::Context::current();
// Fresh buffer-group id from the per-thread context.  The line that stores
// it into bgid_ / reg.bgid falls in the missing lines — TODO confirm.
53 auto bgid = context.next_bgid();
// Ring metadata only: one io_uring_buf per entry.  (This queue does not
// own buffer payloads — callers push their own via push().)
55 size_t data_size = capacity_ * sizeof(io_uring_buf);
// NOTE(review): MAP_ANONYMOUS conventionally passes fd = -1; fd = 0 is
// ignored on Linux but is non-portable — confirm intent.
56 void *data = mmap(nullptr, data_size, PROT_READ | PROT_WRITE,
57 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
58 if (data == MAP_FAILED) [[unlikely]] {
59 throw std::bad_alloc();
// (closing brace of the if is among the missing lines)
61 br_ = reinterpret_cast<io_uring_buf_ring *>(data);
62 io_uring_buf_ring_init(br_);
64 io_uring_buf_reg reg = {};
65 reg.ring_addr = reinterpret_cast<uint64_t>(br_);
66 reg.ring_entries = capacity_;
// "®" here is the garbled "&reg" argument (extraction artifact).
68 int r = io_uring_register_buf_ring(context.ring()->ring(), ®, flags);
69 if (r != 0) [[unlikely]] {
// Registration failed: release the mapping before throwing so the
// constructor does not leak the anonymous mapping.
70 munmap(data, data_size);
71 throw make_system_error("io_uring_register_buf_ring", -r);
// Tear down: unmap the ring memory, unregister the buffer group from the
// kernel, and return the group id to the context for reuse.
77 ~BundledProvidedBufferQueue() {
78 assert(br_ != nullptr);
79 size_t data_size = capacity_ * sizeof(io_uring_buf);
// NOTE(review): the ring is munmap'ed BEFORE it is unregistered — the
// conservative order would be unregister first; confirm against liburing
// that the kernel keeps its own reference to the registered region.
80 munmap(br_, data_size);
81 [[maybe_unused]] int r = io_uring_unregister_buf_ring(
82 detail::Context::current().ring()->ring(), bgid_);
// (original lines 83-84 missing here — presumably an assert/log on r)
85 detail::Context::current().recycle_bgid(bgid_);
// Non-copyable and non-movable: the kernel holds the registered ring_addr,
// so the buffer ring must stay at a stable address for the object's
// whole lifetime.
89 BundledProvidedBufferQueue(const BundledProvidedBufferQueue &) = delete;
90 BundledProvidedBufferQueue &
91 operator=(const BundledProvidedBufferQueue &) = delete;
92 BundledProvidedBufferQueue(BundledProvidedBufferQueue &&) = delete;
93 BundledProvidedBufferQueue &
94 operator=(BundledProvidedBufferQueue &&) = delete;
/// Number of buffers currently pushed into the ring and not yet consumed.
100 size_t size() const { return size_; }
/// Ring capacity (rounded up to a power of two at construction).
105 size_t capacity() const { return capacity_; }
// Publish one caller-owned buffer (anything exposing data()/size()) into
// the ring so the kernel can pick it for a subsequent operation.
// @return the buffer id assigned to the entry — the `return` statement
//         falls in the missing lines (126-131); presumably `return bid;`
// @throws std::logic_error when the ring is already full
117 template <typename Buffer> uint16_t push(Buffer &&buffer) {
118 if (size_ >= capacity_) [[unlikely]] {
119 throw std::logic_error("Capacity exceeded");
// (closing brace of the if is among the missing lines)
122 auto mask = io_uring_buf_ring_mask(capacity_);
// Slot id derived from the ring's current tail position.
123 uint16_t bid = br_->tail & mask;
124 io_uring_buf_ring_add(br_, buffer.data(), buffer.size(), bid, mask, 0);
// Make the new entry visible to the kernel.
125 io_uring_buf_ring_advance(br_, 1);
/// Buffer-group id this ring is registered under.
132 uint16_t bgid() const { return bgid_; }
// Translate a completion (res = completed byte count, flags = CQE flags)
// into a {first bid, number of buffers consumed} result.  Heavily gapped in
// this extraction: the ReturnType alias, the initialisation of `bytes`
// (presumably from `res`), the loop header that walks consumed buffers,
// the `#endif`, and the final return are all in missing lines.
134 ReturnType handle_finish(int32_t res, uint32_t flags) {
// No buffer was selected for this CQE — nothing was consumed.
135 if (!(flags & IORING_CQE_F_BUFFER)) {
136 return ReturnType{0, 0};
// First consumed buffer id is encoded in the upper CQE flag bits.
141 ReturnType result = {
142 .bid = static_cast<uint16_t>(flags >> IORING_CQE_BUFFER_SHIFT),
// Pre-liburing-2.8 path: presumably reconstructs how many buffers a
// bundled completion consumed by subtracting each buffer's recorded
// length from the completed byte count — confirm against full source.
146#if !IO_URING_CHECK_VERSION(2, 8)
147 if (flags & IORING_CQE_F_BUF_MORE) {
152 uint16_t curr_bid = result.bid;
155 auto &buf = br_->bufs[curr_bid];
156 assert(buf.bid == curr_bid);
157 uint32_t buf_size = buf.len;
158 bytes -= static_cast<int32_t>(buf_size);
159 result.num_buffers++;
// Account for the consumed entries in our own size bookkeeping.
161 assert(size_ >= result.num_buffers);
162 size_ -= result.num_buffers;
// The registered buffer ring (mmap'ed storage shared with the kernel).
168 io_uring_buf_ring *br_ = nullptr;
// (fragment) Delegating constructor of a derived convenience class —
// per the index text at the end of this chunk, presumably
// ProvidedBufferQueue(uint32_t capacity, unsigned int flags = 0);
// its declaration falls in missing lines.
196 : BundledProvidedBufferQueue(capacity, flags) {}
// Forward declaration so ProvidedBuffer can hold a pool back-pointer.
200class BundledProvidedBufferPool;
// A non-owning, move-only view over one buffer handed out by a
// BundledProvidedBufferPool.  On destruction, reset() (defined out of
// line, after the pool class) returns the buffer to its pool when
// pool_ is non-null.
211struct ProvidedBuffer : public BufferBase {
213 ProvidedBuffer() = default;
// Wrap a pool buffer; pool may be nullptr for a view that owes nothing
// back on reset().
214 ProvidedBuffer(void *data, size_t size,
215 detail::BundledProvidedBufferPool *pool)
216 : data_(data), size_(size), pool_(pool) {}
// Move transfers the view and the pool debt, leaving `other` empty.
217 ProvidedBuffer(ProvidedBuffer &&other) noexcept
218 : data_(std::exchange(other.data_, nullptr)),
219 size_(std::exchange(other.size_, 0)),
220 pool_(std::exchange(other.pool_, nullptr)) {}
221 ProvidedBuffer &operator=(ProvidedBuffer &&other) noexcept {
// NOTE(review): as visible here, the buffer currently held by *this is
// overwritten without being returned to its pool; the missing lines
// (226-229) may call reset() first — confirm against the full source.
222 if (this != &other) {
223 data_ = std::exchange(other.data_, nullptr);
224 size_ = std::exchange(other.size_, 0);
225 pool_ = std::exchange(other.pool_, nullptr);
// Destructor returns the buffer to its pool via reset().
230 ~ProvidedBuffer() { reset(); }
// Copying is deleted: two owners would return the same buffer twice.
232 ProvidedBuffer(const ProvidedBuffer &) = delete;
233 ProvidedBuffer &operator=(const ProvidedBuffer &) = delete;
/// Raw pointer to the viewed bytes (not owned by this object).
239 void *data() const { return data_; }
/// Size of the viewed region in bytes.
244 size_t size() const { return size_; }
257 void *data_ = nullptr;
// Pool to return the buffer to; nullptr when nothing is owed.
259 detail::BundledProvidedBufferPool *pool_ = nullptr;
// Pool variant of the buffer ring: a single anonymous mapping holds both
// the io_uring_buf ring metadata and num_buffers_ payload buffers of
// buffer_size bytes each, all pre-published to the kernel up front.
// (Same extraction caveats as above: fused line numbers, missing lines,
// "®" is a garbled "&reg".)
264class BundledProvidedBufferPool {
// handle_finish() hands out ProvidedBuffer views over the pool memory.
266 using ReturnType = std::vector<ProvidedBuffer>;
// Register the ring and publish every buffer.  num_buffers is rounded up
// to a power of two; the `flags` parameter declaration falls in the
// missing original line 269.
// @throws std::bad_alloc / system_error as in the queue constructor
268 BundledProvidedBufferPool(uint32_t num_buffers, size_t buffer_size,
270 : num_buffers_(std::bit_ceil(num_buffers)), buffer_size_(buffer_size) {
271 auto &context = detail::Context::current();
// (as above, the store of `bgid` into bgid_/reg.bgid is not visible)
272 auto bgid = context.next_bgid();
// Ring metadata + payload storage in one mapping.
274 size_t data_size = num_buffers_ * (sizeof(io_uring_buf) + buffer_size);
// NOTE(review): fd = 0 with MAP_ANONYMOUS — see portability note on the
// queue constructor above.
275 void *data = mmap(nullptr, data_size, PROT_READ | PROT_WRITE,
276 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
277 if (data == MAP_FAILED) [[unlikely]] {
278 throw std::bad_alloc();
280 br_ = reinterpret_cast<io_uring_buf_ring *>(data);
281 io_uring_buf_ring_init(br_);
283 io_uring_buf_reg reg = {};
284 reg.ring_addr = reinterpret_cast<uint64_t>(br_);
285 reg.ring_entries = num_buffers_;
// "®" is the garbled "&reg" argument (extraction artifact).
287 int r = io_uring_register_buf_ring(context.ring()->ring(), ®, flags);
288 if (r != 0) [[unlikely]] {
// Avoid leaking the mapping when registration fails.
289 munmap(data, data_size);
290 throw make_system_error("io_uring_register_buf_ring", -r);
// Payload buffers start immediately after the ring entries.  The left-hand
// side of this assignment (presumably `char *buffer_base =`) falls in a
// missing line.
296 static_cast<char *>(data) + sizeof(io_uring_buf) * num_buffers_;
297 auto mask = io_uring_buf_ring_mask(num_buffers_);
// Publish every buffer once up front; bid == index into the pool.
298 for (size_t bid = 0; bid < num_buffers_; bid++) {
299 char *ptr = buffer_base + bid * buffer_size;
300 io_uring_buf_ring_add(br_, ptr, buffer_size, bid, mask,
301 static_cast<int>(bid));
303 io_uring_buf_ring_advance(br_, static_cast<int>(num_buffers_));
// Unmap, unregister, and recycle the group id (same ordering question
// as the queue destructor: munmap happens before unregister).
306 ~BundledProvidedBufferPool() {
307 assert(br_ != nullptr);
308 size_t data_size = num_buffers_ * (sizeof(io_uring_buf) + buffer_size_);
309 munmap(br_, data_size);
310 [[maybe_unused]] int r = io_uring_unregister_buf_ring(
311 detail::Context::current().ring()->ring(), bgid_);
314 detail::Context::current().recycle_bgid(bgid_);
// Non-copyable/non-movable: the kernel holds the registered ring address.
318 BundledProvidedBufferPool(const BundledProvidedBufferPool &) = delete;
319 BundledProvidedBufferPool &
320 operator=(const BundledProvidedBufferPool &) = delete;
321 BundledProvidedBufferPool(BundledProvidedBufferPool &&) = delete;
322 BundledProvidedBufferPool &operator=(BundledProvidedBufferPool &&) = delete;
/// Total number of buffers in the pool (power of two).
328 size_t capacity() const { return num_buffers_; }
/// Size in bytes of each individual buffer.
333 size_t buffer_size() const { return buffer_size_; }
/// Buffer-group id this pool is registered under.
336 uint16_t bgid() const { return bgid_; }
// Turn a completion into the list of ProvidedBuffer views it consumed.
// Heavily gapped here: the early-return body, the `bytes` loop header,
// the `#endif`, and the final return are in missing lines.
338 ReturnType handle_finish(int32_t res, [[maybe_unused]] uint32_t flags) {
339 std::vector<ProvidedBuffer> buffers;
// No buffer selected — presumably returns the empty vector.
341 if (!(flags & IORING_CQE_F_BUFFER)) {
// Pre-liburing-2.8 path for bundled completions.
347#if !IO_URING_CHECK_VERSION(2, 8)
348 if (flags & IORING_CQE_F_BUF_MORE) {
349 uint16_t bid = flags >> IORING_CQE_BUFFER_SHIFT;
// Continuation of a partially-filled buffer: hand out only the newly
// written tail, with a null pool so the buffer is not returned twice.
350 char *data = get_buffer_(bid) + partial_size_;
351 buffers.emplace_back(data, res, nullptr);
352 partial_size_ += res;
// Walk buffers from our consumer-side cursor, handing each one out.
359 auto *buf_ptr = curr_io_uring_buf_();
360 uint16_t bid = buf_ptr->bid;
361 uint32_t curr_buffer_size = buffer_size_ - partial_size_;
362 char *data = get_buffer_(bid) + partial_size_;
363 buffers.emplace_back(data, curr_buffer_size, this);
364 bytes -= static_cast<int32_t>(curr_buffer_size);
366 advance_io_uring_buf_();
// Re-publish a buffer previously handed out (called from
// ProvidedBuffer::reset()).
372 void add_buffer_back(void *ptr) {
373 char *base = get_buffers_base_();
// Recover the buffer id from the pointer's offset into the pool region.
375 size_t offset = static_cast<char *>(ptr) - base;
376 size_t bid = offset / buffer_size_;
377 assert(bid < num_buffers_);
// NOTE(review): ptr is re-based to the buffer's start, so returning an
// interior pointer (a partial-tail view) republishes the whole buffer —
// confirm that is intended.
378 char *buffer_ptr = base + bid * buffer_size_;
379 auto mask = io_uring_buf_ring_mask(num_buffers_);
380 io_uring_buf_ring_add(br_, buffer_ptr, buffer_size_, bid, mask, 0);
381 io_uring_buf_ring_advance(br_, 1);
// Payload pointer for buffer `bid`.
385 char *get_buffer_(uint16_t bid) const {
386 return get_buffers_base_() + static_cast<size_t>(bid) * buffer_size_;
// First payload byte: the region just past the ring entries.
389 char *get_buffers_base_() const {
390 return reinterpret_cast<char *>(br_) +
391 sizeof(io_uring_buf) * num_buffers_;
// Ring entry at our consumer-side head (br_head_ is this class's own
// cursor; the ring structure itself only tracks the producer tail).
394 io_uring_buf *curr_io_uring_buf_() {
395 auto mask = io_uring_buf_ring_mask(num_buffers_);
396 return &br_->bufs[br_head_ & mask];
399 void advance_io_uring_buf_() { br_head_++; }
402 io_uring_buf_ring *br_ = nullptr;
403 uint32_t num_buffers_;
404 uint32_t buffer_size_;
// Bytes of the current head buffer already handed out to consumers.
405 uint32_t partial_size_ = 0;
407 uint16_t br_head_ = 0;
// (fragment) Body of ProvidedBuffer::reset(), defined out of line once
// BundledProvidedBufferPool is complete: return the buffer to its pool
// when one is owed.  The function header and the member-clearing tail
// fall in missing lines.
413 if (pool_ != nullptr) {
414 pool_->add_buffer_back(data_);
// (fragment) Derived convenience wrapper — per the index text this is
// ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size,
// unsigned int flags = 0); its class header and the constructor's first
// parameters fall in missing lines.
445 unsigned int flags = 0)
446 : BundledProvidedBufferPool(num_buffers, buffer_size, flags) {}
// Unbundled completion: delegate to the base, then unwrap the single
// buffer.  (The empty-result early return sits in missing lines 452-453;
// ReturnType here is presumably ProvidedBuffer, not the base's vector.)
449 ReturnType handle_finish(int32_t res, uint32_t flags) {
450 auto buffers = BundledProvidedBufferPool::handle_finish(res, flags);
451 if (buffers.empty()) {
454 assert(buffers.size() == 1);
455 return std::move(buffers[0]);
Basic buffer types and conversion utilities.
ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size, unsigned int flags=0)
Construct a new ProvidedBufferPool object in current Runtime.
ProvidedBufferQueue(uint32_t capacity, unsigned int flags=0)
Construct a new ProvidedBufferQueue object in current Runtime.
The main namespace for the Condy library.
MutableBuffer buffer(void *data, size_t size)
Create a buffer object from various data sources.
Wrapper classes for liburing interfaces.
Information about buffers consumed from a provided buffer queue.
uint16_t bid
Buffer ID of the first buffer consumed.
uint16_t num_buffers
Number of buffers consumed.
void * data() const
Get the data pointer of the provided buffer.
size_t size() const
Get the size of the provided buffer.
void reset()
Reset the provided buffer, returning it to the pool if owned.
bool owns_buffer() const
Check if the provided buffer owns a buffer from a pool.
Internal utility classes and functions used by Condy.