Condy v1.3.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
link-cp.cpp
Go to the documentation of this file.
1
5
#include "condy/runtime.hpp"
#include <condy.hpp>

#include <algorithm>
#include <cctype>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <limits>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

#include <sys/types.h>
#include <unistd.h>
13
// Run-time tunables. main() overwrites them from the -t, -c and -d options.

// Number of concurrent copy tasks (-t).
static size_t task_num{64};
// Bytes moved per read/write pair (-c); defaults to 1 MiB.
static size_t chunk_size = size_t{1} << 20;
// When true (-d), both files are opened with O_DIRECT.
static bool use_direct{false};
17
18condy::Coro<void> copy_file_task(size_t task_id, loff_t &offset,
19 loff_t file_size, void *buffer) {
20 using condy::operators::operator>>;
21
22 int buffer_index = static_cast<int>(task_id);
23
24 while (offset < file_size) {
25 loff_t current_offset = offset;
26 offset += static_cast<loff_t>(chunk_size);
27
28 auto to_copy = std::min(static_cast<loff_t>(chunk_size),
29 file_size - current_offset);
30 auto buf = condy::buffer(buffer, to_copy);
31
32 auto aw1 = condy::async_read(
33 condy::fixed(0), condy::fixed(buffer_index, buf), current_offset);
34 auto aw2 = condy::async_write(
35 condy::fixed(1), condy::fixed(buffer_index, buf), current_offset);
36 auto [r1, r2] = co_await (std::move(aw1) >> std::move(aw2));
37
38 if (r1 < 0 || r2 < 0) {
39 std::fprintf(stderr, "Failed to copy at offset %lld: %d %d\n",
40 (long long)current_offset, r1, r2);
41 exit(1);
42 }
43 }
44}
45
46condy::Coro<void> do_file_copy(int infd, int outfd, loff_t size) {
47 size_t buffers_size = task_num * chunk_size;
48 void *raw_buffer;
49 if (posix_memalign(&raw_buffer, 4096, buffers_size) != 0) {
50 std::fprintf(stderr, "Failed to allocate aligned buffers: %d\n", errno);
51 exit(1);
52 }
53
54 auto &fd_table = condy::current_runtime().fd_table();
55 fd_table.init(2);
56 int fds[2] = {infd, outfd};
57 fd_table.update(0, fds, 2);
58
59 auto &buffer_table = condy::current_runtime().buffer_table();
60 buffer_table.init(task_num);
61 std::vector<iovec> iovs(task_num);
62 for (size_t i = 0; i < task_num; i++) {
63 iovs[i] = {
64 .iov_base = static_cast<char *>(raw_buffer) + i * chunk_size,
65 .iov_len = chunk_size,
66 };
67 }
68 buffer_table.update(0, iovs.data(), task_num);
69
70 int r_faddvise = co_await condy::async_fadvise(
71 infd, 0, static_cast<off_t>(size), POSIX_FADV_SEQUENTIAL);
72 if (r_faddvise < 0) {
73 std::fprintf(stderr, "Failed to fadvise input file: %d\n", r_faddvise);
74 exit(1);
75 }
76
77 std::vector<condy::Task<void>> tasks;
78 tasks.reserve(task_num);
79 loff_t offset = 0;
80 for (size_t i = 0; i < task_num; i++) {
81 tasks.emplace_back(condy::co_spawn(
82 copy_file_task(i, offset, size,
83 static_cast<char *>(raw_buffer) + i * chunk_size)));
84 }
85 for (auto &task : tasks) {
86 co_await std::move(task);
87 }
88 free(raw_buffer);
89}
90
91condy::Coro<void> co_main(const char *infile, const char *outfile) {
92 using condy::operators::operator&&;
93
94 int flags = 0;
95 if (use_direct) {
96 flags |= O_DIRECT;
97 }
98
99 auto [infd, outfd] =
100 co_await (condy::async_open(infile, O_RDONLY | flags, 0) &&
101 condy::async_open(outfile, O_WRONLY | O_CREAT | flags, 0644));
102 if (infd < 0 || outfd < 0) {
103 std::fprintf(stderr, "Failed to open file: %d %d\n", infd, outfd);
104 exit(1);
105 }
106
107 struct statx statx_buf;
108 int r_stat = co_await condy::async_statx(
109 AT_FDCWD, infile, AT_STATX_SYNC_AS_STAT, STATX_SIZE, &statx_buf);
110 if (r_stat < 0) {
111 std::fprintf(stderr, "Failed to statx file: %d\n", r_stat);
112 exit(1);
113 }
114
115 if (use_direct && (statx_buf.stx_size % 4096 != 0)) {
116 std::fprintf(
117 stderr,
118 "File size %lld is not multiple of 4096 bytes for O_DIRECT\n",
119 (long long)statx_buf.stx_size);
120 exit(1);
121 }
122
123 std::printf("Copy %lld bytes from %s to %s\n",
124 (long long)statx_buf.stx_size, infile, outfile);
125
126 auto start = std::chrono::high_resolution_clock::now();
127
128 co_await do_file_copy(infd, outfd, static_cast<loff_t>(statx_buf.stx_size));
129
130 auto end = std::chrono::high_resolution_clock::now();
131 std::chrono::duration<double> elapsed = end - start;
132 double mbps =
133 (static_cast<double>(statx_buf.stx_size) / (1024.0 * 1024.0)) /
134 elapsed.count();
135 std::printf("Copied %lld bytes in %.2f seconds (%.2f MB/s)\n",
136 (long long)statx_buf.stx_size, elapsed.count(), mbps);
137
138 co_await (condy::async_close(infd) && condy::async_close(outfd));
139}
140
/// Writes the command-line synopsis and option summary to stderr.
///
/// @param progname Program name substituted into the "Usage:" line.
void usage(const char *progname) {
    // The only conversion in the format is the %s for the program name.
    static constexpr char kHelpFormat[] =
        "Usage: %s [-hd] [-t <task_num>] [-c <chunk_size>] <infile> <outfile>\n"
        " -h Show this help message\n"
        " -d Use O_DIRECT for file operations\n"
        " -t <task_num> Number of concurrent copy tasks\n"
        " -c <chunk_size> Size of each copy chunk\n";

    std::fprintf(stderr, kHelpFormat, progname);
}
151
/// Parses a size argument such as "4096", "64k", "1M" or "2g".
///
/// The text is a non-negative integer optionally followed by one
/// case-insensitive suffix: `k` (x1024), `m` (x1024^2) or `g` (x1024^3).
///
/// @param arg NUL-terminated argument text.
/// @return The size in bytes.
/// @throws std::invalid_argument if there is no number, the number is
///         negative, or unparsed characters remain (e.g. "4kb").
/// @throws std::out_of_range if the value does not fit in `size_t`.
size_t get_chunk_size(const char *arg) {
    size_t len = std::strlen(arg);
    size_t multiplier = 1;

    // Inspect the last character only when there is one; an empty argument
    // must not index arg[len - 1].
    if (len > 0) {
        // std::tolower is only defined for unsigned char values and EOF.
        switch (std::tolower(static_cast<unsigned char>(arg[len - 1]))) {
        case 'k':
            multiplier = size_t{1} << 10;
            break;
        case 'm':
            multiplier = size_t{1} << 20;
            break;
        case 'g':
            multiplier = size_t{1} << 30;
            break;
        default:
            break;
        }
        if (multiplier != 1) {
            len -= 1; // drop the suffix from the numeric part
        }
    }

    const std::string digits(arg, len);
    // std::stoul accepts a leading '-' and wraps the result; reject it here.
    if (digits.find('-') != std::string::npos) {
        throw std::invalid_argument("size must not be negative");
    }

    size_t consumed = 0;
    // Throws std::invalid_argument when `digits` holds no number at all.
    const unsigned long value = std::stoul(digits, &consumed);
    if (consumed != digits.size()) {
        throw std::invalid_argument("unexpected characters in size");
    }
    if (value > std::numeric_limits<size_t>::max() / multiplier) {
        throw std::out_of_range("size does not fit in size_t");
    }
    return static_cast<size_t>(value) * multiplier;
}
168
169int main(int argc, char **argv) noexcept(false) {
170 int opt;
171 while ((opt = getopt(argc, argv, "ht:c:d")) != -1) {
172 switch (opt) {
173 case 't':
174 task_num = std::stoul(optarg);
175 break;
176 case 'c':
177 chunk_size = get_chunk_size(optarg);
178 break;
179 case 'd':
180 use_direct = true;
181 break;
182 case 'h':
183 usage(argv[0]);
184 return 0;
185 default:
186 usage(argv[0]);
187 return 1;
188 }
189 }
190
191 if (argc - optind < 2) {
192 usage(argv[0]);
193 return 1;
194 }
195
196 auto options = condy::RuntimeOptions().sq_size(task_num * 2);
197 condy::Runtime runtime(options);
198 condy::sync_wait(runtime, co_main(argv[optind], argv[optind + 1]));
199 return 0;
200}
Definitions of asynchronous operations.
Coroutine type used to define a coroutine function.
Definition coro.hpp:26
The event loop runtime for executing asynchronous operations.
Definition runtime.hpp:76
Main include file for the Condy library.
auto async_fadvise(Fd fd, __u64 offset, off_t len, int advice)
See io_uring_prep_fadvise.
auto async_write(Fd fd, Buffer &&buf, __u64 offset)
See io_uring_prep_write.
T sync_wait(Runtime &runtime, Coro< T, Allocator > coro)
Synchronously wait for a coroutine to complete in the given runtime.
Definition sync_wait.hpp:24
auto async_statx(int dfd, const char *path, int flags, unsigned mask, struct statx *statxbuf)
See io_uring_prep_statx.
auto fixed(int fd)
Mark a file descriptor as fixed for io_uring operations.
Definition helpers.hpp:98
MutableBuffer buffer(void *data, size_t size)
Create a buffer object from various data sources.
Definition buffers.hpp:84
auto async_read(Fd fd, Buffer &&buf, __u64 offset)
See io_uring_prep_read.
auto async_close(int fd)
See io_uring_prep_close.
auto async_open(const char *path, int flags, mode_t mode)
See io_uring_prep_openat.
auto & current_runtime()
Get the current runtime.
Definition runtime.hpp:413
Task< T, Allocator > co_spawn(Runtime &runtime, Coro< T, Allocator > coro)
Spawn a coroutine as a task in the given runtime.
Definition task.hpp:240
Runtime type for running the io_uring event loop.
Self & sq_size(size_t v)
Set SQ size.