feat: add mcp_gateway static broker filter#353
Conversation
PR too largeThis PR adds 2449 lines (limit: 500). Large PRs are difficult to review and more likely to introduce subtle bugs. Please break this contribution into smaller, focused PRs that each address a single concern. See our coding conventions for guidance.
|
f9c1a54 to
d3abf95
Compare
|
@shaneutt Not sure how to get this one coherent and much smaller but not sure how to reopen anymore. Can chunk it post review maybe. The last one for this section (and subsequent agent tasks after this are more chunkable) Ty! |
7a9268f to
8cfcfff
Compare
Unsigned commits detectedThe following commits are not signed: 8cfcfff Please sign your commits. For instructions on how to set up GPG/SSH signing, see GitHub Documentation. |
|
Converted to draft: required checks failing. |
7ee1fc6 to
ae470da
Compare
- Adds the new `mcp_gateway` filter for MCP gateway mode. - This PR implements the static broker/catalog phase and the supporting pipeline infrastructure needed for `mcp_gateway -> load_balancer` pipelines without requiring a `router` filter. Signed-off-by: Brent Salisbury <bsalisbu@redhat.com>
ae470da to
ce61ce5
Compare
|
mcp_gateway is placed at filter/src/agentic/ rather than under filter/src/builtins/http/ai/agentic/ where the existing mcp and json_rpc modules live ? It seems that the full path qualification of x::y::z would be sufficient for identifying module coordinates, is the there value in naming change or at least more qualified comment. From the tree: filter/src/agentic/ ← crate root level In Rust module paths:
Both have nearly identical doc comments ("Agentic protocol filters: …") and both contain MCP-related code. A contributor grepping for agentic or navigating the tree hits two results with no obvious signal for which one to look in or add to. |
twghu
left a comment
There was a problem hiding this comment.
This PR adds the first phase of an MCP gateway filter: a static tool catalog broker that short-circuits initialize, tools/list, ping, notifications/initialized, and DELETE. tools/call returns a controlled -32601 stub.
The supporting infrastructure includes StreamBuffer body-mutation writeback (rewritten_path, Content-Length repair), pipeline validation for mcp_gateway → load_balancer without a router, and cluster cross-reference validation extended to cover mcp_gateway.servers[].cluster.
The scope is clearly declared and the PR is internally consistent. Issues below are organized by severity.
Some nits, some functional questions.
b3ef1dd to
370d14b
Compare
| /// `router` cannot coexist with body-phase cluster selectors because it | ||
| /// runs later and unconditionally overwrites `ctx.cluster`. |
There was a problem hiding this comment.
I don't think this should work quite like this.
It may be reasonable in the future to have router run prior to mcp routing, to reduce the selectable clusters or something like that, in addition to other variations.
Trying to deepen how we validate pipelines in this way may lead to trouble later, perhaps we should consider moving the decision point into the filters themselves, and give them a way to pass messages to modify their results? Something I'm going to think on. 🤔
There was a problem hiding this comment.
I think that the MCP-idiomatic answer is that it should never need to set ctx.cluster in the first place.
Should a pipeline have a way for a filter to say "set cluster to X only if I handled this request."
The check doesn't verify ordering and ordering implies a code path we don't need to process.
The current design coordinates inter-filter behavior through shared mutable context (ctx.cluster), and then appears to catch broken combinations statically. But static name-based checks can't capture intent — they can ban or log configurations.
In short, we probably don't want to act on routing info inter filter for it to be possibly actionable. So does route make sense in this content?
Unless I've misunderstood this, a filter shouldn't be in the routing business and probably actions on routing based on context could lead to unwelcome complications.
Then if a filter isn't in the routing business why add the check.
There was a problem hiding this comment.
Thanks guys. Context on the thoughts. Did an e2e prototype where the MCP gateway was already forwarding tools/call to real backends. In that version, the MCP code picked the backend itself, then handed that choice to the load balancer. A router in the same pipeline could overwrite that backend.
This PR no longer does that forwarding. It only answers static catalog/broker methods locally and returns controlled unsupported for tools/call, so it does not currently choose a backend or set ctx.cluster.
Will take the feedback and rethink the e2e validation, for reference: #24 (comment)
|
|
||
| /// `router` cannot coexist with body-phase cluster selectors because it | ||
| /// runs later and unconditionally overwrites `ctx.cluster`. | ||
| pub(super) fn check_conflicting_cluster_selectors(names: &[&str], errors: &mut Vec<String>) { |
There was a problem hiding this comment.
This is broken in its current state, because it doesn't account for conditions which might actually result in no conflict.
There was a problem hiding this comment.
Backed out the change, it no longer does backend forwarding or sets ctx.cluster. Ty.
| .execute_http_request_body(&mut filter_ctx, &mut body, end_of_stream) | ||
| .await; | ||
|
|
||
| ctx.request_body_bytes = filter_ctx.request_body_bytes; |
There was a problem hiding this comment.
This removes accumulation in pre-read, wont this break things?
There was a problem hiding this comment.
Ty! Checked the Pingora body path as well. Pingora still has a fixed retry buffer, but it also has a specific path for bodies that were already consumed before forwarding: it gives request_body_filter() one terminal callback so the proxy implementation can emit its buffered body upstream.
In Praxis StreamBuffer mode, Praxis pre-read can run the body filters while reading chunks, then run them again at EOS with one full combined body. If we copy filter_ctx.request_body_bytes back directly, we can accidentally count those bytes twice.
Changed this so pre-read tracks the original bytes from the client separately as they arrive. At the end, ctx.request_body_bytes still reflects the real original body size, while ctx.mutated_request_body_len separately tracks the final forwarded body size if a body-writing filter changed the payload. That keeps retry/body-size logic correct without double-counting.
Changes:
- Added an explicit original_body_bytes counter in stream_buffer.rs.
- Count original downstream chunks before creating the synthetic full-body EOS pass.
- Seed/write ctx.request_body_bytes from that original counter during pre-read.
- Keep ctx.mutated_request_body_len for the final forwarded body length when body-writing filters mutate the payload.
- Did not restore the old direct ctx.request_body_bytes = filter_ctx.request_body_bytes line because that can double-count in this pre-read path.
Example Flow:
- Client sends request body in chunks.
- Praxis StreamBuffer pre-reads those chunks before forwarding upstream.
- While reading, Praxis counts the real client bytes into original_body_bytes.
- At EOS, Praxis combines the chunks into one full body.
- Praxis runs body filters on that full body so they can inspect or rewrite it.
- Pingora later calls request_body_filter() once because the body was already consumed before forwarding.
- Praxis uses that callback to forward the buffered body from ctx.pre_read_body.
- ctx.request_body_bytes stays as the original client body size.
- If filters changed the body, ctx.mutated_request_body_len stores the final forwarded size.
The json_rpc module moved from payload_processing/ to builtins/http/ai/agentic/ on upstream main. Update import paths in mcp_gateway and widen ai module visibility to pub(crate). Signed-off-by: Brent Salisbury <bsalisbu@redhat.com>
370d14b to
2de2b03
Compare
Summary
Adds the new
mcp_gatewayfilter for MCP gateway mode. This PR implements the static broker/catalog phase and the supporting pipeline infrastructure needed formcp_gateway -> load_balancerpipelines without requiring arouterfilter. Adding a static catalog makes sense for this first gateway PR because it lets us validate mcp_gateway behavior without adding backend discovery, caching, or session initialization yet.This PR adds:
mcp_gatewayfilter registration and config parsinginitialize,tools/list,ping,notifications/initialized, andDELETE-32601responses for unsupported MCP methods andtools/calluntil backend routing landsmcp_gatewayas a cluster-selecting filtermcp_gatewayandrouterin the same pipelinemcp_gateway.servers[].clustertoload_balancer.clusters[].namectx.cluster,ctx.rewritten_path, and durable metadataContent-LengthrepairScope
This implements static MCP gateway broker behavior only. It does not forward
tools/callto backend MCP servers yet.Follow-Up
A follow-up PR will complete end-to-end MCP gateway routing:
tools/callcatalog lookup by prefixed tool nameparams.namebefore forwardingContent-LengthTest Plan
behavior
mcp_gateway -> load_balancer, router conflict detection, and cluster cross-reference validationRefs #25, #197, #196