-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfilescheduler.cc
More file actions
executable file
·71 lines (62 loc) · 2.39 KB
/
filescheduler.cc
File metadata and controls
executable file
·71 lines (62 loc) · 2.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <cerrno>
#include <cstring>
#include <stdexcept>
#include <string>
#include "filescheduler.h"
// Number of entries requested for the ring when it is created with
// io_uring_queue_init() in the FileScheduler constructor.
#define QUEUE_DEPTH 1024
/// Creates the io_uring instance used for all asynchronous file requests.
///
/// The ring is initialised with QUEUE_DEPTH entries and no setup flags.
///
/// @throws std::runtime_error if io_uring_queue_init() fails; the message
///         carries both the textual reason and the raw return code.
///
/// NOTE(review): `scheduler` and `pending_requests` are not initialised here;
/// confirm the class declaration gives them default member initialisers.
FileScheduler::FileScheduler() {
    const int ret = io_uring_queue_init(QUEUE_DEPTH, &ring, 0);
    if (ret < 0) {
        // liburing reports failures as a negated errno value, so flip the sign
        // to obtain a human-readable description.
        throw std::runtime_error(std::string("Error: Unable to initialize IO uring: ") +
                                 std::strerror(-ret) + " (" + std::to_string(ret) + ")");
    }
}
/// Releases the ring that the constructor set up.
FileScheduler::~FileScheduler()
{
    io_uring_queue_exit(&this->ring);
}
/// Stores the scheduler that process_completed() re-submits finished tasks to.
///
/// @param scheduler Pointer kept as-is; no ownership transfer is performed here.
void FileScheduler::setScheduler(Scheduler* scheduler)
{
    this->scheduler = scheduler;
}
/// Queues an asynchronous vectored read for `task` and submits it to the kernel.
///
/// `task->input` is treated as a FileTaskInput providing the file descriptor,
/// the iovec array and the number of iovecs. The task pointer is attached to
/// the request as user data so process_completed() can recover it from the
/// completion entry. The read is always issued at file offset 0.
///
/// On success the task's start time is stamped and the pending-request counter
/// is incremented.
///
/// @param task Read task to submit.
/// @throws std::runtime_error if no submission queue entry is available or if
///         io_uring_submit() reports an error.
void FileScheduler::submit(AsyncFileReadTask* task) {
    auto* ft_input = static_cast<FileTaskInput*>(task->input);

    struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
    if (sqe == nullptr) {
        // io_uring_get_sqe() returns NULL when the submission queue is full;
        // preparing a request on it would dereference a null pointer.
        throw std::runtime_error("Error: File read submission failed: submission queue is full");
    }

    io_uring_prep_readv(sqe, ft_input->file_fd, ft_input->iovecs, ft_input->blocks, 0);
    io_uring_sqe_set_data(sqe, task);

    const int ret = io_uring_submit(&ring);
    if (ret < 0) {
        throw std::runtime_error(std::string("Error: File read submission failed: ") + std::to_string(ret));
    }

    task->setStartTime();
    pending_requests++;
}
/// Queues an asynchronous vectored write for `task` and submits it to the kernel.
///
/// `task->input` is treated as a FileTaskInput providing the file descriptor,
/// the iovec array and the number of iovecs. The task pointer is attached to
/// the request as user data so process_completed() can recover it from the
/// completion entry. The write is always issued at file offset 0.
///
/// On success the task's start time is stamped and the pending-request counter
/// is incremented.
///
/// @param task Write task to submit.
/// @throws std::runtime_error if no submission queue entry is available or if
///         io_uring_submit() reports an error.
void FileScheduler::submit(AsyncFileWriteTask* task) {
    auto* ft_input = static_cast<FileTaskInput*>(task->input);

    struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
    if (sqe == nullptr) {
        // io_uring_get_sqe() returns NULL when the submission queue is full;
        // preparing a request on it would dereference a null pointer.
        throw std::runtime_error("Error: File write submission failed: submission queue is full");
    }

    io_uring_prep_writev(sqe, ft_input->file_fd, ft_input->iovecs, ft_input->blocks, 0);
    io_uring_sqe_set_data(sqe, task);

    const int ret = io_uring_submit(&ring);
    if (ret < 0) {
        throw std::runtime_error(std::string("Error: File write submission failed: ") + std::to_string(ret));
    }

    task->setStartTime();
    pending_requests++;
}
/// Drains the completions currently visible on the ring and hands each
/// finished task back to the scheduler.
///
/// Returns immediately when no requests are pending. For every successful
/// completion, the time elapsed since `task->start_time` (truncated to whole
/// milliseconds) is recorded as an IO event in the task history, the task is
/// switched to TaskExecutionMode::SYNC and passed to `scheduler->submit()`.
///
/// @throws std::runtime_error if a completion carries a negative result.
///         Processing stops at that entry. Every entry handled so far,
///         including the failed one, is marked as consumed before the
///         exception is raised, so a later call neither re-processes tasks
///         that were already re-submitted nor trips over the same failed
///         entry again. The task belonging to the failed completion is not
///         handed to the scheduler.
void FileScheduler::process_completed() {
    if (pending_requests == 0) return;

    struct io_uring_cqe* cqe;
    unsigned head;
    unsigned consumed{0};  // completion entries to release back to the ring
    int failure{0};        // first negative completion result, 0 if none

    io_uring_for_each_cqe(&ring, head, cqe) {
        if (cqe->res < 0) {
            // Remember the error but leave the loop normally so the ring is
            // still advanced below; throwing here would skip that step.
            failure = cqe->res;
            ++consumed;
            pending_requests--;
            break;
        }

        auto* task = static_cast<AsyncFileTask*>(io_uring_cqe_get_data(cqe));

        const auto end = std::chrono::steady_clock::now();
        const auto duration =
            std::chrono::duration_cast<std::chrono::milliseconds>(end - task->start_time);
        task->history.addEvent({EventType::IO, duration});

        task->setExecutionMode(TaskExecutionMode::SYNC);
        scheduler->submit(task);

        ++consumed;
        pending_requests--;
    }

    io_uring_cq_advance(&ring, consumed);

    if (failure < 0) {
        throw std::runtime_error(std::string("Error: Async request processing failed: ") + std::to_string(failure));
    }
}