Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Assistants Streaming #367

Merged
merged 28 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ba32578
WIP Implement Assistants Streaming
EthanBarlo Apr 2, 2024
d51cbca
WIP Assistants Streaming: Fixed typo in file name
EthanBarlo Apr 2, 2024
61b589d
WIP Assistants Streaming: Updated types
EthanBarlo Apr 2, 2024
a86cd96
WIP Assistants Streaming: Fixed errors
EthanBarlo Apr 2, 2024
d486e0e
WIP Assistants Streaming: Updated readme to include a reference for a…
EthanBarlo Apr 2, 2024
f820adb
Revert "WIP Assistants Streaming: Updated readme to include a referen…
EthanBarlo Apr 2, 2024
d92d464
Assistants Streaming: Undo formatting of readme file
EthanBarlo Apr 2, 2024
4a7821c
Assistants Stream: Resolved type issue with streaming tool calls
EthanBarlo Apr 2, 2024
e97b65d
Assistants Stream: Added stream to submitToolOutput
EthanBarlo Apr 2, 2024
9c1b567
Assistant Streaming: Updated readme with an example of streaming and …
EthanBarlo Apr 2, 2024
9b78e7b
Resolved incorrect import within ThreadMessageDeltaResponseContentIma…
EthanBarlo Apr 3, 2024
3e92122
Assistants Streaming | Added CreateThreadAndRun streaming support
EthanBarlo Apr 3, 2024
1720983
Assistants Streaming | Updated ThreadRunsContract.php to contain new …
EthanBarlo Apr 3, 2024
2f67d0f
Assistants Streaming | Fixed `createAndRunStreamed` using the wrong t…
EthanBarlo Apr 3, 2024
123fa77
Assistants Streaming | Updated Readme & Fixed some errors
EthanBarlo Apr 16, 2024
256e5bd
Assistants Streaming | Resolved incorrect toArray method within Threa…
EthanBarlo Apr 16, 2024
86b586e
Assistants Streaming | Resolved missing 'use' imports within ThreadMe…
EthanBarlo Apr 22, 2024
e9477f2
lint
gehrisandro Apr 24, 2024
738a671
run rector
gehrisandro Apr 24, 2024
c353543
refactor to use existing StreamResponse
gehrisandro Apr 24, 2024
c503621
rename thread run stream response
gehrisandro Apr 24, 2024
21a49ed
introduce UnknownEventException
gehrisandro Apr 24, 2024
abd0f6c
assistants streaming: Fix existing tests
knash94 May 8, 2024
57b8123
Merge pull request #1 from knash94/Implement-Assistants-Streaming
EthanBarlo May 8, 2024
89d4051
fix annotations
gehrisandro May 16, 2024
bb9749b
Merge remote-tracking branch 'github-desktop-EthanBarlo/Implement-Ass…
gehrisandro May 16, 2024
90e8085
fixes after merge
gehrisandro May 16, 2024
c798d28
Merge branch 'refs/heads/main' into pr/367
gehrisandro May 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,82 @@ $response->metadata; // []
$response->toArray(); // ['id' => 'run_4RCYyYzX9m41WQicoJtUQAb8', ...]
```

#### `create streamed`

Creates a streamed run.

[OpenAI Assistant Events](https://platform.openai.com/docs/api-reference/assistants-streaming/events)

```php
$stream = $client->threads()->runs()->createStreamed(
threadId: 'thread_tKFLqzRN9n7MnyKKvc1Q7868',
parameters: [
'assistant_id' => 'asst_gxzBkD1wkKEloYqZ410pT5pd',
],
);

foreach($stream as $response){
$response->event // 'thread.run.created' | 'thread.run.in_progress' | .....
$response->data // ThreadResponse | ThreadRunResponse | ThreadRunStepResponse | ThreadRunStepDeltaResponse | ThreadMessageResponse | ThreadMessageDeltaResponse
}

// ...
```

#### `create streamed with function calls`

Creates a streamed run with function calls

[OpenAI Assistant Events](https://platform.openai.com/docs/api-reference/assistants-streaming/events)

```php
$stream = $client->threads()->runs()->createStreamed(
threadId: 'thread_tKFLqzRN9n7MnyKKvc1Q7868',
parameters: [
'assistant_id' => 'asst_gxzBkD1wkKEloYqZ410pT5pd',
],
);


