Handle blocking I/O using Mio, add accept and connect network socket shims#4892
Conversation
accept network socket shimaccept and connect network socket shims
|
Thank you for contributing to Miri! A reviewer will take a look at your PR, typically within a week or two. |
This comment has been minimized.
This comment has been minimized.
311951f to
46ffedf
Compare
|
This PR was rebased onto a different master commit. Here's a range-diff highlighting what actually changed. Rebasing is a normal part of keeping PRs up to date, so no action is needed—this note is just to help reviewers. |
|
Reminder, once the PR becomes ready for a review, use |
|
@rustbot ready |
There was a problem hiding this comment.
I think we can simplify this trait and RefCell setup quite a bit. As noted in the comments there's two approaches for how we could avoid "leaking" the Rc that's inside FileDescriptionRef; I sketched out the one that adds an extra Box. This also demonstrates how we can avoid the nested RefCell.
diff --git a/src/concurrency/blocking_io.rs b/src/concurrency/blocking_io.rs
index 9e46f8c7e..5f5846e77 100644
--- a/src/concurrency/blocking_io.rs
+++ b/src/concurrency/blocking_io.rs
@@ -1,6 +1,5 @@
use std::cell::RefCell;
use std::io;
-use std::rc::Rc;
use std::time::Duration;
use mio::event::Source;
@@ -14,33 +13,12 @@ use crate::*;
/// this value can be set rather low.
const IO_EVENT_CAPACITY: usize = 16;
-/// Trait containing the relevant subset of methods of [`Source`].
-/// Notably, the methods don't require a mutable self-reference.
-pub trait BlockingIoSource {
- /// Register `self` with the given [`Registry`] instance.
- /// For more information, see [`Source::register`].
- fn register(&self, registry: &Registry, token: Token, interests: Interest) -> io::Result<()>;
-
- /// Deregister `self` from the given [`Registry`] instance.
- /// For more information, see [`Source::deregister`].
- fn deregister(&self, registry: &Registry) -> io::Result<()>;
+/// Trait for values that contain a mio [`Source`].
+pub trait WithSource {
+ /// Invoke `f` on the source inside `self`.
+ fn with_source(&self, f: &mut dyn FnMut(&mut dyn Source) -> io::Result<()>) -> io::Result<()>;
}
-impl<T> BlockingIoSource for RefCell<T>
-where
- T: Source,
-{
- fn register(&self, registry: &Registry, token: Token, interests: Interest) -> io::Result<()> {
- registry.register(&mut *self.borrow_mut(), token, interests)
- }
-
- fn deregister(&self, registry: &Registry) -> io::Result<()> {
- registry.deregister(&mut *self.borrow_mut())
- }
-}
-
-type BlockingIoSourceRef = Rc<dyn BlockingIoSource>;
-
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
/// Types of I/O a thread can be blocked on.
pub enum BlockingIoKind {
@@ -68,7 +46,7 @@ pub struct BlockingIoManager {
events: Events,
/// Map between threads which are currently blocked, the kind of I/O
/// they are blocked on and the underlying I/O source.
- sources: FxHashMap<ThreadId, (BlockingIoKind, BlockingIoSourceRef)>,
+ sources: FxHashMap<ThreadId, (BlockingIoKind, Box<dyn WithSource>)>,
}
impl BlockingIoManager {
@@ -128,7 +106,7 @@ impl BlockingIoManager {
pub fn register(
&mut self,
kind: BlockingIoKind,
- source: BlockingIoSourceRef,
+ source: Box<dyn WithSource>,
thread: ThreadId,
interests: Interest,
) {
@@ -139,7 +117,9 @@ impl BlockingIoManager {
// Treat errors from registering as fatal. On UNIX hosts this can only
// fail due to system resource errors (e.g. ENOMEM or ENOSPC).
- source.register(poll.registry(), token, interests).unwrap();
+ source
+ .with_source(&mut |source| source.register(poll.registry(), token, interests))
+ .unwrap();
self.sources
.try_insert(thread, (kind, source))
.unwrap_or_else(|_| panic!("A thread cannot be registered twice at the same time"));
@@ -157,7 +137,7 @@ impl BlockingIoManager {
// Treat errors from deregistering as fatal. On UNIX hosts this can only
// fail due to system resource errors (e.g. ENOMEM or ENOSPC).
- source.deregister(poll.registry()).unwrap();
+ source.with_source(&mut |source| source.deregister(poll.registry())).unwrap();
kind
}
@@ -176,7 +156,7 @@ pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
fn block_thread_for_io(
&mut self,
kind: BlockingIoKind,
- source: BlockingIoSourceRef,
+ source: impl WithSource + 'static,
interests: Interest,
timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
callback: DynUnblockCallback<'tcx>,
@@ -184,7 +164,7 @@ pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
let this = self.eval_context_mut();
this.machine.blocking_io.register(
kind,
- source,
+ Box::new(source),
this.machine.threads.active_thread(),
interests,
);
diff --git a/src/lib.rs b/src/lib.rs
index 01dfcf499..4bbcb7375 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -134,7 +134,7 @@ pub use crate::borrow_tracker::{
};
pub use crate::clock::{Instant, MonotonicClock};
pub use crate::concurrency::blocking_io::{
- BlockingIoKind, BlockingIoManager, BlockingIoSource, EvalContextExt as _,
+ BlockingIoKind, BlockingIoManager, WithSource, EvalContextExt as _,
};
pub use crate::concurrency::cpu_affinity::MAX_CPUS;
pub use crate::concurrency::data_race::{
diff --git a/src/shims/files.rs b/src/shims/files.rs
index 924739dbd..694a5922b 100644
--- a/src/shims/files.rs
+++ b/src/shims/files.rs
@@ -120,36 +120,6 @@ impl<T: FileDescription + 'static> FileDescriptionExt for T {
}
}
-/// A helper trait to allow downcasting a file description reference containing a blocking
-/// I/O source to a reference to the blocking I/O source.
-pub trait FileDescriptionBlockingIoSourceExt {
- fn into_source_rc(self: FileDescriptionRef<Self>) -> Rc<dyn BlockingIoSource>;
-}
-
-impl<T: FileDescription + BlockingIoSource + 'static> FileDescriptionBlockingIoSourceExt for T {
- fn into_source_rc(self: FileDescriptionRef<Self>) -> Rc<dyn BlockingIoSource> {
- self.0
- }
-}
-
-impl<T> BlockingIoSource for FdIdWith<T>
-where
- T: BlockingIoSource + ?Sized,
-{
- fn register(
- &self,
- registry: &mio::Registry,
- token: mio::Token,
- interests: mio::Interest,
- ) -> io::Result<()> {
- self.inner.register(registry, token, interests)
- }
-
- fn deregister(&self, registry: &mio::Registry) -> io::Result<()> {
- self.inner.deregister(registry)
- }
-}
-
pub type DynFileDescriptionRef = FileDescriptionRef<dyn FileDescription>;
impl FileDescriptionRef<dyn FileDescription> {
diff --git a/src/shims/unix/socket.rs b/src/shims/unix/socket.rs
index 341b99d6a..0cd5586e0 100644
--- a/src/shims/unix/socket.rs
+++ b/src/shims/unix/socket.rs
@@ -9,9 +9,7 @@ use rustc_const_eval::interpret::{InterpResult, interp_ok};
use rustc_middle::throw_unsup_format;
use rustc_target::spec::Os;
-use crate::shims::files::{
- FdId, FileDescription, FileDescriptionBlockingIoSourceExt, FileDescriptionRef,
-};
+use crate::shims::files::{FdId, FileDescription, FileDescriptionRef};
use crate::{OpTy, Scalar, *};
#[derive(Debug, PartialEq)]
@@ -48,17 +46,17 @@ enum SocketState {
Bound(SocketAddr),
/// The `listen` syscall has been called on the socket.
/// This is only reachable from the [`SocketState::Bound`] state.
- Listening(RefCell<TcpListener>),
+ Listening(TcpListener),
/// The `connect` syscall has been called and we weren't yet able
/// to ensure the connection is established. This is only reachable
/// from the [`SocketState::Initial`] state.
- Connecting(RefCell<TcpStream>),
+ Connecting(TcpStream),
/// The `connect` syscall has been called on the socket and
/// we ensured that the connection is established, or
/// the socket was created by the `accept` syscall.
/// For a socket created using the `connect` syscall, this is
/// only reachable from the [`SocketState::Connecting`] state.
- Connected(RefCell<TcpStream>),
+ Connected(TcpStream),
}
impl SocketState {
@@ -79,7 +77,6 @@ impl SocketState {
// address even when the connection might not yet be established.
let SocketState::Connecting(stream) = self else { return Ok(()) };
- let stream = stream.borrow();
if let Ok(Some(e)) = stream.take_error() {
// There was an error whilst connecting.
@@ -338,7 +335,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
match *state {
SocketState::Bound(socket_addr) =>
match TcpListener::bind(socket_addr) {
- Ok(listener) => *state = SocketState::Listening(RefCell::new(listener)),
+ Ok(listener) => *state = SocketState::Listening(listener),
Err(e) => return this.set_last_error_and_return_i32(e),
},
SocketState::Initial => {
@@ -490,7 +487,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
// [`Interest::WRITEABLE`] event on the stream.
match TcpStream::connect(address) {
Ok(stream) =>
- *socket.state.borrow_mut() = SocketState::Connecting(RefCell::new(stream)),
+ *socket.state.borrow_mut() = SocketState::Connecting(stream),
Err(e) => return this.set_last_error_and_return(e, dest),
};
@@ -614,7 +611,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
*address
}
SocketState::Listening(listener) =>
- match listener.borrow().local_addr() {
+ match listener.local_addr() {
Ok(address) => address,
Err(e) => return this.set_last_error_and_return_i32(e),
},
@@ -914,7 +911,7 @@ trait EvalContextPrivExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
let this = self.eval_context_mut();
this.block_thread_for_io(
BlockingIoKind::TcpAccept,
- socket.clone().into_source_rc(),
+ socket.clone(),
Interest::READABLE,
None,
callback!(@capture<'tcx> {
@@ -935,7 +932,6 @@ trait EvalContextPrivExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
unreachable!()
};
- let listener = listener.borrow();
let (stream, addr) = match listener.accept() {
Ok(peer) => peer,
// We need to block the thread again as it would still block.
@@ -964,7 +960,7 @@ trait EvalContextPrivExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
let fd = this.machine.fds.new_ref(Socket {
family,
- state: RefCell::new(SocketState::Connected(RefCell::new(stream))),
+ state: RefCell::new(SocketState::Connected(stream)),
is_non_block: Cell::new(is_client_sock_nonblock),
});
let sockfd = this.machine.fds.insert(fd);
@@ -982,7 +978,7 @@ trait EvalContextPrivExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
let this = self.eval_context_mut();
this.block_thread_for_io(
BlockingIoKind::TcpAccept,
- socket.clone().into_source_rc(),
+ socket.clone(),
Interest::WRITABLE,
None,
callback!(@capture<'tcx> {
@@ -1014,29 +1010,15 @@ impl VisitProvenance for FileDescriptionRef<Socket> {
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {}
}
-impl BlockingIoSource for Socket {
- fn register(
+impl WithSource for FileDescriptionRef<Socket> {
+ fn with_source(
&self,
- registry: &mio::Registry,
- token: mio::Token,
- interests: Interest,
+ f: &mut dyn FnMut(&mut dyn mio::event::Source) -> io::Result<()>,
) -> io::Result<()> {
- let state = self.state.borrow();
- match &*state {
- SocketState::Listening(listener) => listener.register(registry, token, interests),
- SocketState::Connecting(stream) | SocketState::Connected(stream) =>
- stream.register(registry, token, interests),
- // We never try adding a socket which is not backed by a real socket to the poll registry.
- _ => unreachable!(),
- }
- }
-
- fn deregister(&self, registry: &mio::Registry) -> io::Result<()> {
- let state = self.state.borrow();
- match &*state {
- SocketState::Listening(listener) => listener.deregister(registry),
- SocketState::Connecting(stream) | SocketState::Connected(stream) =>
- stream.deregister(registry),
+ let mut state = self.state.borrow_mut();
+ match &mut *state {
+ SocketState::Listening(listener) => f(listener),
+ SocketState::Connecting(stream) | SocketState::Connected(stream) => f(stream),
// We never try adding a socket which is not backed by a real socket to the poll registry.
_ => unreachable!(),
}
Oh yeah, this was a total oversight. We could've avoided the nested
I've now implemented the @rustbot ready |
|
@rustbot ready |
|
This looks great, thanks! Please squash the commits. You can squash manually if there are multiple independent commits you want to preserve, or use @rustbot author |
Integrates mio into the scheduler to block threads for external I/O events (like blocking socket operations)
a41c5df to
ce1f53f
Compare
|
@rustbot ready |
Hi,
This pull request adds a new blocking I/O manager to the Miri machine. This manager can be used to register blocking I/O operations using
block_thread_for_ioand unblock them once an event for this I/O source happened.To test the new non-blocking I/O, this pull request also adds support for the
acceptandconnectTCP socket operations. Miri now supports blocking and non-blocking accepts and connects.