This file is a merged representation of a subset of the codebase, containing specifically included files, combined into a single document by Repomix.
The content has been processed: code has been compressed (code blocks are separated by the ⋮---- delimiter) and the security check has been disabled.
================================================================
File Summary
================================================================
Purpose:
--------
This file contains a packed representation of a subset of the repository's contents that is considered the most important context.
It is designed to be easily consumable by AI systems for analysis, code review,
or other automated processes.
File Format:
------------
The content is organized as follows:
1. This summary section
2. Repository information
3. Directory structure
4. Repository files (if enabled)
5. Multiple file entries, each consisting of:
a. A separator line (================)
b. The file path (File: path/to/file)
c. Another separator line
d. The full contents of the file
e. A blank line
Usage Guidelines:
-----------------
- This file should be treated as read-only. Any changes should be made to the
original repository files, not this packed version.
- When processing this file, use the file path to distinguish
between different files in the repository.
- Be aware that this file may contain sensitive information. Handle it with
the same level of security as you would the original repository.
Notes:
------
- Some files may have been excluded based on .gitignore rules and Repomix's configuration
- Binary files are not included in this packed representation. Please refer to the Repository Structure section for a complete list of file paths, including binary files
- Only files matching these patterns are included: src/a2a/**/*.*
- Files matching patterns in .gitignore are excluded
- Files matching default ignore patterns are excluded
- Content has been compressed - code blocks are separated by ⋮---- delimiter
- Security check has been disabled - content may contain sensitive information
- Files are sorted by Git change count (files with more changes are at the bottom)
================================================================
Directory Structure
================================================================
src/
a2a/
auth/
user.py
client/
auth/
__init__.py
credentials.py
interceptor.py
transports/
__init__.py
base.py
grpc.py
jsonrpc.py
rest.py
__init__.py
base_client.py
card_resolver.py
client_factory.py
client_task_manager.py
client.py
errors.py
helpers.py
legacy_grpc.py
legacy.py
middleware.py
optionals.py
extensions/
common.py
grpc/
a2a_pb2_grpc.py
a2a_pb2.py
a2a_pb2.pyi
server/
agent_execution/
__init__.py
agent_executor.py
context.py
request_context_builder.py
simple_request_context_builder.py
apps/
jsonrpc/
__init__.py
fastapi_app.py
jsonrpc_app.py
starlette_app.py
rest/
__init__.py
fastapi_app.py
rest_adapter.py
__init__.py
events/
__init__.py
event_consumer.py
event_queue.py
in_memory_queue_manager.py
queue_manager.py
request_handlers/
__init__.py
default_request_handler.py
grpc_handler.py
jsonrpc_handler.py
request_handler.py
response_helpers.py
rest_handler.py
tasks/
__init__.py
base_push_notification_sender.py
database_push_notification_config_store.py
database_task_store.py
inmemory_push_notification_config_store.py
inmemory_task_store.py
push_notification_config_store.py
push_notification_sender.py
result_aggregator.py
task_manager.py
task_store.py
task_updater.py
__init__.py
context.py
models.py
utils/
__init__.py
artifact.py
constants.py
error_handlers.py
errors.py
helpers.py
message.py
proto_utils.py
task.py
telemetry.py
__init__.py
_base.py
types.py
================================================================
Files
================================================================
================
File: src/a2a/auth/user.py
================
"""Authenticated user information."""
⋮----
class User(ABC)
⋮----
"""A representation of an authenticated user."""
⋮----
@property
@abstractmethod
def is_authenticated(self) -> bool
⋮----
"""Returns whether the current user is authenticated."""
⋮----
@property
@abstractmethod
def user_name(self) -> str
⋮----
"""Returns the user name of the current user."""
⋮----
class UnauthenticatedUser(User)
⋮----
"""A representation that no user has been authenticated in the request."""
⋮----
@property
def is_authenticated(self) -> bool
⋮----
@property
def user_name(self) -> str
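The packed source elides the method bodies above. The following is a minimal sketch of how the two classes relate, assuming that `UnauthenticatedUser` reports `False` and an empty user name (those return values are not shown in this file). `StaticUser` is a hypothetical subclass added only for illustration.

```python
from abc import ABC, abstractmethod


class User(ABC):
    """A representation of an authenticated user."""

    @property
    @abstractmethod
    def is_authenticated(self) -> bool:
        """Returns whether the current user is authenticated."""

    @property
    @abstractmethod
    def user_name(self) -> str:
        """Returns the user name of the current user."""


class UnauthenticatedUser(User):
    """A representation that no user has been authenticated in the request."""

    # Assumed return values: the packed source omits these bodies.
    @property
    def is_authenticated(self) -> bool:
        return False

    @property
    def user_name(self) -> str:
        return ''


class StaticUser(User):
    """Hypothetical concrete user, not part of the SDK."""

    def __init__(self, name: str) -> None:
        self._name = name

    @property
    def is_authenticated(self) -> bool:
        return True

    @property
    def user_name(self) -> str:
        return self._name
```

Because both properties are abstract, instantiating `User` directly raises `TypeError`; only subclasses that implement both can be created.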
================
File: src/a2a/client/auth/__init__.py
================
"""Client-side authentication components for the A2A Python SDK."""
⋮----
__all__ = [
================
File: src/a2a/client/auth/credentials.py
================
class CredentialService(ABC)
⋮----
"""An abstract service for retrieving credentials."""
⋮----
"""
Retrieves a credential (e.g., token) for a security scheme.
"""
⋮----
class InMemoryContextCredentialStore(CredentialService)
⋮----
"""A simple in-memory store for session-keyed credentials.
This class uses the 'sessionId' from the ClientCallContext state to
store and retrieve credentials...
"""
⋮----
def __init__(self) -> None
⋮----
"""Retrieves credentials from the in-memory store.
Args:
security_scheme_name: The name of the security scheme.
context: The client call context.
Returns:
The credential string, or None if not found.
"""
⋮----
session_id = context.state['sessionId']
⋮----
"""Method to populate the store."""
================
File: src/a2a/client/auth/interceptor.py
================
import logging # noqa: I001
⋮----
logger = logging.getLogger(__name__)
⋮----
class AuthInterceptor(ClientCallInterceptor)
⋮----
"""An interceptor that automatically adds authentication details to requests.
Based on the agent's security schemes.
"""
⋮----
def __init__(self, credential_service: CredentialService)
⋮----
"""Applies authentication headers to the request if credentials are available."""
⋮----
credential = await self._credential_service.get_credentials(
⋮----
scheme_def_union = agent_card.security_schemes.get(
⋮----
scheme_def = scheme_def_union.root
⋮----
headers = http_kwargs.get('headers', {})
⋮----
# Case 1a: HTTP Bearer scheme with an if guard
⋮----
# Case 1b: OAuth2 and OIDC schemes, which are implicitly Bearer
⋮----
# Case 2: API Key in Header
⋮----
# Note: Other cases like API keys in query/cookie are not handled and will be skipped.
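The cases listed in the comments above can be sketched as one function. This is an illustration only, not the interceptor's actual code: it takes the security scheme as a plain dict with OpenAPI-style keys (`type`, `scheme`, `in`, `name`) rather than the SDK's model objects, and `apply_auth_header` is a hypothetical name.

```python
from typing import Optional


def apply_auth_header(
    scheme: dict, credential: str, headers: Optional[dict] = None
) -> dict:
    """Return a copy of `headers` with the credential attached, if supported."""
    result = dict(headers or {})
    kind = scheme.get('type')

    # Case 1a: HTTP Bearer scheme.
    if kind == 'http' and scheme.get('scheme', '').lower() == 'bearer':
        result['Authorization'] = f'Bearer {credential}'
    # Case 1b: OAuth2 and OIDC schemes, which are implicitly Bearer.
    elif kind in ('oauth2', 'openIdConnect'):
        result['Authorization'] = f'Bearer {credential}'
    # Case 2: API key carried in a header named by the scheme.
    elif kind == 'apiKey' and scheme.get('in') == 'header':
        result[scheme['name']] = credential
    # Other cases (API key in query or cookie) are skipped, as noted above.
    return result


bearer = apply_auth_header({'type': 'http', 'scheme': 'bearer'}, 'token-abc')
api_key = apply_auth_header(
    {'type': 'apiKey', 'in': 'header', 'name': 'X-API-Key'}, 'key-123'
)
skipped = apply_auth_header(
    {'type': 'apiKey', 'in': 'query', 'name': 'key'}, 'key-123'
)
```

Returning a copy instead of mutating the caller's dict is a choice made for this sketch; the source only shows that headers are read from `http_kwargs`.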
================
File: src/a2a/client/transports/__init__.py
================
"""A2A Client Transports."""
⋮----
GrpcTransport = None # type: ignore
⋮----
__all__ = [
================
File: src/a2a/client/transports/base.py
================
class ClientTransport(ABC)
⋮----
"""Abstract base class for a client transport."""
⋮----
"""Sends a non-streaming message request to the agent."""
⋮----
"""Sends a streaming message request to the agent and yields responses as they arrive."""
⋮----
"""Retrieves the current state and history of a specific task."""
⋮----
"""Requests the agent to cancel a specific task."""
⋮----
"""Sets or updates the push notification configuration for a specific task."""
⋮----
"""Retrieves the push notification configuration for a specific task."""
⋮----
"""Reconnects to get task updates."""
⋮----
"""Retrieves the AgentCard."""
⋮----
@abstractmethod
async def close(self) -> None
⋮----
"""Closes the transport."""
================
File: src/a2a/client/transports/grpc.py
================
logger = logging.getLogger(__name__)
⋮----
@trace_class(kind=SpanKind.CLIENT)
class GrpcTransport(ClientTransport)
⋮----
"""A gRPC transport for the A2A client."""
⋮----
"""Initializes the GrpcTransport."""
⋮----
"""Creates a gRPC transport for the A2A client."""
⋮----
"""Sends a non-streaming message request to the agent."""
response = await self.stub.SendMessage(
⋮----
"""Sends a streaming message request to the agent and yields responses as they arrive."""
stream = self.stub.SendStreamingMessage(
⋮----
response = await stream.read()
if response == grpc.aio.EOF: # pyright: ignore[reportAttributeAccessIssue]
⋮----
"""Reconnects to get task updates."""
stream = self.stub.TaskSubscription(
⋮----
"""Retrieves the current state and history of a specific task."""
task = await self.stub.GetTask(
⋮----
"""Requests the agent to cancel a specific task."""
task = await self.stub.CancelTask(
⋮----
"""Sets or updates the push notification configuration for a specific task."""
config = await self.stub.CreateTaskPushNotificationConfig(
⋮----
"""Retrieves the push notification configuration for a specific task."""
config = await self.stub.GetTaskPushNotificationConfig(
⋮----
"""Retrieves the agent's card."""
card = self.agent_card
⋮----
card_pb = await self.stub.GetAgentCard(
card = proto_utils.FromProto.agent_card(card_pb)
⋮----
async def close(self) -> None
⋮----
"""Closes the gRPC channel."""
================
File: src/a2a/client/transports/jsonrpc.py
================
logger = logging.getLogger(__name__)
⋮----
@trace_class(kind=SpanKind.CLIENT)
class JsonRpcTransport(ClientTransport)
⋮----
"""A JSON-RPC transport for the A2A client."""
⋮----
"""Initializes the JsonRpcTransport."""
⋮----
final_http_kwargs = http_kwargs or {}
final_request_payload = request_payload
⋮----
"""Sends a non-streaming message request to the agent."""
rpc_request = SendMessageRequest(params=request, id=str(uuid4()))
⋮----
response_data = await self._send_request(payload, modified_kwargs)
response = SendMessageResponse.model_validate(response_data)
⋮----
"""Sends a streaming message request to the agent and yields responses as they arrive."""
rpc_request = SendStreamingMessageRequest(
⋮----
response = SendStreamingMessageResponse.model_validate(
⋮----
response = await self.httpx_client.post(
⋮----
"""Retrieves the current state and history of a specific task."""
rpc_request = GetTaskRequest(params=request, id=str(uuid4()))
⋮----
response = GetTaskResponse.model_validate(response_data)
⋮----
"""Requests the agent to cancel a specific task."""
rpc_request = CancelTaskRequest(params=request, id=str(uuid4()))
⋮----
response = CancelTaskResponse.model_validate(response_data)
⋮----
"""Sets or updates the push notification configuration for a specific task."""
rpc_request = SetTaskPushNotificationConfigRequest(
⋮----
response = SetTaskPushNotificationConfigResponse.model_validate(
⋮----
"""Retrieves the push notification configuration for a specific task."""
rpc_request = GetTaskPushNotificationConfigRequest(
⋮----
response = GetTaskPushNotificationConfigResponse.model_validate(
⋮----
"""Reconnects to get task updates."""
rpc_request = TaskResubscriptionRequest(params=request, id=str(uuid4()))
⋮----
response = SendStreamingMessageResponse.model_validate_json(
⋮----
"""Retrieves the agent's card."""
card = self.agent_card
⋮----
resolver = A2ACardResolver(self.httpx_client, self.url)
card = await resolver.get_agent_card(
⋮----
request = GetAuthenticatedExtendedCardRequest(id=str(uuid4()))
⋮----
response_data = await self._send_request(
response = GetAuthenticatedExtendedCardResponse.model_validate(
⋮----
async def close(self) -> None
⋮----
"""Closes the httpx client."""
================
File: src/a2a/client/transports/rest.py
================
logger = logging.getLogger(__name__)
⋮----
@trace_class(kind=SpanKind.CLIENT)
class RestTransport(ClientTransport)
⋮----
"""A REST transport for the A2A client."""
⋮----
"""Initializes the RestTransport."""
⋮----
final_http_kwargs = http_kwargs or {}
final_request_payload = request_payload
# TODO: Implement interceptors for other transports
⋮----
pb = a2a_pb2.SendMessageRequest(
payload = MessageToDict(pb)
⋮----
"""Sends a non-streaming message request to the agent."""
⋮----
response_data = await self._send_post_request(
response_pb = a2a_pb2.SendMessageResponse()
⋮----
"""Sends a streaming message request to the agent and yields responses as they arrive."""
⋮----
event = a2a_pb2.StreamResponse()
⋮----
async def _send_request(self, request: httpx.Request) -> dict[str, Any]
⋮----
response = await self.httpx_client.send(request)
⋮----
"""Retrieves the current state and history of a specific task."""
⋮----
response_data = await self._send_get_request(
task = a2a_pb2.Task()
⋮----
"""Requests the agent to cancel a specific task."""
pb = a2a_pb2.CancelTaskRequest(name=f'tasks/{request.id}')
⋮----
"""Sets or updates the push notification configuration for a specific task."""
pb = a2a_pb2.CreateTaskPushNotificationConfigRequest(
⋮----
config = a2a_pb2.TaskPushNotificationConfig()
⋮----
"""Retrieves the push notification configuration for a specific task."""
pb = a2a_pb2.GetTaskPushNotificationConfigRequest(
⋮----
"""Reconnects to get task updates."""
http_kwargs = self._get_http_args(context) or {}
⋮----
"""Retrieves the agent's card."""
card = self.agent_card
⋮----
resolver = A2ACardResolver(self.httpx_client, self.url)
card = await resolver.get_agent_card(
⋮----
card = AgentCard.model_validate(response_data)
⋮----
async def close(self) -> None
⋮----
"""Closes the httpx client."""
================
File: src/a2a/client/__init__.py
================
"""Client-side components for interacting with an A2A agent."""
⋮----
logger = logging.getLogger(__name__)
⋮----
from a2a.client.legacy_grpc import A2AGrpcClient # type: ignore
⋮----
_original_error = e
⋮----
class A2AGrpcClient: # type: ignore
⋮----
"""Placeholder for A2AGrpcClient when dependencies are not installed."""
⋮----
def __init__(self, *args, **kwargs)
⋮----
__all__ = [
================
File: src/a2a/client/base_client.py
================
class BaseClient(Client)
⋮----
"""Base implementation of the A2A client, containing transport-independent logic."""
⋮----
"""Sends a message to the agent.
This method handles both streaming and non-streaming (polling) interactions
based on the client configuration and agent capabilities. It will yield
events as they are received from the agent.
Args:
request: The message to send to the agent.
context: The client call context.
Yields:
An async iterator of `ClientEvent` or a final `Message` response.
"""
config = MessageSendConfiguration(
params = MessageSendParams(message=request, configuration=config)
⋮----
response = await self._transport.send_message(
result = (
⋮----
tracker = ClientTaskManager()
stream = self._transport.send_message_streaming(params, context=context)
⋮----
first_event = await anext(stream)
# The response from a server may be either exactly one Message or a
# series of Task updates. Separate out the first message for special
# case handling, which allows us to simplify further stream processing.
⋮----
task = tracker.get_task_or_raise()
update = None if isinstance(event, Task) else event
client_event = (task, update)
⋮----
"""Retrieves the current state and history of a specific task.
Args:
request: The `TaskQueryParams` object specifying the task ID.
context: The client call context.
Returns:
A `Task` object representing the current state of the task.
"""
⋮----
"""Requests the agent to cancel a specific task.
Args:
request: The `TaskIdParams` object specifying the task ID.
context: The client call context.
Returns:
A `Task` object containing the updated task status.
"""
⋮----
"""Sets or updates the push notification configuration for a specific task.
Args:
request: The `TaskPushNotificationConfig` object with the new configuration.
context: The client call context.
Returns:
The created or updated `TaskPushNotificationConfig` object.
"""
⋮----
"""Retrieves the push notification configuration for a specific task.
Args:
request: The `GetTaskPushNotificationConfigParams` object specifying the task.
context: The client call context.
Returns:
A `TaskPushNotificationConfig` object containing the configuration.
"""
⋮----
"""Resubscribes to a task's event stream.
This is only available if both the client and server support streaming.
Args:
request: Parameters to identify the task to resubscribe to.
context: The client call context.
Yields:
An async iterator of `ClientEvent` objects.
Raises:
NotImplementedError: If streaming is not supported by the client or server.
"""
⋮----
# Note: resubscribe can only be called on an existing task. As such,
# we should never see Message updates, despite the typing of the service
# definition indicating it may be possible.
⋮----
"""Retrieves the agent's card.
This will fetch the authenticated card if necessary and update the
client's internal state with the new card.
Args:
context: The client call context.
Returns:
The `AgentCard` for the agent.
"""
card = await self._transport.get_card(context=context)
⋮----
async def close(self) -> None
⋮----
"""Closes the underlying transport."""
================
File: src/a2a/client/card_resolver.py
================
logger = logging.getLogger(__name__)
⋮----
class A2ACardResolver
⋮----
"""Agent Card resolver."""
⋮----
"""Initializes the A2ACardResolver.
Args:
httpx_client: An async HTTP client instance (e.g., httpx.AsyncClient).
base_url: The base URL of the agent's host.
agent_card_path: The path to the agent card endpoint, relative to the base URL.
"""
⋮----
"""Fetches an agent card from a specified path relative to the base_url.
If relative_card_path is None, it defaults to the resolver's configured
agent_card_path (for the public agent card).
Args:
relative_card_path: Optional path to the agent card endpoint,
relative to the base URL. If None, uses the default public
agent card path.
http_kwargs: Optional dictionary of keyword arguments to pass to the
underlying httpx.get request.
Returns:
An `AgentCard` object representing the agent's capabilities.
Raises:
A2AClientHTTPError: If an HTTP error occurs during the request.
A2AClientJSONError: If the response body cannot be decoded as JSON
or validated against the AgentCard schema.
"""
⋮----
# Use the default public agent card path configured during initialization
path_segment = self.agent_card_path
⋮----
path_segment = relative_card_path.lstrip('/')
⋮----
target_url = f'{self.base_url}/{path_segment}'
⋮----
response = await self.httpx_client.get(
⋮----
agent_card_data = response.json()
⋮----
agent_card = AgentCard.model_validate(agent_card_data)
⋮----
except ValidationError as e: # Pydantic validation error
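The URL construction described above can be sketched in isolation. This is a hedged illustration: `build_card_url` is a hypothetical helper, the default path value is an assumption (the constant is not shown in this file), and stripping the trailing slash from the base URL is assumed to happen during initialization.

```python
from typing import Optional

# Assumed default; the real constant is not shown in this file.
DEFAULT_CARD_PATH = '/.well-known/agent-card.json'


def build_card_url(
    base_url: str,
    relative_card_path: Optional[str] = None,
    agent_card_path: str = DEFAULT_CARD_PATH,
) -> str:
    """Join the base URL with either the default or an explicit card path."""
    base = base_url.rstrip('/')
    if relative_card_path is None:
        # Use the default public agent card path.
        path_segment = agent_card_path.lstrip('/')
    else:
        path_segment = relative_card_path.lstrip('/')
    return f'{base}/{path_segment}'


default_url = build_card_url('https://agent.example.com/')
custom_url = build_card_url('https://agent.example.com', '/custom/card')
```

Stripping slashes on both sides before joining avoids a doubled `//` regardless of how the caller wrote the base URL or the path.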
================
File: src/a2a/client/client_factory.py
================
GrpcTransport = None # type: ignore # pyright: ignore
⋮----
logger = logging.getLogger(__name__)
⋮----
TransportProducer = Callable[
⋮----
class ClientFactory
⋮----
"""ClientFactory is used to generate the appropriate client for the agent.
The factory is configured with a `ClientConfig` and optionally a list of
`Consumer`s to use for all generated `Client`s. The expected use is:
factory = ClientFactory(config, consumers)
# Optionally register custom client implementations
factory.register('my_customer_transport', NewCustomTransportClient)
# Then with an agent card make a client with additional consumers and
# interceptors
client = factory.create(card, additional_consumers, interceptors)
# Now the client can be used the same regardless of transport and
# aligns client config with server capabilities.
"""
⋮----
consumers = []
⋮----
# Empty support list implies JSON-RPC only.
⋮----
def register(self, label: str, generator: TransportProducer) -> None
⋮----
"""Register a new transport producer for a given transport label."""
⋮----
"""Create a new `Client` for the provided `AgentCard`.
Args:
card: An `AgentCard` defining the characteristics of the agent.
consumers: A list of `Consumer` methods to pass responses to.
interceptors: A list of interceptors to use for each request. These
are used for things like attaching credentials or http headers
to all outbound requests.
Returns:
A `Client` object.
Raises:
If there is no valid matching of the client configuration with the
server configuration, a `ValueError` is raised.
"""
server_preferred = card.preferred_transport or TransportProtocol.jsonrpc
server_set = {server_preferred: card.url}
⋮----
client_set = self._config.supported_transports or [
transport_protocol = None
transport_url = None
⋮----
transport_protocol = x
transport_url = server_set[x]
⋮----
transport_url = url
⋮----
all_consumers = self._consumers.copy()
⋮----
transport = self._registry[transport_protocol](
⋮----
"""Generates a minimal card to simplify bootstrapping client creation.
This minimal card cannot itself be used to interact with the remote agent.
Instead, it is a shorthand way to take a known URL and transport option
and call the agent server's get-card endpoint to fetch the correct
agent card. This pattern is necessary for gRPC-based card access,
as these servers typically do not expose a card at a well-known path.
"""
⋮----
transports = []
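The transport selection in `create` is only partly visible above. The sketch below reconstructs one plausible reading from the variable names and the `use_client_preference` docstring; the loop structure, the handling of additional interfaces, and the error message are assumptions, and `select_transport` is a hypothetical name. Transports are plain strings here rather than `TransportProtocol` members.

```python
from typing import Dict, List, Optional, Tuple


def select_transport(
    server_preferred: Optional[str],
    server_url: str,
    additional_interfaces: Dict[str, str],
    client_supported: List[str],
    use_client_preference: bool = False,
) -> Tuple[str, str]:
    """Pick a (transport, url) pair supported by both client and server."""
    # The server's preferred transport comes first; JSON-RPC is the fallback.
    server_set = {server_preferred or 'JSONRPC': server_url}
    server_set.update(additional_interfaces)

    # Empty support list implies JSON-RPC only.
    client_set = list(client_supported) or ['JSONRPC']

    if use_client_preference:
        # Walk the client's ordered list and take the first server match.
        for transport in client_set:
            if transport in server_set:
                return transport, server_set[transport]
    else:
        # Walk the server's transports and take the first client match.
        for transport, url in server_set.items():
            if transport in client_set:
                return transport, url
    raise ValueError('no compatible transports found')


extra = {'JSONRPC': 'https://agent.example.com/rpc'}
by_server = select_transport(
    'GRPC', 'grpc://agent.example.com', extra, ['JSONRPC', 'GRPC']
)
by_client = select_transport(
    'GRPC', 'grpc://agent.example.com', extra, ['JSONRPC', 'GRPC'],
    use_client_preference=True,
)
```

In this reading the same inputs yield gRPC when the server's order wins and JSON-RPC when the client's order wins, which matches the docstring's advice that the server preference is the default.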
================
File: src/a2a/client/client_task_manager.py
================
logger = logging.getLogger(__name__)
⋮----
class ClientTaskManager
⋮----
"""Helps manage a task's lifecycle during execution of a request.
Responsible for retrieving, saving, and updating the `Task` object based on
events received from the agent.
"""
⋮----
"""Initializes the `ClientTaskManager`."""
⋮----
def get_task(self) -> Task | None
⋮----
"""Retrieves the current task object from memory.
If `task_id` is set, it returns `_current_task` otherwise None.
Returns:
The `Task` object if found, otherwise `None`.
"""
⋮----
def get_task_or_raise(self) -> Task
⋮----
"""Retrieves the current task object.
Returns:
The `Task` object.
Raises:
A2AClientInvalidStateError: If there is no current known Task.
"""
⋮----
# Note: The source of this error is either from bad client usage
# or from the server sending invalid updates. It indicates that this
# task manager has not consumed any information about a task, yet
# the caller is attempting to retrieve the current state of the task
# it expects to be present.
⋮----
"""Processes a task-related event (Task, Status, Artifact) and saves the updated task state.
Ensures task and context IDs match or are set from the event.
Args:
event: The task-related event (`Task`, `TaskStatusUpdateEvent`, or `TaskArtifactUpdateEvent`).
Returns:
The updated `Task` object after processing the event.
Raises:
ClientError: If the task ID in the event conflicts with the TaskManager's ID
when the TaskManager's ID is already set.
"""
⋮----
task_id_from_event = (
⋮----
task = self._current_task
⋮----
task = Task(
⋮----
async def process(self, event: Event) -> Event
⋮----
"""Processes an event, updates the task state if applicable, stores it, and returns the event.
If the event is task-related (`Task`, `TaskStatusUpdateEvent`, `TaskArtifactUpdateEvent`),
the internal task state is updated and persisted.
Args:
event: The event object received from the agent.
Returns:
The same event object that was processed.
"""
⋮----
async def _save_task(self, task: Task) -> None
⋮----
"""Saves the given task as `_current_task` and updates `_task_id` and `_context_id`.
Args:
task: The `Task` object to save.
"""
⋮----
def update_with_message(self, message: Message, task: Task) -> Task
⋮----
"""Updates a task object adding a new message to its history.
If the task has a message in its current status, that message is moved
to the history first.
Args:
message: The new `Message` to add to the history.
task: The `Task` object to update.
Returns:
The updated `Task` object (updated in-place).
"""
================
File: src/a2a/client/client.py
================
logger = logging.getLogger(__name__)
⋮----
@dataclasses.dataclass
class ClientConfig
⋮----
"""Configuration class for the A2AClient Factory."""
⋮----
streaming: bool = True
"""Whether client supports streaming"""
⋮----
polling: bool = False
"""Whether client prefers to poll for updates from message:send. It is
the caller's job to check whether the response is complete and, if not, to run a
polling loop."""
⋮----
httpx_client: httpx.AsyncClient | None = None
"""HTTP client to use to connect to the agent."""
⋮----
grpc_channel_factory: Callable[[str], Channel] | None = None
"""Generates a gRPC connection channel for a given URL."""
⋮----
supported_transports: list[TransportProtocol | str] = dataclasses.field(
"""Ordered list of transports for connecting to agent
(in order of preference). Empty implies JSONRPC only.
This is a string type to allow custom
transports to exist in closed ecosystems.
"""
⋮----
use_client_preference: bool = False
"""Whether to use client transport preferences over server preferences.
Recommended to use server preferences in most situations."""
⋮----
accepted_output_modes: list[str] = dataclasses.field(default_factory=list)
"""The set of accepted output modes for the client."""
⋮----
push_notification_configs: list[PushNotificationConfig] = dataclasses.field(
"""Push notification callbacks to use for every request."""
⋮----
UpdateEvent = TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None
# Alias for emitted events from client
ClientEvent = tuple[Task, UpdateEvent]
# Alias for an event consuming callback. It takes either a (task, update) pair
# or a message as well as the agent card for the agent this came from.
Consumer = Callable[
⋮----
class Client(ABC)
⋮----
"""Abstract base class defining the interface for an A2A client.
This class provides a standard set of methods for interacting with an A2A
agent, regardless of the underlying transport protocol (e.g., gRPC, JSON-RPC).
It supports sending messages, managing tasks, and handling event streams.
"""
⋮----
"""Initializes the client with consumers and middleware.
Args:
consumers: A list of callables to process events from the agent.
middleware: A list of interceptors to process requests and responses.
"""
⋮----
middleware = []
⋮----
consumers = []
⋮----
"""Sends a message to the server.
This will automatically use the streaming or non-streaming approach
as supported by the server and the client config. Client will
aggregate update events and return an iterator of (`Task`,`Update`)
pairs, or a `Message`. Client will also send these values to any
configured `Consumer`s in the client.
"""
⋮----
"""Retrieves the current state and history of a specific task."""
⋮----
"""Requests the agent to cancel a specific task."""
⋮----
"""Sets or updates the push notification configuration for a specific task."""
⋮----
"""Retrieves the push notification configuration for a specific task."""
⋮----
"""Resubscribes to a task's event stream."""
⋮----
"""Retrieves the agent's card."""
⋮----
async def add_event_consumer(self, consumer: Consumer) -> None
⋮----
"""Attaches additional consumers to the `Client`."""
⋮----
"""Attaches additional middleware to the `Client`."""
⋮----
"""Processes the event via all the registered `Consumer`s."""
================
File: src/a2a/client/errors.py
================
"""Custom exceptions for the A2A client."""
⋮----
class A2AClientError(Exception)
⋮----
"""Base exception for A2A Client errors."""
⋮----
class A2AClientHTTPError(A2AClientError)
⋮----
"""Client exception for HTTP errors received from the server."""
⋮----
def __init__(self, status_code: int, message: str)
⋮----
"""Initializes the A2AClientHTTPError.
Args:
status_code: The HTTP status code of the response.
message: A descriptive error message.
"""
⋮----
class A2AClientJSONError(A2AClientError)
⋮----
"""Client exception for JSON errors during response parsing or validation."""
⋮----
def __init__(self, message: str)
⋮----
"""Initializes the A2AClientJSONError.
Args:
message: A descriptive error message.
"""
⋮----
class A2AClientTimeoutError(A2AClientError)
⋮----
"""Client exception for timeout errors during a request."""
⋮----
"""Initializes the A2AClientTimeoutError.
Args:
message: A descriptive error message.
"""
⋮----
class A2AClientInvalidArgsError(A2AClientError)
⋮----
"""Client exception for invalid arguments passed to a method."""
⋮----
"""Initializes the A2AClientInvalidArgsError.
Args:
message: A descriptive error message.
"""
⋮----
class A2AClientInvalidStateError(A2AClientError)
⋮----
"""Client exception for an invalid client state."""
⋮----
"""Initializes the A2AClientInvalidStateError.
Args:
message: A descriptive error message.
"""
⋮----
class A2AClientJSONRPCError(A2AClientError)