98 io_uring_params params;
99 std::memset(¶ms, 0,
sizeof(params));
101 params.flags |= IORING_SETUP_CLAMP;
102 params.flags |= IORING_SETUP_SINGLE_ISSUER;
103 params.flags |= IORING_SETUP_SUBMIT_ALL;
104 params.flags |= IORING_SETUP_R_DISABLED;
106 size_t ring_entries = options.sq_size_;
107 if (options.cq_size_ != 0) {
108 params.flags |= IORING_SETUP_CQSIZE;
109 params.cq_entries = options.cq_size_;
112 if (options.enable_iopoll_) {
113 params.flags |= IORING_SETUP_IOPOLL;
114#if !IO_URING_CHECK_VERSION(2, 9)
115 if (options.enable_hybrid_iopoll_) {
116 params.flags |= IORING_SETUP_HYBRID_IOPOLL;
121 if (options.enable_sqpoll_) {
122 params.flags |= IORING_SETUP_SQPOLL;
123 params.sq_thread_idle = options.sqpoll_idle_time_ms_;
124 if (options.sqpoll_thread_cpu_.has_value()) {
125 params.flags |= IORING_SETUP_SQ_AFF;
126 params.sq_thread_cpu = *options.sqpoll_thread_cpu_;
130 if (options.attach_wq_target_ !=
nullptr) {
131 params.flags |= IORING_SETUP_ATTACH_WQ;
132 params.wq_fd = options.attach_wq_target_->ring_.ring()->ring_fd;
135 if (options.enable_defer_taskrun_) {
136 params.flags |= IORING_SETUP_DEFER_TASKRUN;
137 params.flags |= IORING_SETUP_TASKRUN_FLAG;
140 if (options.enable_coop_taskrun_) {
141 params.flags |= IORING_SETUP_COOP_TASKRUN;
142 params.flags |= IORING_SETUP_TASKRUN_FLAG;
145 if (options.enable_sqe128_) {
146 params.flags |= IORING_SETUP_SQE128;
149 if (options.enable_cqe32_) {
150 params.flags |= IORING_SETUP_CQE32;
153#if !IO_URING_CHECK_VERSION(2, 13)
154 if (options.enable_sqe_mixed_) {
155 params.flags |= IORING_SETUP_SQE_MIXED;
159#if !IO_URING_CHECK_VERSION(2, 13)
160 if (options.enable_cqe_mixed_) {
161 params.flags |= IORING_SETUP_CQE_MIXED;
167#if !IO_URING_CHECK_VERSION(2, 5)
168 if (options.enable_no_mmap_) {
169 params.flags |= IORING_SETUP_NO_MMAP;
170 buf = options.no_mmap_buf_;
171 buf_size = options.no_mmap_buf_size_;
175 int r = ring_.init(ring_entries, ¶ms, buf, buf_size);
177 throw make_system_error(
"io_uring_queue_init_params", -r);
180 event_interval_ = options.event_interval_;
181 disable_register_ring_fd_ = options.disable_register_ring_fd_;
204 void schedule(WorkInvoker *work)
noexcept {
205 auto *curr_runtime = detail::Context::current().runtime();
206 if (curr_runtime ==
this) {
207 local_queue_.push_back(work);
211 auto state = state_.load();
212 if (state == State::Enabled) {
216 schedule_msg_ring_(curr_runtime,
217 encode_work(work, WorkType::Schedule));
221 std::unique_lock<std::mutex> lock(mutex_);
222 state = state_.load();
223 if (state == State::Enabled) {
226 schedule_msg_ring_(curr_runtime,
227 encode_work(work, WorkType::Schedule));
229 global_queue_.push_back(work);
235 void cancel(
void *data)
noexcept {
237 assert(
reinterpret_cast<intptr_t
>(data) % 8 == 0);
238 auto *curr_runtime = detail::Context::current().runtime();
239 if (curr_runtime ==
this) {
240 io_uring_sqe *sqe = ring_.get_sqe();
241 prep_cancel_(sqe, data);
245 auto state = state_.load();
246 if (state != State::Enabled) {
250 schedule_msg_ring_(curr_runtime, encode_work(data, WorkType::Cancel));
253 void pend_work() noexcept { pending_works_++; }
255 void resume_work() noexcept { pending_works_--; }
// NOTE(review): fragment of the runtime's run()/start sequence; the
// enclosing function signature and many original lines are missing from
// this mangled listing. Code below is byte-preserved; comments only.
// Transition Idle -> Running exactly once; a second run() (or a run()
// after stop) must fail.
267 State expected = State::Idle;
268 bool success = state_.compare_exchange_strong(expected, State::Running);
// CAS lost: some other caller owns the lifecycle.
270 throw std::runtime_error(
271 "Runtime is already running or has been stopped");
// On scope exit, mark the runtime Stopped however we unwind.
273 auto d1 =
defer([
this]() { state_.store(State::Stopped); });
275 [[maybe_unused]]
int r;
// The ring was created with IORING_SETUP_R_DISABLED; enable it now.
276 r = io_uring_enable_rings(ring_.ring());
// Drain work queued before the runtime was Enabled, and publish the
// Enabled state under the same mutex that schedule() re-checks.
280 std::lock_guard<std::mutex> lock(mutex_);
281 flush_global_queue_();
284 state_.store(State::Enabled);
// Registered ring fd skips fdget/fdput per syscall; optional via config.
287 if (!disable_register_ring_fd_) {
288 r = io_uring_register_ring_fd(ring_.ring());
// Expose this runtime and its ring through the thread-local context.
292 detail::Context::current().init(&ring_,
this);
293 auto d2 =
defer([]() { detail::Context::current().reset(); });
// Main-loop bookkeeping (loop body largely elided in this listing):
// every event_interval_ ticks, do periodic event processing ...
298 if (tick_count_ % event_interval_ == 0) {
// ... run one locally queued work item per iteration ...
302 if (
auto *work = local_queue_.pop_front()) {
// ... and presumably exit/park once nothing is pending — TODO confirm.
307 if (pending_works_ == 0) {
318 auto &
fd_table() noexcept {
return ring_.fd_table(); }
330 auto &
settings() noexcept {
return ring_.settings(); }
// Deliver `data` to this runtime's ring via IORING_OP_MSG_RING.
//
// When the caller runs inside another runtime, piggy-back on that
// runtime's submission queue and bump its pending-work count so it stays
// alive until the send completes. Otherwise fall back to a synchronous
// send through detail::sync_msg_ring on a stack-built SQE.
// NOTE(review): reconstructed from a mangled listing; the else-branch
// and the error check around sync_msg_ring were re-inferred.
void schedule_msg_ring_(Runtime *curr_runtime, void *data) noexcept {
  int ring_fd = this->ring_.ring()->ring_fd;
  if (curr_runtime != nullptr) {
    io_uring_sqe *sqe = curr_runtime->ring_.get_sqe();
    prep_msg_ring_(ring_fd, sqe, data);
    curr_runtime->pend_work();
  } else {
    io_uring_sqe sqe = {};
    prep_msg_ring_(ring_fd, &sqe, data);
    int r = detail::sync_msg_ring(&sqe);
    if (r < 0) {
      panic_on(std::format("sync_msg_ring: {}", std::strerror(-r)));
    }
  }
}
350 void wakeup_() noexcept {
351 auto *curr_runtime = detail::Context::current().runtime();
352 if (curr_runtime ==
this) {
356 auto state = state_.load();
357 if (state != State::Enabled) {
361 schedule_msg_ring_(curr_runtime,
362 encode_work(
nullptr, WorkType::Ignore));
365 void flush_global_queue_() noexcept {
366 local_queue_.push_back(std::move(global_queue_));
369 static void prep_msg_ring_(
int ring_fd, io_uring_sqe *sqe,
370 void *data)
noexcept {
371 io_uring_prep_msg_ring(sqe, ring_fd, 0,
372 reinterpret_cast<uint64_t
>(data), 0);
373 io_uring_sqe_set_data(sqe, encode_work(
nullptr, WorkType::Schedule));
376 static void prep_cancel_(io_uring_sqe *sqe,
void *data)
noexcept {
377 io_uring_prep_cancel(sqe, data, 0);
378 io_uring_sqe_set_data(sqe, encode_work(
nullptr, WorkType::Ignore));
379 io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);
// Reap all currently available completions without blocking, dispatching
// each CQE through process_cqe_.
// NOTE(review): the `r < 0` guard was re-inferred from the surviving
// panic_on line of a mangled listing.
void flush_ring_() noexcept {
  auto r = ring_.reap_completions(
      [this](io_uring_cqe *cqe) { process_cqe_(cqe); });
  if (r < 0) {
    panic_on(std::format("io_uring_peek_cqe: {}",
                         std::strerror(static_cast<int>(-r))));
  }
}
// Submit pending SQEs and block until at least one completion arrives,
// then dispatch each CQE through process_cqe_.
// NOTE(review): the `r < 0` guard was re-inferred from the surviving
// panic_on line of a mangled listing.
void flush_ring_wait_() noexcept {
  auto r = ring_.reap_completions_wait(
      [this](io_uring_cqe *cqe) { process_cqe_(cqe); });
  if (r < 0) {
    panic_on(std::format("io_uring_submit_and_wait: {}",
                         std::strerror(static_cast<int>(-r))));
  }
}
// Dispatch one completion. The CQE user_data is a tagged pointer:
// decode_work() splits it into the payload pointer and a WorkType tag.
// NOTE(review): several original lines are missing from this mangled
// listing (flagged below); code is byte-preserved, comments only.
400 void process_cqe_(io_uring_cqe *cqe)
noexcept {
401 auto *data_raw = io_uring_cqe_get_data(cqe);
402 auto [data, type] = decode_work(data_raw);
// Ignore: no payload to act on; just sanity-check the result code.
404 if (type == WorkType::Ignore) {
406 assert(cqe->res != -EINVAL);
407 }
else if (type == WorkType::Schedule) {
// A null payload is the sender-side CQE of a msg_ring send (see
// prep_msg_ring_); a failure there is fatal.
408 if (data ==
nullptr) {
410 panic_on(std::format(
"io_uring_prep_msg_ring: {}",
411 std::strerror(-cqe->res)));
// Non-null payload: a WorkInvoker handed over from another runtime.
// (The original lines that consumed `work` are missing here —
// presumably it was enqueued or invoked; TODO confirm.)
415 auto *work =
static_cast<WorkInvoker *
>(data);
419 }
else if (type == WorkType::Cancel) {
// Cancel request forwarded from another thread: issue the actual
// async-cancel SQE on our own ring.
420 io_uring_sqe *sqe = ring_.get_sqe();
421 prep_cancel_(sqe, data);
422 }
else if (type == WorkType::Common) {
// Ordinary operation completion: hand the CQE to its finish handler.
// (The original lines that used `op_finish` are missing here.)
423 auto *handle =
static_cast<OpFinishHandleBase *
>(data);
424 auto op_finish = handle->handle(cqe);
// Runtime lifecycle states. The enumerator list is missing from this
// listing; usage elsewhere in the file shows at least Idle, Running,
// Enabled and Stopped.
434 enum class State : uint8_t {
// state_ is read on hot paths; require a lock-free atomic.
440 static_assert(std::atomic<State>::is_always_lock_free);
// Intrusive list threaded through WorkInvoker's embedded entry, so
// queue membership needs no extra allocation.
442 using WorkListQueue =
443 IntrusiveSingleList<WorkInvoker, &WorkInvoker::work_queue_entry_>;
// Work queued by other threads before this runtime is Enabled; drained
// by run() and guarded by mutex_ (see schedule()).
447 WorkListQueue global_queue_;
// Outstanding-work count; starts at 1 — presumably the runtime's own
// self-reference released at shutdown. TODO confirm.
448 std::atomic_size_t pending_works_ = 1;
449 std::atomic<State> state_ = State::Idle;
// Owner-thread-only queue of runnable work.
452 WorkListQueue local_queue_;
454 size_t tick_count_ = 0;
// Run periodic event processing every N loop ticks (see run()); 61 is
// presumably chosen as a prime so it drifts relative to batch sizes.
457 size_t event_interval_ = 61;
458 bool disable_register_ring_fd_ =
false;