Condy v1.5.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
provided_buffers.hpp
Go to the documentation of this file.
1
7
8#pragma once
9
10#include "condy/buffers.hpp"
11#include "condy/condy_uring.hpp"
12#include "condy/context.hpp"
13#include "condy/ring.hpp"
14#include "condy/utils.hpp"
15#include <bit>
16#include <cstddef>
17#include <cstdint>
18#include <stdexcept>
19#include <sys/mman.h>
20#include <sys/types.h>
21
22namespace condy {
23
/// Information about buffers consumed from a provided buffer queue.
struct BufferInfo {
  /// Buffer ID of the first buffer consumed.
  uint16_t bid;
  /// Number of buffers consumed by the completed operation.
  uint16_t num_buffers;
};
43
44namespace detail {
45
46class BundledProvidedBufferQueue {
47public:
48 using ReturnType = BufferInfo;
49
50 BundledProvidedBufferQueue(uint32_t capacity, unsigned int flags)
51 : capacity_(std::bit_ceil(capacity)) {
52 auto &context = detail::Context::current();
53 auto bgid = context.next_bgid();
54
55 size_t data_size = capacity_ * sizeof(io_uring_buf);
56 void *data = mmap(nullptr, data_size, PROT_READ | PROT_WRITE,
57 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
58 if (data == MAP_FAILED) [[unlikely]] {
59 throw std::bad_alloc();
60 }
61 br_ = reinterpret_cast<io_uring_buf_ring *>(data);
62 io_uring_buf_ring_init(br_);
63
64 io_uring_buf_reg reg = {};
65 reg.ring_addr = reinterpret_cast<uint64_t>(br_);
66 reg.ring_entries = capacity_;
67 reg.bgid = bgid;
68 int r = io_uring_register_buf_ring(context.ring()->ring(), &reg, flags);
69 if (r != 0) [[unlikely]] {
70 munmap(data, data_size);
71 throw make_system_error("io_uring_register_buf_ring", -r);
72 }
73
74 bgid_ = bgid;
75 }
76
77 ~BundledProvidedBufferQueue() {
78 assert(br_ != nullptr);
79 size_t data_size = capacity_ * sizeof(io_uring_buf);
80 munmap(br_, data_size);
81 [[maybe_unused]] int r = io_uring_unregister_buf_ring(
82 detail::Context::current().ring()->ring(), bgid_);
83 assert(r == 0);
84 if (r == 0) {
85 detail::Context::current().recycle_bgid(bgid_);
86 }
87 }
88
89 BundledProvidedBufferQueue(const BundledProvidedBufferQueue &) = delete;
90 BundledProvidedBufferQueue &
91 operator=(const BundledProvidedBufferQueue &) = delete;
92 BundledProvidedBufferQueue(BundledProvidedBufferQueue &&) = delete;
93 BundledProvidedBufferQueue &
94 operator=(BundledProvidedBufferQueue &&) = delete;
95
96public:
100 size_t size() const { return size_; }
101
105 size_t capacity() const { return capacity_; }
106
117 template <typename Buffer> uint16_t push(Buffer &&buffer) {
118 if (size_ >= capacity_) [[unlikely]] {
119 throw std::logic_error("Capacity exceeded");
120 }
121
122 auto mask = io_uring_buf_ring_mask(capacity_);
123 uint16_t bid = br_->tail & mask;
124 io_uring_buf_ring_add(br_, buffer.data(), buffer.size(), bid, mask, 0);
125 io_uring_buf_ring_advance(br_, 1);
126 size_++;
127
128 return bid;
129 }
130
131public:
132 uint16_t bgid() const { return bgid_; }
133
134 ReturnType handle_finish(int32_t res, uint32_t flags) {
135 if (!(flags & IORING_CQE_F_BUFFER)) {
136 return ReturnType{0, 0};
137 }
138
139 assert(res >= 0);
140
141 ReturnType result = {
142 .bid = static_cast<uint16_t>(flags >> IORING_CQE_BUFFER_SHIFT),
143 .num_buffers = 0,
144 };
145
146#if !IO_URING_CHECK_VERSION(2, 8) // >= 2.8
147 if (flags & IORING_CQE_F_BUF_MORE) {
148 return result;
149 }
150#endif
151
152 uint16_t curr_bid = result.bid;
153 auto bytes = res;
154 while (bytes > 0) {
155 auto &buf = br_->bufs[curr_bid];
156 assert(buf.bid == curr_bid);
157 uint32_t buf_size = buf.len;
158 bytes -= static_cast<int32_t>(buf_size);
159 result.num_buffers++;
160 }
161 assert(size_ >= result.num_buffers);
162 size_ -= result.num_buffers;
163
164 return result;
165 }
166
167private:
168 io_uring_buf_ring *br_ = nullptr;
169 uint32_t size_ = 0;
170 uint32_t capacity_;
171 uint16_t bgid_;
172};
173
174} // namespace detail
175
/// A queue of caller-owned provided buffers registered with io_uring in the
/// current Runtime. Thin public wrapper over the bundled implementation.
class ProvidedBufferQueue : public detail::BundledProvidedBufferQueue {
public:
  /// Construct a new ProvidedBufferQueue object in current Runtime.
  ///
  /// @param capacity number of ring entries (rounded up to a power of two)
  /// @param flags    forwarded to io_uring_register_buf_ring()
  ProvidedBufferQueue(uint32_t capacity, unsigned int flags = 0)
      : BundledProvidedBufferQueue(capacity, flags) {}
};
198
199namespace detail {
200class BundledProvidedBufferPool;
201}
202
211struct ProvidedBuffer : public BufferBase {
212public:
213 ProvidedBuffer() = default;
214 ProvidedBuffer(void *data, size_t size,
215 detail::BundledProvidedBufferPool *pool)
216 : data_(data), size_(size), pool_(pool) {}
217 ProvidedBuffer(ProvidedBuffer &&other) noexcept
218 : data_(std::exchange(other.data_, nullptr)),
219 size_(std::exchange(other.size_, 0)),
220 pool_(std::exchange(other.pool_, nullptr)) {}
221 ProvidedBuffer &operator=(ProvidedBuffer &&other) noexcept {
222 if (this != &other) {
223 data_ = std::exchange(other.data_, nullptr);
224 size_ = std::exchange(other.size_, 0);
225 pool_ = std::exchange(other.pool_, nullptr);
226 }
227 return *this;
228 }
229
230 ~ProvidedBuffer() { reset(); }
231
232 ProvidedBuffer(const ProvidedBuffer &) = delete;
233 ProvidedBuffer &operator=(const ProvidedBuffer &) = delete;
234
235public:
239 void *data() const { return data_; }
240
244 size_t size() const { return size_; }
245
249 void reset();
250
254 bool owns_buffer() const { return pool_ != nullptr; }
255
256private:
257 void *data_ = nullptr;
258 size_t size_ = 0;
259 detail::BundledProvidedBufferPool *pool_ = nullptr;
260};
261
262namespace detail {
263
264class BundledProvidedBufferPool {
265public:
266 using ReturnType = std::vector<ProvidedBuffer>;
267
268 BundledProvidedBufferPool(uint32_t num_buffers, size_t buffer_size,
269 unsigned int flags)
270 : num_buffers_(std::bit_ceil(num_buffers)), buffer_size_(buffer_size) {
271 auto &context = detail::Context::current();
272 auto bgid = context.next_bgid();
273
274 size_t data_size = num_buffers_ * (sizeof(io_uring_buf) + buffer_size);
275 void *data = mmap(nullptr, data_size, PROT_READ | PROT_WRITE,
276 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
277 if (data == MAP_FAILED) [[unlikely]] {
278 throw std::bad_alloc();
279 }
280 br_ = reinterpret_cast<io_uring_buf_ring *>(data);
281 io_uring_buf_ring_init(br_);
282
283 io_uring_buf_reg reg = {};
284 reg.ring_addr = reinterpret_cast<uint64_t>(br_);
285 reg.ring_entries = num_buffers_;
286 reg.bgid = bgid;
287 int r = io_uring_register_buf_ring(context.ring()->ring(), &reg, flags);
288 if (r != 0) [[unlikely]] {
289 munmap(data, data_size);
290 throw make_system_error("io_uring_register_buf_ring", -r);
291 }
292
293 bgid_ = bgid;
294
295 char *buffer_base =
296 static_cast<char *>(data) + sizeof(io_uring_buf) * num_buffers_;
297 auto mask = io_uring_buf_ring_mask(num_buffers_);
298 for (size_t bid = 0; bid < num_buffers_; bid++) {
299 char *ptr = buffer_base + bid * buffer_size;
300 io_uring_buf_ring_add(br_, ptr, buffer_size, bid, mask,
301 static_cast<int>(bid));
302 }
303 io_uring_buf_ring_advance(br_, static_cast<int>(num_buffers_));
304 }
305
306 ~BundledProvidedBufferPool() {
307 assert(br_ != nullptr);
308 size_t data_size = num_buffers_ * (sizeof(io_uring_buf) + buffer_size_);
309 munmap(br_, data_size);
310 [[maybe_unused]] int r = io_uring_unregister_buf_ring(
311 detail::Context::current().ring()->ring(), bgid_);
312 assert(r == 0);
313 if (r == 0) {
314 detail::Context::current().recycle_bgid(bgid_);
315 }
316 }
317
318 BundledProvidedBufferPool(const BundledProvidedBufferPool &) = delete;
319 BundledProvidedBufferPool &
320 operator=(const BundledProvidedBufferPool &) = delete;
321 BundledProvidedBufferPool(BundledProvidedBufferPool &&) = delete;
322 BundledProvidedBufferPool &operator=(BundledProvidedBufferPool &&) = delete;
323
324public:
328 size_t capacity() const { return num_buffers_; }
329
333 size_t buffer_size() const { return buffer_size_; }
334
335public:
336 uint16_t bgid() const { return bgid_; }
337
338 ReturnType handle_finish(int32_t res, [[maybe_unused]] uint32_t flags) {
339 std::vector<ProvidedBuffer> buffers;
340
341 if (!(flags & IORING_CQE_F_BUFFER)) {
342 return buffers;
343 }
344
345 assert(res >= 0);
346
347#if !IO_URING_CHECK_VERSION(2, 8) // >= 2.8
348 if (flags & IORING_CQE_F_BUF_MORE) {
349 uint16_t bid = flags >> IORING_CQE_BUFFER_SHIFT;
350 char *data = get_buffer_(bid) + partial_size_;
351 buffers.emplace_back(data, res, nullptr);
352 partial_size_ += res;
353 return buffers;
354 }
355#endif
356
357 int32_t bytes = res;
358 while (bytes > 0) {
359 auto *buf_ptr = curr_io_uring_buf_();
360 uint16_t bid = buf_ptr->bid;
361 uint32_t curr_buffer_size = buffer_size_ - partial_size_;
362 char *data = get_buffer_(bid) + partial_size_;
363 buffers.emplace_back(data, curr_buffer_size, this);
364 bytes -= static_cast<int32_t>(curr_buffer_size);
365 partial_size_ = 0;
366 advance_io_uring_buf_();
367 }
368
369 return buffers;
370 }
371
372 void add_buffer_back(void *ptr) {
373 char *base = get_buffers_base_();
374 assert(ptr >= base);
375 size_t offset = static_cast<char *>(ptr) - base;
376 size_t bid = offset / buffer_size_;
377 assert(bid < num_buffers_);
378 char *buffer_ptr = base + bid * buffer_size_;
379 auto mask = io_uring_buf_ring_mask(num_buffers_);
380 io_uring_buf_ring_add(br_, buffer_ptr, buffer_size_, bid, mask, 0);
381 io_uring_buf_ring_advance(br_, 1);
382 }
383
384private:
385 char *get_buffer_(uint16_t bid) const {
386 return get_buffers_base_() + static_cast<size_t>(bid) * buffer_size_;
387 }
388
389 char *get_buffers_base_() const {
390 return reinterpret_cast<char *>(br_) +
391 sizeof(io_uring_buf) * num_buffers_;
392 }
393
394 io_uring_buf *curr_io_uring_buf_() {
395 auto mask = io_uring_buf_ring_mask(num_buffers_);
396 return &br_->bufs[br_head_ & mask];
397 }
398
399 void advance_io_uring_buf_() { br_head_++; }
400
401private:
402 io_uring_buf_ring *br_ = nullptr;
403 uint32_t num_buffers_;
404 uint32_t buffer_size_;
405 uint32_t partial_size_ = 0;
406 uint16_t bgid_;
407 uint16_t br_head_ = 0;
408};
409
410} // namespace detail
411
413 if (pool_ != nullptr) {
414 pool_->add_buffer_back(data_);
415 }
416 data_ = nullptr;
417 size_ = 0;
418 pool_ = nullptr;
419}
420
433class ProvidedBufferPool : public detail::BundledProvidedBufferPool {
434public:
435 using ReturnType = ProvidedBuffer;
436
444 ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size,
445 unsigned int flags = 0)
446 : BundledProvidedBufferPool(num_buffers, buffer_size, flags) {}
447
448public:
449 ReturnType handle_finish(int32_t res, uint32_t flags) {
450 auto buffers = BundledProvidedBufferPool::handle_finish(res, flags);
451 if (buffers.empty()) {
452 return ReturnType();
453 }
454 assert(buffers.size() == 1);
455 return std::move(buffers[0]);
456 }
457};
458
459} // namespace condy
Basic buffer types and conversion utilities.
ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size, unsigned int flags=0)
Construct a new ProvidedBufferPool object in current Runtime.
ProvidedBufferQueue(uint32_t capacity, unsigned int flags=0)
Construct a new ProvidedBufferQueue object in current Runtime.
The main namespace for the Condy library.
Definition condy.hpp:28
MutableBuffer buffer(void *data, size_t size)
Create a buffer object from various data sources.
Definition buffers.hpp:85
Wrapper classes for liburing interfaces.
Information about buffers consumed from a provided buffer queue.
uint16_t bid
Buffer ID of the first buffer consumed.
uint16_t num_buffers
Number of buffers consumed.
void * data() const
Get the data pointer of the provided buffer.
size_t size() const
Get the size of the provided buffer.
void reset()
Reset the provided buffer, returning it to the pool if owned.
bool owns_buffer() const
Check if the provided buffer owns a buffer from a pool.
Internal utility classes and functions used by Condy.