Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.cache
__pycache__
.coverage*
12 changes: 10 additions & 2 deletions fasta2a/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
from .applications import FastA2A
from .broker import Broker
from .schema import Skill
from .schema import AgentExtension, Skill, StreamEvent
from .storage import Storage
from .worker import Worker

__all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'Worker']
__all__ = [
'AgentExtension',
'Broker',
'FastA2A',
'Skill',
'Storage',
'StreamEvent',
'Worker',
]
30 changes: 24 additions & 6 deletions fasta2a/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
A2AResponse,
AgentCapabilities,
AgentCard,
AgentExtension,
AgentInterface,
AgentProvider,
Skill,
Expand All @@ -43,7 +44,9 @@ def __init__(
description: str | None = None,
provider: AgentProvider | None = None,
skills: list[Skill] | None = None,
extensions: list[AgentExtension] | None = None,
docs_url: str | None = '/docs',
streaming: bool = True,
# Starlette
debug: bool = False,
routes: Sequence[Route] | None = None,
Expand All @@ -68,7 +71,9 @@ def __init__(
self.description = description
self.provider = provider
self.skills = skills or []
self.extensions = extensions or []
self.docs_url = docs_url
self.streaming = streaming
# NOTE: For now, I don't think there's any reason to support any other input/output modes.
self.default_input_modes = ['application/json']
self.default_output_modes = ['application/json']
Expand All @@ -92,6 +97,11 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:

async def _agent_card_endpoint(self, request: Request) -> Response:
if self._agent_card_json_schema is None:
capabilities = AgentCapabilities(
streaming=self.streaming, push_notifications=False, state_transition_history=False
)
Comment on lines +100 to +102
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 state_transition_history=False is silently dropped during serialization

At fasta2a/applications.py:101, state_transition_history=False is passed to the AgentCapabilities TypedDict constructor, but this field is not defined in the TypedDict at fasta2a/schema.py:78-100. Pyright does flag this (confirmed via uv run pyright), so it's a CI-caught type error. However, the runtime impact is notable: pydantic's dump_json silently strips the unknown key, so stateTransitionHistory never appears in the serialized agent card JSON. Since the value is False, this is arguably equivalent to omitting it (default assumed false), but if someone changes this to True in the future, it would also be silently dropped. The fix would be to either add a state_transition_history field to AgentCapabilities TypedDict or remove it from the constructor call.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

if self.extensions:
capabilities['extensions'] = self.extensions
agent_card = AgentCard(
name=self.name,
description=self.description or 'An AI agent exposed as an A2A agent.',
Expand All @@ -102,7 +112,7 @@ async def _agent_card_endpoint(self, request: Request) -> Response:
skills=self.skills,
default_input_modes=self.default_input_modes,
default_output_modes=self.default_output_modes,
capabilities=AgentCapabilities(streaming=True, push_notifications=False),
capabilities=capabilities,
)
if self.provider is not None:
agent_card['provider'] = self.provider
Expand Down Expand Up @@ -130,9 +140,22 @@ async def _agent_run_endpoint(self, request: Request) -> Response:
data = await request.body()
a2a_request = a2a_request_ta.validate_json(data)

# Parse activated extensions from the A2A-Extensions header
extensions_header = request.headers.get('a2a-extensions', '')
activated_extensions: list[str] = (
[uri.strip() for uri in extensions_header.split(',') if uri.strip()] if extensions_header else []
)
# Stash on the request state so workers / handlers can inspect them
request.state.activated_extensions = activated_extensions
Comment on lines +143 to +149
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 activated_extensions parsed but never consumed by downstream code

At fasta2a/applications.py:144-150, the A2A-Extensions header is parsed and stored on request.state.activated_extensions with a comment saying "so workers / handlers can inspect them." However, this state is never passed to TaskManager, Broker, or Worker — those components receive only TaskSendParams which doesn't include activated extensions information. The request.state is scoped to the HTTP request handler and is not accessible from workers. This appears to be scaffolding for future use rather than a bug, but it means extensions have no functional effect currently.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


jsonrpc_response: A2AResponse
if a2a_request['method'] == 'message/send':
jsonrpc_response = await self.task_manager.send_message(a2a_request)
elif a2a_request['method'] == 'message/stream':
return StreamingResponse(
self.task_manager.stream_message(a2a_request),
media_type='text/event-stream',
)
Comment on lines +154 to +158
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 No guard against message/stream requests when streaming=False

The new streaming parameter on FastA2A controls whether the agent card advertises streaming capability (fasta2a/applications.py:49), but the _agent_run_endpoint at line 154 will still accept and process message/stream requests even when streaming=False. The agent card is just an informational hint to clients; a misbehaving client could still send message/stream and receive a streaming response. This may be intentional (capability advertisement vs. enforcement), but it's worth considering whether to return an error for message/stream when streaming is disabled.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

elif a2a_request['method'] == 'tasks/get':
jsonrpc_response = await self.task_manager.get_task(a2a_request)
elif a2a_request['method'] == 'tasks/cancel':
Expand All @@ -147,11 +170,6 @@ async def _agent_run_endpoint(self, request: Request) -> Response:
jsonrpc_response = await self.task_manager.delete_task_push_notification_config(a2a_request)
elif a2a_request['method'] == 'tasks/list':
jsonrpc_response = await self.task_manager.list_tasks(a2a_request)
elif a2a_request['method'] == 'message/stream':
return StreamingResponse(
self.task_manager.stream_message(a2a_request),
media_type='text/event-stream',
)
elif a2a_request['method'] == 'tasks/resubscribe':
return StreamingResponse(
self.task_manager.resubscribe_task(a2a_request),
Expand Down
15 changes: 15 additions & 0 deletions fasta2a/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ class AgentCapabilities(TypedDict):
extensions: NotRequired[list[AgentExtension]]
"""A list of protocol extensions supported by the agent."""

extensions: NotRequired[list[AgentExtension]]
"""A2A extensions supported by this agent.

Each extension is declared as an ``AgentExtension`` object with a
unique ``uri``, optional ``description``, ``required`` flag, and
``params`` configuration. Clients activate extensions by sending
the selected URIs in the ``A2A-Extensions`` HTTP header.
"""
Comment on lines +93 to +100
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Duplicate extensions field in AgentCapabilities TypedDict

The PR adds a second extensions: NotRequired[list[AgentExtension]] field at fasta2a/schema.py:93 without removing the pre-existing identical field at fasta2a/schema.py:90. In Python, the second class-body definition silently overwrites the first in __annotations__, so the TypedDict ends up with one extensions key. Pyright does not flag this (confirmed: 0 errors on this file). While the types are identical so there's no runtime breakage today, this is clearly an accidental duplication — the intent was to update the docstring, not add a second field. The original definition (lines 90–91) should be removed.

Suggested change
extensions: NotRequired[list[AgentExtension]]
"""A2A extensions supported by this agent.
Each extension is declared as an ``AgentExtension`` object with a
unique ``uri``, optional ``description``, ``required`` flag, and
``params`` configuration. Clients activate extensions by sending
the selected URIs in the ``A2A-Extensions`` HTTP header.
"""
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.



@pydantic.with_config({'alias_generator': to_camel})
class HttpSecurityScheme(TypedDict):
Expand Down Expand Up @@ -992,3 +1001,9 @@ class StreamResponse(TypedDict):
send_message_response_ta: TypeAdapter[SendMessageResponse] = TypeAdapter(SendMessageResponse)
stream_message_request_ta: TypeAdapter[StreamMessageRequest] = TypeAdapter(StreamMessageRequest)
stream_message_response_ta: TypeAdapter[StreamMessageResponse] = TypeAdapter(StreamMessageResponse)

# Type for streaming events (used by broker and task manager)
StreamEvent = Union[Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent]
"""A streaming event that can be sent during message/stream requests."""

stream_event_ta: TypeAdapter[StreamEvent] = TypeAdapter(StreamEvent)
8 changes: 7 additions & 1 deletion fasta2a/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@

from typing_extensions import TypeVar

from .schema import Artifact, Message, Task, TaskState, TaskStatus
from .schema import (
Artifact,
Message,
Task,
TaskState,
TaskStatus,
)

ContextT = TypeVar('ContextT', default=Any)

Expand Down
67 changes: 65 additions & 2 deletions fasta2a/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
from opentelemetry.trace import get_tracer, use_span
from typing_extensions import assert_never

from .schema import StreamResponse, TaskArtifactUpdateEvent, TaskStatusUpdateEvent
from .storage import ContextT, Storage

if TYPE_CHECKING:
from .broker import Broker, TaskOperation
from .schema import Artifact, Message, TaskIdParams, TaskSendParams
from .schema import Artifact, Message, TaskIdParams, TaskSendParams, TaskState

tracer = get_tracer(__name__)

Expand Down Expand Up @@ -56,7 +57,7 @@ async def _handle_task_operation(self, task_operation: TaskOperation) -> None:
except Exception:
task_id = task_operation['params']['id']
task = await self.storage.update_task(task_id, state='failed')
from .schema import StreamResponse, TaskStatus, TaskStatusUpdateEvent
from .schema import TaskStatus

await self.broker.event_bus.emit(
task_id,
Expand All @@ -70,6 +71,68 @@ async def _handle_task_operation(self, task_operation: TaskOperation) -> None:
)
await self.broker.event_bus.close(task_id)

async def update_task(
self,
task_id: str,
state: TaskState,
new_artifacts: list[Artifact] | None = None,
new_messages: list[Message] | None = None,
) -> None:
"""Update a task's state in storage and publish streaming events to the broker.

This is the primary method workers should use to update task state. It handles
both persisting the update and notifying any stream subscribers.
"""
task = await self.storage.update_task(task_id, state, new_artifacts, new_messages)

final = state in ('completed', 'failed', 'canceled')

# For non-final updates, publish status first
if not final:
await self.broker.event_bus.emit(
task_id,
StreamResponse(
status_update=TaskStatusUpdateEvent(
task_id=task_id,
context_id=task['context_id'],
status=task['status'],
),
),
)

# Publish message events before final status so subscribers receive them
if new_messages:
for message in new_messages:
await self.broker.event_bus.emit(task_id, StreamResponse(message=message))

# Publish artifact events
if new_artifacts:
for artifact in new_artifacts:
await self.broker.event_bus.emit(
task_id,
StreamResponse(
artifact_update=TaskArtifactUpdateEvent(
task_id=task_id,
context_id=task['context_id'],
artifact=artifact,
),
),
)

# For final updates, publish status last (after messages and artifacts)
if final:
await self.broker.event_bus.emit(
task_id,
StreamResponse(
status_update=TaskStatusUpdateEvent(
task_id=task_id,
context_id=task['context_id'],
status=task['status'],
),
),
)
await self.broker.event_bus.close(task_id)

@abstractmethod
async def run_task(self, params: TaskSendParams) -> None: ...

Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ dependencies = [
"pydantic>=2.10",
"opentelemetry-api>=1.28.0",
"eval_type_backport>=0.2.2; python_version <= '3.9'",
"sse-starlette>=2.0.0",
]

[project.optional-dependencies]
Expand All @@ -58,8 +59,11 @@ dev = [
"asgi-lifespan",
"coverage",
"httpx",
"httpx-sse",
"inline-snapshot",
"pytest",
"pytest-asyncio",
"pytest-mock",
"ruff",
"pyright",
]
Expand Down
Loading
Loading