Python SDK Reference

This page contains SDK documentation for the a2a-sdk Python package.

pip install a2a-sdk

The A2A Python SDK.

auth

user

Authenticated user information.

UnauthenticatedUser

Bases: User

A representation that no user has been authenticated in the request.

is_authenticated property
user_name property

User

Bases: ABC

A representation of an authenticated user.

is_authenticated abstractmethod property

Returns whether the current user is authenticated.

user_name abstractmethod property

Returns the user name of the current user.
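As a rough illustration of the interface above, the sketch below re-declares a stand-in User base class with the two documented abstract properties and adds a hypothetical NamedUser subclass. Both the stand-in base and NamedUser are assumptions made so that the example runs without the SDK installed; real code would presumably subclass the SDK's own User.

```python
from abc import ABC, abstractmethod


class User(ABC):
    """Stand-in mirroring the two documented abstract properties."""

    @property
    @abstractmethod
    def is_authenticated(self) -> bool:
        """Returns whether the current user is authenticated."""

    @property
    @abstractmethod
    def user_name(self) -> str:
        """Returns the user name of the current user."""


class NamedUser(User):
    """Hypothetical authenticated user identified by a name."""

    def __init__(self, name: str) -> None:
        self._name = name

    @property
    def is_authenticated(self) -> bool:
        # This subclass only ever represents an authenticated user.
        return True

    @property
    def user_name(self) -> str:
        return self._name
```

Because both properties are abstract, the base class cannot be instantiated directly; only subclasses that implement both can.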

client

Client-side components for interacting with an A2A agent.

A2ACardResolver

Agent Card resolver.

agent_card_path = agent_card_path.lstrip('/') instance-attribute

base_url = base_url.rstrip('/') instance-attribute

httpx_client = httpx_client instance-attribute

get_agent_card(relative_card_path=None, http_kwargs=None) async

Fetches an agent card from a specified path relative to the base_url.

If relative_card_path is None, it defaults to the resolver's configured agent_card_path (for the public agent card).

Parameters:

Name Type Description Default
relative_card_path str | None

Optional path to the agent card endpoint, relative to the base URL. If None, uses the default public agent card path.

None
http_kwargs dict[str, Any] | None

Optional dictionary of keyword arguments to pass to the underlying httpx.get request.

None

Returns:

Type Description
AgentCard

An AgentCard object representing the agent's capabilities.

Raises:

Type Description
A2AClientHTTPError

If an HTTP error occurs during the request.

A2AClientJSONError

If the response body cannot be decoded as JSON or validated against the AgentCard schema.
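The instance attributes above show that the resolver strips the trailing slash from base_url and the leading slash from agent_card_path. The sketch below shows how such normalized pieces could be combined into a request URL. The function name resolve_card_url and the single-slash join are assumptions for illustration, not the SDK's code.

```python
from typing import Optional


def resolve_card_url(
    base_url: str,
    agent_card_path: str = "/.well-known/agent.json",
    relative_card_path: Optional[str] = None,
) -> str:
    """Builds an agent card URL from normalized parts (illustrative only)."""
    base = base_url.rstrip("/")  # documented normalization of base_url
    default_path = agent_card_path.lstrip("/")  # documented normalization of agent_card_path
    if relative_card_path is None:
        path = default_path
    else:
        # Assumption: a caller-supplied path is normalized the same way.
        path = relative_card_path.lstrip("/")
    # Assumption: the two parts are joined with exactly one slash.
    return f"{base}/{path}"
```

With this normalization, a base URL with or without a trailing slash yields the same result.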

A2AClient

A2A Client for interacting with an A2A agent.

httpx_client = httpx_client instance-attribute

url = agent_card.url instance-attribute

cancel_task(request, *, http_kwargs=None) async

Requests the agent to cancel a specific task.

Parameters:

Name Type Description Default
request CancelTaskRequest

The CancelTaskRequest object specifying the task ID.

required
http_kwargs dict[str, Any] | None

Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

None

Returns:

Type Description
CancelTaskResponse

A CancelTaskResponse object containing the updated Task with canceled status or an error.

Raises:

Type Description
A2AClientHTTPError

If an HTTP error occurs during the request.

A2AClientJSONError

If the response body cannot be decoded as JSON or validated.

get_client_from_agent_card_url(httpx_client, base_url, agent_card_path='/.well-known/agent.json', http_kwargs=None) async staticmethod

Fetches the public AgentCard and initializes an A2A client.

This method will always fetch the public agent card. If an authenticated or extended agent card is required, the A2ACardResolver should be used directly to fetch the specific card, and then the A2AClient should be instantiated with it.

Parameters:

Name Type Description Default
httpx_client AsyncClient

An async HTTP client instance (e.g., httpx.AsyncClient).

required
base_url str

The base URL of the agent's host.

required
agent_card_path str

The path to the agent card endpoint, relative to the base URL.

'/.well-known/agent.json'
http_kwargs dict[str, Any] | None

Optional dictionary of keyword arguments to pass to the underlying httpx.get request when fetching the agent card.

None

Returns: An initialized A2AClient instance.

Raises:

Type Description
A2AClientHTTPError

If an HTTP error occurs fetching the agent card.

A2AClientJSONError

If the agent card response is invalid.

get_task(request, *, http_kwargs=None) async

Retrieves the current state and history of a specific task.

Parameters:

Name Type Description Default
request GetTaskRequest

The GetTaskRequest object specifying the task ID and history length.

required
http_kwargs dict[str, Any] | None

Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

None

Returns:

Type Description
GetTaskResponse

A GetTaskResponse object containing the Task or an error.

Raises:

Type Description
A2AClientHTTPError

If an HTTP error occurs during the request.

A2AClientJSONError

If the response body cannot be decoded as JSON or validated.

get_task_callback(request, *, http_kwargs=None) async

Retrieves the push notification configuration for a specific task.

Parameters:

Name Type Description Default
request GetTaskPushNotificationConfigRequest

The GetTaskPushNotificationConfigRequest object specifying the task ID.

required
http_kwargs dict[str, Any] | None

Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

None

Returns:

Type Description
GetTaskPushNotificationConfigResponse

A GetTaskPushNotificationConfigResponse object containing the configuration or an error.

Raises:

Type Description
A2AClientHTTPError

If an HTTP error occurs during the request.

A2AClientJSONError

If the response body cannot be decoded as JSON or validated.

send_message(request, *, http_kwargs=None) async

Sends a non-streaming message request to the agent.

Parameters:

Name Type Description Default
request SendMessageRequest

The SendMessageRequest object containing the message and configuration.

required
http_kwargs dict[str, Any] | None

Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

None

Returns:

Type Description
SendMessageResponse

A SendMessageResponse object containing the agent's response (Task or Message) or an error.

Raises:

Type Description
A2AClientHTTPError

If an HTTP error occurs during the request.

A2AClientJSONError

If the response body cannot be decoded as JSON or validated.

send_message_streaming(request, *, http_kwargs=None) async

Sends a streaming message request to the agent and yields responses as they arrive.

This method uses Server-Sent Events (SSE) to receive a stream of updates from the agent.

Parameters:

Name Type Description Default
request SendStreamingMessageRequest

The SendStreamingMessageRequest object containing the message and configuration.

required
http_kwargs dict[str, Any] | None

Optional dictionary of keyword arguments to pass to the underlying httpx.post request. A default timeout=None is set but can be overridden.

None

Yields:

Type Description
AsyncGenerator[SendStreamingMessageResponse]

SendStreamingMessageResponse objects as they are received in the SSE stream. These can be Task, Message, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent.

Raises:

Type Description
A2AClientHTTPError

If an HTTP or SSE protocol error occurs during the request.

A2AClientJSONError

If the data of an SSE event cannot be decoded as JSON or validated.
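To make the streaming error case more concrete, the sketch below parses a Server-Sent Events stream by hand: data fields are collected until a blank line ends the event, and the payload is then decoded as JSON. It uses only the standard library. The names iter_sse_json and StreamJSONError, and the payload keys in the usage below, are hypothetical; the SDK performs its own parsing and raises A2AClientJSONError.

```python
import json
from typing import Any, Iterable, Iterator, List


class StreamJSONError(Exception):
    """Hypothetical stand-in for a JSON decoding failure on an SSE event."""


def iter_sse_json(lines: Iterable[str]) -> Iterator[Any]:
    """Yields the decoded JSON payload of each complete SSE event."""
    data_lines: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                payload = "\n".join(data_lines)
                data_lines = []
                try:
                    event = json.loads(payload)
                except json.JSONDecodeError as exc:
                    raise StreamJSONError(str(exc)) from exc
                yield event
            continue
        if line.startswith(":"):
            continue  # SSE comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if field == "data":
            # A single leading space after the colon is not part of the value.
            data_lines.append(value[1:] if value.startswith(" ") else value)
```

A caller would iterate over the generator and handle the error once, around the loop, rather than per event.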

set_task_callback(request, *, http_kwargs=None) async

Sets or updates the push notification configuration for a specific task.

Parameters:

Name Type Description Default
request SetTaskPushNotificationConfigRequest

The SetTaskPushNotificationConfigRequest object specifying the task ID and configuration.

required
http_kwargs dict[str, Any] | None

Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

None

Returns:

Type Description
SetTaskPushNotificationConfigResponse

A SetTaskPushNotificationConfigResponse object containing the confirmation or an error.

Raises:

Type Description
A2AClientHTTPError

If an HTTP error occurs during the request.

A2AClientJSONError

If the response body cannot be decoded as JSON or validated.

A2AClientError

Bases: Exception

Base exception for A2A Client errors.

A2AClientHTTPError

Bases: A2AClientError

Client exception for HTTP errors received from the server.

message = message instance-attribute

status_code = status_code instance-attribute

A2AClientJSONError

Bases: A2AClientError

Client exception for JSON errors during response parsing or validation.

message = message instance-attribute
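Since both specific exceptions derive from A2AClientError, callers can catch the specific types first and fall back to the base class. The sketch below re-declares stand-in classes with the documented names, bases, and attributes so that it runs without the SDK. The constructor signatures and the describe_failure helper are assumptions.

```python
class A2AClientError(Exception):
    """Stand-in for the documented base exception."""


class A2AClientHTTPError(A2AClientError):
    """Stand-in carrying the documented status_code and message attributes."""

    def __init__(self, status_code: int, message: str) -> None:
        self.status_code = status_code
        self.message = message
        super().__init__(f"HTTP error {status_code}: {message}")


class A2AClientJSONError(A2AClientError):
    """Stand-in carrying the documented message attribute."""

    def __init__(self, message: str) -> None:
        self.message = message
        super().__init__(f"JSON error: {message}")


def describe_failure(exc: Exception) -> str:
    """Classifies a client failure, most specific type first."""
    try:
        raise exc
    except A2AClientHTTPError as err:
        return f"http:{err.status_code}"
    except A2AClientJSONError:
        return "json"
    except A2AClientError:
        # Any other client-side failure falls through to the base class.
        return "client"
```

Ordering the except clauses from most to least specific matters: placing A2AClientError first would shadow the other two.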

create_text_message_object(role=Role.user, content='')

Create a Message object containing a single TextPart.

Parameters:

Name Type Description Default
role Role

The role of the message sender (user or agent). Defaults to Role.user.

user
content str

The text content of the message. Defaults to an empty string.

''

Returns:

Type Description
Message

A Message object with a new UUID messageId.
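The sketch below approximates what this helper produces, using a plain dictionary instead of the SDK's Message model. The function name text_message_dict, the dictionary layout, and the "kind" key on the part are assumptions; only the single text part and the fresh UUID messageId come from the description above.

```python
import uuid
from typing import Any, Dict


def text_message_dict(role: str = "user", content: str = "") -> Dict[str, Any]:
    """Builds a dict shaped like a message with one text part (illustrative)."""
    return {
        "role": role,
        # Exactly one text part, mirroring "a single TextPart".
        "parts": [{"kind": "text", "text": content}],
        # A new random UUID is generated on every call.
        "messageId": str(uuid.uuid4()),
    }
```

Because the identifier is generated per call, two messages with identical content still have distinct messageId values.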

client

logger = logging.getLogger(__name__) module-attribute

A2ACardResolver

Agent Card resolver.

agent_card_path = agent_card_path.lstrip('/') instance-attribute
base_url = base_url.rstrip('/') instance-attribute
httpx_client = httpx_client instance-attribute
get_agent_card(relative_card_path=None, http_kwargs=None) async

Fetches an agent card from a specified path relative to the base_url.

If relative_card_path is None, it defaults to the resolver's configured agent_card_path (for the public agent card).

Parameters:

Name Type Description Default
relative_card_path str | None

Optional path to the agent card endpoint, relative to the base URL. If None, uses the default public agent card path.

None
http_kwargs dict[str, Any] | None

Optional dictionary of keyword arguments to pass to the underlying httpx.get request.

None

Returns:

Type Description
AgentCard

An AgentCard object representing the agent's capabilities.

Raises:

Type Description
A2AClientHTTPError

If an HTTP error occurs during the request.

A2AClientJSONError

If the response body cannot be decoded as JSON or validated against the AgentCard schema.

A2AClient

A2A Client for interacting with an A2A agent.

httpx_client = httpx_client instance-attribute
url = agent_card.url instance-attribute
cancel_task(request, *, http_kwargs=None) async

Requests the agent to cancel a specific task.

Parameters:

Name Type Description Default
request CancelTaskRequest

The CancelTaskRequest object specifying the task ID.

required
http_kwargs dict[str, Any] | None

Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

None

Returns:

Type Description
CancelTaskResponse

A CancelTaskResponse object containing the updated Task with canceled status or an error.

Raises:

Type Description
A2AClientHTTPError

If an HTTP error occurs during the request.

A2AClientJSONError

If the response body cannot be decoded as JSON or validated.

get_client_from_agent_card_url(httpx_client, base_url, agent_card_path='/.well-known/agent.json', http_kwargs=None) async staticmethod

Fetches the public AgentCard and initializes an A2A client.

This method will always fetch the public agent card. If an authenticated or extended agent card is required, the A2ACardResolver should be used directly to fetch the specific card, and then the A2AClient should be instantiated with it.

Parameters:

Name Type Description Default
httpx_client AsyncClient

An async HTTP client instance (e.g., httpx.AsyncClient).

required
base_url str

The base URL of the agent's host.

required
agent_card_path str

The path to the agent card endpoint, relative to the base URL.

'/.well-known/agent.json'
http_kwargs dict[str, Any] | None

Optional dictionary of keyword arguments to pass to the underlying httpx.get request when fetching the agent card.

None

Returns: An initialized A2AClient instance.

Raises:

Type Description
A2AClientHTTPError

If an HTTP error occurs fetching the agent card.

A2AClientJSONError

If the agent card response is invalid.

get_task(request, *, http_kwargs=None) async

Retrieves the current state and history of a specific task.

Parameters:

Name Type Description Default
request GetTaskRequest

The GetTaskRequest object specifying the task ID and history length.

required
http_kwargs dict[str, Any] | None

Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

None

Returns:

Type Description
GetTaskResponse

A GetTaskResponse object containing the Task or an error.

Raises:

Type Description
A2AClientHTTPError

If an HTTP error occurs during the request.

A2AClientJSONError

If the response body cannot be decoded as JSON or validated.

get_task_callback(request, *, http_kwargs=None) async

Retrieves the push notification configuration for a specific task.

Parameters:

Name Type Description Default
request GetTaskPushNotificationConfigRequest

The GetTaskPushNotificationConfigRequest object specifying the task ID.

required
http_kwargs dict[str, Any] | None

Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

None

Returns:

Type Description
GetTaskPushNotificationConfigResponse

A GetTaskPushNotificationConfigResponse object containing the configuration or an error.

Raises:

Type Description
A2AClientHTTPError

If an HTTP error occurs during the request.

A2AClientJSONError

If the response body cannot be decoded as JSON or validated.

send_message(request, *, http_kwargs=None) async

Sends a non-streaming message request to the agent.

Parameters:

Name Type Description Default
request SendMessageRequest

The SendMessageRequest object containing the message and configuration.

required
http_kwargs dict[str, Any] | None

Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

None

Returns:

Type Description
SendMessageResponse

A SendMessageResponse object containing the agent's response (Task or Message) or an error.

Raises:

Type Description
A2AClientHTTPError

If an HTTP error occurs during the request.

A2AClientJSONError

If the response body cannot be decoded as JSON or validated.

send_message_streaming(request, *, http_kwargs=None) async

Sends a streaming message request to the agent and yields responses as they arrive.

This method uses Server-Sent Events (SSE) to receive a stream of updates from the agent.

Parameters:

Name Type Description Default
request SendStreamingMessageRequest

The SendStreamingMessageRequest object containing the message and configuration.

required
http_kwargs dict[str, Any] | None

Optional dictionary of keyword arguments to pass to the underlying httpx.post request. A default timeout=None is set but can be overridden.

None

Yields:

Type Description
AsyncGenerator[SendStreamingMessageResponse]

SendStreamingMessageResponse objects as they are received in the SSE stream. These can be Task, Message, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent.

Raises:

Type Description
A2AClientHTTPError

If an HTTP or SSE protocol error occurs during the request.

A2AClientJSONError

If the data of an SSE event cannot be decoded as JSON or validated.

set_task_callback(request, *, http_kwargs=None) async

Sets or updates the push notification configuration for a specific task.

Parameters:

Name Type Description Default
request SetTaskPushNotificationConfigRequest

The SetTaskPushNotificationConfigRequest object specifying the task ID and configuration.

required
http_kwargs dict[str, Any] | None

Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

None

Returns:

Type Description
SetTaskPushNotificationConfigResponse

A SetTaskPushNotificationConfigResponse object containing the confirmation or an error.

Raises:

Type Description
A2AClientHTTPError

If an HTTP error occurs during the request.

A2AClientJSONError

If the response body cannot be decoded as JSON or validated.

errors

Custom exceptions for the A2A client.

A2AClientError

Bases: Exception

Base exception for A2A Client errors.

A2AClientHTTPError

Bases: A2AClientError

Client exception for HTTP errors received from the server.

message = message instance-attribute
status_code = status_code instance-attribute

A2AClientJSONError

Bases: A2AClientError

Client exception for JSON errors during response parsing or validation.

message = message instance-attribute

helpers

Helper functions for the A2A client.

create_text_message_object(role=Role.user, content='')

Create a Message object containing a single TextPart.

Parameters:

Name Type Description Default
role Role

The role of the message sender (user or agent). Defaults to Role.user.

user
content str

The text content of the message. Defaults to an empty string.

''

Returns:

Type Description
Message

A Message object with a new UUID messageId.

server

Server-side components for implementing an A2A agent.

agent_execution

Components for executing agent logic within the A2A server.

AgentExecutor

Bases: ABC

Agent Executor interface.

Implementations of this interface contain the core logic of the agent, executing tasks based on requests and publishing updates to an event queue.

cancel(context, event_queue) abstractmethod async

Request the agent to cancel an ongoing task.

The agent should attempt to stop the task identified by the task_id in the context and publish a TaskStatusUpdateEvent with state TaskState.canceled to the event_queue.

Parameters:

Name Type Description Default
context RequestContext

The request context containing the task ID to cancel.

required
event_queue EventQueue

The queue to publish the cancellation status update to.

required
execute(context, event_queue) abstractmethod async

Execute the agent's logic for a given request context.

The agent should read necessary information from the context and publish Task or Message events, or TaskStatusUpdateEvent / TaskArtifactUpdateEvent to the event_queue. This method should return once the agent's execution for this request is complete or yields control (e.g., enters an input-required state).

Parameters:

Name Type Description Default
context RequestContext

The request context containing the message, task ID, etc.

required
event_queue EventQueue

The queue to publish events to.

required
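The contract described above (read from the context, publish events to the queue, return when done) can be sketched with stand-in types. FakeContext, FakeEventQueue, EchoExecutor, the event dictionaries, and the re-declared AgentExecutor base are all hypothetical and exist only so that the example runs without the SDK; a real executor would receive the SDK's RequestContext and EventQueue and publish the SDK's event types.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class FakeContext:
    """Hypothetical stand-in for RequestContext."""

    task_id: str
    text: str


class FakeEventQueue:
    """Hypothetical stand-in for EventQueue that records events in a list."""

    def __init__(self) -> None:
        self.events: List[Dict[str, Any]] = []

    def enqueue_event(self, event: Dict[str, Any]) -> None:
        self.events.append(event)


class AgentExecutor(ABC):
    """Stand-in with the two documented abstract coroutine methods."""

    @abstractmethod
    async def execute(self, context: FakeContext, event_queue: FakeEventQueue) -> None: ...

    @abstractmethod
    async def cancel(self, context: FakeContext, event_queue: FakeEventQueue) -> None: ...


class EchoExecutor(AgentExecutor):
    """Replies with the input text, then reports cancellation on request."""

    async def execute(self, context: FakeContext, event_queue: FakeEventQueue) -> None:
        event_queue.enqueue_event(
            {"type": "message", "task_id": context.task_id, "text": f"echo: {context.text}"}
        )

    async def cancel(self, context: FakeContext, event_queue: FakeEventQueue) -> None:
        event_queue.enqueue_event(
            {"type": "status-update", "task_id": context.task_id, "state": "canceled"}
        )


def run_executor_demo() -> List[Dict[str, Any]]:
    """Runs execute and then cancel against the stand-in queue."""
    context = FakeContext(task_id="task-1", text="hello")
    queue = FakeEventQueue()

    async def _run() -> None:
        executor = EchoExecutor()
        await executor.execute(context, queue)
        await executor.cancel(context, queue)

    asyncio.run(_run())
    return queue.events
```

The executor never returns a value to its caller; everything it produces travels through the queue, which is what lets the server stream results as they appear.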

RequestContext

Request Context.

Holds information about the current request being processed by the server, including the incoming message, task and context identifiers, and related tasks.

call_context property

The server call context associated with this request.

configuration property

The MessageSendConfiguration from the request, if available.

context_id property

The ID of the conversation context associated with this task.

current_task property writable

The current Task object being processed.

message property

The incoming Message object from the request, if available.

related_tasks property

A list of tasks related to the current request.

task_id property

The ID of the task associated with this context.

attach_related_task(task)

Attaches a related task to the context.

This is useful for scenarios like tool execution where a new task might be spawned.

Parameters:

Name Type Description Default
task Task

The Task object to attach.

required
get_user_input(delimiter='\n')

Extracts text content from the user's message parts.

Parameters:

Name Type Description Default
delimiter

The string to use when joining multiple text parts.

'\n'

Returns:

Type Description
str

A single string containing all text content from the user message, joined by the specified delimiter. Returns an empty string if no user message is present or if it contains no text parts.
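The joining behavior described for get_user_input can be sketched as a standalone function over plain dictionaries. The function name join_text_parts and the part layout (a "kind" key with value "text") are assumptions for illustration; the SDK operates on its own Message and Part models.

```python
from typing import Any, Dict, List, Optional


def join_text_parts(
    parts: Optional[List[Dict[str, Any]]], delimiter: str = "\n"
) -> str:
    """Joins the text of all text parts, skipping every other kind of part."""
    if not parts:
        # No message or no parts at all: empty string, as documented.
        return ""
    texts = [part["text"] for part in parts if part.get("kind") == "text"]
    # Joining an empty list also yields an empty string.
    return delimiter.join(texts)
```

Non-text parts are ignored rather than rejected, so a message that carries only files yields an empty string.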

RequestContextBuilder

Bases: ABC

Builds the request context to be supplied to the agent executor.

build(params=None, task_id=None, context_id=None, task=None, context=None) abstractmethod async

SimpleRequestContextBuilder

Bases: RequestContextBuilder

Builds the request context and populates referred tasks.

build(params=None, task_id=None, context_id=None, task=None, context=None) async

agent_executor

AgentExecutor

Bases: ABC

Agent Executor interface.

Implementations of this interface contain the core logic of the agent, executing tasks based on requests and publishing updates to an event queue.

cancel(context, event_queue) abstractmethod async

Request the agent to cancel an ongoing task.

The agent should attempt to stop the task identified by the task_id in the context and publish a TaskStatusUpdateEvent with state TaskState.canceled to the event_queue.

Parameters:

Name Type Description Default
context RequestContext

The request context containing the task ID to cancel.

required
event_queue EventQueue

The queue to publish the cancellation status update to.

required
execute(context, event_queue) abstractmethod async

Execute the agent's logic for a given request context.

The agent should read necessary information from the context and publish Task or Message events, or TaskStatusUpdateEvent / TaskArtifactUpdateEvent to the event_queue. This method should return once the agent's execution for this request is complete or yields control (e.g., enters an input-required state).

Parameters:

Name Type Description Default
context RequestContext

The request context containing the message, task ID, etc.

required
event_queue EventQueue

The queue to publish events to.

required

context

RequestContext

Request Context.

Holds information about the current request being processed by the server, including the incoming message, task and context identifiers, and related tasks.

call_context property

The server call context associated with this request.

configuration property

The MessageSendConfiguration from the request, if available.

context_id property

The ID of the conversation context associated with this task.

current_task property writable

The current Task object being processed.

message property

The incoming Message object from the request, if available.

related_tasks property

A list of tasks related to the current request.

task_id property

The ID of the task associated with this context.

attach_related_task(task)

Attaches a related task to the context.

This is useful for scenarios like tool execution where a new task might be spawned.

Parameters:

Name Type Description Default
task Task

The Task object to attach.

required
get_user_input(delimiter='\n')

Extracts text content from the user's message parts.

Parameters:

Name Type Description Default
delimiter

The string to use when joining multiple text parts.

'\n'

Returns:

Type Description
str

A single string containing all text content from the user message, joined by the specified delimiter. Returns an empty string if no user message is present or if it contains no text parts.

request_context_builder

RequestContextBuilder

Bases: ABC

Builds the request context to be supplied to the agent executor.

build(params=None, task_id=None, context_id=None, task=None, context=None) abstractmethod async

simple_request_context_builder

SimpleRequestContextBuilder

Bases: RequestContextBuilder

Builds the request context and populates referred tasks.

build(params=None, task_id=None, context_id=None, task=None, context=None) async

apps

HTTP application components for the A2A server.

A2AStarletteApplication

A Starlette application implementing the A2A protocol server endpoints.

Handles incoming JSON-RPC requests, routes them to the appropriate handler methods, and manages response generation including Server-Sent Events (SSE).

agent_card = agent_card instance-attribute
extended_agent_card = extended_agent_card instance-attribute
handler = JSONRPCHandler(agent_card=agent_card, request_handler=http_handler) instance-attribute
build(agent_card_url='/.well-known/agent.json', extended_agent_card_url='/agent/authenticatedExtendedCard', rpc_url='/', **kwargs)

Builds and returns the Starlette application instance.

Parameters:

Name Type Description Default
agent_card_url str

The URL path for the agent card endpoint.

'/.well-known/agent.json'
rpc_url str

The URL path for the A2A JSON-RPC endpoint (POST requests).

'/'
extended_agent_card_url str

The URL for the authenticated extended agent card endpoint.

'/agent/authenticatedExtendedCard'
**kwargs Any

Additional keyword arguments to pass to the Starlette constructor.

{}

Returns:

Type Description
Starlette

A configured Starlette application instance.

routes(agent_card_url='/.well-known/agent.json', extended_agent_card_url='/agent/authenticatedExtendedCard', rpc_url='/')

Returns the Starlette Routes for handling A2A requests.

Parameters:

Name Type Description Default
agent_card_url str

The URL path for the agent card endpoint.

'/.well-known/agent.json'
rpc_url str

The URL path for the A2A JSON-RPC endpoint (POST requests).

'/'
extended_agent_card_url str

The URL for the authenticated extended agent card endpoint.

'/agent/authenticatedExtendedCard'

Returns:

Type Description
list[Route]

A list of Starlette Route objects.
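The application is described as routing incoming JSON-RPC requests to handler methods. The sketch below shows that general pattern with the standard library only: decode the body, look up the method, and wrap the outcome in a JSON-RPC 2.0 response. The dispatch function and the demo/echo method name in the usage below are hypothetical; the error codes are the standard JSON-RPC 2.0 ones, and the SDK's own routing is not shown here.

```python
import json
from typing import Any, Callable, Dict

Handler = Callable[[Dict[str, Any]], Any]


def _error(request_id: Any, code: int, message: str) -> Dict[str, Any]:
    """Builds a JSON-RPC 2.0 error response."""
    return {"jsonrpc": "2.0", "id": request_id, "error": {"code": code, "message": message}}


def dispatch(raw_body: str, handlers: Dict[str, Handler]) -> Dict[str, Any]:
    """Routes one JSON-RPC request body to the handler named by its method."""
    try:
        request = json.loads(raw_body)
    except json.JSONDecodeError:
        return _error(None, -32700, "Parse error")
    if not isinstance(request, dict) or not isinstance(request.get("method"), str):
        return _error(None, -32600, "Invalid Request")
    request_id = request.get("id")
    handler = handlers.get(request["method"])
    if handler is None:
        return _error(request_id, -32601, "Method not found")
    # The handler receives the params object and returns the result payload.
    result = handler(request.get("params", {}))
    return {"jsonrpc": "2.0", "id": request_id, "result": result}
```

Keeping the handler table separate from the transport is what allows the same handlers to be mounted at a different rpc_url.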

starlette_app

logger = logging.getLogger(__name__) module-attribute

A2AStarletteApplication

A Starlette application implementing the A2A protocol server endpoints.

Handles incoming JSON-RPC requests, routes them to the appropriate handler methods, and manages response generation including Server-Sent Events (SSE).

agent_card = agent_card instance-attribute
extended_agent_card = extended_agent_card instance-attribute
handler = JSONRPCHandler(agent_card=agent_card, request_handler=http_handler) instance-attribute
build(agent_card_url='/.well-known/agent.json', extended_agent_card_url='/agent/authenticatedExtendedCard', rpc_url='/', **kwargs)

Builds and returns the Starlette application instance.

Parameters:

Name Type Description Default
agent_card_url str

The URL path for the agent card endpoint.

'/.well-known/agent.json'
rpc_url str

The URL path for the A2A JSON-RPC endpoint (POST requests).

'/'
extended_agent_card_url str

The URL for the authenticated extended agent card endpoint.

'/agent/authenticatedExtendedCard'
**kwargs Any

Additional keyword arguments to pass to the Starlette constructor.

{}

Returns:

Type Description
Starlette

A configured Starlette application instance.

routes(agent_card_url='/.well-known/agent.json', extended_agent_card_url='/agent/authenticatedExtendedCard', rpc_url='/')

Returns the Starlette Routes for handling A2A requests.

Parameters:

Name Type Description Default
agent_card_url str

The URL path for the agent card endpoint.

'/.well-known/agent.json'
rpc_url str

The URL path for the A2A JSON-RPC endpoint (POST requests).

'/'
extended_agent_card_url str

The URL for the authenticated extended agent card endpoint.

'/agent/authenticatedExtendedCard'

Returns:

Type Description
list[Route]

A list of Starlette Route objects.

CallContextBuilder

Bases: ABC

A class for building ServerCallContexts using the Starlette Request.

build(request) abstractmethod

Builds a ServerCallContext from a Starlette Request.

DefaultCallContextBuilder

Bases: CallContextBuilder

A default implementation of CallContextBuilder.

build(request)

context

Defines the ServerCallContext class.

State = collections.abc.MutableMapping[str, typing.Any] module-attribute

ServerCallContext

Bases: BaseModel

A context passed when calling a server method.

This class allows storing arbitrary user data in the state attribute.

model_config = ConfigDict(arbitrary_types_allowed=True) class-attribute instance-attribute
state = Field(default={}) class-attribute instance-attribute
user = Field(default=UnauthenticatedUser()) class-attribute instance-attribute

events

Event handling components for the A2A server.

Event = Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent | A2AError | JSONRPCError module-attribute

Type alias for events that can be enqueued.

EventConsumer

Consumer to read events from the agent event queue.

queue = queue instance-attribute
agent_task_callback(agent_task)

Callback to handle exceptions from the agent's execution task.

If the agent's asyncio task raises an exception, this callback is invoked, and the exception is stored to be re-raised by the consumer loop.

Parameters:

Name Type Description Default
agent_task Task[None]

The asyncio.Task that completed.

required
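The mechanism described here, a done-callback that stores the task's exception for later re-raising, can be sketched with plain asyncio. ExceptionRecorder and the demo coroutines are hypothetical names; the sketch only illustrates the pattern, not the SDK's EventConsumer.

```python
import asyncio
from typing import Optional


class ExceptionRecorder:
    """Stores the exception of a finished task so a consumer can re-raise it."""

    def __init__(self) -> None:
        self.exception: Optional[BaseException] = None

    def on_done(self, task: "asyncio.Task") -> None:
        # Task.exception() itself raises CancelledError for cancelled tasks.
        if task.cancelled():
            return
        self.exception = task.exception()


async def _failing_agent() -> None:
    raise RuntimeError("agent failed")


async def _callback_demo() -> Optional[BaseException]:
    recorder = ExceptionRecorder()
    task = asyncio.create_task(_failing_agent())
    task.add_done_callback(recorder.on_done)
    # asyncio.wait reports completion without re-raising the task's exception.
    await asyncio.wait([task])
    # Give the event loop one more turn so pending callbacks have run.
    await asyncio.sleep(0)
    return recorder.exception


def run_callback_demo() -> Optional[BaseException]:
    return asyncio.run(_callback_demo())
```

Storing the exception instead of raising inside the callback matters because exceptions raised in done-callbacks are only logged by the event loop and never reach the consumer.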
consume_all() async

Consume all the generated streaming events from the agent.

This method yields events as they become available from the queue until a final event is received or the queue is closed. It also monitors for exceptions set by the agent_task_callback.

Yields:

Type Description
AsyncGenerator[Event]

Events dequeued from the queue.

Raises:

Type Description
BaseException

If an exception was set by the agent_task_callback.

consume_one() async

Consumes one event from the agent event queue without blocking.

Returns:

Type Description
Event

The next event from the queue.

Raises:

Type Description
ServerError

If the queue is empty when attempting to dequeue immediately.

EventQueue

Event queue for A2A responses from agent.

Acts as a buffer between the agent's asynchronous execution and the server's response handling (e.g., streaming via SSE). Supports tapping to create child queues that receive the same events.

queue = asyncio.Queue() instance-attribute
close() async

Closes the queue for future push events.

Once closed, dequeue_event will eventually raise asyncio.QueueShutDown when the queue is empty. Also closes all child queues.

dequeue_event(no_wait=False) async

Dequeues an event from the queue.

This implementation expects dequeue to raise an exception once the queue has been closed. In Python 3.13+, this is provided naturally by the QueueShutDown exception, which is raised when the queue has been closed and the caller is awaiting queue.get. On Python <= 3.12, the implementation must manage this lifecycle itself.

The current implementation can block if dequeue_event is called while the queue is empty and the EventQueue has not yet been closed. There are two ways to avoid this. One is to call dequeue_event with no_wait=True, which does not block but leaves it to the caller to retry as appropriate. The other is to use an async task management approach that cancels the get call once the queue has closed or some other condition is met. The EventConsumer implementation waits on dequeue_event with a timeout so that it can abort the call and retry, at which point the call returns with a closed error.

Parameters:

Name Type Description Default
no_wait bool

If True, retrieve an event immediately or raise asyncio.QueueEmpty. If False (default), wait until an event is available.

False

Returns:

Type Description
Event

The next event from the queue.

Raises:

Type Description
QueueEmpty

If no_wait is True and the queue is empty.

QueueShutDown

If the queue has been closed and is empty.
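The blocking concern described for dequeue_event can be illustrated with a plain asyncio.Queue and a closed flag: the consumer waits with a timeout, and on each timeout checks whether the queue has been closed and drained. ClosableQueue, drain, and the demo functions are hypothetical names, and asyncio.wait_for is used here as one possible way to bound the wait; this is not the SDK's implementation.

```python
import asyncio
from typing import Any, List


class ClosableQueue:
    """Hypothetical stand-in: an asyncio.Queue plus a closed flag."""

    def __init__(self) -> None:
        # Create instances inside a running event loop (see _drain_demo).
        self.queue: "asyncio.Queue[Any]" = asyncio.Queue()
        self.closed = False

    def close(self) -> None:
        self.closed = True


async def drain(q: ClosableQueue, poll_timeout: float = 0.05) -> List[Any]:
    """Collects items until the queue is both closed and empty."""
    received: List[Any] = []
    while True:
        try:
            item = await asyncio.wait_for(q.queue.get(), timeout=poll_timeout)
        except asyncio.TimeoutError:
            # Nothing arrived in time: stop if closed and drained, else retry.
            if q.closed and q.queue.empty():
                return received
            continue
        received.append(item)


async def _drain_demo() -> List[Any]:
    q = ClosableQueue()

    async def producer() -> None:
        for i in range(3):
            await q.queue.put(i)
            await asyncio.sleep(0.01)
        q.close()

    producer_task = asyncio.create_task(producer())
    items = await drain(q)
    await producer_task
    return items


def run_drain_demo() -> List[Any]:
    return asyncio.run(_drain_demo())
```

The timeout trades a small amount of latency at shutdown for the guarantee that the consumer never waits forever on a queue that will receive no more events.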

enqueue_event(event)

Enqueues an event to this queue and all its children.

Parameters:

Name Type Description Default
event Event

The event object to enqueue.

required
is_closed()

Checks if the queue is closed.

tap()

Taps the event queue to create a new child queue that receives all future events.

Returns:

Type Description
EventQueue

A new EventQueue instance that will receive all events enqueued to this parent queue from this point forward.
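The tap semantics (a child receives only events enqueued after it was created) can be sketched with a small synchronous class. TappableQueue is a hypothetical stand-in built on collections.deque, not the SDK's EventQueue.

```python
from collections import deque
from typing import Any, Deque, List


class TappableQueue:
    """Minimal fan-out queue: events are copied to every child created by tap()."""

    def __init__(self) -> None:
        self.items: Deque[Any] = deque()
        self._children: List["TappableQueue"] = []

    def enqueue(self, event: Any) -> None:
        self.items.append(event)
        # Forward the event to every child that exists at this moment.
        for child in self._children:
            child.enqueue(event)

    def tap(self) -> "TappableQueue":
        # A new child starts empty, so it only sees future events.
        child = TappableQueue()
        self._children.append(child)
        return child
```

A typical use for this pattern is letting a second consumer attach to a task that is already running without disturbing the first consumer's queue.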

task_done()

Signals that a formerly enqueued task is complete.

Used in conjunction with dequeue_event to track processed items.

InMemoryQueueManager

Bases: QueueManager

InMemoryQueueManager manages event queues within a single binary instance.

This implements the QueueManager interface using in-memory storage for event queues. It requires all incoming interactions for a given task ID to hit the same binary instance.

This implementation is suitable for single-instance deployments but needs a distributed approach for scalable deployments.

add(task_id, queue) async

Adds a new event queue for a task ID.

Raises:

Type Description
TaskQueueExists

If a queue for the given task_id already exists.

close(task_id) async

Closes and removes the event queue for a task ID.

Raises:

Type Description
NoTaskQueue

If no queue exists for the given task_id.

create_or_tap(task_id) async

Creates a new event queue for a task ID if one doesn't exist, otherwise taps the existing one.

Returns:

Type Description
EventQueue

A new or child EventQueue instance for the task_id.

get(task_id) async

Retrieves the event queue for a task ID.

Returns:

Type Description
EventQueue | None

The EventQueue instance for the task_id, or None if not found.

tap(task_id) async

Taps the event queue for a task ID to create a child queue.

Returns:

Type Description
EventQueue | None

A new child EventQueue instance, or None if the task ID is not found.

NoTaskQueue

Bases: Exception

Exception raised when attempting to access or close a queue for a task ID that does not exist.

QueueManager

Bases: ABC

Interface for managing the lifecycle of each task's event queue.

add(task_id, queue) abstractmethod async

Adds a new event queue associated with a task ID.

close(task_id) abstractmethod async

Closes and removes the event queue for a task ID.

create_or_tap(task_id) abstractmethod async

Creates a queue if one doesn't exist, otherwise taps the existing one.

get(task_id) abstractmethod async

Retrieves the event queue for a task ID.

tap(task_id) abstractmethod async

Creates a child event queue (tap) for an existing task ID.

TaskQueueExists

Bases: Exception

Exception raised when attempting to add a queue for a task ID that already exists.

event_consumer

QueueClosed = asyncio.QueueEmpty module-attribute
logger = logging.getLogger(__name__) module-attribute
EventConsumer

Consumer to read events from the agent event queue.

queue = queue instance-attribute
agent_task_callback(agent_task)

Callback to handle exceptions from the agent's execution task.

If the agent's asyncio task raises an exception, this callback is invoked, and the exception is stored to be re-raised by the consumer loop.

Parameters:

Name Type Description Default
agent_task Task[None]

The asyncio.Task that completed.

required
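The callback pattern described above can be sketched with plain asyncio. This is a minimal illustration, not the SDK's EventConsumer: ExceptionRelay, failing_agent, and demo are hypothetical names, and the only real API used is asyncio's Task.add_done_callback.

```python
import asyncio


class ExceptionRelay:
    """Stores an exception from a background task (hypothetical helper)."""

    def __init__(self):
        self.exception = None

    def task_callback(self, task):
        # Done-callbacks run after the task finishes. A cancelled task has
        # no exception to read, so it is skipped.
        if not task.cancelled() and task.exception() is not None:
            self.exception = task.exception()


async def failing_agent():
    # Stand-in for an agent execution that fails.
    raise RuntimeError("agent failed")


async def demo():
    relay = ExceptionRelay()
    task = asyncio.create_task(failing_agent())
    task.add_done_callback(relay.task_callback)
    # Wait for completion without re-raising the exception here.
    await asyncio.wait([task])
    # Yield once more so any pending done-callbacks have run.
    await asyncio.sleep(0)
    return relay.exception
```

A consumer loop could check relay.exception between reads and re-raise it, which is the role the description above assigns to the stored exception.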
consume_all() async

Consume all the generated streaming events from the agent.

This method yields events as they become available from the queue until a final event is received or the queue is closed. It also monitors for exceptions set by the agent_task_callback.

Yields:

Type Description
AsyncGenerator[Event]

Events dequeued from the queue.

Raises:

Type Description
BaseException

If an exception was set by the agent_task_callback.

consume_one() async

Consumes one event from the agent event queue without blocking.

Returns:

Type Description
Event

The next event from the queue.

Raises:

Type Description
ServerError

If the queue is empty when attempting to dequeue immediately.

event_queue

Event = Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent | A2AError | JSONRPCError module-attribute

Type alias for events that can be enqueued.

logger = logging.getLogger(__name__) module-attribute
EventQueue

Event queue for A2A responses from agent.

Acts as a buffer between the agent's asynchronous execution and the server's response handling (e.g., streaming via SSE). Supports tapping to create child queues that receive the same events.

queue = asyncio.Queue() instance-attribute
close() async

Closes the queue for future push events.

Once closed, dequeue_event will eventually raise asyncio.QueueShutDown when the queue is empty. Also closes all child queues.

dequeue_event(no_wait=False) async

Dequeues an event from the queue.

This implementation expects the dequeue to raise an exception once the queue has been closed. In Python 3.13+, this is provided naturally by the QueueShutDown exception, which is raised when the queue has been closed and the caller is awaiting the queue.get method. On Python<=3.12, the EventQueue has to manage this lifecycle itself. The current implementation can block if dequeue_event is called before the EventQueue has been closed while there are no events on the queue.

There are two ways to avoid this. One is to call dequeue_event with no_wait=True, which does not block but makes it the caller's responsibility to retry as appropriate. The other is to use an async task management solution that cancels the get task if the queue has closed or some other condition is met. The EventConsumer wraps the dequeue_event call in an asyncio wait with a timeout so that it can abort the call and retry, at which point the call returns with a closed error.

Parameters:

Name Type Description Default
no_wait bool

If True, retrieve an event immediately or raise asyncio.QueueEmpty. If False (default), wait until an event is available.

False

Returns:

Type Description
Event

The next event from the queue.

Raises:

Type Description
QueueEmpty

If no_wait is True and the queue is empty.

QueueShutDown

If the queue has been closed and is empty.
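The timeout-and-retry approach mentioned in the description can be sketched with a plain asyncio.Queue. This is an illustration under stated assumptions, not the SDK's code: dequeue_with_timeout and demo are hypothetical names, and is_closed is a hypothetical callable standing in for a closed-state check such as EventQueue.is_closed().

```python
import asyncio


async def dequeue_with_timeout(queue, is_closed, timeout=0.05):
    """Return the next item, or None once the queue is closed and drained."""
    while True:
        try:
            # Bound each wait so a close that happens mid-wait is noticed.
            return await asyncio.wait_for(queue.get(), timeout=timeout)
        except asyncio.TimeoutError:
            if is_closed() and queue.empty():
                return None
            # Otherwise the queue is still open, so retry.


async def demo():
    queue = asyncio.Queue()
    closed = False

    async def producer():
        nonlocal closed
        await queue.put("event-1")
        await asyncio.sleep(0.01)
        closed = True  # simulate close() with nothing left to read

    task = asyncio.create_task(producer())
    results = []
    while (item := await dequeue_with_timeout(queue, lambda: closed)) is not None:
        results.append(item)
    await task
    return results
```

Here asyncio.run(demo()) returns ["event-1"]: the second wait times out, sees the closed flag and an empty queue, and stops instead of blocking forever.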

enqueue_event(event)

Enqueues an event to this queue and all its children.

Parameters:

Name Type Description Default
event Event

The event object to enqueue.

required
is_closed()

Checks if the queue is closed.

tap()

Taps the event queue to create a new child queue that receives all future events.

Returns:

Type Description
EventQueue

A new EventQueue instance that will receive all events enqueued to this parent queue from this point forward.
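The tap behavior described above (a child receives only events enqueued after it was created) can be sketched with a toy class. FanOutQueue and demo are hypothetical stand-ins written for this illustration. They do not reproduce the SDK's EventQueue internals.

```python
import asyncio


class FanOutQueue:
    """Toy parent/child queue; a stand-in, not the SDK's EventQueue."""

    def __init__(self):
        self.queue = asyncio.Queue()
        self._children = []

    def enqueue_event(self, event):
        # Deliver to this queue, then recursively to every child.
        self.queue.put_nowait(event)
        for child in self._children:
            child.enqueue_event(event)

    def tap(self):
        # A child only sees events enqueued after it was created.
        child = FanOutQueue()
        self._children.append(child)
        return child


def drain(fan_out_queue):
    """Remove and return everything currently buffered."""
    inner = fan_out_queue.queue
    return [inner.get_nowait() for _ in range(inner.qsize())]


def demo():
    parent = FanOutQueue()
    parent.enqueue_event("before-tap")
    child = parent.tap()
    parent.enqueue_event("after-tap")
    return drain(parent), drain(child)
```

In this sketch demo() returns (["before-tap", "after-tap"], ["after-tap"]): the parent keeps both events, while the child misses the one enqueued before the tap.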

task_done()

Signals that a formerly enqueued task is complete.

Used in conjunction with dequeue_event to track processed items.

in_memory_queue_manager

InMemoryQueueManager

Bases: QueueManager

InMemoryQueueManager manages event queues within a single binary (one server process).

This implements the QueueManager interface using in-memory storage for event queues. It requires all incoming interactions for a given task ID to hit the same binary instance.

This implementation is suitable for single-instance deployments but needs a distributed approach for scalable deployments.

add(task_id, queue) async

Adds a new event queue for a task ID.

Raises:

Type Description
TaskQueueExists

If a queue for the given task_id already exists.

close(task_id) async

Closes and removes the event queue for a task ID.

Raises:

Type Description
NoTaskQueue

If no queue exists for the given task_id.

create_or_tap(task_id) async

Creates a new event queue for a task ID if one doesn't exist, otherwise taps the existing one.

Returns:

Type Description
EventQueue

A new or child EventQueue instance for the task_id.

get(task_id) async

Retrieves the event queue for a task ID.

Returns:

Type Description
EventQueue | None

The EventQueue instance for the task_id, or None if not found.

tap(task_id) async

Taps the event queue for a task ID to create a child queue.

Returns:

Type Description
EventQueue | None

A new child EventQueue instance, or None if the task ID is not found.
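A dictionary-backed manager with the add, get, and create_or_tap semantics listed above might look roughly like the following. All class names here (ToyQueue, ToyQueueManager, and the local TaskQueueExists) are hypothetical stand-ins, and the use of a single asyncio.Lock is an assumption of this sketch.

```python
import asyncio


class TaskQueueExists(Exception):
    """Local stand-in for the SDK exception of the same name."""


class ToyQueue:
    """Minimal tappable queue used only for this illustration."""

    def __init__(self):
        self.children = []

    def tap(self):
        child = ToyQueue()
        self.children.append(child)
        return child


class ToyQueueManager:
    """Dict-backed manager: one process holds every queue."""

    def __init__(self):
        self._queues = {}
        self._lock = asyncio.Lock()

    async def add(self, task_id, queue):
        async with self._lock:
            if task_id in self._queues:
                raise TaskQueueExists(task_id)
            self._queues[task_id] = queue

    async def get(self, task_id):
        async with self._lock:
            return self._queues.get(task_id)

    async def create_or_tap(self, task_id):
        async with self._lock:
            if task_id not in self._queues:
                self._queues[task_id] = ToyQueue()
                return self._queues[task_id]
            return self._queues[task_id].tap()


async def demo():
    manager = ToyQueueManager()
    first = await manager.create_or_tap("task-1")   # creates the queue
    second = await manager.create_or_tap("task-1")  # taps the existing one
    stored = await manager.get("task-1")
    return first is stored, second in first.children
```

Because the dictionary lives in one process, a second server instance would not see these queues, which is why the description above limits this approach to single-instance deployments.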

queue_manager

NoTaskQueue

Bases: Exception

Exception raised when attempting to access or close a queue for a task ID that does not exist.

QueueManager

Bases: ABC

Interface for managing the lifecycle of each task's event queue.

add(task_id, queue) abstractmethod async

Adds a new event queue associated with a task ID.

close(task_id) abstractmethod async

Closes and removes the event queue for a task ID.

create_or_tap(task_id) abstractmethod async

Creates a queue if one doesn't exist, otherwise taps the existing one.

get(task_id) abstractmethod async

Retrieves the event queue for a task ID.

tap(task_id) abstractmethod async

Creates a child event queue (tap) for an existing task ID.

TaskQueueExists

Bases: Exception

Exception raised when attempting to add a queue for a task ID that already exists.

request_handlers

Request handler components for the A2A server.

DefaultRequestHandler

Bases: RequestHandler

Default request handler for all incoming requests.

This handler provides default implementations for all A2A JSON-RPC methods, coordinating between the AgentExecutor, TaskStore, QueueManager, and optional PushNotifier.

agent_executor = agent_executor instance-attribute
task_store = task_store instance-attribute
on_cancel_task(params, context=None) async

Default handler for 'tasks/cancel'.

Attempts to cancel the task managed by the AgentExecutor.

on_get_task(params, context=None) async

Default handler for 'tasks/get'.

on_get_task_push_notification_config(params, context=None) async

Default handler for 'tasks/pushNotificationConfig/get'.

Requires a PushNotifier to be configured.

on_message_send(params, context=None) async

Default handler for 'message/send' interface (non-streaming).

Starts the agent execution for the message and waits for the final result (Task or Message).

on_message_send_stream(params, context=None) async

Default handler for 'message/stream' (streaming).

Starts the agent execution and yields events as they are produced by the agent.

on_resubscribe_to_task(params, context=None) async

Default handler for 'tasks/resubscribe'.

Allows a client to re-attach to a running streaming task's event stream. Requires the task and its queue to still be active.

on_set_task_push_notification_config(params, context=None) async

Default handler for 'tasks/pushNotificationConfig/set'.

Requires a PushNotifier to be configured.

should_add_push_info(params)

JSONRPCHandler

Maps incoming JSON-RPC requests to the appropriate request handler method and formats responses.

agent_card = agent_card instance-attribute
request_handler = request_handler instance-attribute
get_push_notification(request, context=None) async

Handles the 'tasks/pushNotificationConfig/get' JSON-RPC method.

Parameters:

Name Type Description Default
request GetTaskPushNotificationConfigRequest

The incoming GetTaskPushNotificationConfigRequest object.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
GetTaskPushNotificationConfigResponse

A GetTaskPushNotificationConfigResponse object containing the config or a JSON-RPC error.

on_cancel_task(request, context=None) async

Handles the 'tasks/cancel' JSON-RPC method.

Parameters:

Name Type Description Default
request CancelTaskRequest

The incoming CancelTaskRequest object.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
CancelTaskResponse

A CancelTaskResponse object containing the updated Task or a JSON-RPC error.

on_get_task(request, context=None) async

Handles the 'tasks/get' JSON-RPC method.

Parameters:

Name Type Description Default
request GetTaskRequest

The incoming GetTaskRequest object.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
GetTaskResponse

A GetTaskResponse object containing the Task or a JSON-RPC error.

on_message_send(request, context=None) async

Handles the 'message/send' JSON-RPC method.

Parameters:

Name Type Description Default
request SendMessageRequest

The incoming SendMessageRequest object.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
SendMessageResponse

A SendMessageResponse object containing the result (Task or Message) or a JSON-RPC error response if a ServerError is raised by the handler.

on_message_send_stream(request, context=None) async

Handles the 'message/stream' JSON-RPC method.

Yields response objects as they are produced by the underlying handler's stream.

Parameters:

Name Type Description Default
request SendStreamingMessageRequest

The incoming SendStreamingMessageRequest object.

required
context ServerCallContext | None

Context provided by the server.

None

Yields:

Type Description
AsyncIterable[SendStreamingMessageResponse]

SendStreamingMessageResponse objects containing streaming events (Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent) or JSON-RPC error responses if a ServerError is raised.

on_resubscribe_to_task(request, context=None) async

Handles the 'tasks/resubscribe' JSON-RPC method.

Yields response objects as they are produced by the underlying handler's stream.

Parameters:

Name Type Description Default
request TaskResubscriptionRequest

The incoming TaskResubscriptionRequest object.

required
context ServerCallContext | None

Context provided by the server.

None

Yields:

Type Description
AsyncIterable[SendStreamingMessageResponse]

SendStreamingMessageResponse objects containing streaming events or JSON-RPC error responses if a ServerError is raised.

set_push_notification(request, context=None) async

Handles the 'tasks/pushNotificationConfig/set' JSON-RPC method.

Requires the agent to support push notifications.

Parameters:

Name Type Description Default
request SetTaskPushNotificationConfigRequest

The incoming SetTaskPushNotificationConfigRequest object.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
SetTaskPushNotificationConfigResponse

A SetTaskPushNotificationConfigResponse object containing the config or a JSON-RPC error.

Raises:

Type Description
ServerError

If push notifications are not supported by the agent (due to the @validate decorator).

RequestHandler

Bases: ABC

A2A request handler interface.

This interface defines the methods that an A2A server implementation must provide to handle incoming JSON-RPC requests.

on_cancel_task(params, context=None) abstractmethod async

Handles the 'tasks/cancel' method.

Requests the agent to cancel an ongoing task.

Parameters:

Name Type Description Default
params TaskIdParams

Parameters specifying the task ID.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
Task | None

The Task object with its status updated to canceled, or None if the task was not found.

on_get_task(params, context=None) abstractmethod async

Handles the 'tasks/get' method.

Retrieves the state and history of a specific task.

Parameters:

Name Type Description Default
params TaskQueryParams

Parameters specifying the task ID and optionally history length.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
Task | None

The Task object if found, otherwise None.

on_get_task_push_notification_config(params, context=None) abstractmethod async

Handles the 'tasks/pushNotificationConfig/get' method.

Retrieves the current push notification configuration for a task.

Parameters:

Name Type Description Default
params TaskIdParams

Parameters including the task ID.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
TaskPushNotificationConfig

The TaskPushNotificationConfig for the task.

on_message_send(params, context=None) abstractmethod async

Handles the 'message/send' method (non-streaming).

Sends a message to the agent to create, continue, or restart a task, and waits for the final result (Task or Message).

Parameters:

Name Type Description Default
params MessageSendParams

Parameters including the message and configuration.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
Task | Message

The final Task object or a final Message object.

on_message_send_stream(params, context=None) abstractmethod async

Handles the 'message/stream' method (streaming).

Sends a message to the agent and yields stream events as they are produced (Task updates, Message chunks, Artifact updates).

Parameters:

Name Type Description Default
params MessageSendParams

Parameters including the message and configuration.

required
context ServerCallContext | None

Context provided by the server.

None

Yields:

Type Description
AsyncGenerator[Event]

Event objects from the agent's execution.

Raises:

Type Description
ServerError(UnsupportedOperationError)

By default, if not implemented.

on_resubscribe_to_task(params, context=None) abstractmethod async

Handles the 'tasks/resubscribe' method.

Allows a client to re-subscribe to a running streaming task's event stream.

Parameters:

Name Type Description Default
params TaskIdParams

Parameters including the task ID.

required
context ServerCallContext | None

Context provided by the server.

None

Yields:

Type Description
AsyncGenerator[Event]

Event objects from the agent's ongoing execution for the specified task.

Raises:

Type Description
ServerError(UnsupportedOperationError)

By default, if not implemented.

on_set_task_push_notification_config(params, context=None) abstractmethod async

Handles the 'tasks/pushNotificationConfig/set' method.

Sets or updates the push notification configuration for a task.

Parameters:

Name Type Description Default
params TaskPushNotificationConfig

Parameters including the task ID and push notification configuration.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
TaskPushNotificationConfig

The provided TaskPushNotificationConfig upon success.

build_error_response(request_id, error, response_wrapper_type)

Helper method to build a JSONRPCErrorResponse wrapped in the appropriate response type.

Parameters:

Name Type Description Default
request_id str | int | None

The ID of the request that caused the error.

required
error A2AError | JSONRPCError

The A2AError or JSONRPCError object.

required
response_wrapper_type type[RT]

The Pydantic RootModel type that wraps the response for the specific RPC method (e.g., SendMessageResponse).

required

Returns:

Type Description
RT

A Pydantic model representing the JSON-RPC error response, wrapped in the specified response type.

prepare_response_object(request_id, response, success_response_types, success_payload_type, response_type)

Helper method to build the appropriate JSONRPCResponse object for RPC methods.

Based on the type of the response object received from the handler, it constructs either a success response wrapped in the appropriate payload type or an error response.

Parameters:

Name Type Description Default
request_id str | int | None

The ID of the request.

required
response EventTypes

The object received from the request handler.

required
success_response_types tuple[type, ...]

A tuple of expected Pydantic model types for a successful result.

required
success_payload_type type[SPT]

The Pydantic model type for the success payload (e.g., SendMessageSuccessResponse).

required
response_type type[RT]

The Pydantic RootModel type that wraps the final response (e.g., SendMessageResponse).

required

Returns:

Type Description
RT

A Pydantic model representing the final JSON-RPC response (success or error).

default_request_handler

logger = logging.getLogger(__name__) module-attribute
DefaultRequestHandler

Bases: RequestHandler

Default request handler for all incoming requests.

This handler provides default implementations for all A2A JSON-RPC methods, coordinating between the AgentExecutor, TaskStore, QueueManager, and optional PushNotifier.

agent_executor = agent_executor instance-attribute
task_store = task_store instance-attribute
on_cancel_task(params, context=None) async

Default handler for 'tasks/cancel'.

Attempts to cancel the task managed by the AgentExecutor.

on_get_task(params, context=None) async

Default handler for 'tasks/get'.

on_get_task_push_notification_config(params, context=None) async

Default handler for 'tasks/pushNotificationConfig/get'.

Requires a PushNotifier to be configured.

on_message_send(params, context=None) async

Default handler for 'message/send' interface (non-streaming).

Starts the agent execution for the message and waits for the final result (Task or Message).

on_message_send_stream(params, context=None) async

Default handler for 'message/stream' (streaming).

Starts the agent execution and yields events as they are produced by the agent.

on_resubscribe_to_task(params, context=None) async

Default handler for 'tasks/resubscribe'.

Allows a client to re-attach to a running streaming task's event stream. Requires the task and its queue to still be active.

on_set_task_push_notification_config(params, context=None) async

Default handler for 'tasks/pushNotificationConfig/set'.

Requires a PushNotifier to be configured.

should_add_push_info(params)

jsonrpc_handler

logger = logging.getLogger(__name__) module-attribute
JSONRPCHandler

Maps incoming JSON-RPC requests to the appropriate request handler method and formats responses.

agent_card = agent_card instance-attribute
request_handler = request_handler instance-attribute
get_push_notification(request, context=None) async

Handles the 'tasks/pushNotificationConfig/get' JSON-RPC method.

Parameters:

Name Type Description Default
request GetTaskPushNotificationConfigRequest

The incoming GetTaskPushNotificationConfigRequest object.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
GetTaskPushNotificationConfigResponse

A GetTaskPushNotificationConfigResponse object containing the config or a JSON-RPC error.

on_cancel_task(request, context=None) async

Handles the 'tasks/cancel' JSON-RPC method.

Parameters:

Name Type Description Default
request CancelTaskRequest

The incoming CancelTaskRequest object.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
CancelTaskResponse

A CancelTaskResponse object containing the updated Task or a JSON-RPC error.

on_get_task(request, context=None) async

Handles the 'tasks/get' JSON-RPC method.

Parameters:

Name Type Description Default
request GetTaskRequest

The incoming GetTaskRequest object.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
GetTaskResponse

A GetTaskResponse object containing the Task or a JSON-RPC error.
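For orientation, the JSON-RPC 2.0 envelope for a 'tasks/get' call can be sketched as a plain dictionary. build_tasks_get_request is a hypothetical helper, not part of the SDK, and the params field names ("id", "historyLength") are assumptions based on the description of TaskQueryParams as carrying a task ID and an optional history length.

```python
import json


def build_tasks_get_request(request_id, task_id, history_length=None):
    """Build a JSON-RPC 2.0 request dict for the 'tasks/get' method."""
    # Field names below are assumed, not taken from this page.
    params = {"id": task_id}
    if history_length is not None:
        params["historyLength"] = history_length
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tasks/get",
        "params": params,
    }


# Serialize the request as it would travel over HTTP.
payload = json.dumps(build_tasks_get_request(1, "task-123", history_length=5))
```

In the SDK itself, this shape is represented by the GetTaskRequest model rather than a hand-built dictionary.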

on_message_send(request, context=None) async

Handles the 'message/send' JSON-RPC method.

Parameters:

Name Type Description Default
request SendMessageRequest

The incoming SendMessageRequest object.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
SendMessageResponse

A SendMessageResponse object containing the result (Task or Message) or a JSON-RPC error response if a ServerError is raised by the handler.

on_message_send_stream(request, context=None) async

Handles the 'message/stream' JSON-RPC method.

Yields response objects as they are produced by the underlying handler's stream.

Parameters:

Name Type Description Default
request SendStreamingMessageRequest

The incoming SendStreamingMessageRequest object.

required
context ServerCallContext | None

Context provided by the server.

None

Yields:

Type Description
AsyncIterable[SendStreamingMessageResponse]

SendStreamingMessageResponse objects containing streaming events (Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent) or JSON-RPC error responses if a ServerError is raised.

on_resubscribe_to_task(request, context=None) async

Handles the 'tasks/resubscribe' JSON-RPC method.

Yields response objects as they are produced by the underlying handler's stream.

Parameters:

Name Type Description Default
request TaskResubscriptionRequest

The incoming TaskResubscriptionRequest object.

required
context ServerCallContext | None

Context provided by the server.

None

Yields:

Type Description
AsyncIterable[SendStreamingMessageResponse]

SendStreamingMessageResponse objects containing streaming events or JSON-RPC error responses if a ServerError is raised.

set_push_notification(request, context=None) async

Handles the 'tasks/pushNotificationConfig/set' JSON-RPC method.

Requires the agent to support push notifications.

Parameters:

Name Type Description Default
request SetTaskPushNotificationConfigRequest

The incoming SetTaskPushNotificationConfigRequest object.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
SetTaskPushNotificationConfigResponse

A SetTaskPushNotificationConfigResponse object containing the config or a JSON-RPC error.

Raises:

Type Description
ServerError

If push notifications are not supported by the agent (due to the @validate decorator).

request_handler

RequestHandler

Bases: ABC

A2A request handler interface.

This interface defines the methods that an A2A server implementation must provide to handle incoming JSON-RPC requests.

on_cancel_task(params, context=None) abstractmethod async

Handles the 'tasks/cancel' method.

Requests the agent to cancel an ongoing task.

Parameters:

Name Type Description Default
params TaskIdParams

Parameters specifying the task ID.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
Task | None

The Task object with its status updated to canceled, or None if the task was not found.

on_get_task(params, context=None) abstractmethod async

Handles the 'tasks/get' method.

Retrieves the state and history of a specific task.

Parameters:

Name Type Description Default
params TaskQueryParams

Parameters specifying the task ID and optionally history length.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
Task | None

The Task object if found, otherwise None.

on_get_task_push_notification_config(params, context=None) abstractmethod async

Handles the 'tasks/pushNotificationConfig/get' method.

Retrieves the current push notification configuration for a task.

Parameters:

Name Type Description Default
params TaskIdParams

Parameters including the task ID.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
TaskPushNotificationConfig

The TaskPushNotificationConfig for the task.

on_message_send(params, context=None) abstractmethod async

Handles the 'message/send' method (non-streaming).

Sends a message to the agent to create, continue, or restart a task, and waits for the final result (Task or Message).

Parameters:

Name Type Description Default
params MessageSendParams

Parameters including the message and configuration.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
Task | Message

The final Task object or a final Message object.

on_message_send_stream(params, context=None) abstractmethod async

Handles the 'message/stream' method (streaming).

Sends a message to the agent and yields stream events as they are produced (Task updates, Message chunks, Artifact updates).

Parameters:

Name Type Description Default
params MessageSendParams

Parameters including the message and configuration.

required
context ServerCallContext | None

Context provided by the server.

None

Yields:

Type Description
AsyncGenerator[Event]

Event objects from the agent's execution.

Raises:

Type Description
ServerError(UnsupportedOperationError)

By default, if not implemented.

on_resubscribe_to_task(params, context=None) abstractmethod async

Handles the 'tasks/resubscribe' method.

Allows a client to re-subscribe to a running streaming task's event stream.

Parameters:

Name Type Description Default
params TaskIdParams

Parameters including the task ID.

required
context ServerCallContext | None

Context provided by the server.

None

Yields:

Type Description
AsyncGenerator[Event]

Event objects from the agent's ongoing execution for the specified task.

Raises:

Type Description
ServerError(UnsupportedOperationError)

By default, if not implemented.

on_set_task_push_notification_config(params, context=None) abstractmethod async

Handles the 'tasks/pushNotificationConfig/set' method.

Sets or updates the push notification configuration for a task.

Parameters:

Name Type Description Default
params TaskPushNotificationConfig

Parameters including the task ID and push notification configuration.

required
context ServerCallContext | None

Context provided by the server.

None

Returns:

Type Description
TaskPushNotificationConfig

The provided TaskPushNotificationConfig upon success.

response_helpers

Helper functions for building A2A JSON-RPC responses.

EventTypes = Task | Message | TaskArtifactUpdateEvent | TaskStatusUpdateEvent | TaskPushNotificationConfig | A2AError | JSONRPCError module-attribute

Type alias for possible event types produced by handlers.

RT = TypeVar('RT', GetTaskResponse, CancelTaskResponse, SendMessageResponse, SetTaskPushNotificationConfigResponse, GetTaskPushNotificationConfigResponse, SendStreamingMessageResponse) module-attribute

Type variable for RootModel response types.

SPT = TypeVar('SPT', GetTaskSuccessResponse, CancelTaskSuccessResponse, SendMessageSuccessResponse, SetTaskPushNotificationConfigSuccessResponse, GetTaskPushNotificationConfigSuccessResponse, SendStreamingMessageSuccessResponse) module-attribute

Type variable for SuccessResponse types.

build_error_response(request_id, error, response_wrapper_type)

Helper method to build a JSONRPCErrorResponse wrapped in the appropriate response type.

Parameters:

Name Type Description Default
request_id str | int | None

The ID of the request that caused the error.

required
error A2AError | JSONRPCError

The A2AError or JSONRPCError object.

required
response_wrapper_type type[RT]

The Pydantic RootModel type that wraps the response for the specific RPC method (e.g., SendMessageResponse).

required

Returns:

Type Description
RT

A Pydantic model representing the JSON-RPC error response, wrapped in the specified response type.

prepare_response_object(request_id, response, success_response_types, success_payload_type, response_type)

Helper method to build the appropriate JSONRPCResponse object for RPC methods.

Based on the type of the response object received from the handler, it constructs either a success response wrapped in the appropriate payload type or an error response.

Parameters:

Name Type Description Default
request_id str | int | None

The ID of the request.

required
response EventTypes

The object received from the request handler.

required
success_response_types tuple[type, ...]

A tuple of expected Pydantic model types for a successful result.

required
success_payload_type type[SPT]

The Pydantic model type for the success payload (e.g., SendMessageSuccessResponse).

required
response_type type[RT]

The Pydantic RootModel type that wraps the final response (e.g., SendMessageResponse).

required

Returns:

Type Description
RT

A Pydantic model representing the final JSON-RPC response (success or error).
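The success-or-error branching described for prepare_response_object can be sketched without Pydantic. ToyTask, ToyError, and prepare_response are hypothetical stand-ins, and the way this sketch reports an unexpected handler result (JSON-RPC internal error code -32603) is an assumption rather than the SDK's documented behavior.

```python
from dataclasses import dataclass


@dataclass
class ToyTask:
    """Stand-in for a successful result type."""
    id: str


@dataclass
class ToyError:
    """Stand-in for A2AError / JSONRPCError."""
    code: int
    message: str


def prepare_response(request_id, response, success_types):
    """Return a JSON-RPC 2.0 style dict: `result` on success, `error` otherwise."""
    if isinstance(response, success_types):
        return {"jsonrpc": "2.0", "id": request_id, "result": response}
    if isinstance(response, ToyError):
        error = {"code": response.code, "message": response.message}
    else:
        # Unexpected type: report it instead of passing it through.
        error = {"code": -32603, "message": "Invalid response type from handler"}
    return {"jsonrpc": "2.0", "id": request_id, "error": error}
```

For example, prepare_response(1, ToyTask(id="t1"), (ToyTask,)) yields a dict with a "result" key, while passing a ToyError yields a dict with an "error" key and the same request id.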

tasks

Components for managing tasks within the A2A server.

InMemoryPushNotifier

Bases: PushNotifier

In-memory implementation of PushNotifier interface.

Stores push notification configurations in memory and uses an httpx client to send notifications.

lock = asyncio.Lock() instance-attribute
delete_info(task_id) async

Deletes the push notification configuration for a task from memory.

get_info(task_id) async

Retrieves the push notification configuration for a task from memory.

send_notification(task) async

Sends a push notification for a task if configuration exists.

set_info(task_id, notification_config) async

Sets or updates the push notification configuration for a task in memory.

InMemoryTaskStore

Bases: TaskStore

In-memory implementation of TaskStore.

Stores task objects in a dictionary in memory. Task data is lost when the server process stops.

lock = asyncio.Lock() instance-attribute
tasks = {} instance-attribute
delete(task_id) async

Deletes a task from the in-memory store by ID.

get(task_id) async

Retrieves a task from the in-memory store by ID.

save(task) async

Saves or updates a task in the in-memory store.
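A store of this kind amounts to a dictionary guarded by an asyncio.Lock. The sketch below is a hypothetical ToyTaskStore that uses plain dicts with an "id" key in place of Task objects. Silently ignoring a delete for an unknown ID is an assumption of the sketch.

```python
import asyncio


class ToyTaskStore:
    """Dict-backed task store; contents vanish when the process exits."""

    def __init__(self):
        self.tasks = {}
        self.lock = asyncio.Lock()

    async def save(self, task):
        # Insert or overwrite, keyed by the task's ID.
        async with self.lock:
            self.tasks[task["id"]] = task

    async def get(self, task_id):
        async with self.lock:
            return self.tasks.get(task_id)

    async def delete(self, task_id):
        # Assumption: deleting a missing ID is a no-op.
        async with self.lock:
            self.tasks.pop(task_id, None)


async def demo():
    store = ToyTaskStore()
    await store.save({"id": "task-1", "state": "working"})
    await store.save({"id": "task-1", "state": "completed"})  # update in place
    found = await store.get("task-1")
    await store.delete("task-1")
    missing = await store.get("task-1")
    return found, missing
```

Running asyncio.run(demo()) returns ({"id": "task-1", "state": "completed"}, None), showing that save overwrites by ID and that get returns None after deletion.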

PushNotifier

Bases: ABC

PushNotifier interface for storing and retrieving push notification configurations for tasks and for sending push notifications.

delete_info(task_id) abstractmethod async

Deletes the push notification configuration for a task.

get_info(task_id) abstractmethod async

Retrieves the push notification configuration for a task.

send_notification(task) abstractmethod async

Sends a push notification containing the latest task state.

set_info(task_id, notification_config) abstractmethod async

Sets or updates the push notification configuration for a task.

ResultAggregator

ResultAggregator is used to process the event streams from an AgentExecutor.

There are three main ways to use the ResultAggregator:

1) As part of a processing pipe. consume_and_emit will construct the updated task as the events arrive, and re-emit those events for another consumer.
2) As part of a blocking call. consume_all will process the entire stream and return the final Task or Message object.
3) As part of a push solution where the latest Task is emitted after processing an event. consume_and_emit_task will consume the Event stream, process the events to the current Task object and emit that Task object.

current_result async property

Returns the current aggregated result (Task or Message).

This is the latest state processed from the event stream.

Returns:

Type Description
Task | Message | None

The current Task object managed by the TaskManager, or the final Message if one was received, or None if no result has been produced yet.

task_manager = task_manager instance-attribute
consume_all(consumer) async

Processes the entire event stream from the consumer and returns the final result.

Blocks until the event stream ends (queue is closed after final event or exception).

Parameters:

Name Type Description Default
consumer EventConsumer

The EventConsumer to read events from.

required

Returns:

Type Description
Task | Message | None

The final Task object or Message object after the stream is exhausted. Returns None if the stream ends without producing a final result.

Raises:

Type Description
BaseException

If the EventConsumer raises an exception during consumption.

consume_and_break_on_interrupt(consumer) async

Processes the event stream until completion or an interruptible state is encountered.

Interruptible states currently include TaskState.auth_required. If interrupted, consumption continues in a background task.

Parameters:

Name Type Description Default
consumer EventConsumer

The EventConsumer to read events from.

required

Returns:

Type Description
tuple[Task | Message | None, bool]

A tuple containing:

  • The current aggregated result (Task or Message) at the point of completion or interruption.
  • A boolean indicating whether the consumption was interrupted (True) or completed naturally (False).

Raises:

Type Description
BaseException

If the EventConsumer raises an exception during consumption.

consume_and_emit(consumer) async

Processes the event stream from the consumer, updates the task state, and re-emits the same events.

Useful for streaming scenarios where the server needs to observe and process events (e.g., save task state, send push notifications) while forwarding them to the client.

Parameters:

Name Type Description Default
consumer EventConsumer

The EventConsumer to read events from.

required

Yields:

Type Description
AsyncGenerator[Event]

The Event objects consumed from the EventConsumer.

TaskManager

Helps manage a task's lifecycle during execution of a request.

Responsible for retrieving, saving, and updating the Task object based on events received from the agent.

context_id = context_id instance-attribute
task_id = task_id instance-attribute
task_store = task_store instance-attribute
ensure_task(event) async

Ensures a Task object exists in memory, loading from store or creating new if needed.

Parameters:

Name Type Description Default
event TaskStatusUpdateEvent | TaskArtifactUpdateEvent

The task-related event triggering the need for a Task object.

required

Returns:

Type Description
Task

An existing or newly created Task object.

get_task() async

Retrieves the current task object, either from memory or the store.

If task_id is set, it first checks the in-memory _current_task, then attempts to load it from the task_store.

Returns:

Type Description
Task | None

The Task object if found, otherwise None.

process(event) async

Processes an event, updates the task state if applicable, stores it, and returns the event.

If the event is task-related (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent), the internal task state is updated and persisted.

Parameters:

Name Type Description Default
event Event

The event object received from the agent.

required

Returns:

Type Description
Event

The same event object that was processed.

save_task_event(event) async

Processes a task-related event (Task, Status, Artifact) and saves the updated task state.

Ensures task and context IDs match or are set from the event.

Parameters:

Name Type Description Default
event Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent

The task-related event (Task, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent).

required

Returns:

Type Description
Task | None

The updated Task object after processing the event.

Raises:

Type Description
ServerError

If the task ID in the event conflicts with the TaskManager's ID when the TaskManager's ID is already set.

update_with_message(message, task)

Updates a task object in memory by adding a new message to its history.

If the task has a message in its current status, that message is moved to the history first.

Parameters:

Name Type Description Default
message Message

The new Message to add to the history.

required
task Task

The Task object to update.

required

Returns:

Type Description
Task

The updated Task object (updated in-place).

TaskStore

Bases: ABC

Agent Task Store interface.

Defines the methods for persisting and retrieving Task objects.

delete(task_id) abstractmethod async

Deletes a task from the store by ID.

get(task_id) abstractmethod async

Retrieves a task from the store by ID.

save(task) abstractmethod async

Saves or updates a task in the store.

TaskUpdater

Helper class for agents to publish updates to a task's event queue.

Simplifies the process of creating and enqueueing standard task events.

context_id = context_id instance-attribute
event_queue = event_queue instance-attribute
task_id = task_id instance-attribute
add_artifact(parts, artifact_id=str(uuid.uuid4()), name=None, metadata=None)

Adds an artifact chunk to the task and publishes a TaskArtifactUpdateEvent.

Parameters:

Name Type Description Default
parts list[Part]

A list of Part objects forming the artifact chunk.

required
artifact_id str

The ID of the artifact. A new UUID is generated if not provided.

str(uuid4())
name str | None

Optional name for the artifact.

None
metadata dict[str, Any] | None

Optional metadata for the artifact.

None
append

Optional boolean indicating if this chunk appends to a previous one.

required
last_chunk

Optional boolean indicating if this is the last chunk.

required
complete(message=None)

Marks the task as completed and publishes a final status update.

failed(message=None)

Marks the task as failed and publishes a final status update.

new_agent_message(parts, metadata=None)

Creates a new message object sent by the agent for this task/context.

This method only creates the message object. It does not automatically enqueue it.

Parameters:

Name Type Description Default
parts list[Part]

A list of Part objects for the message content.

required
final

Optional boolean indicating if this is the final message in a stream.

required
metadata dict[str, Any] | None

Optional metadata for the message.

None

Returns:

Type Description
Message

A new Message object.

start_work(message=None)

Marks the task as working and publishes a status update.

submit(message=None)

Marks the task as submitted and publishes a status update.

update_status(state, message=None, final=False)

Updates the status of the task and publishes a TaskStatusUpdateEvent.

Parameters:

Name Type Description Default
state TaskState

The new state of the task.

required
message Message | None

An optional message associated with the status update.

None
final

If True, indicates this is the final status update for the task.

False
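The relationship between update_status and the convenience methods can be sketched as a thin wrapper around a queue. This is an illustration only: `SketchTaskUpdater` and `SketchStatusEvent` are hypothetical names, an asyncio.Queue stands in for the SDK's event queue, and states are plain strings.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SketchStatusEvent:
    """Hypothetical stand-in for TaskStatusUpdateEvent."""
    task_id: str
    context_id: str
    state: str
    final: bool


class SketchTaskUpdater:
    def __init__(self, queue: asyncio.Queue, task_id: str, context_id: str) -> None:
        self.event_queue = queue
        self.task_id = task_id
        self.context_id = context_id

    def update_status(self, state: str, final: bool = False) -> None:
        # Every status change becomes one event on the queue.
        self.event_queue.put_nowait(
            SketchStatusEvent(self.task_id, self.context_id, state, final)
        )

    def start_work(self) -> None:
        self.update_status("working")

    def complete(self) -> None:
        # Terminal states are published as the final update.
        self.update_status("completed", final=True)


async def demo() -> list[SketchStatusEvent]:
    queue: asyncio.Queue = asyncio.Queue()
    updater = SketchTaskUpdater(queue, task_id="t1", context_id="c1")
    updater.start_work()
    updater.complete()
    return [queue.get_nowait() for _ in range(queue.qsize())]


events = asyncio.run(demo())
```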

inmemory_push_notifier

logger = logging.getLogger(__name__) module-attribute
InMemoryPushNotifier

Bases: PushNotifier

In-memory implementation of PushNotifier interface.

Stores push notification configurations in memory and uses an httpx client to send notifications.

lock = asyncio.Lock() instance-attribute
delete_info(task_id) async

Deletes the push notification configuration for a task from memory.

get_info(task_id) async

Retrieves the push notification configuration for a task from memory.

send_notification(task) async

Sends a push notification for a task if configuration exists.

set_info(task_id, notification_config) async

Sets or updates the push notification configuration for a task in memory.

inmemory_task_store

logger = logging.getLogger(__name__) module-attribute
InMemoryTaskStore

Bases: TaskStore

In-memory implementation of TaskStore.

Stores task objects in a dictionary in memory. Task data is lost when the server process stops.

lock = asyncio.Lock() instance-attribute
tasks = {} instance-attribute
delete(task_id) async

Deletes a task from the in-memory store by ID.

get(task_id) async

Retrieves a task from the in-memory store by ID.

save(task) async

Saves or updates a task in the in-memory store.

push_notifier

PushNotifier

Bases: ABC

Interface for storing and retrieving push notification configurations for tasks, and for sending push notifications.

delete_info(task_id) abstractmethod async

Deletes the push notification configuration for a task.

get_info(task_id) abstractmethod async

Retrieves the push notification configuration for a task.

send_notification(task) abstractmethod async

Sends a push notification containing the latest task state.

set_info(task_id, notification_config) abstractmethod async

Sets or updates the push notification configuration for a task.

result_aggregator

logger = logging.getLogger(__name__) module-attribute
ResultAggregator

ResultAggregator is used to process the event streams from an AgentExecutor.

There are three main ways to use the ResultAggregator:

1) As part of a processing pipe. consume_and_emit will construct the updated task as the events arrive, and re-emit those events for another consumer.
2) As part of a blocking call. consume_all will process the entire stream and return the final Task or Message object.
3) As part of a push solution where the latest Task is emitted after processing an event. consume_and_emit_task will consume the Event stream, process the events to the current Task object and emit that Task object.

current_result async property

Returns the current aggregated result (Task or Message).

This is the latest state processed from the event stream.

Returns:

Type Description
Task | Message | None

The current Task object managed by the TaskManager, or the final Message if one was received, or None if no result has been produced yet.

task_manager = task_manager instance-attribute
consume_all(consumer) async

Processes the entire event stream from the consumer and returns the final result.

Blocks until the event stream ends (queue is closed after final event or exception).

Parameters:

Name Type Description Default
consumer EventConsumer

The EventConsumer to read events from.

required

Returns:

Type Description
Task | Message | None

The final Task object or Message object after the stream is exhausted. Returns None if the stream ends without producing a final result.

Raises:

Type Description
BaseException

If the EventConsumer raises an exception during consumption.

consume_and_break_on_interrupt(consumer) async

Processes the event stream until completion or an interruptible state is encountered.

Interruptible states currently include TaskState.auth_required. If interrupted, consumption continues in a background task.

Parameters:

Name Type Description Default
consumer EventConsumer

The EventConsumer to read events from.

required

Returns:

Type Description
tuple[Task | Message | None, bool]

A tuple containing:

  • The current aggregated result (Task or Message) at the point of completion or interruption.
  • A boolean indicating whether the consumption was interrupted (True) or completed naturally (False).

Raises:

Type Description
BaseException

If the EventConsumer raises an exception during consumption.

consume_and_emit(consumer) async

Processes the event stream from the consumer, updates the task state, and re-emits the same events.

Useful for streaming scenarios where the server needs to observe and process events (e.g., save task state, send push notifications) while forwarding them to the client.

Parameters:

Name Type Description Default
consumer EventConsumer

The EventConsumer to read events from.

required

Yields:

Type Description
AsyncGenerator[Event]

The Event objects consumed from the EventConsumer.

task_manager

logger = logging.getLogger(__name__) module-attribute
TaskManager

Helps manage a task's lifecycle during execution of a request.

Responsible for retrieving, saving, and updating the Task object based on events received from the agent.

context_id = context_id instance-attribute
task_id = task_id instance-attribute
task_store = task_store instance-attribute
ensure_task(event) async

Ensures a Task object exists in memory, loading from store or creating new if needed.

Parameters:

Name Type Description Default
event TaskStatusUpdateEvent | TaskArtifactUpdateEvent

The task-related event triggering the need for a Task object.

required

Returns:

Type Description
Task

An existing or newly created Task object.

get_task() async

Retrieves the current task object, either from memory or the store.

If task_id is set, it first checks the in-memory _current_task, then attempts to load it from the task_store.

Returns:

Type Description
Task | None

The Task object if found, otherwise None.

process(event) async

Processes an event, updates the task state if applicable, stores it, and returns the event.

If the event is task-related (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent), the internal task state is updated and persisted.

Parameters:

Name Type Description Default
event Event

The event object received from the agent.

required

Returns:

Type Description
Event

The same event object that was processed.

save_task_event(event) async

Processes a task-related event (Task, Status, Artifact) and saves the updated task state.

Ensures task and context IDs match or are set from the event.

Parameters:

Name Type Description Default
event Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent

The task-related event (Task, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent).

required

Returns:

Type Description
Task | None

The updated Task object after processing the event.

Raises:

Type Description
ServerError

If the task ID in the event conflicts with the TaskManager's ID when the TaskManager's ID is already set.

update_with_message(message, task)

Updates a task object in memory by adding a new message to its history.

If the task has a message in its current status, that message is moved to the history first.

Parameters:

Name Type Description Default
message Message

The new Message to add to the history.

required
task Task

The Task object to update.

required

Returns:

Type Description
Task

The updated Task object (updated in-place).

task_store

TaskStore

Bases: ABC

Agent Task Store interface.

Defines the methods for persisting and retrieving Task objects.

delete(task_id) abstractmethod async

Deletes a task from the store by ID.

get(task_id) abstractmethod async

Retrieves a task from the store by ID.

save(task) abstractmethod async

Saves or updates a task in the store.

task_updater

TaskUpdater

Helper class for agents to publish updates to a task's event queue.

Simplifies the process of creating and enqueueing standard task events.

context_id = context_id instance-attribute
event_queue = event_queue instance-attribute
task_id = task_id instance-attribute
add_artifact(parts, artifact_id=str(uuid.uuid4()), name=None, metadata=None)

Adds an artifact chunk to the task and publishes a TaskArtifactUpdateEvent.

Parameters:

Name Type Description Default
parts list[Part]

A list of Part objects forming the artifact chunk.

required
artifact_id str

The ID of the artifact. A new UUID is generated if not provided.

str(uuid4())
name str | None

Optional name for the artifact.

None
metadata dict[str, Any] | None

Optional metadata for the artifact.

None
append

Optional boolean indicating if this chunk appends to a previous one.

required
last_chunk

Optional boolean indicating if this is the last chunk.

required
complete(message=None)

Marks the task as completed and publishes a final status update.

failed(message=None)

Marks the task as failed and publishes a final status update.

new_agent_message(parts, metadata=None)

Creates a new message object sent by the agent for this task/context.

This method only creates the message object. It does not automatically enqueue it.

Parameters:

Name Type Description Default
parts list[Part]

A list of Part objects for the message content.

required
final

Optional boolean indicating if this is the final message in a stream.

required
metadata dict[str, Any] | None

Optional metadata for the message.

None

Returns:

Type Description
Message

A new Message object.

start_work(message=None)

Marks the task as working and publishes a status update.

submit(message=None)

Marks the task as submitted and publishes a status update.

update_status(state, message=None, final=False)

Updates the status of the task and publishes a TaskStatusUpdateEvent.

Parameters:

Name Type Description Default
state TaskState

The new state of the task.

required
message Message | None

An optional message associated with the status update.

None
final

If True, indicates this is the final status update for the task.

False

types

A2A

Bases: RootModel[Any]

root instance-attribute

APIKeySecurityScheme

Bases: BaseModel

API Key security scheme.

description = None class-attribute instance-attribute

Description of this security scheme.

in_ = Field(..., alias='in') class-attribute instance-attribute

The location of the API key. Valid values are "query", "header", or "cookie".

name instance-attribute

The name of the header, query or cookie parameter to be used.

type = 'apiKey' class-attribute instance-attribute
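The attribute is named in_ because `in` is a reserved word in Python, while the serialized field keeps the name "in". The sketch below shows that mapping with the standard library only. `parse_api_key_scheme` is a hypothetical helper, and the `In` enum here is a local copy of the three locations listed in this reference, not an import from the SDK.

```python
import keyword
from enum import Enum


class In(Enum):
    """Local copy of the three API key locations listed in this reference."""
    cookie = "cookie"
    header = "header"
    query = "query"


def parse_api_key_scheme(data: dict) -> dict:
    """Hypothetical helper: map the wire key 'in' to the attribute name 'in_'."""
    return {
        "type": data.get("type", "apiKey"),
        "in_": In(data["in"]),  # raises ValueError for an unknown location
        "name": data["name"],
        "description": data.get("description"),
    }


# 'in' is a reserved word, so it cannot be used as an attribute name directly.
is_reserved = keyword.iskeyword("in")

scheme = parse_api_key_scheme(
    {"type": "apiKey", "in": "header", "name": "X-API-Key"}
)

try:
    parse_api_key_scheme({"in": "body", "name": "X-API-Key"})
    rejected = False
except ValueError:
    rejected = True
```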

AgentCapabilities

Bases: BaseModel

Defines optional capabilities supported by an agent.

pushNotifications = None class-attribute instance-attribute

true if the agent can notify the client of updates.

stateTransitionHistory = None class-attribute instance-attribute

true if the agent exposes status change history for tasks.

streaming = None class-attribute instance-attribute

true if the agent supports SSE.

AgentCard

Bases: BaseModel

An AgentCard conveys key information:

  • Overall details (version, name, description, uses)
  • Skills: A set of capabilities the agent can perform
  • Default modalities/content types supported by the agent
  • Authentication requirements

capabilities instance-attribute

Optional capabilities supported by the agent.

defaultInputModes instance-attribute

The set of interaction modes that the agent supports across all skills. This can be overridden per-skill. Supported mime types for input.

defaultOutputModes instance-attribute

Supported mime types for output.

description instance-attribute

A human-readable description of the agent. Used to assist users and other agents in understanding what the agent can do.

documentationUrl = None class-attribute instance-attribute

A URL to documentation for the agent.

name instance-attribute

Human readable name of the agent.

provider = None class-attribute instance-attribute

The service provider of the agent.

security = None class-attribute instance-attribute

Security requirements for contacting the agent.

securitySchemes = None class-attribute instance-attribute

Security scheme details used for authenticating with this agent.

skills instance-attribute

Skills are a unit of capability that an agent can perform.

supportsAuthenticatedExtendedCard = Field(default=None) class-attribute instance-attribute

Optional field indicating there is an extended card available post authentication at the /agent/authenticatedExtendedCard endpoint.

url instance-attribute

A URL to the address the agent is hosted at.

version instance-attribute

The version of the agent - format is up to the provider.
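As an illustration of the fields above, the following builds an agent card as a plain dictionary and checks that every attribute without a default is present. All values (names, URL, skill) are invented for the example, and `REQUIRED_CARD_FIELDS` is a hypothetical helper derived from the attributes listed here without a default, not an SDK constant.

```python
import json

# Example values only; none of these come from a real agent.
agent_card = {
    "name": "Example Agent",
    "description": "Answers questions about example data.",
    "url": "https://agent.example.invalid/a2a",
    "version": "1.0.0",
    "capabilities": {"streaming": True, "pushNotifications": False},
    "defaultInputModes": ["text/plain"],
    "defaultOutputModes": ["text/plain", "application/json"],
    "skills": [
        {
            "id": "lookup",
            "name": "Lookup",
            "description": "Looks up a record by key.",
            "tags": ["search"],
        }
    ],
}

# Attributes listed above without a default value.
REQUIRED_CARD_FIELDS = {
    "capabilities",
    "defaultInputModes",
    "defaultOutputModes",
    "description",
    "name",
    "skills",
    "url",
    "version",
}

missing = REQUIRED_CARD_FIELDS - set(agent_card)
payload = json.dumps(agent_card, indent=2)
```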

AgentProvider

Bases: BaseModel

Represents the service provider of an agent.

organization instance-attribute

Agent provider's organization name.

url instance-attribute

Agent provider's URL.

AgentSkill

Bases: BaseModel

Represents a unit of capability that an agent can perform.

description instance-attribute

Description of the skill - will be used by the client or a human as a hint to understand what the skill does.

examples = None class-attribute instance-attribute

The set of example scenarios that the skill can perform. Will be used by the client as a hint to understand how the skill can be used.

id instance-attribute

Unique identifier for the agent's skill.

inputModes = None class-attribute instance-attribute

The set of interaction modes that the skill supports (if different than the default). Supported mime types for input.

name instance-attribute

Human readable name of the skill.

outputModes = None class-attribute instance-attribute

Supported mime types for output.

tags instance-attribute

Set of tagwords describing classes of capabilities for this specific skill.

Artifact

Bases: BaseModel

Represents an artifact generated for a task.

artifactId instance-attribute

Unique identifier for the artifact.

description = None class-attribute instance-attribute

Optional description for the artifact.

metadata = None class-attribute instance-attribute

Extension metadata.

name = None class-attribute instance-attribute

Optional name for the artifact.

parts instance-attribute

Artifact parts.

AuthorizationCodeOAuthFlow

Bases: BaseModel

Configuration details for a supported OAuth Flow

authorizationUrl instance-attribute

The authorization URL to be used for this flow. This MUST be in the form of a URL. The OAuth2 standard requires the use of TLS.

refreshUrl = None class-attribute instance-attribute

The URL to be used for obtaining refresh tokens. This MUST be in the form of a URL. The OAuth2 standard requires the use of TLS.

scopes instance-attribute

The available scopes for the OAuth2 security scheme. A map between the scope name and a short description for it. The map MAY be empty.

tokenUrl instance-attribute

The token URL to be used for this flow. This MUST be in the form of a URL. The OAuth2 standard requires the use of TLS.

CancelTaskRequest

Bases: BaseModel

JSON-RPC request model for the 'tasks/cancel' method.

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".

method = 'tasks/cancel' class-attribute instance-attribute

A String containing the name of the method to be invoked.

params instance-attribute

A Structured value that holds the parameter values to be used during the invocation of the method.

CancelTaskResponse

Bases: RootModel[JSONRPCErrorResponse | CancelTaskSuccessResponse]

root instance-attribute

JSON-RPC response for the 'tasks/cancel' method.

CancelTaskSuccessResponse

Bases: BaseModel

JSON-RPC success response model for the 'tasks/cancel' method.

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".

result instance-attribute

The result object on success.
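On the wire, the request and response models above are JSON-RPC 2.0 envelopes. The sketch below builds a 'tasks/cancel' request as a dictionary and tells a success response from an error response by checking for the `result` or `error` member. The shape of `params` (a single task `id`) is an assumption for illustration, and both helper functions are hypothetical.

```python
import json


def build_cancel_request(request_id: str, task_id: str) -> dict:
    """Hypothetical helper producing a 'tasks/cancel' request envelope."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tasks/cancel",
        "params": {"id": task_id},  # assumption: params identify the task by ID
    }


def classify_response(raw: str) -> str:
    """Return 'success' or 'error' for a JSON-RPC 2.0 response body."""
    body = json.loads(raw)
    if body.get("jsonrpc") != "2.0":
        raise ValueError("not a JSON-RPC 2.0 response")
    if "error" in body:
        return "error"
    if "result" in body:
        return "success"
    raise ValueError("response has neither 'result' nor 'error'")


request_body = json.dumps(build_cancel_request("req-1", "task-123"))

ok = classify_response('{"jsonrpc": "2.0", "id": "req-1", "result": {}}')
failed = classify_response(
    json.dumps(
        {
            "jsonrpc": "2.0",
            "id": "req-1",
            "error": {"code": -32603, "message": "Internal error"},
        }
    )
)
```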

ClientCredentialsOAuthFlow

Bases: BaseModel

Configuration details for a supported OAuth Flow

refreshUrl = None class-attribute instance-attribute

The URL to be used for obtaining refresh tokens. This MUST be in the form of a URL. The OAuth2 standard requires the use of TLS.

scopes instance-attribute

The available scopes for the OAuth2 security scheme. A map between the scope name and a short description for it. The map MAY be empty.

tokenUrl instance-attribute

The token URL to be used for this flow. This MUST be in the form of a URL. The OAuth2 standard requires the use of TLS.

ContentTypeNotSupportedError

Bases: BaseModel

A2A-specific error indicating incompatible content types between request and agent capabilities.

code = -32005 class-attribute instance-attribute

A Number that indicates the error type that occurred.

data = None class-attribute instance-attribute

A Primitive or Structured value that contains additional information about the error. This may be omitted.

message = 'Incompatible content types' class-attribute instance-attribute

A String providing a short description of the error.

DataPart

Bases: BaseModel

Represents a structured data segment within a message part.

data instance-attribute

Structured data content

kind = 'data' class-attribute instance-attribute

Part type - data for DataParts

metadata = None class-attribute instance-attribute

Optional metadata associated with the part.

FileBase

Bases: BaseModel

Represents the base entity for FileParts

mimeType = None class-attribute instance-attribute

Optional mimeType for the file

name = None class-attribute instance-attribute

Optional name for the file

FilePart

Bases: BaseModel

Represents a File segment within parts.

file instance-attribute

File content, either as a URI or as bytes.

kind = 'file' class-attribute instance-attribute

Part type - file for FileParts

metadata = None class-attribute instance-attribute

Optional metadata associated with the part.

FileWithBytes

Bases: BaseModel

Defines the variant where 'bytes' is present and 'uri' is absent.

bytes instance-attribute

Base64-encoded content of the file.

mimeType = None class-attribute instance-attribute

Optional mimeType for the file

name = None class-attribute instance-attribute

Optional name for the file
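Since the bytes field holds base64 text rather than raw bytes, file content has to be encoded before it is placed in a part. A minimal sketch with plain dictionaries follows. The file name and content are invented, and the dictionaries only mimic the serialized shape of FileWithBytes and FilePart.

```python
import base64

raw = b"hello, agent"

# Serialized shape of the 'bytes' variant: base64 text plus optional metadata.
file_with_bytes = {
    "bytes": base64.b64encode(raw).decode("ascii"),
    "mimeType": "text/plain",
    "name": "greeting.txt",
}

# A file part wraps the file object and carries the 'file' kind marker.
file_part = {"kind": "file", "file": file_with_bytes}

# The receiver reverses the encoding to recover the original content.
decoded = base64.b64decode(file_part["file"]["bytes"])
```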

FileWithUri

Bases: BaseModel

Defines the variant where 'uri' is present and 'bytes' is absent.

mimeType = None class-attribute instance-attribute

Optional mimeType for the file

name = None class-attribute instance-attribute

Optional name for the file

uri instance-attribute

URL for the File content

GetTaskPushNotificationConfigRequest

Bases: BaseModel

JSON-RPC request model for the 'tasks/pushNotificationConfig/get' method.

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".

method = 'tasks/pushNotificationConfig/get' class-attribute instance-attribute

A String containing the name of the method to be invoked.

params instance-attribute

A Structured value that holds the parameter values to be used during the invocation of the method.

GetTaskPushNotificationConfigResponse

Bases: RootModel[JSONRPCErrorResponse | GetTaskPushNotificationConfigSuccessResponse]

root instance-attribute

JSON-RPC response for the 'tasks/pushNotificationConfig/get' method.

GetTaskPushNotificationConfigSuccessResponse

Bases: BaseModel

JSON-RPC success response model for the 'tasks/pushNotificationConfig/get' method.

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".

result instance-attribute

The result object on success.

GetTaskRequest

Bases: BaseModel

JSON-RPC request model for the 'tasks/get' method.

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".

method = 'tasks/get' class-attribute instance-attribute

A String containing the name of the method to be invoked.

params instance-attribute

A Structured value that holds the parameter values to be used during the invocation of the method.

GetTaskResponse

Bases: RootModel[JSONRPCErrorResponse | GetTaskSuccessResponse]

root instance-attribute

JSON-RPC response for the 'tasks/get' method.

GetTaskSuccessResponse

Bases: BaseModel

JSON-RPC success response for the 'tasks/get' method.

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".

result instance-attribute

The result object on success.

HTTPAuthSecurityScheme

Bases: BaseModel

HTTP Authentication security scheme.

bearerFormat = None class-attribute instance-attribute

A hint to the client to identify how the bearer token is formatted. Bearer tokens are usually generated by an authorization server, so this information is primarily for documentation purposes.

description = None class-attribute instance-attribute

Description of this security scheme.

scheme instance-attribute

The name of the HTTP Authentication scheme to be used in the Authorization header as defined in RFC7235. The values used SHOULD be registered in the IANA Authentication Scheme registry. The value is case-insensitive, as defined in RFC7235.

type = 'http' class-attribute instance-attribute

ImplicitOAuthFlow

Bases: BaseModel

Configuration details for a supported OAuth Flow

authorizationUrl instance-attribute

The authorization URL to be used for this flow. This MUST be in the form of a URL. The OAuth2 standard requires the use of TLS.

refreshUrl = None class-attribute instance-attribute

The URL to be used for obtaining refresh tokens. This MUST be in the form of a URL. The OAuth2 standard requires the use of TLS.

scopes instance-attribute

The available scopes for the OAuth2 security scheme. A map between the scope name and a short description for it. The map MAY be empty.

In

Bases: Enum

The location of the API key. Valid values are "query", "header", or "cookie".

cookie = 'cookie' class-attribute instance-attribute

header = 'header' class-attribute instance-attribute

query = 'query' class-attribute instance-attribute

InternalError

Bases: BaseModel

JSON-RPC error indicating an internal JSON-RPC error on the server.

code = -32603 class-attribute instance-attribute

A Number that indicates the error type that occurred.

data = None class-attribute instance-attribute

A Primitive or Structured value that contains additional information about the error. This may be omitted.

message = 'Internal error' class-attribute instance-attribute

A String providing a short description of the error.

InvalidAgentResponseError

Bases: BaseModel

A2A-specific error indicating that the agent returned an invalid response for the current method.

code = -32006 class-attribute instance-attribute

A Number that indicates the error type that occurred.

data = None class-attribute instance-attribute

A Primitive or Structured value that contains additional information about the error. This may be omitted.

message = 'Invalid agent response' class-attribute instance-attribute

A String providing a short description of the error.

InvalidParamsError

Bases: BaseModel

JSON-RPC error indicating invalid method parameter(s).

code = -32602 class-attribute instance-attribute

A Number that indicates the error type that occurred.

data = None class-attribute instance-attribute

A Primitive or Structured value that contains additional information about the error. This may be omitted.

message = 'Invalid parameters' class-attribute instance-attribute

A String providing a short description of the error.

InvalidRequestError

Bases: BaseModel

JSON-RPC error indicating the JSON sent is not a valid Request object.

code = -32600 class-attribute instance-attribute

A Number that indicates the error type that occurred.

data = None class-attribute instance-attribute

A Primitive or Structured value that contains additional information about the error. This may be omitted.

message = 'Request payload validation error' class-attribute instance-attribute

A String providing a short description of the error.

JSONParseError

Bases: BaseModel

JSON-RPC error indicating invalid JSON was received by the server.

code = -32700 class-attribute instance-attribute

A Number that indicates the error type that occurred.

data = None class-attribute instance-attribute

A Primitive or Structured value that contains additional information about the error. This may be omitted.

message = 'Invalid JSON payload' class-attribute instance-attribute

A String providing a short description of the error.

JSONRPCError

Bases: BaseModel

Represents a JSON-RPC 2.0 Error object. This is typically included in a JSONRPCErrorResponse when an error occurs.

code instance-attribute

A Number that indicates the error type that occurred.

data = None class-attribute instance-attribute

A Primitive or Structured value that contains additional information about the error. This may be omitted.

message instance-attribute

A String providing a short description of the error.

JSONRPCErrorResponse

Bases: BaseModel

Represents a JSON-RPC 2.0 Error Response object.

error instance-attribute

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".
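
As a rough illustration of how the error models in this section map onto the wire format, the sketch below assembles an error response as a plain dictionary. It uses only the standard library rather than the SDK's Pydantic models. `build_error_response` and `DEFAULT_MESSAGES` are hypothetical names; only the codes and default messages are taken from this page.

```python
import json

# Codes and default messages as listed for the error models on this page.
DEFAULT_MESSAGES = {
    -32700: 'Invalid JSON payload',
    -32600: 'Request payload validation error',
    -32601: 'Method not found',
    -32602: 'Invalid parameters',
    -32603: 'Internal error',
    -32001: 'Task not found',
    -32002: 'Task cannot be canceled',
    -32003: 'Push Notification is not supported',
    -32004: 'This operation is not supported',
    -32006: 'Invalid agent response',
}


def build_error_response(request_id, code, data=None):
    """Return a JSON-RPC 2.0 error response as a plain dict (hypothetical helper)."""
    error = {'code': code, 'message': DEFAULT_MESSAGES[code]}
    if data is not None:
        # 'data' may be omitted, so it is only added when provided.
        error['data'] = data
    return {'jsonrpc': '2.0', 'id': request_id, 'error': error}


response = build_error_response('req-1', -32601)
print(json.dumps(response))
```

The `error` member carries the same three fields documented for JSONRPCError: `code`, `message`, and the optional `data`.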

JSONRPCMessage

Bases: BaseModel

Base interface for any JSON-RPC 2.0 request or response.

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".

JSONRPCRequest

Bases: BaseModel

Represents a JSON-RPC 2.0 Request object.

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".

method instance-attribute

A String containing the name of the method to be invoked.

params = None class-attribute instance-attribute

A Structured value that holds the parameter values to be used during the invocation of the method.

JSONRPCResult

Bases: BaseModel

Represents a JSON-RPC 2.0 Result object.

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".

result instance-attribute

The result object on success.

Message

Bases: BaseModel

Represents a single message exchanged between user and agent.

contextId = None class-attribute instance-attribute

The context the message is associated with

kind = 'message' class-attribute instance-attribute

Event type

messageId instance-attribute

Identifier created by the message creator

metadata = None class-attribute instance-attribute

Extension metadata.

parts instance-attribute

Message content

referenceTaskIds = None class-attribute instance-attribute

List of tasks referenced as context by this message.

role instance-attribute

Message sender's role

taskId = None class-attribute instance-attribute

Identifier of task the message is related to

MessageSendConfiguration

Bases: BaseModel

Configuration for the send message request.

acceptedOutputModes instance-attribute

Accepted output modalities by the client.

blocking = None class-attribute instance-attribute

Whether the server should treat the client's request as blocking.

historyLength = None class-attribute instance-attribute

Number of recent messages to be retrieved.

pushNotificationConfig = None class-attribute instance-attribute

Where the server should send notifications when disconnected.

MessageSendParams

Bases: BaseModel

Sent by the client to the agent as a request. May create, continue or restart a task.

configuration = None class-attribute instance-attribute

Send message configuration.

message instance-attribute

The message being sent to the server.

metadata = None class-attribute instance-attribute

Extension metadata.

MethodNotFoundError

Bases: BaseModel

JSON-RPC error indicating the method does not exist or is not available.

code = -32601 class-attribute instance-attribute

A Number that indicates the error type that occurred.

data = None class-attribute instance-attribute

A Primitive or Structured value that contains additional information about the error. This may be omitted.

message = 'Method not found' class-attribute instance-attribute

A String providing a short description of the error.

OAuth2SecurityScheme

Bases: BaseModel

OAuth2.0 security scheme configuration.

description = None class-attribute instance-attribute

Description of this security scheme.

flows instance-attribute

An object containing configuration information for the flow types supported.

type = 'oauth2' class-attribute instance-attribute

OAuthFlows

Bases: BaseModel

Allows configuration of the supported OAuth Flows

authorizationCode = None class-attribute instance-attribute

Configuration for the OAuth Authorization Code flow. Previously called accessCode in OpenAPI 2.0.

clientCredentials = None class-attribute instance-attribute

Configuration for the OAuth Client Credentials flow. Previously called application in OpenAPI 2.0.

implicit = None class-attribute instance-attribute

Configuration for the OAuth Implicit flow.

password = None class-attribute instance-attribute

Configuration for the OAuth Resource Owner Password flow.

OpenIdConnectSecurityScheme

Bases: BaseModel

OpenID Connect security scheme configuration.

description = None class-attribute instance-attribute

Description of this security scheme.

openIdConnectUrl instance-attribute

Well-known URL to discover the OpenID Connect Discovery provider metadata.

type = 'openIdConnect' class-attribute instance-attribute

Part

Bases: RootModel[TextPart | FilePart | DataPart]

root instance-attribute

Represents a part of a message, which can be text, a file, or structured data.

PartBase

Bases: BaseModel

Base properties common to all message parts.

metadata = None class-attribute instance-attribute

Optional metadata associated with the part.

PasswordOAuthFlow

Bases: BaseModel

Configuration details for a supported OAuth Flow

refreshUrl = None class-attribute instance-attribute

The URL to be used for obtaining refresh tokens. This MUST be in the form of a URL. The OAuth2 standard requires the use of TLS.

scopes instance-attribute

The available scopes for the OAuth2 security scheme. A map between the scope name and a short description for it. The map MAY be empty.

tokenUrl instance-attribute

The token URL to be used for this flow. This MUST be in the form of a URL. The OAuth2 standard requires the use of TLS.

PushNotificationAuthenticationInfo

Bases: BaseModel

Defines authentication details for push notifications.

credentials = None class-attribute instance-attribute

Optional credentials

schemes instance-attribute

Supported authentication schemes, e.g. Basic, Bearer.

PushNotificationConfig

Bases: BaseModel

Configuration for setting up push notifications for task updates.

authentication = None class-attribute instance-attribute

token = None class-attribute instance-attribute

Token unique to this task/session.

url instance-attribute

URL for sending the push notifications.

PushNotificationNotSupportedError

Bases: BaseModel

A2A specific error indicating the agent does not support push notifications.

code = -32003 class-attribute instance-attribute

A Number that indicates the error type that occurred.

data = None class-attribute instance-attribute

A Primitive or Structured value that contains additional information about the error. This may be omitted.

message = 'Push Notification is not supported' class-attribute instance-attribute

A String providing a short description of the error.

Role

Bases: Enum

Message sender's role

agent = 'agent' class-attribute instance-attribute

user = 'user' class-attribute instance-attribute

SecurityScheme

Bases: RootModel[APIKeySecurityScheme | HTTPAuthSecurityScheme | OAuth2SecurityScheme | OpenIdConnectSecurityScheme]

root instance-attribute

Mirrors the OpenAPI Security Scheme Object (https://swagger.io/specification/#security-scheme-object)

SecuritySchemeBase

Bases: BaseModel

Base properties shared by all security schemes.

description = None class-attribute instance-attribute

Description of this security scheme.

SendMessageRequest

Bases: BaseModel

JSON-RPC request model for the 'message/send' method.

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".

method = 'message/send' class-attribute instance-attribute

A String containing the name of the method to be invoked.

params instance-attribute

A Structured value that holds the parameter values to be used during the invocation of the method.
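
To make the request shape concrete, here is a minimal sketch of a 'message/send' payload built as a plain dictionary with the standard library. It is not the SDK's model code: `build_send_message_request` is a hypothetical helper, and generating UUID strings for `messageId` and `id` is an assumption made for illustration. The field names follow the Message, MessageSendParams, and TextPart attributes listed on this page.

```python
import json
import uuid


def build_send_message_request(text, request_id=None):
    """Build a JSON-RPC 'message/send' request as a plain dict (hypothetical helper)."""
    message = {
        'kind': 'message',
        'messageId': str(uuid.uuid4()),  # assumption: a UUID string as identifier
        'role': 'user',
        'parts': [{'kind': 'text', 'text': text}],
    }
    return {
        'jsonrpc': '2.0',
        # assumption: fall back to a generated id when the caller gives none
        'id': request_id if request_id is not None else str(uuid.uuid4()),
        'method': 'message/send',
        'params': {'message': message},
    }


request = build_send_message_request('Hello, agent', request_id=1)
print(json.dumps(request, indent=2))
```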

SendMessageResponse

Bases: RootModel[JSONRPCErrorResponse | SendMessageSuccessResponse]

root instance-attribute

JSON-RPC response model for the 'message/send' method.
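
Because the response is a union of an error response and a success response, callers typically need to tell the two apart. The sketch below does this on plain dictionaries by checking for the `error` member. It is a stdlib stand-in for illustration, not the SDK's RootModel handling, and `JSONRPCFailure` and `unwrap_response` are hypothetical names.

```python
class JSONRPCFailure(Exception):
    """Hypothetical exception carrying a JSON-RPC error code."""

    def __init__(self, code, message):
        super().__init__(f'{code}: {message}')
        self.code = code


def unwrap_response(payload):
    """Return the result of a success response, or raise for an error response."""
    if 'error' in payload:
        error = payload['error']
        raise JSONRPCFailure(error['code'], error['message'])
    return payload['result']


success = {'jsonrpc': '2.0', 'id': 1, 'result': {'kind': 'task', 'id': 't-1'}}
failure = {'jsonrpc': '2.0', 'id': 1,
           'error': {'code': -32001, 'message': 'Task not found'}}

result = unwrap_response(success)
```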

SendMessageSuccessResponse

Bases: BaseModel

JSON-RPC success response model for the 'message/send' method.

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".

result instance-attribute

The result object on success.

SendStreamingMessageRequest

Bases: BaseModel

JSON-RPC request model for the 'message/stream' method.

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".

method = 'message/stream' class-attribute instance-attribute

A String containing the name of the method to be invoked.

params instance-attribute

A Structured value that holds the parameter values to be used during the invocation of the method.

SendStreamingMessageResponse

Bases: RootModel[JSONRPCErrorResponse | SendStreamingMessageSuccessResponse]

root instance-attribute

JSON-RPC response model for the 'message/stream' method.

SendStreamingMessageSuccessResponse

Bases: BaseModel

JSON-RPC success response model for the 'message/stream' method.

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".

result instance-attribute

The result object on success.

SetTaskPushNotificationConfigRequest

Bases: BaseModel

JSON-RPC request model for the 'tasks/pushNotificationConfig/set' method.

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".

method = 'tasks/pushNotificationConfig/set' class-attribute instance-attribute

A String containing the name of the method to be invoked.

params instance-attribute

A Structured value that holds the parameter values to be used during the invocation of the method.

SetTaskPushNotificationConfigResponse

Bases: RootModel[JSONRPCErrorResponse | SetTaskPushNotificationConfigSuccessResponse]

root instance-attribute

JSON-RPC response for the 'tasks/pushNotificationConfig/set' method.

SetTaskPushNotificationConfigSuccessResponse

Bases: BaseModel

JSON-RPC success response model for the 'tasks/pushNotificationConfig/set' method.

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".

result instance-attribute

The result object on success.

Task

Bases: BaseModel

artifacts = None class-attribute instance-attribute

Collection of artifacts created by the agent.

contextId instance-attribute

Server-generated id for contextual alignment across interactions

history = None class-attribute instance-attribute

id instance-attribute

Unique identifier for the task

kind = 'task' class-attribute instance-attribute

Event type

metadata = None class-attribute instance-attribute

Extension metadata.

status instance-attribute

Current status of the task

TaskArtifactUpdateEvent

Bases: BaseModel

Sent by the server during sendStream or subscribe requests.

append = None class-attribute instance-attribute

Indicates if this artifact appends to a previous one

artifact instance-attribute

Generated artifact

contextId instance-attribute

The context the task is associated with

kind = 'artifact-update' class-attribute instance-attribute

Event type

lastChunk = None class-attribute instance-attribute

Indicates if this is the last chunk of the artifact

metadata = None class-attribute instance-attribute

Extension metadata.

taskId instance-attribute

Task id

TaskIdParams

Bases: BaseModel

Parameters containing only a task ID, used for simple task operations.

id instance-attribute

Task id.

metadata = None class-attribute instance-attribute

TaskNotCancelableError

Bases: BaseModel

A2A specific error indicating the task is in a state where it cannot be canceled.

code = -32002 class-attribute instance-attribute

A Number that indicates the error type that occurred.

data = None class-attribute instance-attribute

A Primitive or Structured value that contains additional information about the error. This may be omitted.

message = 'Task cannot be canceled' class-attribute instance-attribute

A String providing a short description of the error.

TaskNotFoundError

Bases: BaseModel

A2A specific error indicating the requested task ID was not found.

code = -32001 class-attribute instance-attribute

A Number that indicates the error type that occurred.

data = None class-attribute instance-attribute

A Primitive or Structured value that contains additional information about the error. This may be omitted.

message = 'Task not found' class-attribute instance-attribute

A String providing a short description of the error.

TaskPushNotificationConfig

Bases: BaseModel

Parameters for setting or getting push notification configuration for a task

pushNotificationConfig instance-attribute

Push notification configuration.

taskId instance-attribute

Task id.

TaskQueryParams

Bases: BaseModel

Parameters for querying a task, including optional history length.

historyLength = None class-attribute instance-attribute

Number of recent messages to be retrieved.

id instance-attribute

Task id.

metadata = None class-attribute instance-attribute

TaskResubscriptionRequest

Bases: BaseModel

JSON-RPC request model for the 'tasks/resubscribe' method.

id = None class-attribute instance-attribute

An identifier established by the Client that MUST contain a String or a Number. Numbers SHOULD NOT contain fractional parts.

jsonrpc = '2.0' class-attribute instance-attribute

Specifies the version of the JSON-RPC protocol. MUST be exactly "2.0".

method = 'tasks/resubscribe' class-attribute instance-attribute

A String containing the name of the method to be invoked.

params instance-attribute

A Structured value that holds the parameter values to be used during the invocation of the method.

TaskState

Bases: Enum

Represents the possible states of a Task.

auth_required = 'auth-required' class-attribute instance-attribute

canceled = 'canceled' class-attribute instance-attribute

completed = 'completed' class-attribute instance-attribute

failed = 'failed' class-attribute instance-attribute

input_required = 'input-required' class-attribute instance-attribute

rejected = 'rejected' class-attribute instance-attribute

submitted = 'submitted' class-attribute instance-attribute

unknown = 'unknown' class-attribute instance-attribute

working = 'working' class-attribute instance-attribute
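
Note that the member names use underscores while some values use hyphens. The following stand-in enum, written with the standard library and mirroring the members listed above, shows how a wire value maps back to a member. It is a sketch for illustration and does not import the SDK.

```python
from enum import Enum


class TaskState(Enum):
    """Stand-in mirroring the member names and values listed above."""

    auth_required = 'auth-required'
    canceled = 'canceled'
    completed = 'completed'
    failed = 'failed'
    input_required = 'input-required'
    rejected = 'rejected'
    submitted = 'submitted'
    unknown = 'unknown'
    working = 'working'


# Look up a member from its hyphenated wire value ...
state = TaskState('input-required')
# ... which is the same object as the underscored Python name.
same_state = TaskState.input_required
```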

TaskStatus

Bases: BaseModel

TaskState and accompanying message.

message = None class-attribute instance-attribute

Additional status updates for client

state instance-attribute

timestamp = None class-attribute instance-attribute

ISO 8601 datetime string when the status was recorded.

TaskStatusUpdateEvent

Bases: BaseModel

Sent by the server during sendStream or subscribe requests.

contextId instance-attribute

The context the task is associated with

final instance-attribute

Indicates the end of the event stream

kind = 'status-update' class-attribute instance-attribute

Event type

metadata = None class-attribute instance-attribute

Extension metadata.

status instance-attribute

Current status of the task

taskId instance-attribute

Task id
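
Since `final` marks the end of the event stream, a consumer can stop reading once it sees a status update with `final` set. The sketch below shows that loop over plain dictionaries. `collect_until_final` is a hypothetical helper and the sample events are made up for illustration; a real client would receive events from the agent rather than from a list.

```python
def collect_until_final(events):
    """Collect events up to and including the first final status update."""
    collected = []
    for event in events:
        collected.append(event)
        if event.get('kind') == 'status-update' and event.get('final'):
            break
    return collected


# Made-up sample stream; the last entry is never consumed.
stream = [
    {'kind': 'status-update', 'taskId': 't-1', 'contextId': 'c-1',
     'final': False, 'status': {'state': 'working'}},
    {'kind': 'artifact-update', 'taskId': 't-1', 'contextId': 'c-1',
     'artifact': {'artifactId': 'a-1'}},
    {'kind': 'status-update', 'taskId': 't-1', 'contextId': 'c-1',
     'final': True, 'status': {'state': 'completed'}},
    {'kind': 'status-update', 'taskId': 't-1', 'contextId': 'c-1',
     'final': False, 'status': {'state': 'working'}},
]

received = collect_until_final(stream)
```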

TextPart

Bases: BaseModel

Represents a text segment within parts.

kind = 'text' class-attribute instance-attribute

Part type - text for TextParts

metadata = None class-attribute instance-attribute

Optional metadata associated with the part.

text instance-attribute

Text content

UnsupportedOperationError

Bases: BaseModel

A2A specific error indicating the requested operation is not supported by the agent.

code = -32004 class-attribute instance-attribute

A Number that indicates the error type that occurred.

data = None class-attribute instance-attribute

A Primitive or Structured value that contains additional information about the error. This may be omitted.

message = 'This operation is not supported' class-attribute instance-attribute

A String providing a short description of the error.

utils

Utility functions for the A2A Python SDK.

append_artifact_to_task(task, event)

Helper method for updating a Task object with new artifact data from an event.

Handles creating the artifacts list if it doesn't exist, adding new artifacts, and appending parts to existing artifacts based on the append flag in the event.

Parameters:

Name Type Description Default
task Task

The Task object to modify.

required
event TaskArtifactUpdateEvent

The TaskArtifactUpdateEvent containing the artifact data.

required
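
The merging behavior described above can be sketched with small stand-in dataclasses. This is not the SDK's implementation: all names ending in `Sketch` are hypothetical, and parts are simplified to strings. Two cases are not specified by the description and are assumptions here: an append for an unknown artifact ID is ignored, and a non-append event for an existing ID replaces that artifact.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ArtifactSketch:
    artifactId: str
    parts: List[str] = field(default_factory=list)


@dataclass
class TaskSketch:
    artifacts: Optional[List[ArtifactSketch]] = None


@dataclass
class ArtifactUpdateSketch:
    artifact: ArtifactSketch
    append: Optional[bool] = None


def append_artifact_sketch(task, event):
    """Merge the event's artifact into the task, in place."""
    if task.artifacts is None:
        task.artifacts = []  # create the list on first use
    index = next(
        (i for i, a in enumerate(task.artifacts)
         if a.artifactId == event.artifact.artifactId),
        None,
    )
    if event.append:
        if index is not None:
            task.artifacts[index].parts.extend(event.artifact.parts)
        # assumption: an append for an unknown artifact is ignored
    elif index is not None:
        task.artifacts[index] = event.artifact  # assumption: replace
    else:
        task.artifacts.append(event.artifact)


task = TaskSketch()
append_artifact_sketch(task, ArtifactUpdateSketch(ArtifactSketch('a1', ['Hello'])))
append_artifact_sketch(
    task, ArtifactUpdateSketch(ArtifactSketch('a1', [' world']), append=True)
)
```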

are_modalities_compatible(server_output_modes, client_output_modes)

Checks if server and client output modalities (MIME types) are compatible.

Modalities are compatible if:

1. The client specifies no preferred output modes (client_output_modes is None or empty).
2. The server specifies no supported output modes (server_output_modes is None or empty).
3. There is at least one common modality between the server's supported list and the client's preferred list.

Parameters:

Name Type Description Default
server_output_modes list[str] | None

A list of MIME types supported by the server/agent for output. Can be None or empty if the server doesn't specify.

required
client_output_modes list[str] | None

A list of MIME types preferred by the client for output. Can be None or empty if the client accepts any.

required

Returns:

Type Description
bool

True if the modalities are compatible, False otherwise.
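
The three compatibility rules can be expressed in a few lines. The function below is a standalone sketch written from the description above, not a copy of the SDK source, and is named `modalities_compatible_sketch` to keep that distinction clear.

```python
from typing import List, Optional


def modalities_compatible_sketch(
    server_output_modes: Optional[List[str]],
    client_output_modes: Optional[List[str]],
) -> bool:
    """Apply the three compatibility rules described above."""
    if not client_output_modes:
        return True  # rule 1: the client accepts anything
    if not server_output_modes:
        return True  # rule 2: the server does not restrict its output
    # rule 3: at least one MIME type in common
    return any(mode in server_output_modes for mode in client_output_modes)


both = modalities_compatible_sketch(['text/plain', 'application/json'], ['application/json'])
disjoint = modalities_compatible_sketch(['text/plain'], ['image/png'])
```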

build_text_artifact(text, artifact_id)

Helper to create a text artifact.

Parameters:

Name Type Description Default
text str

The text content for the artifact.

required
artifact_id str

The ID for the artifact.

required

Returns:

Type Description
Artifact

An Artifact object containing a single TextPart.

completed_task(task_id, context_id, artifacts, history=None)

Creates a Task object in the 'completed' state.

Useful for constructing a final Task representation when the agent finishes and produces artifacts.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required
context_id str

The context ID of the task.

required
artifacts list[Artifact]

A list of Artifact objects produced by the task.

required
history list[Message] | None

An optional list of Message objects representing the task history.

None

Returns:

Type Description
Task

A Task object with status set to 'completed'.

create_task_obj(message_send_params)

Create a new task object from message send params.

Generates UUIDs for task and context IDs if they are not already present in the message.

Parameters:

Name Type Description Default
message_send_params MessageSendParams

The MessageSendParams object containing the initial message.

required

Returns:

Type Description
Task

A new Task object initialized with 'submitted' status and the input message in history.

get_message_text(message, delimiter='\n')

Extracts and joins all text content from a Message's parts.

Parameters:

Name Type Description Default
message Message

The Message object.

required
delimiter

The string to use when joining text from multiple TextParts.

'\n'

Returns:

Type Description
str

A single string containing all text content, or an empty string if no text parts are found.
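
A minimal sketch of the extraction and joining step, operating on plain dictionaries instead of the SDK's Message and Part models. The `_sketch` function names are hypothetical, and using the `kind` field to identify text parts is an assumption based on the TextPart attributes listed on this page.

```python
def get_text_parts_sketch(parts):
    """Return the text of every part whose kind is 'text'."""
    return [part['text'] for part in parts if part.get('kind') == 'text']


def get_message_text_sketch(message, delimiter='\n'):
    """Join all text parts of a message; empty string if there are none."""
    return delimiter.join(get_text_parts_sketch(message['parts']))


message = {
    'kind': 'message',
    'messageId': 'm-1',
    'role': 'agent',
    'parts': [
        {'kind': 'text', 'text': 'First line'},
        {'kind': 'data', 'data': {'answer': 42}},
        {'kind': 'text', 'text': 'Second line'},
    ],
}

joined = get_message_text_sketch(message)
spaced = get_message_text_sketch(message, delimiter=' | ')
```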

get_text_parts(parts)

Extracts text content from all TextPart objects in a list of Parts.

Parameters:

Name Type Description Default
parts list[Part]

A list of Part objects.

required

Returns:

Type Description
list[str]

A list of strings containing the text content from any TextPart objects found.

new_agent_parts_message(parts, context_id=None, task_id=None)

Creates a new agent message containing a list of Parts.

Parameters:

Name Type Description Default
parts list[Part]

The list of Part objects for the message content.

required
context_id str | None

The context ID for the message.

None
task_id str | None

The task ID for the message.

None
final

Optional boolean indicating if this is the final message.

required
metadata

Optional metadata for the message.

required

Returns:

Type Description
Message

A new Message object with role 'agent'.

new_agent_text_message(text, context_id=None, task_id=None)

Creates a new agent message containing a single TextPart.

Parameters:

Name Type Description Default
text str

The text content of the message.

required
context_id str | None

The context ID for the message.

None
task_id str | None

The task ID for the message.

None
final

Optional boolean indicating if this is the final message.

required
metadata

Optional metadata for the message.

required

Returns:

Type Description
Message

A new Message object with role 'agent'.

new_artifact(parts, name, description='')

Creates a new Artifact object.

Parameters:

Name Type Description Default
parts list[Part]

The list of Part objects forming the artifact's content.

required
name str

The human-readable name of the artifact.

required
description str

An optional description of the artifact.

''

Returns:

Type Description
Artifact

A new Artifact object with a generated artifactId.

new_data_artifact(name, data, description='')

Creates a new Artifact object containing only a single DataPart.

Parameters:

Name Type Description Default
name str

The human-readable name of the artifact.

required
data dict[str, Any]

The structured data content of the artifact.

required
description str

An optional description of the artifact.

''

Returns:

Type Description
Artifact

A new Artifact object with a generated artifactId.

new_task(request)

Creates a new Task object from an initial user message.

Generates task and context IDs if not provided in the message.

Parameters:

Name Type Description Default
request Message

The initial Message object from the user.

required

Returns:

Type Description
Task

A new Task object initialized with 'submitted' status and the input message in history.
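
The ID handling can be illustrated with a dictionary-based sketch. It assumes, as the description suggests, that IDs already present on the message are reused and missing ones are generated. Using `uuid.uuid4()` for the generated IDs is an assumption, and `new_task_sketch` is a hypothetical stand-in rather than the SDK function.

```python
import uuid


def new_task_sketch(request):
    """Build a task dict in the 'submitted' state from an initial message dict."""
    return {
        'kind': 'task',
        # Reuse IDs carried by the message, otherwise generate new ones.
        'id': request.get('taskId') or str(uuid.uuid4()),
        'contextId': request.get('contextId') or str(uuid.uuid4()),
        'status': {'state': 'submitted'},
        'history': [request],
    }


user_message = {
    'kind': 'message',
    'messageId': 'm-1',
    'role': 'user',
    'contextId': 'ctx-42',
    'parts': [{'kind': 'text', 'text': 'Summarize this'}],
}

task = new_task_sketch(user_message)
```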

new_text_artifact(name, text, description='')

Creates a new Artifact object containing only a single TextPart.

Parameters:

Name Type Description Default
name str

The human-readable name of the artifact.

required
text str

The text content of the artifact.

required
description str

An optional description of the artifact.

''

Returns:

Type Description
Artifact

A new Artifact object with a generated artifactId.
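
For orientation, the sketch below shows what a text artifact with a generated ID might look like as a plain dictionary. The dictionary layout and the use of a UUID string for `artifactId` are assumptions for illustration, and `new_text_artifact_sketch` is a hypothetical name rather than the SDK function.

```python
import uuid


def new_text_artifact_sketch(name, text, description=''):
    """Build an artifact dict holding a single text part."""
    return {
        'artifactId': str(uuid.uuid4()),  # assumption: UUID string as generated ID
        'name': name,
        'description': description,
        'parts': [{'kind': 'text', 'text': text}],
    }


artifact = new_text_artifact_sketch('summary', 'All checks passed.')
```

Each call yields a fresh `artifactId`, so two artifacts with the same name and text remain distinguishable.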

artifact

Utility functions for creating A2A Artifact objects.

new_artifact(parts, name, description='')

Creates a new Artifact object.

Parameters:

Name Type Description Default
parts list[Part]

The list of Part objects forming the artifact's content.

required
name str

The human-readable name of the artifact.

required
description str

An optional description of the artifact.

''

Returns:

Type Description
Artifact

A new Artifact object with a generated artifactId.

new_data_artifact(name, data, description='')

Creates a new Artifact object containing only a single DataPart.

Parameters:

Name Type Description Default
name str

The human-readable name of the artifact.

required
data dict[str, Any]

The structured data content of the artifact.

required
description str

An optional description of the artifact.

''

Returns:

Type Description
Artifact

A new Artifact object with a generated artifactId.

new_text_artifact(name, text, description='')

Creates a new Artifact object containing only a single TextPart.

Parameters:

Name Type Description Default
name str

The human-readable name of the artifact.

required
text str

The text content of the artifact.

required
description str

An optional description of the artifact.

''

Returns:

Type Description
Artifact

A new Artifact object with a generated artifactId.

errors

Custom exceptions for A2A server-side errors.

A2AServerError

Bases: Exception

Base exception for A2A Server errors.

MethodNotImplementedError

Bases: A2AServerError

Exception raised for methods that are not implemented by the server handler.

message = message instance-attribute

ServerError

Bases: Exception

Wrapper exception for A2A or JSON-RPC errors originating from the server's logic.

This exception is used internally by request handlers and other server components to signal a specific error that should be formatted as a JSON-RPC error response.

error = error instance-attribute
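
The pattern described here, raising a wrapper exception inside handler logic and converting it to a JSON-RPC error response at the boundary, can be sketched as follows. The classes and functions are stdlib stand-ins with hypothetical names; the error is a plain dictionary rather than one of the SDK's error models.

```python
class ServerErrorSketch(Exception):
    """Stand-in wrapper exception that carries an error payload."""

    def __init__(self, error):
        super().__init__(error['message'])
        self.error = error


def dispatch(request, handler):
    """Run a handler and format either its result or its error as a response."""
    try:
        result = handler(request)
    except ServerErrorSketch as exc:
        return {'jsonrpc': '2.0', 'id': request.get('id'), 'error': exc.error}
    return {'jsonrpc': '2.0', 'id': request.get('id'), 'result': result}


def failing_handler(request):
    # Handler logic signals a specific error by raising the wrapper.
    raise ServerErrorSketch({'code': -32001, 'message': 'Task not found'})


response = dispatch(
    {'jsonrpc': '2.0', 'id': 7, 'method': 'tasks/resubscribe'}, failing_handler
)
```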

helpers

General utility functions for the A2A Python SDK.

logger = logging.getLogger(__name__) module-attribute

append_artifact_to_task(task, event)

Helper method for updating a Task object with new artifact data from an event.

Handles creating the artifacts list if it doesn't exist, adding new artifacts, and appending parts to existing artifacts based on the append flag in the event.

Parameters:

Name Type Description Default
task Task

The Task object to modify.

required
event TaskArtifactUpdateEvent

The TaskArtifactUpdateEvent containing the artifact data.

required

are_modalities_compatible(server_output_modes, client_output_modes)

Checks if server and client output modalities (MIME types) are compatible.

Modalities are compatible if: 1. The client specifies no preferred output modes (client_output_modes is None or empty). 2. The server specifies no supported output modes (server_output_modes is None or empty). 3. There is at least one common modality between the server's supported list and the client's preferred list.

Parameters:

Name Type Description Default
server_output_modes list[str] | None

A list of MIME types supported by the server/agent for output. Can be None or empty if the server doesn't specify.

required
client_output_modes list[str] | None

A list of MIME types preferred by the client for output. Can be None or empty if the client accepts any.

required

Returns:

Type Description
bool

True if the modalities are compatible, False otherwise.

build_text_artifact(text, artifact_id)

Helper to create a text artifact.

Parameters:

Name Type Description Default
text str

The text content for the artifact.

required
artifact_id str

The ID for the artifact.

required

Returns:

Type Description
Artifact

An Artifact object containing a single TextPart.

create_task_obj(message_send_params)

Create a new task object from message send params.

Generates UUIDs for task and context IDs if they are not already present in the message.

Parameters:

Name Type Description Default
message_send_params MessageSendParams

The MessageSendParams object containing the initial message.

required

Returns:

Type Description
Task

A new Task object initialized with 'submitted' status and the input message in history.

validate(expression, error_message=None)

Decorator that validates if a given expression evaluates to True.

Typically used on class methods to check capabilities or configuration before executing the method's logic. If the expression is False, a ServerError with an UnsupportedOperationError is raised.

Parameters:

Name Type Description Default
expression Callable[[Any], bool]

A callable that takes the instance (self) as its argument and returns a boolean.

required
error_message str | None

An optional custom error message for the UnsupportedOperationError. If None, the string representation of the expression will be used.

None
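
A decorator of this kind can be sketched with the standard library alone. The version below is an illustration under stated assumptions, not the SDK's code: it raises a hypothetical `UnsupportedOperationSketch` exception instead of a ServerError wrapping an UnsupportedOperationError, and it only covers synchronous methods.

```python
import functools


class UnsupportedOperationSketch(Exception):
    """Hypothetical stand-in for the SDK's error type."""


def validate_sketch(expression, error_message=None):
    """Guard a method with a predicate evaluated on the instance."""

    def decorator(function):
        @functools.wraps(function)
        def wrapper(self, *args, **kwargs):
            if not expression(self):
                # Fall back to the expression's string form when no message is given.
                raise UnsupportedOperationSketch(error_message or str(expression))
            return function(self, *args, **kwargs)

        return wrapper

    return decorator


class Handler:
    def __init__(self, streaming):
        self.streaming = streaming

    @validate_sketch(lambda self: self.streaming, 'Streaming is not supported')
    def stream(self):
        return 'streaming'


allowed = Handler(streaming=True).stream()
```

Calling `Handler(streaming=False).stream()` raises the stand-in exception before the method body runs.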

message

Utility functions for creating and handling A2A Message objects.

get_message_text(message, delimiter='\n')

Extracts and joins all text content from a Message's parts.

Parameters:

Name Type Description Default
message Message

The Message object.

required
delimiter

The string to use when joining text from multiple TextParts.

'\n'

Returns:

Type Description
str

A single string containing all text content, or an empty string if no text parts are found.

get_text_parts(parts)

Extracts text content from all TextPart objects in a list of Parts.

Parameters:

Name Type Description Default
parts list[Part]

A list of Part objects.

required

Returns:

Type Description
list[str]

A list of strings containing the text content from any TextPart objects found.

new_agent_parts_message(parts, context_id=None, task_id=None)

Creates a new agent message containing a list of Parts.

Parameters:

Name Type Description Default
parts list[Part]

The list of Part objects for the message content.

required
context_id str | None

The context ID for the message.

None
task_id str | None

The task ID for the message.

None
final

Optional boolean indicating if this is the final message.

required
metadata

Optional metadata for the message.

required

Returns:

Type Description
Message

A new Message object with role 'agent'.

new_agent_text_message(text, context_id=None, task_id=None)

Creates a new agent message containing a single TextPart.

Parameters:

Name Type Description Default
text str

The text content of the message.

required
context_id str | None

The context ID for the message.

None
task_id str | None

The task ID for the message.

None
final

Optional boolean indicating if this is the final message.

required
metadata

Optional metadata for the message.

required

Returns:

Type Description
Message

A new Message object with role 'agent'.

task

Utility functions for creating A2A Task objects.

completed_task(task_id, context_id, artifacts, history=None)

Creates a Task object in the 'completed' state.

Useful for constructing a final Task representation when the agent finishes and produces artifacts.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required
context_id str

The context ID of the task.

required
artifacts list[Artifact]

A list of Artifact objects produced by the task.

required
history list[Message] | None

An optional list of Message objects representing the task history.

None

Returns:

Type Description
Task

A Task object with status set to 'completed'.

new_task(request)

Creates a new Task object from an initial user message.

Generates task and context IDs if not provided in the message.

Parameters:

Name Type Description Default
request Message

The initial Message object from the user.

required

Returns:

Type Description
Task

A new Task object initialized with 'submitted' status and the input message in history.
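
The ID fallback described above might look like the following sketch. The `Message` and `Task` dataclasses are simplified stand-ins, `task_from_message` is a hypothetical name, and the use of `uuid.uuid4()` for generated IDs is an assumption; the text only states that IDs are generated when the message does not provide them.

```python
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Message:
    """Simplified stand-in for the initial user message."""

    text: str
    task_id: Optional[str] = None
    context_id: Optional[str] = None


@dataclass
class Task:
    """Simplified stand-in for a task record."""

    id: str
    context_id: str
    state: str
    history: list = field(default_factory=list)


def task_from_message(request):
    """Build a 'submitted' task, reusing IDs from the message if present."""
    return Task(
        id=request.task_id or str(uuid.uuid4()),  # assumed ID scheme
        context_id=request.context_id or str(uuid.uuid4()),
        state='submitted',
        history=[request],  # the input message starts the history
    )
```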

telemetry

OpenTelemetry Tracing Utilities for A2A Python SDK.

This module provides decorators to simplify the integration of OpenTelemetry tracing into Python applications. It offers trace_function for instrumenting individual functions (both synchronous and asynchronous) and trace_class for instrumenting multiple methods within a class.

The tracer is initialized with the module name and version defined by INSTRUMENTING_MODULE_NAME ('a2a-python-sdk') and INSTRUMENTING_MODULE_VERSION ('1.0.0').

Features:

  - Automatic span creation for decorated functions/methods.
  - Support for both synchronous and asynchronous functions.
  - Default span naming based on module and function/class/method name.
  - Customizable span names, kinds, and static attributes.
  - Dynamic attribute setting via an attribute_extractor callback.
  - Automatic recording of exceptions and setting of span status.
  - Selective method tracing in classes using include/exclude lists.

Usage

For a single function:

from a2a.utils.telemetry import SpanKind, trace_function


@trace_function
def my_function():
    # ...
    pass


@trace_function(span_name='custom.op', kind=SpanKind.CLIENT)
async def my_async_function():
    # ...
    pass

For a class:

from a2a.utils.telemetry import trace_class


@trace_class(exclude_list=['internal_method'])
class MyService:
    def public_api(self, user_id):
        # This method will be traced
        pass

    def internal_method(self):
        # This method will not be traced
        pass

INSTRUMENTING_MODULE_NAME = 'a2a-python-sdk' module-attribute

INSTRUMENTING_MODULE_VERSION = '1.0.0' module-attribute

SpanKind = _SpanKind module-attribute

logger = logging.getLogger(__name__) module-attribute

trace_class(include_list=None, exclude_list=None, kind=SpanKind.INTERNAL)

A class decorator to automatically trace specified methods of a class.

This decorator iterates over the methods of a class and applies the trace_function decorator to them, based on the include_list and exclude_list criteria. Dunder methods (names starting or ending with double underscores, e.g., __init__, __call__) are always excluded.

Parameters:

Name Type Description Default
include_list list[str]

A list of method names to explicitly include for tracing. If provided, only methods in this list (that are not dunder methods) will be traced. Defaults to None (trace all non-dunder methods).

None
exclude_list list[str]

A list of method names to exclude from tracing. This is only considered if include_list is not provided. Dunder methods are implicitly excluded. Defaults to None, which is treated as an empty list.

None
kind SpanKind

The opentelemetry.trace.SpanKind for the created spans on the methods. Defaults to SpanKind.INTERNAL.

INTERNAL

Returns:

Name Type Description
callable

A decorator function that, when applied to a class, modifies the class to wrap its specified methods with tracing.

Example

To trace all methods except 'internal_method':

@trace_class(exclude_list=['internal_method'])
class MyService:
    def public_api(self):
        pass

    def internal_method(self):
        pass

To trace only 'method_one' and 'method_two':

@trace_class(include_list=['method_one', 'method_two'])
class AnotherService:
    def method_one(self):
        pass

    def method_two(self):
        pass

    def not_traced_method(self):
        pass
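
The selection rules above can be made concrete with a small helper that reports which method names would be traced. This is a sketch of the documented rules, not the SDK's code: `methods_to_trace` and `DemoService` are hypothetical names, and treating an empty include_list the same as None is an assumption.

```python
import inspect


def methods_to_trace(cls, include_list=None, exclude_list=None):
    """Return the method names selected by the include/exclude rules."""
    exclude_list = exclude_list or []
    selected = []
    # inspect.getmembers returns (name, value) pairs sorted by name.
    for name, _ in inspect.getmembers(cls, inspect.isfunction):
        if name.startswith('__') or name.endswith('__'):
            continue  # dunder methods are never traced
        if include_list:
            # include_list takes precedence; exclude_list is ignored.
            if name in include_list:
                selected.append(name)
        elif name not in exclude_list:
            selected.append(name)
    return selected


class DemoService:
    def __init__(self):
        self.ready = True

    def method_one(self):
        pass

    def method_two(self):
        pass

    def not_traced_method(self):
        pass
```

For example, `methods_to_trace(DemoService, exclude_list=['not_traced_method'])` selects only `method_one` and `method_two`, and `__init__` is skipped in every configuration.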

trace_function(func=None, *, span_name=None, kind=SpanKind.INTERNAL, attributes=None, attribute_extractor=None)

A decorator to automatically trace a function call with OpenTelemetry.

This decorator can be used to wrap both sync and async functions. When applied, it creates a new span for each call to the decorated function. The span will record the execution time, status (OK or ERROR), and any exceptions that occur.

It can be used in two ways:

  1. As a direct decorator: @trace_function
  2. As a decorator factory to provide arguments: @trace_function(span_name="custom.name")

Parameters:

Name Type Description Default
func callable

The function to be decorated. If None, the decorator returns a partial function, allowing it to be called with arguments. Defaults to None.

None
span_name str

Custom name for the span. If None, it defaults to f'{func.__module__}.{func.__name__}'. Defaults to None.

None
kind SpanKind

The opentelemetry.trace.SpanKind for the created span. Defaults to SpanKind.INTERNAL.

INTERNAL
attributes dict

A dictionary of static attributes to be set on the span. Keys are attribute names (str) and values are the corresponding attribute values. Defaults to None.

None
attribute_extractor callable

A function that can be used to dynamically extract and set attributes on the span. It is called within a finally block, ensuring it runs even if the decorated function raises an exception. The function signature should be attribute_extractor(span, args, kwargs, result, exception), where:

  - span: the OpenTelemetry Span object.
  - args: a tuple of positional arguments passed.
  - kwargs: a dictionary of keyword arguments passed.
  - result: the return value (None if an exception occurred).
  - exception: the exception object if one was raised (None otherwise).

Any exception raised by the attribute_extractor itself will be caught and logged. Defaults to None.

None

Returns:

Name Type Description
callable

The wrapped function that includes tracing, or a partial decorator if func is None.
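
The dual-use pattern (bare decorator or decorator factory) and the finally-block extractor call can be sketched without OpenTelemetry. Everything here is illustrative: `traced`, `FakeSpan`, and `RECORDED` are hypothetical stand-ins, the `kind` parameter is omitted, and the sketch swallows extractor errors where the description above says they are logged.

```python
import asyncio
import functools
import inspect

RECORDED = []  # finished spans, kept so the example can be inspected


class FakeSpan:
    """Stand-in for an OpenTelemetry span; it only stores data."""

    def __init__(self, name, attributes):
        self.name = name
        self.attributes = dict(attributes or {})
        self.status = 'UNSET'


def traced(func=None, *, span_name=None, attributes=None,
           attribute_extractor=None):
    if func is None:
        # Used as @traced(...): return a partial awaiting the function.
        return functools.partial(
            traced, span_name=span_name, attributes=attributes,
            attribute_extractor=attribute_extractor)

    name = span_name or f'{func.__module__}.{func.__name__}'

    def finish(span, args, kwargs, result, exception):
        span.status = 'ERROR' if exception else 'OK'
        if attribute_extractor:
            try:
                attribute_extractor(span, args, kwargs, result, exception)
            except Exception:
                pass  # sketch only: extractor failures are ignored here
        RECORDED.append(span)

    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            span, result, exception = FakeSpan(name, attributes), None, None
            try:
                result = await func(*args, **kwargs)
                return result
            except Exception as exc:
                exception = exc
                raise
            finally:
                finish(span, args, kwargs, result, exception)
        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        span, result, exception = FakeSpan(name, attributes), None, None
        try:
            result = func(*args, **kwargs)
            return result
        except Exception as exc:
            exception = exc
            raise
        finally:
            finish(span, args, kwargs, result, exception)
    return sync_wrapper


def mark_failure(span, args, kwargs, result, exception):
    span.attributes['failed'] = exception is not None


@traced
def add(a, b):
    return a + b


@traced(span_name='custom.div', attribute_extractor=mark_failure)
def div(a, b):
    return a / b


@traced(span_name='custom.op')
async def fetch():
    return 'ok'
```

Because the extractor runs in the finally block, `mark_failure` sees the exception from `div(1, 0)` before it propagates to the caller.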