fix: streaming

This commit is contained in:
Han Xiao
2025-02-22 12:37:11 +08:00
parent 86c9ccb4e7
commit a85eb82936

View File

@@ -408,12 +408,20 @@ async function processQueue(streamingState: StreamingState, res: Response, reque
while (streamingState.queue.length > 0) { while (streamingState.queue.length > 0) {
const current = streamingState.queue[0]; const current = streamingState.queue[0];
// Clear any previous state
streamingState.remainingContent = ''; // Add this line
// Reset streaming state for new content // Reset streaming state for new content
streamingState.currentlyStreaming = true; streamingState.currentlyStreaming = true;
streamingState.remainingContent = current.content; streamingState.remainingContent = current.content;
streamingState.isEmitting = true; streamingState.isEmitting = true;
try { try {
// Add a check to prevent duplicate streaming
if (streamingState.currentGenerator) {
streamingState.currentGenerator = null; // Add this line
}
for await (const word of streamTextNaturally(current.content, streamingState)) { for await (const word of streamTextNaturally(current.content, streamingState)) {
const chunk: ChatCompletionChunk = { const chunk: ChatCompletionChunk = {
id: requestId, id: requestId,
@@ -429,27 +437,7 @@ async function processQueue(streamingState: StreamingState, res: Response, reque
}] }]
}; };
res.write(`data: ${JSON.stringify(chunk)}\n\n`); res.write(`data: ${JSON.stringify(chunk)}\n\n`);
// Small delay between words
await new Promise(resolve => setTimeout(resolve, 30));
} }
// Add newline after content
const newlineChunk: ChatCompletionChunk = {
id: requestId,
object: 'chat.completion.chunk',
created,
model,
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: {content: '\n'},
logprobs: null,
finish_reason: null
}]
};
res.write(`data: ${JSON.stringify(newlineChunk)}\n\n`);
} catch (error) { } catch (error) {
console.error('Error in streaming:', error); console.error('Error in streaming:', error);
} finally { } finally {
@@ -459,15 +447,11 @@ async function processQueue(streamingState: StreamingState, res: Response, reque
streamingState.remainingContent = ''; streamingState.remainingContent = '';
streamingState.queue.shift(); streamingState.queue.shift();
current.resolve(); current.resolve();
// Small delay between queue items
await new Promise(resolve => setTimeout(resolve, 50));
} }
} }
streamingState.processingQueue = false; streamingState.processingQueue = false;
} }
app.post('/v1/chat/completions', (async (req: Request, res: Response) => { app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
// Check authentication only if secret is set // Check authentication only if secret is set
if (secret) { if (secret) {
@@ -551,10 +535,9 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
const actionListener = async (step: StepAction) => { const actionListener = async (step: StepAction) => {
// Add content to queue for both thinking steps and final answer // Add content to queue for both thinking steps and final answer
if (step.think) { if (step.think) {
const content = step.think; // if not ends with a space, add one
const content = step.think + ' ';
await new Promise<void>(resolve => { await new Promise<void>(resolve => {
streamingState.currentlyStreaming = false;
streamingState.queue.push({ streamingState.queue.push({
content, content,
resolve resolve