Condy v1.6.0
C++ Asynchronous System Call Layer for Linux
runtime.hpp
#pragma once

#include "condy/context.hpp"
#include "condy/intrusive.hpp" // Intrusive single-linked and double-linked list implementations.
#include "condy/invoker.hpp"   // Polymorphic invocation utilities.
#include "condy/ring.hpp"      // Wrapper classes for liburing interfaces.
#include "condy/singleton.hpp"
#include "condy/utils.hpp"     // Internal utility classes and functions used by Condy.
#include "condy/work_type.hpp"
#include <atomic>
#include <cassert>   // assert() is used below.
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <format>    // std::format is used by the panic_on() messages below.
#include <mutex>
#include <stdexcept> // std::runtime_error is thrown by run().

namespace condy { // The main namespace for the Condy library.

namespace detail {

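// Per-thread fallback ring (8 entries) used only by sync_msg_ring() below
// when liburing predates io_uring_register_sync_msg() (liburing >= 2.12).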
class ThreadLocalRing : public ThreadLocalSingleton<ThreadLocalRing> {
public:
    Ring *ring() { return &ring_; }

    ThreadLocalRing() {
        io_uring_params params = {};
        params.flags |= IORING_SETUP_CLAMP;
        params.flags |= IORING_SETUP_SINGLE_ISSUER;
        params.flags |= IORING_SETUP_SUBMIT_ALL;
        [[maybe_unused]] int r = ring_.init(8, &params);
        // If we can construct Runtime, we should be able to construct this
        // thread-local ring.
        assert(r == 0);
    }

private:
    Ring ring_;
};

inline int sync_msg_ring(io_uring_sqe *sqe_data) noexcept {
#if !IO_URING_CHECK_VERSION(2, 12) // >= 2.12
    return io_uring_register_sync_msg(sqe_data);
#else
    // Fallback: send the message through this thread's private ring and
    // synchronously wait for its single completion.
    auto *ring = ThreadLocalRing::current().ring();
    auto *sqe = ring->get_sqe();
    *sqe = *sqe_data;
    int r = 0;
    auto n =
        ring->reap_completions_wait([&](io_uring_cqe *cqe) { r = cqe->res; });
    if (n < 0) {
        return static_cast<int>(n);
    }
    assert(n == 1);
    return r;
#endif
}

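// One-shot handshake for cross-thread cancellation: the requesting thread
// blocks in wait() until the owning runtime thread has queued the cancel
// SQE and called notify() (see Runtime::cancel() and process_cqe_()).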
class CancelRequest {
public:
    CancelRequest(uintptr_t data) : data_(data) {}

    void wait() noexcept {
        while (!finished_.load(std::memory_order_acquire)) {
            finished_.wait(false, std::memory_order_relaxed);
        }
    }

    void notify() noexcept {
        finished_.store(true, std::memory_order_release);
        finished_.notify_one();
    }

    uintptr_t data() const noexcept { return data_; }

private:
    uintptr_t data_;
    std::atomic_bool finished_ = false;
};

} // namespace detail
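
// Every SQE user_data in this file packs a pointer together with a WorkType
// tag via encode_work()/decode_work() (see work_type.hpp);
// Runtime::process_cqe_() below dispatches each completion on that tag.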

// Type-erased completion handle: handle() forwards the CQE to the callback
// installed by a derived operation type; the callback returns true once the
// operation has fully finished.
class OpFinishHandleBase {
public:
    using HandleFunc = bool (*)(void *, io_uring_cqe *) noexcept;

    bool handle(io_uring_cqe *cqe) noexcept {
        assert(handle_func_ != nullptr);
        return handle_func_(this, cqe);
    }

protected:
    OpFinishHandleBase() = default;

protected:
    HandleFunc handle_func_ = nullptr;
};
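
// Hypothetical sketch (not part of this header) of a derived handle: the
// subclass installs a static trampoline into handle_func_ and recovers
// itself from the void* argument:
//
//   struct ReadHandle : OpFinishHandleBase {
//       ReadHandle() { handle_func_ = &ReadHandle::on_cqe; }
//       static bool on_cqe(void *self, io_uring_cqe *cqe) noexcept {
//           static_cast<ReadHandle *>(self)->res = cqe->res;
//           return true; // finished: the runtime drops one pending work
//       }
//       int res = 0;
//   };
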
/// @brief The event loop runtime for executing asynchronous works.
class Runtime {
public:
    /// @brief Construct a new Runtime object.
    Runtime(const RuntimeOptions &options = {}) {
        io_uring_params params;
        std::memset(&params, 0, sizeof(params));

        params.flags |= IORING_SETUP_CLAMP;
        params.flags |= IORING_SETUP_SINGLE_ISSUER;
        params.flags |= IORING_SETUP_SUBMIT_ALL;
        params.flags |= IORING_SETUP_R_DISABLED;

        size_t ring_entries = options.sq_size_;
        if (options.cq_size_ != 0) { // 0 means default
            params.flags |= IORING_SETUP_CQSIZE;
            params.cq_entries = options.cq_size_;
        }

        if (options.enable_iopoll_) {
            params.flags |= IORING_SETUP_IOPOLL;
#if !IO_URING_CHECK_VERSION(2, 9) // >= 2.9
            if (options.enable_hybrid_iopoll_) {
                params.flags |= IORING_SETUP_HYBRID_IOPOLL;
            }
#endif
        }

        if (options.enable_sqpoll_) {
            params.flags |= IORING_SETUP_SQPOLL;
            params.sq_thread_idle = options.sqpoll_idle_time_ms_;
            if (options.sqpoll_thread_cpu_.has_value()) {
                params.flags |= IORING_SETUP_SQ_AFF;
                params.sq_thread_cpu = *options.sqpoll_thread_cpu_;
            }
        }

        if (options.attach_wq_target_ != nullptr) {
            params.flags |= IORING_SETUP_ATTACH_WQ;
            params.wq_fd = options.attach_wq_target_->ring_.ring()->ring_fd;
        }

        if (options.enable_defer_taskrun_) {
            params.flags |= IORING_SETUP_DEFER_TASKRUN;
            params.flags |= IORING_SETUP_TASKRUN_FLAG;
        }

        if (options.enable_coop_taskrun_) {
            params.flags |= IORING_SETUP_COOP_TASKRUN;
            params.flags |= IORING_SETUP_TASKRUN_FLAG;
        }

        if (options.enable_sqe128_) {
            params.flags |= IORING_SETUP_SQE128;
        }

        if (options.enable_cqe32_) {
            params.flags |= IORING_SETUP_CQE32;
        }

#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
        if (options.enable_sqe_mixed_) {
            params.flags |= IORING_SETUP_SQE_MIXED;
        }

        if (options.enable_cqe_mixed_) {
            params.flags |= IORING_SETUP_CQE_MIXED;
        }
#endif

        void *buf = nullptr;
        size_t buf_size = 0;
#if !IO_URING_CHECK_VERSION(2, 5) // >= 2.5
        if (options.enable_no_mmap_) {
            params.flags |= IORING_SETUP_NO_MMAP;
            buf = options.no_mmap_buf_;
            buf_size = options.no_mmap_buf_size_;
        }
#endif

#if !IO_URING_CHECK_VERSION(2, 14) // >= 2.14
        if (options.enable_sq_rewind_) {
            params.flags |= IORING_SETUP_SQ_REWIND;
        }
#endif

        int r = ring_.init(ring_entries, &params, buf, buf_size);
        if (r < 0) {
            throw make_system_error("io_uring_queue_init_params", -r);
        }

        event_interval_ = options.event_interval_;
        disable_register_ring_fd_ = options.disable_register_ring_fd_;
209

    ~Runtime() { ring_.destroy(); }

    Runtime(const Runtime &) = delete;
    Runtime &operator=(const Runtime &) = delete;
    Runtime(Runtime &&) = delete;
    Runtime &operator=(Runtime &&) = delete;

public:
    /// @brief Allow the runtime to exit when there are no pending works.
    void allow_exit() noexcept {
        pending_works_--;
        wakeup_();
    }

    void schedule(WorkInvoker *work) noexcept {
        auto *curr_runtime = detail::Context::current().runtime();
        if (curr_runtime == this) {
            local_queue_.push_back(work);
            return;
        }

        auto state = state_.load();
        if (state == State::Enabled) {
            // Fast path: if the ring is enabled, we can directly schedule
            // the work.
            tsan_release(work);
            schedule_msg_ring_(curr_runtime,
                               encode_work(work, WorkType::Schedule));
        } else {
            // Slow path: if the ring is not enabled, we need to acquire the
            // mutex to ensure the work is scheduled before the ring is
            // enabled.
            std::unique_lock<std::mutex> lock(mutex_);
            state = state_.load();
            if (state == State::Enabled) {
                lock.unlock();
                tsan_release(work);
                schedule_msg_ring_(curr_runtime,
                                   encode_work(work, WorkType::Schedule));
            } else {
                global_queue_.push_back(work);
            }
        }
    }

    // Internal use only. Schedule a cancel request for the given data.
    void cancel(uintptr_t data) noexcept {
        auto *curr_runtime = detail::Context::current().runtime();
        if (curr_runtime == this) {
            io_uring_sqe *sqe = ring_.get_sqe();
            prep_cancel_(sqe, data);
            return;
        }

        auto state = state_.load();
        if (state != State::Enabled) {
            return;
        }

        detail::CancelRequest request(data);
        tsan_release(&request);
        schedule_msg_ring_(curr_runtime,
                           encode_work(&request, WorkType::Cancel));
        if (curr_runtime != nullptr) {
            // Ensure the cancel msg is submitted.
            curr_runtime->ring_.submit();
        }
        // Block until the runtime thread has submitted the cancel SQE. This
        // is important to prevent address reuse of the same data pointer,
        // which can lead to incorrect cancellation or other bugs.
        request.wait();
    }

    // Track one more in-flight work; the run loop keeps going while any
    // works are pending.
    void pend_work() noexcept { pending_works_++; }

    // Drop one in-flight work.
    void resume_work() noexcept { pending_works_--; }

    /// @brief Run the runtime event loop in the current thread.
    void run() {
        State expected = State::Idle;
        bool success = state_.compare_exchange_strong(expected, State::Running);
        if (!success) {
            throw std::runtime_error(
                "Runtime is already running or has been stopped");
        }
        // defer(): run the given function when the current scope ends
        // (see utils.hpp).
        auto d1 = defer([this]() { state_.store(State::Stopped); });

        [[maybe_unused]] int r;
        r = io_uring_enable_rings(ring_.ring());
        assert(r == 0);

        {
            std::lock_guard<std::mutex> lock(mutex_);
            flush_global_queue_();
            // Now that the ring is enabled and all pending works are
            // scheduled, we can set the state to Enabled.
            state_.store(State::Enabled);
        }

        if (!disable_register_ring_fd_) {
            r = io_uring_register_ring_fd(ring_.ring());
            assert(r == 1); // 1 indicates success for this call
        }

        detail::Context::current().init(&ring_, this);
        auto d2 = defer([]() { detail::Context::current().reset(); });

        while (true) {
            tick_count_++;

            // Periodically reap completions even while local works remain,
            // so CQEs are not starved by a busy local queue.
            if (tick_count_ % event_interval_ == 0) {
                flush_ring_();
            }

            if (auto *work = local_queue_.pop_front()) {
                (*work)();
                continue;
            }

            if (pending_works_ == 0) {
                break;
            }
            flush_ring_wait_();
        }
    }

    /// @brief Get the file descriptor table of the runtime.
    auto &fd_table() noexcept { return ring_.fd_table(); }

    /// @brief Get the buffer table of the runtime.
    auto &buffer_table() noexcept { return ring_.buffer_table(); }

    /// @brief Get the ring settings of the runtime.
    auto &settings() noexcept { return ring_.settings(); }

private:
    void schedule_msg_ring_(Runtime *curr_runtime, uintptr_t data) noexcept {
        int ring_fd = this->ring_.ring()->ring_fd;
        if (curr_runtime != nullptr) {
            // The caller runs on another runtime: queue an async MSG_RING
            // SQE on the caller's own ring, targeting this runtime's fd.
            // pend_work() is balanced by the pending_works_-- in
            // process_cqe_() when the send completes.
            io_uring_sqe *sqe = curr_runtime->ring_.get_sqe();
            prep_msg_ring_(ring_fd, sqe, data);
            curr_runtime->pend_work();
        } else {
            // The caller is not on any runtime thread: fall back to a
            // synchronous message.
            io_uring_sqe sqe = {};
            prep_msg_ring_(ring_fd, &sqe, data);
            int r = detail::sync_msg_ring(&sqe);
            if (r < 0) {
                panic_on(std::format("sync_msg_ring: {}", std::strerror(-r)));
            }
        }
    }

    // Wake up the runtime if it is blocked in Ring::reap_completions_wait().
    void wakeup_() noexcept {
        auto *curr_runtime = detail::Context::current().runtime();
        if (curr_runtime == this) {
            return;
        }

        auto state = state_.load();
        if (state != State::Enabled) {
            return;
        }

        schedule_msg_ring_(curr_runtime,
                           encode_work(nullptr, WorkType::Ignore));
    }

    void flush_global_queue_() noexcept {
        local_queue_.push_back(std::move(global_queue_));
    }

    static void prep_msg_ring_(int ring_fd, io_uring_sqe *sqe,
                               uintptr_t data) noexcept {
        io_uring_prep_msg_ring(sqe, ring_fd, 0, data, 0);
        io_uring_sqe_set_data64(sqe, encode_work(nullptr, WorkType::Schedule));
    }

    static void prep_cancel_(io_uring_sqe *sqe, uintptr_t data) noexcept {
        io_uring_prep_cancel64(sqe, data, 0);
        io_uring_sqe_set_data64(sqe, encode_work(nullptr, WorkType::Ignore));
        io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);
    }

    void flush_ring_() noexcept {
        auto r = ring_.reap_completions(
            [this](io_uring_cqe *cqe) { process_cqe_(cqe); });
        if (r < 0) {
            panic_on(std::format("io_uring_peek_cqe: {}",
                                 std::strerror(static_cast<int>(-r))));
        }
    }

    void flush_ring_wait_() noexcept {
        auto r = ring_.reap_completions_wait(
            [this](io_uring_cqe *cqe) { process_cqe_(cqe); });
        if (r < 0) {
            panic_on(std::format("io_uring_submit_and_wait: {}",
                                 std::strerror(static_cast<int>(-r))));
        }
    }

    void process_cqe_(io_uring_cqe *cqe) noexcept {
        auto [data, type] = decode_work(io_uring_cqe_get_data64(cqe));

        if (type == WorkType::Ignore) {
            // No-op
            assert(cqe->res != -EINVAL); // If EINVAL, something is wrong
        } else if (type == WorkType::Schedule) {
            if (data == nullptr) {
                // Completion of a MSG_RING send issued by
                // schedule_msg_ring_(); balances the earlier pend_work().
                if (cqe->res < 0) {
                    panic_on(std::format("io_uring_prep_msg_ring: {}",
                                         std::strerror(-cqe->res)));
                }
                pending_works_--;
            } else {
                // A work scheduled onto this runtime: run it now.
                auto *work = static_cast<WorkInvoker *>(data);
                tsan_acquire(work);
                (*work)();
            }
        } else if (type == WorkType::Cancel) {
            // A cross-thread cancel request: queue the cancel SQE locally,
            // then release the waiting requester.
            detail::CancelRequest *request =
                static_cast<detail::CancelRequest *>(data);
            tsan_acquire(request);
            io_uring_sqe *sqe = ring_.get_sqe();
            prep_cancel_(sqe, request->data());
            request->notify();
        } else if (type == WorkType::Common) {
            // An ordinary operation completion: forward the CQE to its
            // handle; a true return means the operation has finished.
            auto *handle = static_cast<OpFinishHandleBase *>(data);
            auto op_finish = handle->handle(cqe);
            if (op_finish) {
                pending_works_--;
            }
        } else {
            unreachable();
        }
    }

private:
    enum class State : uint8_t {
        Idle,    // Not running
        Running, // Started running
        Enabled, // Running and ring enabled
        Stopped, // Stopped
    };
    static_assert(std::atomic<State>::is_always_lock_free);

    using WorkListQueue =
        IntrusiveSingleList<WorkInvoker, &WorkInvoker::work_queue_entry_>;

    // Global state (shared across threads)
    std::mutex mutex_;
    WorkListQueue global_queue_;
    std::atomic_size_t pending_works_ = 1; // starts at 1; see allow_exit()
    std::atomic<State> state_ = State::Idle;

    // Local state (owned by the runtime thread)
    WorkListQueue local_queue_;
    Ring ring_;
    size_t tick_count_ = 0;

    // Configurable parameters
    size_t event_interval_ = 61;
    bool disable_register_ring_fd_ = false;
};

/// @brief Get the current runtime.
inline auto &current_runtime() noexcept {
    return *detail::Context::current().runtime();
}

} // namespace condy
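
A minimal lifecycle sketch (illustrative only; assumes the header is reachable
as "condy/runtime.hpp" and that a default-constructed RuntimeOptions is
valid). Since pending_works_ starts at 1, allow_exit() must be called once
before run() can return:

    #include "condy/runtime.hpp"

    int main() {
        condy::Runtime rt;  // default options
        rt.allow_exit();    // nothing else pending; let the loop exit
        rt.run();           // enables the ring, drains queues, returns
    }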