Condy v1.6.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
runtime.hpp
Go to the documentation of this file.
1
5
6#pragma once
7
9#include "condy/context.hpp"
10#include "condy/intrusive.hpp"
11#include "condy/invoker.hpp"
12#include "condy/ring.hpp"
14#include "condy/singleton.hpp"
15#include "condy/utils.hpp"
16#include "condy/work_type.hpp"
17#include <cerrno>
18#include <cstddef>
19#include <cstdint>
20#include <cstring>
21#include <mutex>
22
23namespace condy {
24
25namespace detail {
26
27class ThreadLocalRing : public ThreadLocalSingleton<ThreadLocalRing> {
28public:
29 Ring *ring() { return &ring_; }
30
31 ThreadLocalRing() {
32 io_uring_params params = {};
33 params.flags |= IORING_SETUP_CLAMP;
34 params.flags |= IORING_SETUP_SINGLE_ISSUER;
35 params.flags |= IORING_SETUP_SUBMIT_ALL;
36 [[maybe_unused]] int r = ring_.init(8, &params);
37 // If we can construct Runtime, we should be able to construct this
38 // thread-local ring.
39 assert(r == 0);
40 }
41
42private:
43 Ring ring_;
44};
45
46inline int sync_msg_ring(io_uring_sqe *sqe_data) noexcept {
47#if !IO_URING_CHECK_VERSION(2, 12) // >= 2.12
48 return io_uring_register_sync_msg(sqe_data);
49#else
50 auto *ring = ThreadLocalRing::current().ring();
51 auto *sqe = ring->get_sqe();
52 *sqe = *sqe_data;
53 int r = 0;
54 auto n =
55 ring->reap_completions_wait([&](io_uring_cqe *cqe) { r = cqe->res; });
56 if (n < 0) {
57 return static_cast<int>(n);
58 }
59 assert(n == 1);
60 return r;
61#endif
62}
63
64} // namespace detail
65
/// Base class for operation-completion handlers dispatched from the
/// runtime's CQE processing loop (see Runtime::process_cqe_). A derived
/// class installs `handle_func_`, which is invoked with `this` and the
/// completion entry.
class OpFinishHandleBase {
public:
  /// Handler signature: (self, cqe) -> true when the operation has fully
  /// finished and the runtime should decrement its pending-work count.
  using HandleFunc = bool (*)(void *, io_uring_cqe *) noexcept;

  /// Dispatch a completion to the installed handler.
  /// @return true if the operation is finished.
  bool handle(io_uring_cqe *cqe) noexcept {
    assert(handle_func_ != nullptr);
    return handle_func_(this, cqe);
  }

  /// Associate an invoker with this handle (non-owning).
  void set_invoker(Invoker *invoker) noexcept { invoker_ = invoker; }

protected:
  OpFinishHandleBase() = default;

protected:
  // Installed by the derived class; must be non-null before handle() runs.
  HandleFunc handle_func_ = nullptr;
  // Non-owning. NOTE(review): presumably resumed by the derived handler on
  // completion — confirm against the derived classes.
  Invoker *invoker_ = nullptr;
};
84
/// The event loop runtime for executing asynchronous operations.
///
/// One Runtime owns one io_uring instance and drives it from a single
/// thread (IORING_SETUP_SINGLE_ISSUER). Work may be scheduled from other
/// threads; cross-thread delivery uses msg_ring SQEs. The ring is created
/// disabled and only enabled inside run(), so works scheduled before the
/// loop starts are parked in a mutex-protected global queue.
class Runtime {
public:
  /// Construct a new Runtime object.
  ///
  /// Translates RuntimeOptions into io_uring setup flags and initializes
  /// the ring. Optional features guarded by IO_URING_CHECK_VERSION are
  /// compiled in only when the liburing version supports them.
  ///
  /// @param options runtime configuration (queue sizes, polling modes, ...)
  /// @throws a system_error (via make_system_error) if ring init fails.
  Runtime(const RuntimeOptions &options = {}) {
    io_uring_params params;
    std::memset(&params, 0, sizeof(params));

    // Baseline flags: clamp oversized queue requests, single issuing
    // thread, submit all SQEs in a batch, and start the ring disabled so
    // run() controls when it goes live.
    params.flags |= IORING_SETUP_CLAMP;
    params.flags |= IORING_SETUP_SINGLE_ISSUER;
    params.flags |= IORING_SETUP_SUBMIT_ALL;
    params.flags |= IORING_SETUP_R_DISABLED;

    size_t ring_entries = options.sq_size_;
    if (options.cq_size_ != 0) { // 0 means default
      params.flags |= IORING_SETUP_CQSIZE;
      params.cq_entries = options.cq_size_;
    }

    if (options.enable_iopoll_) {
      params.flags |= IORING_SETUP_IOPOLL;
#if !IO_URING_CHECK_VERSION(2, 9) // >= 2.9
      if (options.enable_hybrid_iopoll_) {
        params.flags |= IORING_SETUP_HYBRID_IOPOLL;
      }
#endif
    }

    if (options.enable_sqpoll_) {
      params.flags |= IORING_SETUP_SQPOLL;
      params.sq_thread_idle = options.sqpoll_idle_time_ms_;
      if (options.sqpoll_thread_cpu_.has_value()) {
        // Pin the kernel SQ polling thread to the requested CPU.
        params.flags |= IORING_SETUP_SQ_AFF;
        params.sq_thread_cpu = *options.sqpoll_thread_cpu_;
      }
    }

    if (options.attach_wq_target_ != nullptr) {
      // Share the async worker pool with another Runtime's ring.
      params.flags |= IORING_SETUP_ATTACH_WQ;
      params.wq_fd = options.attach_wq_target_->ring_.ring()->ring_fd;
    }

    if (options.enable_defer_taskrun_) {
      params.flags |= IORING_SETUP_DEFER_TASKRUN;
      params.flags |= IORING_SETUP_TASKRUN_FLAG;
    }

    if (options.enable_coop_taskrun_) {
      params.flags |= IORING_SETUP_COOP_TASKRUN;
      params.flags |= IORING_SETUP_TASKRUN_FLAG;
    }

    if (options.enable_sqe128_) {
      params.flags |= IORING_SETUP_SQE128;
    }

    if (options.enable_cqe32_) {
      params.flags |= IORING_SETUP_CQE32;
    }

#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
    if (options.enable_sqe_mixed_) {
      params.flags |= IORING_SETUP_SQE_MIXED;
    }
#endif

#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
    if (options.enable_cqe_mixed_) {
      params.flags |= IORING_SETUP_CQE_MIXED;
    }
#endif

    // Optional caller-provided backing memory for the ring (no-mmap mode).
    void *buf = nullptr;
    size_t buf_size = 0;
#if !IO_URING_CHECK_VERSION(2, 5) // >= 2.5
    if (options.enable_no_mmap_) {
      params.flags |= IORING_SETUP_NO_MMAP;
      buf = options.no_mmap_buf_;
      buf_size = options.no_mmap_buf_size_;
    }
#endif

    int r = ring_.init(ring_entries, &params, buf, buf_size);
    if (r < 0) {
      throw make_system_error("io_uring_queue_init_params", -r);
    }

    event_interval_ = options.event_interval_;
    disable_register_ring_fd_ = options.disable_register_ring_fd_;
  }

  ~Runtime() { ring_.destroy(); }

  // Non-copyable and non-movable: the ring and the queues are tied to this
  // object's address.
  Runtime(const Runtime &) = delete;
  Runtime &operator=(const Runtime &) = delete;
  Runtime(Runtime &&) = delete;
  Runtime &operator=(Runtime &&) = delete;

public:
  /// Allow the runtime to exit when there are no pending works.
  ///
  /// Drops the initial pending-work token (pending_works_ starts at 1) and
  /// wakes the loop in case it is blocked waiting for completions.
  void allow_exit() noexcept {
    pending_works_--;
    wakeup_();
  }

  /// Schedule a work item onto this runtime. Safe to call from any thread.
  void schedule(WorkInvoker *work) noexcept {
    auto *curr_runtime = detail::Context::current().runtime();
    if (curr_runtime == this) {
      // Same-thread scheduling: no synchronization needed.
      local_queue_.push_back(work);
      return;
    }

    auto state = state_.load();
    if (state == State::Enabled) {
      // Fast path: if the ring is enabled, we can directly schedule the
      // work
      // (tsan_release pairs with tsan_acquire in process_cqe_).
      tsan_release(work);
      schedule_msg_ring_(curr_runtime,
                         encode_work(work, WorkType::Schedule));
    } else {
      // Slow path: if the ring is not enabled, we need to acquire the
      // mutex to ensure the work is scheduled before the ring is enabled
      std::unique_lock<std::mutex> lock(mutex_);
      state = state_.load();
      if (state == State::Enabled) {
        // Ring got enabled while we were acquiring the lock; fall back to
        // the direct msg_ring path.
        lock.unlock();
        tsan_release(work);
        schedule_msg_ring_(curr_runtime,
                           encode_work(work, WorkType::Schedule));
      } else {
        // Parked until run() drains the global queue under this mutex.
        global_queue_.push_back(work);
      }
    }
  }

  // Internal use only. Schedule a cancel request for the given data.
  void cancel(void *data) noexcept {
    // Ensure align of 8 for encoding
    assert(reinterpret_cast<intptr_t>(data) % 8 == 0);
    auto *curr_runtime = detail::Context::current().runtime();
    if (curr_runtime == this) {
      // Same thread: prep the cancel SQE directly on our ring.
      io_uring_sqe *sqe = ring_.get_sqe();
      prep_cancel_(sqe, data);
      return;
    }

    auto state = state_.load();
    if (state != State::Enabled) {
      // Ring not live; nothing in flight to cancel via msg_ring.
      return;
    }

    schedule_msg_ring_(curr_runtime, encode_work(data, WorkType::Cancel));
  }

  /// Register one pending work; the loop will not exit while any remain.
  void pend_work() noexcept { pending_works_++; }

  /// Balance a previous pend_work().
  void resume_work() noexcept { pending_works_--; }

  /// Run the runtime event loop in the current thread.
  ///
  /// Transitions Idle -> Running -> Enabled, binds this runtime to the
  /// thread-local Context, then alternates between draining the local work
  /// queue and reaping completions until pending_works_ reaches zero.
  ///
  /// @throws std::runtime_error if the runtime was already started.
  void run() {
    State expected = State::Idle;
    bool success = state_.compare_exchange_strong(expected, State::Running);
    if (!success) {
      throw std::runtime_error(
          "Runtime is already running or has been stopped");
    }
    // Mark Stopped on every exit path (normal or exceptional).
    auto d1 = defer([this]() { state_.store(State::Stopped); });

    [[maybe_unused]] int r;
    r = io_uring_enable_rings(ring_.ring());
    assert(r == 0);

    {
      std::lock_guard<std::mutex> lock(mutex_);
      flush_global_queue_();
      // Now that the ring is enabled and all pending works are scheduled,
      // we can set the state to Enabled.
      state_.store(State::Enabled);
    }

    if (!disable_register_ring_fd_) {
      r = io_uring_register_ring_fd(ring_.ring());
      assert(r == 1); // 1 indicates success for this call
    }

    detail::Context::current().init(&ring_, this);
    auto d2 = defer([]() { detail::Context::current().reset(); });

    while (true) {
      tick_count_++;

      // Periodically reap completions even while the local queue is busy,
      // so ring events are not starved.
      if (tick_count_ % event_interval_ == 0) {
        flush_ring_();
      }

      if (auto *work = local_queue_.pop_front()) {
        (*work)();
        continue;
      }

      // Local queue empty: exit if no work is pending, otherwise block
      // until the ring produces completions.
      if (pending_works_ == 0) {
        break;
      }
      flush_ring_wait_();
    }
  }

  /// Get the file descriptor table of the runtime.
  auto &fd_table() noexcept { return ring_.fd_table(); }

  /// Get the buffer table of the runtime.
  auto &buffer_table() noexcept { return ring_.buffer_table(); }

  /// Get the ring settings of the runtime.
  auto &settings() noexcept { return ring_.settings(); }

private:
  // Deliver an encoded work pointer to THIS runtime's ring via msg_ring.
  // If the caller runs inside another Runtime, the msg_ring SQE is queued
  // on that runtime's ring (and counted as one of its pending works until
  // its completion is processed); otherwise it is sent synchronously
  // through the thread-local ring.
  void schedule_msg_ring_(Runtime *curr_runtime, void *data) noexcept {
    int ring_fd = this->ring_.ring()->ring_fd;
    if (curr_runtime != nullptr) {
      io_uring_sqe *sqe = curr_runtime->ring_.get_sqe();
      prep_msg_ring_(ring_fd, sqe, data);
      curr_runtime->pend_work();
    } else {
      io_uring_sqe sqe = {};
      prep_msg_ring_(ring_fd, &sqe, data);
      int r = detail::sync_msg_ring(&sqe);
      if (r < 0) {
        panic_on(std::format("sync_msg_ring: {}", std::strerror(-r)));
      }
    }
  }

  // Wakeup the runtime if it's blocked in Ring::reap_completions_wait()
  void wakeup_() noexcept {
    auto *curr_runtime = detail::Context::current().runtime();
    if (curr_runtime == this) {
      // We are the loop thread; it cannot be blocked right now.
      return;
    }

    auto state = state_.load();
    if (state != State::Enabled) {
      return;
    }

    // An Ignore-typed message: its only effect is to produce a CQE that
    // unblocks the wait.
    schedule_msg_ring_(curr_runtime,
                       encode_work(nullptr, WorkType::Ignore));
  }

  // Move all parked works into the local queue. Caller holds mutex_.
  void flush_global_queue_() noexcept {
    local_queue_.push_back(std::move(global_queue_));
  }

  // Prepare a msg_ring SQE carrying `data` to the target ring. The
  // sender-side completion is tagged Schedule/nullptr, which process_cqe_
  // uses to balance the pend_work() done in schedule_msg_ring_.
  static void prep_msg_ring_(int ring_fd, io_uring_sqe *sqe,
                             void *data) noexcept {
    io_uring_prep_msg_ring(sqe, ring_fd, 0,
                           reinterpret_cast<uint64_t>(data), 0);
    io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Schedule));
  }

  // Prepare an async-cancel SQE targeting `data`. Successful cancels skip
  // CQE generation; failures surface as Ignore-typed completions.
  static void prep_cancel_(io_uring_sqe *sqe, void *data) noexcept {
    io_uring_prep_cancel(sqe, data, 0);
    io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Ignore));
    io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);
  }

  // Non-blocking reap of available completions.
  void flush_ring_() noexcept {
    auto r = ring_.reap_completions(
        [this](io_uring_cqe *cqe) { process_cqe_(cqe); });
    if (r < 0) {
      panic_on(std::format("io_uring_peek_cqe: {}",
                           std::strerror(static_cast<int>(-r))));
    }
  }

  // Blocking reap: submits pending SQEs and waits for completions.
  void flush_ring_wait_() noexcept {
    auto r = ring_.reap_completions_wait(
        [this](io_uring_cqe *cqe) { process_cqe_(cqe); });
    if (r < 0) {
      panic_on(std::format("io_uring_submit_and_wait: {}",
                           std::strerror(static_cast<int>(-r))));
    }
  }

  // Dispatch one completion according to the WorkType encoded in its
  // user data.
  void process_cqe_(io_uring_cqe *cqe) noexcept {
    auto *data_raw = io_uring_cqe_get_data(cqe);
    auto [data, type] = decode_work(data_raw);

    if (type == WorkType::Ignore) {
      // No-op
      assert(cqe->res != -EINVAL); // If EINVAL, something is wrong
    } else if (type == WorkType::Schedule) {
      if (data == nullptr) {
        // Sender-side completion of a msg_ring we issued; balances the
        // pend_work() in schedule_msg_ring_.
        if (cqe->res < 0) {
          panic_on(std::format("io_uring_prep_msg_ring: {}",
                               std::strerror(-cqe->res)));
        }
        pending_works_--;
      } else {
        // Receiver side: run the scheduled work now.
        auto *work = static_cast<WorkInvoker *>(data);
        tsan_acquire(data);
        (*work)();
      }
    } else if (type == WorkType::Cancel) {
      // Cross-thread cancel request: issue the cancel on our own ring.
      io_uring_sqe *sqe = ring_.get_sqe();
      prep_cancel_(sqe, data);
    } else if (type == WorkType::Common) {
      // Operation completion: hand off to its finish handler.
      auto *handle = static_cast<OpFinishHandleBase *>(data);
      auto op_finish = handle->handle(cqe);
      if (op_finish) {
        pending_works_--;
      }
    } else {
      unreachable();
    }
  }

