handle data streaming for new responses API #807


Merged on Jul 16, 2025 · 67 commits

Commits
53d2946
ensure stream is configurable in chatbot-server-public layer
ASteinheiser Jul 7, 2025
c853971
setup data streamer helper
ASteinheiser Jul 7, 2025
50e58ba
add skeleton data streaming to createResponse service
ASteinheiser Jul 8, 2025
5f094df
move StreamFunction type to chat-server, share with public
ASteinheiser Jul 8, 2025
5ee9ce3
fix test
ASteinheiser Jul 8, 2025
685f140
start streaming in createResponse
ASteinheiser Jul 8, 2025
e5c600f
add stream disconnect
ASteinheiser Jul 8, 2025
636a3db
move in progress stream message
ASteinheiser Jul 8, 2025
eb511bf
stream event from verifiedAnswer helper
ASteinheiser Jul 8, 2025
29c18cd
create helper for addMessageToConversationVerifiedAnswerStream
ASteinheiser Jul 8, 2025
0b02450
apply stream configs to chat-public
ASteinheiser Jul 8, 2025
e521aff
update test to use openAI client
ASteinheiser Jul 8, 2025
5f41b6d
update input schema for openai client call to responses
ASteinheiser Jul 8, 2025
25ea804
add test helper for making local server
ASteinheiser Jul 8, 2025
838fc86
almost finish converting createResposne tests to use openai client
ASteinheiser Jul 9, 2025
1c1305a
more tests
ASteinheiser Jul 9, 2025
f821f8f
disconnect data streamed on error
ASteinheiser Jul 9, 2025
9822ecf
update data stream logic for create response
ASteinheiser Jul 9, 2025
94fdc7b
update data streamer sendResponsesEvent write type
ASteinheiser Jul 9, 2025
9c706a2
i think this still needs to be a string
ASteinheiser Jul 9, 2025
503b2df
mapper for streamError
ASteinheiser Jul 9, 2025
5a879f8
export openai shim types
ASteinheiser Jul 9, 2025
c4d1cf6
mostly working tests with reading the response stream from openai client
ASteinheiser Jul 10, 2025
dd736b0
create test helper
ASteinheiser Jul 10, 2025
1c34c76
fix test helper for reading entire stream
ASteinheiser Jul 10, 2025
84158a5
dont send normal http message at end (maybe need this when we support…
ASteinheiser Jul 10, 2025
792c919
improved tests
ASteinheiser Jul 10, 2025
9ece8ab
more test improvement -- proper use of conversation service, addition…
ASteinheiser Jul 10, 2025
b3188cc
fix test for too many messages
ASteinheiser Jul 10, 2025
cb8c0ef
remove skip tests
ASteinheiser Jul 10, 2025
314c576
mostly working responses tests
ASteinheiser Jul 10, 2025
4bfee69
abstract helpers for openai client requests
ASteinheiser Jul 10, 2025
be6183b
use helpers in create response tests
ASteinheiser Jul 10, 2025
c85d341
fix tests by passing responseId
ASteinheiser Jul 11, 2025
4fcd407
skip problematic test
ASteinheiser Jul 11, 2025
367b316
skip problematic test
ASteinheiser Jul 11, 2025
d03cac6
create baseResponseData helper
ASteinheiser Jul 11, 2025
044a7a0
pass zod validated req body
ASteinheiser Jul 11, 2025
f38631e
add tests for all responses fields
ASteinheiser Jul 11, 2025
ce3de9b
remove log
ASteinheiser Jul 11, 2025
c1841c1
abstract helper for formatOpenaiError
ASteinheiser Jul 11, 2025
823cd47
replace helper
ASteinheiser Jul 11, 2025
64d3a14
await server closing properly
ASteinheiser Jul 11, 2025
597745a
basic working responses tests with openai client
ASteinheiser Jul 11, 2025
f8d0120
update rate limit test
ASteinheiser Jul 11, 2025
990f352
fix testing port
ASteinheiser Jul 11, 2025
741f9fa
update test type related to responses streaming
ASteinheiser Jul 11, 2025
fb86f41
apply type to data streamer
ASteinheiser Jul 11, 2025
ffc69c4
cleanup shared type
ASteinheiser Jul 11, 2025
6f67454
fix router tests
ASteinheiser Jul 11, 2025
24c4cf0
fix router tests
ASteinheiser Jul 11, 2025
2a97b40
update errors to be proper openai stream errors
ASteinheiser Jul 11, 2025
8e23ed3
ensure format message cleans customData as well
ASteinheiser Jul 11, 2025
e05239e
add comment
ASteinheiser Jul 11, 2025
b7a1424
update tests per review
ASteinheiser Jul 15, 2025
e976991
update test utils
ASteinheiser Jul 15, 2025
1ccd448
fix test type
ASteinheiser Jul 15, 2025
7c6c712
update openai rag-core to 5.9
ASteinheiser Jul 15, 2025
bf0571a
fix data streamer for responses events to be SSE compliant
ASteinheiser Jul 15, 2025
87d7b7e
cleanup responses tests
ASteinheiser Jul 16, 2025
e0dc84d
cleanup createResponse tests
ASteinheiser Jul 16, 2025
c1629fe
cleanup error handling to match openai spec
ASteinheiser Jul 16, 2025
e1f3cad
fix tests for standard openai exceptions
ASteinheiser Jul 16, 2025
9b9b8b2
cleanup
ASteinheiser Jul 16, 2025
d1319c4
add "required" as an option for tool_choice
ASteinheiser Jul 16, 2025
171dee7
cleanup datastreamer test globals
ASteinheiser Jul 16, 2025
2282d0d
add test to dataStreamer for streamResponses
ASteinheiser Jul 16, 2025
9 changes: 7 additions & 2 deletions packages/chatbot-server-mongodb-public/src/config.ts
@@ -19,6 +19,7 @@ import {
defaultCreateConversationCustomData,
defaultAddMessageToConversationCustomData,
makeVerifiedAnswerGenerateResponse,
addMessageToConversationVerifiedAnswerStream,
} from "mongodb-chatbot-server";
import cookieParser from "cookie-parser";
import { blockGetRequests } from "./middleware/blockGetRequests";
@@ -40,7 +41,6 @@ import {
import { AzureOpenAI } from "mongodb-rag-core/openai";
import { MongoClient } from "mongodb-rag-core/mongodb";
import {
ANALYZER_ENV_VARS,
AZURE_OPENAI_ENV_VARS,
PREPROCESSOR_ENV_VARS,
TRACING_ENV_VARS,
@@ -53,7 +53,10 @@ import {
import { useSegmentIds } from "./middleware/useSegmentIds";
import { makeSearchTool } from "./tools/search";
import { makeMongoDbInputGuardrail } from "./processors/mongoDbInputGuardrail";
import { makeGenerateResponseWithSearchTool } from "./processors/generateResponseWithSearchTool";
import {
addMessageToConversationStream,
makeGenerateResponseWithSearchTool,
} from "./processors/generateResponseWithSearchTool";
import { makeBraintrustLogger } from "mongodb-rag-core/braintrust";
import { makeMongoDbScrubbedMessageStore } from "./tracing/scrubbedMessages/MongoDbScrubbedMessageStore";
import { MessageAnalysis } from "./tracing/scrubbedMessages/analyzeMessage";
@@ -231,6 +234,7 @@ export const generateResponse = wrapTraced(
references: verifiedAnswer.references.map(addReferenceSourceType),
};
},
stream: addMessageToConversationVerifiedAnswerStream,
onNoVerifiedAnswerFound: wrapTraced(
makeGenerateResponseWithSearchTool({
languageModel,
@@ -253,6 +257,7 @@
searchTool: makeSearchTool(findContent),
toolChoice: "auto",
maxSteps: 5,
stream: addMessageToConversationStream,
}),
{ name: "generateResponseWithSearchTool" }
),
@@ -351,18 +351,21 @@ describe("generateResponseWithSearchTool", () => {
describe("streaming mode", () => {
// Create a mock DataStreamer implementation
const makeMockDataStreamer = () => {
const mockStreamData = jest.fn();
const mockConnect = jest.fn();
const mockDisconnect = jest.fn();
const mockStreamData = jest.fn();
const mockStreamResponses = jest.fn();
const mockStream = jest.fn().mockImplementation(async () => {
// Process the stream and return a string result
return "Hello";
});

const dataStreamer = {
connected: false,
connect: mockConnect,
disconnect: mockDisconnect,
streamData: mockStreamData,
streamResponses: mockStreamResponses,
stream: mockStream,
} as DataStreamer;

@@ -6,7 +6,6 @@ import {
AssistantMessage,
ToolMessage,
} from "mongodb-rag-core";

import {
CoreAssistantMessage,
CoreMessage,
@@ -28,6 +27,7 @@
GenerateResponse,
GenerateResponseReturnValue,
InputGuardrailResult,
type StreamFunction,
} from "mongodb-chatbot-server";
import {
MongoDbSearchToolArgs,
@@ -52,8 +52,59 @@ export interface GenerateResponseWithSearchToolParams {
search_content: SearchTool;
}>;
searchTool: SearchTool;
stream?: {
onLlmNotWorking: StreamFunction<{ notWorkingMessage: string }>;
onLlmRefusal: StreamFunction<{ refusalMessage: string }>;
onReferenceLinks: StreamFunction<{ references: References }>;
onTextDelta: StreamFunction<{ delta: string }>;
};
}

export const addMessageToConversationStream: GenerateResponseWithSearchToolParams["stream"] =
{
onLlmNotWorking({ dataStreamer, notWorkingMessage }) {
dataStreamer?.streamData({
type: "delta",
data: notWorkingMessage,
});
},
onLlmRefusal({ dataStreamer, refusalMessage }) {
dataStreamer?.streamData({
type: "delta",
data: refusalMessage,
});
},
onReferenceLinks({ dataStreamer, references }) {
dataStreamer?.streamData({
type: "references",
data: references,
});
},
onTextDelta({ dataStreamer, delta }) {
dataStreamer?.streamData({
type: "delta",
data: delta,
});
},
};

// TODO: implement this
export const responsesApiStream: GenerateResponseWithSearchToolParams["stream"] =
{
onLlmNotWorking() {
throw new Error("not yet implemented");
},
onLlmRefusal() {
throw new Error("not yet implemented");
},
onReferenceLinks() {
throw new Error("not yet implemented");
},
onTextDelta() {
throw new Error("not yet implemented");
},
};

/**
Generate chatbot response using RAG and a search tool named {@link SEARCH_TOOL_NAME}.
*/
@@ -69,6 +120,7 @@ export function makeGenerateResponseWithSearchTool({
maxSteps = 2,
searchTool,
toolChoice,
stream,
}: GenerateResponseWithSearchToolParams): GenerateResponse {
return async function generateResponseWithSearchTool({
conversation,
@@ -80,9 +132,11 @@
dataStreamer,
request,
}) {
if (shouldStream) {
assert(dataStreamer, "dataStreamer is required for streaming");
}
const streamingModeActive =
shouldStream === true &&
dataStreamer !== undefined &&
stream !== undefined;

const userMessage: UserMessage = {
role: "user",
content: latestMessageText,
@@ -179,10 +233,10 @@

switch (chunk.type) {
case "text-delta":
if (shouldStream) {
dataStreamer?.streamData({
data: chunk.textDelta,
type: "delta",
if (streamingModeActive) {
stream.onTextDelta({
dataStreamer,
delta: chunk.textDelta,
});
}
break;
@@ -202,10 +256,10 @@

// Stream references if we have any and weren't aborted
if (references.length > 0 && !generationController.signal.aborted) {
if (shouldStream) {
dataStreamer?.streamData({
data: references,
type: "references",
if (streamingModeActive) {
stream.onReferenceLinks({
dataStreamer,
references,
});
}
}
@@ -238,10 +292,10 @@
...userMessageCustomData,
...guardrailResult,
};
if (shouldStream) {
dataStreamer?.streamData({
type: "delta",
data: llmRefusalMessage,
if (streamingModeActive) {
stream.onLlmRefusal({
dataStreamer,
refusalMessage: llmRefusalMessage,
});
}
return handleReturnGeneration({
@@ -293,10 +347,10 @@
});
}
} catch (error: unknown) {
if (shouldStream) {
dataStreamer?.streamData({
type: "delta",
data: llmNotWorkingMessage,
if (streamingModeActive) {
stream.onLlmNotWorking({
dataStreamer,
notWorkingMessage: llmNotWorkingMessage,
});
}

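
Note: the following is a minimal usage sketch of the new stream option on makeGenerateResponseWithSearchTool, not part of the diff. The callbacks mirror addMessageToConversationStream above; languageModel, findContent, and the remaining required params of GenerateResponseWithSearchToolParams are assumed to be configured as in config.ts.

// Sketch only: languageModel, findContent, and the other required options are assumed.
const generateResponse = makeGenerateResponseWithSearchTool({
  languageModel,
  searchTool: makeSearchTool(findContent),
  toolChoice: "auto",
  maxSteps: 5,
  // Each callback decides how a streaming event is written to the dataStreamer.
  stream: {
    onTextDelta({ dataStreamer, delta }) {
      dataStreamer?.streamData({ type: "delta", data: delta });
    },
    onReferenceLinks({ dataStreamer, references }) {
      dataStreamer?.streamData({ type: "references", data: references });
    },
    onLlmRefusal({ dataStreamer, refusalMessage }) {
      dataStreamer?.streamData({ type: "delta", data: refusalMessage });
    },
    onLlmNotWorking({ dataStreamer, notWorkingMessage }) {
      dataStreamer?.streamData({ type: "delta", data: notWorkingMessage });
    },
  },
});
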
@@ -1,5 +1,8 @@
import { ObjectId } from "mongodb-rag-core/mongodb";
import { makeVerifiedAnswerGenerateResponse } from "./makeVerifiedAnswerGenerateResponse";
import {
makeVerifiedAnswerGenerateResponse,
type StreamFunction,
} from "./makeVerifiedAnswerGenerateResponse";
import { VerifiedAnswer, WithScore, DataStreamer } from "mongodb-rag-core";
import { GenerateResponseReturnValue } from "./GenerateResponse";

@@ -24,6 +27,29 @@ describe("makeVerifiedAnswerGenerateResponse", () => {
},
] satisfies GenerateResponseReturnValue["messages"];

const streamVerifiedAnswer: StreamFunction<{
verifiedAnswer: VerifiedAnswer;
}> = async ({ dataStreamer, verifiedAnswer }) => {
dataStreamer.streamData({
type: "metadata",
data: {
verifiedAnswer: {
_id: verifiedAnswer._id,
created: verifiedAnswer.created,
updated: verifiedAnswer.updated,
},
},
});
dataStreamer.streamData({
type: "delta",
data: verifiedAnswer.answer,
});
dataStreamer.streamData({
type: "references",
data: verifiedAnswer.references,
});
};

// Create a mock verified answer
const createMockVerifiedAnswer = (): WithScore<VerifiedAnswer> => ({
answer: verifiedAnswerContent,
@@ -55,6 +81,7 @@ describe("makeVerifiedAnswerGenerateResponse", () => {
connect: jest.fn(),
disconnect: jest.fn(),
stream: jest.fn(),
streamResponses: jest.fn(),
});

// Create base request parameters
@@ -79,6 +106,9 @@
onNoVerifiedAnswerFound: async () => ({
messages: noVerifiedAnswerFoundMessages,
}),
stream: {
onVerifiedAnswerFound: streamVerifiedAnswer,
},
});

it("uses onNoVerifiedAnswerFound if no verified answer is found", async () => {
@@ -1,4 +1,8 @@
import { VerifiedAnswer, FindVerifiedAnswerFunc } from "mongodb-rag-core";
import {
VerifiedAnswer,
FindVerifiedAnswerFunc,
DataStreamer,
} from "mongodb-rag-core";
import { strict as assert } from "assert";
import {
GenerateResponse,
@@ -17,8 +21,40 @@ export interface MakeVerifiedAnswerGenerateResponseParams {
onVerifiedAnswerFound?: (verifiedAnswer: VerifiedAnswer) => VerifiedAnswer;

onNoVerifiedAnswerFound: GenerateResponse;

stream?: {
onVerifiedAnswerFound: StreamFunction<{ verifiedAnswer: VerifiedAnswer }>;
};
}

export type StreamFunction<Params> = (
params: { dataStreamer: DataStreamer } & Params
) => void;

export const addMessageToConversationVerifiedAnswerStream: MakeVerifiedAnswerGenerateResponseParams["stream"] =
{
onVerifiedAnswerFound: ({ verifiedAnswer, dataStreamer }) => {
dataStreamer.streamData({
type: "metadata",
data: {
verifiedAnswer: {
_id: verifiedAnswer._id,
created: verifiedAnswer.created,
updated: verifiedAnswer.updated,
},
},
});
dataStreamer.streamData({
type: "delta",
data: verifiedAnswer.answer,
});
dataStreamer.streamData({
type: "references",
data: verifiedAnswer.references,
});
},
};

/**
Searches for verified answers for the user query.
If no verified answer can be found for the given query, the
@@ -28,6 +64,7 @@ export const makeVerifiedAnswerGenerateResponse = ({
findVerifiedAnswer,
onVerifiedAnswerFound,
onNoVerifiedAnswerFound,
stream,
}: MakeVerifiedAnswerGenerateResponseParams): GenerateResponse => {
return async (args) => {
const { latestMessageText, shouldStream, dataStreamer } = args;
@@ -54,17 +91,11 @@

if (shouldStream) {
assert(dataStreamer, "Must have dataStreamer if shouldStream=true");
dataStreamer.streamData({
type: "metadata",
data: metadata,
});
dataStreamer.streamData({
type: "delta",
data: answer,
});
dataStreamer.streamData({
type: "references",
data: references,
assert(stream, "Must have stream if shouldStream=true");

stream.onVerifiedAnswerFound({
dataStreamer,
verifiedAnswer,
});
}

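
Note: a minimal wiring sketch, not part of the diff, showing how the exported addMessageToConversationVerifiedAnswerStream plugs into makeVerifiedAnswerGenerateResponse (mirroring config.ts above). findVerifiedAnswer and the fallback generator are assumed to exist.

// Sketch only: findVerifiedAnswer and the fallback GenerateResponse are assumed.
const generateResponse = makeVerifiedAnswerGenerateResponse({
  findVerifiedAnswer,
  // Falls back to another GenerateResponse (e.g. the search-tool generator)
  // when no verified answer matches the query.
  onNoVerifiedAnswerFound: generateResponseWithSearchTool,
  // Streams the verified answer as metadata, a single delta, and references.
  stream: addMessageToConversationVerifiedAnswerStream,
});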