16static size_t task_num = 64;
17static size_t chunk_size = 1024 * 1024l;
18static bool use_direct =
false;
21 loff_t file_size,
void *buffer) {
22 using condy::operators::operator>>;
24 int buffer_index =
static_cast<int>(task_id);
26 while (offset < file_size) {
27 loff_t current_offset = offset;
28 offset +=
static_cast<loff_t
>(chunk_size);
30 auto to_copy = std::min(
static_cast<loff_t
>(chunk_size),
31 file_size - current_offset);
38 auto [r1, r2] =
co_await (aw1 >> aw2);
40 if (r1 < 0 || r2 < 0) {
41 std::cerr << std::format(
"Failed to copy at offset {}: read result "
42 "{}, write result {}\n",
43 current_offset, r1, r2);
50 size_t buffers_size = task_num * chunk_size;
52 if (posix_memalign(&raw_buffer, 4096, buffers_size) != 0) {
53 std::perror(
"Failed to allocate aligned buffers");
59 int fds[2] = {infd, outfd};
60 fd_table.update(0, fds, 2);
63 buffer_table.init(task_num);
64 std::vector<iovec> iovs(task_num);
65 for (
size_t i = 0; i < task_num; i++) {
67 .iov_base =
static_cast<char *
>(raw_buffer) + i * chunk_size,
68 .iov_len = chunk_size,
71 buffer_table.update(0, iovs.data(), task_num);
74 infd, 0,
static_cast<off_t
>(size), POSIX_FADV_SEQUENTIAL);
76 std::cerr << std::format(
"Failed to fadvise input file: {}\n",
81 std::vector<condy::Task<void>> tasks;
82 tasks.reserve(task_num);
84 for (
size_t i = 0; i < task_num; i++) {
86 copy_file_task(i, offset, size,
87 static_cast<char *
>(raw_buffer) + i * chunk_size)));
89 for (
auto &task : tasks) {
90 co_await std::move(task);
96 using condy::operators::operator&&;
106 if (infd < 0 || outfd < 0) {
107 std::cerr << std::format(
"Failed to open file: {} {}\n", infd, outfd);
111 struct statx statx_buf;
113 AT_FDCWD, infile, AT_STATX_SYNC_AS_STAT, STATX_SIZE, &statx_buf);
115 std::cerr << std::format(
"Failed to statx file: {}\n", r_stat);
119 if (use_direct && (statx_buf.stx_size % 4096 != 0)) {
120 std::cerr << std::format(
121 "File size {} is not multiple of 4096 bytes for O_DIRECT\n",
126 std::cout << std::format(
"Copy {} bytes from {} to {}\n",
127 statx_buf.stx_size, infile, outfile);
129 auto start = std::chrono::high_resolution_clock::now();
131 co_await do_file_copy(infd, outfd,
static_cast<loff_t
>(statx_buf.stx_size));
133 auto end = std::chrono::high_resolution_clock::now();
134 std::chrono::duration<double> elapsed = end - start;
136 (
static_cast<double>(statx_buf.stx_size) / (1024.0 * 1024.0)) /
138 std::cout << std::format(
139 "Copied {} bytes in {:.2f} seconds ({:.2f} MB/s)\n", statx_buf.stx_size,
140 elapsed.count(), mbps);
145void usage(
const char *progname) {
146 std::cerr << std::format(
147 "Usage: {} [-hd] [-t <task_num>] [-c <chunk_size>] <infile> <outfile>\n"
148 " -h Show this help message\n"
149 " -d Use O_DIRECT for file operations\n"
150 " -t <task_num> Number of concurrent copy tasks\n"
151 " -c <chunk_size> Size of each copy chunk\n",
155size_t get_chunk_size(
const char *arg) {
156 size_t len = std::strlen(arg);
157 int suffix = std::tolower(arg[len - 1]);
158 size_t multiplier = 1;
162 }
else if (suffix ==
'm') {
163 multiplier = 1024l * 1024;
165 }
else if (suffix ==
'g') {
166 multiplier = 1024l * 1024 * 1024;
169 return std::stoul(std::string(arg, len)) * multiplier;
172int main(
int argc,
char **argv)
noexcept(
false) {
174 while ((opt = getopt(argc, argv,
"ht:c:d")) != -1) {
177 task_num = std::stoul(optarg);
180 chunk_size = get_chunk_size(optarg);
194 if (argc - optind < 2) {
Definitions of asynchronous operations.
Coroutine type used to define a coroutine function.
The event loop runtime for executing asynchronous.
Main include file for the Condy library.
auto async_fadvise(Fd fd, __u64 offset, off_t len, int advice)
See io_uring_prep_fadvise.
auto async_write(Fd fd, Buffer &&buf, __u64 offset)
See io_uring_prep_write.
T sync_wait(Runtime &runtime, Coro< T, Allocator > coro)
Synchronously wait for a coroutine to complete in the given runtime.
auto async_statx(int dfd, const char *path, int flags, unsigned mask, struct statx *statxbuf)
See io_uring_prep_statx.
auto fixed(int fd)
Mark a file descriptor as fixed for io_uring operations.
MutableBuffer buffer(void *data, size_t size)
Create a buffer object from various data sources.
auto async_read(Fd fd, Buffer &&buf, __u64 offset)
See io_uring_prep_read.
auto async_close(int fd)
See io_uring_prep_close.
auto async_open(const char *path, int flags, mode_t mode)
See io_uring_prep_openat.
auto & current_runtime()
Get the current runtime.
Task< T, Allocator > co_spawn(Runtime &runtime, Coro< T, Allocator > coro)
Spawn a coroutine as a task in the given runtime.
Runtime type for running the io_uring event loop.
Self & sq_size(size_t v)
Set SQ size.