Condy v1.5.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
runtime.hpp
Go to the documentation of this file.
1
5
6#pragma once
7
#include "condy/context.hpp"
#include "condy/intrusive.hpp"
#include "condy/invoker.hpp"
#include "condy/ring.hpp"
#include "condy/singleton.hpp"
#include "condy/utils.hpp"
#include "condy/work_type.hpp"
#include <atomic>   // std::atomic, std::atomic_size_t (state_, pending_works_)
#include <cassert>  // assert
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iostream> // std::cerr (SendFd warning path)
#include <mutex>
23
24namespace condy {
25
26namespace detail {
27
28class ThreadLocalRing : public ThreadLocalSingleton<ThreadLocalRing> {
29public:
30 Ring *ring() { return &ring_; }
31
32 ThreadLocalRing() {
33 io_uring_params params = {};
34 params.flags |= IORING_SETUP_CLAMP;
35 params.flags |= IORING_SETUP_SINGLE_ISSUER;
36 params.flags |= IORING_SETUP_SUBMIT_ALL;
37 [[maybe_unused]] int r = ring_.init(8, &params);
38 assert(r == 0);
39 }
40
41private:
42 Ring ring_;
43};
44
45inline int sync_msg_ring(io_uring_sqe *sqe_data) noexcept {
46#if !IO_URING_CHECK_VERSION(2, 12) // >= 2.12
47 return io_uring_register_sync_msg(sqe_data);
48#else
49 auto *ring = ThreadLocalRing::current().ring();
50 auto *sqe = ring->get_sqe();
51 *sqe = *sqe_data;
52 int r;
53 [[maybe_unused]] auto n =
54 ring->reap_completions_wait([&](io_uring_cqe *cqe) { r = cqe->res; });
55 assert(n == 1);
56 return r;
57#endif
58}
59
60} // namespace detail
61
/// @brief The event loop runtime for executing asynchronous work.
///
/// Owns one io_uring instance (via Ring) and two work queues:
///  - local_queue_ : touched only by the thread inside run(); no locking.
///  - global_queue_: guarded by mutex_; holds work scheduled before the
///    ring has been enabled by run().
/// Remote threads never touch local state directly: once the ring is
/// Enabled they deliver work by sending a MSG_RING SQE into this ring.
///
/// State machine: Idle -> Running -> Enabled -> Stopped (see State below).
class Runtime {
public:
  /// @brief Construct a new Runtime object.
  ///
  /// Translates RuntimeOptions into io_uring setup flags and initializes
  /// the ring in a *disabled* state (IORING_SETUP_R_DISABLED); run()
  /// enables it after draining the global queue.
  ///
  /// @param options Ring sizing, polling, task-run and mmap options.
  /// @throws A system error (via make_system_error) if ring init fails.
  Runtime(const RuntimeOptions &options = {}) {
    io_uring_params params;
    std::memset(&params, 0, sizeof(params));

    params.flags |= IORING_SETUP_CLAMP;
    params.flags |= IORING_SETUP_SINGLE_ISSUER;
    params.flags |= IORING_SETUP_SUBMIT_ALL;
    // Created disabled: until run() enables the ring, schedule() queues
    // cross-thread work into global_queue_ under mutex_ instead of sending
    // MSG_RING into a ring nobody is reaping yet.
    params.flags |= IORING_SETUP_R_DISABLED;

    size_t ring_entries = options.sq_size_;
    if (options.cq_size_ != 0) { // 0 means default
      params.flags |= IORING_SETUP_CQSIZE;
      params.cq_entries = options.cq_size_;
    }

    if (options.enable_iopoll_) {
      params.flags |= IORING_SETUP_IOPOLL;
#if !IO_URING_CHECK_VERSION(2, 9) // >= 2.9
      // Hybrid polling requires liburing >= 2.9.
      if (options.enable_hybrid_iopoll_) {
        params.flags |= IORING_SETUP_HYBRID_IOPOLL;
      }
#endif
    }

    if (options.enable_sqpoll_) {
      params.flags |= IORING_SETUP_SQPOLL;
      params.sq_thread_idle = options.sqpoll_idle_time_ms_;
      if (options.sqpoll_thread_cpu_.has_value()) {
        // Pin the kernel SQ poller thread to the requested CPU.
        params.flags |= IORING_SETUP_SQ_AFF;
        params.sq_thread_cpu = *options.sqpoll_thread_cpu_;
      }
    }

    if (options.attach_wq_target_ != nullptr) {
      // Share the kernel async work-queue backend with another runtime.
      params.flags |= IORING_SETUP_ATTACH_WQ;
      params.wq_fd = options.attach_wq_target_->ring_.ring()->ring_fd;
    }

    if (options.enable_defer_taskrun_) {
      params.flags |= IORING_SETUP_DEFER_TASKRUN;
      params.flags |= IORING_SETUP_TASKRUN_FLAG;
    }

    if (options.enable_coop_taskrun_) {
      params.flags |= IORING_SETUP_COOP_TASKRUN;
      params.flags |= IORING_SETUP_TASKRUN_FLAG;
    }

    if (options.enable_sqe128_) {
      params.flags |= IORING_SETUP_SQE128;
    }

    if (options.enable_cqe32_) {
      params.flags |= IORING_SETUP_CQE32;
    }

#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
    if (options.enable_sqe_mixed_) {
      params.flags |= IORING_SETUP_SQE_MIXED;
    }
#endif

#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
    if (options.enable_cqe_mixed_) {
      params.flags |= IORING_SETUP_CQE_MIXED;
    }
#endif

    // Optional user-supplied memory for the ring (no kernel mmap);
    // requires liburing >= 2.5.
    void *buf = nullptr;
    size_t buf_size = 0;
#if !IO_URING_CHECK_VERSION(2, 5) // >= 2.5
    if (options.enable_no_mmap_) {
      params.flags |= IORING_SETUP_NO_MMAP;
      buf = options.no_mmap_buf_;
      buf_size = options.no_mmap_buf_size_;
    }
#endif

    int r = ring_.init(ring_entries, &params, buf, buf_size);
    if (r < 0) {
      throw make_system_error("io_uring_queue_init_params", -r);
    }

    event_interval_ = options.event_interval_;
    disable_register_ring_fd_ = options.disable_register_ring_fd_;
  }

  ~Runtime() { ring_.destroy(); }

  // Non-copyable and non-movable: the ring fd and queued work refer to
  // this object's address.
  Runtime(const Runtime &) = delete;
  Runtime &operator=(const Runtime &) = delete;
  Runtime(Runtime &&) = delete;
  Runtime &operator=(Runtime &&) = delete;

public:
  /// @brief Allow the runtime to exit when there are no pending works.
  ///
  /// Drops the initial "keep-alive" count (pending_works_ starts at 1) and
  /// wakes the loop in case it is blocked waiting for completions.
  void allow_exit() noexcept {
    pending_works_--;
    wakeup_();
  }

  /// @brief Schedule a work item onto this runtime.
  ///
  /// Same-runtime calls go straight to the local queue. Cross-thread calls
  /// either MSG_RING the work in (fast path, ring Enabled) or park it in
  /// global_queue_ under mutex_ (slow path, ring not yet enabled).
  void schedule(WorkInvoker *work) noexcept {
    auto *curr_runtime = detail::Context::current().runtime();
    if (curr_runtime == this) {
      local_queue_.push_back(work);
      return;
    }

    auto state = state_.load();
    if (state == State::Enabled) {
      // Fast path: if the ring is enabled, we can directly schedule the
      // work
      tsan_release(work);
      schedule_msg_ring_(curr_runtime, work, WorkType::Schedule);
    } else {
      // Slow path: if the ring is not enabled, we need to acquire the
      // mutex to ensure the work is scheduled before the ring is enabled
      std::unique_lock<std::mutex> lock(mutex_);
      // Re-check under the lock: run() may have flipped to Enabled in
      // between (it sets Enabled while holding mutex_).
      state = state_.load();
      if (state == State::Enabled) {
        lock.unlock();
        tsan_release(work);
        schedule_msg_ring_(curr_runtime, work, WorkType::Schedule);
      } else {
        global_queue_.push_back(work);
      }
    }
  }

  /// Increment the pending-work count (keeps run() alive).
  void pend_work() noexcept { pending_works_++; }

  /// Decrement the pending-work count (balances pend_work()).
  void resume_work() noexcept { pending_works_--; }

  /// @brief Run the runtime event loop in the current thread.
  ///
  /// Enables the ring, drains work queued before startup, binds the
  /// thread-local Context, then loops: run local works, periodically flush
  /// completions, and block waiting for completions when idle. Returns
  /// when pending_works_ drops to zero (see allow_exit()).
  void run() noexcept {
    State expected = State::Idle;
    [[maybe_unused]] bool success =
        state_.compare_exchange_strong(expected, State::Running);
    assert(success && "Runtime is already running or stopped");
    auto d1 = defer([this]() { state_.store(State::Stopped); });

    [[maybe_unused]] int r;
    r = io_uring_enable_rings(ring_.ring());
    assert(r == 0);

    {
      std::lock_guard<std::mutex> lock(mutex_);
      flush_global_queue_();
      // Now that the ring is enabled and all pending works are scheduled,
      // we can set the state to Enabled.
      state_.store(State::Enabled);
    }

    if (!disable_register_ring_fd_) {
      r = io_uring_register_ring_fd(ring_.ring());
      assert(r == 1); // 1 indicates success for this call
    }

    detail::Context::current().init(&ring_, this);
    auto d2 = defer([]() { detail::Context::current().reset(); });

    while (true) {
      tick_count_++;

      // Reap completions every event_interval_ ticks so a long run of
      // local works cannot starve I/O completions.
      if (tick_count_ % event_interval_ == 0) {
        flush_ring_();
      }

      if (auto *work = local_queue_.pop_front()) {
        (*work)();
        continue;
      }

      // Local queue drained: exit if nothing is outstanding, otherwise
      // block until a completion (or MSG_RING wakeup) arrives.
      if (pending_works_ == 0) {
        break;
      }
      flush_ring_wait_();
    }
  }

  /// @brief Get the file descriptor table of the runtime.
  auto &fd_table() noexcept { return ring_.fd_table(); }

  /// @brief Get the buffer table of the runtime.
  auto &buffer_table() noexcept { return ring_.buffer_table(); }

  /// @brief Get the ring settings of the runtime.
  auto &settings() noexcept { return ring_.settings(); }

private:
  // Deliver `work` to this runtime via a MSG_RING op. If the caller is on
  // another runtime, the SQE rides that runtime's ring (and that runtime
  // tracks it via pend_work()); otherwise a synchronous thread-local ring
  // is used.
  void schedule_msg_ring_(Runtime *curr_runtime, WorkInvoker *work,
                          WorkType type) noexcept {
    if (curr_runtime != nullptr) {
      io_uring_sqe *sqe = curr_runtime->ring_.get_sqe();
      prep_msg_ring_(sqe, work, type);
      curr_runtime->pend_work();
    } else {
      io_uring_sqe sqe = {};
      prep_msg_ring_(&sqe, work, type);
      [[maybe_unused]] int r = detail::sync_msg_ring(&sqe);
      assert(r == 0);
    }
  }

  // Wakeup the runtime if it's blocked in Ring::reap_completions_wait()
  void wakeup_() noexcept {
    auto *curr_runtime = detail::Context::current().runtime();
    if (curr_runtime == this) {
      return;
    }

    auto state = state_.load();
    if (state != State::Enabled) {
      return;
    }

    // A null-work Ignore message: the CQE it produces on this runtime's
    // ring is enough to unblock the wait.
    schedule_msg_ring_(curr_runtime, nullptr, WorkType::Ignore);
  }

  // Move everything parked in global_queue_ onto the local queue.
  // Caller must hold mutex_.
  void flush_global_queue_() noexcept {
    local_queue_.push_back(std::move(global_queue_));
  }

  // Prepare a MSG_RING SQE targeting this runtime's ring. The message
  // payload is the encoded (work, type) pair; the *sender's* CQE carries
  // encode_work(nullptr, Schedule) so its completion decrements the
  // sender's pending_works_ in process_cqe_ (balancing pend_work()).
  void prep_msg_ring_(io_uring_sqe *sqe, WorkInvoker *work,
                      WorkType type) noexcept {
    auto data = encode_work(work, type);
    io_uring_prep_msg_ring(sqe, this->ring_.ring()->ring_fd, 0,
                           reinterpret_cast<uint64_t>(data), 0);
    io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Schedule));
  }

  // Non-blocking completion reap; returns the number of CQEs processed.
  size_t flush_ring_() noexcept {
    return ring_.reap_completions(
        [this](io_uring_cqe *cqe) { process_cqe_(cqe); });
  }

  // Blocking completion reap; returns the number of CQEs processed.
  size_t flush_ring_wait_() noexcept {
    return ring_.reap_completions_wait(
        [this](io_uring_cqe *cqe) { process_cqe_(cqe); });
  }

  // Dispatch one CQE based on the WorkType encoded in its user data.
  void process_cqe_(io_uring_cqe *cqe) noexcept {
    auto *data_raw = io_uring_cqe_get_data(cqe);
    auto [data, type] = decode_work(data_raw);

    if (type == WorkType::Ignore) {
      // No-op
      assert(cqe->res != -EINVAL); // If EINVAL, something is wrong
    } else if (type == WorkType::SendFd) {
      // A file descriptor was passed to this ring from another runtime.
      auto &fd_table = ring_.fd_table();
      if (fd_table.fd_accepter_ == nullptr) [[unlikely]] {
        std::cerr << "[Deprecated Warning] Received a file "
                     "descriptor but no accepter is set.\n";
      } else {
        // Payload encodes an optional target slot: 0 means auto-allocate
        // (use cqe->res), otherwise the slot index is payload - 1.
        uint64_t payload = reinterpret_cast<uint64_t>(data) >> 3;
        if (payload == 0) { // Auto-allocate
          fd_table.fd_accepter_(cqe->res);
        } else {
          int target_fd = static_cast<int>(payload - 1);
          fd_table.fd_accepter_(target_fd);
        }
      }
    } else if (type == WorkType::Schedule) {
      if (data == nullptr) {
        // Completion of our own outbound MSG_RING send (see
        // prep_msg_ring_): balances the pend_work() done when sending.
        assert(cqe->res == 0);
        pending_works_--;
      } else {
        // Work delivered from another thread; pair with the sender's
        // tsan_release(work) in schedule().
        auto *work = static_cast<WorkInvoker *>(data);
        tsan_acquire(data);
        local_queue_.push_back(work);
      }
    } else if (type == WorkType::Common) {
      // Completion of a regular asynchronous operation.
      auto *handle = static_cast<OpFinishHandleBase *>(data);
      auto action = handle->handle_cqe(cqe);
      if (action.op_finish) {
        pending_works_--;
      }
      if (action.queue_work) {
        local_queue_.push_back(handle);
      }
    } else {
      assert(false && "Invalid work type");
    }
  }

