Condy v1.6.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
link-cp.cpp
Go to the documentation of this file.
1
5
#include <condy.hpp>
#include <algorithm>
#include <cctype>
#include <cerrno>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <format>
#include <iostream>
#include <limits>
#include <stdexcept>
#include <string>
#include <vector>
#include <sys/types.h>
#include <unistd.h>
13
// Command-line tunables; main() overwrites them from -t, -c and -d.
static size_t task_num{64};                // number of concurrent copy tasks
static size_t chunk_size{size_t{1} << 20}; // bytes per chunk (1 MB)
static bool use_direct{false};             // open both files with O_DIRECT
17
/**
 * @brief Worker coroutine that copies chunks from the input to the output.
 *
 * The worker repeatedly claims the next unclaimed chunk by advancing the
 * shared @p offset, reads that chunk into its own buffer and writes the same
 * buffer back at the same file offset. It finishes once @p offset has reached
 * @p file_size. A failed or incomplete transfer terminates the process.
 *
 * Fixed file index 0 is used for reading and index 1 for writing, matching
 * the order in which do_file_copy() registers {infd, outfd}.
 *
 * @param task_id   Index of this worker; also used as its fixed buffer index.
 * @param offset    Next unclaimed file offset, shared by reference between all
 *                  workers and advanced by chunk_size per claimed chunk.
 * @param file_size Total number of bytes to copy.
 * @param buffer    Start of this worker's buffer; it must be able to hold up
 *                  to chunk_size bytes.
 */
condy::Coro<void> copy_file_task(size_t task_id, loff_t &offset,
                                 loff_t file_size, void *buffer) {
    using condy::operators::operator>>;

    int buffer_index = static_cast<int>(task_id);

    while (offset < file_size) {
        // Claim the chunk before suspending so no other worker picks it up.
        loff_t current_offset = offset;
        offset += static_cast<loff_t>(chunk_size);

        // The last chunk of the file may be shorter than chunk_size.
        auto to_copy = std::min(static_cast<loff_t>(chunk_size),
                                file_size - current_offset);
        auto buf = condy::buffer(buffer, to_copy);

        auto aw1 = condy::async_read(
            condy::fixed(0), condy::fixed(buffer_index, buf), current_offset);
        auto aw2 = condy::async_write(
            condy::fixed(1), condy::fixed(buffer_index, buf), current_offset);
        // NOTE(review): operator>> presumably chains the write after the read
        // (io_uring link) -- confirm against the condy documentation.
        auto [r1, r2] = co_await (aw1 >> aw2);

        if (r1 < 0 || r2 < 0) {
            std::cerr << std::format("Failed to copy at offset {}: read result "
                                     "{}, write result {}\n",
                                     current_offset, r1, r2);
            exit(1);
        }

        // A non-negative result can still be a partial transfer (for example
        // when the disk fills up); accepting it would silently produce a
        // corrupt copy.
        if (static_cast<loff_t>(r1) != to_copy ||
            static_cast<loff_t>(r2) != to_copy) {
            std::cerr << std::format("Short transfer at offset {}: expected {} "
                                     "bytes, read {}, wrote {}\n",
                                     current_offset, to_copy, r1, r2);
            exit(1);
        }
    }
}
46
/**
 * @brief Copy @p size bytes from @p infd to @p outfd using task_num workers.
 *
 * Allocates one 4096-byte aligned region of task_num * chunk_size bytes,
 * registers both descriptors in the runtime's fd table (index 0 = input,
 * index 1 = output) and one chunk_size slice per worker in the buffer table,
 * issues a sequential-access fadvise on the input, then spawns the workers
 * and waits for all of them. Allocation or fadvise failure exits the process.
 *
 * @param infd  Descriptor of the file to read.
 * @param outfd Descriptor of the file to write.
 * @param size  Number of bytes to copy.
 */
