Skip to content

Feat: separate tool_call_item and tool_call_output_item in stream events #974

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/agents/_run_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -907,12 +907,12 @@ async def run_single_output_guardrail(
return result

@classmethod
def stream_step_result_to_queue(
def stream_step_items_to_queue(
cls,
step_result: SingleStepResult,
new_step_items: list[RunItem],
queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel],
):
for item in step_result.new_step_items:
for item in new_step_items:
if isinstance(item, MessageOutputItem):
event = RunItemStreamEvent(item=item, name="message_output_created")
elif isinstance(item, HandoffCallItem):
Expand All @@ -937,6 +937,14 @@ def stream_step_result_to_queue(
if event:
queue.put_nowait(event)

@classmethod
def stream_step_result_to_queue(
    cls,
    step_result: SingleStepResult,
    queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel],
) -> None:
    """Emit every item in ``step_result.new_step_items`` to *queue* as run-item stream events.

    Thin backward-compatible wrapper: delegates to ``stream_step_items_to_queue``,
    which now also accepts a bare item list so callers can stream items before a
    full ``SingleStepResult`` exists.
    """
    cls.stream_step_items_to_queue(step_result.new_step_items, queue)

@classmethod
async def _check_for_final_output_from_tools(
cls,
Expand Down
59 changes: 53 additions & 6 deletions src/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -854,10 +854,9 @@ async def _run_single_turn_streamed(
raise ModelBehaviorError("Model did not produce a final response!")

# 3. Now, we can process the turn as we do in the non-streaming case
single_step_result = await cls._get_single_step_result_from_response(
return await cls._get_single_step_result_from_streamed_response(
agent=agent,
original_input=streamed_result.input,
pre_step_items=streamed_result.new_items,
streamed_result=streamed_result,
new_response=final_response,
output_schema=output_schema,
all_tools=all_tools,
Expand All @@ -868,9 +867,6 @@ async def _run_single_turn_streamed(
tool_use_tracker=tool_use_tracker,
)

RunImpl.stream_step_result_to_queue(single_step_result, streamed_result._event_queue)
return single_step_result

@classmethod
async def _run_single_turn(
cls,
Expand Down Expand Up @@ -973,6 +969,57 @@ async def _get_single_step_result_from_response(
run_config=run_config,
)

@classmethod
async def _get_single_step_result_from_streamed_response(
    cls,
    *,
    agent: Agent[TContext],
    all_tools: list[Tool],
    streamed_result: RunResultStreaming,
    new_response: ModelResponse,
    output_schema: AgentOutputSchemaBase | None,
    handoffs: list[Handoff],
    hooks: RunHooks[TContext],
    context_wrapper: RunContextWrapper[TContext],
    run_config: RunConfig,
    tool_use_tracker: AgentToolUseTracker,
) -> SingleStepResult:
    """Process a completed model response for a streamed run.

    Mirrors ``_get_single_step_result_from_response`` but streams run-item
    events eagerly: items produced directly by the model response (e.g. the
    tool_call_item) go on the event queue *before* tool execution starts, and
    items created by tool execution (e.g. the tool_call_output_item) are
    streamed afterwards — without re-sending the ones already emitted.
    """
    queue = streamed_result._event_queue

    # Turn the raw model response into typed run items / tool invocations.
    processed_response = RunImpl.process_model_response(
        agent=agent,
        all_tools=all_tools,
        response=new_response,
        output_schema=output_schema,
        handoffs=handoffs,
    )
    tool_use_tracker.add_tool_use(agent, processed_response.tools_used)

    # Stream the model-produced items right away, ahead of the (potentially
    # slow) tool execution below.
    already_streamed = processed_response.new_items
    RunImpl.stream_step_items_to_queue(already_streamed, queue)

    step_result = await RunImpl.execute_tools_and_side_effects(
        agent=agent,
        original_input=streamed_result.input,
        pre_step_items=streamed_result.new_items,
        new_response=new_response,
        processed_response=processed_response,
        output_schema=output_schema,
        hooks=hooks,
        context_wrapper=context_wrapper,
        run_config=run_config,
    )

    # Stream only the items not already sent above (equality-based membership,
    # matching the semantics of ``item not in already_streamed``).
    remaining_items = [
        item for item in step_result.new_step_items if item not in already_streamed
    ]
    RunImpl.stream_step_items_to_queue(remaining_items, queue)

    return step_result

@classmethod
async def _run_input_guardrails(
cls,
Expand Down
66 changes: 66 additions & 0 deletions tests/test_stream_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import asyncio
import time

import pytest

from agents import Agent, ItemHelpers, Runner, function_tool

from .fake_model import FakeModel
from .test_responses import get_function_tool_call, get_text_message


@function_tool
async def foo() -> str:
    """Fake tool that takes a short but nonzero amount of time to run.

    The delay keeps the tool-call and tool-output events on distinct
    timestamps so their ordering can be asserted. Kept to a few
    milliseconds (was a hard 3-second sleep) so the test suite stays fast.
    """
    await asyncio.sleep(0.02)
    return "success!"

@pytest.mark.asyncio
async def test_stream_events_main():
    """tool_call_item and tool_call_output_item must arrive as separate, ordered events.

    The tool_call_item event should be emitted as soon as the model emits the
    call — before the tool finishes running — and the tool_call_output_item
    event strictly after it.
    """
    model = FakeModel()
    agent = Agent(
        name="Joker",
        model=model,
        tools=[foo],
    )

    model.add_multiple_turn_outputs(
        [
            # First turn: a message and tool call
            [
                get_text_message("a_message"),
                get_function_tool_call("foo", ""),
            ],
            # Second turn: text message
            [get_text_message("done")],
        ]
    )

    result = Runner.run_streamed(
        agent,
        input="Hello",
    )
    # Sentinels stay -1 if the corresponding event is never observed.
    tool_call_start_time = -1
    tool_call_end_time = -1
    async for event in result.stream_events():
        # Ignore the raw response deltas; we only care about run-item events.
        if event.type == "raw_response_event":
            continue
        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                tool_call_start_time = time.time_ns()
            elif event.item.type == "tool_call_output_item":
                tool_call_end_time = time.time_ns()

    assert tool_call_start_time > 0, "tool_call_item was not observed"
    assert tool_call_end_time > 0, "tool_call_output_item was not observed"
    assert tool_call_start_time < tool_call_end_time, (
        "tool_call_output_item was emitted before (or at the same instant as) "
        "tool_call_item"
    )