do{
foreach($stream as $response){
$response->event // 'thread.run.created' | 'thread.run.in_progress' | .....
$response->data // ThreadResponse | ThreadRunResponse | ThreadRunStepResponse | ThreadRunStepDeltaResponse | ThreadMessageResponse | ThreadMessageDeltaResponse

switch($response->event){
case 'thread.run.created':
case 'thread.run.queued':
case 'thread.run.completed':
case 'thread.run.cancelling':
$run = $response->data;
break;
case 'thread.run.expired':
case 'thread.run.cancelled':
case 'thread.run.failed':
$run = $response->data;
break 3;
case 'thread.run.requires_action':
// Overwrite the stream with the new stream started by submitting the tool outputs
$stream = $client->threads()->runs()->submitToolOutputsStreamed(
threadId: $run->threadId,
runId: $run->id,
parameters: [
'tool_outputs' => [
[
'tool_call_id' => 'call_KSg14X7kZF2WDzlPhpQ168Mj',
'output' => '12',
]
],
]
);
break;
}
}
} while ($run->status != "completed")

// ...
```

#### `retrieve`

Retrieves a run.
Expand Down
12 changes: 12 additions & 0 deletions src/Contracts/Resources/ThreadsContract.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

namespace OpenAI\Contracts\Resources;

use OpenAI\Responses\StreamResponse;
use OpenAI\Responses\Threads\Runs\ThreadRunResponse;
use OpenAI\Responses\Threads\Runs\ThreadRunStreamResponse;
use OpenAI\Responses\Threads\ThreadDeleteResponse;
use OpenAI\Responses\Threads\ThreadResponse;

Expand All @@ -26,6 +28,16 @@ public function create(array $parameters): ThreadResponse;
*/
public function createAndRun(array $parameters): ThreadRunResponse;

/**
* Create a thread and run it in one request, returning a stream.
*
* @see https://platform.openai.com/docs/api-reference/runs/createThreadAndRun
*
* @param array<string, mixed> $parameters
* @return StreamResponse<ThreadRunStreamResponse>
*/
public function createAndRunStreamed(array $parameters): StreamResponse;

/**
* Retrieves a thread.
*
Expand Down
23 changes: 23 additions & 0 deletions src/Contracts/Resources/ThreadsRunsContract.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

namespace OpenAI\Contracts\Resources;

use OpenAI\Responses\StreamResponse;
use OpenAI\Responses\Threads\Runs\ThreadRunListResponse;
use OpenAI\Responses\Threads\Runs\ThreadRunResponse;
use OpenAI\Responses\Threads\Runs\ThreadRunStreamResponse;

interface ThreadsRunsContract
{
Expand All @@ -16,6 +18,16 @@ interface ThreadsRunsContract
*/
public function create(string $threadId, array $parameters): ThreadRunResponse;

/**
* Create a streamed run.
*
* @see https://platform.openai.com/docs/api-reference/runs/createRun
*
* @param array<string, mixed> $parameters
* @return StreamResponse<ThreadRunStreamResponse>
*/
public function createStreamed(string $threadId, array $parameters): StreamResponse;

/**
* Retrieves a run.
*
Expand All @@ -41,6 +53,17 @@ public function modify(string $threadId, string $runId, array $parameters): Thre
*/
public function submitToolOutputs(string $threadId, string $runId, array $parameters): ThreadRunResponse;

/**
* This endpoint can be used to submit the outputs from the tool calls once they're all completed.
* And stream back the response
*
* @see https://platform.openai.com/docs/api-reference/runs/submitToolOutputs
*
* @param array<string, mixed> $parameters
* @return StreamResponse<ThreadRunStreamResponse>
*/
public function submitToolOutputsStreamed(string $threadId, string $runId, array $parameters): StreamResponse;

/**
* Cancels a run that is `in_progress`.
*
Expand Down
11 changes: 11 additions & 0 deletions src/Exceptions/UnknownEventException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace OpenAI\Exceptions;

use Exception;

final class UnknownEventException extends Exception
{
}
22 changes: 22 additions & 0 deletions src/Resources/Threads.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@
use OpenAI\Contracts\Resources\ThreadsContract;
use OpenAI\Contracts\Resources\ThreadsMessagesContract;
use OpenAI\Contracts\Resources\ThreadsRunsContract;
use OpenAI\Responses\StreamResponse;
use OpenAI\Responses\Threads\Runs\ThreadRunResponse;
use OpenAI\Responses\Threads\Runs\ThreadRunStreamResponse;
use OpenAI\Responses\Threads\ThreadDeleteResponse;
use OpenAI\Responses\Threads\ThreadResponse;
use OpenAI\ValueObjects\Transporter\Payload;
use OpenAI\ValueObjects\Transporter\Response;

final class Threads implements ThreadsContract
{
use Concerns\Streamable;
use Concerns\Transportable;

/**
Expand Down Expand Up @@ -51,6 +54,25 @@ public function createAndRun(array $parameters): ThreadRunResponse
return ThreadRunResponse::from($response->data(), $response->meta());
}

/**
* Create a thread and run it in one request, returning a stream.
*
* @see https://platform.openai.com/docs/api-reference/runs/createThreadAndRun
*
* @param array<string, mixed> $parameters
* @return StreamResponse<ThreadRunStreamResponse>
*/
public function createAndRunStreamed(array $parameters): StreamResponse
{
$parameters = $this->setStreamParameter($parameters);

$payload = Payload::create('threads/runs', $parameters);

$response = $this->transporter->requestStream($payload);

return new StreamResponse(ThreadRunStreamResponse::class, $response);
}

/**
* Retrieves a thread.
*
Expand Down
42 changes: 42 additions & 0 deletions src/Resources/ThreadsRuns.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@

use OpenAI\Contracts\Resources\ThreadsRunsContract;
use OpenAI\Contracts\Resources\ThreadsRunsStepsContract;
use OpenAI\Responses\StreamResponse;
use OpenAI\Responses\Threads\Runs\ThreadRunListResponse;
use OpenAI\Responses\Threads\Runs\ThreadRunResponse;
use OpenAI\Responses\Threads\Runs\ThreadRunStreamResponse;
use OpenAI\ValueObjects\Transporter\Payload;
use OpenAI\ValueObjects\Transporter\Response;

final class ThreadsRuns implements ThreadsRunsContract
{
use Concerns\Streamable;
use Concerns\Transportable;

/**
Expand All @@ -32,6 +35,25 @@ public function create(string $threadId, array $parameters): ThreadRunResponse
return ThreadRunResponse::from($response->data(), $response->meta());
}

/**
* Creates a streamed run
*
* @see https://platform.openai.com/docs/api-reference/runs/createRun
*
* @param array<string, mixed> $parameters
* @return StreamResponse<ThreadRunStreamResponse>
*/
public function createStreamed(string $threadId, array $parameters): StreamResponse
{
$parameters = $this->setStreamParameter($parameters);

$payload = Payload::create('threads/'.$threadId.'/runs', $parameters);

$response = $this->transporter->requestStream($payload);

return new StreamResponse(ThreadRunStreamResponse::class, $response);
}

/**
* Retrieves a run.
*
Expand Down Expand Up @@ -81,6 +103,26 @@ public function submitToolOutputs(string $threadId, string $runId, array $parame
return ThreadRunResponse::from($response->data(), $response->meta());
}

/**
* This endpoint can be used to submit the outputs from the tool calls once they're all completed.
* And stream back the response
*
* @see https://platform.openai.com/docs/api-reference/runs/submitToolOutputs
*
* @param array<string, mixed> $parameters
* @return StreamResponse<ThreadRunStreamResponse>
*/
public function submitToolOutputsStreamed(string $threadId, string $runId, array $parameters): StreamResponse
{
$parameters = $this->setStreamParameter($parameters);

$payload = Payload::create('threads/'.$threadId.'/runs/'.$runId.'/submit_tool_outputs', $parameters);

$response = $this->transporter->requestStream($payload);

return new StreamResponse(ThreadRunStreamResponse::class, $response);
}

/**
* Cancels a run that is `in_progress`.
*
Expand Down
4 changes: 2 additions & 2 deletions src/Resources/ThreadsRunsSteps.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public function retrieve(string $threadId, string $runId, string $stepId): Threa
{
$payload = Payload::retrieve('threads/'.$threadId.'/runs/'.$runId.'/steps', $stepId);

/** @var Response<array{id: string, object: string, created_at: int, thread_id: string, assistant_id: string, run_id: string, type: string, status: string, step_details: array{type: 'tool_calls', tool_calls: array<int, array{id: string, type: 'code_interpreter', code_interpreter: array{input: string, outputs: array<int, array{type: 'image', image: array{file_id: string}}|array{type: 'logs', logs: string}>}}|array{id: string, type: 'retrieval', retrieval: array<string, string>}|array{id: string, type: 'function', function: array{name: string, arguments: string, output?: ?string}}>}|array{type: 'message_creation', message_creation: array{message_id: string}}, last_error: ?array{code: string, message: string}, expires_at: ?int, cancelled_at: ?int, failed_at: ?int, completed_at: ?int, metadata?: array<string, string>}> $response */
/** @var Response<array{id: string, object: string, created_at: int, thread_id: string, assistant_id: string, run_id: string, type: string, status: string, step_details: array{type: 'tool_calls', tool_calls: array<int, array{id?: string, type: 'code_interpreter', code_interpreter: array{input: string, outputs: array<int, array{type: 'image', image: array{file_id: string}}|array{type: 'logs', logs: string}>}}|array{id: string, type: 'retrieval', retrieval: array<string, string>}|array{id: string, type: 'function', function: array{name: string, arguments: string, output?: ?string}}>}|array{type: 'message_creation', message_creation: array{message_id: string}}, last_error: ?array{code: string, message: string}, expires_at: ?int, cancelled_at: ?int, failed_at: ?int, completed_at: ?int, metadata?: array<string, string>}> $response */
$response = $this->transporter->requestObject($payload);

return ThreadRunStepResponse::from($response->data(), $response->meta());
Expand All @@ -40,7 +40,7 @@ public function list(string $threadId, string $runId, array $parameters = []): T
{
$payload = Payload::list('threads/'.$threadId.'/runs/'.$runId.'/steps', $parameters);

/** @var Response<array{object: string, data: array<int, array{id: string, object: string, created_at: int, thread_id: string, assistant_id: string, run_id: string, type: string, status: string, step_details: array{type: 'tool_calls', tool_calls: array<int, array{id: string, type: 'code_interpreter', code_interpreter: array{input: string, outputs: array<int, array{type: 'image', image: array{file_id: string}}|array{type: 'logs', logs: string}>}}|array{id: string, type: 'retrieval', retrieval: array<string, string>}|array{id: string, type: 'function', function: array{name: string, arguments: string, output: ?string}}>}|array{type: 'message_creation', message_creation: array{message_id: string}}, last_error: ?array{code: string, message: string}, expires_at: ?int, cancelled_at: ?int, failed_at: ?int, completed_at: ?int, metadata?: array<string, string>}>, first_id: ?string, last_id: ?string, has_more: bool}> $response */
/** @var Response<array{object: string, data: array<int, array{id: string, object: string, created_at: int, thread_id: string, assistant_id: string, run_id: string, type: string, status: string, step_details: array{type: 'tool_calls', tool_calls: array<int, array{id?: string, type: 'code_interpreter', code_interpreter: array{input: string, outputs: array<int, array{type: 'image', image: array{file_id: string}}|array{type: 'logs', logs: string}>}}|array{id: string, type: 'retrieval', retrieval: array<string, string>}|array{id: string, type: 'function', function: array{name: string, arguments: string, output: ?string}}>}|array{type: 'message_creation', message_creation: array{message_id: string}}, last_error: ?array{code: string, message: string}, expires_at: ?int, cancelled_at: ?int, failed_at: ?int, completed_at: ?int, metadata?: array<string, string>}>, first_id: ?string, last_id: ?string, has_more: bool}> $response */
$response = $this->transporter->requestObject($payload);

return ThreadRunStepListResponse::from($response->data(), $response->meta());
Expand Down
11 changes: 11 additions & 0 deletions src/Responses/StreamResponse.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ public function getIterator(): Generator
while (! $this->response->getBody()->eof()) {
$line = $this->readLine($this->response->getBody());

$event = null;
if (str_starts_with($line, 'event:')) {
$event = trim(substr($line, strlen('event:')));
$line = $this->readLine($this->response->getBody());
}

if (! str_starts_with($line, 'data:')) {
continue;
}
Expand All @@ -54,6 +60,11 @@ public function getIterator(): Generator
throw new ErrorException($response['error']);
}

if ($event !== null) {
$response['__event'] = $event;
$response['__meta'] = $this->meta();
}

yield $this->responseClass::from($response);
}
}
Expand Down
67 changes: 67 additions & 0 deletions src/Responses/Threads/Messages/Delta/ThreadMessageDeltaObject.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
<?php

declare(strict_types=1);

namespace OpenAI\Responses\Threads\Messages\Delta;

use OpenAI\Contracts\ResponseContract;
use OpenAI\Responses\Concerns\ArrayAccessible;
use OpenAI\Testing\Responses\Concerns\Fakeable;

/**
* @implements ResponseContract<array{role: ?string, content: array<int, array{index: int, type: 'image_file', image_file: array{file_id: string}}|array{index: int, type: 'text', text: array{value: string, annotations: array<int, array{type: 'file_citation', text: string, file_citation: array{file_id: string, quote: string}, start_index: int, end_index: int}|array{type: 'file_path', text: string, file_path: array{file_id: string}, start_index: int, end_index: int}>}}>, file_ids: ?array<int, string>}>
*/
final class ThreadMessageDeltaObject implements ResponseContract
{
/**
* @use ArrayAccessible<array{role: ?string, content: array<int, array{index: int, type: 'image_file', image_file: array{file_id: string}}|array{index: int, type: 'text', text: array{value: string, annotations: array<int, array{type: 'file_citation', text: string, file_citation: array{file_id: string, quote: string}, start_index: int, end_index: int}|array{type: 'file_path', text: string, file_path: array{file_id: string}, start_index: int, end_index: int}>}}>, file_ids: ?array<int, string>}>
*/
use ArrayAccessible;

use Fakeable;

/**
* @param array<int, ThreadMessageDeltaResponseContentImageFileObject|ThreadMessageDeltaResponseContentTextObject> $content
* @param array<int, string>|null $fileIds
*/
private function __construct(
public ?string $role,
public array $content,
public ?array $fileIds,
) {
}

/**
* Acts as static factory, and returns a new Response instance.
*
* @param array{role?: string, content: array<int, array{index: int, type: 'image_file', image_file: array{file_id: string}}|array{index: int, type: 'text', text: array{value: string, annotations: array<int, array{type: 'file_citation', text: string, file_citation: array{file_id: string, quote: string}, start_index: int, end_index: int}|array{type: 'file_path', text: string, file_path: array{file_id: string}, start_index: int, end_index: int}>}}>, file_ids?: array<int, string>} $attributes
*/
public static function from(array $attributes): self
{
$content = array_map(
fn (array $content): ThreadMessageDeltaResponseContentTextObject|ThreadMessageDeltaResponseContentImageFileObject => match ($content['type']) {
'text' => ThreadMessageDeltaResponseContentTextObject::from($content),
EthanBarlo marked this conversation as resolved.
Show resolved Hide resolved
'image_file' => ThreadMessageDeltaResponseContentImageFileObject::from($content),
},
$attributes['content'],
);

return new self(
$attributes['role'] ?? null,
$content,
$attributes['file_ids'] ?? null,
);
}

/**
* {@inheritDoc}
*/
public function toArray(): array
{
return [
'role' => $this->role,
'content' => array_map(fn (ThreadMessageDeltaResponseContentImageFileObject|ThreadMessageDeltaResponseContentTextObject $content): array => $content->toArray(), $this->content),
'file_ids' => $this->fileIds,
];
}
}
Loading
Loading