io_uring_params params;
std::memset(&params, 0, sizeof(params));

params.flags |= IORING_SETUP_CLAMP;
params.flags |= IORING_SETUP_SINGLE_ISSUER;
params.flags |= IORING_SETUP_SUBMIT_ALL;
params.flags |= IORING_SETUP_R_DISABLED;

size_t ring_entries = options.sq_size_;
if (options.cq_size_ != 0) {
  params.flags |= IORING_SETUP_CQSIZE;
  params.cq_entries = options.cq_size_;
}
if (options.enable_iopoll_) {
  params.flags |= IORING_SETUP_IOPOLL;
#if !IO_URING_CHECK_VERSION(2, 9)
  if (options.enable_hybrid_iopoll_) {
    params.flags |= IORING_SETUP_HYBRID_IOPOLL;
  }
#endif
}
if (options.enable_sqpoll_) {
  params.flags |= IORING_SETUP_SQPOLL;
  params.sq_thread_idle = options.sqpoll_idle_time_ms_;
  if (options.sqpoll_thread_cpu_.has_value()) {
    params.flags |= IORING_SETUP_SQ_AFF;
    params.sq_thread_cpu = *options.sqpoll_thread_cpu_;
  }
}
if (options.attach_wq_target_ != nullptr) {
  params.flags |= IORING_SETUP_ATTACH_WQ;
  params.wq_fd = options.attach_wq_target_->ring_.ring()->ring_fd;
}

if (options.enable_defer_taskrun_) {
  params.flags |= IORING_SETUP_DEFER_TASKRUN;
}

if (options.enable_coop_taskrun_) {
  params.flags |= IORING_SETUP_COOP_TASKRUN;
  if (options.enable_coop_taskrun_flag_) {
    params.flags |= IORING_SETUP_TASKRUN_FLAG;
  }
}

if (options.enable_sqe128_) {
  params.flags |= IORING_SETUP_SQE128;
}

if (options.enable_cqe32_) {
  params.flags |= IORING_SETUP_CQE32;
}
void *buf = nullptr;
size_t buf_size = 0;
#if !IO_URING_CHECK_VERSION(2, 5)
if (options.enable_no_mmap_) {
  params.flags |= IORING_SETUP_NO_MMAP;
  buf = options.no_mmap_buf_;
  buf_size = options.no_mmap_buf_size_;
}
#endif

int r = ring_.init(ring_entries, &params, buf, buf_size);
if (r < 0) {
  throw make_system_error("io_uring_queue_init_params", -r);
}
event_interval_ = options.event_interval_;
disable_register_ring_fd_ = options.disable_register_ring_fd_;
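
// Note: IORING_SETUP_R_DISABLED creates the ring in a disabled state; it is
// only switched on later via io_uring_enable_rings() when the runtime starts
// (see below). IORING_SETUP_SINGLE_ISSUER tells the kernel that all
// submissions come from a single task, which matches the one-thread-per-
// runtime design implied by Context::current().
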
void notify() { async_waiter_.notify(ring_); }
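
// schedule() picks the cheapest safe hand-off path, in order: (1) the caller
// is this runtime's own thread, so the work goes straight onto local_queue_;
// (2) the caller is another runtime thread and this ring is enabled, so the
// work is forwarded with an IORING_OP_MSG_RING SQE on the caller's ring;
// (3) with liburing >= 2.12, a non-runtime thread can use the synchronous
// msg-ring path; (4) otherwise the work lands in the mutex-protected
// global_queue_, and the runtime is woken if the queue was previously empty.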
void schedule(WorkInvoker *work) {
  auto *runtime = Context::current().runtime();
  if (runtime == this) {
    local_queue_.push_back(work);
    return;
  }

  auto state = state_.load();
  if (runtime != nullptr && state == State::Enabled) {
    io_uring_sqe *sqe = runtime->ring_.get_sqe();
    prep_msg_ring_(sqe, work);
    runtime->pend_work();
    return;
  }
#if !IO_URING_CHECK_VERSION(2, 12)
  if (runtime == nullptr && state == State::Enabled) {
    io_uring_sqe sqe = {};
    prep_msg_ring_(&sqe, work);
    [[maybe_unused]] int r = io_uring_register_sync_msg(&sqe);
    return;
  }
#endif
  std::lock_guard<std::mutex> lock(mutex_);
  bool need_notify = global_queue_.empty();
  global_queue_.push_back(work);
  if (need_notify) {
    notify();
  }
}
void pend_work() { pending_works_++; }

void resume_work() { pending_works_--; }
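
// pend_work()/resume_work() maintain pending_works_, the count of work items
// still in flight for this runtime (schedule() bumps it when it forwards work
// via msg_ring). The counter starts at 1 (see the member below), which
// presumably keeps the run loop's pending_works_ == 0 shutdown check from
// firing until a final matching decrement.
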
State expected = State::Idle;
if (!state_.compare_exchange_strong(expected, State::Running)) {
  throw std::logic_error("Runtime is already running or stopped");
}
auto d1 = defer([this]() { state_.store(State::Stopped); });
[[maybe_unused]] int r;
r = io_uring_enable_rings(ring_.ring());

state_.store(State::Enabled);

if (!disable_register_ring_fd_) {
  r = io_uring_register_ring_fd(ring_.ring());
  if (r < 0) {
    throw make_system_error("io_uring_register_ring_fd", -r);
  }
}
Context::current().init(&ring_, this);
auto d2 = defer([]() { Context::current().reset(); });

{
  std::lock_guard<std::mutex> lock(mutex_);
  flush_global_queue_();
}
if (tick_count_ % event_interval_ == 0) {

if (auto *work = local_queue_.pop_front()) {

if (pending_works_ == 0) {
void flush_global_queue_() {
  local_queue_.push_back(std::move(global_queue_));
  async_waiter_.async_wait(ring_);
}
void prep_msg_ring_(io_uring_sqe *sqe, WorkInvoker *work) {
  auto data = encode_work(work, WorkType::Schedule);
  io_uring_prep_msg_ring(sqe, this->ring_.ring()->ring_fd, 0,
                         reinterpret_cast<uint64_t>(data), 0);
  io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Schedule));
}
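
// How encode_work()/decode_work() pack a pointer together with its WorkType
// is not shown in this excerpt. A plausible scheme, consistent with the
// `>> 3` in the SendFd branch below, is to store the tag in the low 3 bits
// of a suitably aligned pointer. Sketch only, not the actual implementation:
//
//   void *encode_work(void *ptr, WorkType type) {
//     return reinterpret_cast<void *>(reinterpret_cast<uintptr_t>(ptr) |
//                                     static_cast<uintptr_t>(type));
//   }
//   std::pair<void *, WorkType> decode_work(void *raw) {
//     auto bits = reinterpret_cast<uintptr_t>(raw);
//     return {reinterpret_cast<void *>(bits & ~uintptr_t{7}),
//             static_cast<WorkType>(bits & 7)};
//   }
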
size_t flush_ring_() {
  return ring_.reap_completions(
      [this](io_uring_cqe *cqe) { process_cqe_(cqe); });
}

size_t flush_ring_wait_() {
  return ring_.reap_completions_wait(
      [this](io_uring_cqe *cqe) { process_cqe_(cqe); });
}
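
// process_cqe_() demultiplexes completions by the WorkType encoded in the
// CQE user data: Ignore entries are only sanity-checked, Notify drains the
// global queue, SendFd decodes a transferred file descriptor for
// fd_accepter_, Schedule resumes work forwarded from another ring,
// MultiShot/ZeroCopy handle completions that may carry IORING_CQE_F_MORE or
// IORING_CQE_F_NOTIF, and Common completes an ordinary one-shot operation.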
void process_cqe_(io_uring_cqe *cqe) {
  auto *data_raw = io_uring_cqe_get_data(cqe);
  auto [data, type] = decode_work(data_raw);

  if (type == WorkType::Ignore) {
    assert(cqe->res != -EINVAL);
  } else if (type == WorkType::Notify) {
    std::lock_guard<std::mutex> lock(mutex_);
    flush_global_queue_();
  } else if (type == WorkType::SendFd) {
    if (fd_table.fd_accepter_ == nullptr) [[unlikely]] {
      throw std::logic_error("No way to accept sent fd");
    }
    uint64_t payload = reinterpret_cast<uint64_t>(data) >> 3;

    int target_fd = static_cast<int>(payload - 1);

  } else if (type == WorkType::Schedule) {
    if (data == nullptr) {
      assert(cqe->res == 0);
      return;
    }
    auto *work = static_cast<WorkInvoker *>(data);
    local_queue_.push_back(work);
  } else if (type == WorkType::MultiShot) {
    auto *handle = static_cast<ExtendOpFinishHandle *>(data);
    handle->set_result(cqe->res, cqe->flags);
    if (cqe->flags & IORING_CQE_F_MORE) {
      handle->invoke_extend(0);
    } else {
      local_queue_.push_back(handle);
    }
  } else if (type == WorkType::ZeroCopy) {
    auto *handle = static_cast<ExtendOpFinishHandle *>(data);
    if (cqe->flags & IORING_CQE_F_MORE) {
      handle->set_result(cqe->res, cqe->flags);
      local_queue_.push_back(handle);
    } else {
      if (cqe->flags & IORING_CQE_F_NOTIF) {
        handle->invoke_extend(cqe->res);
      } else {
        handle->set_result(cqe->res, cqe->flags);
        local_queue_.push_back(handle);
        handle->invoke_extend(0);
      }
    }
  } else if (type == WorkType::Common) {
    auto *handle = static_cast<OpFinishHandle *>(data);
    handle->set_result(cqe->res, cqe->flags);
    local_queue_.push_back(handle);
  } else {
    assert(false && "Invalid work type");
  }
}
enum class State : uint8_t {
  Idle,
  Running,
  Enabled,
  Stopped,
};
static_assert(std::atomic<State>::is_always_lock_free);
using WorkListQueue =
    IntrusiveSingleList<WorkInvoker, &WorkInvoker::work_queue_entry_>;

detail::AsyncWaiter async_waiter_;
WorkListQueue global_queue_;  // cross-thread submissions, guarded by mutex_
std::atomic_size_t pending_works_ = 1;
std::atomic<State> state_ = State::Idle;

WorkListQueue local_queue_;  // only touched from the runtime's own thread

size_t tick_count_ = 0;

size_t event_interval_ = 61;  // see the tick_count_ % event_interval_ check in the run loop
bool disable_register_ring_fd_ = false;