Condy v1.1.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
provided_buffers.hpp
Go to the documentation of this file.
1
7
8#pragma once
9
10#include "condy/buffers.hpp"
11#include "condy/condy_uring.hpp"
12#include "condy/context.hpp"
13#include "condy/ring.hpp"
14#include "condy/utils.hpp"
15#include <bit>
16#include <cstddef>
17#include <cstdint>
18#include <stdexcept>
19#include <sys/mman.h>
20#include <sys/types.h>
21
22namespace condy {
23
/**
 * @brief Information about buffers consumed from a provided buffer queue.
 *
 * Kept as a plain aggregate (members in this exact order) so that it can be
 * built with brace or designated initialisation.
 */
struct BufferInfo {
    /// Buffer ID of the first buffer consumed.
    uint16_t bid;

    /// Number of buffers consumed.
    uint16_t num_buffers;
};
43
44class BundledProvidedBufferQueue {
45public:
46 using ReturnType = BufferInfo;
47
48 BundledProvidedBufferQueue(uint32_t capacity, unsigned int flags = 0)
49 : capacity_(std::bit_ceil(capacity)) {
50 auto &context = Context::current();
51 auto bgid = context.next_bgid();
52
53 size_t data_size = capacity_ * sizeof(io_uring_buf);
54 void *data = mmap(nullptr, data_size, PROT_READ | PROT_WRITE,
55 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
56 if (data == MAP_FAILED) [[unlikely]] {
57 throw std::bad_alloc();
58 }
59 br_ = reinterpret_cast<io_uring_buf_ring *>(data);
60 io_uring_buf_ring_init(br_);
61
62 io_uring_buf_reg reg = {};
63 reg.ring_addr = reinterpret_cast<uint64_t>(br_);
64 reg.ring_entries = capacity_;
65 reg.bgid = bgid;
66 int r = io_uring_register_buf_ring(context.ring()->ring(), &reg, flags);
67 if (r != 0) [[unlikely]] {
68 munmap(data, data_size);
69 throw make_system_error("io_uring_register_buf_ring", -r);
70 }
71
72 bgid_ = bgid;
73 }
74
75 ~BundledProvidedBufferQueue() {
76 assert(br_ != nullptr);
77 size_t data_size = capacity_ * sizeof(io_uring_buf);
78 munmap(br_, data_size);
79 [[maybe_unused]] int r = io_uring_unregister_buf_ring(
80 Context::current().ring()->ring(), bgid_);
81 assert(r == 0);
82 }
83
84 BundledProvidedBufferQueue(const BundledProvidedBufferQueue &) = delete;
85 BundledProvidedBufferQueue &
86 operator=(const BundledProvidedBufferQueue &) = delete;
87 BundledProvidedBufferQueue(BundledProvidedBufferQueue &&) = delete;
88 BundledProvidedBufferQueue &
89 operator=(BundledProvidedBufferQueue &&) = delete;
90
91public:
95 size_t size() const { return size_; }
96
100 size_t capacity() const { return capacity_; }
101
112 template <typename Buffer> uint16_t push(Buffer &&buffer) {
113 if (size_ >= capacity_) [[unlikely]] {
114 throw std::logic_error("Capacity exceeded");
115 }
116
117 auto mask = io_uring_buf_ring_mask(capacity_);
118 uint16_t bid = br_->tail & mask;
119 io_uring_buf_ring_add(br_, buffer.data(), buffer.size(), bid, mask, 0);
120 io_uring_buf_ring_advance(br_, 1);
121 size_++;
122
123 return bid;
124 }
125
126public:
127 uint16_t bgid() const { return bgid_; }
128
129 ReturnType handle_finish(int32_t res, uint32_t flags) {
130 if (!(flags & IORING_CQE_F_BUFFER)) {
131 return ReturnType{0, 0};
132 }
133
134 assert(res >= 0);
135
136 ReturnType result = {
137 .bid = static_cast<uint16_t>(flags >> IORING_CQE_BUFFER_SHIFT),
138 .num_buffers = 0,
139 };
140
141#if !IO_URING_CHECK_VERSION(2, 8) // >= 2.8
142 if (flags & IORING_CQE_F_BUF_MORE) {
143 return result;
144 }
145#endif
146
147 uint16_t curr_bid = result.bid;
148 auto bytes = res;
149 while (bytes > 0) {
150 auto &buf = br_->bufs[curr_bid];
151 assert(buf.bid == curr_bid);
152 uint32_t buf_size = buf.len;
153 bytes -= static_cast<int32_t>(buf_size);
154 result.num_buffers++;
155 }
156 assert(size_ >= result.num_buffers);
157 size_ -= result.num_buffers;
158
159 return result;
160 }
161
162private:
163 io_uring_buf_ring *br_ = nullptr;
164 uint32_t size_ = 0;
165 uint32_t capacity_;
166 uint16_t bgid_;
167};
168
177class ProvidedBufferQueue : public BundledProvidedBufferQueue {
178public:
185 ProvidedBufferQueue(uint32_t capacity, unsigned int flags = 0)
186 : BundledProvidedBufferQueue(capacity, flags) {}
187};
188
189class BundledProvidedBufferPool;
190
197struct ProvidedBuffer : public BufferBase {
198public:
199 ProvidedBuffer() = default;
200 ProvidedBuffer(void *data, size_t size, BundledProvidedBufferPool *pool)
201 : data_(data), size_(size), pool_(pool) {}
202 ProvidedBuffer(ProvidedBuffer &&other) noexcept
203 : data_(std::exchange(other.data_, nullptr)),
204 size_(std::exchange(other.size_, 0)),
205 pool_(std::exchange(other.pool_, nullptr)) {}
206 ProvidedBuffer &operator=(ProvidedBuffer &&other) noexcept {
207 if (this != &other) {
208 data_ = std::exchange(other.data_, nullptr);
209 size_ = std::exchange(other.size_, 0);
210 pool_ = std::exchange(other.pool_, nullptr);
211 }
212 return *this;
213 }
214
215 ~ProvidedBuffer() { reset(); }
216
217 ProvidedBuffer(const ProvidedBuffer &) = delete;
218 ProvidedBuffer &operator=(const ProvidedBuffer &) = delete;
219
220public:
224 void *data() const { return data_; }
225
229 size_t size() const { return size_; }
230
234 void reset();
235
239 bool owns_buffer() const { return pool_ != nullptr; }
240
241private:
242 void *data_ = nullptr;
243 size_t size_ = 0;
244 BundledProvidedBufferPool *pool_ = nullptr;
245};
246
247class BundledProvidedBufferPool {
248public:
249 using ReturnType = std::vector<ProvidedBuffer>;
250
251 BundledProvidedBufferPool(uint32_t num_buffers, size_t buffer_size,
252 unsigned int flags = 0)
253 : num_buffers_(std::bit_ceil(num_buffers)), buffer_size_(buffer_size) {
254 auto &context = Context::current();
255 auto bgid = context.next_bgid();
256
257 size_t data_size = num_buffers_ * (sizeof(io_uring_buf) + buffer_size);
258 void *data = mmap(nullptr, data_size, PROT_READ | PROT_WRITE,
259 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
260 if (data == MAP_FAILED) [[unlikely]] {
261 throw std::bad_alloc();
262 }
263 br_ = reinterpret_cast<io_uring_buf_ring *>(data);
264 io_uring_buf_ring_init(br_);
265
266 io_uring_buf_reg reg = {};
267 reg.ring_addr = reinterpret_cast<uint64_t>(br_);
268 reg.ring_entries = num_buffers_;
269 reg.bgid = bgid;
270 int r = io_uring_register_buf_ring(context.ring()->ring(), &reg, flags);
271 if (r != 0) [[unlikely]] {
272 munmap(data, data_size);
273 throw make_system_error("io_uring_register_buf_ring", -r);
274 }
275
276 bgid_ = bgid;
277
278 char *buffer_base =
279 static_cast<char *>(data) + sizeof(io_uring_buf) * num_buffers_;
280 auto mask = io_uring_buf_ring_mask(num_buffers_);
281 for (size_t bid = 0; bid < num_buffers_; bid++) {
282 char *ptr = buffer_base + bid * buffer_size;
283 io_uring_buf_ring_add(br_, ptr, buffer_size, bid, mask,
284 static_cast<int>(bid));
285 }
286 io_uring_buf_ring_advance(br_, static_cast<int>(num_buffers_));
287 }
288
289 ~BundledProvidedBufferPool() {
290 assert(br_ != nullptr);
291 size_t data_size = num_buffers_ * (sizeof(io_uring_buf) + buffer_size_);
292 munmap(br_, data_size);
293 [[maybe_unused]] int r = io_uring_unregister_buf_ring(
294 Context::current().ring()->ring(), bgid_);
295 assert(r == 0);
296 }
297
298 BundledProvidedBufferPool(const BundledProvidedBufferPool &) = delete;
299 BundledProvidedBufferPool &
300 operator=(const BundledProvidedBufferPool &) = delete;
301 BundledProvidedBufferPool(BundledProvidedBufferPool &&) = delete;
302 BundledProvidedBufferPool &operator=(BundledProvidedBufferPool &&) = delete;
303
304public:
308 size_t capacity() const { return num_buffers_; }
309
313 size_t buffer_size() const { return buffer_size_; }
314
315public:
316 uint16_t bgid() const { return bgid_; }
317
318 ReturnType handle_finish(int32_t res, [[maybe_unused]] uint32_t flags) {
319 std::vector<ProvidedBuffer> buffers;
320
321 if (!(flags & IORING_CQE_F_BUFFER)) {
322 return buffers;
323 }
324
325 assert(res >= 0);
326
327#if !IO_URING_CHECK_VERSION(2, 8) // >= 2.8
328 if (flags & IORING_CQE_F_BUF_MORE) {
329 uint16_t bid = flags >> IORING_CQE_BUFFER_SHIFT;
330 char *data = get_buffer_(bid) + partial_size_;
331 buffers.emplace_back(data, res, nullptr);
332 partial_size_ += res;
333 return buffers;
334 }
335#endif
336
337 int32_t bytes = res;
338 while (bytes > 0) {
339 auto *buf_ptr = curr_io_uring_buf_();
340 uint16_t bid = buf_ptr->bid;
341 uint32_t curr_buffer_size = buffer_size_ - partial_size_;
342 char *data = get_buffer_(bid) + partial_size_;
343 buffers.emplace_back(data, curr_buffer_size, this);
344 bytes -= static_cast<int32_t>(curr_buffer_size);
345 partial_size_ = 0;
346 advance_io_uring_buf_();
347 }
348
349 return buffers;
350 }
351
352 void add_buffer_back(void *ptr) {
353 char *base = get_buffers_base_();
354 assert(ptr >= base);
355 size_t offset = static_cast<char *>(ptr) - base;
356 size_t bid = offset / buffer_size_;
357 assert(bid < num_buffers_);
358 char *buffer_ptr = base + bid * buffer_size_;
359 auto mask = io_uring_buf_ring_mask(num_buffers_);
360 io_uring_buf_ring_add(br_, buffer_ptr, buffer_size_, bid, mask, 0);
361 io_uring_buf_ring_advance(br_, 1);
362 }
363
364private:
365 char *get_buffer_(uint16_t bid) const {
366 return get_buffers_base_() + static_cast<size_t>(bid) * buffer_size_;
367 }
368
369 char *get_buffers_base_() const {
370 return reinterpret_cast<char *>(br_) +
371 sizeof(io_uring_buf) * num_buffers_;
372 }
373
374 io_uring_buf *curr_io_uring_buf_() {
375 auto mask = io_uring_buf_ring_mask(num_buffers_);
376 return &br_->bufs[br_head_ & mask];
377 }
378
379 void advance_io_uring_buf_() { br_head_++; }
380
381private:
382 io_uring_buf_ring *br_ = nullptr;
383 uint32_t num_buffers_;
384 uint32_t buffer_size_;
385 uint32_t partial_size_ = 0;
386 uint16_t bgid_;
387 uint16_t br_head_ = 0;
388};
389
391 if (pool_ != nullptr) {
392 pool_->add_buffer_back(data_);
393 }
394 data_ = nullptr;
395 size_ = 0;
396 pool_ = nullptr;
397}
398
408class ProvidedBufferPool : public BundledProvidedBufferPool {
409public:
410 using ReturnType = ProvidedBuffer;
411
419 ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size,
420 unsigned int flags = 0)
421 : BundledProvidedBufferPool(num_buffers, buffer_size, flags) {}
422
423public:
424 ReturnType handle_finish(int32_t res, uint32_t flags) {
425 auto buffers = BundledProvidedBufferPool::handle_finish(res, flags);
426 if (buffers.empty()) {
427 return ReturnType();
428 }
429 assert(buffers.size() == 1);
430 return std::move(buffers[0]);
431 }
432};
433
434} // namespace condy
Basic buffer types and conversion utilities.
void * data() const
Get the data of the buffer.
Definition buffers.hpp:34
size_t size() const
Get the byte size of the buffer.
Definition buffers.hpp:39
ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size, unsigned int flags=0)
Construct a new ProvidedBufferPool object.
Definition provided_buffers.hpp:419
ProvidedBufferQueue(uint32_t capacity, unsigned int flags=0)
Construct a new ProvidedBufferQueue object.
Definition provided_buffers.hpp:185
The main namespace for the Condy library.
Definition condy.hpp:28
MutableBuffer buffer(void *data, size_t size)
Create a buffer object from various data sources.
Definition buffers.hpp:84
Wrapper classes for liburing interfaces.
Information about buffers consumed from a provided buffer queue.
Definition provided_buffers.hpp:33
uint16_t bid
Buffer ID of the first buffer consumed.
Definition provided_buffers.hpp:37
uint16_t num_buffers
Number of buffers consumed.
Definition provided_buffers.hpp:41
Provided buffer.
Definition provided_buffers.hpp:197
void * data() const
Get the data pointer of the provided buffer.
Definition provided_buffers.hpp:224
size_t size() const
Get the size of the provided buffer.
Definition provided_buffers.hpp:229
void reset()
Reset the provided buffer, returning it to the pool if owned.
Definition provided_buffers.hpp:390
bool owns_buffer() const
Check if the provided buffer owns a buffer from a pool.
Definition provided_buffers.hpp:239
Internal utility classes and functions used by Condy.