condy::Coro<void> do_file_copy(int infd, int outfd, loff_t size) {
    const size_t total_bytes = task_num * chunk_size;
    void *raw_buffer;
    const int alloc_rc = posix_memalign(&raw_buffer, 4096, total_bytes);
    if (alloc_rc != 0) {
        std::perror("Failed to allocate aligned buffers");
        exit(1);
    }
    char *const arena = static_cast<char *>(raw_buffer);

    // Register the two descriptors: slot 0 is the input, slot 1 the output.
    auto &files = condy::current_runtime().fd_table();
    files.init(2);
    int fd_pair[2] = {infd, outfd};
    files.update(0, fd_pair, 2);

    // Register one chunk-sized slice of the arena per worker.
    auto &buffers = condy::current_runtime().buffer_table();
    buffers.init(task_num);
    std::vector<iovec> slices(task_num);
    size_t slot = 0;
    for (iovec &slice : slices) {
        slice.iov_base = arena + slot * chunk_size;
        slice.iov_len = chunk_size;
        ++slot;
    }
    buffers.update(0, slices.data(), task_num);

    const int advise_rc = co_await condy::async_fadvise(
        infd, 0, static_cast<off_t>(size), POSIX_FADV_SEQUENTIAL);
    if (advise_rc < 0) {
        std::cerr << std::format("Failed to fadvise input file: {}\n",
                                 advise_rc);
        exit(1);
    }

    // All workers share `offset` by reference; it must outlive every task,
    // which holds because they are all awaited below.
    std::vector<condy::Task<void>> workers;
    workers.reserve(task_num);
    loff_t offset = 0;
    for (size_t id = 0; id != task_num; ++id) {
        char *worker_buffer = arena + id * chunk_size;
        workers.emplace_back(
            condy::co_spawn(copy_file_task(id, offset, size, worker_buffer)));
    }

    for (size_t id = 0; id != workers.size(); ++id) {
        co_await std::move(workers[id]);
    }

    free(raw_buffer);
}
92
/**
 * @brief Top-level coroutine: open both files, copy, report throughput, close.
 *
 * Opens @p infile read-only and @p outfile write-only (created with mode 0644
 * if missing), adding O_DIRECT to both when use_direct is set. The input size
 * is obtained with statx; with O_DIRECT the size must be a multiple of 4096.
 * The output is resized to the input size before copying so that an existing,
 * larger output file does not keep stale data past the end of the copy. Any
 * failure prints a message to stderr and exits the process with status 1.
 *
 * @param infile  Path of the file to copy from.
 * @param outfile Path of the file to copy to.
 */
condy::Coro<void> co_main(const char *infile, const char *outfile) {
    using condy::operators::operator&&;

    int flags = 0;
    if (use_direct) {
        flags |= O_DIRECT;
    }

    // NOTE(review): operator&& presumably awaits both opens together and
    // yields both results -- confirm against the condy documentation.
    auto [infd, outfd] =
        co_await (condy::async_open(infile, O_RDONLY | flags, 0) &&
                  condy::async_open(outfile, O_WRONLY | O_CREAT | flags, 0644));
    if (infd < 0 || outfd < 0) {
        std::cerr << std::format("Failed to open file: {} {}\n", infd, outfd);
        exit(1);
    }

    struct statx statx_buf;
    int r_stat = co_await condy::async_statx(
        AT_FDCWD, infile, AT_STATX_SYNC_AS_STAT, STATX_SIZE, &statx_buf);
    if (r_stat < 0) {
        std::cerr << std::format("Failed to statx file: {}\n", r_stat);
        exit(1);
    }

    if (use_direct && (statx_buf.stx_size % 4096 != 0)) {
        std::cerr << std::format(
            "File size {} is not multiple of 4096 bytes for O_DIRECT\n",
            statx_buf.stx_size);
        exit(1);
    }

    // The output is opened without O_TRUNC (which would destroy the data if
    // infile and outfile name the same file), so set its length explicitly.
    // EINVAL is tolerated: it is what non-regular targets such as /dev/null
    // report, and those cannot hold a stale tail anyway.
    if (ftruncate(outfd, static_cast<off_t>(statx_buf.stx_size)) != 0 &&
        errno != EINVAL) {
        std::cerr << std::format("Failed to truncate output file: {}\n",
                                 std::strerror(errno));
        exit(1);
    }

    std::cout << std::format("Copy {} bytes from {} to {}\n",
                             statx_buf.stx_size, infile, outfile);

    auto start = std::chrono::high_resolution_clock::now();

    co_await do_file_copy(infd, outfd, static_cast<loff_t>(statx_buf.stx_size));

    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end - start;
    // Throughput in MB per second of wall-clock copy time.
    double mbps =
        (static_cast<double>(statx_buf.stx_size) / (1024.0 * 1024.0)) /
        elapsed.count();
    std::cout << std::format(
        "Copied {} bytes in {:.2f} seconds ({:.2f} MB/s)\n", statx_buf.stx_size,
        elapsed.count(), mbps);

    co_await (condy::async_close(infd) && condy::async_close(outfd));
}
142
/**
 * @brief Print the command-line help text to stderr.
 *
 * @param progname Program name substituted into the "Usage:" line.
 */
void usage(const char *progname) {
    static constexpr char kHelpText[] =
        "Usage: {} [-hd] [-t <task_num>] [-c <chunk_size>] <infile> <outfile>\n"
        " -h Show this help message\n"
        " -d Use O_DIRECT for file operations\n"
        " -t <task_num> Number of concurrent copy tasks\n"
        " -c <chunk_size> Size of each copy chunk\n";

    const std::string message = std::format(kHelpText, progname);
    std::cerr << message;
}
152
/**
 * @brief Parse a chunk-size argument such as "4096", "64k", "1M" or "2g".
 *
 * An optional trailing k/m/g (either case) multiplies the number by 1024,
 * 1024^2 or 1024^3 respectively. The numeric part is parsed with std::stoul.
 *
 * @param arg NUL-terminated argument text.
 * @return The size in bytes.
 * @throws std::invalid_argument if there is no number to parse (including an
 *         empty string or a bare suffix).
 * @throws std::out_of_range if the number, or the number times the suffix
 *         multiplier, does not fit in size_t.
 */
