api: streaming handlers with backpressure and cancel support #548
Conversation
📝 Walkthrough: Adds a streaming API across the codebase: new request type GR_STREAM_CANCEL and a client-side cancel function; handler typedefs
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@main/api.c`:
- Around line 231-257: api_send currently uses ctx->stream_req_id preferentially
which causes unrelated handlers (e.g., fib4_info_list) to send responses under
an active stream’s for_id; fix by making api_send take an explicit for_id
parameter instead of reading ctx->stream_req_id. Change the signature of
api_send to accept a uint32_t for_id, set resp.for_id = for_id, update all
callers: stream senders should pass ctx->stream_req_id, normal handlers should
pass ctx->header.id, and ensure any special handling for GR_STREAM_CANCEL
remains intact (or blocked by callers if needed); update declarations/prototypes
accordingly so no caller relies on the old implicit behavior.
In `@modules/ip/control/route.c`:
- Around line 448-452: route4_list_next currently stores a raw struct
rte_rib_node * in route4_stream which can dangle if routes/VRFs/FIBs are
modified; instead change route4_stream to store a stable resume key (e.g.,
IP/address + prefix length and vrf id) or snapshot identifier and remove the raw
node pointer, clear cur_vrf when vrf_fibs == NULL, and re-find the next node
each callback via a safe lookup/next-by-key API (e.g., use rte_rib lookup-by-key
/ get-next-by-key or restart iteration on table change) rather than calling
rte_rib_get_nxt() with a potentially freed pointer; also update callers
(route4_list_next and any write callbacks) so they build the resume key from the
current node before returning, check for table/VRF replacement
(fib4_conf_set/iface_rm_cb) and reset the resume state when detected, and
document that node pointers must never be persisted across callbacks.
In `@modules/ip6/control/route.c`:
- Around line 490-545: The stream currently stores a raw node pointer
(route6_stream::node) and passes it to rib6_next_node/rte_rib6_get_nxt across
yields, which can reference freed RIB memory; change the cursor to a stable key
(e.g., last emitted struct rte_ipv6_addr and uint8_t prefixlen plus cur_vrf)
instead of persisting struct rte_rib6_node * in route6_stream, update
route6_list_next and rib6_next_node to locate the start node on resume by
calling rte_rib6_lookup_exact(rib, &last_addr, last_plen) (or start from NULL
for beginning) and then call rte_rib6_get_nxt with that freshly obtained node,
and ensure you never return or store a raw rte_rib6_node * across a STREAM_PAUSE
(clear any node pointer before yielding and handle rib replacements by
restarting iteration when vrf_fibs[cur_vrf] changes).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 6a7f2540-baed-434d-9ad7-771637e258d4
📒 Files selected for processing (27)
api/gr_api.h, api/gr_api_client_impl.h, main/api.c, main/gr_module.h, main/module.c, main/module.h, modules/dhcp/cli/dhcp.c, modules/infra/api/nexthop.c, modules/infra/cli/affinity.c, modules/infra/cli/iface.c, modules/infra/cli/nexthop.c, modules/infra/cli/stats.c, modules/infra/control/gr_nh_control.h, modules/infra/control/nexthop.c, modules/ip/cli/address.c, modules/ip/cli/route.c, modules/ip/control/route.c, modules/ip6/cli/address.c, modules/ip6/cli/route.c, modules/ip6/cli/router_advert.c, modules/ip6/control/route.c, modules/l2/cli/fdb.c, modules/l2/control/fdb.c, modules/policy/cli/conntrack.c, modules/policy/cli/dnat44.c, modules/policy/cli/snat44.c, modules/policy/control/conntrack.c
Add support for streaming API handlers that produce multiple responses for a single request. This is needed for list operations that may return large result sets.

Streaming handlers register two callbacks via gr_api_handler_stream: init() allocates cursor state, next() advances the cursor and sends one item via api_stream_next(). The framework always calls free(state) when the stream ends. api_stream_next() returns STREAM_MORE when the output buffer has room, STREAM_PAUSE when the bufferevent write watermark is reached, or a negative errno on error. Handlers return api_stream_next(ctx, len, data) to send an item, or return STREAM_END when done.

Production is driven by the libevent write callback: items are produced in a loop until STREAM_PAUSE, then the loop resumes when the client drains the buffer. A second streaming request from the same client is rejected with EBUSY.

Signed-off-by: Robin Jarry <rjarry@redhat.com>
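As a rough model of the lifecycle described above — init() allocates cursor state, next() emits one item and returns STREAM_MORE / STREAM_PAUSE / STREAM_END, and the framework frees the state when the stream ends — here is a self-contained sketch. The constant values, the 4-item watermark, and all names are illustrative stand-ins, not grout's actual definitions:

```c
#include <assert.h>
#include <stdlib.h>

/* Illustrative return codes; the real values may differ. */
#define STREAM_END 0
#define STREAM_MORE 1
#define STREAM_PAUSE 2

struct list_stream {
	int next_id; /* cursor: next item to emit */
	int max;
};

/* init(): allocate cursor state for one stream request. */
static void *list_init(int max) {
	struct list_stream *s = malloc(sizeof(*s));
	if (s != NULL) {
		s->next_id = 0;
		s->max = max;
	}
	return s;
}

/* Stand-in for api_stream_next(): pretend the output buffer reaches
 * its write watermark after 4 queued items. */
static int buffered;
static int api_stream_next_stub(int item) {
	(void)item; /* a real handler would serialize the item here */
	return ++buffered >= 4 ? STREAM_PAUSE : STREAM_MORE;
}

/* next(): emit one item, or report the end of the collection. */
static int list_next(void *state) {
	struct list_stream *s = state;
	if (s->next_id >= s->max)
		return STREAM_END;
	return api_stream_next_stub(s->next_id++);
}

/* Production loop as driven by the libevent write callback: produce
 * until STREAM_PAUSE, resume once the client drains the buffer. */
static int run_stream(int nitems) {
	void *state = list_init(nitems);
	int produced = 0, ret;

	buffered = 0;
	while ((ret = list_next(state)) != STREAM_END) {
		produced++;
		if (ret == STREAM_PAUSE)
			buffered = 0; /* simulate the client draining */
	}
	free(state); /* the framework always frees the state */
	return produced;
}
```

In the real framework the resume step is not a loop reset: the bufferevent write callback fires again once the output buffer drops below the low watermark, which keeps memory usage bounded regardless of result set size.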
Add nexthop_next() for cursor-based iteration over the nexthop mempool and use it in a streaming handler for nexthop listing. Signed-off-by: Robin Jarry <rjarry@redhat.com>
Add rib4_next() for cursor-based route iteration and use it in a streaming handler for IPv4 route listing. Signed-off-by: Robin Jarry <rjarry@redhat.com>
Add rib6_next() for cursor-based route iteration and use it in a streaming handler for IPv6 route listing. Signed-off-by: Robin Jarry <rjarry@redhat.com>
Replace the callback-based FDB list with a streaming handler that iterates over the FDB hash table entries with filtering. Signed-off-by: Robin Jarry <rjarry@redhat.com>
Replace the callback-based conntrack list with a streaming handler that iterates over the connection tracking hash table. Signed-off-by: Robin Jarry <rjarry@redhat.com>
There was no way to cancel a stream without disconnecting. When the CLI pager was closed, gr_table_print_row() detected it but the client still had to drain all remaining responses.

Add a GR_STREAM_CANCEL request type. If the client context is currently processing a stream request, the server ends the stream with ECANCELED.

Update gr_api_client_stream_foreach to support break: when the loop is interrupted, it sends the cancel request and drains remaining responses (if any). Change all CLI stream consumers to break instead of continue on gr_table_print_row() error.

Even if an API handler does not use the iterator framework introduced in previous commits, breaking out of a gr_api_client_stream_foreach loop will not cause any memory leak. The request will *NOT* be cancelled however and all remaining items will be drained from the client. It is fine for small collections (interfaces, addresses, etc.).

Signed-off-by: Robin Jarry <rjarry@redhat.com>
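The break semantics described above can be modeled with a toy queue: once the consumer stops early, the helper sends the cancel and still reads every response already in flight, so the connection stays usable for the next request. All names below are illustrative stand-ins for the client-side helpers, not grout's actual API:

```c
#include <assert.h>
#include <stdbool.h>

/* Toy queue standing in for responses already queued on the socket. */
#define QUEUE_LEN 8
static int queue[QUEUE_LEN];
static int queue_head, queue_tail;
static bool stream_cancelled;

static bool recv_item(int *item) {
	if (queue_head == queue_tail)
		return false; /* no more responses */
	*item = queue[queue_head++];
	return true;
}

/* Stand-in for sending GR_STREAM_CANCEL: the server stops producing,
 * but responses already in flight still have to be read. */
static void send_stream_cancel(void) {
	stream_cancelled = true;
}

/* Model of gr_api_client_stream_foreach(): cb returns false to break. */
static int stream_foreach(bool (*cb)(int)) {
	int item, drained = 0;

	while (recv_item(&item)) {
		if (!cb(item)) {
			/* consumer broke out: cancel, then drain the rest */
			send_stream_cancel();
			while (recv_item(&item))
				drained++;
			break;
		}
	}
	return drained;
}

/* Example consumer that stops once the pager is "closed". */
static bool print_row(int item) {
	return item < 3; /* break on the fourth row */
}
```

Draining instead of processing is what lets the CLI close its pager mid-listing without leaving stale responses that would corrupt the next request on the same connection.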
Actionable comments posted: 2
♻️ Duplicate comments (2)
modules/ip/control/route.c (1)
446-478: ⚠️ Potential issue | 🔴 Critical — Do not carry struct rte_rib_node * across paused stream callbacks.
main/api.c:334-369 resumes route4_list_next() after STREAM_PAUSE, but this stream keeps a raw s->node and Line 465 feeds it back into rte_rib_get_nxt() on the next callback. If fib4_reconfig() replaces the table or fib4_fini() / VRF removal tears it down while the stream is paused, that cursor is stale; Lines 493-497 also advance to the next VRF without clearing it, so a node from one RIB can be passed into a different RIB. That is undefined behavior and can crash the daemon. Use a stable resume key and reacquire a fresh node on each callback instead.
Also applies to: 480-506
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@modules/ip/control/route.c` around lines 446-478: the stream currently stores a raw pointer s->node across pauses (in struct route4_stream and used by rib4_next_node), which can become stale if the RIB is reconfigured; instead store a stable resume key (e.g. the prefix, prefix_len/depth, and the vrf id) in route4_stream and never persist a raw rte_rib_node* across callbacks. Update route4_list_init/route4_stream to hold the resume fields (prefix, depth and current vrf) and change rib4_next_node to, on each invocation, re-acquire a fresh node from the given rib via rte_rib_lookup_exact (using the stored prefix/depth) before calling rte_rib_get_nxt, and clear the resume key when advancing to a new VRF or when returning NULL; remove reliance on the s->node pointer being valid across STREAM_PAUSE.
modules/ip6/control/route.c (1)
488-512: ⚠️ Potential issue | 🔴 Critical — Do not carry struct rte_rib6_node * across paused stream callbacks.
main/api.c:334-369 resumes route6_list_next() after STREAM_PAUSE, but this stream keeps a raw s->node and Line 499 feeds it back into rte_rib6_get_nxt() on the next callback. If fib6_reconfig() replaces the table or fib6_fini() / VRF removal tears it down while the stream is paused, that cursor is stale; Lines 536-540 also advance to the next VRF without clearing it, so a node from one RIB can be passed into a different RIB. That is undefined behavior and can crash the daemon. Use a stable resume key and reacquire a fresh node on each callback instead.
Also applies to: 523-549
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@modules/ip6/control/route.c` around lines 488 - 512, The stream currently stores a raw pointer s->node (in struct route6_stream) and reuses it in rib6_next_node via rte_rib6_get_nxt, which is unsafe across STREAM_PAUSE because fib6_reconfig()/fib6_fini() or VRF removal can replace the RIB; stop carrying raw node pointers across pauses: replace s->node with a stable resume key (e.g., vrf id in s->cur_vrf plus an IPv6 address+prefixlen or an index/sequence token) and on each resumed callback (route6_list_next and inside rib6_next_node) reacquire a fresh node from the current rib with rte_rib6_lookup_exact or a safe lookup by that resume key before calling rte_rib6_get_nxt; also clear or update the resume key when advancing to the next VRF so a node from one RIB is never passed into another; ensure rib6_next_node always calls a lookup to obtain a current node rather than trusting s->node across pauses.
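The fix suggested in both comments — persist a stable resume key and re-find the position on every callback — can be shown generically with a sorted array standing in for the RIB. All names here are hypothetical; the real fix would use rte_rib_lookup_exact()/rte_rib_get_nxt() (and their rib6 counterparts) against whatever table is current when the stream resumes:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Entries sorted by (prefix, depth), standing in for a RIB. The table
 * may be replaced between callbacks, so only keys are stable. */
struct route {
	uint32_t prefix;
	uint8_t depth;
};

struct cursor {
	bool started;
	uint32_t last_prefix; /* resume key: last emitted entry */
	uint8_t last_depth;
};

/* Re-find the successor of the resume key on every call instead of
 * trusting a node pointer saved across a pause. */
static const struct route *
next_route(const struct route *tbl, size_t n, struct cursor *c) {
	for (size_t i = 0; i < n; i++) {
		if (!c->started || tbl[i].prefix > c->last_prefix
		    || (tbl[i].prefix == c->last_prefix
			&& tbl[i].depth > c->last_depth)) {
			c->started = true;
			c->last_prefix = tbl[i].prefix;
			c->last_depth = tbl[i].depth;
			return &tbl[i];
		}
	}
	return NULL; /* iteration complete */
}
```

Because the cursor holds only key material, the table backing the iteration can be freed and reallocated between calls without leaving a dangling pointer; the worst case is that entries added or removed mid-stream are reflected inconsistently, which is the usual contract for lock-free list dumps.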
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@modules/l2/control/fdb.c`:
- Around line 290-300: The function fdb_list_init unconditionally dereferences
req (const struct gr_fdb_list_req *req = request) which can be NULL for a
zero-length payload; add a NULL check for request immediately after malloc and
before accessing req->flags/bridge_id/iface_id: if request is NULL, free the
allocated struct fdb_list_stream (or avoid allocating it) and return NULL (or
initialize safe defaults), ensuring no dereference of req occurs; update
fdb_list_init to perform this check and cleanup so a malformed GR_FDB_LIST
payload cannot crash the server.
---
Duplicate comments:
In `@modules/ip/control/route.c`:
- Around line 446-478: The stream currently stores a raw pointer s->node across
pauses (in struct route4_stream and used by rib4_next_node), which can become
stale if the RIB is reconfigured; instead store a stable resume key (e.g. the
prefix and prefix_len/depth and the vrf id) in route4_stream and never persist a
raw rte_rib_node* across callbacks. Update route4_list_init/route4_stream to
hold the resume fields (prefix, depth and current vrf) and change rib4_next_node
to, on each invocation, re-acquire a fresh node from the given rib via
rte_rib_lookup_exact (using the stored prefix/depth) before calling
rte_rib_get_nxt, and clear the resume key when advancing to a new VRF or when
returning NULL; remove reliance on s->node pointer being valid across
STREAM_PAUSE.
In `@modules/ip6/control/route.c`:
- Around line 488-512: The stream currently stores a raw pointer s->node (in
struct route6_stream) and reuses it in rib6_next_node via rte_rib6_get_nxt,
which is unsafe across STREAM_PAUSE because fib6_reconfig()/fib6_fini() or VRF
removal can replace the RIB; stop carrying raw node pointers across pauses:
replace s->node with a stable resume key (e.g., vrf id in s->cur_vrf plus an
IPv6 address+prefixlen or an index/sequence token) and on each resumed callback
(route6_list_next and inside rib6_next_node) reacquire a fresh node from the
current rib with rte_rib6_lookup_exact or a safe lookup by that resume key
before calling rte_rib6_get_nxt; also clear or update the resume key when
advancing to the next VRF so a node from one RIB is never passed into another;
ensure rib6_next_node always calls a lookup to obtain a current node rather than
trusting s->node across pauses.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 757ec775-a3b9-4583-a084-f29d797bdcb1
📒 Files selected for processing (27)
api/gr_api.h, api/gr_api_client_impl.h, main/api.c, main/gr_module.h, main/module.c, main/module.h, modules/dhcp/cli/dhcp.c, modules/infra/api/nexthop.c, modules/infra/cli/affinity.c, modules/infra/cli/iface.c, modules/infra/cli/nexthop.c, modules/infra/cli/stats.c, modules/infra/control/gr_nh_control.h, modules/infra/control/nexthop.c, modules/ip/cli/address.c, modules/ip/cli/route.c, modules/ip/control/route.c, modules/ip6/cli/address.c, modules/ip6/cli/route.c, modules/ip6/cli/router_advert.c, modules/ip6/control/route.c, modules/l2/cli/fdb.c, modules/l2/control/fdb.c, modules/policy/cli/conntrack.c, modules/policy/cli/dnat44.c, modules/policy/cli/snat44.c, modules/policy/control/conntrack.c
```c
struct nexthop *nexthop_next(const struct nexthop *prev) {
	struct rte_mempool_objhdr *hdr;

	if (prev == NULL)
		hdr = STAILQ_FIRST(&pool->elt_list);
	else {
		hdr = RTE_PTR_SUB(prev, sizeof(*hdr));
		hdr = STAILQ_NEXT(hdr, next);
	}

	while (hdr != NULL) {
		struct nexthop *nh = RTE_PTR_ADD(hdr, sizeof(*hdr));
		if (nh->ref_count != 0)
			return nh;
		hdr = STAILQ_NEXT(hdr, next);
	}

	return NULL;
}
```
Stream cursors cannot use raw pointers across pool reconfiguration.
The nh_list_next() function stores the last nexthop pointer in s->prev without holding a reference (line 109 of modules/infra/api/nexthop.c). When the stream pauses and resumes later, nexthop_next() performs pointer arithmetic (RTE_PTR_SUB) on prev assuming it's still valid. However, if all nexthops are deleted between pause and resume, nexthop_used_count() drops to zero, allowing nexthop_config_allocate() to free the old pool and create a new one. The stale prev pointer then points into freed memory, causing use-after-free when nexthop_next() reconstructs the mempool header and walks the list.
Use a stable cursor (e.g., nexthop ID or generation counter) instead of storing a raw object pointer across pause points.
🧰 Tools
🪛 Cppcheck (2.20.0)
[style] 435-435: The function 'nexthop_next' is never used.
(unusedFunction)
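One way to realize the stable-cursor suggestion is a generation counter bumped whenever the pool is replaced: the cursor records the generation it last iterated and restarts from the beginning when it no longer matches, instead of doing pointer arithmetic on a possibly freed object. A minimal sketch (all names assumed, not grout's actual code):

```c
#include <assert.h>
#include <stdint.h>

#define POOL_SIZE 4

/* Toy pool: generation is bumped every time the backing storage is
 * freed and reallocated (nexthop_config_allocate in the real code). */
struct pool {
	uint32_t generation;
	int used[POOL_SIZE]; /* nonzero == slot holds a live object */
};

struct nh_cursor {
	uint32_t generation; /* generation the cursor was built against */
	int index;
};

/* Return the index of the next in-use slot, or -1 when done. If the
 * pool was replaced while the stream was paused, restart iteration
 * rather than resuming from a stale position. */
static int pool_next(const struct pool *p, struct nh_cursor *c) {
	if (c->generation != p->generation) {
		c->generation = p->generation;
		c->index = 0; /* pool replaced: restart */
	}
	while (c->index < POOL_SIZE) {
		int i = c->index++;
		if (p->used[i])
			return i;
	}
	return -1;
}
```

Restarting after a generation change may re-emit entries the client already saw, but that is a benign duplicate in a list dump, whereas following a pointer into a freed pool is a use-after-free.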
```c
static void *fdb_list_init(const void *request, struct api_ctx *) {
	const struct gr_fdb_list_req *req = request;
	struct gr_fdb_entry *fdb;
	uint32_t next = 0;
	struct fdb_list_stream *s = malloc(sizeof(*s));

	if (s == NULL)
		return NULL;

	s->flags = req->flags;
	s->bridge_id = req->bridge_id;
	s->iface_id = req->iface_id;
	s->iter = 0;

	return s;
}
```
Potential null pointer dereference if client sends malformed request.
If a client sends GR_FDB_LIST with payload_len=0, the request parameter will be NULL (from read_cb), and the unconditional dereference at Line 295 (req->flags) will crash the server.
🛡️ Suggested fix
```c
 static void *fdb_list_init(const void *request, struct api_ctx *) {
 	const struct gr_fdb_list_req *req = request;
+	if (req == NULL)
+		return NULL;
 	struct fdb_list_stream *s = malloc(sizeof(*s));
 	if (s == NULL)
 		return NULL;
```
📝 Committable suggestion
```c
static void *fdb_list_init(const void *request, struct api_ctx *) {
	const struct gr_fdb_list_req *req = request;
	if (req == NULL)
		return NULL;
	struct fdb_list_stream *s = malloc(sizeof(*s));
	if (s == NULL)
		return NULL;
	s->flags = req->flags;
	s->bridge_id = req->bridge_id;
	s->iface_id = req->iface_id;
	s->iter = 0;
	return s;
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@modules/l2/control/fdb.c` around lines 290 - 300, The function fdb_list_init
unconditionally dereferences req (const struct gr_fdb_list_req *req = request)
which can be NULL for a zero-length payload; add a NULL check for request
immediately after malloc and before accessing req->flags/bridge_id/iface_id: if
request is NULL, free the allocated struct fdb_list_stream (or avoid allocating
it) and return NULL (or initialize safe defaults), ensuring no dereference of
req occurs; update fdb_list_init to perform this check and cleanup so a
malformed GR_FDB_LIST payload cannot crash the server.
```c
	while (hdr != NULL) {
		struct nexthop *nh = RTE_PTR_ADD(hdr, sizeof(*hdr));
		if (nh->ref_count != 0)
			return nh;
```
It's not a real protection without a proper rcu lol
The existing streaming response system sends objects one by one but does not check whether the client actually reads them. If the client socket buffer is full, grout keeps producing and buffering data in memory. When dumping a BGP full view, this causes OOM and insane CPU usage on the control thread.
Introduce a streaming handler framework driven by libevent write callbacks. Handlers register init() and next() via gr_api_handler_stream(). The next() callback produces one item at a time and pauses when the write buffer reaches a high watermark. Production resumes only when the client drains enough data. This keeps memory usage bounded regardless of result set size.
Convert nexthop, IPv4/IPv6 route, FDB, and conntrack list handlers to this new framework with cursor-based iterators (rib4_next(), rib6_next(), nexthop_next(), etc.).
Add a GR_STREAM_CANCEL request so clients can abort a stream without disconnecting. gr_api_client_stream_foreach() sends the cancel and drains remaining responses when the loop breaks, so CLI pager close is handled cleanly.
TECHNICAL TL;DR: Adds a libevent-driven streaming handler framework that emits list results one item at a time with backpressure, per-client single-stream enforcement, and an explicit cancel request.