Condy v1.1.0
C++ Asynchronous System Call Layer for Linux
runtime.hpp
#pragma once

#include "condy/context.hpp"
#include "condy/intrusive.hpp"
#include "condy/invoker.hpp"
#include "condy/ring.hpp"
#include "condy/utils.hpp"
#include "condy/work_type.hpp"
#include <atomic>
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <mutex>
#include <stdexcept>
#include <sys/eventfd.h>
#include <unistd.h>

namespace condy {

namespace detail {

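// AsyncWaiter provides the cross-thread wake-up used by Runtime::notify().
// With liburing >= 2.12 the waiter needs no state of its own: notify()
// builds an IORING_OP_MSG_RING SQE targeting the runtime's ring and submits
// it synchronously with io_uring_register_sync_msg(), so the notifying
// thread does not need a local ring. On older liburing an eventfd is used
// instead: async_wait() drains any stale value and arms a read on the
// eventfd, so a completion is posted whenever another thread calls notify()
// (eventfd_write). Both paths tag the resulting CQE with WorkType::Notify,
// which process_cqe_() turns into a flush of the global queue.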
#if !IO_URING_CHECK_VERSION(2, 12) // >= 2.12
class AsyncWaiter {
public:
  void async_wait(Ring &) {}

  void notify(Ring &ring) {
    io_uring_sqe sqe = {};
    io_uring_prep_msg_ring(
        &sqe, ring.ring()->ring_fd, 0,
        reinterpret_cast<uint64_t>(encode_work(nullptr, WorkType::Notify)),
        0);
    io_uring_register_sync_msg(&sqe);
  }
};
#else
class AsyncWaiter {
public:
  AsyncWaiter() {
    notify_fd_ = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (notify_fd_ < 0) {
      throw make_system_error("eventfd", errno);
    }
  }

  ~AsyncWaiter() { close(notify_fd_); }

  void async_wait(Ring &ring) {
    eventfd_read(notify_fd_, &dummy_);
    io_uring_sqe *sqe = ring.get_sqe();
    io_uring_prep_read(sqe, notify_fd_, &dummy_, sizeof(dummy_), 0);
    io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Notify));
  }

  void notify(Ring &) { eventfd_write(notify_fd_, 1); }

private:
  int notify_fd_;
  eventfd_t dummy_;
};
#endif

} // namespace detail

class Runtime {
public:
  Runtime(const RuntimeOptions &options = {}) {
    io_uring_params params;
    std::memset(&params, 0, sizeof(params));

    params.flags |= IORING_SETUP_CLAMP;
    params.flags |= IORING_SETUP_SINGLE_ISSUER;
    params.flags |= IORING_SETUP_SUBMIT_ALL;
    params.flags |= IORING_SETUP_R_DISABLED;

    size_t ring_entries = options.sq_size_;
    if (options.cq_size_ != 0) { // 0 means default
      params.flags |= IORING_SETUP_CQSIZE;
      params.cq_entries = options.cq_size_;
    }

    if (options.enable_iopoll_) {
      params.flags |= IORING_SETUP_IOPOLL;
#if !IO_URING_CHECK_VERSION(2, 9) // >= 2.9
      if (options.enable_hybrid_iopoll_) {
        params.flags |= IORING_SETUP_HYBRID_IOPOLL;
      }
#endif
    }

    if (options.enable_sqpoll_) {
      params.flags |= IORING_SETUP_SQPOLL;
      params.sq_thread_idle = options.sqpoll_idle_time_ms_;
      if (options.sqpoll_thread_cpu_.has_value()) {
        params.flags |= IORING_SETUP_SQ_AFF;
        params.sq_thread_cpu = *options.sqpoll_thread_cpu_;
      }
    }

    if (options.attach_wq_target_ != nullptr) {
      params.flags |= IORING_SETUP_ATTACH_WQ;
      params.wq_fd = options.attach_wq_target_->ring_.ring()->ring_fd;
    }

    if (options.enable_defer_taskrun_) {
      params.flags |= IORING_SETUP_DEFER_TASKRUN;
    }

    if (options.enable_coop_taskrun_) {
      params.flags |= IORING_SETUP_COOP_TASKRUN;
      if (options.enable_coop_taskrun_flag_) {
        params.flags |= IORING_SETUP_TASKRUN_FLAG;
      }
    }

    if (options.enable_sqe128_) {
      params.flags |= IORING_SETUP_SQE128;
    }

    if (options.enable_cqe32_) {
      params.flags |= IORING_SETUP_CQE32;
    }

    void *buf = nullptr;
    size_t buf_size = 0;
#if !IO_URING_CHECK_VERSION(2, 5) // >= 2.5
    if (options.enable_no_mmap_) {
      params.flags |= IORING_SETUP_NO_MMAP;
      buf = options.no_mmap_buf_;
      buf_size = options.no_mmap_buf_size_;
    }
#endif

    int r = ring_.init(ring_entries, &params, buf, buf_size);
    if (r < 0) {
      throw make_system_error("io_uring_queue_init_params", -r);
    }

    event_interval_ = options.event_interval_;
    disable_register_ring_fd_ = options.disable_register_ring_fd_;
  }

  ~Runtime() { ring_.destroy(); }

  Runtime(const Runtime &) = delete;
  Runtime &operator=(const Runtime &) = delete;
  Runtime(Runtime &&) = delete;
  Runtime &operator=(Runtime &&) = delete;

public:
  void allow_exit() {
    pending_works_--;
    notify();
  }

  void notify() { async_waiter_.notify(ring_); }

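  // schedule() routes a piece of work depending on where it is called from:
  //   - on this runtime's own thread, the work goes straight onto the local
  //     queue;
  //   - on a thread running another Runtime (once this ring is enabled), an
  //     IORING_OP_MSG_RING SQE is queued on the caller's ring to hand the
  //     pointer over, tagged WorkType::Schedule;
  //   - with liburing >= 2.12, a non-runtime thread can post the same
  //     message synchronously via io_uring_register_sync_msg();
  //   - otherwise the work is appended to the mutex-protected global queue
  //     and the runtime is woken via notify().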
  void schedule(WorkInvoker *work) {
    auto *runtime = Context::current().runtime();
    if (runtime == this) {
      local_queue_.push_back(work);
      return;
    }

    auto state = state_.load();
    if (runtime != nullptr && state == State::Enabled) {
      tsan_release(work);
      io_uring_sqe *sqe = runtime->ring_.get_sqe();
      prep_msg_ring_(sqe, work);
      runtime->pend_work();
      return;
    }

#if !IO_URING_CHECK_VERSION(2, 12) // >= 2.12
    if (runtime == nullptr && state == State::Enabled) {
      tsan_release(work);
      io_uring_sqe sqe = {};
      prep_msg_ring_(&sqe, work);
      [[maybe_unused]] int r = io_uring_register_sync_msg(&sqe);
      assert(r == 0);
      return;
    }
#endif

    {
      std::lock_guard<std::mutex> lock(mutex_);
      bool need_notify = global_queue_.empty();
      global_queue_.push_back(work);
      if (need_notify) {
        notify();
      }
    }
  }

  void pend_work() { pending_works_++; }

  void resume_work() { pending_works_--; }

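  // run() drives the event loop on the calling thread. It transitions the
  // runtime Idle -> Running -> Enabled, enables the (initially disabled)
  // ring, optionally registers the ring fd, binds the ring and runtime to
  // Context::current(), flushes the global queue once, and then alternates
  // between executing local work and reaping CQEs. Completions are polled
  // every event_interval_ iterations even while local work remains, and the
  // loop exits once pending_works_ drops to zero (see allow_exit()).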
  void run() {
    State expected = State::Idle;
    if (!state_.compare_exchange_strong(expected, State::Running)) {
      throw std::logic_error("Runtime is already running or stopped");
    }
    auto d1 = defer([this]() { state_.store(State::Stopped); });

    [[maybe_unused]] int r;
    r = io_uring_enable_rings(ring_.ring());
    assert(r == 0);

    state_.store(State::Enabled);

    if (!disable_register_ring_fd_) {
      r = io_uring_register_ring_fd(ring_.ring());
      if (r != 1) { // 1 indicates success for this call
        throw make_system_error("io_uring_register_ring_fd", -r);
      }
    }

    Context::current().init(&ring_, this);
    auto d2 = defer([]() { Context::current().reset(); });

    {
      std::lock_guard<std::mutex> lock(mutex_);
      flush_global_queue_();
    }

    while (true) {
      tick_count_++;

      if (tick_count_ % event_interval_ == 0) {
        flush_ring_();
      }

      if (auto *work = local_queue_.pop_front()) {
        (*work)();
        continue;
      }

      if (pending_works_ == 0) {
        break;
      }
      flush_ring_wait_();
    }
  }

  auto &fd_table() { return ring_.fd_table(); }

  auto &buffer_table() { return ring_.buffer_table(); }

  auto &settings() { return ring_.settings(); }

private:
  void flush_global_queue_() {
    local_queue_.push_back(std::move(global_queue_));
    async_waiter_.async_wait(ring_);
  }

  void prep_msg_ring_(io_uring_sqe *sqe, WorkInvoker *work) {
    auto data = encode_work(work, WorkType::Schedule);
    io_uring_prep_msg_ring(sqe, this->ring_.ring()->ring_fd, 0,
                           reinterpret_cast<uint64_t>(data), 0);
    io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Schedule));
  }

  size_t flush_ring_() {
    return ring_.reap_completions(
        [this](io_uring_cqe *cqe) { process_cqe_(cqe); });
  }

  size_t flush_ring_wait_() {
    return ring_.reap_completions_wait(
        [this](io_uring_cqe *cqe) { process_cqe_(cqe); });
  }

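  // process_cqe_() decodes the user_data of each completion into a pointer
  // plus a WorkType tag and dispatches on the tag: Ignore is dropped,
  // Notify re-arms the waiter and drains the global queue, SendFd hands a
  // received descriptor to the fd table's accepter, Schedule enqueues
  // remotely submitted work (or just drops a pending-work reference when the
  // pointer is null), MultiShot and ZeroCopy feed intermediate completions
  // back into their finish handles until the final CQE arrives, and Common
  // completes an ordinary operation.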
  void process_cqe_(io_uring_cqe *cqe) {
    auto *data_raw = io_uring_cqe_get_data(cqe);
    auto [data, type] = decode_work(data_raw);

    if (type == WorkType::Ignore) {
      // No-op
      assert(cqe->res != -EINVAL); // If EINVAL, something is wrong
    } else if (type == WorkType::Notify) {
      std::lock_guard<std::mutex> lock(mutex_);
      flush_global_queue_();
    } else if (type == WorkType::SendFd) {
      auto &fd_table = ring_.fd_table();
      if (fd_table.fd_accepter_ == nullptr) [[unlikely]] {
        throw std::logic_error("No way to accept sent fd");
      }
      uint64_t payload = reinterpret_cast<uint64_t>(data) >> 3;
      if (payload == 0) { // Auto-allocate
        fd_table.fd_accepter_(cqe->res);
      } else {
        int target_fd = static_cast<int>(payload - 1);
        fd_table.fd_accepter_(target_fd);
      }
    } else if (type == WorkType::Schedule) {
      if (data == nullptr) {
        assert(cqe->res == 0);
        pending_works_--;
      } else {
        auto *work = static_cast<WorkInvoker *>(data);
        tsan_acquire(data);
        local_queue_.push_back(work);
      }
    } else if (type == WorkType::MultiShot) {
      auto *handle = static_cast<ExtendOpFinishHandle *>(data);
      handle->set_result(cqe->res, cqe->flags);
      if (cqe->flags & IORING_CQE_F_MORE) {
        handle->invoke_extend(0); // res not used here
      } else {
        pending_works_--;
        local_queue_.push_back(handle);
      }
    } else if (type == WorkType::ZeroCopy) {
      auto *handle = static_cast<ExtendOpFinishHandle *>(data);
      if (cqe->flags & IORING_CQE_F_MORE) {
        handle->set_result(cqe->res, cqe->flags);
        local_queue_.push_back(handle);
      } else {
        pending_works_--;
        if (cqe->flags & IORING_CQE_F_NOTIF) {
          handle->invoke_extend(cqe->res);
        } else {
          handle->set_result(cqe->res, cqe->flags);
          local_queue_.push_back(handle);
          handle->invoke_extend(0);
        }
      }
    } else if (type == WorkType::Common) {
      auto *handle = static_cast<OpFinishHandle *>(data);
      handle->set_result(cqe->res, cqe->flags);
      pending_works_--;
      local_queue_.push_back(handle);
    } else {
      assert(false && "Invalid work type");
    }
  }

private:
  enum class State : uint8_t {
    Idle,    // Not running
    Running, // Started running
    Enabled, // Running and ring enabled
    Stopped, // Stopped
  };
  static_assert(std::atomic<State>::is_always_lock_free);

  using WorkListQueue =
      IntrusiveSingleList<WorkInvoker, &WorkInvoker::work_queue_entry_>;

  // Global state
  std::mutex mutex_;
  detail::AsyncWaiter async_waiter_;
  WorkListQueue global_queue_;
  std::atomic_size_t pending_works_ = 1;
  std::atomic<State> state_ = State::Idle;

  // Local state
  WorkListQueue local_queue_;
  Ring ring_;
  size_t tick_count_ = 0;

  // Configurable parameters
  size_t event_interval_ = 61;
  bool disable_register_ring_fd_ = false;
};

inline auto &current_runtime() { return *Context::current().runtime(); }

inline void set_current_cred_id(uint16_t id) {
  Context::current().set_cred_id(id);
}

} // namespace condy
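A minimal usage sketch follows. It is not part of runtime.hpp and relies only on what the listing shows: the RuntimeOptions fields read by the constructor, schedule(), allow_exit(), and run(). The include path, the chosen option values, and direct assignment to the options fields are assumptions; see runtime_options.hpp and invoker.hpp for the actual interfaces.

#include "condy/runtime.hpp" // assumed install path, matching the condy/ includes above

int main() {
  condy::RuntimeOptions opts;       // assumed to be directly fillable, since the
  opts.cq_size_ = 4096;             //   constructor reads these fields as shown
  opts.enable_coop_taskrun_ = true; // maps to IORING_SETUP_COOP_TASKRUN

  condy::Runtime runtime(opts);

  // A real program would schedule a WorkInvoker-derived root task here,
  //   runtime.schedule(root_task);
  // and have that task call runtime.allow_exit() when it finishes.

  runtime.allow_exit(); // with nothing scheduled, release the initial
                        // pending-work count so run() can return
  runtime.run();        // drive the event loop on this thread
  return 0;
}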