diff --git a/src/server.ts b/src/server.ts
index 58a2956..2e4de08 100644
--- a/src/server.ts
+++ b/src/server.ts
@@ -49,6 +49,75 @@
 ${i + 1}. [${ref.exactQuote}](${ref.url})`).join('')}`;
 }
 
+// Streams text word by word, with a small random delay between words
+async function* streamTextWordByWord(text: string, streamingState: StreamingState) {
+  const words = text.split(/(\s+)/);
+  for (let i = 0; i < words.length; i++) {
+    if (streamingState.currentlyStreaming) {
+      const delay = Math.floor(Math.random() * 100);
+      await new Promise(resolve => setTimeout(resolve, delay));
+      yield words[i];
+    } else {
+      // If streaming was interrupted, yield all remaining words at once
+      const remainingWords = words.slice(i).join('');
+      yield remainingWords;
+      return;
+    }
+  }
+}
+
+// Helper function to emit remaining content immediately
+async function emitRemainingContent(
+  res: Response,
+  requestId: string,
+  model: string,
+  content: string
+) {
+  if (!content) return;
+
+  const chunk: ChatCompletionChunk = {
+    id: requestId,
+    object: 'chat.completion.chunk',
+    created: Math.floor(Date.now() / 1000),
+    model: model,
+    system_fingerprint: 'fp_' + requestId,
+    choices: [{
+      index: 0,
+      delta: {content},
+      logprobs: null,
+      finish_reason: null
+    }]
+  };
+  res.write(`data: ${JSON.stringify(chunk)}\n\n`);
+}
+
+interface StreamingState {
+  currentlyStreaming: boolean;
+  currentGenerator: AsyncGenerator | null;
+  remainingContent: string;
+}
+
+async function completeCurrentStreaming(
+  streamingState: StreamingState,
+  res: Response,
+  requestId: string,
+  model: string
+) {
+  if (streamingState.currentlyStreaming && streamingState.remainingContent) {
+    // Force completion of current streaming
+    await emitRemainingContent(
+      res,
+      requestId,
+      model,
+      streamingState.remainingContent
+    );
+    // Reset streaming state
+    streamingState.currentlyStreaming = false;
+    streamingState.remainingContent = '';
+    streamingState.currentGenerator = null;
+  }
+}
+
 // OpenAI-compatible chat completions endpoint
 app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
   // Check authentication only if secret is set
@@ -56,7 +125,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
     const authHeader = req.headers.authorization;
     if (!authHeader || !authHeader.startsWith('Bearer ') || authHeader.split(' ')[1] !== secret) {
       console.log('[chat/completions] Unauthorized request');
-      res.status(401).json({ error: 'Unauthorized' });
+      res.status(401).json({error: 'Unauthorized'});
       return;
     }
   }
@@ -72,11 +141,11 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
   const body = req.body as ChatCompletionRequest;
   if (!body.messages?.length) {
-    return res.status(400).json({ error: 'Messages array is required and must not be empty' });
+    return res.status(400).json({error: 'Messages array is required and must not be empty'});
   }
 
   const lastMessage = body.messages[body.messages.length - 1];
   if (lastMessage.role !== 'user') {
-    return res.status(400).json({ error: 'Last message must be from user' });
+    return res.status(400).json({error: 'Last message must be from user'});
   }
 
   const requestId = Date.now().toString();
@@ -90,11 +159,19 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
   const messageTokens = body.messages.length;
   context.tokenTracker.trackUsage('agent', messageTokens, TOKEN_CATEGORIES.PROMPT);
 
+  // Per-request streaming state, set up before the action listener below
+  const streamingState: StreamingState = {
+    currentlyStreaming: false,
+    currentGenerator: null,
+    remainingContent: ''
+  };
+
   if (body.stream) {
     res.setHeader('Content-Type', 'text/event-stream');
     res.setHeader('Cache-Control', 'no-cache');
     res.setHeader('Connection', 'keep-alive');
+
     // Send initial chunk with opening think tag
     const initialChunk: ChatCompletionChunk = {
@@ -104,7 +181,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
       system_fingerprint: 'fp_' + requestId,
       choices: [{
         index: 0,
-        delta: { role: 'assistant', content: '<think>' },
+        delta: {role: 'assistant', content: '<think>'},
         logprobs: null,
         finish_reason: null
       }]
@@ -112,39 +189,70 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
     res.write(`data: ${JSON.stringify(initialChunk)}\n\n`);
 
     // Set up progress listener with cleanup
-    const actionListener = (action: any) => {
-      // Track reasoning tokens for each chunk using Vercel's convention
-      const chunkTokens = 1; // Default to 1 token per chunk
-      context.tokenTracker.trackUsage('evaluator', chunkTokens, TOKEN_CATEGORIES.REASONING);
-
-      // Only send chunk if there's content to send
+    const actionListener = async (action: any) => {
       if (action.thisStep.think) {
-        const chunk: ChatCompletionChunk = {
-          id: requestId,
-          object: 'chat.completion.chunk',
-          created: Math.floor(Date.now() / 1000),
-          model: body.model,
-          system_fingerprint: 'fp_' + requestId,
-          choices: [{
-            index: 0,
-            delta: { content: `${action.thisStep.think}\n` },
-            logprobs: null,
-            finish_reason: null
-          }]
-        };
-        const chunkStr = `data: ${JSON.stringify(chunk)}\n\n`;
-        console.log('[chat/completions] Sending chunk:', {
-          id: chunk.id,
-          content: chunk.choices[0].delta.content,
-          finish_reason: chunk.choices[0].finish_reason
-        });
-        res.write(chunkStr);
+        // Complete any ongoing streaming first
+        await completeCurrentStreaming(streamingState, res, requestId, body.model);
+
+        // Start new streaming session
+        streamingState.currentlyStreaming = true;
+        streamingState.remainingContent = action.thisStep.think;
+
+        try {
+          for await (const word of streamTextWordByWord(action.thisStep.think, streamingState)) {
+            if (!streamingState.currentlyStreaming) {
+              break;
+            }
+
+            // Update remaining content
+            streamingState.remainingContent = streamingState.remainingContent.slice(word.length);
+
+            const chunk: ChatCompletionChunk = {
+              id: requestId,
+              object: 'chat.completion.chunk',
+              created: Math.floor(Date.now() / 1000),
+              model: body.model,
+              system_fingerprint: 'fp_' + requestId,
+              choices: [{
+                index: 0,
+                delta: {content: word},
+                logprobs: null,
+                finish_reason: null
+              }]
+            };
+            res.write(`data: ${JSON.stringify(chunk)}\n\n`);
+          }
+
+          // Only add newline if this streaming completed normally
+          if (streamingState.currentlyStreaming) {
+            const newlineChunk: ChatCompletionChunk = {
+              id: requestId,
+              object: 'chat.completion.chunk',
+              created: Math.floor(Date.now() / 1000),
+              model: body.model,
+              system_fingerprint: 'fp_' + requestId,
+              choices: [{
+                index: 0,
+                delta: {content: '\n'},
+                logprobs: null,
+                finish_reason: null
+              }]
+            };
+            res.write(`data: ${JSON.stringify(newlineChunk)}\n\n`);
+          }
+        } catch (error) {
+          console.error('Error in streaming:', error);
+          await completeCurrentStreaming(streamingState, res, requestId, body.model);
+        }
       }
     };
     context.actionTracker.on('action', actionListener);
-
-    // Clean up listener on response finish
+
+    // Reset streaming state and clean up listener on response finish
    res.on('finish', () => {
+      streamingState.currentlyStreaming = false;
+      streamingState.currentGenerator = null;
+      streamingState.remainingContent = '';
      context.actionTracker.removeListener('action', actionListener);
    });
  }
@@ -156,17 +264,17 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
 
    let result;
    try {
-      ({ result } = await getResponse(lastMessage.content, undefined, undefined, context));
+      ({result} = await getResponse(lastMessage.content, undefined, undefined, context));
    } catch (error: any) {
      // If deduplication fails, retry without it
      if (error?.response?.status === 402) {
        // If deduplication fails, retry with maxBadAttempt=3 to skip dedup
-        ({ result } = await getResponse(lastMessage.content, undefined, 3, context));
+        ({result} = await getResponse(lastMessage.content, undefined, 3, context));
      } else {
        throw error;
      }
    }
-
+
    // Track tokens based on action type
    if (result.action === 'answer') {
      // Track accepted prediction tokens for the final answer using Vercel's convention
@@ -179,6 +287,9 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
    }
 
    if (body.stream) {
+      // Complete any ongoing streaming before sending final answer
+      await completeCurrentStreaming(streamingState, res, requestId, body.model);
+
      // Send closing think tag
      const closeThinkChunk: ChatCompletionChunk = {
        id: requestId,
@@ -188,7 +299,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
        system_fingerprint: 'fp_' + requestId,
        choices: [{
          index: 0,
-          delta: { content: `</think>\n\n` },
+          delta: {content: `</think>\n\n`},
          logprobs: null,
          finish_reason: null
        }]
@@ -204,7 +315,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
        system_fingerprint: 'fp_' + requestId,
        choices: [{
          index: 0,
-          delta: { content: result.action === 'answer' ? buildMdFromAnswer(result) : result.think },
+          delta: {content: result.action === 'answer' ? buildMdFromAnswer(result) : result.think},
          logprobs: null,
          finish_reason: 'stop'
        }]
@@ -223,14 +334,14 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
          index: 0,
          message: {
            role: 'assistant',
-            content: result.action === 'answer' ? buildMdFromAnswer(result): result.think
+            content: result.action === 'answer' ? buildMdFromAnswer(result) : result.think
          },
          logprobs: null,
          finish_reason: 'stop'
        }],
        usage
      };
-
+
      // Log final response (excluding full content for brevity)
      console.log('[chat/completions] Response:', {
        id: response.id,
@@ -238,7 +349,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
        contentLength: response.choices[0].message.content.length,
        usage: response.usage
      });
-
+
      res.json(response);
    }
  } catch (error: any) {
@@ -273,7 +384,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
        system_fingerprint: 'fp_' + requestId,
        choices: [{
          index: 0,
-          delta: { content: '</think>' },
+          delta: {content: '</think>'},
          logprobs: null,
          finish_reason: null
        }]
@@ -290,7 +401,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
        system_fingerprint: 'fp_' + requestId,
        choices: [{
          index: 0,
-          delta: { content: errorMessage },
+          delta: {content: errorMessage},
          logprobs: null,
          finish_reason: 'stop'
        }]
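
For reference, here is a minimal consumer-side sketch of the word-by-word stream implemented above. It is illustrative only: the endpoint path, the `stream: true` flag, and the chunk shape come from the diff, while the base URL, model name, and the `consumeChat` helper are assumptions. It relies on Node 18+, where `fetch` and `TextDecoder` are global and web ReadableStreams are async-iterable.

// Hypothetical client for the SSE stream produced by the server above.
// Assumes the server listens on http://localhost:3000; model name is a placeholder.
async function consumeChat(prompt: string): Promise<string> {
  const res = await fetch('http://localhost:3000/v1/chat/completions', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      model: 'deepresearch', // placeholder; echoed back in each chunk's `model` field
      stream: true,
      messages: [{ role: 'user', content: prompt }]
    })
  });
  if (!res.ok || !res.body) throw new Error(`HTTP ${res.status}`);

  const decoder = new TextDecoder();
  let buffer = '';
  let full = '';
  for await (const bytes of res.body as unknown as AsyncIterable<Uint8Array>) {
    buffer += decoder.decode(bytes, { stream: true });
    // SSE events are delimited by a blank line; each complete event is one
    // `data: {...}` chunk written by res.write() in the server code
    const events = buffer.split('\n\n');
    buffer = events.pop() ?? '';
    for (const event of events) {
      if (!event.startsWith('data: ')) continue;
      const chunk = JSON.parse(event.slice('data: '.length));
      const delta: string = chunk.choices?.[0]?.delta?.content ?? '';
      full += delta; // words arrive one at a time, <think>...</think> first
      process.stdout.write(delta);
    }
  }
  return full;
}

Concatenating the deltas in arrival order yields the `<think>`-wrapped reasoning steps followed by the final answer. The random 0-100 ms delay in `streamTextWordByWord` is what gives the stream its typing cadence, and `completeCurrentStreaming` guarantees that a step's remaining words are flushed rather than dropped whenever the next action preempts an in-flight stream.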