Skip to content

Commit 5ba8ba9

Browse files
committed
multishot: implement detection of finished
When a multishot IO will no longer yield a value the IORING_CQE_F_MORE flag will not be set. Detect that here and make all multishot IOs return `Poll::Ready(None)` in that case to signal that the end of the stream has been reached.
1 parent 4058766 commit 5ba8ba9

10 files changed

Lines changed: 262 additions & 168 deletions

File tree

examples/udp.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ fn main() {
2424
});
2525

2626
Executor::spawn(async {
27-
let mut buf = [0xadu8; 20];
27+
let buf = [0xadu8; 20];
2828
let udpsock = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap();
2929
Timer::sleep(Duration::from_secs(1)).unwrap().await;
3030
let len = udpsock
31-
.send_to(&mut buf, (Ipv4Addr::LOCALHOST, 9998))
31+
.send_to(&buf, (Ipv4Addr::LOCALHOST, 9998))
3232
.await
3333
.unwrap();
3434

examples/web_server.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,10 @@ async fn handle_connection(mut conn: TcpStream) -> anyhow::Result<()> {
134134

135135
let file = if path == PathBuf::from("/") {
136136
ARGS.get().unwrap().webroot.join("index.html")
137+
} else if let Ok(path) = path.strip_prefix("/") {
138+
ARGS.get().unwrap().webroot.join(path)
137139
} else {
138-
if let Ok(path) = path.strip_prefix("/") {
139-
ARGS.get().unwrap().webroot.join(path)
140-
} else {
141-
return send_response_hdr(&mut conn, Response::NotFound, 0).await;
142-
}
140+
return send_response_hdr(&mut conn, Response::NotFound, 0).await;
143141
};
144142

145143
send_file(conn, file).await

src/futures/tcp.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use io_uring::{opcode, types};
4242
use libc::{AF_INET, AF_INET6, SOCK_STREAM};
4343
use tokio_stream::Stream;
4444

45-
use crate::reactor::{Reactor, ReactorIo};
45+
use crate::reactor::{MultishotReactorIo, Reactor, ReactorIo};
4646

4747
use super::{
4848
read::{AsyncRead, AsyncReader},
@@ -56,7 +56,7 @@ use super::{
5656
/// accept connections on the specified address.
5757
pub struct TcpListener {
5858
inner: OwnedFd,
59-
io: ReactorIo,
59+
io: MultishotReactorIo,
6060
}
6161

6262
fn mk_sock(addr: &SocketAddr) -> std::io::Result<OwnedFd> {
@@ -127,7 +127,7 @@ impl Stream for TcpListener {
127127
)
128128
})
129129
.map(|x| {
130-
Some({
130+
x.map(|x| {
131131
x.map(|fd| TcpStream {
132132
inner: unsafe { OwnedFd::from_raw_fd(fd) },
133133
})

src/reactor/mod.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
use std::{mem::transmute, task::Waker};
2-
use uring::{ReactorUring, UringIo};
1+
use std::task::Waker;
2+
3+
use uring::{MultishotUringIo, OneshotUringIo, ReactorUring};
34

45
mod uring;
56

6-
pub type ReactorIo = UringIo<'static, Waker>;
7+
pub type ReactorIo = OneshotUringIo<Waker>;
8+
pub type MultishotReactorIo = MultishotUringIo<Waker>;
79

810
pub(crate) struct Reactor {}
911

@@ -13,11 +15,11 @@ thread_local! {
1315

1416
impl Reactor {
1517
pub fn new_io() -> ReactorIo {
16-
REACTOR.with(|r| unsafe { transmute(r.new_io()) })
18+
REACTOR.with(|r| r.new_oneshot_io())
1719
}
1820

19-
pub fn new_multishot_io() -> ReactorIo {
20-
REACTOR.with(|r| unsafe { transmute(r.new_multishot_io()) })
21+
pub fn new_multishot_io() -> MultishotReactorIo {
22+
REACTOR.with(|r| r.new_multishot_io())
2123
}
2224

2325
pub fn react() {

src/reactor/uring.rs

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,32 @@
1-
pub(crate) use io::UringIo;
2-
use io_uring::{squeue, CompletionQueue, IoUring};
1+
pub(crate) use io::{multishot::MultishotUringIo, oneshot::OneshotUringIo};
2+
use io_uring::{cqueue, squeue, CompletionQueue, IoUring};
33
use result::RingResults;
44
use slab::Slab;
5-
use std::cell::{RefCell, RefMut};
5+
use std::{
6+
cell::{RefCell, RefMut},
7+
rc::Rc,
8+
};
69

710
mod io;
811
mod result;
912

1013
pub struct ReactorUring<T: Clone> {
11-
inner: RefCell<ReactorInner<T>>,
14+
inner: Rc<RefCell<ReactorInner<T>>>,
1215
}
1316

1417
impl<T: Clone> ReactorUring<T> {
1518
pub fn new() -> Self {
1619
Self {
17-
inner: RefCell::new(ReactorInner::new()),
20+
inner: Rc::new(RefCell::new(ReactorInner::new())),
1821
}
1922
}
2023

21-
pub fn new_io(&self) -> UringIo<'_, T> {
22-
UringIo::new(&self.inner, IoKind::Oneshot)
24+
pub fn new_oneshot_io(&self) -> OneshotUringIo<T> {
25+
OneshotUringIo::new(self.inner.clone())
2326
}
2427

25-
pub fn new_multishot_io(&self) -> UringIo<'_, T> {
26-
UringIo::new(&self.inner, IoKind::Multi)
28+
pub fn new_multishot_io(&self) -> MultishotUringIo<T> {
29+
MultishotUringIo::new(self.inner.clone())
2730
}
2831

2932
pub fn react(&self) -> IoCompletionIter<'_, T> {
@@ -47,7 +50,7 @@ impl<T: Clone> ReactorUring<T> {
4750
}
4851
}
4952

50-
struct ReactorInner<T> {
53+
pub(crate) struct ReactorInner<T> {
5154
uring: IoUring,
5255
pending: Slab<PendingIo<T>>,
5356
results: RingResults,
@@ -75,8 +78,11 @@ impl<T> ReactorInner<T> {
7578
}
7679
}
7780

78-
fn submit_io(&mut self, entry: squeue::Entry, obj: T, kind: IoKind) -> usize {
79-
let result_slab_idx = self.results.get(kind).create_slot();
81+
fn submit_io(&mut self, entry: squeue::Entry, obj: T, kind: IoKind) -> (u64, usize) {
82+
let result_slab_idx = match kind {
83+
IoKind::Oneshot => self.results.get_oneshot().create_slot(),
84+
IoKind::Multi => self.results.get_multishot().create_slot(),
85+
};
8086

8187
let slot = self.pending.insert(PendingIo {
8288
assoc_obj: obj,
@@ -91,7 +97,7 @@ impl<T> ReactorInner<T> {
9197
.unwrap();
9298
}
9399

94-
result_slab_idx
100+
(slot as u64, result_slab_idx)
95101
}
96102
}
97103

@@ -113,13 +119,21 @@ impl<T: Clone> Iterator for IoCompletionIter<'_, T> {
113119
.unwrap()
114120
.clone();
115121

116-
self.ring
117-
.results
118-
.get(pending_io.kind)
119-
.set_result(entry.result(), pending_io.result_slab_idx);
120-
121-
if let IoKind::Oneshot = pending_io.kind {
122-
self.ring.pending.remove(entry.user_data() as usize);
122+
match pending_io.kind {
123+
IoKind::Oneshot => {
124+
self.ring
125+
.results
126+
.get_oneshot()
127+
.set_result(entry.result(), pending_io.result_slab_idx);
128+
self.ring.pending.remove(entry.user_data() as usize);
129+
}
130+
IoKind::Multi => {
131+
let results = self.ring.results.get_multishot();
132+
results.push_result(entry.result(), pending_io.result_slab_idx);
133+
if !cqueue::more(entry.flags()) {
134+
results.set_finished(pending_io.result_slab_idx);
135+
}
136+
}
123137
}
124138

125139
Some(pending_io.assoc_obj)
@@ -189,7 +203,7 @@ mod tests {
189203
run_test(|a, b, uring| {
190204
let mut buf = [0];
191205

192-
let mut io = uring.new_io();
206+
let mut io = uring.new_oneshot_io();
193207
let result = io.submit_or_get_result(|| {
194208
(
195209
opcode::Read::new(types::Fd(a.as_raw_fd()), buf.as_mut_ptr(), 1).build(),
@@ -224,7 +238,7 @@ mod tests {
224238
run_test(|a, b, uring| {
225239
let mut buf = [0];
226240

227-
let mut io = uring.new_io();
241+
let mut io = uring.new_oneshot_io();
228242
assert!(matches!(
229243
io.submit_or_get_result(|| {
230244
(
@@ -255,7 +269,7 @@ mod tests {
255269
run_test(|a, b, uring| {
256270
let buf = [0];
257271

258-
let mut io = uring.new_io();
272+
let mut io = uring.new_oneshot_io();
259273
let result = io.submit_or_get_result(|| {
260274
(
261275
opcode::Write::new(types::Fd(a.as_raw_fd()), buf.as_ptr(), buf.len() as _)
@@ -293,7 +307,7 @@ mod tests {
293307
run_test(|a, b, uring| {
294308
let mut buf = [0, 0];
295309

296-
let mut io1 = uring.new_io();
310+
let mut io1 = uring.new_oneshot_io();
297311
assert!(matches!(
298312
io1.submit_or_get_result(|| {
299313
(
@@ -304,7 +318,7 @@ mod tests {
304318
Poll::Pending
305319
));
306320

307-
let mut io2 = uring.new_io();
321+
let mut io2 = uring.new_oneshot_io();
308322
assert!(matches!(
309323
io2.submit_or_get_result(|| {
310324
(
@@ -344,7 +358,7 @@ mod tests {
344358
run_test(|a, b, uring| {
345359
let buf = [0xbe, 0xef];
346360

347-
let mut io1 = uring.new_io();
361+
let mut io1 = uring.new_oneshot_io();
348362
assert!(matches!(
349363
io1.submit_or_get_result(|| {
350364
(
@@ -355,7 +369,7 @@ mod tests {
355369
Poll::Pending
356370
));
357371

358-
let mut io2 = uring.new_io();
372+
let mut io2 = uring.new_oneshot_io();
359373
assert!(matches!(
360374
io2.submit_or_get_result(|| {
361375
(

src/reactor/uring/io.rs

Lines changed: 0 additions & 80 deletions
This file was deleted.

src/reactor/uring/io/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
pub mod multishot;
2+
pub mod oneshot;
3+
4+
fn reactor_value_to_result(v: i32) -> std::io::Result<i32> {
5+
if v < 0 {
6+
Err(std::io::Error::from_raw_os_error(v.abs()))
7+
} else {
8+
Ok(v)
9+
}
10+
}

0 commit comments

Comments
 (0)