fix: emit stream (#61)

This commit is contained in:
Han Xiao
2025-02-13 12:33:37 +08:00
committed by GitHub
parent fa4dccc94d
commit 086bde514e
4 changed files with 47 additions and 47 deletions

View File

@@ -324,8 +324,7 @@ export async function getResponse(question: string,
let allowRead = true; let allowRead = true;
let allowReflect = true; let allowReflect = true;
let prompt = ''; let prompt = '';
let thisStep: StepAction = {action: 'answer', answer: '', references: [], think: ''}; let thisStep: StepAction = {action: 'answer', answer: '', references: [], think: '', isFinal: false};
let isAnswered = false;
const allURLs: Record<string, string> = {}; const allURLs: Record<string, string> = {};
const visitedURLs: string[] = []; const visitedURLs: string[] = [];
@@ -388,7 +387,7 @@ export async function getResponse(question: string,
if (thisStep.action === 'answer') { if (thisStep.action === 'answer') {
if (step === 1) { if (step === 1) {
// LLM is so confident and answer immediately, skip all evaluations // LLM is so confident and answer immediately, skip all evaluations
isAnswered = true; thisStep.isFinal = true;
break break
} }
@@ -417,11 +416,11 @@ ${evaluation.think}
Your journey ends here. You have successfully answered the original question. Congratulations! 🎉 Your journey ends here. You have successfully answered the original question. Congratulations! 🎉
`); `);
isAnswered = true; thisStep.isFinal = true;
break break
} else { } else {
if (badAttempts >= maxBadAttempts) { if (badAttempts >= maxBadAttempts) {
isAnswered = false; thisStep.isFinal = false;
break break
} else { } else {
diaryContext.push(` diaryContext.push(`
@@ -676,9 +675,7 @@ You decided to think out of the box or cut from a completely different angle.`);
} }
await storeContext(prompt, schema, [allContext, allKeywords, allQuestions, allKnowledge], totalStep); await storeContext(prompt, schema, [allContext, allKeywords, allQuestions, allKnowledge], totalStep);
if (isAnswered) { if (!(thisStep as AnswerAction).isFinal) {
return {result: thisStep, context};
} else {
console.log('Enter Beast mode!!!') console.log('Enter Beast mode!!!')
// any answer is better than no answer, humanity last resort // any answer is better than no answer, humanity last resort
step++; step++;
@@ -705,14 +702,15 @@ You decided to think out of the box or cut from a completely different angle.`);
schema, schema,
prompt, prompt,
}); });
thisStep = result.object as AnswerAction;
await storeContext(prompt, schema, [allContext, allKeywords, allQuestions, allKnowledge], totalStep); (thisStep as AnswerAction).isFinal = true;
thisStep = result.object as StepAction;
context.actionTracker.trackAction({totalStep, thisStep, gaps, badAttempts}); context.actionTracker.trackAction({totalStep, thisStep, gaps, badAttempts});
console.log(thisStep)
return {result: thisStep, context};
} }
console.log(thisStep)
await storeContext(prompt, schema, [allContext, allKeywords, allQuestions, allKnowledge], totalStep);
return {result: thisStep, context};
} }
async function storeContext(prompt: string, schema: any, memory: any[][], step: number) { async function storeContext(prompt: string, schema: any, memory: any[][], step: number) {

View File

@@ -7,7 +7,7 @@ import {
ChatCompletionResponse, ChatCompletionResponse,
ChatCompletionChunk, ChatCompletionChunk,
AnswerAction, AnswerAction,
Model Model, StepAction
} from './types'; } from './types';
import {TokenTracker} from "./utils/token-tracker"; import {TokenTracker} from "./utils/token-tracker";
import {ActionTracker} from "./utils/action-tracker"; import {ActionTracker} from "./utils/action-tracker";
@@ -26,25 +26,26 @@ app.get('/health', (req, res) => {
}); });
function buildMdFromAnswer(answer: AnswerAction) { function buildMdFromAnswer(answer: AnswerAction) {
let refStr = ''; if (!answer.references?.length || !answer.references.some(ref => ref.url.startsWith('http'))) {
if (answer.references?.length > 0) { return answer.answer;
refStr = ` }
const references = answer.references.map((ref, i) => {
const escapedQuote = ref.exactQuote
.replace(/([[\]_*`])/g, '\\$1')
.replace(/\n/g, ' ')
.trim();
return `[^${i + 1}]: [${escapedQuote}](${ref.url})`;
}).join('\n\n');
return `${answer.answer.replace(/\(REF_(\d+)\)/g, (_, num) => `[^${num}]`)}
<references> <references>
${answer.references.map((ref, i) => { ${references}
// Escape special markdown characters in the quote
const escapedQuote = ref.exactQuote
.replace(/([[\]_*`])/g, '\\$1') // Escape markdown syntax chars
.replace(/\n/g, ' ') // Replace line breaks with spaces
.trim(); // Remove excess whitespace
return `[^${i + 1}]: [${escapedQuote}](${ref.url})\n\n`;
}).join()}
</references> </references>`;
`.trim();
}
return `${answer.answer.replace(/\(REF_(\d+)\)/g, (_, num) => `[^${num}]`)}\n\n${refStr}`;
} }
async function* streamTextNaturally(text: string, streamingState: StreamingState) { async function* streamTextNaturally(text: string, streamingState: StreamingState) {
@@ -465,16 +466,16 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
res.write(`data: ${JSON.stringify(initialChunk)}\n\n`); res.write(`data: ${JSON.stringify(initialChunk)}\n\n`);
// Set up progress listener with cleanup // Set up progress listener with cleanup
const actionListener = async (action: any) => { const actionListener = async (step: StepAction) => {
if (action.thisStep.think) { // Add content to queue for both thinking steps and final answer
// Create a promise that resolves when this content is done streaming if (step.think) {
const content = step.think;
await new Promise<void>(resolve => { await new Promise<void>(resolve => {
streamingState.queue.push({ streamingState.queue.push({
content: action.thisStep.think, content,
resolve resolve
}); });
// Single call to process queue is sufficient
// Start processing queue if not already processing
processQueue(streamingState, res, requestId, created, body.model); processQueue(streamingState, res, requestId, created, body.model);
}); });
} }
@@ -491,13 +492,13 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
} }
try { try {
const {result} = await getResponse(lastMessage.content as string, tokenBudget, maxBadAttempts, context, body.messages) const {result: finalStep} = await getResponse(lastMessage.content as string, tokenBudget, maxBadAttempts, context, body.messages)
const usage = context.tokenTracker.getTotalUsageSnakeCase(); const usage = context.tokenTracker.getTotalUsageSnakeCase();
if (body.stream) { if (body.stream) {
// Complete any ongoing streaming before sending final answer // Complete any ongoing streaming before sending final answer
await completeCurrentStreaming(streamingState, res, requestId, created, body.model); await completeCurrentStreaming(streamingState, res, requestId, created, body.model);
const finalAnswer = buildMdFromAnswer(finalStep as AnswerAction);
// Send closing think tag // Send closing think tag
const closeThinkChunk: ChatCompletionChunk = { const closeThinkChunk: ChatCompletionChunk = {
id: requestId, id: requestId,
@@ -507,15 +508,15 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
system_fingerprint: 'fp_' + requestId, system_fingerprint: 'fp_' + requestId,
choices: [{ choices: [{
index: 0, index: 0,
delta: {content: `</think>\n\n`}, delta: {content: `</think>\n\n${finalAnswer}`},
logprobs: null, logprobs: null,
finish_reason: null finish_reason: null
}] }]
}; };
res.write(`data: ${JSON.stringify(closeThinkChunk)}\n\n`); res.write(`data: ${JSON.stringify(closeThinkChunk)}\n\n`);
// Send final answer as separate chunk // After the content is fully streamed, send the final chunk with finish_reason and usage
const answerChunk: ChatCompletionChunk = { const finalChunk: ChatCompletionChunk = {
id: requestId, id: requestId,
object: 'chat.completion.chunk', object: 'chat.completion.chunk',
created, created,
@@ -523,13 +524,13 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
system_fingerprint: 'fp_' + requestId, system_fingerprint: 'fp_' + requestId,
choices: [{ choices: [{
index: 0, index: 0,
delta: {content: result.action === 'answer' ? buildMdFromAnswer(result) : result.think}, delta: {content: ''},
logprobs: null, logprobs: null,
finish_reason: 'stop' finish_reason: 'stop'
}], }],
usage usage
}; };
res.write(`data: ${JSON.stringify(answerChunk)}\n\n`); res.write(`data: ${JSON.stringify(finalChunk)}\n\n`);
res.end(); res.end();
} else { } else {
@@ -543,7 +544,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
index: 0, index: 0,
message: { message: {
role: 'assistant', role: 'assistant',
content: result.action === 'answer' ? buildMdFromAnswer(result) : result.think content: finalStep.action === 'answer' ? buildMdFromAnswer(finalStep) : finalStep.think
}, },
logprobs: null, logprobs: null,
finish_reason: 'stop' finish_reason: 'stop'

View File

@@ -18,6 +18,7 @@ export type AnswerAction = BaseAction & {
exactQuote: string; exactQuote: string;
url: string; url: string;
}>; }>;
isFinal?: boolean;
}; };
export type ReflectAction = BaseAction & { export type ReflectAction = BaseAction & {

View File

@@ -18,13 +18,13 @@ export class ActionTracker extends EventEmitter {
trackAction(newState: Partial<ActionState>) { trackAction(newState: Partial<ActionState>) {
this.state = { ...this.state, ...newState }; this.state = { ...this.state, ...newState };
this.emit('action', this.state); this.emit('action', this.state.thisStep);
} }
trackThink(think: string) { trackThink(think: string) {
// only update the think field of the current state // only update the think field of the current state
this.state = { ...this.state, thisStep: { ...this.state.thisStep, think } }; this.state = { ...this.state, thisStep: { ...this.state.thisStep, think } };
this.emit('action', this.state); this.emit('action', this.state.thisStep);
} }
getState(): ActionState { getState(): ActionState {