diff --git a/src/net/sops.rs b/src/net/sops.rs index 03c42c1e..92474016 100644 --- a/src/net/sops.rs +++ b/src/net/sops.rs @@ -52,7 +52,7 @@ pub trait SocketOps: Send + Sync { buf: UA, count: usize, flags: RecvFlags, - ) -> libkernel::error::Result; + ) -> libkernel::error::Result<(usize, Option)>; async fn recvfrom( &mut self, ctx: &mut FileCtx, @@ -95,7 +95,9 @@ where buf: UA, count: usize, ) -> libkernel::error::Result { - self.recv(ctx, buf, count, RecvFlags::empty()).await + self.recv(ctx, buf, count, RecvFlags::empty()) + .await + .map(|(len, _)| len) } async fn readat( diff --git a/src/net/tcp.rs b/src/net/tcp.rs index e8d9f91c..637d11e2 100644 --- a/src/net/tcp.rs +++ b/src/net/tcp.rs @@ -95,7 +95,7 @@ impl SocketOps for TcpSocket { _buf: UA, _count: usize, _flags: RecvFlags, - ) -> libkernel::error::Result { + ) -> libkernel::error::Result<(usize, Option)> { todo!() } diff --git a/src/net/unix.rs b/src/net/unix.rs index 40b4fd83..01c59ee2 100644 --- a/src/net/unix.rs +++ b/src/net/unix.rs @@ -1,12 +1,14 @@ use crate::fs::open_file::FileCtx; use crate::kernel::kpipe::KPipe; +use crate::memory::uaccess::{copy_from_user_slice, copy_to_user_slice}; use crate::net::sops::{RecvFlags, SendFlags}; -use crate::net::{SockAddr, SocketOps}; -use crate::sync::OnceLock; +use crate::net::{SockAddr, SockAddrUn, SocketOps}; use crate::sync::SpinLock; +use crate::sync::{Mutex, OnceLock}; use alloc::boxed::Box; -use alloc::collections::BTreeMap; +use alloc::collections::{BTreeMap, VecDeque}; use alloc::sync::Arc; +use alloc::vec; use alloc::vec::Vec; use async_trait::async_trait; use core::future::poll_fn; @@ -15,9 +17,63 @@ use core::task::Waker; use libkernel::error::{FsError, KernelError, Result}; use libkernel::memory::address::UA; +struct Message { + sender: SockAddrUn, + data: Vec, +} + +#[derive(Clone)] +enum Inbox { + Pipe(Arc), + Datagram(Arc>>), +} + +impl Inbox { + fn new(socket_type: SocketType) -> Self { + match socket_type { + SocketType::Stream | SocketType::SeqPacket => { + Inbox::Pipe(Arc::new(KPipe::new().expect("KPipe creation failed"))) + } + SocketType::Datagram => Inbox::Datagram(Arc::new(Mutex::new(VecDeque::new()))), + } + } + + async fn send(&self, origin: SockAddrUn, buf: UA, count: usize) -> Result { + match self { + Inbox::Pipe(pipe) => pipe.copy_from_user(buf, count).await, + Inbox::Datagram(queue) => { + let mut data = vec![0u8; count]; + copy_from_user_slice(buf, &mut data).await?; + let msg = Message { + sender: origin, + data, + }; + queue.lock().await.push_back(msg); + Ok(count) + } + } + } + + async fn recv(&self, buf: UA, count: usize) -> Result<(usize, Option)> { + match self { + Inbox::Pipe(pipe) => Ok((pipe.copy_to_user(buf, count).await?, None)), + Inbox::Datagram(queue) => { + let mut q = queue.lock().await; + if let Some(msg) = q.pop_front() { + let n = msg.data.len().min(count); + copy_to_user_slice(&msg.data[..n], buf).await?; + Ok((n, Some(msg.sender))) + } else { + Ok((0, None)) + } + } + } + } +} + /// Registry mapping Unix socket path bytes to endpoint inbox and listening state struct Endpoint { - inbox: Arc, + inbox: Inbox, listening: bool, backlog_max: usize, pending: Vec, @@ -32,6 +88,7 @@ fn endpoints() -> &'static SpinLock, Endpoint>> { UNIX_ENDPOINTS.get_or_init(|| SpinLock::new(BTreeMap::new())) } +#[derive(Copy, Clone)] enum SocketType { Stream, Datagram, @@ -41,10 +98,10 @@ enum SocketType { pub struct UnixSocket { socket_type: SocketType, /// Recv inbox - inbox: Arc, + inbox: Inbox, /// The peer endpoint's inbox - peer_inbox: SpinLock>>, - local_addr: SpinLock>, + peer_inbox: SpinLock>, + local_addr: SpinLock>, connected: SpinLock, listening: SpinLock, backlog: SpinLock, @@ -57,7 +114,7 @@ impl UnixSocket { fn new(socket_type: SocketType) -> Self { UnixSocket { socket_type, - inbox: Arc::new(KPipe::new().expect("KPipe::new for UnixSocket")), + inbox: Inbox::new(socket_type), peer_inbox: SpinLock::new(None), local_addr: SpinLock::new(None), connected: SpinLock::new(false), @@ -243,14 +300,17 @@ impl SocketOps for UnixSocket { buf: UA, count: usize, _flags: RecvFlags, - ) -> Result { + ) -> Result<(usize, Option)> { if count == 0 { - return Ok(0); + return Ok((0, None)); } if *self.rd_shutdown.lock_save_irq() { - return Ok(0); + return Ok((0, None)); } - self.inbox.copy_to_user(buf, count).await + self.inbox.recv(buf, count).await.map(|(n, peer)| { + let peer_addr = peer.map(SockAddr::Un); + (n, peer_addr) + }) } async fn recvfrom( @@ -262,7 +322,7 @@ impl SocketOps for UnixSocket { _addr: Option, ) -> Result<(usize, Option)> { let n = self.recv(ctx, buf, count, flags).await?; - Ok((n, None)) + Ok(n) } async fn send( @@ -289,19 +349,43 @@ impl SocketOps for UnixSocket { let Some(peer) = self.peer_inbox.lock_save_irq().clone() else { return Err(KernelError::InvalidValue); }; - peer.copy_from_user(buf, count).await + let local_addr = { + self.local_addr.lock_save_irq().unwrap_or(SockAddrUn { + family: crate::net::AF_UNIX as u16, + path: [0; 108], + }) + }; + peer.send(local_addr, buf, count).await } async fn sendto( &mut self, _ctx: &mut FileCtx, - _buf: UA, - _count: usize, + buf: UA, + count: usize, _flags: SendFlags, - _addr: SockAddr, + addr: SockAddr, ) -> Result { - todo!(); - // self.send(ctx, buf, count, flags).await + let peer_inbox = match addr { + SockAddr::Un(saun) => { + let Some(path) = UnixSocket::path_bytes(&saun) else { + return Err(KernelError::InvalidValue); + }; + let reg = endpoints().lock_save_irq(); + let Some(ep) = reg.get(&path) else { + return Err(KernelError::Fs(FsError::NotFound)); + }; + ep.inbox.clone() + } + _ => return Err(KernelError::InvalidValue), + }; + let local_addr = { + self.local_addr.lock_save_irq().unwrap_or(SockAddrUn { + family: crate::net::AF_UNIX as u16, + path: [0; 108], + }) + }; + peer_inbox.send(local_addr, buf, count).await } async fn shutdown(&self, how: crate::net::ShutdownHow) -> Result<()> {