// Runtime setup: translate the caller's options into io_uring setup flags
// before creating the ring.
io_uring_params params;
std::memset(&params, 0, sizeof(params));

params.flags |= IORING_SETUP_CLAMP;
params.flags |= IORING_SETUP_SINGLE_ISSUER;
params.flags |= IORING_SETUP_SUBMIT_ALL;
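// The ring starts disabled (IORING_SETUP_R_DISABLED); run() enables it with
// io_uring_enable_rings() only once the runtime is ready to accept work.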
params.flags |= IORING_SETUP_R_DISABLED;

size_t ring_entries = options.sq_size_;
if (options.cq_size_ != 0) {
  params.flags |= IORING_SETUP_CQSIZE;
  params.cq_entries = options.cq_size_;
}

if (options.enable_iopoll_) {
  params.flags |= IORING_SETUP_IOPOLL;
  // IO_URING_CHECK_VERSION(maj, min) is true when liburing is older than
  // maj.min, so these guarded blocks compile only on new-enough liburing.
#if !IO_URING_CHECK_VERSION(2, 9)
  if (options.enable_hybrid_iopoll_) {
    params.flags |= IORING_SETUP_HYBRID_IOPOLL;
  }
#endif
}

if (options.enable_sqpoll_) {
  params.flags |= IORING_SETUP_SQPOLL;
  params.sq_thread_idle = options.sqpoll_idle_time_ms_;
  if (options.sqpoll_thread_cpu_.has_value()) {
    params.flags |= IORING_SETUP_SQ_AFF;
    params.sq_thread_cpu = *options.sqpoll_thread_cpu_;
  }
}

if (options.attach_wq_target_ != nullptr) {
  params.flags |= IORING_SETUP_ATTACH_WQ;
  params.wq_fd = options.attach_wq_target_->ring_.ring()->ring_fd;
}
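
// DEFER_TASKRUN batches task work until the owning thread enters the ring;
// the kernel requires IORING_SETUP_SINGLE_ISSUER for it, which is set
// unconditionally above.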
if (options.enable_defer_taskrun_) {
  params.flags |= IORING_SETUP_DEFER_TASKRUN;
  params.flags |= IORING_SETUP_TASKRUN_FLAG;
}

if (options.enable_coop_taskrun_) {
  params.flags |= IORING_SETUP_COOP_TASKRUN;
  params.flags |= IORING_SETUP_TASKRUN_FLAG;
}

if (options.enable_sqe128_) {
  params.flags |= IORING_SETUP_SQE128;
}

if (options.enable_cqe32_) {
  params.flags |= IORING_SETUP_CQE32;
}

#if !IO_URING_CHECK_VERSION(2, 13)
if (options.enable_sqe_mixed_) {
  params.flags |= IORING_SETUP_SQE_MIXED;
}
#endif

#if !IO_URING_CHECK_VERSION(2, 13)
if (options.enable_cqe_mixed_) {
  params.flags |= IORING_SETUP_CQE_MIXED;
}
#endif

// Backing memory for NO_MMAP mode; the declarations sit in an elided part of
// the original, so the nullptr/0 defaults here are reconstructed.
void *buf = nullptr;
size_t buf_size = 0;

#if !IO_URING_CHECK_VERSION(2, 5)
if (options.enable_no_mmap_) {
  params.flags |= IORING_SETUP_NO_MMAP;
  buf = options.no_mmap_buf_;
  buf_size = options.no_mmap_buf_size_;
}
#endif

#if !IO_URING_CHECK_VERSION(2, 14)
if (options.enable_sq_rewind_) {
  params.flags |= IORING_SETUP_SQ_REWIND;
}
#endif

int r = ring_.init(ring_entries, &params, buf, buf_size);
if (r < 0) {
  throw make_system_error("io_uring_queue_init_params", -r);
}

event_interval_ = options.event_interval_;
disable_register_ring_fd_ = options.disable_register_ring_fd_;
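
// schedule() has three paths: same-runtime work goes straight to the local
// queue; an enabled remote runtime is poked via IORING_OP_MSG_RING; otherwise
// the work is parked on the mutex-protected global queue until run() drains it.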
void schedule(WorkInvoker *work) noexcept {
  auto *curr_runtime = detail::Context::current().runtime();
  if (curr_runtime == this) {
    local_queue_.push_back(work);
    return;
  }

  auto state = state_.load();
  if (state == State::Enabled) {
    // Fast path: the target loop is live, poke it over msg_ring lock-free.
    schedule_msg_ring_(curr_runtime, encode_work(work, WorkType::Schedule));
    return;
  }

  // Slow path: re-check under the mutex; run() flips to Enabled while holding
  // it, so a miss here guarantees run() will drain the global queue later.
  std::unique_lock<std::mutex> lock(mutex_);
  state = state_.load();
  if (state == State::Enabled) {
    lock.unlock();
    schedule_msg_ring_(curr_runtime, encode_work(work, WorkType::Schedule));
  } else {
    global_queue_.push_back(work);
  }
}
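
// cancel() mirrors schedule(): a cancel issued on this runtime's own thread
// preps the SQE directly; a remote cancel ships a stack-allocated
// CancelRequest through msg_ring, with tsan_release/tsan_acquire pairing the
// cross-thread handoff for TSAN.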
void cancel(uintptr_t data) noexcept {
  auto *curr_runtime = detail::Context::current().runtime();
  if (curr_runtime == this) {
    io_uring_sqe *sqe = ring_.get_sqe();
    prep_cancel_(sqe, data);
    return;
  }

  auto state = state_.load();
  if (state != State::Enabled) {
    return;
  }

  detail::CancelRequest request(data);
  tsan_release(&request);
  schedule_msg_ring_(curr_runtime, encode_work(&request, WorkType::Cancel));
  if (curr_runtime != nullptr) {
    // Submit right away so the target ring sees the message promptly.
    curr_runtime->ring_.submit();
  }
}

void pend_work() noexcept { pending_works_++; }

void resume_work() noexcept { pending_works_--; }
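
// Event-loop entry point. The signature below is reconstructed; the original
// declaration sits outside this excerpt. Lifecycle: Idle -> Running ->
// Enabled -> Stopped (the Stopped store is deferred to function exit).
void run() {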
  State expected = State::Idle;
  bool success = state_.compare_exchange_strong(expected, State::Running);
  if (!success) {
    throw std::runtime_error(
        "Runtime is already running or has been stopped");
  }
  auto d1 = defer([this]() { state_.store(State::Stopped); });

  [[maybe_unused]] int r;
  r = io_uring_enable_rings(ring_.ring());

  {
    // Flip to Enabled and drain the global queue under the same mutex that
    // schedule() takes, so work parked before this point is not lost.
    std::lock_guard<std::mutex> lock(mutex_);
    flush_global_queue_();
    state_.store(State::Enabled);
  }

  if (!disable_register_ring_fd_) {
    r = io_uring_register_ring_fd(ring_.ring());
  }

  detail::Context::current().init(&ring_, this);
  auto d2 = defer([]() { detail::Context::current().reset(); });

  // Main loop (shape reconstructed around the surviving conditions):
  while (true) {
    tick_count_++;
    if (tick_count_ % event_interval_ == 0) {
      flush_ring_();  // periodically reap completions even while busy
    }
    if (auto *work = local_queue_.pop_front()) {
      // ... invoke the dequeued work, then re-enter the loop
      continue;
    }
    if (pending_works_ == 0) {
      break;  // no runnable work and nothing in flight: exit the loop
    }
    flush_ring_wait_();  // idle: submit pending SQEs and block for CQEs
  }
}

auto &fd_table() noexcept { return ring_.fd_table(); }

auto &settings() noexcept { return ring_.settings(); }
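
// Delivers an encoded work pointer to this runtime's ring. From inside
// another runtime the message rides that runtime's SQ; from a plain thread it
// falls back to a synchronous msg_ring on a temporary SQE.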
void schedule_msg_ring_(Runtime *curr_runtime, uintptr_t data) noexcept {
  int ring_fd = this->ring_.ring()->ring_fd;
  if (curr_runtime != nullptr) {
    io_uring_sqe *sqe = curr_runtime->ring_.get_sqe();
    prep_msg_ring_(ring_fd, sqe, data);
    curr_runtime->pend_work();
  } else {
    io_uring_sqe sqe = {};
    prep_msg_ring_(ring_fd, &sqe, data);
    int r = detail::sync_msg_ring(&sqe);
    if (r < 0) {
      panic_on(std::format("sync_msg_ring: {}", std::strerror(-r)));
    }
  }
}
void wakeup_() noexcept {
  auto *curr_runtime = detail::Context::current().runtime();
  if (curr_runtime == this) {
    return;
  }

  auto state = state_.load();
  if (state != State::Enabled) {
    return;
  }

  schedule_msg_ring_(curr_runtime, encode_work(nullptr, WorkType::Ignore));
}

void flush_global_queue_() noexcept {
  local_queue_.push_back(std::move(global_queue_));
}

static void prep_msg_ring_(int ring_fd, io_uring_sqe *sqe,
                           uintptr_t data) noexcept {
  // `data` rides in the message and becomes the *target* ring's CQE user_data;
  // the sender's own CQE is tagged nullptr/Schedule (see process_cqe_).
  io_uring_prep_msg_ring(sqe, ring_fd, 0, data, 0);
  io_uring_sqe_set_data64(sqe, encode_work(nullptr, WorkType::Schedule));
}

static void prep_cancel_(io_uring_sqe *sqe, uintptr_t data) noexcept {
  io_uring_prep_cancel64(sqe, data, 0);
  io_uring_sqe_set_data64(sqe, encode_work(nullptr, WorkType::Ignore));
  // The cancel SQE itself needs no CQE on success.
  io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);
}

void flush_ring_() noexcept {
  auto r = ring_.reap_completions(
      [this](io_uring_cqe *cqe) { process_cqe_(cqe); });
  if (r < 0) {
    panic_on(std::format("io_uring_peek_cqe: {}",
                         std::strerror(static_cast<int>(-r))));
  }
}

void flush_ring_wait_() noexcept {
  auto r = ring_.reap_completions_wait(
      [this](io_uring_cqe *cqe) { process_cqe_(cqe); });
  if (r < 0) {
    panic_on(std::format("io_uring_submit_and_wait: {}",
                         std::strerror(static_cast<int>(-r))));
  }
}
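
// Every CQE's user_data is a tagged pointer: decode_work() recovers the
// original pointer plus the WorkType that encode_work() packed in.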
void process_cqe_(io_uring_cqe *cqe) noexcept {
  auto [data, type] = decode_work(io_uring_cqe_get_data64(cqe));

  if (type == WorkType::Ignore) {
    assert(cqe->res != -EINVAL);
  } else if (type == WorkType::Schedule) {
    if (data == nullptr) {
      // Sender-side CQE for a msg_ring we issued; failure to deliver is fatal.
      if (cqe->res < 0) {
        panic_on(std::format("io_uring_prep_msg_ring: {}",
                             std::strerror(-cqe->res)));
      }
      resume_work();  // pairs with pend_work() in schedule_msg_ring_
    } else {
      // Receiver-side CQE: the message payload is the scheduled work itself.
      auto *work = static_cast<WorkInvoker *>(data);
      local_queue_.push_back(work);
    }
  } else if (type == WorkType::Cancel) {
    detail::CancelRequest *request =
        static_cast<detail::CancelRequest *>(data);
    tsan_acquire(request);
    io_uring_sqe *sqe = ring_.get_sqe();
    prep_cancel_(sqe, request->data());
  } else if (type == WorkType::Common) {
    auto *handle = static_cast<OpFinishHandleBase *>(data);
    auto op_finish = handle->handle(cqe);
    // ... resume the finished operation via op_finish
  }
}

enum class State : uint8_t {
  Idle,
  Running,
  Enabled,
  Stopped,
};
static_assert(std::atomic<State>::is_always_lock_free);

using WorkListQueue =
    IntrusiveSingleList<WorkInvoker, &WorkInvoker::work_queue_entry_>;

std::mutex mutex_;  // guards global_queue_ and the Idle -> Enabled handoff
WorkListQueue global_queue_;
std::atomic_size_t pending_works_ = 1;
std::atomic<State> state_ = State::Idle;

WorkListQueue local_queue_;

size_t tick_count_ = 0;

size_t event_interval_ = 61;
bool disable_register_ring_fd_ = false;
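
// Usage sketch (hypothetical): the Options field names come from this excerpt,
// while the Options type name, the Runtime constructor, and run() are assumed.
//
//   Options opts;
//   opts.sq_size_ = 1024;
//   opts.enable_defer_taskrun_ = true;
//   Runtime runtime(opts);
//   runtime.run();  // blocks on the calling thread, driving the io_uring loop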