From 5daa8e1834d93d8bfbb319bf9ac1d6bc0d9c970d Mon Sep 17 00:00:00 2001 From: lilei <954649067@qq.com> Date: Mon, 30 Jun 2025 16:08:17 +0800 Subject: [PATCH] Feat: separate tool_call_item and tool_call_output_item in stream events --- src/agents/_run_impl.py | 14 ++++++-- src/agents/run.py | 59 +++++++++++++++++++++++++++++---- tests/test_stream_events.py | 66 +++++++++++++++++++++++++++++++++++++ 3 files changed, 130 insertions(+), 9 deletions(-) create mode 100644 tests/test_stream_events.py diff --git a/src/agents/_run_impl.py b/src/agents/_run_impl.py index 4ac8b316..b91f7817 100644 --- a/src/agents/_run_impl.py +++ b/src/agents/_run_impl.py @@ -907,12 +907,12 @@ async def run_single_output_guardrail( return result @classmethod - def stream_step_result_to_queue( + def stream_step_items_to_queue( cls, - step_result: SingleStepResult, + new_step_items: list[RunItem], queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel], ): - for item in step_result.new_step_items: + for item in new_step_items: if isinstance(item, MessageOutputItem): event = RunItemStreamEvent(item=item, name="message_output_created") elif isinstance(item, HandoffCallItem): @@ -937,6 +937,14 @@ def stream_step_result_to_queue( if event: queue.put_nowait(event) + @classmethod + def stream_step_result_to_queue( + cls, + step_result: SingleStepResult, + queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel], + ): + cls.stream_step_items_to_queue(step_result.new_step_items, queue) + @classmethod async def _check_for_final_output_from_tools( cls, diff --git a/src/agents/run.py b/src/agents/run.py index e5f9378e..33c8aee2 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -854,10 +854,9 @@ async def _run_single_turn_streamed( raise ModelBehaviorError("Model did not produce a final response!") # 3. 
Now, we can process the turn as we do in the non-streaming case - single_step_result = await cls._get_single_step_result_from_response( + return await cls._get_single_step_result_from_streamed_response( agent=agent, - original_input=streamed_result.input, - pre_step_items=streamed_result.new_items, + streamed_result=streamed_result, new_response=final_response, output_schema=output_schema, all_tools=all_tools, @@ -868,9 +867,6 @@ async def _run_single_turn_streamed( tool_use_tracker=tool_use_tracker, ) - RunImpl.stream_step_result_to_queue(single_step_result, streamed_result._event_queue) - return single_step_result - @classmethod async def _run_single_turn( cls, @@ -973,6 +969,57 @@ async def _get_single_step_result_from_response( run_config=run_config, ) + @classmethod + async def _get_single_step_result_from_streamed_response( + cls, + *, + agent: Agent[TContext], + all_tools: list[Tool], + streamed_result: RunResultStreaming, + new_response: ModelResponse, + output_schema: AgentOutputSchemaBase | None, + handoffs: list[Handoff], + hooks: RunHooks[TContext], + context_wrapper: RunContextWrapper[TContext], + run_config: RunConfig, + tool_use_tracker: AgentToolUseTracker, + ) -> SingleStepResult: + + original_input = streamed_result.input + pre_step_items = streamed_result.new_items + event_queue = streamed_result._event_queue + + processed_response = RunImpl.process_model_response( + agent=agent, + all_tools=all_tools, + response=new_response, + output_schema=output_schema, + handoffs=handoffs, + ) + new_items_processed_response = processed_response.new_items + tool_use_tracker.add_tool_use(agent, processed_response.tools_used) + RunImpl.stream_step_items_to_queue(new_items_processed_response, event_queue) + + single_step_result = await RunImpl.execute_tools_and_side_effects( + agent=agent, + original_input=original_input, + pre_step_items=pre_step_items, + new_response=new_response, + processed_response=processed_response, + output_schema=output_schema, + 
hooks=hooks, + context_wrapper=context_wrapper, + run_config=run_config, + ) + new_step_items = [ + item + for item in single_step_result.new_step_items + if item not in new_items_processed_response + ] + RunImpl.stream_step_items_to_queue(new_step_items, event_queue) + + return single_step_result + @classmethod async def _run_input_guardrails( cls, diff --git a/tests/test_stream_events.py b/tests/test_stream_events.py new file mode 100644 index 00000000..45716473 --- /dev/null +++ b/tests/test_stream_events.py @@ -0,0 +1,66 @@ +import asyncio +import time + +import pytest + +from agents import Agent, ItemHelpers, Runner, function_tool + +from .fake_model import FakeModel +from .test_responses import get_function_tool_call, get_text_message + + +@function_tool +async def foo() -> str: + await asyncio.sleep(3) + return "success!" + +@pytest.mark.asyncio +async def test_stream_events_main(): + model = FakeModel() + agent = Agent( + name="Joker", + model=model, + tools=[foo], + ) + + model.add_multiple_turn_outputs( + [ + # First turn: a message and tool call + [ + get_text_message("a_message"), + get_function_tool_call("foo", ""), + ], + # Second turn: text message + [get_text_message("done")], + ] + ) + + result = Runner.run_streamed( + agent, + input="Hello", + ) + print("=== Run starting ===") + tool_call_start_time = -1 + tool_call_end_time = -1 + async for event in result.stream_events(): + # We'll ignore the raw responses event deltas + if event.type == "raw_response_event": + continue + elif event.type == "agent_updated_stream_event": + print(f"Agent updated: {event.new_agent.name}") + elif event.type == "run_item_stream_event": + if event.item.type == "tool_call_item": + tool_call_start_time = time.time_ns() + print(f"-- Tool was called at {tool_call_start_time}") + elif event.item.type == "tool_call_output_item": + tool_call_end_time = time.time_ns() + print(f"-- Tool output: {event.item.output} at {tool_call_end_time}") + elif event.item.type == 
"message_output_item": + print( + f"-- Message output:\n {ItemHelpers.text_message_output(event.item)}" + ) + + print("=== Run complete ===") + assert tool_call_start_time > 0, "tool_call_item was not observed" + assert tool_call_end_time > 0, "tool_call_output_item was not observed" + assert tool_call_start_time < tool_call_end_time, "tool call output was observed before or at the same time as the tool call"