fix: streaming msg

Han Xiao 2025-02-12 09:55:17 +08:00
parent bdf03ef978
commit 45bb105ad5
3 changed files with 25 additions and 67 deletions

README.md

@@ -128,7 +128,7 @@ The server will start on http://localhost:3000 with the following endpoint:
 curl http://localhost:3000/v1/chat/completions \
   -H "Content-Type: application/json" \
   -d '{
-    "model": "gpt-4o-mini",
+    "model": "jina-deepsearch-v1",
     "messages": [
       {
         "role": "user",
@@ -142,7 +142,7 @@ curl http://localhost:3000/v1/chat/completions \
   -H "Content-Type: application/json" \
   -H "Authorization: Bearer your_secret_token" \
   -d '{
-    "model": "gpt-4o-mini",
+    "model": "jina-deepsearch-v1",
     "messages": [
       {
         "role": "user",
@@ -159,7 +159,7 @@ Response format:
   "id": "chatcmpl-123",
   "object": "chat.completion",
   "created": 1677652288,
-  "model": "gpt-4o-mini",
+  "model": "jina-deepsearch-v1",
   "system_fingerprint": "fp_44709d6fcb",
   "choices": [{
     "index": 0,
@@ -189,7 +189,7 @@ For streaming responses (stream: true), the server sends chunks in this format:
   "id": "chatcmpl-123",
   "object": "chat.completion.chunk",
   "created": 1694268190,
-  "model": "gpt-4o-mini",
+  "model": "jina-deepsearch-v1",
   "system_fingerprint": "fp_44709d6fcb",
   "choices": [{
     "index": 0,

src/app.ts

@@ -10,7 +10,6 @@ import {
   ChatCompletionResponse,
   ChatCompletionChunk,
   AnswerAction,
-  TOKEN_CATEGORIES,
   Model
 } from './types';
 import fs from 'fs/promises';
@@ -76,6 +75,7 @@ async function* streamTextWordByWord(text: string, streamingState: StreamingState
 async function emitRemainingContent(
   res: Response,
   requestId: string,
+  created: number,
   model: string,
   content: string
 ) {
@@ -84,7 +84,7 @@ async function emitRemainingContent(
   const chunk: ChatCompletionChunk = {
     id: requestId,
     object: 'chat.completion.chunk',
-    created: Math.floor(Date.now() / 1000),
+    created,
     model: model,
     system_fingerprint: 'fp_' + requestId,
     choices: [{
@@ -107,33 +107,11 @@ interface StreamingState {
 }
-async function emitContentImmediately(
-  res: Response,
-  requestId: string,
-  model: string,
-  content: string
-) {
-  const chunk: ChatCompletionChunk = {
-    id: requestId,
-    object: 'chat.completion.chunk',
-    created: Math.floor(Date.now() / 1000),
-    model: model,
-    system_fingerprint: 'fp_' + requestId,
-    choices: [{
-      index: 0,
-      delta: {content},
-      logprobs: null,
-      finish_reason: null
-    }]
-  };
-  res.write(`data: ${JSON.stringify(chunk)}\n\n`);
-}
 async function completeCurrentStreaming(
   streamingState: StreamingState,
   res: Response,
   requestId: string,
+  created: number,
   model: string
 ) {
   if (streamingState.currentlyStreaming && streamingState.remainingContent) {
@@ -141,6 +119,7 @@ async function completeCurrentStreaming(
     await emitRemainingContent(
       res,
       requestId,
+      created,
       model,
       streamingState.remainingContent
     );
@@ -203,7 +182,7 @@ if (secret) {
   });
 }
-async function processQueue(streamingState: StreamingState, res: Response, requestId: string, model: string) {
+async function processQueue(streamingState: StreamingState, res: Response, requestId: string, created: number, model: string) {
   if (streamingState.processingQueue) return;
   streamingState.processingQueue = true;
@@ -221,8 +200,8 @@ async function processQueue(streamingState: StreamingState, res: Response, requestId: string, model: string) {
     const chunk: ChatCompletionChunk = {
       id: requestId,
       object: 'chat.completion.chunk',
-      created: Math.floor(Date.now() / 1000),
-      model: model,
+      created,
+      model,
       system_fingerprint: 'fp_' + requestId,
       choices: [{
         index: 0,
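
Note: the early return on processingQueue above is a re-entrancy guard, so only one drain loop writes chunks to the response at a time. A self-contained sketch of that pattern (the queue field is an assumption; the diff shows only the flag handling):

interface QueueState {
  processingQueue: boolean;
  queue: string[]; // assumed: pending content waiting to be streamed
}

async function drain(state: QueueState, write: (s: string) => void): Promise<void> {
  if (state.processingQueue) return; // another call is already draining
  state.processingQueue = true;
  try {
    while (state.queue.length > 0) {
      write(state.queue.shift()!); // emit in arrival order, single writer
    }
  } finally {
    state.processingQueue = false; // release the flag even if a write throws
  }
}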
@@ -301,16 +280,12 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
   }
   const requestId = Date.now().toString();
+  const created = Math.floor(Date.now() / 1000);
   const context: TrackerContext = {
     tokenTracker: new TokenTracker(),
     actionTracker: new ActionTracker()
   };
-  // Track prompt tokens for the initial message
-  // Use Vercel's token counting convention - 1 token per message
-  const messageTokens = body.messages.length;
-  context.tokenTracker.trackUsage('agent', messageTokens, TOKEN_CATEGORIES.PROMPT);
   // Add this inside the chat completions endpoint, before setting up the action listener
   const streamingState: StreamingState = {
     currentlyStreaming: false,
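
Note: this hunk is the core of the fix. `created` is computed once per request and threaded through every chunk constructor, so all chunks of one completion share the same timestamp, matching OpenAI's streaming format where id and created stay constant across a stream. A condensed sketch of the pattern (the makeChunk helper is illustrative, not part of the commit):

// Compute per-request metadata once, then reuse it at every emission site.
const requestId = Date.now().toString();
const created = Math.floor(Date.now() / 1000); // fixed for the whole completion

const makeChunk = (model: string, delta: {content?: string}) => ({
  id: requestId,
  object: 'chat.completion.chunk' as const,
  created, // same value on every chunk of this request
  model,
  system_fingerprint: 'fp_' + requestId,
  choices: [{index: 0, delta, logprobs: null, finish_reason: null}]
});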
@@ -331,7 +306,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
     const initialChunk: ChatCompletionChunk = {
       id: requestId,
       object: 'chat.completion.chunk',
-      created: Math.floor(Date.now() / 1000),
+      created,
       model: body.model,
       system_fingerprint: 'fp_' + requestId,
       choices: [{
@@ -354,7 +329,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
       });
       // Start processing queue if not already processing
-      processQueue(streamingState, res, requestId, body.model);
+      processQueue(streamingState, res, requestId, created, body.model);
     });
   }
 };
@@ -370,10 +345,6 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
   }
   try {
-    // Track initial query tokens - already tracked above
-    // const queryTokens = Buffer.byteLength(lastMessage.content, 'utf-8');
-    // context.tokenTracker.trackUsage('agent', queryTokens, 'prompt');
     let result;
     try {
       ({result} = await getResponse(lastMessage.content, undefined, undefined, context));
@@ -387,26 +358,15 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
       }
     }
-    // Track tokens based on action type
-    if (result.action === 'answer') {
-      // Track accepted prediction tokens for the final answer using Vercel's convention
-      const answerTokens = 1; // Default to 1 token per answer
-      context.tokenTracker.trackUsage('evaluator', answerTokens, TOKEN_CATEGORIES.ACCEPTED);
-    } else {
-      // Track rejected prediction tokens for non-answer responses
-      const rejectedTokens = 1; // Default to 1 token per rejected response
-      context.tokenTracker.trackUsage('evaluator', rejectedTokens, TOKEN_CATEGORIES.REJECTED);
-    }
     if (body.stream) {
       // Complete any ongoing streaming before sending final answer
-      await completeCurrentStreaming(streamingState, res, requestId, body.model);
+      await completeCurrentStreaming(streamingState, res, requestId, created, body.model);
       // Send closing think tag
       const closeThinkChunk: ChatCompletionChunk = {
         id: requestId,
         object: 'chat.completion.chunk',
-        created: Math.floor(Date.now() / 1000),
+        created,
         model: body.model,
         system_fingerprint: 'fp_' + requestId,
         choices: [{
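
Note: combined with the `</think>` delta in this hunk, the stream wraps intermediate reasoning in think tags before the final answer. A hypothetical wire trace (delta contents invented for illustration; system_fingerprint omitted for line length; note the identical id and created on every chunk after this fix):

data: {"id":"1739325317000","object":"chat.completion.chunk","created":1739325317,"model":"jina-deepsearch-v1","choices":[{"index":0,"delta":{"content":"<think>"},"logprobs":null,"finish_reason":null}]}

data: {"id":"1739325317000","object":"chat.completion.chunk","created":1739325317,"model":"jina-deepsearch-v1","choices":[{"index":0,"delta":{"content":"searching the web..."},"logprobs":null,"finish_reason":null}]}

data: {"id":"1739325317000","object":"chat.completion.chunk","created":1739325317,"model":"jina-deepsearch-v1","choices":[{"index":0,"delta":{"content":"</think>"},"logprobs":null,"finish_reason":null}]}

data: {"id":"1739325317000","object":"chat.completion.chunk","created":1739325317,"model":"jina-deepsearch-v1","choices":[{"index":0,"delta":{"content":"Final answer."},"logprobs":null,"finish_reason":"stop"}]}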
@@ -422,7 +382,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
       const answerChunk: ChatCompletionChunk = {
         id: requestId,
         object: 'chat.completion.chunk',
-        created: Math.floor(Date.now() / 1000),
+        created,
         model: body.model,
         system_fingerprint: 'fp_' + requestId,
         choices: [{
@@ -439,7 +399,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
       const response: ChatCompletionResponse = {
         id: requestId,
         object: 'chat.completion',
-        created: Math.floor(Date.now() / 1000),
+        created,
         model: body.model,
         system_fingerprint: 'fp_' + requestId,
         choices: [{
@@ -475,9 +435,6 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
     // Track error as rejected tokens with Vercel token counting
     const errorMessage = error?.message || 'An error occurred';
-    // Default to 1 token for errors as per Vercel AI SDK convention
-    const errorTokens = 1;
-    context.tokenTracker.trackUsage('evaluator', errorTokens, TOKEN_CATEGORIES.REJECTED);
     // Clean up event listeners
     context.actionTracker.removeAllListeners('action');
@@ -491,7 +448,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
       const closeThinkChunk: ChatCompletionChunk = {
         id: requestId,
         object: 'chat.completion.chunk',
-        created: Math.floor(Date.now() / 1000),
+        created,
         model: body.model,
         system_fingerprint: 'fp_' + requestId,
         choices: [{
@@ -499,16 +456,16 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
           delta: {content: '</think>'},
           logprobs: null,
           finish_reason: null
-        }]
+        }],
+        usage
       };
       res.write(`data: ${JSON.stringify(closeThinkChunk)}\n\n`);
       // Track error token and send error message
-      context.tokenTracker.trackUsage('evaluator', 1, TOKEN_CATEGORIES.REJECTED);
       const errorChunk: ChatCompletionChunk = {
         id: requestId,
         object: 'chat.completion.chunk',
-        created: Math.floor(Date.now() / 1000),
+        created,
         model: body.model,
         system_fingerprint: 'fp_' + requestId,
         choices: [{
@@ -525,7 +482,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
       const response: ChatCompletionResponse = {
         id: requestId,
         object: 'chat.completion',
-        created: Math.floor(Date.now() / 1000),
+        created,
         model: body.model,
         system_fingerprint: 'fp_' + requestId,
         choices: [{

src/types.ts

@@ -217,6 +217,7 @@ export interface ChatCompletionChunk {
     logprobs: null;
     finish_reason: null | 'stop';
   }>;
+  usage?: any;
 }
 // Tracker Types
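
Note: `usage?: any` leaves the payload untyped. If it follows the OpenAI usage object (an assumption; the diff does not show how usage is populated), a stricter typing would be:

// Possible tightening of `usage?: any` (sketch, not part of this commit).
// By OpenAI convention, usage is attached to the final chunk of a stream.
export interface Usage {
  prompt_tokens: number;
  completion_tokens: number;
  total_tokens: number;
}

// ...and in ChatCompletionChunk:
//   usage?: Usage;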