Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/net/sops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub trait SocketOps: Send + Sync {
buf: UA,
count: usize,
flags: RecvFlags,
) -> libkernel::error::Result<usize>;
) -> libkernel::error::Result<(usize, Option<SockAddr>)>;
async fn recvfrom(
&mut self,
ctx: &mut FileCtx,
Expand Down Expand Up @@ -95,7 +95,9 @@ where
buf: UA,
count: usize,
) -> libkernel::error::Result<usize> {
self.recv(ctx, buf, count, RecvFlags::empty()).await
self.recv(ctx, buf, count, RecvFlags::empty())
.await
.map(|(len, _)| len)
}

async fn readat(
Expand Down
2 changes: 1 addition & 1 deletion src/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl SocketOps for TcpSocket {
_buf: UA,
_count: usize,
_flags: RecvFlags,
) -> libkernel::error::Result<usize> {
) -> libkernel::error::Result<(usize, Option<SockAddr>)> {
todo!()
}

Expand Down
122 changes: 103 additions & 19 deletions src/net/unix.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use crate::fs::open_file::FileCtx;
use crate::kernel::kpipe::KPipe;
use crate::memory::uaccess::{copy_from_user_slice, copy_to_user_slice};
use crate::net::sops::{RecvFlags, SendFlags};
use crate::net::{SockAddr, SocketOps};
use crate::sync::OnceLock;
use crate::net::{SockAddr, SockAddrUn, SocketOps};
use crate::sync::SpinLock;
use crate::sync::{Mutex, OnceLock};
use alloc::boxed::Box;
use alloc::collections::BTreeMap;
use alloc::collections::{BTreeMap, VecDeque};
use alloc::sync::Arc;
use alloc::vec;
use alloc::vec::Vec;
use async_trait::async_trait;
use core::future::poll_fn;
Expand All @@ -15,9 +17,63 @@ use core::task::Waker;
use libkernel::error::{FsError, KernelError, Result};
use libkernel::memory::address::UA;

struct Message {
sender: SockAddrUn,
data: Vec<u8>,
}

#[derive(Clone)]
enum Inbox {
Pipe(Arc<KPipe>),
Datagram(Arc<Mutex<VecDeque<Message>>>),
}

impl Inbox {
fn new(socket_type: SocketType) -> Self {
match socket_type {
SocketType::Stream | SocketType::SeqPacket => {
Inbox::Pipe(Arc::new(KPipe::new().expect("KPipe creation failed")))
}
SocketType::Datagram => Inbox::Datagram(Arc::new(Mutex::new(VecDeque::new()))),
}
}

async fn send(&self, origin: SockAddrUn, buf: UA, count: usize) -> Result<usize> {
match self {
Inbox::Pipe(pipe) => pipe.copy_from_user(buf, count).await,
Inbox::Datagram(queue) => {
let mut data = vec![0u8; count];
copy_from_user_slice(buf, &mut data).await?;
let msg = Message {
sender: origin,
data,
};
queue.lock().await.push_back(msg);
Ok(count)
}
}
}

async fn recv(&self, buf: UA, count: usize) -> Result<(usize, Option<SockAddrUn>)> {
match self {
Inbox::Pipe(pipe) => Ok((pipe.copy_to_user(buf, count).await?, None)),
Inbox::Datagram(queue) => {
let mut q = queue.lock().await;
if let Some(msg) = q.pop_front() {
let n = msg.data.len().min(count);
copy_to_user_slice(&msg.data[..n], buf).await?;
Ok((n, Some(msg.sender)))
} else {
Ok((0, None))
}
}
}
}
}

/// Registry mapping Unix socket path bytes to endpoint inbox and listening state
struct Endpoint {
inbox: Arc<KPipe>,
inbox: Inbox,
listening: bool,
backlog_max: usize,
pending: Vec<UnixSocket>,
Expand All @@ -32,6 +88,7 @@ fn endpoints() -> &'static SpinLock<BTreeMap<Vec<u8>, Endpoint>> {
UNIX_ENDPOINTS.get_or_init(|| SpinLock::new(BTreeMap::new()))
}

#[derive(Copy, Clone)]
enum SocketType {
Stream,
Datagram,
Expand All @@ -41,10 +98,10 @@ enum SocketType {
pub struct UnixSocket {
socket_type: SocketType,
/// Recv inbox
inbox: Arc<KPipe>,
inbox: Inbox,
/// The peer endpoint's inbox
peer_inbox: SpinLock<Option<Arc<KPipe>>>,
local_addr: SpinLock<Option<crate::net::SockAddrUn>>,
peer_inbox: SpinLock<Option<Inbox>>,
local_addr: SpinLock<Option<SockAddrUn>>,
connected: SpinLock<bool>,
listening: SpinLock<bool>,
backlog: SpinLock<usize>,
Expand All @@ -57,7 +114,7 @@ impl UnixSocket {
fn new(socket_type: SocketType) -> Self {
UnixSocket {
socket_type,
inbox: Arc::new(KPipe::new().expect("KPipe::new for UnixSocket")),
inbox: Inbox::new(socket_type),
peer_inbox: SpinLock::new(None),
local_addr: SpinLock::new(None),
connected: SpinLock::new(false),
Expand Down Expand Up @@ -243,14 +300,17 @@ impl SocketOps for UnixSocket {
buf: UA,
count: usize,
_flags: RecvFlags,
) -> Result<usize> {
) -> Result<(usize, Option<SockAddr>)> {
if count == 0 {
return Ok(0);
return Ok((0, None));
}
if *self.rd_shutdown.lock_save_irq() {
return Ok(0);
return Ok((0, None));
}
self.inbox.copy_to_user(buf, count).await
self.inbox.recv(buf, count).await.map(|(n, peer)| {
let peer_addr = peer.map(SockAddr::Un);
(n, peer_addr)
})
}

async fn recvfrom(
Expand All @@ -262,7 +322,7 @@ impl SocketOps for UnixSocket {
_addr: Option<SockAddr>,
) -> Result<(usize, Option<SockAddr>)> {
let n = self.recv(ctx, buf, count, flags).await?;
Ok((n, None))
Ok(n)
}

async fn send(
Expand All @@ -289,19 +349,43 @@ impl SocketOps for UnixSocket {
let Some(peer) = self.peer_inbox.lock_save_irq().clone() else {
return Err(KernelError::InvalidValue);
};
peer.copy_from_user(buf, count).await
let local_addr = {
self.local_addr.lock_save_irq().unwrap_or(SockAddrUn {
family: crate::net::AF_UNIX as u16,
path: [0; 108],
})
};
peer.send(local_addr, buf, count).await
}

async fn sendto(
&mut self,
_ctx: &mut FileCtx,
_buf: UA,
_count: usize,
buf: UA,
count: usize,
_flags: SendFlags,
_addr: SockAddr,
addr: SockAddr,
) -> Result<usize> {
todo!();
// self.send(ctx, buf, count, flags).await
let peer_inbox = match addr {
SockAddr::Un(saun) => {
let Some(path) = UnixSocket::path_bytes(&saun) else {
return Err(KernelError::InvalidValue);
};
let reg = endpoints().lock_save_irq();
let Some(ep) = reg.get(&path) else {
return Err(KernelError::Fs(FsError::NotFound));
};
ep.inbox.clone()
}
_ => return Err(KernelError::InvalidValue),
};
let local_addr = {
self.local_addr.lock_save_irq().unwrap_or(SockAddrUn {
family: crate::net::AF_UNIX as u16,
path: [0; 108],
})
};
peer_inbox.send(local_addr, buf, count).await
}

async fn shutdown(&self, how: crate::net::ShutdownHow) -> Result<()> {
Expand Down
Loading