private:
  // Runtime lifecycle. Transitions: Idle -> Running -> Enabled -> Stopped.
  enum class State : uint8_t {
    Idle,    // Not running
    Running, // Started running
    Enabled, // Running and ring enabled
    Stopped, // Stopped
  };
  static_assert(std::atomic<State>::is_always_lock_free);

  using WorkListQueue =
      IntrusiveSingleList<WorkInvoker, &WorkInvoker::work_queue_entry_>;

  // Global state (touched by any thread)
  std::mutex mutex_;            // guards global_queue_ and the enable handoff
  WorkListQueue global_queue_;  // works scheduled before the ring is enabled
  // Starts at 1: the initial token is released by allow_exit().
  std::atomic_size_t pending_works_ = 1;
  std::atomic<State> state_ = State::Idle;

  // Local state (loop thread only)
  WorkListQueue local_queue_;
  Ring ring_;
  size_t tick_count_ = 0;

  // Configurable parameters
  size_t event_interval_ = 61; // reap completions every N ticks
  bool disable_register_ring_fd_ = false;
};
460
467inline auto &current_runtime() noexcept {
468 return *detail::Context::current().runtime();
469}
470
471} // namespace condy
The event loop runtime for executing asynchronous operations.
Definition runtime.hpp:91
void run()
Run the runtime event loop in the current thread.
Definition runtime.hpp:266
auto & buffer_table() noexcept
Get the buffer table of the runtime.
Definition runtime.hpp:324
auto & fd_table() noexcept
Get the file descriptor table of the runtime.
Definition runtime.hpp:318
void allow_exit() noexcept
Allow the runtime to exit when there are no pending works.
Definition runtime.hpp:199
Runtime(const RuntimeOptions &options={})
Construct a new Runtime object.
Definition runtime.hpp:97
auto & settings() noexcept
Get the ring settings of the runtime.
Definition runtime.hpp:330
Intrusive single-linked and double-linked list implementations.
Polymorphic invocation utilities.
The main namespace for the Condy library.
Definition condy.hpp:30
auto defer(Func &&func)
Defer the execution of a function until the current scope ends.
Definition utils.hpp:69
auto & current_runtime() noexcept
Get the current runtime.
Definition runtime.hpp:467
Wrapper classes for liburing interfaces.
Internal utility classes and functions used by Condy.