size_t get_chunk_size(const char *arg) {
    size_t len = std::strlen(arg);
    size_t multiplier = 1;

    // Only look at the last character when there is one; an empty string falls
    // through to std::stoul, which reports it as std::invalid_argument.
    if (len > 0) {
        // std::tolower requires a value representable as unsigned char.
        switch (std::tolower(static_cast<unsigned char>(arg[len - 1]))) {
        case 'k':
            multiplier = size_t{1} << 10;
            --len;
            break;
        case 'm':
            multiplier = size_t{1} << 20;
            --len;
            break;
        case 'g':
            multiplier = size_t{1} << 30;
            --len;
            break;
        default:
            break;
        }
    }

    const unsigned long value = std::stoul(std::string(arg, len));
    // Refuse to wrap around silently when applying the suffix.
    if (value > std::numeric_limits<size_t>::max() / multiplier) {
        throw std::out_of_range("chunk size is too large");
    }
    return static_cast<size_t>(value) * multiplier;
}
169
/**
 * @brief Program entry point: parse options, then run the copy coroutine.
 *
 * Options: -t <task_num>, -c <chunk_size>, -d (O_DIRECT), -h (help). Two
 * positional arguments, the input and output paths, are required.
 *
 * @return 0 on success or after -h; 1 on a usage error, an unparsable option
 *         value, or a zero task count / chunk size.
 */
int main(int argc, char **argv) noexcept(false) {
    int opt;
    while ((opt = getopt(argc, argv, "ht:c:d")) != -1) {
        try {
            switch (opt) {
            case 't':
                task_num = std::stoul(optarg);
                break;
            case 'c':
                chunk_size = get_chunk_size(optarg);
                break;
            case 'd':
                use_direct = true;
                break;
            case 'h':
                usage(argv[0]);
                return 0;
            default:
                usage(argv[0]);
                return 1;
            }
        } catch (const std::exception &) {
            // Only -t and -c parse numbers, so optarg is set whenever we get
            // here. Report the bad value instead of terminating on an
            // uncaught exception.
            std::cerr << std::format("Invalid value '{}' for option -{}\n",
                                     optarg, static_cast<char>(opt));
            usage(argv[0]);
            return 1;
        }
    }

    if (argc - optind < 2) {
        usage(argv[0]);
        return 1;
    }

    // A zero chunk size would never advance the copy offset (endless loop),
    // and zero tasks would copy nothing while still reporting success.
    if (task_num == 0 || chunk_size == 0) {
        std::cerr << "task_num and chunk_size must be greater than zero\n";
        usage(argv[0]);
        return 1;
    }

    // Each task has a read and a write in flight, hence two SQ entries each.
    auto options = condy::RuntimeOptions().sq_size(task_num * 2);
    condy::Runtime runtime(options);
    condy::sync_wait(runtime, co_main(argv[optind], argv[optind + 1]));
    return 0;
}
Coroutine type used to define a coroutine function.
Definition coro.hpp:26
The event loop runtime for executing asynchronous operations.
Definition runtime.hpp:91
Main include file for the Condy library.
auto async_fadvise(Fd fd, __u64 offset, off_t len, int advice)
See io_uring_prep_fadvise.
auto async_write(Fd fd, Buffer &&buf, __u64 offset)
See io_uring_prep_write.
T sync_wait(Runtime &runtime, Coro< T, Allocator > coro)
Synchronously wait for a coroutine to complete in the given runtime.
Definition sync_wait.hpp:24
auto async_statx(int dfd, const char *path, int flags, unsigned mask, struct statx *statxbuf)
See io_uring_prep_statx.
Task< T, Allocator > co_spawn(Runtime &runtime, Coro< T, Allocator > coro) noexcept
Spawn a coroutine as a task in the given runtime.
Definition task.hpp:266
auto fixed(int fd)
Mark a file descriptor as fixed for io_uring operations.
Definition helpers.hpp:101
MutableBuffer buffer(void *data, size_t size) noexcept
Create a buffer object from various data sources.
Definition buffers.hpp:85
auto async_read(Fd fd, Buffer &&buf, __u64 offset)
See io_uring_prep_read.
auto async_close(int fd)
See io_uring_prep_close.
auto async_open(const char *path, int flags, mode_t mode)
See io_uring_prep_openat.
auto & current_runtime() noexcept
Get the current runtime.
Definition runtime.hpp:467
Self & sq_size(size_t v)
Set SQ size.