Condy v1.5.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
link-cp.cpp
Go to the documentation of this file.
1
5
#include "condy/runtime.hpp"
#include <condy.hpp>
#include <cctype>
#include <cstdio>
#include <cstring>
#include <exception>
#include <fcntl.h>
#include <format>
#include <iostream>
#include <limits>
#include <stdexcept>
#include <string>
#include <sys/types.h>
#include <unistd.h>
15
// Number of concurrent copy tasks; overridden by the -t command-line option.
static size_t task_num = 64;
// Bytes transferred per read/write pair; overridden by -c. Defaults to 1 MiB.
// The uppercase suffix avoids the "1024l" vs "10241" misreading.
static size_t chunk_size = 1024UL * 1024UL;
// When true, both files are opened with O_DIRECT (set by -d).
static bool use_direct = false;
19
// One copy worker. Repeatedly claims the next chunk from the shared `offset`
// cursor, reads it from fixed file 0 into this task's fixed buffer and writes
// it to fixed file 1 at the same position. Exits the process on any negative
// read or write result.
//
// task_id   - index of this task; also used as its fixed-buffer index.
// offset    - cursor shared (by reference) with the other tasks.
// file_size - total number of bytes to copy.
// buffer    - start of this task's chunk-sized staging area.
condy::Coro<void> copy_file_task(size_t task_id, loff_t &offset,
                                 loff_t file_size, void *buffer) {
    using condy::operators::operator>>;

    int slot = static_cast<int>(task_id);

    for (;;) {
        if (!(offset < file_size)) {
            break;
        }

        // Claim a chunk: remember where it starts, then advance the cursor.
        loff_t chunk_start = offset;
        offset += static_cast<loff_t>(chunk_size);

        // The last chunk of the file may be shorter than chunk_size.
        loff_t full_chunk = static_cast<loff_t>(chunk_size);
        loff_t remaining = file_size - chunk_start;
        auto length = remaining < full_chunk ? remaining : full_chunk;
        auto view = condy::buffer(buffer, length);

        auto read_op = condy::async_read(
            condy::fixed(0), condy::fixed(slot, view), chunk_start);
        auto write_op = condy::async_write(
            condy::fixed(1), condy::fixed(slot, view), chunk_start);
        auto [read_res, write_res] = co_await (read_op >> write_op);

        if (!(read_res >= 0 && write_res >= 0)) {
            std::cerr << std::format("Failed to copy at offset {}: read result "
                                     "{}, write result {}\n",
                                     chunk_start, read_res, write_res);
            exit(1);
        }
    }
}
48
// Copies `size` bytes from `infd` to `outfd` using `task_num` concurrent
// copy_file_task coroutines.
//
// Allocates one 4096-byte-aligned region of task_num * chunk_size bytes,
// installs the two descriptors in the runtime's fd table (indices 0 and 1)
// and one chunk-sized slice per task in its buffer table, then spawns the
// tasks and waits for all of them. Exits the process on allocation or
// fadvise failure.
condy::Coro<void> do_file_copy(int infd, int outfd, loff_t size) {
    // Guard the multiplication: a wrapped product would allocate too little
    // memory while the per-task slices below still assume the full size.
    if (chunk_size != 0 &&
        task_num > std::numeric_limits<size_t>::max() / chunk_size) {
        std::cerr << std::format(
            "Buffer size overflows: {} tasks x {} bytes per chunk\n", task_num,
            chunk_size);
        exit(1);
    }
    size_t buffers_size = task_num * chunk_size;

    void *raw_buffer = nullptr;
    // posix_memalign() returns the error number and leaves errno untouched,
    // so the message has to be built from the return value, not via perror().
    if (int rc = posix_memalign(&raw_buffer, 4096, buffers_size); rc != 0) {
        std::cerr << std::format("Failed to allocate aligned buffers: {}\n",
                                 std::strerror(rc));
        exit(1);
    }

    // Slot 0 is the source, slot 1 the destination; copy_file_task refers to
    // them as condy::fixed(0) and condy::fixed(1).
    auto &fd_table = condy::current_runtime().fd_table();
    fd_table.init(2);
    int fds[2] = {infd, outfd};
    fd_table.update(0, fds, 2);

    // One buffer-table entry per task, each covering that task's slice.
    auto &buffer_table = condy::current_runtime().buffer_table();
    buffer_table.init(task_num);
    std::vector<iovec> iovs(task_num);
    for (size_t i = 0; i < task_num; i++) {
        iovs[i] = {
            .iov_base = static_cast<char *>(raw_buffer) + i * chunk_size,
            .iov_len = chunk_size,
        };
    }
    buffer_table.update(0, iovs.data(), task_num);

    int r_fadvise = co_await condy::async_fadvise(
        infd, 0, static_cast<off_t>(size), POSIX_FADV_SEQUENTIAL);
    if (r_fadvise < 0) {
        std::cerr << std::format("Failed to fadvise input file: {}\n",
                                 r_fadvise);
        exit(1);
    }

    std::vector<condy::Task<void>> tasks;
    tasks.reserve(task_num);
    // Shared cursor: every task receives it by reference and advances it to
    // claim its next chunk. It must outlive the tasks, which it does because
    // this coroutine awaits all of them before returning.
    loff_t offset = 0;
    for (size_t i = 0; i < task_num; i++) {
        tasks.emplace_back(condy::co_spawn(
            copy_file_task(i, offset, size,
                           static_cast<char *>(raw_buffer) + i * chunk_size)));
    }
    for (auto &task : tasks) {
        co_await std::move(task);
    }
    free(raw_buffer);
}
94
// Top-level coroutine: opens both files, determines the source size, runs
// the copy, prints the elapsed time and throughput, and closes the files.
// Exits the process with status 1 if opening or statx fails, or if O_DIRECT
// is requested for a source whose size is not a multiple of 4096 bytes.
condy::Coro<void> co_main(const char *infile, const char *outfile) {
    using condy::operators::operator&&;

    int flags = 0;
    if (use_direct) {
        flags |= O_DIRECT;
    }

    // O_TRUNC: when the destination already exists and is longer than the
    // source, its old tail would otherwise survive the copy.
    auto [infd, outfd] = co_await (
        condy::async_open(infile, O_RDONLY | flags, 0) &&
        condy::async_open(outfile, O_WRONLY | O_CREAT | O_TRUNC | flags, 0644));
    if (infd < 0 || outfd < 0) {
        std::cerr << std::format("Failed to open file: {} {}\n", infd, outfd);
        exit(1);
    }

    struct statx statx_buf;
    int r_stat = co_await condy::async_statx(
        AT_FDCWD, infile, AT_STATX_SYNC_AS_STAT, STATX_SIZE, &statx_buf);
    if (r_stat < 0) {
        std::cerr << std::format("Failed to statx file: {}\n", r_stat);
        exit(1);
    }

    // The copy loop issues a final transfer of (size % chunk_size) bytes, so
    // with O_DIRECT the file size itself has to be 4096-aligned.
    if (use_direct && (statx_buf.stx_size % 4096 != 0)) {
        std::cerr << std::format(
            "File size {} is not multiple of 4096 bytes for O_DIRECT\n",
            statx_buf.stx_size);
        exit(1);
    }

    std::cout << std::format("Copy {} bytes from {} to {}\n",
                             statx_buf.stx_size, infile, outfile);

    auto start = std::chrono::high_resolution_clock::now();

    co_await do_file_copy(infd, outfd, static_cast<loff_t>(statx_buf.stx_size));

    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end - start;
    // Throughput in MiB per second (bytes / 2^20 / seconds).
    double mbps =
        (static_cast<double>(statx_buf.stx_size) / (1024.0 * 1024.0)) /
        elapsed.count();
    std::cout << std::format(
        "Copied {} bytes in {:.2f} seconds ({:.2f} MB/s)\n", statx_buf.stx_size,
        elapsed.count(), mbps);

    co_await (condy::async_close(infd) && condy::async_close(outfd));
}
144
// Writes the command-line synopsis and option summary to standard error.
// `progname` is substituted for the program name in the first line.
void usage(const char *progname) {
    static constexpr char kHelpTemplate[] =
        "Usage: {} [-hd] [-t <task_num>] [-c <chunk_size>] <infile> <outfile>\n"
        "  -h               Show this help message\n"
        "  -d               Use O_DIRECT for file operations\n"
        "  -t <task_num>    Number of concurrent copy tasks\n"
        "  -c <chunk_size>  Size of each copy chunk\n";

    const auto help_text = std::format(kHelpTemplate, progname);
    std::cerr << help_text;
}
154
// Parses a chunk-size argument: a non-negative decimal number optionally
// followed by a single case-insensitive unit suffix, 'k' (x1024),
// 'm' (x1024^2) or 'g' (x1024^3).
//
// Returns the size in bytes.
// Throws std::invalid_argument if `arg` is null, empty, has no leading digit
// or contains characters after the number (other than the unit suffix).
// Throws std::out_of_range if the result does not fit in size_t.
size_t get_chunk_size(const char *arg) {
    if (arg == nullptr || *arg == '\0') {
        throw std::invalid_argument("chunk size is empty");
    }

    std::string digits(arg);
    size_t multiplier = 1;
    // tolower() requires a value representable as unsigned char; a plain
    // (possibly negative) char would be undefined behaviour.
    switch (std::tolower(static_cast<unsigned char>(digits.back()))) {
    case 'k':
        multiplier = size_t{1} << 10;
        digits.pop_back();
        break;
    case 'm':
        multiplier = size_t{1} << 20;
        digits.pop_back();
        break;
    case 'g':
        multiplier = size_t{1} << 30;
        digits.pop_back();
        break;
    default:
        break;
    }

    // stoul() would skip whitespace and accept a sign ("-1" wraps to a huge
    // value), so insist that the number starts with a digit.
    if (digits.empty() ||
        std::isdigit(static_cast<unsigned char>(digits.front())) == 0) {
        throw std::invalid_argument("chunk size must start with a digit");
    }

    size_t consumed = 0;
    const unsigned long value = std::stoul(digits, &consumed);
    if (consumed != digits.size()) {
        throw std::invalid_argument("chunk size has trailing characters");
    }
    if (value > std::numeric_limits<size_t>::max() / multiplier) {
        throw std::out_of_range("chunk size is too large");
    }
    return static_cast<size_t>(value) * multiplier;
}
171
// Entry point. Parses -h/-d/-t/-c, validates the settings, then runs co_main
// on a Condy runtime whose submission queue holds two entries per task.
// Returns 0 on success (or after -h) and 1 on any command-line error.
int main(int argc, char **argv) noexcept(false) {
    int opt;
    while ((opt = getopt(argc, argv, "ht:c:d")) != -1) {
        switch (opt) {
        case 't':
        case 'c':
            // Both parsers throw on malformed input; report it as a usage
            // error instead of terminating on an uncaught exception.
            try {
                if (opt == 't') {
                    task_num = std::stoul(optarg);
                } else {
                    chunk_size = get_chunk_size(optarg);
                }
            } catch (const std::exception &e) {
                std::cerr << std::format("Invalid argument '{}' for -{}: {}\n",
                                         optarg, static_cast<char>(opt),
                                         e.what());
                usage(argv[0]);
                return 1;
            }
            break;
        case 'd':
            use_direct = true;
            break;
        case 'h':
            usage(argv[0]);
            return 0;
        default:
            usage(argv[0]);
            return 1;
        }
    }

    // A zero chunk size would never advance the copy offset (the copy loop
    // would spin forever); zero tasks would copy nothing at all.
    if (task_num == 0 || chunk_size == 0) {
        std::cerr << "task_num and chunk_size must be greater than zero\n";
        usage(argv[0]);
        return 1;
    }

    if (argc - optind < 2) {
        usage(argv[0]);
        return 1;
    }

    // Each task keeps one read and one write in flight, hence task_num * 2.
    auto options = condy::RuntimeOptions().sq_size(task_num * 2);
    condy::Runtime runtime(options);
    condy::sync_wait(runtime, co_main(argv[optind], argv[optind + 1]));
    return 0;
}
Definitions of asynchronous operations.
Coroutine type used to define a coroutine function.
Definition coro.hpp:26
The event loop runtime for executing asynchronous operations.
Definition runtime.hpp:68
Main include file for the Condy library.
auto async_fadvise(Fd fd, __u64 offset, off_t len, int advice)
See io_uring_prep_fadvise.
auto async_write(Fd fd, Buffer &&buf, __u64 offset)
See io_uring_prep_write.
T sync_wait(Runtime &runtime, Coro< T, Allocator > coro)
Synchronously wait for a coroutine to complete in the given runtime.
Definition sync_wait.hpp:24
auto async_statx(int dfd, const char *path, int flags, unsigned mask, struct statx *statxbuf)
See io_uring_prep_statx.
auto fixed(int fd)
Mark a file descriptor as fixed for io_uring operations.
Definition helpers.hpp:98
MutableBuffer buffer(void *data, size_t size)
Create a buffer object from various data sources.
Definition buffers.hpp:85
auto async_read(Fd fd, Buffer &&buf, __u64 offset)
See io_uring_prep_read.
auto async_close(int fd)
See io_uring_prep_close.
auto async_open(const char *path, int flags, mode_t mode)
See io_uring_prep_openat.
auto & current_runtime()
Get the current runtime.
Definition runtime.hpp:414
Task< T, Allocator > co_spawn(Runtime &runtime, Coro< T, Allocator > coro)
Spawn a coroutine as a task in the given runtime.
Definition task.hpp:266
Runtime type for running the io_uring event loop.
Self & sq_size(size_t v)
Set SQ size.