75 io_uring_params params;
76 std::memset(¶ms, 0,
sizeof(params));
78 params.flags |= IORING_SETUP_CLAMP;
79 params.flags |= IORING_SETUP_SINGLE_ISSUER;
80 params.flags |= IORING_SETUP_SUBMIT_ALL;
81 params.flags |= IORING_SETUP_R_DISABLED;
83 size_t ring_entries = options.sq_size_;
84 if (options.cq_size_ != 0) {
85 params.flags |= IORING_SETUP_CQSIZE;
86 params.cq_entries = options.cq_size_;
89 if (options.enable_iopoll_) {
90 params.flags |= IORING_SETUP_IOPOLL;
91#if !IO_URING_CHECK_VERSION(2, 9)
92 if (options.enable_hybrid_iopoll_) {
93 params.flags |= IORING_SETUP_HYBRID_IOPOLL;
98 if (options.enable_sqpoll_) {
99 params.flags |= IORING_SETUP_SQPOLL;
100 params.sq_thread_idle = options.sqpoll_idle_time_ms_;
101 if (options.sqpoll_thread_cpu_.has_value()) {
102 params.flags |= IORING_SETUP_SQ_AFF;
103 params.sq_thread_cpu = *options.sqpoll_thread_cpu_;
107 if (options.attach_wq_target_ !=
nullptr) {
108 params.flags |= IORING_SETUP_ATTACH_WQ;
109 params.wq_fd = options.attach_wq_target_->ring_.ring()->ring_fd;
112 if (options.enable_defer_taskrun_) {
113 params.flags |= IORING_SETUP_DEFER_TASKRUN;
114 params.flags |= IORING_SETUP_TASKRUN_FLAG;
117 if (options.enable_coop_taskrun_) {
118 params.flags |= IORING_SETUP_COOP_TASKRUN;
119 params.flags |= IORING_SETUP_TASKRUN_FLAG;
122 if (options.enable_sqe128_) {
123 params.flags |= IORING_SETUP_SQE128;
126 if (options.enable_cqe32_) {
127 params.flags |= IORING_SETUP_CQE32;
130#if !IO_URING_CHECK_VERSION(2, 13)
131 if (options.enable_sqe_mixed_) {
132 params.flags |= IORING_SETUP_SQE_MIXED;
136#if !IO_URING_CHECK_VERSION(2, 13)
137 if (options.enable_cqe_mixed_) {
138 params.flags |= IORING_SETUP_CQE_MIXED;
144#if !IO_URING_CHECK_VERSION(2, 5)
145 if (options.enable_no_mmap_) {
146 params.flags |= IORING_SETUP_NO_MMAP;
147 buf = options.no_mmap_buf_;
148 buf_size = options.no_mmap_buf_size_;
152 int r = ring_.init(ring_entries, ¶ms, buf, buf_size);
154 throw make_system_error(
"io_uring_queue_init_params", -r);
157 event_interval_ = options.event_interval_;
158 disable_register_ring_fd_ = options.disable_register_ring_fd_;
181 void schedule(WorkInvoker *work)
noexcept {
182 auto *curr_runtime = detail::Context::current().runtime();
183 if (curr_runtime ==
this) {
184 local_queue_.push_back(work);
188 auto state = state_.load();
189 if (state == State::Enabled) {
193 schedule_msg_ring_(curr_runtime, work, WorkType::Schedule);
197 std::unique_lock<std::mutex> lock(mutex_);
198 state = state_.load();
199 if (state == State::Enabled) {
202 schedule_msg_ring_(curr_runtime, work, WorkType::Schedule);
204 global_queue_.push_back(work);
209 void pend_work() noexcept { pending_works_++; }
211 void resume_work() noexcept { pending_works_--; }
// NOTE(review): this excerpt begins mid-function (the enclosing run loop's
// signature and several interior lines are absent from this chunk); code is
// kept verbatim with commentary only, and each gap is flagged.

// One-shot transition Idle -> Running; a second concurrent/most recent run
// attempt is a programmer error caught in debug builds.
State expected = State::Idle;
[[maybe_unused]] bool success =
    state_.compare_exchange_strong(expected, State::Running);
assert(success && "Runtime is already running or stopped");
// On any exit path, mark the runtime Stopped.
auto d1 = defer([this]() { state_.store(State::Stopped); });

// The ring was created with IORING_SETUP_R_DISABLED; enable it now.
[[maybe_unused]] int r;
r = io_uring_enable_rings(ring_.ring());
// NOTE(review): lines missing here in this excerpt — likely a result check
// and the opening of a scope for the lock below; confirm against original.
std::lock_guard<std::mutex> lock(mutex_);
// Drain work parked by schedule() before the ring was usable, then
// advertise that cross-thread callers may use msg-ring delivery.
flush_global_queue_();
// NOTE(review): gap in excerpt.
state_.store(State::Enabled);
// NOTE(review): gap in excerpt.
if (!disable_register_ring_fd_) {
  // Registered ring fd avoids an fd lookup per syscall.
  r = io_uring_register_ring_fd(ring_.ring());
// NOTE(review): closing brace / error handling for the register call are
// missing from this excerpt.
detail::Context::current().init(&ring_, this);
auto d2 = defer([]() { detail::Context::current().reset(); });
// NOTE(review): main loop header missing from excerpt.
if (tick_count_ % event_interval_ == 0) {
// NOTE(review): periodic-tick body missing from excerpt.
if (auto *work = local_queue_.pop_front()) {
// NOTE(review): work-dispatch body missing from excerpt.
if (pending_works_ == 0) {
// NOTE(review): presumably the loop-exit condition — remainder of the
// function is outside this excerpt.
270 auto &
fd_table() noexcept {
return ring_.fd_table(); }
282 auto &
settings() noexcept {
return ring_.settings(); }
285 void schedule_msg_ring_(
Runtime *curr_runtime, WorkInvoker *work,
286 WorkType type)
noexcept {
287 if (curr_runtime !=
nullptr) {
288 io_uring_sqe *sqe = curr_runtime->ring_.get_sqe();
289 prep_msg_ring_(sqe, work, type);
290 curr_runtime->pend_work();
292 io_uring_sqe sqe = {};
293 prep_msg_ring_(&sqe, work, type);
294 [[maybe_unused]]
int r = detail::sync_msg_ring(&sqe);
300 void wakeup_() noexcept {
301 auto *curr_runtime = detail::Context::current().runtime();
302 if (curr_runtime ==
this) {
306 auto state = state_.load();
307 if (state != State::Enabled) {
311 schedule_msg_ring_(curr_runtime,
nullptr, WorkType::Ignore);
314 void flush_global_queue_() noexcept {
315 local_queue_.push_back(std::move(global_queue_));
318 void prep_msg_ring_(io_uring_sqe *sqe, WorkInvoker *work,
319 WorkType type)
noexcept {
320 auto data = encode_work(work, type);
321 io_uring_prep_msg_ring(sqe, this->ring_.ring()->ring_fd, 0,
322 reinterpret_cast<uint64_t
>(data), 0);
323 io_uring_sqe_set_data(sqe, encode_work(
nullptr, WorkType::Schedule));
326 size_t flush_ring_() noexcept {
327 return ring_.reap_completions(
328 [
this](io_uring_cqe *cqe) { process_cqe_(cqe); });
331 size_t flush_ring_wait_() noexcept {
332 return ring_.reap_completions_wait(
333 [
this](io_uring_cqe *cqe) { process_cqe_(cqe); });
// Dispatch a single completion according to the WorkType encoded in its
// user_data.  NOTE(review): several interior lines are missing from this
// excerpt; each gap is flagged below — code is otherwise verbatim.
void process_cqe_(io_uring_cqe *cqe) noexcept {
  auto *data_raw = io_uring_cqe_get_data(cqe);
  auto [data, type] = decode_work(data_raw);

  if (type == WorkType::Ignore) {
    // Presumably a wakeup message (see wakeup_) — nothing to run; only
    // verify the kernel accepted the msg_ring op.
    assert(cqe->res != -EINVAL);
  } else if (type == WorkType::SendFd) {
    // NOTE(review): `fd_table.fd_accepter_` looks like it should be
    // `fd_table().fd_accepter_` (accessor defined above) — confirm against
    // the original source.
    if (fd_table.fd_accepter_ == nullptr) [[unlikely]] {
      std::cerr << "[Deprecated Warning] Received a file "
                   "descriptor but no accepter is set.\n";
    // NOTE(review): gap in excerpt (likely early-return / closing brace).
    // Payload layout: low 3 bits hold the type tag, remaining bits carry
    // the fd biased by +1 (so a valid fd 0 is distinguishable from null).
    uint64_t payload = reinterpret_cast<uint64_t>(data) >> 3;
    // NOTE(review): gap in excerpt.
    int target_fd = static_cast<int>(payload - 1);
    // NOTE(review): the code that hands target_fd to the accepter is
    // missing from this excerpt.
  } else if (type == WorkType::Schedule) {
    if (data == nullptr) {
      // Sender-side completion of a msg_ring SQE (see prep_msg_ring_).
      assert(cqe->res == 0);
    // NOTE(review): gap in excerpt (likely early-return / closing brace).
    auto *work = static_cast<WorkInvoker *>(data);
    local_queue_.push_back(work);
  } else if (type == WorkType::Common) {
    // Completion belongs to an operation handle; let it interpret the CQE.
    auto *handle = static_cast<OpFinishHandleBase *>(data);
    auto action = handle->handle_cqe(cqe);
    if (action.op_finish) {
    // NOTE(review): gap in excerpt (presumably resume_work() and a brace).
    if (action.queue_work) {
      local_queue_.push_back(handle);
    // NOTE(review): gap in excerpt (likely closing braces / else branch).
    assert(false && "Invalid work type");
// Runtime lifecycle states.  NOTE(review): the enumerator list is missing
// from this excerpt; code elsewhere in the chunk references Idle, Running,
// Enabled and Stopped.
enum class State : uint8_t {

// state_ is read on hot paths; require a lock-free atomic representation.
static_assert(std::atomic<State>::is_always_lock_free);

using WorkListQueue =
    IntrusiveSingleList<WorkInvoker, &WorkInvoker::work_queue_entry_>;

// Cross-thread submission queue; pushed under mutex_ by schedule() until
// the ring is Enabled, drained by flush_global_queue_().
WorkListQueue global_queue_;
// In-flight work counter (pend_work()/resume_work()); initialized to 1 —
// presumably a sentinel released elsewhere, TODO confirm.
std::atomic_size_t pending_works_ = 1;
std::atomic<State> state_ = State::Idle;

// Owner-thread-only queue of runnable work items.
WorkListQueue local_queue_;

// Run-loop iteration counter, compared against event_interval_.
size_t tick_count_ = 0;

// Run-loop ticks between periodic event processing; 61 is prime —
// presumably chosen to avoid beating with other periodic work, TODO confirm.
size_t event_interval_ = 61;
bool disable_register_ring_fd_ = false;