Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
712 changes: 485 additions & 227 deletions SECURITY_AUDIT.md

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions tap-http/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ pub struct TapHttpConfig {
/// When enabled, the server resolves the HTTP Host header to a `did:web`
/// DID and serves the corresponding DID document.
pub enable_web_did: bool,

/// Maximum number of agents that can be auto-created via the web DID endpoint.
/// Prevents denial-of-service via unbounded agent creation.
pub max_agents: usize,
}

/// Configuration for rate limiting.
Expand Down Expand Up @@ -67,6 +71,7 @@ impl Default for TapHttpConfig {
request_timeout_secs: 30,
event_logger: Some(EventLoggerConfig::default()),
enable_web_did: false,
max_agents: 100,
}
}
}
Expand Down
65 changes: 44 additions & 21 deletions tap-http/src/external_decision/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub struct ExternalDecisionManager {
tool_registry: Arc<ToolRegistry>,
storage: Arc<Storage>,
/// Channel for sending lines to stdin writer task
stdin_tx: RwLock<Option<mpsc::Sender<String>>>,
stdin_tx: Arc<RwLock<Option<mpsc::Sender<String>>>>,
/// Whether the process is currently running
is_running: AtomicBool,
/// Pending RPC responses — maps request_id to a oneshot sender
Expand Down Expand Up @@ -95,7 +95,7 @@ impl ExternalDecisionManager {
agent_dids,
tool_registry,
storage,
stdin_tx: RwLock::new(None),
stdin_tx: Arc::new(RwLock::new(None)),
is_running: AtomicBool::new(false),
pending_responses: Arc::new(Mutex::new(std::collections::HashMap::new())),
management_handle: Mutex::new(None),
Expand Down Expand Up @@ -204,6 +204,7 @@ impl ExternalDecisionManager {
let tool_registry = Arc::clone(&self.tool_registry);
let pending_responses = Arc::clone(&self.pending_responses);
let storage = Arc::clone(&self.storage);
let stdin_tx_clone = Arc::clone(&self.stdin_tx);
let stdout_handle = tokio::spawn(async move {
let reader = BufReader::new(stdout);
let mut lines = reader.lines();
Expand All @@ -215,8 +216,14 @@ impl ExternalDecisionManager {

debug!("Received from external process: {}", line);

Self::handle_stdout_message(&line, &tool_registry, &pending_responses, &storage)
.await;
Self::handle_stdout_message(
&line,
&tool_registry,
&pending_responses,
&storage,
&stdin_tx_clone,
)
.await;
}
debug!("External process stdout closed");
});
Expand Down Expand Up @@ -340,11 +347,12 @@ impl ExternalDecisionManager {
std::collections::HashMap<i64, tokio::sync::oneshot::Sender<Value>>,
>,
_storage: &Storage,
stdin_tx: &RwLock<Option<mpsc::Sender<String>>>,
) {
// Try to parse as incoming message
match serde_json::from_str::<IncomingMessage>(line) {
Ok(IncomingMessage::Request(req)) => {
Self::handle_tool_call(req, tool_registry, pending_responses).await;
Self::handle_tool_call(req, tool_registry, pending_responses, stdin_tx).await;
}
Ok(IncomingMessage::Notification(notif)) => {
debug!(
Expand All @@ -368,15 +376,16 @@ impl ExternalDecisionManager {
}
}

/// Handle a tool call from the external process
/// Handle a tool call from the external process and send response back via stdin
async fn handle_tool_call(
req: JsonRpcRequest,
tool_registry: &ToolRegistry,
_pending_responses: &Mutex<
std::collections::HashMap<i64, tokio::sync::oneshot::Sender<Value>>,
>,
stdin_tx: &RwLock<Option<mpsc::Sender<String>>>,
) {
match req.method.as_str() {
let response = match req.method.as_str() {
"tools/call" => {
let params = req.params.unwrap_or(json!({}));
let tool_name = params["name"].as_str().unwrap_or("");
Expand All @@ -385,31 +394,45 @@ impl ExternalDecisionManager {
debug!("External process calling tool: {}", tool_name);

match tool_registry.call_tool(tool_name, arguments).await {
Ok(result) => {
let response = JsonRpcResponse::new(
Ok(result) => JsonRpcResponse::new(
req.id,
json!({
"content": result.content.iter().map(|c| match c {
ToolContent::Text { text } => json!({"type": "text", "text": text}),
_ => json!({"type": "unknown"}),
}).collect::<Vec<_>>(),
"isError": result.is_error.unwrap_or(false),
}),
),
Err(e) => {
error!("Tool call failed: {}", e);
JsonRpcResponse::new(
req.id,
json!({
"content": result.content.iter().map(|c| match c {
ToolContent::Text { text } => json!({"type": "text", "text": text}),
_ => json!({"type": "unknown"}),
}).collect::<Vec<_>>(),
"isError": result.is_error.unwrap_or(false),
"content": [{"type": "text", "text": format!("Tool call failed: {}", e)}],
"isError": true,
}),
);
debug!("Tool call response: {:?}", response);
}
Err(e) => {
error!("Tool call failed: {}", e);
)
}
}
}
"tools/list" => {
let tools = tool_registry.list_tools();
let response = JsonRpcResponse::new(req.id, json!({ "tools": tools }));
debug!("Tools list response: {:?}", response);
JsonRpcResponse::new(req.id, json!({ "tools": tools }))
}
_ => {
warn!("Unknown method from external process: {}", req.method);
return;
}
};

// Send response back to external process via stdin
if let Ok(response_str) = serde_json::to_string(&response) {
let tx = stdin_tx.read().await;
if let Some(tx) = tx.as_ref() {
if let Err(e) = tx.send(response_str).await {
debug!("Failed to send tool response to external process: {}", e);
}
}
}
}
Expand Down
62 changes: 47 additions & 15 deletions tap-http/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,9 @@ pub async fn handle_didcomm(
)
.await;

// Create error response
let response = json_error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string());
// Create error response (generic message to avoid leaking internals)
let response =
json_error_response(StatusCode::INTERNAL_SERVER_ERROR, "Internal server error");

// Calculate response size and duration (approximate)
let response_size = 200; // Approximate size
Expand Down Expand Up @@ -495,6 +496,7 @@ pub async fn handle_well_known_did(
host: Option<String>,
node: Arc<TapNode>,
event_bus: Arc<EventBus>,
max_agents: usize,
) -> std::result::Result<impl Reply, Infallible> {
let start_time = Instant::now();

Expand All @@ -508,10 +510,7 @@ pub async fn handle_well_known_did(
Ok(d) => d,
Err(e) => {
warn!("Invalid Host header for web DID: {}", e);
let response = json_error_response(
StatusCode::BAD_REQUEST,
&format!("Invalid Host header: {}", e),
);
let response = json_error_response(StatusCode::BAD_REQUEST, "Invalid Host header");
let duration_ms = start_time.elapsed().as_millis() as u64;
event_bus
.publish_response_sent(StatusCode::BAD_REQUEST, 200, duration_ms)
Expand Down Expand Up @@ -576,6 +575,25 @@ pub async fn handle_well_known_did(
}
}

// Check agent count limit before creating a new one
if node.agents().agent_count() >= max_agents {
warn!(
"Agent limit reached ({}/{}), refusing to create agent for {}",
node.agents().agent_count(),
max_agents,
did_web
);
let response = json_error_response(
StatusCode::SERVICE_UNAVAILABLE,
"Maximum number of agents reached",
);
let duration_ms = start_time.elapsed().as_millis() as u64;
event_bus
.publish_response_sent(StatusCode::SERVICE_UNAVAILABLE, 200, duration_ms)
.await;
return Ok(response);
}

// No agent exists — create a new one
info!("Creating new agent for {}", did_web);

Expand Down Expand Up @@ -890,7 +908,9 @@ mod tests {
let node = Arc::new(TapNode::new(config));
let event_bus = Arc::new(crate::event::EventBus::new());

let response = handle_well_known_did(None, node, event_bus).await.unwrap();
let response = handle_well_known_did(None, node, event_bus, 100)
.await
.unwrap();

let response_bytes = to_bytes(response.into_response().into_body())
.await
Expand All @@ -916,6 +936,7 @@ mod tests {
Some("<script>alert(1)</script>".to_string()),
node,
event_bus,
100,
)
.await
.unwrap();
Expand All @@ -940,10 +961,14 @@ mod tests {
let node = Arc::new(TapNode::new(config));
let event_bus = Arc::new(crate::event::EventBus::new());

let response =
handle_well_known_did(Some("example.com".to_string()), node.clone(), event_bus)
.await
.unwrap();
let response = handle_well_known_did(
Some("example.com".to_string()),
node.clone(),
event_bus,
100,
)
.await
.unwrap();

let response_bytes = to_bytes(response.into_response().into_body())
.await
Expand Down Expand Up @@ -998,6 +1023,7 @@ mod tests {
Some("test.example.com".to_string()),
node.clone(),
event_bus,
100,
)
.await
.unwrap();
Expand All @@ -1020,10 +1046,14 @@ mod tests {
let node = Arc::new(TapNode::new(config));
let event_bus = Arc::new(crate::event::EventBus::new());

let response =
handle_well_known_did(Some("localhost:3000".to_string()), node.clone(), event_bus)
.await
.unwrap();
let response = handle_well_known_did(
Some("localhost:3000".to_string()),
node.clone(),
event_bus,
100,
)
.await
.unwrap();

let response_bytes = to_bytes(response.into_response().into_body())
.await
Expand Down Expand Up @@ -1057,6 +1087,7 @@ mod tests {
Some("idempotent.example.com".to_string()),
node.clone(),
event_bus.clone(),
100,
)
.await
.unwrap();
Expand All @@ -1070,6 +1101,7 @@ mod tests {
Some("idempotent.example.com".to_string()),
node.clone(),
event_bus,
100,
)
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions tap-http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
tls: None,
event_logger: None,
enable_web_did: args.enable_web_did,
max_agents: 100,
};

// Configure event logging - use TAP root-based default if not specified
Expand Down
25 changes: 24 additions & 1 deletion tap-http/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,11 @@ impl TapHttpServer {
.trim_start_matches('/')
.to_string();

// Create DIDComm endpoint
// Create DIDComm endpoint (1MB body size limit)
let didcomm_route = warp::path(endpoint_path)
.and(warp::post())
.and(warp::header::optional::<String>("content-type"))
.and(warp::body::content_length_limit(1024 * 1024))
.and(warp::body::bytes())
.and(with_node(node.clone()))
.and(with_event_bus(event_bus.clone()))
Expand All @@ -190,19 +191,31 @@ impl TapHttpServer {
if enable_web_did {
info!("Web DID hosting enabled at /.well-known/did.json");

let max_agents = self.config.max_agents;
let well_known_route = warp::path(".well-known")
.and(warp::path("did.json"))
.and(warp::path::end())
.and(warp::get())
.and(warp::header::optional::<String>("host"))
.and(with_node(node.clone()))
.and(with_event_bus(event_bus.clone()))
.and(warp::any().map(move || max_agents))
.and_then(handle_well_known_did);

let routes = didcomm_route
.or(health_route)
.or(well_known_route)
.with(warp::log("tap_http"))
.with(warp::reply::with::header(
"X-Content-Type-Options",
"nosniff",
))
.with(warp::reply::with::header("X-Frame-Options", "DENY"))
.with(warp::reply::with::header("Cache-Control", "no-store"))
.with(warp::reply::with::header(
"Content-Security-Policy",
"default-src 'none'",
))
.recover(handle_rejection);

return self.spawn_server(routes, addr, event_bus).await;
Expand All @@ -212,6 +225,16 @@ impl TapHttpServer {
let routes = didcomm_route
.or(health_route)
.with(warp::log("tap_http"))
.with(warp::reply::with::header(
"X-Content-Type-Options",
"nosniff",
))
.with(warp::reply::with::header("X-Frame-Options", "DENY"))
.with(warp::reply::with::header("Cache-Control", "no-store"))
.with(warp::reply::with::header(
"Content-Security-Policy",
"default-src 'none'",
))
.recover(handle_rejection);

self.spawn_server(routes, addr, event_bus).await
Expand Down
12 changes: 6 additions & 6 deletions tap-http/tests/server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,11 @@ async fn test_didcomm_endpoint_content_types() {
assert_eq!(status, 500);
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(json["status"], "error");
// The message should indicate that no agent could process it
// The message should be a generic internal server error (no internal details leaked)
let message = json["message"].as_str().unwrap_or("");
assert!(
message.contains("No agent could process"),
"Expected 'No agent could process' but got: {}",
message.contains("Internal server error"),
"Expected 'Internal server error' but got: {}",
message
);

Expand Down Expand Up @@ -262,11 +262,11 @@ async fn test_didcomm_endpoint_content_types() {
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(status, 500);
assert_eq!(json["status"], "error");
// The message should indicate that no agent could process it or verification failed
// The message should be a generic internal server error (no internal details leaked)
let message = json["message"].as_str().unwrap_or("");
assert!(
message.contains("No agent could process") || message.contains("Verification error"),
"Expected processing/verification error but got: {}",
message.contains("Internal server error"),
"Expected 'Internal server error' but got: {}",
message
);

Expand Down
Loading