7. Streaming & Multi-Turn Interactions (LangGraph Example)
The Helloworld example demonstrates the basic mechanics of A2A. For more advanced features such as robust streaming, task state management, and multi-turn conversations powered by an LLM, we'll turn to the LangGraph example located in a2a-samples/samples/python/agents/langgraph/.
This example features a "Currency Agent" that uses the Gemini model via LangChain and LangGraph to answer currency conversion questions.
Setting up the LangGraph Example
- Create a Gemini API Key, if you don't already have one.
- Environment Variable: Create a .env file in the a2a-samples/samples/python/agents/langgraph/ directory that holds your key, replacing the YOUR_API_KEY_HERE placeholder with your actual Gemini API key.
- Install Dependencies (if not already covered): The langgraph example has its own pyproject.toml, which includes dependencies like langchain-google-genai and langgraph. When you installed the SDK from the a2a-samples root using pip install -e .[dev], this should have also installed the dependencies for the workspace examples, including langgraph-example. If you encounter import errors, ensure your primary SDK installation from the root directory was successful.
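Before starting the server, it can help to confirm that the .env file really contains a key rather than the placeholder. The following is a minimal sketch, not part of the sample. It assumes the key is stored under the name GOOGLE_API_KEY (an assumption here, chosen because langchain-google-genai conventionally reads that variable), and the helper names read_env_file and key_problem are hypothetical.

```python
from pathlib import Path

# Assumption: the sample reads the key from this variable name.
ASSUMED_KEY_NAME = "GOOGLE_API_KEY"
PLACEHOLDER = "YOUR_API_KEY_HERE"


def read_env_file(path):
    """Parse simple KEY=VALUE lines, skipping blanks and # comments."""
    values = {}
    for raw in Path(path).read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Drop optional surrounding quotes around the value.
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


def key_problem(values, name=ASSUMED_KEY_NAME):
    """Return a description of what is wrong, or None if the key looks set."""
    value = values.get(name, "")
    if not value:
        return f"{name} is missing or empty"
    if value == PLACEHOLDER:
        return f"{name} still holds the placeholder"
    return None


if __name__ == "__main__":
    env_path = Path(".env")
    if env_path.exists():
        print(key_problem(read_env_file(env_path)) or "key looks set")
    else:
        print(".env not found in the current directory")
```

Run it from the langgraph example directory. It only inspects the file and never sends the key anywhere.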
Running the LangGraph Server
Navigate to the a2a-samples/samples/python/agents/langgraph/ directory in your terminal and ensure your virtual environment (from the SDK root) is activated.
Start the LangGraph agent server by running __main__.py. This starts the server, usually on http://localhost:10000.
Interacting with the LangGraph Agent
Open a new terminal window, activate your virtual environment, and navigate to a2a-samples/samples/python/agents/langgraph/.
Run its test client, test_client.py. Once it finishes, you can shut down the server by pressing Ctrl+C in the terminal window where __main__.py is running.
Key Features Demonstrated
The langgraph example showcases several important A2A concepts:
- LLM Integration: agent.py defines CurrencyAgent. It uses ChatGoogleGenerativeAI and LangGraph's create_react_agent to process user queries.
  - This demonstrates how a real LLM can power the agent's logic.
- Task State Management:
  - samples/langgraph/__main__.py initializes a DefaultRequestHandler with an InMemoryTaskStore.

    ```python
    httpx_client = httpx.AsyncClient()
    request_handler = DefaultRequestHandler(
        agent_executor=CurrencyAgentExecutor(),
        task_store=InMemoryTaskStore(),
        push_notifier=InMemoryPushNotifier(httpx_client),
    )
    server = A2AStarletteApplication(
        agent_card=agent_card, http_handler=request_handler
    )
    uvicorn.run(server.build(), host=host, port=port)
    ```

  - The CurrencyAgentExecutor (in samples/langgraph/agent_executor.py), when its execute method is called by the DefaultRequestHandler, interacts with the RequestContext, which contains the current task (if any).
  - For message/send, the DefaultRequestHandler uses the TaskStore to persist and retrieve task state across interactions. The response to message/send will be a full Task object if the agent's execution flow involves multiple steps or results in a persistent task.
  - The run_single_turn_test function in test_client.py demonstrates getting a Task object back and then querying it using get_task.
- Streaming with TaskStatusUpdateEvent and TaskArtifactUpdateEvent:
  - The execute method in CurrencyAgentExecutor is responsible for handling both non-streaming and streaming requests, orchestrated by the DefaultRequestHandler.
  - As the LangGraph agent processes the request (which might involve calling tools like get_exchange_rate), the CurrencyAgentExecutor enqueues different types of events onto the EventQueue:
    - TaskStatusUpdateEvent: For intermediate updates (e.g., "Looking up exchange rates...", "Processing the exchange rates.."). The final flag on these events is False.
    - TaskArtifactUpdateEvent: When the final answer is ready, it's enqueued as an artifact. The lastChunk flag is True.
    - A final TaskStatusUpdateEvent with state=TaskState.completed and final=True is sent to signify the end of the task for streaming.
  - The run_streaming_test function in test_client.py will print these individual event chunks as they are received from the server.
- Multi-Turn Conversation (TaskState.input_required):
  - The CurrencyAgent can ask for clarification if a query is ambiguous (e.g., user asks "how much is 100 USD?").
  - When this happens, the CurrencyAgentExecutor will enqueue a TaskStatusUpdateEvent where status.state is TaskState.input_required and status.message contains the agent's question (e.g., "To which currency would you like to convert?"). This event will have final=True for the current interaction stream.
  - The run_multi_turn_test function in test_client.py demonstrates this:
    - It sends an initial ambiguous query.
    - The agent responds (via the DefaultRequestHandler processing the enqueued events) with a Task whose status is input_required.
    - The client then sends a second message, including the taskId and contextId from the first turn's Task response, to provide the missing information ("in GBP"). This continues the same task.
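The event ordering described in the streaming and multi-turn items can be illustrated with a small, self-contained simulation. This is a sketch under stated assumptions, not the sample's code: StatusUpdate and ArtifactUpdate are stand-in dataclasses rather than the SDK's TaskStatusUpdateEvent and TaskArtifactUpdateEvent, a plain asyncio.Queue stands in for the EventQueue, and the ambiguity check in fake_executor is a hypothetical placeholder for the LLM's judgment.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


# Stand-in types for illustration only; the SDK defines the real event classes.
@dataclass
class StatusUpdate:
    state: str  # "working", "input_required", or "completed"
    final: bool
    message: Optional[str] = None


@dataclass
class ArtifactUpdate:
    text: str
    last_chunk: bool


async def fake_executor(queue: asyncio.Queue, query: str) -> None:
    """Enqueue events in the order described above for one request."""
    if " in " not in query:  # hypothetical ambiguity check
        # Ambiguous query: ask for input and end this interaction stream.
        await queue.put(StatusUpdate(
            "input_required", True,
            "To which currency would you like to convert?"))
        return
    # Intermediate updates carry final=False.
    await queue.put(StatusUpdate("working", False, "Looking up exchange rates..."))
    await queue.put(StatusUpdate("working", False, "Processing the exchange rates.."))
    # The answer arrives as an artifact, marked as the last chunk.
    await queue.put(ArtifactUpdate("<final answer text>", True))
    # A closing status update with final=True ends the stream.
    await queue.put(StatusUpdate("completed", True))


async def consume(query: str) -> list:
    """Read events until a status update with final=True arrives."""
    queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(fake_executor(queue, query))
    received = []
    while True:
        event = await queue.get()
        received.append(event)
        if isinstance(event, StatusUpdate) and event.final:
            break
    await producer
    return received


if __name__ == "__main__":
    for event in asyncio.run(consume("how much is 100 USD in GBP?")):
        print(event)
```

The consumer stops on the first status update whose final flag is set, which is why an input_required event with final=True closes the first turn of a multi-turn exchange just as a completed event closes a finished task.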
Exploring the Code
Take some time to look through these files:
- __main__.py: Server setup using A2AStarletteApplication and DefaultRequestHandler. Note the AgentCard definition includes capabilities.streaming=True.
- agent.py: The CurrencyAgent with LangGraph, LLM model, and tool definitions.
- agent_executor.py: The CurrencyAgentExecutor implementing the execute (and cancel) methods. It uses the RequestContext to understand the ongoing task and the EventQueue to send back various events (TaskStatusUpdateEvent, TaskArtifactUpdateEvent, new Task object implicitly via the first event if no task exists).
- test_client.py: Demonstrates various interaction patterns, including retrieving task IDs and using them for multi-turn conversations.
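The multi-turn pattern that test_client.py demonstrates comes down to copying two identifiers from the first turn's Task into the follow-up message. The sketch below builds the raw JSON-RPC bodies by hand to make that visible. The helper name build_send_request and the literal IDs are hypothetical, and the field layout (role, parts, messageId) is an assumption based on the A2A message/send request shape; test_client.py itself uses the SDK's request types rather than raw dictionaries.

```python
import json
import uuid


def build_send_request(text, task_id=None, context_id=None):
    """Build a message/send JSON-RPC body (hypothetical helper).

    Passing task_id and context_id marks the message as a continuation
    of an existing task instead of the start of a new one.
    """
    message = {
        "role": "user",
        "parts": [{"kind": "text", "text": text}],
        "messageId": uuid.uuid4().hex,
    }
    if task_id is not None:
        message["taskId"] = task_id
    if context_id is not None:
        message["contextId"] = context_id
    return {
        "jsonrpc": "2.0",
        "id": uuid.uuid4().hex,
        "method": "message/send",
        "params": {"message": message},
    }


if __name__ == "__main__":
    # Turn 1: ambiguous query, no task exists yet.
    first = build_send_request("how much is 100 USD?")

    # Made-up stand-in for the Task returned by the server after turn 1.
    first_task = {"id": "task-123", "contextId": "ctx-456"}

    # Turn 2: reuse both identifiers so the server continues the same task.
    second = build_send_request(
        "in GBP",
        task_id=first_task["id"],
        context_id=first_task["contextId"],
    )
    print(json.dumps(second, indent=2))
```

Omitting both identifiers on the second call would start a fresh task, and the agent would no longer have the first turn's context to resolve "in GBP".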
This example provides a much richer illustration of how A2A facilitates complex, stateful, and asynchronous interactions between agents.