private:
  // Lifecycle of the runtime; transitions are one-directional.
  enum class State : uint8_t {
    Idle,    // Not running
    Running, // Started running
    Enabled, // Running and ring enabled
    Stopped, // Stopped
  };
  static_assert(std::atomic<State>::is_always_lock_free);

  using WorkListQueue =
      IntrusiveSingleList<WorkInvoker, &WorkInvoker::work_queue_entry_>;

  // Global state (touched by other threads)
  std::mutex mutex_;
  WorkListQueue global_queue_;
  // Starts at 1: the extra count keeps run() alive until allow_exit().
  std::atomic_size_t pending_works_ = 1;
  std::atomic<State> state_ = State::Idle;

  // Local state (owned by the thread inside run())
  WorkListQueue local_queue_;
  Ring ring_;
  size_t tick_count_ = 0;

  // Configurable parameters
  size_t event_interval_ = 61;
  bool disable_register_ring_fd_ = false;
};
407
414inline auto &current_runtime() { return *detail::Context::current().runtime(); }
415
416} // namespace condy
The event loop runtime for executing asynchronous work.
Definition runtime.hpp:68
auto & buffer_table() noexcept
Get the buffer table of the runtime.
Definition runtime.hpp:276
auto & fd_table() noexcept
Get the file descriptor table of the runtime.
Definition runtime.hpp:270
void allow_exit() noexcept
Allow the runtime to exit when there are no pending works.
Definition runtime.hpp:176
Runtime(const RuntimeOptions &options={})
Construct a new Runtime object.
Definition runtime.hpp:74
void run() noexcept
Run the runtime event loop in the current thread.
Definition runtime.hpp:220
auto & settings() noexcept
Get the ring settings of the runtime.
Definition runtime.hpp:282
Definitions of finish handle types for asynchronous operations.
Intrusive singly-linked and doubly-linked list implementations.
Polymorphic invocation utilities.
The main namespace for the Condy library.
Definition condy.hpp:28
auto & current_runtime()
Get the current runtime.
Definition runtime.hpp:414
Defer defer(Func &&func)
Defer the execution of a function until the current scope ends.
Definition utils.hpp:62
Wrapper classes for liburing interfaces.
Internal utility classes and functions used by Condy.