// Command-line tunables for the copy example (file-local).

// Number of concurrent copy tasks; defaults to 64 and is overwritten from the
// `-t <task_num>` option. It also determines how many chunk-sized buffers are
// allocated and registered (one per task).
14static size_t task_num = 64;
// Size in bytes of each copy chunk; defaults to 1 MiB and is overwritten from
// the `-c <chunk_size>` option. Note the trailing `l` is a `long` literal
// suffix, not the digit one.
15static size_t chunk_size = 1024 * 1024l;
// When true, the input file size must be a multiple of 4096 bytes (checked
// before copying). Presumably enabled by the `-d` (O_DIRECT) option — the
// assignment is not visible in this listing.
16static bool use_direct =
false;
19 loff_t file_size,
void *buffer) {
20 using condy::operators::operator>>;
22 int buffer_index =
static_cast<int>(task_id);
24 while (offset < file_size) {
25 loff_t current_offset = offset;
26 offset +=
static_cast<loff_t
>(chunk_size);
28 auto to_copy = std::min(
static_cast<loff_t
>(chunk_size),
29 file_size - current_offset);
36 auto [r1, r2] =
co_await (std::move(aw1) >> std::move(aw2));
38 if (r1 < 0 || r2 < 0) {
39 std::fprintf(stderr,
"Failed to copy at offset %lld: %d %d\n",
40 (
long long)current_offset, r1, r2);
47 size_t buffers_size = task_num * chunk_size;
49 if (posix_memalign(&raw_buffer, 4096, buffers_size) != 0) {
50 std::fprintf(stderr,
"Failed to allocate aligned buffers: %d\n", errno);
56 int fds[2] = {infd, outfd};
57 fd_table.update(0, fds, 2);
60 buffer_table.init(task_num);
61 std::vector<iovec> iovs(task_num);
62 for (
size_t i = 0; i < task_num; i++) {
64 .iov_base =
static_cast<char *
>(raw_buffer) + i * chunk_size,
65 .iov_len = chunk_size,
68 buffer_table.update(0, iovs.data(), task_num);
71 infd, 0,
static_cast<off_t
>(size), POSIX_FADV_SEQUENTIAL);
73 std::fprintf(stderr,
"Failed to fadvise input file: %d\n", r_faddvise);
77 std::vector<condy::Task<void>> tasks;
78 tasks.reserve(task_num);
80 for (
size_t i = 0; i < task_num; i++) {
82 copy_file_task(i, offset, size,
83 static_cast<char *
>(raw_buffer) + i * chunk_size)));
85 for (
auto &task : tasks) {
86 co_await std::move(task);
92 using condy::operators::operator&&;
102 if (infd < 0 || outfd < 0) {
103 std::fprintf(stderr,
"Failed to open file: %d %d\n", infd, outfd);
107 struct statx statx_buf;
109 AT_FDCWD, infile, AT_STATX_SYNC_AS_STAT, STATX_SIZE, &statx_buf);
111 std::fprintf(stderr,
"Failed to statx file: %d\n", r_stat);
115 if (use_direct && (statx_buf.stx_size % 4096 != 0)) {
118 "File size %lld is not multiple of 4096 bytes for O_DIRECT\n",
119 (
long long)statx_buf.stx_size);
123 std::printf(
"Copy %lld bytes from %s to %s\n",
124 (
long long)statx_buf.stx_size, infile, outfile);
126 auto start = std::chrono::high_resolution_clock::now();
128 co_await do_file_copy(infd, outfd,
static_cast<loff_t
>(statx_buf.stx_size));
130 auto end = std::chrono::high_resolution_clock::now();
131 std::chrono::duration<double> elapsed = end - start;
133 (
static_cast<double>(statx_buf.stx_size) / (1024.0 * 1024.0)) /
135 std::printf(
"Copied %lld bytes in %.2f seconds (%.2f MB/s)\n",
136 (
long long)statx_buf.stx_size, elapsed.count(), mbps);
141void usage(
const char *progname) {
144 "Usage: %s [-hd] [-t <task_num>] [-c <chunk_size>] <infile> <outfile>\n"
145 " -h Show this help message\n"
146 " -d Use O_DIRECT for file operations\n"
147 " -t <task_num> Number of concurrent copy tasks\n"
148 " -c <chunk_size> Size of each copy chunk\n",
152size_t get_chunk_size(
const char *arg) {
153 size_t len = std::strlen(arg);
154 int suffix = std::tolower(arg[len - 1]);
155 size_t multiplier = 1;
159 }
else if (suffix ==
'm') {
160 multiplier = 1024l * 1024;
162 }
else if (suffix ==
'g') {
163 multiplier = 1024l * 1024 * 1024;
166 return std::stoul(std::string(arg, len)) * multiplier;
169int main(
int argc,
char **argv)
noexcept(
false) {
171 while ((opt = getopt(argc, argv,
"ht:c:d")) != -1) {
174 task_num = std::stoul(optarg);
177 chunk_size = get_chunk_size(optarg);
191 if (argc - optind < 2) {
Definitions of asynchronous operations.
Coroutine type used to define a coroutine function.
The event loop runtime for executing asynchronous operations.
Main include file for the Condy library.
auto async_fadvise(Fd fd, __u64 offset, off_t len, int advice)
See io_uring_prep_fadvise.
auto async_write(Fd fd, Buffer &&buf, __u64 offset)
See io_uring_prep_write.
T sync_wait(Runtime &runtime, Coro< T, Allocator > coro)
Synchronously wait for a coroutine to complete in the given runtime.
auto async_statx(int dfd, const char *path, int flags, unsigned mask, struct statx *statxbuf)
See io_uring_prep_statx.
auto fixed(int fd)
Mark a file descriptor as fixed for io_uring operations.
MutableBuffer buffer(void *data, size_t size)
Create a buffer object from various data sources.
auto async_read(Fd fd, Buffer &&buf, __u64 offset)
See io_uring_prep_read.
auto async_close(int fd)
See io_uring_prep_close.
auto async_open(const char *path, int flags, mode_t mode)
See io_uring_prep_openat.
auto & current_runtime()
Get the current runtime.
Task< T, Allocator > co_spawn(Runtime &runtime, Coro< T, Allocator > coro)
Spawn a coroutine as a task in the given runtime.
Runtime type for running the io_uring event loop.
Self & sq_size(size_t v)
Set SQ size.