Type: Implementation guide. Normative spec: PROTOCOL_SPEC §11.1 Middleware/Interceptors.
Composable middleware pipeline using the onion execution model with before/after/on_error phases. Each middleware can inspect and modify inputs before module execution, transform outputs after execution, and participate in error recovery when failures occur. The pipeline supports both full subclass-based middleware and lightweight function adapters for simple use cases.
- Provide a base
Middlewareclass with no-op defaults for all three lifecycle phases (before, after, on_error), allowing subclasses to override only the methods they need. - Implement onion-model execution: before hooks run in registration order, after hooks run in reverse registration order, and on_error hooks run in reverse order over only the middlewares that executed before the failure. When middleware has explicit priority values (0-1000), higher priority executes first per PROTOCOL_SPEC. When priorities are equal, registration order applies.
- Support input modification in
before()(return a new dict to replace inputs, or None to pass through unchanged) and output modification inafter()(same contract). - Support error recovery:
on_error()handlers are called in reverse order; the first handler to return a non-None dict provides recovery output, short-circuiting the remaining handlers. - Provide
BeforeMiddlewareandAfterMiddlewareadapters that wrap plain callback functions as middleware instances, reducing boilerplate for single-phase hooks. - Include a
LoggingMiddlewarewith structured logging, security-aware redaction of inputs viacontext.redacted_inputs, and per-call duration tracking stored incontext.data. - Wrap before-phase failures in
MiddlewareChainErrorcarrying both the original exception and the list of executed middlewares, enabling targeted error recovery. - Ensure all mutations to the middleware list are thread-safe.
The middleware system follows a classic onion (layered) execution model. The MiddlewareManager holds an ordered list of Middleware instances and provides three execution methods corresponding to the module call lifecycle:
-
execute_before() -> tuple[dict, list[Middleware]]-- Iterates middlewares in registration order. Each middleware'sbefore()receives the current inputs and may return a replacement dict. Returns both the (possibly modified) inputs and the list of executed middlewares (for error rollback inexecute_on_error). If a middleware raises, aMiddlewareChainErroris raised with the list of already-executed middlewares attached. -
execute_after()-- Iterates middlewares in reverse registration order. Each middleware'safter()receives both original inputs and the current output, and may return a replacement output dict. -
execute_on_error(module_id, inputs, error, context, executed_middlewares)-- Iterates theexecuted_middlewareslist (from the before phase) in reverse order. The first handler to return a non-None dict becomes the recovery output. If a handler itself raises, the exception is logged and iteration continues.
The MiddlewareManager uses a lock-protected snapshot pattern for thread safety. Before each execution pass, snapshot() acquires the lock, copies the middleware list, and releases the lock. The execution then iterates over the snapshot without holding the lock, so concurrent add()/remove() calls do not interfere with in-flight pipelines.
Middleware(base class) -- Plain class (not ABC) with three methods returning None by default. Subclasses override only what they need.MiddlewareManager-- Manages the ordered list and orchestrates the three execution phases. Uses a lock with the snapshot pattern for thread safety.BeforeMiddleware/AfterMiddleware-- Lightweight adapters wrapping a single callback function as a fullMiddlewaresubclass. Non-overridden phases remain no-ops.LoggingMiddleware-- Structured logging middleware that records start time incontext.data["_apcore.mw.logging.start_time"]duringbefore(), computes duration inafter(), and usescontext.redacted_inputsto avoid leaking sensitive data. Configurable vialog_inputs,log_outputs, andlog_errorsflags.RetryMiddleware-- Built-in middleware that retries failed module calls with configurable backoff strategies (exponential or fixed). Only retries errors markedretryable=True. Supportsmax_retries,base_delay_ms,max_delay_ms, and jitter. See Middleware Guide for configuration details.MiddlewareChainError-- Exception subclass carryingoriginal(the root cause) andexecuted_middlewares(the list of middlewares whosebefore()was called, for targeted error recovery).
Inputs --> [MW1.before] --> [MW2.before] --> [MW3.before] --> Module.execute()
|
Output <-- [MW1.after] <-- [MW2.after] <-- [MW3.after] <------+
On Error (if MW3.before fails):
[MW2.on_error] <-- [MW3.on_error]
(MW1.on_error is not called because MW3 is where before failed,
and recovery walks backwards through executed middlewares)
=== "Python" ```python from apcore import APCore from apcore.middleware import Middleware, BeforeMiddleware, AfterMiddleware
client = APCore()
# Subclass-based middleware
class AuditMiddleware(Middleware):
def before(self, module_id, inputs, context):
print(f"[AUDIT] calling {module_id}")
def after(self, module_id, inputs, output, context):
print(f"[AUDIT] {module_id} returned {output}")
# Register middleware
client.use(AuditMiddleware())
# Lightweight function adapters
client.use_before(lambda module_id, inputs, ctx: print(f"Before: {module_id}"))
client.use_after(lambda module_id, inputs, out, ctx: print(f"After: {module_id}"))
@client.module(id="greet", description="Say hello")
def greet(name: str) -> dict:
return {"message": f"Hello, {name}!"}
result = client.call("greet", {"name": "World"})
```
=== "TypeScript" ```typescript import { APCore, Middleware, BeforeMiddleware, AfterMiddleware } from "apcore-js";
const client = new APCore();
// Subclass-based middleware
class AuditMiddleware extends Middleware {
before(moduleId: string, inputs: Record<string, unknown>, context: unknown) {
console.log(`[AUDIT] calling ${moduleId}`);
}
after(moduleId: string, inputs: Record<string, unknown>, output: Record<string, unknown>, context: unknown) {
console.log(`[AUDIT] ${moduleId} returned`, output);
}
}
// Register middleware
client.use(new AuditMiddleware());
// Lightweight function adapters
client.useBefore((moduleId, inputs, ctx) => { console.log(`Before: ${moduleId}`); return null; });
client.useAfter((moduleId, inputs, out, ctx) => { console.log(`After: ${moduleId}`); return null; });
client.module({
id: "greet",
description: "Say hello",
inputSchema: { type: "object", properties: { name: { type: "string" } } },
outputSchema: { type: "object", properties: { message: { type: "string" } } },
execute: ({ name }: { name: string }) => ({ message: `Hello, ${name}!` }),
});
const result = await client.call("greet", { name: "World" });
```
=== "Rust" ```rust use apcore::APCore; use apcore::middleware::{Middleware, MiddlewareContext}; use apcore::context::Context; use apcore::errors::ModuleError; use async_trait::async_trait; use serde_json::Value;
struct AuditMiddleware;
#[async_trait]
impl Middleware for AuditMiddleware {
async fn before(
&self,
module_id: &str,
inputs: &Value,
_ctx: &Context<Value>,
) -> Result<Option<Value>, ModuleError> {
println!("[AUDIT] calling {}", module_id);
Ok(None)
}
async fn after(
&self,
module_id: &str,
_inputs: &Value,
output: &Value,
_ctx: &Context<Value>,
) -> Result<Option<Value>, ModuleError> {
println!("[AUDIT] {} returned {:?}", module_id, output);
Ok(None)
}
}
let mut client = APCore::new();
client.use_middleware(Box::new(AuditMiddleware));
```
apcore.context.Context-- Execution context passed to all middleware methods, providestrace_id,caller_id,redacted_inputs, anddatadict for per-call state storage.
??? info "Python SDK reference"
The following tables are not protocol requirements — they document the Python SDK's source layout and runtime dependencies for implementers/users of apcore-python.
**Source files:**
| File | Lines | Purpose |
|------|-------|---------|
| `src/apcore/middleware/__init__.py` | 16 | Package re-exports for convenient imports |
| `src/apcore/middleware/base.py` | 36 | `Middleware` base class with no-op defaults |
| `src/apcore/middleware/manager.py` | 129 | `MiddlewareManager` and `MiddlewareChainError` |
| `src/apcore/middleware/logging.py` | 94 | `LoggingMiddleware` with structured logging and redaction |
| `src/apcore/middleware/adapters.py` | 43 | `BeforeMiddleware` and `AfterMiddleware` function adapters |
| `src/apcore/middleware/retry.py` | ~190 | `RetryMiddleware` with configurable backoff strategies (exponential/fixed) |
**Runtime dependencies:**
- `threading` (stdlib) -- Lock for thread-safe middleware list management.
- `logging` (stdlib) -- Standard library logging used by `LoggingMiddleware` and manager error reporting.
- `time` (stdlib) -- Wall-clock timing for duration measurements in `LoggingMiddleware`.
Tests are split across two files targeting different abstraction levels:
- Middleware base class: Verifies it is not an ABC, can be instantiated directly, all methods return None by default, and subclasses can selectively override methods.
- BeforeMiddleware adapter: Confirms it is a
Middlewaresubclass, delegatesbefore()to the callback, and leavesafter()/on_error()as no-ops. Validates correct argument forwarding. - AfterMiddleware adapter: Same structure as
BeforeMiddlewaretests but for theafter()phase.
- add/remove: Verifies append ordering, identity-based removal, and return values.
- execute_before: Tests registration-order execution, input replacement via returned dicts, None passthrough,
MiddlewareChainErroron failure with correctexecuted_middlewarestracking, and empty-list passthrough. - execute_after: Tests reverse-order execution, output replacement, None passthrough, exception propagation, and empty-list passthrough.
- execute_on_error: Tests reverse iteration over executed middlewares, first-dict-wins recovery, None continuation, exception-in-handler logging and continuation, and empty-list returns None.
- Thread safety: Concurrent
add()with no lost middlewares (10 threads x 50 adds), snapshot consistency after mutations, and concurrentadd()+snapshot()with no exceptions (5 writer + 5 reader threads).
- Full pipeline tests exercising middleware through the
Executor.call()path.
module_id(str/string/&str, required) — ID of the module about to executeinputs(dict/object/Value, required) — module inputs (may be modified and returned)context(Context, required) — current execution context
- Any error raised by the middleware propagates and aborts the execution pipeline (downstream middlewares'
beforehooks are skipped;on_errorhooks of already-executed middlewares are invoked)
- On success:
dict/Record<string, unknown>/Valueor None/null/() — modified inputs (or None to pass inputs unchanged)
- async: language-dependent (Python allows sync or async; TypeScript and Rust MUST be async)
- thread_safe: true (called under executor lock on shared mutable state)
- pure: false (may mutate context or inputs)
- Any error raised by the middleware: behavior is SDK-defined. Python and Rust propagate the first error immediately; TypeScript catches per-hook and rethrows the first error after all hooks have run. See PROTOCOL_SPEC.md §Middleware for the normative MUST once aligned.
module_id(str/string/&str, required)inputs(dict/object/Value, required)output(dict/object/Value, required) — module outputcontext(Context, required)
- On success:
dict/Record<string, unknown>/Valueor None/null/() — modified output (or None to pass unchanged)
- async: language-dependent
- thread_safe: true
module_id(str/string/&str, required)inputs(dict/object/Value, required)error(ModuleError, required) — the error that terminated executioncontext(Context, required)
- No errors raised (on_error MUST NOT raise)
- On success with recovery:
dict/Record<string, unknown>/Value— replacement output; Python and TypeScript return immediately on first recovery value; Rust continues calling all hooks but keeps first recovery. See note underMiddleware.after. - On pass-through: None/null/None — signals no recovery; error continues propagating
- async: language-dependent
- thread_safe: true
Context keys in context.data are partitioned by namespace to prevent collisions between framework internals and user extensions.
Normative rules:
- Framework-owned keys MUST use the
_apcore.*prefix (e.g.,_apcore.mw.logging.start_time). - User extensions MUST use the
ext.*prefix (e.g.,ext.my_company.request_id). - Implementations MUST NOT write to keys in the other party's namespace. A framework implementation MUST NOT write
ext.*keys; user middleware MUST NOT write_apcore.*keys. - Keys that begin with neither prefix are allowed for backward compatibility but SHOULD be migrated to one of the two namespaces.
Canonical _apcore.* keys:
| Key | Set by | Value |
|---|---|---|
_apcore.mw.logging.start_time |
LoggingMiddleware.before() |
Wall-clock time (float, seconds since epoch) at the start of the module call |
_apcore.mw.tracing.span_id |
TracingMiddleware.before() |
Active span ID string for the current module execution span |
_apcore.mw.circuit.state |
CircuitBreakerMiddleware.before() |
Circuit state string: CLOSED, OPEN, or HALF_OPEN |
The CircuitBreakerMiddleware tracks per-module error rates and latencies, and opens a circuit when thresholds are exceeded, preventing calls to unhealthy modules.
Normative rules:
CircuitBreakerMiddlewareMUST track per-(module_id,caller_id) error statistics in a configurable rolling window.- When the error rate in the window exceeds
open_threshold(default:0.5), the circuit for that pair MUST transition toOPEN. InOPENstate, calls MUST be short-circuited and MUST raiseCircuitBreakerOpenError. - The circuit MUST transition to
HALF_OPENafterrecovery_window_ms(default:30000) has elapsed since the circuit opened. InHALF_OPENstate, exactly one probe call is allowed through. A successful probe transitions the circuit toCLOSED; a failed probe transitions it back toOPEN. CircuitBreakerMiddlewareMUST store the current circuit state incontext.data["_apcore.mw.circuit.state"]on every call.- Implementations MUST emit
apcore.circuit.openedandapcore.circuit.closedevents via theEventEmitterwhen state transitions occur.
=== "Python" ```python from apcore import APCore from apcore.middleware import CircuitBreakerMiddleware
client = APCore()
client.use(CircuitBreakerMiddleware(
open_threshold=0.3, # open if >30% of calls fail
recovery_window_ms=60000, # probe after 60 seconds
window_size=20, # rolling window of 20 calls
))
result = client.call("executor.payment.charge", {"amount": 100})
```
=== "TypeScript" ```typescript import { APCore, CircuitBreakerMiddleware } from "apcore-js";
const client = new APCore();
client.use(new CircuitBreakerMiddleware({
openThreshold: 0.3, // open if >30% of calls fail
recoveryWindowMs: 60000, // probe after 60 seconds
windowSize: 20, // rolling window of 20 calls
}));
const result = await client.call("executor.payment.charge", { amount: 100 });
```
=== "Rust" ```rust use apcore::APCore; use apcore::middleware::CircuitBreakerMiddleware;
let mut client = APCore::new();
client.use_middleware(Box::new(
CircuitBreakerMiddleware::builder()
.open_threshold(0.3) // open if >30% of calls fail
.recovery_window_ms(60_000) // probe after 60 seconds
.window_size(20) // rolling window of 20 calls
.build(),
));
let result = client.call("executor.payment.charge", json!({ "amount": 100 })).await?;
```
The TracingMiddleware creates spans around module execution compatible with OTLP exporters.
Normative rules:
TracingMiddlewareMUST create a span inbefore()with span name equal tomodule_idand attributes{ apcore.trace_id, apcore.caller_id, apcore.module_id }.TracingMiddlewareMUST end the span inafter()with the execution result status (okon success,erroron failure).TracingMiddlewareMUST store the span ID incontext.data["_apcore.mw.tracing.span_id"].TracingMiddlewareSHOULD propagate W3Ctraceparentheaders to outbound calls.TracingMiddlewareMUST NOT raise if the OpenTelemetry SDK is not installed; in that case it MUST operate as a no-op.
=== "Python" ```python from apcore import APCore from apcore.middleware import TracingMiddleware
# opentelemetry-api must be installed for active tracing;
# if absent, TracingMiddleware silently becomes a no-op.
client = APCore()
client.use(TracingMiddleware(
service_name="my-service",
propagate_traceparent=True,
))
result = client.call("executor.email.send_email", {"to": "user@example.com"})
```
=== "TypeScript" ```typescript import { APCore, TracingMiddleware } from "apcore-js"; // @opentelemetry/api must be installed for active tracing; // if absent, TracingMiddleware silently becomes a no-op.
const client = new APCore();
client.use(new TracingMiddleware({
serviceName: "my-service",
propagateTraceparent: true,
}));
const result = await client.call("executor.email.send_email", { to: "user@example.com" });
```
=== "Rust" ```rust use apcore::APCore; use apcore::middleware::TracingMiddleware; // opentelemetry crate must be in Cargo.toml for active tracing; // if absent (feature-flag disabled), TracingMiddleware is a no-op.
let mut client = APCore::new();
client.use_middleware(Box::new(
TracingMiddleware::builder()
.service_name("my-service")
.propagate_traceparent(true)
.build(),
));
let result = client.call("executor.email.send_email", json!({ "to": "user@example.com" })).await?;
```
Config-over-code: middleware chains SHOULD be configurable via apcore.yaml without writing any application code.
middleware:
- type: "tracing"
match_modules: ["executor.*"]
- type: "circuit_breaker"
open_threshold: 0.3
recovery_window_ms: 60000
- type: "logging"
log_inputs: true
log_outputs: false
- type: "custom"
handler: "myapp.middleware.RateLimiter"
config:
requests_per_second: 100Normative rules:
- Implementations MUST support at minimum the
tracing,circuit_breaker, andloggingbuilt-in middleware types via YAML configuration. - Custom middleware types MUST be resolvable via a dotted module path supplied in the
handlerfield (e.g.,myapp.middleware.RateLimiter). Implementations MUST raise a clear configuration error if the handler cannot be imported or does not implement theMiddlewareinterface. - The
match_modulesfield, when present, restricts the middleware to module IDs matching the provided glob patterns. When absent, the middleware applies to all modules.
Incorrect async detection causes middleware to be invoked synchronously when it should be awaited, silently swallowing results.
!!! warning
Using isawaitable(handler) in Python always returns False for non-called functions — it tests whether an object is awaitable, not whether a function is a coroutine function. Use inspect.iscoroutinefunction(handler) instead.
Normative rules:
- Python: Implementations MUST use
inspect.iscoroutinefunction(handler)to detect async handlers. Usingisawaitable(handler)on an uncalled function is incorrect and MUST NOT be used for this purpose. - TypeScript: Implementations MUST check
handler.constructor.name === 'AsyncFunction'to detect async handlers before invocation. Checkinginstanceof Promiseafter invocation is too late (the function has already been called synchronously). Preferred approach: inspect the function itself viahandler.constructor.name. - Rust: Async handlers are statically typed via
async_trait; no runtime detection is needed or possible.
handler(callable/Function/fn, required) — the middleware function to inspect
- None
- On success: bool/boolean/bool —
trueif the handler is asynchronous,falseotherwise
- async: false
- thread_safe: true
- pure: true
- idempotent: true
Module-level middleware (before / after / on_error) wraps the entire 11-step pipeline. Pipeline step middleware is a finer-grained extension point: it wraps the execution of a single named pipeline step (e.g., validate_input, acl_check, execute). This is the lifecycle-shaped surface for the step-level middleware concept introduced in core-executor.md §1.3 — the core-executor section documents the wrapper-style next-callback API, while this section documents the lifecycle-shaped API that mirrors module-level Middleware.
A registered StepMiddleware participates in three callbacks per step invocation:
before_step(step_name, ctx, inputs)
--> step body executes
|
+-- on success: after_step(step_name, ctx, inputs, output)
+-- on failure: on_step_error(step_name, ctx, inputs, error)
- Implementations MUST provide a
StepMiddlewareextension point with three callbacks:before_step,after_step, andon_step_error. Each callback MAY be omitted (default no-op). before_step(step_name, ctx, inputs)MUST be invoked before the step body executes. Returning a non-null/non-None/Some(...)dict MUST replace the inputs passed to the step body. Returning null/None/NoneMUST pass inputs through unchanged.after_step(step_name, ctx, inputs, output)MUST be invoked after the step body completes successfully. Returning a non-null/non-None/Some(...)dict MUST replace the step output observed by downstream steps.on_step_error(step_name, ctx, inputs, error)MUST be invoked when the step body raises. Returning a non-null/non-None/Some(...)value MUST be treated as recovery output, mirroring the recovery contract of module-levelon_error. The error MUST NOT propagate further; the recovery value becomes the step's output.- Returning null/None/
Nonefromon_step_errorMUST cause the original error to continue propagating (subject to the step'signore_errorssetting per §1.1 Fail-Fast Error Handling). - When multiple
StepMiddlewareinstances are registered for the same step,before_stepcallbacks MUST run in registration order, andafter_stepcallbacks MUST run in reverse registration order (onion model, identical to module-level middleware). on_step_errorcallbacks MUST run in reverse registration order over only the middlewares whosebefore_stephad executed before the failure. The first non-null recovery value short-circuits remaining handlers (first-recovery-wins).- Async
StepMiddlewarecallbacks MUST be supported in all SDKs. Python SDKs MUST detect async callbacks viainspect.iscoroutinefunction; TypeScript SDKs MUST inspecthandler.constructor.name === "AsyncFunction"; Rust SDKs MUST useasync_trait(see §1.5 Async Handler Detection). - Step middleware errors raised from
before_stepitself (not from the step body) MUST be wrapped inMiddlewareChainError, identical to the module-level contract.
Pipeline configuration MUST fail fast on structural errors so that misconfiguration is caught at startup rather than at first request.
- Implementations MUST raise
ConfigurationErrorat YAML/config parse time when apipeline.configure[].namereferences a step that does not exist in the active strategy. Implementations MUST NOT silently ignore the directive or log a warning and continue. - Implementations MUST raise
PipelineDependencyErrorat strategy construction time (before anycall()runs) when a step's declaredrequires:is not satisfied by an upstream step'sprovides:. The error message MUST include the unsatisfied capability name and the dependent step name. - Implementations MUST NOT defer dependency validation until first invocation. Strategy construction MUST be all-or-nothing: a strategy either validates cleanly and is callable, or construction fails with a typed error.
# apcore.yaml — declarative step middleware + dependency contract
pipeline:
step_middleware:
- step: validate_input
handler: "myapp.pipeline.tracing:TimingStepMiddleware"
- step: execute
handler: "myapp.pipeline.cost:CostStepMiddleware"
configure:
- name: validate_input
requires: ["context.identity"] # provided by context_creation
provides: ["validated_inputs"] # consumed by executeA tracing-style StepMiddleware that logs the wall-clock duration of each step:
=== "Python" ```python import time from apcore import APCore from apcore.middleware import StepMiddleware
class TimingStepMiddleware(StepMiddleware):
async def before_step(self, step_name, ctx, inputs):
ctx.data[f"_apcore.step.{step_name}.start"] = time.perf_counter()
return None # pass inputs through unchanged
async def after_step(self, step_name, ctx, inputs, output):
start = ctx.data.pop(f"_apcore.step.{step_name}.start", None)
if start is not None:
elapsed_ms = (time.perf_counter() - start) * 1000
print(f"step={step_name} elapsed_ms={elapsed_ms:.2f}")
return None # pass output through unchanged
async def on_step_error(self, step_name, ctx, inputs, error):
start = ctx.data.pop(f"_apcore.step.{step_name}.start", None)
elapsed_ms = (time.perf_counter() - start) * 1000 if start else 0.0
print(f"step={step_name} elapsed_ms={elapsed_ms:.2f} error={type(error).__name__}")
return None # do not recover; let the error propagate
client = APCore()
client.use_step_middleware("validate_input", TimingStepMiddleware())
@client.module(id="demo.greet", description="Greet the user")
def greet(name: str) -> dict:
return {"message": f"Hello, {name}!"}
result = client.call("demo.greet", {"name": "World"})
```
=== "TypeScript" ```typescript import { APCore, StepMiddleware } from "apcore-js";
class TimingStepMiddleware extends StepMiddleware {
async beforeStep(stepName: string, ctx: any, inputs: Record<string, unknown>) {
ctx.data[`_apcore.step.${stepName}.start`] = performance.now();
return null; // pass inputs through unchanged
}
async afterStep(stepName: string, ctx: any, inputs: Record<string, unknown>, output: Record<string, unknown>) {
const start = ctx.data[`_apcore.step.${stepName}.start`];
delete ctx.data[`_apcore.step.${stepName}.start`];
if (start !== undefined) {
const elapsedMs = performance.now() - start;
console.log(`step=${stepName} elapsed_ms=${elapsedMs.toFixed(2)}`);
}
return null; // pass output through unchanged
}
async onStepError(stepName: string, ctx: any, inputs: Record<string, unknown>, error: Error) {
const start = ctx.data[`_apcore.step.${stepName}.start`];
delete ctx.data[`_apcore.step.${stepName}.start`];
const elapsedMs = start !== undefined ? performance.now() - start : 0;
console.log(`step=${stepName} elapsed_ms=${elapsedMs.toFixed(2)} error=${error.constructor.name}`);
return null; // do not recover; let the error propagate
}
}
const client = new APCore();
client.useStepMiddleware("validate_input", new TimingStepMiddleware());
client.module({
id: "demo.greet",
description: "Greet the user",
inputSchema: { type: "object", properties: { name: { type: "string" } } },
outputSchema: { type: "object", properties: { message: { type: "string" } } },
execute: ({ name }: { name: string }) => ({ message: `Hello, ${name}!` }),
});
const result = await client.call("demo.greet", { name: "World" });
```
=== "Rust" ```rust use apcore::APCore; use apcore::middleware::StepMiddleware; use apcore::context::Context; use apcore::errors::ModuleError; use async_trait::async_trait; use serde_json::Value; use std::time::Instant;
struct TimingStepMiddleware;
#[async_trait]
impl StepMiddleware for TimingStepMiddleware {
async fn before_step(
&self,
step_name: &str,
ctx: &mut Context<Value>,
_inputs: &Value,
) -> Result<Option<Value>, ModuleError> {
let key = format!("_apcore.step.{}.start", step_name);
ctx.data.insert(key, Value::String(format!("{:?}", Instant::now())));
Ok(None)
}
async fn after_step(
&self,
step_name: &str,
_ctx: &mut Context<Value>,
_inputs: &Value,
_output: &Value,
) -> Result<Option<Value>, ModuleError> {
println!("step={} completed", step_name);
Ok(None)
}
async fn on_step_error(
&self,
step_name: &str,
_ctx: &mut Context<Value>,
_inputs: &Value,
error: &ModuleError,
) -> Result<Option<Value>, ModuleError> {
println!("step={} error={}", step_name, error.code());
Ok(None) // do not recover; let the error propagate
}
}
let mut client = APCore::new();
client.use_step_middleware("validate_input", Box::new(TimingStepMiddleware));
```
step_name(str/string/&str, required) — pipeline step name (e.g.,validate_input)ctx(PipelineContext, required) — current pipeline contextinputs(dict/object/Value, required) — current inputs to the step
- Any error raised aborts the step body and triggers
on_step_errorcallbacks of already-executed step middlewares (mirrorsMiddlewareChainErrorfor the module-level chain)
- On success: dict/object/Value or null/None/None — replacement inputs (null = pass through unchanged)
- async: language-dependent (Python sync or async; TypeScript and Rust MUST be async)
- thread_safe: true
- pure: false (may mutate ctx)
step_name,ctx,inputs(same asbefore_step)output(dict/object/Value, required) — step output
- Behavior is SDK-defined (see Middleware.after for parity rule)
- On success: dict/object/Value or null/None/None — replacement output (null = pass through)
- async: language-dependent
- thread_safe: true
step_name,ctx,inputs(same asbefore_step)error(ModuleError, required) — the error raised by the step body
- on_step_error MUST NOT raise; exceptions inside the handler MUST be logged and iteration continues with the next handler
- On success with recovery: dict/object/Value — replacement output, short-circuits remaining handlers
- On pass-through: null/None/None — error continues propagating
- async: language-dependent
- thread_safe: true
The iscoroutinefunction-style detection in §1.5 is necessary but not sufficient. Higher-order wrappers like Python's functools.partial, JavaScript closures returned from factories, or class methods rebound onto instances are not literally async def / async function, yet they MAY return a coroutine / Promise when invoked. Function-shape inspection misses these cases and silently drops the awaited result.
To preserve correctness across all wrapper styles, middleware managers MUST detect awaitability on the return value of each before / after / on_error invocation, not on the function shape alone.
- Implementations MUST inspect each handler's RETURN value: if the returned object is awaitable (Python:
inspect.isawaitable(value)) or a thenable (TypeScript: object with a callable.thenmethod) or a future (Rust: aFutureresolved by the runtime), the middleware manager MUSTawaitit before proceeding to the next phase. - Implementations MUST NOT rely solely on
iscoroutinefunction/handler.constructor.name === 'AsyncFunction'as the gating check. These checks SHOULD remain as a fast-path optimization but MUST be supplemented by return-value detection. - The function-shape check from §1.5 is RETAINED as guidance for the warning case (e.g.,
async defdeclared but never awaited at the call site), but the authoritative decision MUST use the actual return value. - Synchronous middleware (return value is a plain
dict/null/()) MUST NOT be awaited; the manager MUST forward the value as-is.
import functools
async def _audit(module_id, inputs, ctx, *, source):
print(f"[{source}] {module_id}")
return None
# This is NOT a coroutine function — iscoroutinefunction returns False.
# But calling it returns a coroutine. Old detection silently swallows it.
audit = functools.partial(_audit, source="api")
client.use_before(audit)The same pattern occurs in TypeScript when a factory returns an arrow function whose body is async (the wrapper is sync, the body is async), or when a class method is wrapped in a decorator that re-binds this.
=== "Python" ```python import asyncio import functools import inspect from typing import Any, Callable
async def invoke_handler(handler: Callable[..., Any], *args: Any) -> Any:
# Fast path: known async function. Optimization, not the gate.
result = handler(*args)
# Authoritative check: did we get an awaitable back?
if inspect.isawaitable(result):
return await result
return result
# Works for: async def, functools.partial(async_fn, ...),
# decorated methods, lambda returning a coroutine, etc.
async def main():
async def audit(module_id, inputs, ctx, *, source):
return None
wrapped = functools.partial(audit, source="api")
await invoke_handler(wrapped, "math.add", {"a": 1}, None)
```
=== "TypeScript" ```typescript type Handler = (...args: unknown[]) => unknown;
function isThenable(v: unknown): v is PromiseLike<unknown> {
return (
typeof v === "object" &&
v !== null &&
typeof (v as { then?: unknown }).then === "function"
);
}
async function invokeHandler(handler: Handler, ...args: unknown[]): Promise<unknown> {
// Authoritative check: inspect the RETURN value, not handler.constructor.name.
const result = handler(...args);
if (isThenable(result)) {
return await result;
}
return result;
}
// Works for: async function, () => somePromise, partial(asyncFn, ...),
// decorated class methods rebound via .bind(this), etc.
const wrapped = (...args: unknown[]) => audit("api", ...args);
await invokeHandler(wrapped, "math.add", { a: 1 }, null);
```
=== "Rust"
```rust
// Rust handles this statically: every Middleware trait method returns
// impl Future via #[async_trait]. The compiler ENFORCES that callers
// .await the result; there is no runtime "shape" detection and no way
// to silently drop a Future. The async-correctness bug class is impossible.
//
// Higher-order wrappers (closures, partials, decorators) preserve the
// Future signature through the trait bound, so they remain awaitable
// by construction.
use async_trait::async_trait;
use apcore::middleware::Middleware;
use apcore::context::Context;
use apcore::errors::ModuleError;
use serde_json::Value;
struct Audit;
#[async_trait]
impl Middleware for Audit {
async fn before(
&self,
module_id: &str,
_inputs: &Value,
_ctx: &Context<Value>,
) -> Result<Option<Value>, ModuleError> {
println!("[audit] {module_id}");
Ok(None)
}
}
```
Implementations that previously gated awaiting on iscoroutinefunction / handler.constructor.name === 'AsyncFunction' MUST switch to return-value inspection. The fix is backward-compatible: synchronous handlers that return non-awaitable values are unaffected; awaitable returns from non-async-declared functions are now correctly awaited instead of leaked.