chore: update eval

This commit is contained in:
Han Xiao 2025-02-10 13:15:37 +08:00
parent e05175b6b1
commit a1b7c853a7

View File

@ -49,6 +49,75 @@ ${i + 1}. [${ref.exactQuote}](${ref.url})`).join('')}`;
}
// Modified streamTextWordByWord function
async function* streamTextWordByWord(text: string, streamingState: StreamingState) {
const words = text.split(/(\s+)/);
for (const word of words) {
if (streamingState.currentlyStreaming) {
const delay = Math.floor(Math.random() * 100);
await new Promise(resolve => setTimeout(resolve, delay));
yield word;
} else {
// If streaming was interrupted, yield all remaining words at once
const remainingWords = words.slice(words.indexOf(word)).join('');
yield remainingWords;
return;
}
}
}
// Helper function to emit remaining content immediately
async function emitRemainingContent(
res: Response,
requestId: string,
model: string,
content: string
) {
if (!content) return;
const chunk: ChatCompletionChunk = {
id: requestId,
object: 'chat.completion.chunk',
created: Math.floor(Date.now() / 1000),
model: model,
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: {content},
logprobs: null,
finish_reason: null
}]
};
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
}
interface StreamingState {
currentlyStreaming: boolean;
currentGenerator: AsyncGenerator<string> | null;
remainingContent: string;
}
async function completeCurrentStreaming(
streamingState: StreamingState,
res: Response,
requestId: string,
model: string
) {
if (streamingState.currentlyStreaming && streamingState.remainingContent) {
// Force completion of current streaming
await emitRemainingContent(
res,
requestId,
model,
streamingState.remainingContent
);
// Reset streaming state
streamingState.currentlyStreaming = false;
streamingState.remainingContent = '';
streamingState.currentGenerator = null;
}
}
// OpenAI-compatible chat completions endpoint
app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
// Check authentication only if secret is set
@ -56,7 +125,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
const authHeader = req.headers.authorization;
if (!authHeader || !authHeader.startsWith('Bearer ') || authHeader.split(' ')[1] !== secret) {
console.log('[chat/completions] Unauthorized request');
res.status(401).json({ error: 'Unauthorized' });
res.status(401).json({error: 'Unauthorized'});
return;
}
}
@ -72,11 +141,11 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
const body = req.body as ChatCompletionRequest;
if (!body.messages?.length) {
return res.status(400).json({ error: 'Messages array is required and must not be empty' });
return res.status(400).json({error: 'Messages array is required and must not be empty'});
}
const lastMessage = body.messages[body.messages.length - 1];
if (lastMessage.role !== 'user') {
return res.status(400).json({ error: 'Last message must be from user' });
return res.status(400).json({error: 'Last message must be from user'});
}
const requestId = Date.now().toString();
@ -90,11 +159,19 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
const messageTokens = body.messages.length;
context.tokenTracker.trackUsage('agent', messageTokens, TOKEN_CATEGORIES.PROMPT);
// Add this inside the chat completions endpoint, before setting up the action listener
const streamingState: StreamingState = {
currentlyStreaming: false,
currentGenerator: null,
remainingContent: ''
};
if (body.stream) {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// Send initial chunk with opening think tag
const initialChunk: ChatCompletionChunk = {
id: requestId,
@ -104,7 +181,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: { role: 'assistant', content: '<think>' },
delta: {role: 'assistant', content: '<think>'},
logprobs: null,
finish_reason: null
}]
@ -112,39 +189,70 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
res.write(`data: ${JSON.stringify(initialChunk)}\n\n`);
// Set up progress listener with cleanup
const actionListener = (action: any) => {
// Track reasoning tokens for each chunk using Vercel's convention
const chunkTokens = 1; // Default to 1 token per chunk
context.tokenTracker.trackUsage('evaluator', chunkTokens, TOKEN_CATEGORIES.REASONING);
// Only send chunk if there's content to send
const actionListener = async (action: any) => {
if (action.thisStep.think) {
const chunk: ChatCompletionChunk = {
id: requestId,
object: 'chat.completion.chunk',
created: Math.floor(Date.now() / 1000),
model: body.model,
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: { content: `${action.thisStep.think}\n` },
logprobs: null,
finish_reason: null
}]
};
const chunkStr = `data: ${JSON.stringify(chunk)}\n\n`;
console.log('[chat/completions] Sending chunk:', {
id: chunk.id,
content: chunk.choices[0].delta.content,
finish_reason: chunk.choices[0].finish_reason
});
res.write(chunkStr);
// Complete any ongoing streaming first
await completeCurrentStreaming(streamingState, res, requestId, body.model);
// Start new streaming session
streamingState.currentlyStreaming = true;
streamingState.remainingContent = action.thisStep.think;
try {
for await (const word of streamTextWordByWord(action.thisStep.think, streamingState)) {
if (!streamingState.currentlyStreaming) {
break;
}
// Update remaining content
streamingState.remainingContent = streamingState.remainingContent.slice(word.length);
const chunk: ChatCompletionChunk = {
id: requestId,
object: 'chat.completion.chunk',
created: Math.floor(Date.now() / 1000),
model: body.model,
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: {content: word},
logprobs: null,
finish_reason: null
}]
};
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
}
// Only add newline if this streaming completed normally
if (streamingState.currentlyStreaming) {
const newlineChunk: ChatCompletionChunk = {
id: requestId,
object: 'chat.completion.chunk',
created: Math.floor(Date.now() / 1000),
model: body.model,
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: {content: '\n'},
logprobs: null,
finish_reason: null
}]
};
res.write(`data: ${JSON.stringify(newlineChunk)}\n\n`);
}
} catch (error) {
console.error('Error in streaming:', error);
await completeCurrentStreaming(streamingState, res, requestId, body.model);
}
}
};
context.actionTracker.on('action', actionListener);
// Clean up listener on response finish
// Make sure to update the cleanup code
res.on('finish', () => {
streamingState.currentlyStreaming = false;
streamingState.currentGenerator = null;
streamingState.remainingContent = '';
context.actionTracker.removeListener('action', actionListener);
});
}
@ -156,17 +264,17 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
let result;
try {
({ result } = await getResponse(lastMessage.content, undefined, undefined, context));
({result} = await getResponse(lastMessage.content, undefined, undefined, context));
} catch (error: any) {
// If deduplication fails, retry without it
if (error?.response?.status === 402) {
// If deduplication fails, retry with maxBadAttempt=3 to skip dedup
({ result } = await getResponse(lastMessage.content, undefined, 3, context));
({result} = await getResponse(lastMessage.content, undefined, 3, context));
} else {
throw error;
}
}
// Track tokens based on action type
if (result.action === 'answer') {
// Track accepted prediction tokens for the final answer using Vercel's convention
@ -179,6 +287,9 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
}
if (body.stream) {
// Complete any ongoing streaming before sending final answer
await completeCurrentStreaming(streamingState, res, requestId, body.model);
// Send closing think tag
const closeThinkChunk: ChatCompletionChunk = {
id: requestId,
@ -188,7 +299,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: { content: `</think>\n\n` },
delta: {content: `</think>\n\n`},
logprobs: null,
finish_reason: null
}]
@ -204,7 +315,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: { content: result.action === 'answer' ? buildMdFromAnswer(result) : result.think },
delta: {content: result.action === 'answer' ? buildMdFromAnswer(result) : result.think},
logprobs: null,
finish_reason: 'stop'
}]
@ -223,14 +334,14 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
index: 0,
message: {
role: 'assistant',
content: result.action === 'answer' ? buildMdFromAnswer(result): result.think
content: result.action === 'answer' ? buildMdFromAnswer(result) : result.think
},
logprobs: null,
finish_reason: 'stop'
}],
usage
};
// Log final response (excluding full content for brevity)
console.log('[chat/completions] Response:', {
id: response.id,
@ -238,7 +349,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
contentLength: response.choices[0].message.content.length,
usage: response.usage
});
res.json(response);
}
} catch (error: any) {
@ -273,7 +384,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: { content: '</think>' },
delta: {content: '</think>'},
logprobs: null,
finish_reason: null
}]
@ -290,7 +401,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: { content: errorMessage },
delta: {content: errorMessage},
logprobs: null,
finish_reason: 'stop'
}]