44class BundledProvidedBufferQueue {
48 BundledProvidedBufferQueue(uint32_t capacity,
unsigned int flags = 0)
49 : capacity_(std::bit_ceil(capacity)) {
50 auto &context = Context::current();
51 auto bgid = context.next_bgid();
53 size_t data_size = capacity_ *
sizeof(io_uring_buf);
54 void *data = mmap(
nullptr, data_size, PROT_READ | PROT_WRITE,
55 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
56 if (data == MAP_FAILED) [[unlikely]] {
57 throw std::bad_alloc();
59 br_ =
reinterpret_cast<io_uring_buf_ring *
>(data);
60 io_uring_buf_ring_init(br_);
62 io_uring_buf_reg reg = {};
63 reg.ring_addr =
reinterpret_cast<uint64_t
>(br_);
64 reg.ring_entries = capacity_;
66 int r = io_uring_register_buf_ring(context.ring()->ring(), &reg, flags);
67 if (r != 0) [[unlikely]] {
68 munmap(data, data_size);
69 throw make_system_error(
"io_uring_register_buf_ring", -r);
75 ~BundledProvidedBufferQueue() {
76 assert(br_ !=
nullptr);
77 size_t data_size = capacity_ *
sizeof(io_uring_buf);
78 munmap(br_, data_size);
79 [[maybe_unused]]
int r = io_uring_unregister_buf_ring(
80 Context::current().ring()->ring(), bgid_);
84 BundledProvidedBufferQueue(
const BundledProvidedBufferQueue &) =
delete;
85 BundledProvidedBufferQueue &
86 operator=(
const BundledProvidedBufferQueue &) =
delete;
87 BundledProvidedBufferQueue(BundledProvidedBufferQueue &&) =
delete;
88 BundledProvidedBufferQueue &
89 operator=(BundledProvidedBufferQueue &&) =
delete;
95 size_t size()
const {
return size_; }
100 size_t capacity()
const {
return capacity_; }
112 template <
typename Buffer> uint16_t push(Buffer &&
buffer) {
113 if (size_ >= capacity_) [[unlikely]] {
114 throw std::logic_error(
"Capacity exceeded");
117 auto mask = io_uring_buf_ring_mask(capacity_);
118 uint16_t bid = br_->tail & mask;
120 io_uring_buf_ring_advance(br_, 1);
127 uint16_t bgid()
const {
return bgid_; }
129 ReturnType handle_finish(int32_t res, uint32_t flags) {
130 if (!(flags & IORING_CQE_F_BUFFER)) {
131 return ReturnType{0, 0};
136 ReturnType result = {
137 .bid =
static_cast<uint16_t
>(flags >> IORING_CQE_BUFFER_SHIFT),
141#if !IO_URING_CHECK_VERSION(2, 8)
142 if (flags & IORING_CQE_F_BUF_MORE) {
147 uint16_t curr_bid = result.bid;
150 auto &buf = br_->bufs[curr_bid];
151 assert(buf.bid == curr_bid);
152 uint32_t buf_size = buf.len;
153 bytes -=
static_cast<int32_t
>(buf_size);
154 result.num_buffers++;
156 assert(size_ >= result.num_buffers);
157 size_ -= result.num_buffers;
163 io_uring_buf_ring *br_ =
nullptr;
186 : BundledProvidedBufferQueue(capacity, flags) {}
189class BundledProvidedBufferPool;
197struct ProvidedBuffer :
public BufferBase {
199 ProvidedBuffer() =
default;
200 ProvidedBuffer(
void *
data,
size_t size, BundledProvidedBufferPool *pool)
201 : data_(
data), size_(
size), pool_(pool) {}
202 ProvidedBuffer(ProvidedBuffer &&other) noexcept
203 : data_(std::exchange(other.data_,
nullptr)),
204 size_(std::exchange(other.size_, 0)),
205 pool_(std::exchange(other.pool_,
nullptr)) {}
206 ProvidedBuffer &operator=(ProvidedBuffer &&other)
noexcept {
207 if (
this != &other) {
208 data_ = std::exchange(other.data_,
nullptr);
209 size_ = std::exchange(other.size_, 0);
210 pool_ = std::exchange(other.pool_,
nullptr);
215 ~ProvidedBuffer() {
reset(); }
217 ProvidedBuffer(
const ProvidedBuffer &) =
delete;
218 ProvidedBuffer &operator=(
const ProvidedBuffer &) =
delete;
224 void *
data()
const {
return data_; }
229 size_t size()
const {
return size_; }
242 void *data_ =
nullptr;
244 BundledProvidedBufferPool *pool_ =
nullptr;
247class BundledProvidedBufferPool {
249 using ReturnType = std::vector<ProvidedBuffer>;
251 BundledProvidedBufferPool(uint32_t num_buffers,
size_t buffer_size,
252 unsigned int flags = 0)
253 : num_buffers_(std::bit_ceil(num_buffers)), buffer_size_(buffer_size) {
254 auto &context = Context::current();
255 auto bgid = context.next_bgid();
257 size_t data_size = num_buffers_ * (
sizeof(io_uring_buf) + buffer_size);
258 void *data = mmap(
nullptr, data_size, PROT_READ | PROT_WRITE,
259 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
260 if (data == MAP_FAILED) [[unlikely]] {
261 throw std::bad_alloc();
263 br_ =
reinterpret_cast<io_uring_buf_ring *
>(data);
264 io_uring_buf_ring_init(br_);
266 io_uring_buf_reg reg = {};
267 reg.ring_addr =
reinterpret_cast<uint64_t
>(br_);
268 reg.ring_entries = num_buffers_;
270 int r = io_uring_register_buf_ring(context.ring()->ring(), &reg, flags);
271 if (r != 0) [[unlikely]] {
272 munmap(data, data_size);
273 throw make_system_error(
"io_uring_register_buf_ring", -r);
279 static_cast<char *
>(data) +
sizeof(io_uring_buf) * num_buffers_;
280 auto mask = io_uring_buf_ring_mask(num_buffers_);
281 for (
size_t bid = 0; bid < num_buffers_; bid++) {
282 char *ptr = buffer_base + bid * buffer_size;
283 io_uring_buf_ring_add(br_, ptr, buffer_size, bid, mask,
284 static_cast<int>(bid));
286 io_uring_buf_ring_advance(br_,
static_cast<int>(num_buffers_));
289 ~BundledProvidedBufferPool() {
290 assert(br_ !=
nullptr);
291 size_t data_size = num_buffers_ * (
sizeof(io_uring_buf) + buffer_size_);
292 munmap(br_, data_size);
293 [[maybe_unused]]
int r = io_uring_unregister_buf_ring(
294 Context::current().ring()->ring(), bgid_);
298 BundledProvidedBufferPool(
const BundledProvidedBufferPool &) =
delete;
299 BundledProvidedBufferPool &
300 operator=(
const BundledProvidedBufferPool &) =
delete;
301 BundledProvidedBufferPool(BundledProvidedBufferPool &&) =
delete;
302 BundledProvidedBufferPool &operator=(BundledProvidedBufferPool &&) =
delete;
308 size_t capacity()
const {
return num_buffers_; }
313 size_t buffer_size()
const {
return buffer_size_; }
316 uint16_t bgid()
const {
return bgid_; }
318 ReturnType handle_finish(int32_t res, [[maybe_unused]] uint32_t flags) {
319 std::vector<ProvidedBuffer> buffers;
321 if (!(flags & IORING_CQE_F_BUFFER)) {
327#if !IO_URING_CHECK_VERSION(2, 8)
328 if (flags & IORING_CQE_F_BUF_MORE) {
329 uint16_t bid = flags >> IORING_CQE_BUFFER_SHIFT;
330 char *data = get_buffer_(bid) + partial_size_;
331 buffers.emplace_back(data, res,
nullptr);
332 partial_size_ += res;
339 auto *buf_ptr = curr_io_uring_buf_();
340 uint16_t bid = buf_ptr->bid;
341 uint32_t curr_buffer_size = buffer_size_ - partial_size_;
342 char *data = get_buffer_(bid) + partial_size_;
343 buffers.emplace_back(data, curr_buffer_size,
this);
344 bytes -=
static_cast<int32_t
>(curr_buffer_size);
346 advance_io_uring_buf_();
352 void add_buffer_back(
void *ptr) {
353 char *base = get_buffers_base_();
355 size_t offset =
static_cast<char *
>(ptr) - base;
356 size_t bid = offset / buffer_size_;
357 assert(bid < num_buffers_);
358 char *buffer_ptr = base + bid * buffer_size_;
359 auto mask = io_uring_buf_ring_mask(num_buffers_);
360 io_uring_buf_ring_add(br_, buffer_ptr, buffer_size_, bid, mask, 0);
361 io_uring_buf_ring_advance(br_, 1);
365 char *get_buffer_(uint16_t bid)
const {
366 return get_buffers_base_() +
static_cast<size_t>(bid) * buffer_size_;
369 char *get_buffers_base_()
const {
370 return reinterpret_cast<char *
>(br_) +
371 sizeof(io_uring_buf) * num_buffers_;
374 io_uring_buf *curr_io_uring_buf_() {
375 auto mask = io_uring_buf_ring_mask(num_buffers_);
376 return &br_->bufs[br_head_ & mask];
379 void advance_io_uring_buf_() { br_head_++; }
382 io_uring_buf_ring *br_ =
nullptr;
383 uint32_t num_buffers_;
384 uint32_t buffer_size_;
385 uint32_t partial_size_ = 0;
387 uint16_t br_head_ = 0;
391 if (pool_ !=
nullptr) {
392 pool_->add_buffer_back(data_);
420 unsigned int flags = 0)
421 : BundledProvidedBufferPool(num_buffers, buffer_size, flags) {}
424 ReturnType handle_finish(int32_t res, uint32_t flags) {
425 auto buffers = BundledProvidedBufferPool::handle_finish(res, flags);
426 if (buffers.empty()) {
429 assert(buffers.size() == 1);
430 return std::move(buffers[0]);
Basic buffer types and conversion utilities.
void * data() const
Get the data of the buffer.
Definition buffers.hpp:34
size_t size() const
Get the byte size of the buffer.
Definition buffers.hpp:39
ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size, unsigned int flags=0)
Construct a new ProvidedBufferPool object.
Definition provided_buffers.hpp:419
ProvidedBufferQueue(uint32_t capacity, unsigned int flags=0)
Construct a new ProvidedBufferQueue object.
Definition provided_buffers.hpp:185
The main namespace for the Condy library.
Definition condy.hpp:28
MutableBuffer buffer(void *data, size_t size)
Create a buffer object from various data sources.
Definition buffers.hpp:84
Wrapper classes for liburing interfaces.
Information about buffers consumed from a provided buffer queue.
Definition provided_buffers.hpp:33
uint16_t bid
Buffer ID of the first buffer consumed.
Definition provided_buffers.hpp:37
uint16_t num_buffers
Number of buffers consumed.
Definition provided_buffers.hpp:41
Provided buffer.
Definition provided_buffers.hpp:197
void * data() const
Get the data pointer of the provided buffer.
Definition provided_buffers.hpp:224
size_t size() const
Get the size of the provided buffer.
Definition provided_buffers.hpp:229
void reset()
Reset the provided buffer, returning it to the pool if owned.
Definition provided_buffers.hpp:390
bool owns_buffer() const
Check if the provided buffer owns a buffer from a pool.
Definition provided_buffers.hpp:239
Internal utility classes and functions used by Condy.