fix: streaming msg

Han Xiao 2025-02-12 09:30:31 +08:00
parent b352b239a7
commit c8a68a043f
2 changed files with 119 additions and 65 deletions

View File

@@ -524,8 +524,8 @@ But then you realized you have asked them before. You decided to think out of
if (keywordsQueries.length > 0) {
// let googleGrounded = '';
const searchResults = [];
context.actionTracker.trackThink(`Let me search for "${keywordsQueries.join(', ')}" to gather more information.`)
for (const query of keywordsQueries) {
context.actionTracker.trackThink(`Let me search for "${query}" to gather more information.`)
console.log(`Search query: ${query}`);
let results;
@@ -618,11 +618,10 @@ You decided to think out of the box or cut from a completely different angle.
}
if (uniqueURLs.length > 0) {
context.actionTracker.trackThink(`Let me read ${uniqueURLs.join(', ')} to gather more information.`);
const urlResults = await Promise.all(
uniqueURLs.map(async (url: string) => {
try {
context.actionTracker.trackThink(`Let me visit ${url} to gather more information.`)
const {response, tokens} = await readUrl(url, context.tokenTracker);
allKnowledge.push({
question: `What is in ${response.data?.url || 'the URL'}?`,

View File

@@ -28,7 +28,7 @@ app.use(express.json());
// Add health check endpoint for Docker container verification
app.get('/health', (req, res) => {
res.json({ status: 'ok' });
res.json({status: 'ok'});
});
const eventEmitter = new EventEmitter();
@@ -101,8 +101,35 @@ interface StreamingState {
currentlyStreaming: boolean;
currentGenerator: AsyncGenerator<string> | null;
remainingContent: string;
isEmitting: boolean;
queue: { content: string; resolve: () => void }[];
processingQueue: boolean;
}
async function emitContentImmediately(
res: Response,
requestId: string,
model: string,
content: string
) {
const chunk: ChatCompletionChunk = {
id: requestId,
object: 'chat.completion.chunk',
created: Math.floor(Date.now() / 1000),
model: model,
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: {content},
logprobs: null,
finish_reason: null
}]
};
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
}
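The ChatCompletionChunk type used above is imported from elsewhere in the repo and its definition is not part of this diff. Judging only from how the chunks are constructed here, it presumably looks roughly like the sketch below; the field names come from the code above, everything else is an assumption.

// Approximate shape of ChatCompletionChunk as implied by this diff;
// the real interface is defined elsewhere in the repo and may differ.
interface ChatCompletionChunkSketch {
  id: string;
  object: 'chat.completion.chunk';
  created: number;                 // unix timestamp in seconds
  model: string;
  system_fingerprint: string;
  choices: Array<{
    index: number;
    delta: { content?: string };
    logprobs: null;
    finish_reason: string | null;
  }>;
}

Each res.write() above sends one server-sent event of the form "data: <JSON-serialized chunk>" followed by a blank line, which is the framing OpenAI-compatible streaming clients expect.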
async function completeCurrentStreaming(
streamingState: StreamingState,
res: Response,
@@ -142,7 +169,7 @@ app.get('/v1/models', (async (_req: Request, res: Response) => {
app.get('/v1/models/:model', (async (req: Request, res: Response) => {
const modelId = req.params.model;
if (modelId === 'jina-deepsearch-v1') {
res.json({
id: 'jina-deepsearch-v1',
@@ -168,7 +195,7 @@ if (secret) {
const authHeader = req.headers.authorization;
if (!authHeader || !authHeader.startsWith('Bearer ') || authHeader.split(' ')[1] !== secret) {
console.log('[chat/completions] Unauthorized request');
res.status(401).json({ error: 'Unauthorized' });
res.status(401).json({error: 'Unauthorized'});
return;
}
@@ -176,6 +203,74 @@ if (secret) {
});
}
async function processQueue(streamingState: StreamingState, res: Response, requestId: string, model: string) {
if (streamingState.processingQueue) return;
streamingState.processingQueue = true;
while (streamingState.queue.length > 0) {
const current = streamingState.queue[0];
// Reset streaming state for new content
streamingState.currentlyStreaming = true;
streamingState.remainingContent = current.content;
streamingState.isEmitting = true;
try {
for await (const word of streamTextWordByWord(current.content, streamingState)) {
const chunk: ChatCompletionChunk = {
id: requestId,
object: 'chat.completion.chunk',
created: Math.floor(Date.now() / 1000),
model: model,
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: { content: word },
logprobs: null,
finish_reason: null
}]
};
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
// Small delay between words
await new Promise(resolve => setTimeout(resolve, 30));
}
// Add newline after content
const newlineChunk: ChatCompletionChunk = {
id: requestId,
object: 'chat.completion.chunk',
created: Math.floor(Date.now() / 1000),
model: model,
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: { content: '\n' },
logprobs: null,
finish_reason: null
}]
};
res.write(`data: ${JSON.stringify(newlineChunk)}\n\n`);
} catch (error) {
console.error('Error in streaming:', error);
} finally {
// Reset state and remove from queue
streamingState.isEmitting = false;
streamingState.currentlyStreaming = false;
streamingState.remainingContent = '';
streamingState.queue.shift();
current.resolve();
// Small delay between queue items
await new Promise(resolve => setTimeout(resolve, 50));
}
}
streamingState.processingQueue = false;
}
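processQueue iterates streamTextWordByWord, which is defined elsewhere in this file and not shown in the diff. A minimal sketch of what such a generator might look like follows, assuming it simply yields whitespace-delimited tokens and stops early when streaming is cancelled; the real implementation may differ (note the 30 ms pacing between words is handled by processQueue above, not by the generator).

// Hypothetical sketch of streamTextWordByWord; not the actual implementation.
async function* streamTextWordByWordSketch(
  text: string,
  streamingState: { currentlyStreaming: boolean }
): AsyncGenerator<string> {
  const words = text.split(' ');
  for (let i = 0; i < words.length; i++) {
    // Stop early if the current stream has been cancelled or completed.
    if (!streamingState.currentlyStreaming) return;
    // Re-append the space that split() removed, except after the last word,
    // so the client can concatenate the deltas back into the original text.
    yield i < words.length - 1 ? words[i] + ' ' : words[i];
  }
}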
app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
// Check authentication only if secret is set
if (secret) {
@@ -218,10 +313,13 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
// Add this inside the chat completions endpoint, before setting up the action listener
const streamingState: StreamingState = {
currentlyStreaming: false,
currentGenerator: null,
remainingContent: ''
};
currentlyStreaming: false,
currentGenerator: null,
remainingContent: '',
isEmitting: false,
queue: [],
processingQueue: false
};
if (body.stream) {
res.setHeader('Content-Type', 'text/event-stream');
@@ -247,62 +345,19 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
// Set up progress listener with cleanup
const actionListener = async (action: any) => {
if (action.thisStep.think) {
// Complete any ongoing streaming first
await completeCurrentStreaming(streamingState, res, requestId, body.model);
if (action.thisStep.think) {
// Create a promise that resolves when this content is done streaming
await new Promise<void>(resolve => {
streamingState.queue.push({
content: action.thisStep.think,
resolve
});
// Start new streaming session
streamingState.currentlyStreaming = true;
streamingState.remainingContent = action.thisStep.think;
try {
for await (const word of streamTextWordByWord(action.thisStep.think, streamingState)) {
if (!streamingState.currentlyStreaming) {
break;
}
// Update remaining content
streamingState.remainingContent = streamingState.remainingContent.slice(word.length);
const chunk: ChatCompletionChunk = {
id: requestId,
object: 'chat.completion.chunk',
created: Math.floor(Date.now() / 1000),
model: body.model,
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: {content: word},
logprobs: null,
finish_reason: null
}]
};
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
}
// Only add newline if this streaming completed normally
if (streamingState.currentlyStreaming) {
const newlineChunk: ChatCompletionChunk = {
id: requestId,
object: 'chat.completion.chunk',
created: Math.floor(Date.now() / 1000),
model: body.model,
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: {content: '\n'},
logprobs: null,
finish_reason: null
}]
};
res.write(`data: ${JSON.stringify(newlineChunk)}\n\n`);
}
} catch (error) {
console.error('Error in streaming:', error);
await completeCurrentStreaming(streamingState, res, requestId, body.model);
}
}
};
// Start processing queue if not already processing
processQueue(streamingState, res, requestId, body.model);
});
}
};
context.actionTracker.on('action', actionListener);
// Make sure to update the cleanup code
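For reference, this is roughly how an OpenAI-compatible client could consume the streaming endpoint above. The path, model name, and Bearer auth come from this diff; the request body fields and the data: [DONE] terminator follow the usual OpenAI streaming convention and are assumptions about this server rather than something the diff guarantees.

// Illustrative client sketch (Node 18+); not part of this commit.
async function consumeDeepSearchStream(baseUrl: string, apiKey: string): Promise<void> {
  const res = await fetch(`${baseUrl}/v1/chat/completions`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${apiKey}`
    },
    body: JSON.stringify({
      model: 'jina-deepsearch-v1',
      stream: true,
      messages: [{role: 'user', content: 'What is Jina AI?'}]
    })
  });
  if (!res.ok || !res.body) throw new Error(`request failed: ${res.status}`);

  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  while (true) {
    const {done, value} = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, {stream: true});
    // SSE frames are separated by a blank line.
    const frames = buffer.split('\n\n');
    buffer = frames.pop() ?? '';
    for (const frame of frames) {
      const line = frame.trim();
      if (!line.startsWith('data: ')) continue;
      const payload = line.slice('data: '.length);
      if (payload === '[DONE]') return; // conventional terminator, if the server sends one
      const parsed = JSON.parse(payload);
      process.stdout.write(parsed.choices?.[0]?.delta?.content ?? '');
    }
  }
}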