initial work on python new conversation API functionality for 1.16 #806

Closed · wants to merge 5 commits
1 change: 1 addition & 0 deletions .gitignore
@@ -50,6 +50,7 @@ coverage.xml
# Editor config
.idea
.vscode
.cursor

# Translations
*.mo
177 changes: 176 additions & 1 deletion dapr/aio/clients/grpc/client.py
@@ -24,7 +24,7 @@

from warnings import warn

from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any, Awaitable
from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any, Awaitable, AsyncIterator
from typing_extensions import Self

from google.protobuf.message import Message as GrpcMessage
@@ -82,6 +82,8 @@
    BindingResponse,
    ConversationResponse,
    ConversationResult,
    ConversationStreamResponse,
    ConversationUsage,
    DaprResponse,
    GetSecretResponse,
    GetBulkSecretResponse,
@@ -1771,6 +1773,179 @@ async def converse_alpha1(
        except grpc.aio.AioRpcError as err:
            raise DaprGrpcError(err) from err

    async def converse_stream_alpha1(
        self,
        name: str,
        inputs: List[ConversationInput],
        *,
        context_id: Optional[str] = None,
        parameters: Optional[Dict[str, GrpcAny]] = None,
        metadata: Optional[Dict[str, str]] = None,
        scrub_pii: Optional[bool] = None,
        temperature: Optional[float] = None,
    ) -> AsyncIterator[ConversationStreamResponse]:
        """Invoke an LLM using the streaming conversation API (Alpha).

        Args:
            name: Name of the LLM component to invoke
            inputs: List of conversation inputs
            context_id: Optional ID for continuing an existing chat
            parameters: Optional custom parameters for the request
            metadata: Optional metadata for the component
            scrub_pii: Optional flag to scrub PII from inputs and outputs
            temperature: Optional temperature setting for the LLM to optimize for creativity or predictability

        Yields:
            ConversationStreamResponse containing conversation result chunks

        Raises:
            DaprGrpcError: If the Dapr runtime returns an error
        """
        from dapr.clients.grpc._response import ConversationStreamResponse

        inputs_pb = [
            api_v1.ConversationInput(content=inp.content, role=inp.role, scrubPII=inp.scrub_pii)
            for inp in inputs
        ]

        request = api_v1.ConversationRequest(
            name=name,
            inputs=inputs_pb,
            contextID=context_id,
            parameters=parameters or {},
            metadata=metadata or {},
            scrubPII=scrub_pii,
            temperature=temperature,
        )

        try:
            response_stream = self._stub.ConverseStreamAlpha1(request)

            async for response in response_stream:
                context_id = None
                result = None
                usage = None

                # Handle chunk response
                if response.HasField('chunk'):
                    result = ConversationResult(
                        result=response.chunk.content,
                        parameters={}
                    )

                # Handle completion response
                elif response.HasField('complete'):
                    context_id = response.complete.contextID

                    # Extract usage information if available
                    if response.complete.HasField('usage'):
                        usage = ConversationUsage(
                            prompt_tokens=response.complete.usage.prompt_tokens,
                            completion_tokens=response.complete.usage.completion_tokens,
                            total_tokens=response.complete.usage.total_tokens
                        )

                yield ConversationStreamResponse(
                    context_id=context_id,
                    result=result,
                    usage=usage
                )
        except grpc.aio.AioRpcError as err:
            raise DaprGrpcError(err) from err
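
For reference, a minimal usage sketch of the streaming call above. The component name 'echo', the prompt, and the import path for ConversationInput are assumptions for illustration, not part of this diff:

import asyncio

from dapr.aio.clients import DaprClient
from dapr.clients.grpc._request import ConversationInput


async def main():
    # Assumes a conversation component named 'echo' is registered with the sidecar.
    async with DaprClient() as client:
        inputs = [ConversationInput(content='What is Dapr?', role='user')]

        async for chunk in client.converse_stream_alpha1(name='echo', inputs=inputs):
            if chunk.result and chunk.result.result:
                # Print partial content as it streams in.
                print(chunk.result.result, end='', flush=True)
            if chunk.usage:
                # The completion chunk typically carries token usage.
                print(f'\n[total tokens: {chunk.usage.total_tokens}]')


asyncio.run(main())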

    async def converse_stream_json(
        self,
        name: str,
        inputs: List[ConversationInput],
        *,
        context_id: Optional[str] = None,
        parameters: Optional[Dict[str, GrpcAny]] = None,
        metadata: Optional[Dict[str, str]] = None,
        scrub_pii: Optional[bool] = None,
        temperature: Optional[float] = None,
    ) -> AsyncIterator[Dict[str, Any]]:
        """Invoke an LLM using the streaming conversation API with JSON response format (Alpha).

        This method provides a JSON-formatted streaming interface that's compatible with
        common LLM response formats, making it easier to integrate with existing tools
        and frameworks that expect JSON responses.

        Args:
            name: Name of the LLM component to invoke
            inputs: List of conversation inputs
            context_id: Optional ID for continuing an existing chat
            parameters: Optional custom parameters for the request
            metadata: Optional metadata for the component
            scrub_pii: Optional flag to scrub PII from inputs and outputs
            temperature: Optional temperature setting for the LLM to optimize for creativity or predictability

        Yields:
            Dict[str, Any]: JSON-formatted conversation response chunks with structure:
                {
                    "choices": [
                        {
                            "delta": {
                                "content": "chunk content",
                                "role": "assistant"
                            },
                            "index": 0,
                            "finish_reason": None
                        }
                    ],
                    "context_id": "optional context ID",
                    "usage": {
                        "prompt_tokens": 0,
                        "completion_tokens": 0,
                        "total_tokens": 0
                    }
                }

        Raises:
            DaprGrpcError: If the Dapr runtime returns an error
        """
        async for chunk in self.converse_stream_alpha1(
            name=name,
            inputs=inputs,
            context_id=context_id,
            parameters=parameters,
            metadata=metadata,
            scrub_pii=scrub_pii,
            temperature=temperature,
        ):
            # Transform the chunk to JSON format compatible with common LLM APIs
            chunk_dict = {
                'choices': [],
                'context_id': None,
                'usage': None,
            }

            # Handle streaming result chunks
            if chunk.result and chunk.result.result:
                chunk_dict['choices'] = [
                    {
                        'delta': {
                            'content': chunk.result.result,
                            'role': 'assistant'
                        },
                        'index': 0,
                        'finish_reason': None
                    }
                ]

            # Handle context ID
            if chunk.context_id:
                chunk_dict['context_id'] = chunk.context_id

            # Handle usage information (typically in the final chunk)
            if chunk.usage:
                chunk_dict['usage'] = {
                    'prompt_tokens': chunk.usage.prompt_tokens,
                    'completion_tokens': chunk.usage.completion_tokens,
                    'total_tokens': chunk.usage.total_tokens,
                }

            yield chunk_dict
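
Likewise, a minimal sketch of consuming the JSON-shaped stream, e.g. for tooling that expects OpenAI-style delta chunks. Again, the component name, prompt, and ConversationInput import path are assumed for illustration:

import asyncio
import json

from dapr.aio.clients import DaprClient
from dapr.clients.grpc._request import ConversationInput


async def stream_json():
    async with DaprClient() as client:
        # 'echo' is a placeholder conversation component name.
        inputs = [ConversationInput(content='Summarize Dapr in one sentence.', role='user')]

        async for chunk in client.converse_stream_json(name='echo', inputs=inputs):
            # Each dict mirrors the structure documented in the docstring above.
            for choice in chunk['choices']:
                print(choice['delta']['content'], end='', flush=True)
            if chunk['usage']:
                print('\n' + json.dumps(chunk['usage']))


asyncio.run(stream_json())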

    async def wait(self, timeout_s: float):
        """Waits for sidecar to be available within the timeout.

19 changes: 19 additions & 0 deletions dapr/clients/grpc/_response.py
@@ -1087,3 +1087,22 @@ class ConversationResponse:

    context_id: Optional[str]
    outputs: List[ConversationResult]
    usage: Optional['ConversationUsage'] = None


@dataclass
class ConversationUsage:
    """Token usage statistics from conversation API."""

    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0


@dataclass
class ConversationStreamResponse:
    """Single response chunk from the streaming conversation API."""

    context_id: Optional[str]
    result: Optional[ConversationResult] = None
    usage: Optional[ConversationUsage] = None
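
A small illustrative sketch of how these dataclasses compose, e.g. when building test fixtures; the values are made up:

from dapr.clients.grpc._response import (
    ConversationResult,
    ConversationStreamResponse,
    ConversationUsage,
)

# A content chunk: only `result` is populated.
chunk = ConversationStreamResponse(
    context_id=None,
    result=ConversationResult(result='Hello, ', parameters={}),
)

# The completion chunk: carries the context ID and token usage.
final = ConversationStreamResponse(
    context_id='conv-123',
    usage=ConversationUsage(prompt_tokens=12, completion_tokens=7, total_tokens=19),
)

assert final.result is None
assert final.usage.total_tokens == 19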