Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new cli for batch jobs #754

Merged
merged 7 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions instructor/batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from typing import Literal, Any, Union
from pydantic import BaseModel, Field
from instructor.process_response import handle_response_model
import uuid

openai_models = Literal[
"gpt-4o",
"gpt-4-turbo",
"gpt-4",
"gpt-4-32k",
"gpt-3.5-turbo",
"gpt-3.5-turbo-16k",
"gpt-4-turbo-preview",
"gpt-4-vision-preview",
"gpt-4-turbo-2024-04-09",
"gpt-4-0314",
"gpt-4-32k-0314",
"gpt-4-32k-0613",
"gpt-3.5-turbo-0301",
"gpt-3.5-turbo-16k-0613",
"gpt-3.5-turbo-1106",
"gpt-3.5-turbo-0613",
]


class Function(BaseModel):
name: str
description: str
parameters: Any


class Tool(BaseModel):
type: str
function: Function


class RequestBody(BaseModel):
model: Union[openai_models, str]
messages: list[dict[str, Any]]
max_tokens: int = Field(default=1000)
tools: list[Tool]


class BatchModel(BaseModel):
custom_id: str
method: Literal["POST"]
url: Literal["/v1/chat/completions"]
body: RequestBody


class BatchJob:
@classmethod
def create_from_messages(
cls,
messages_batch: list[list[dict[str, Any]]],
jxnl marked this conversation as resolved.
Show resolved Hide resolved
model: Union[openai_models, str],
response_model: type[BaseModel],
max_tokens: int = 1000,
):
_, tools = handle_response_model(response_model=response_model)
return [
BatchModel(
custom_id=str(uuid.uuid4()),
method="POST",
url="/v1/chat/completions",
body=RequestBody(
model=model,
max_tokens=max_tokens,
messages=messages,
**tools,
),
).model_dump_json()
ivanleomk marked this conversation as resolved.
Show resolved Hide resolved
for messages in messages_batch
]
105 changes: 105 additions & 0 deletions instructor/cli/batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from rich.console import Console
from rich.table import Table
from rich.live import Live
from openai import OpenAI
from openai.types.batch import Batch
import typer
import datetime
import time
from typing import Optional

client = OpenAI()
app = typer.Typer()

console = Console()


def generate_table(batch_jobs: list[Batch]):
table = Table(
ivanleomk marked this conversation as resolved.
Show resolved Hide resolved
title="OpenAI Batch Jobs",
)

table.add_column("Batch ID", style="dim")
table.add_column("Created At")
table.add_column("Status")
table.add_column("Failed")
table.add_column("Completed")
table.add_column("Total")

for batch_job in batch_jobs:
table.add_row(
batch_job.id,
str(datetime.datetime.fromtimestamp(batch_job.created_at)),
batch_job.status,
str(batch_job.request_counts.failed), # type: ignore
str(batch_job.request_counts.completed), # type: ignore
str(batch_job.request_counts.total), # type: ignore
)

return table


def get_jobs(after: Optional[str], limit: int = 10):
if not after:
return client.batches.list(limit=limit).data
return client.batches.list(after=after, limit=limit).data


@app.command(name="list", help="See all existing batch jobs")
def watch(
limit: int = typer.Option(10, help="Total number of batch jobs to show"),
after: Optional[str] = typer.Option(
None, help="Batch job ID to start listing from"
),
poll: int = typer.Option(
10, help="Time in seconds to wait for the batch job to complete"
),
screen: bool = typer.Option(False, help="Enable or disable screen output"),
):
"""
Monitor the status of the most recent batch jobs
"""
batch_jobs = get_jobs(after, limit)
table = generate_table(batch_jobs)
with Live(
generate_table(batch_jobs), refresh_per_second=2, screen=screen
) as live_table:
while True:
batch_jobs = get_jobs(after, limit)
table = generate_table(batch_jobs)
live_table.update(table)
time.sleep(poll)


@app.command(
help="Create a batch job from a file",
)
def create_from_file(
file_path: str = typer.Option(help="File containing the batch job requests"),
):
with console.status(f"[bold green] Uploading batch job file...", spinner="dots"):
batch_input_file = client.files.create(
file=open(file_path, "rb"), purpose="batch"
)

batch_input_file_id = batch_input_file.id

with console.status(
f"[bold green] Creating batch job from ID {batch_input_file_id}", spinner="dots"
):
client.batches.create(
input_file_id=batch_input_file_id,
endpoint="/v1/chat/completions",
completion_window="24h",
metadata={"description": "testing job"},
)
watch(limit=5, poll=2, screen=False)


@app.command(help="Cancel a batch job")
def cancel(batch_id: str = typer.Option(help="Batch job ID to cancel")):
try:
client.batches.cancel(batch_id)
console.log(f"[bold red]Job {batch_id} cancelled successfully!")
except Exception as e:
console.log(f"[bold red]Error cancelling job {batch_id}: {e}")
2 changes: 2 additions & 0 deletions instructor/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import instructor.cli.files as files
import instructor.cli.usage as usage
import instructor.cli.hub as hub
import instructor.cli.batch as batch

app = typer.Typer()

app.add_typer(jobs.app, name="jobs", help="Monitor and create fine tuning jobs")
app.add_typer(files.app, name="files", help="Manage files on OpenAI's servers")
app.add_typer(usage.app, name="usage", help="Check OpenAI API usage data")
app.add_typer(hub.app, name="hub", help="Interact with the instructor hub")
app.add_typer(batch.app, name="batch", help="Manage OpenAI Batch jobs")


@app.command()
Expand Down
Loading