api: streaming handlers with backpressure and cancel support #548

Closed
rjarry wants to merge 7 commits into DPDK:main from rjarry:api-stream

Conversation

Collaborator

@rjarry rjarry commented Mar 12, 2026

The existing streaming response system sends objects one by one but does not check whether the client actually reads them. If the client socket buffer is full, grout keeps producing and buffering data in memory. When dumping a full BGP view, this causes OOM and excessive CPU usage on the control thread.

Introduce a streaming handler framework driven by libevent write callbacks. Handlers register init() and next() via gr_api_handler_stream(). The next() callback produces one item at a time and pauses when the write buffer reaches a high watermark. Production resumes only when the client drains enough data. This keeps memory usage bounded regardless of result set size.

Convert nexthop, IPv4/IPv6 route, FDB, and conntrack list handlers to this new framework with cursor-based iterators (rib4_next(), rib6_next(), nexthop_next(), etc.).

Add a GR_STREAM_CANCEL request so clients can abort a stream without disconnecting. gr_api_client_stream_foreach() sends the cancel and drains remaining responses when the loop breaks, so CLI pager close is handled cleanly.

TECHNICAL TL;DR: Adds a libevent-driven streaming handler framework that emits list results one item at a time with backpressure, per-client single-stream enforcement, and an explicit cancel request.

  • How it works:

    • Modules register stream handlers via gr_api_handler_stream(req, init, next); init(request, ctx) allocates per-request state and next(state, ctx) is repeatedly invoked to produce one item.
    • Streaming progression is driven by the bufferevent write callback (libevent): the write callback keeps calling stream_next for as long as api_stream_next returns STREAM_MORE, and stops on STREAM_PAUSE, STREAM_END, or a negative errno. STREAM_PAUSE suspends production when the bev write buffer reaches the high watermark; production resumes once the client has drained the buffer.
    • api_stream_next(ctx, len, payload) pushes a single response for the active stream and returns STREAM_MORE/STREAM_PAUSE/STREAM_END (or negative errno) so the producer loop can pause/resume/terminate deterministically.
    • Each api_ctx tracks stream_state, stream_req_id and stream_next; lookup/registration logic was extended to allow either legacy callback or streaming handler (mutually exclusive).
  • Cancellation and client-side behavior:

    • Introduces GR_STREAM_CANCEL and struct gr_stream_cancel_req { uint32_t for_id; } and a helper __gr_api_client_stream_cancel(client, for_id) that sends the cancel request and drains remaining responses.
    • The gr_api_client_stream_foreach macro was updated to call __gr_api_client_stream_cancel and drain when the loop breaks, allowing CLI pager/early-exit to abort a stream without disconnecting.
    • Server rejects a second concurrent streaming request from the same client (EBUSY).
  • Conversions and iterator changes:

    • Replaced one-shot callback list handlers with streaming implementations for nexthop (modules/infra), IPv4 and IPv6 route lists (modules/ip/control and modules/ip6/control), FDB (modules/l2/control), and conntrack (modules/policy/control), each providing init/next pairs that produce entries via api_stream_next.
    • Added cursor-style iterators used by streaming next() functions where applicable (e.g., nexthop_next, rib4/rib6 traversal helpers) so streams advance statefully without bulk allocations.
  • Small but relevant control-flow adjustments:

    • Multiple CLI table-rendering loops now break on gr_table_print_row errors (stop on first write failure) to avoid continuing after output failures.
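
A minimal sketch of what an init/next handler pair could look like under this framework. It is not the code from the PR: `struct api_ctx` is reduced to a hypothetical byte counter, `api_stream_next()` is a mock that only models the high watermark, the numeric values of the `STREAM_*` codes are assumptions, and `demo_list_init`, `demo_list_next` and `demo_run_once` are illustrative names.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Return codes named in the PR; the numeric values here are assumptions. */
enum { STREAM_END = 0, STREAM_MORE = 1, STREAM_PAUSE = 2 };

/* Hypothetical stand-in for struct api_ctx: it only models the write buffer. */
struct api_ctx {
	size_t buffered;       /* bytes queued but not yet read by the client */
	size_t high_watermark; /* production pauses at or above this level */
	unsigned sent;         /* number of items queued so far */
};

/* Mock of api_stream_next(): queue one item, then report buffer pressure. */
static int api_stream_next(struct api_ctx *ctx, size_t len, const void *payload) {
	assert(payload != NULL);
	ctx->buffered += len;
	ctx->sent++;
	return ctx->buffered >= ctx->high_watermark ? STREAM_PAUSE : STREAM_MORE;
}

/* Per-request cursor state: allocated by init(), freed when the stream ends. */
struct demo_stream {
	uint32_t next_id; /* next item to emit */
	uint32_t count;   /* total number of items requested */
};

static void *demo_list_init(const void *request, struct api_ctx *ctx) {
	(void)ctx;
	if (request == NULL) /* reject an empty payload before dereferencing it */
		return NULL;
	struct demo_stream *s = malloc(sizeof(*s));
	if (s == NULL)
		return NULL;
	s->next_id = 0;
	memcpy(&s->count, request, sizeof(s->count));
	return s;
}

/* Emit exactly one item per call, or STREAM_END when the cursor is exhausted. */
static int demo_list_next(void *state, struct api_ctx *ctx) {
	struct demo_stream *s = state;
	if (s->next_id >= s->count)
		return STREAM_END;
	uint32_t item = s->next_id++;
	return api_stream_next(ctx, sizeof(item), &item);
}

/* Produce until the first pause or the end; return how many items were queued. */
static unsigned demo_run_once(uint32_t count, size_t watermark) {
	struct api_ctx ctx = {.buffered = 0, .high_watermark = watermark, .sent = 0};
	void *state = demo_list_init(&count, &ctx);
	if (state == NULL)
		return 0;
	while (demo_list_next(state, &ctx) == STREAM_MORE)
		;
	free(state);
	return ctx.sent;
}
```

With 4-byte items and a 16-byte watermark, `demo_run_once(10, 16)` stops after four items even though ten were requested, which is the bounded-memory property the description is after.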


coderabbitai bot commented Mar 12, 2026

📝 Walkthrough


Adds a streaming API across the codebase: new request type GR_STREAM_CANCEL and client-side cancel function; handler typedefs stream_init_func and stream_next_func; new api_ctx streaming fields (stream_state, stream_req_id, stream_next) and STREAM_{END,MORE,PAUSE} return codes. Introduces registration helper gr_api_handler_stream and server-side stream lifecycle (start, next, end, cancel). Multiple list APIs (nexthop, IPv4/IPv6 routes, FDB, conntrack) were converted from one-shot callbacks to per-request streaming handlers. Numerous CLI table loops (including the DHCP CLI) now break on gr_table_print_row errors instead of continuing.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@main/api.c`:
- Around line 231-257: api_send currently uses ctx->stream_req_id preferentially
which causes unrelated handlers (e.g., fib4_info_list) to send responses under
an active stream’s for_id; fix by making api_send take an explicit for_id
parameter instead of reading ctx->stream_req_id. Change the signature of
api_send to accept a uint32_t for_id, set resp.for_id = for_id, update all
callers: stream senders should pass ctx->stream_req_id, normal handlers should
pass ctx->header.id, and ensure any special handling for GR_STREAM_CANCEL
remains intact (or blocked by callers if needed); update declarations/prototypes
accordingly so no caller relies on the old implicit behavior.

In `@modules/ip/control/route.c`:
- Around line 448-452: route4_list_next currently stores a raw struct
rte_rib_node * in route4_stream which can dangle if routes/VRFs/FIBs are
modified; instead change route4_stream to store a stable resume key (e.g.,
IP/address + prefix length and vrf id) or snapshot identifier and remove the raw
node pointer, clear cur_vrf when vrf_fibs == NULL, and re-find the next node
each callback via a safe lookup/next-by-key API (e.g., use rte_rib lookup-by-key
/ get-next-by-key or restart iteration on table change) rather than calling
rte_rib_get_nxt() with a potentially freed pointer; also update callers
(route4_list_next and any write callbacks) so they build the resume key from the
current node before returning, check for table/VRF replacement
(fib4_conf_set/iface_rm_cb) and reset the resume state when detected, and
document that node pointers must never be persisted across callbacks.

In `@modules/ip6/control/route.c`:
- Around line 490-545: The stream currently stores a raw node pointer
(route6_stream::node) and passes it to rib6_next_node/rte_rib6_get_nxt across
yields, which can reference freed RIB memory; change the cursor to a stable key
(e.g., last emitted struct rte_ipv6_addr and uint8_t prefixlen plus cur_vrf)
instead of persisting struct rte_rib6_node * in route6_stream, update
route6_list_next and rib6_next_node to locate the start node on resume by
calling rte_rib6_lookup_exact(rib, &last_addr, last_plen) (or start from NULL
for beginning) and then call rte_rib6_get_nxt with that freshly obtained node,
and ensure you never return or store a raw rte_rib6_node * across a STREAM_PAUSE
(clear any node pointer before yielding and handle rib replacements by
restarting iteration when vrf_fibs[cur_vrf] changes).
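
The "stable resume key" idea in the two route comments can be illustrated without DPDK. The sketch below is an assumption-laden model, not the PR's code: the RIB is replaced by a plain array sorted by (prefix, depth), and `route_key`, `route_cursor`, `cursor_next` and `demo_resume_after_replace` are hypothetical names. The point is only that the cursor stores the last emitted key by value, so a table swapped while the stream is paused is never dereferenced through an old pointer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical resume key: what was last emitted, stored by value. */
struct route_key {
	uint32_t prefix;
	uint8_t depth;
};

struct route_cursor {
	bool started;          /* false until the first entry is emitted */
	struct route_key last; /* last emitted key, valid when started */
};

static int key_cmp(const struct route_key *a, const struct route_key *b) {
	if (a->prefix != b->prefix)
		return a->prefix < b->prefix ? -1 : 1;
	if (a->depth != b->depth)
		return a->depth < b->depth ? -1 : 1;
	return 0;
}

/*
 * Return the first entry strictly greater than the cursor key, or NULL.
 * The table must be sorted by key_cmp(). A linear scan keeps the sketch
 * short; a real table would use its own ordered lookup.
 */
static const struct route_key *
cursor_next(struct route_cursor *c, const struct route_key *table, size_t n) {
	assert(c != NULL);
	for (size_t i = 0; i < n; i++) {
		if (c->started && key_cmp(&table[i], &c->last) <= 0)
			continue;
		c->last = table[i]; /* copy the key, never keep the pointer */
		c->started = true;
		return &table[i];
	}
	return NULL;
}

/* Emit two entries, swap the table while "paused", then resume to the end. */
static unsigned demo_resume_after_replace(void) {
	const struct route_key v1[] = {{0x0a000000, 8}, {0x0a010000, 16}, {0x0a020000, 16}};
	/* Replacement table: the last emitted route is gone, a new one appeared. */
	const struct route_key v2[] = {{0x0a000000, 8}, {0x0a020000, 16}, {0xc0a80000, 16}};
	struct route_cursor c = {0};
	unsigned emitted = 0;

	while (emitted < 2 && cursor_next(&c, v1, 3) != NULL)
		emitted++;
	while (cursor_next(&c, v2, 3) != NULL)
		emitted++;
	return emitted;
}
```

The resumed stream emits the two entries of the new table that sort after the saved key, even though that key itself no longer exists. Whether the real RIB offers an equivalent "first entry after this key" lookup is something to check against the DPDK API before adopting this design.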

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 6a7f2540-baed-434d-9ad7-771637e258d4

📥 Commits

Reviewing files that changed from the base of the PR and between 18775ef and 7e6394e.

📒 Files selected for processing (27)
  • api/gr_api.h
  • api/gr_api_client_impl.h
  • main/api.c
  • main/gr_module.h
  • main/module.c
  • main/module.h
  • modules/dhcp/cli/dhcp.c
  • modules/infra/api/nexthop.c
  • modules/infra/cli/affinity.c
  • modules/infra/cli/iface.c
  • modules/infra/cli/nexthop.c
  • modules/infra/cli/stats.c
  • modules/infra/control/gr_nh_control.h
  • modules/infra/control/nexthop.c
  • modules/ip/cli/address.c
  • modules/ip/cli/route.c
  • modules/ip/control/route.c
  • modules/ip6/cli/address.c
  • modules/ip6/cli/route.c
  • modules/ip6/cli/router_advert.c
  • modules/ip6/control/route.c
  • modules/l2/cli/fdb.c
  • modules/l2/control/fdb.c
  • modules/policy/cli/conntrack.c
  • modules/policy/cli/dnat44.c
  • modules/policy/cli/snat44.c
  • modules/policy/control/conntrack.c

rjarry added 7 commits March 16, 2026 16:32
Add support for streaming API handlers that produce multiple responses
for a single request. This is needed for list operations that may return
large result sets.

Streaming handlers register two callbacks via gr_api_handler_stream:
init() allocates cursor state, next() advances the cursor and sends one
item via api_stream_next(). The framework always calls free(state) when
the stream ends.

api_stream_next() returns STREAM_MORE when the output buffer has room,
STREAM_PAUSE when the bufferevent write watermark is reached, or
a negative errno on error.

Handlers return api_stream_next(ctx, len, data) to send an item, or
return STREAM_END when done.

Production is driven by the libevent write callback: items are produced
in a loop until STREAM_PAUSE, then the loop resumes when the client
drains the buffer. A second streaming request from the same client is
rejected with EBUSY.

Signed-off-by: Robin Jarry <rjarry@redhat.com>
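
The production loop described in this commit message can be modelled roughly as follows. This is a simulation under stated assumptions, not the PR's `main/api.c`: the socket buffer is a byte counter, the client is assumed to drain it completely between callbacks, the `STREAM_*` values are made up, and `sim`, `sim_next`, `sim_write_cb` and `sim_run` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>

/* Return codes named in the PR; the numeric values here are assumptions. */
enum { STREAM_END = 0, STREAM_MORE = 1, STREAM_PAUSE = 2 };

#define ITEM_SIZE 64 /* assumed fixed size of one response */

struct sim {
	size_t buffered;    /* bytes waiting in the output buffer */
	size_t high;        /* high watermark */
	size_t peak;        /* largest value buffered ever reached */
	unsigned remaining; /* items the handler still has to produce */
	unsigned wakeups;   /* number of write callback invocations */
};

/* Stand-in for a handler's next(): queue one item and report pressure. */
static int sim_next(struct sim *s) {
	if (s->remaining == 0)
		return STREAM_END;
	s->remaining--;
	s->buffered += ITEM_SIZE;
	if (s->buffered > s->peak)
		s->peak = s->buffered;
	return s->buffered >= s->high ? STREAM_PAUSE : STREAM_MORE;
}

/* Models the write callback: keep producing while next() says STREAM_MORE. */
static int sim_write_cb(struct sim *s) {
	int ret;
	s->wakeups++;
	do {
		ret = sim_next(s);
	} while (ret == STREAM_MORE);
	return ret;
}

/* Run a whole stream; return the peak buffer occupancy in bytes. */
static size_t sim_run(unsigned items, size_t high, unsigned *wakeups) {
	struct sim s = {.high = high, .remaining = items};
	assert(high > 0);
	while (sim_write_cb(&s) == STREAM_PAUSE)
		s.buffered = 0; /* the client read everything, callback fires again */
	if (wakeups != NULL)
		*wakeups = s.wakeups;
	return s.peak;
}
```

In this model, streaming 1000 items with a 256-byte watermark never holds more than 256 bytes at once, at the cost of one callback per four items. Error returns (negative errno) are left out of the sketch.
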
Add nexthop_next() for cursor-based iteration over the nexthop mempool
and use it in a streaming handler for nexthop listing.

Signed-off-by: Robin Jarry <rjarry@redhat.com>

Add rib4_next() for cursor-based route iteration and use it in
a streaming handler for IPv4 route listing.

Signed-off-by: Robin Jarry <rjarry@redhat.com>

Add rib6_next() for cursor-based route iteration and use it in
a streaming handler for IPv6 route listing.

Signed-off-by: Robin Jarry <rjarry@redhat.com>

Replace the callback-based FDB list with a streaming handler that
iterates over the FDB hash table entries with filtering.

Signed-off-by: Robin Jarry <rjarry@redhat.com>

Replace the callback-based conntrack list with a streaming handler
that iterates over the connection tracking hash table.

Signed-off-by: Robin Jarry <rjarry@redhat.com>

There was no way to cancel a stream without disconnecting. When the CLI
pager was closed, gr_table_print_row() detected it but the client still
had to drain all remaining responses.

Add a GR_STREAM_CANCEL request type. If the client context is currently
processing a stream request, the server ends the stream with ECANCELED.

Update gr_api_client_stream_foreach to support break: when the loop is
interrupted, it sends the cancel request and drains remaining responses
(if any). Change all CLI stream consumers to break instead of continue
on gr_table_print_row() error.

Breaking out of a gr_api_client_stream_foreach loop does not leak
memory, even when the API handler does not use the iterator framework
introduced in previous commits. In that case, however, the request is
*NOT* cancelled and the client still drains all remaining items. This
is fine for small collections (interfaces, addresses, etc.).

Signed-off-by: Robin Jarry <rjarry@redhat.com>
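
To make the cancel-and-drain trade-off concrete, here is a small model of what the client has to read after breaking out of the loop. It is only a sketch: the real macro and `__gr_api_client_stream_cancel()` are not reproduced, the server is assumed to stay at most `window` items ahead of the client, and `mock_stream`, `mock_recv`, `mock_cancel` and `consume_then_break` are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical view of one stream from the client side. */
struct mock_stream {
	unsigned limit;  /* items the server will emit in total */
	unsigned window; /* items the server may have queued ahead of the client */
	unsigned read;   /* items the client has received so far */
};

/* Receive one response; false means the end of the stream was reached. */
static bool mock_recv(struct mock_stream *s) {
	assert(s->read <= s->limit);
	if (s->read >= s->limit)
		return false;
	s->read++;
	return true;
}

/* Cancel: the server stops producing, only already queued items remain. */
static void mock_cancel(struct mock_stream *s) {
	unsigned queued = s->read + s->window;
	if (queued < s->limit)
		s->limit = queued;
}

/*
 * Read `wanted` items, break out of the loop, then drain what is left.
 * Returns the number of responses read and discarded after the break.
 */
static unsigned
consume_then_break(unsigned total, unsigned window, unsigned wanted, bool send_cancel) {
	struct mock_stream s = {.limit = total, .window = window, .read = 0};
	unsigned drained = 0;

	while (s.read < wanted && mock_recv(&s))
		;
	if (send_cancel)
		mock_cancel(&s);
	while (mock_recv(&s))
		drained++;
	return drained;
}
```

With 1000 items, a window of 8 and a break after 10 items, the cancelling client discards 8 responses while the non-cancelling one discards 990, which is why the commit message calls the second case acceptable only for small collections.
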

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (2)
modules/ip/control/route.c (1)

446-478: ⚠️ Potential issue | 🔴 Critical

Do not carry struct rte_rib_node * across paused stream callbacks.

main/api.c:334-369 resumes route4_list_next() after STREAM_PAUSE, but this stream keeps a raw s->node and Line 465 feeds it back into rte_rib_get_nxt() on the next callback. If fib4_reconfig() replaces the table or fib4_fini() / VRF removal tears it down while the stream is paused, that cursor is stale; Lines 493-497 also advance to the next VRF without clearing it, so a node from one RIB can be passed into a different RIB. That is undefined behavior and can crash the daemon. Use a stable resume key and reacquire a fresh node on each callback instead.

Also applies to: 480-506

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modules/ip/control/route.c` around lines 446 - 478, The stream currently
stores a raw pointer s->node across pauses (in struct route4_stream and used by
rib4_next_node), which can become stale if the RIB is reconfigured; instead
store a stable resume key (e.g. the prefix and prefix_len/depth and the vrf id)
in route4_stream and never persist a raw rte_rib_node* across callbacks. Update
route4_list_init/route4_stream to hold the resume fields (prefix, depth and
current vrf) and change rib4_next_node to, on each invocation, re-acquire a
fresh node from the given rib via rte_rib_lookup_exact (using the stored
prefix/depth) before calling rte_rib_get_nxt, and clear the resume key when
advancing to a new VRF or when returning NULL; remove reliance on s->node
pointer being valid across STREAM_PAUSE.
modules/ip6/control/route.c (1)

488-512: ⚠️ Potential issue | 🔴 Critical

Do not carry struct rte_rib6_node * across paused stream callbacks.

main/api.c:334-369 resumes route6_list_next() after STREAM_PAUSE, but this stream keeps a raw s->node and Line 499 feeds it back into rte_rib6_get_nxt() on the next callback. If fib6_reconfig() replaces the table or fib6_fini() / VRF removal tears it down while the stream is paused, that cursor is stale; Lines 536-540 also advance to the next VRF without clearing it, so a node from one RIB can be passed into a different RIB. That is undefined behavior and can crash the daemon. Use a stable resume key and reacquire a fresh node on each callback instead.

Also applies to: 523-549

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modules/ip6/control/route.c` around lines 488 - 512, The stream currently
stores a raw pointer s->node (in struct route6_stream) and reuses it in
rib6_next_node via rte_rib6_get_nxt, which is unsafe across STREAM_PAUSE because
fib6_reconfig()/fib6_fini() or VRF removal can replace the RIB; stop carrying
raw node pointers across pauses: replace s->node with a stable resume key (e.g.,
vrf id in s->cur_vrf plus an IPv6 address+prefixlen or an index/sequence token)
and on each resumed callback (route6_list_next and inside rib6_next_node)
reacquire a fresh node from the current rib with rte_rib6_lookup_exact or a safe
lookup by that resume key before calling rte_rib6_get_nxt; also clear or update
the resume key when advancing to the next VRF so a node from one RIB is never
passed into another; ensure rib6_next_node always calls a lookup to obtain a
current node rather than trusting s->node across pauses.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@modules/l2/control/fdb.c`:
- Around line 290-300: The function fdb_list_init unconditionally dereferences
req (const struct gr_fdb_list_req *req = request) which can be NULL for a
zero-length payload; add a NULL check for request immediately after malloc and
before accessing req->flags/bridge_id/iface_id: if request is NULL, free the
allocated struct fdb_list_stream (or avoid allocating it) and return NULL (or
initialize safe defaults), ensuring no dereference of req occurs; update
fdb_list_init to perform this check and cleanup so a malformed GR_FDB_LIST
payload cannot crash the server.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 757ec775-a3b9-4583-a084-f29d797bdcb1

📥 Commits

Reviewing files that changed from the base of the PR and between 7e6394e and 181d1c6.


Comment on lines +435 to +453
struct nexthop *nexthop_next(const struct nexthop *prev) {
	struct rte_mempool_objhdr *hdr;

	if (prev == NULL)
		hdr = STAILQ_FIRST(&pool->elt_list);
	else {
		hdr = RTE_PTR_SUB(prev, sizeof(*hdr));
		hdr = STAILQ_NEXT(hdr, next);
	}

	while (hdr != NULL) {
		struct nexthop *nh = RTE_PTR_ADD(hdr, sizeof(*hdr));
		if (nh->ref_count != 0)
			return nh;
		hdr = STAILQ_NEXT(hdr, next);
	}

	return NULL;
}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

find . -type f -name "nexthop.c" -o -name "nexthop.h" | head -20

Repository: DPDK/grout

Length of output: 210


🏁 Script executed:

rg -l "nexthop_next" --type c --type h

Repository: DPDK/grout

Length of output: 151


🏁 Script executed:

cat -n modules/infra/control/nexthop.c | sed -n '435,453p'

Repository: DPDK/grout

Length of output: 592


🏁 Script executed:

cat -n modules/infra/api/nexthop.c | sed -n '98,126p'

Repository: DPDK/grout

Length of output: 904


🏁 Script executed:

# Search for nexthop_config_allocate to see if it replaces the pool
rg -A 20 "nexthop_config_allocate" --type c

Repository: DPDK/grout

Length of output: 3594


🏁 Script executed:

# Search for nexthop_used_count to understand reference counting
rg -A 10 "nexthop_used_count" --type c

Repository: DPDK/grout

Length of output: 2512


🏁 Script executed:

# Check if prev pointer is actually stored without holding a reference
rg -B 5 -A 5 "prev\|s->prev" modules/infra/api/nexthop.c | head -40

Repository: DPDK/grout

Length of output: 36


Stream cursors cannot use raw pointers across pool reconfiguration.

The nh_list_next() function stores the last nexthop pointer in s->prev without holding a reference (line 109 of modules/infra/api/nexthop.c). When the stream pauses and resumes later, nexthop_next() performs pointer arithmetic (RTE_PTR_SUB) on prev assuming it's still valid. However, if all nexthops are deleted between pause and resume, nexthop_used_count() drops to zero, allowing nexthop_config_allocate() to free the old pool and create a new one. The stale prev pointer then points into freed memory, causing use-after-free when nexthop_next() reconstructs the mempool header and walks the list.

Use a stable cursor (e.g., nexthop ID or generation counter) instead of storing a raw object pointer across pause points.
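
One way to read the "generation counter" suggestion is sketched below. It does not use the DPDK mempool API: the pool is a fixed array, and `pool`, `pool_cursor`, `pool_next`, `CURSOR_END` and `CURSOR_STALE` are hypothetical names. The cursor stores an index plus the generation it was created under, so a pool that was freed and reallocated during a pause is detected instead of being walked through a stale pointer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define POOL_SIZE 4

enum { CURSOR_END = -1, CURSOR_STALE = -2 };

struct slot {
	uint32_t ref_count; /* 0 means the slot is free */
};

/* Hypothetical pool: generation is bumped every time it is reallocated. */
struct pool {
	uint64_t generation;
	struct slot slots[POOL_SIZE];
};

/* Cursor holds no pointer into the pool, only an index and a generation. */
struct pool_cursor {
	uint64_t generation;
	size_t index;
};

static void cursor_init(struct pool_cursor *c, const struct pool *p) {
	c->generation = p->generation;
	c->index = 0;
}

/* Return the index of the next in-use slot, CURSOR_END, or CURSOR_STALE. */
static int pool_next(const struct pool *p, struct pool_cursor *c) {
	assert(p != NULL && c != NULL);
	if (c->generation != p->generation)
		return CURSOR_STALE; /* pool was replaced while the stream was paused */
	while (c->index < POOL_SIZE) {
		size_t i = c->index++;
		if (p->slots[i].ref_count != 0)
			return (int)i;
	}
	return CURSOR_END;
}

/* Walk two entries, simulate a pool reallocation, then try to resume. */
static bool demo_stale_cursor_detected(void) {
	struct pool p = {.generation = 1, .slots = {{1}, {0}, {1}, {1}}};
	struct pool_cursor c;

	cursor_init(&c, &p);
	int first = pool_next(&p, &c);  /* slot 0 */
	int second = pool_next(&p, &c); /* slot 2, the free slot 1 is skipped */
	p.generation++;                 /* pool freed and recreated during the pause */
	int third = pool_next(&p, &c);
	return first == 0 && second == 2 && third == CURSOR_STALE;
}
```

What the stream should do on `CURSOR_STALE` (end with an error or restart from the beginning) is a policy choice the sketch leaves open. As the later review comment notes, none of this replaces proper RCU protection if the pool can change concurrently.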

🧰 Tools
🪛 Cppcheck (2.20.0)

[style] 435-435: The function 'nexthop_next' is never used.

(unusedFunction)

Comment on lines +290 to +300
static void *fdb_list_init(const void *request, struct api_ctx *) {
	const struct gr_fdb_list_req *req = request;
	struct gr_fdb_entry *fdb;
	uint32_t next = 0;
	struct fdb_list_stream *s = malloc(sizeof(*s));
	if (s == NULL)
		return NULL;
	s->flags = req->flags;
	s->bridge_id = req->bridge_id;
	s->iface_id = req->iface_id;
	s->iter = 0;
	return s;
}

⚠️ Potential issue | 🟡 Minor

Potential null pointer dereference if client sends malformed request.

If a client sends GR_FDB_LIST with payload_len=0, the request parameter will be NULL (from read_cb), and the unconditional dereference at Line 295 (req->flags) will crash the server.

🛡️ Suggested fix
 static void *fdb_list_init(const void *request, struct api_ctx *) {
 	const struct gr_fdb_list_req *req = request;
+	if (req == NULL)
+		return NULL;
 	struct fdb_list_stream *s = malloc(sizeof(*s));
 	if (s == NULL)
 		return NULL;
📝 Committable suggestion


Suggested change
-static void *fdb_list_init(const void *request, struct api_ctx *) {
-	const struct gr_fdb_list_req *req = request;
-	struct gr_fdb_entry *fdb;
-	uint32_t next = 0;
-	struct fdb_list_stream *s = malloc(sizeof(*s));
-	if (s == NULL)
-		return NULL;
-	s->flags = req->flags;
-	s->bridge_id = req->bridge_id;
-	s->iface_id = req->iface_id;
-	s->iter = 0;
-	return s;
-}
+static void *fdb_list_init(const void *request, struct api_ctx *) {
+	const struct gr_fdb_list_req *req = request;
+	if (req == NULL)
+		return NULL;
+	struct fdb_list_stream *s = malloc(sizeof(*s));
+	if (s == NULL)
+		return NULL;
+	s->flags = req->flags;
+	s->bridge_id = req->bridge_id;
+	s->iface_id = req->iface_id;
+	s->iter = 0;
+	return s;
+}

	while (hdr != NULL) {
		struct nexthop *nh = RTE_PTR_ADD(hdr, sizeof(*hdr));
		if (nh->ref_count != 0)
			return nh;
Collaborator


It's not a real protection without a proper rcu lol

@rjarry rjarry closed this Mar 18, 2026
@rjarry rjarry deleted the api-stream branch March 18, 2026 10:14