diff --git a/src/server.ts b/src/server.ts index 93460b9..96f40c6 100644 --- a/src/server.ts +++ b/src/server.ts @@ -2,6 +2,10 @@ import express, { Request, Response, RequestHandler } from 'express'; import cors from 'cors'; import { EventEmitter } from 'events'; import { getResponse } from './agent'; +import { tokenTracker } from './utils/token-tracker'; +import { StepAction } from './types'; +import fs from 'fs/promises'; +import path from 'path'; const app = express(); const port = process.env.PORT || 3000; @@ -14,6 +18,8 @@ app.use(express.json()); const eventEmitter = new EventEmitter(); // Type definitions +import { StreamMessage } from './types'; + interface QueryRequest extends Request { body: { q: string; @@ -34,7 +40,7 @@ app.get('/api/v1/stream/:requestId', ((req: Request, res: StreamResponse) => { res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); - const listener = (data: any) => { + const listener = (data: StreamMessage) => { res.write(`data: ${JSON.stringify(data)}\n\n`); }; @@ -57,6 +63,7 @@ app.post('/api/v1/query', (async (req: QueryRequest, res: Response) => { // Store original console.log const originalConsoleLog: typeof console.log = console.log; + let thisStep: StepAction | undefined; try { // Wrap getResponse to emit progress @@ -64,11 +71,79 @@ app.post('/api/v1/query', (async (req: QueryRequest, res: Response) => { originalConsoleLog(...args); const message = args.join(' '); if (message.includes('Step') || message.includes('Budget used')) { - eventEmitter.emit(`progress-${requestId}`, { type: 'progress', data: message }); + const step = parseInt(message.match(/Step (\d+)/)?.[1] || '0'); + const budgetPercentage = message.match(/Budget used ([\d.]+)%/)?.[1]; + const budgetInfo = budgetPercentage ? { + used: tokenTracker.getTotalUsage(), + total: budget || 1_000_000, + percentage: budgetPercentage + } : undefined; + + // Emit the full thisStep object if available + if (thisStep && thisStep.action && thisStep.thoughts) { + eventEmitter.emit(`progress-${requestId}`, { + type: 'progress', + data: thisStep, + step, + budget: budgetInfo + }); + } else { + eventEmitter.emit(`progress-${requestId}`, { + type: 'progress', + data: message, + step, + budget: budgetInfo + }); + } + } + }; + + // Track step updates during execution + const emitStep = (step: StepAction & { totalStep?: number }) => { + if (step.action && step.thoughts) { + eventEmitter.emit(`progress-${requestId}`, { + type: 'progress', + data: step, + step: step.totalStep, + budget: { + used: tokenTracker.getTotalUsage(), + total: budget || 1_000_000, + percentage: ((tokenTracker.getTotalUsage() / (budget || 1_000_000)) * 100).toFixed(2) + } + }); + } + }; + + // Override console.log to track steps + const originalConsoleLog = console.log; + console.log = (...args: any[]) => { + originalConsoleLog(...args); + const message = args.join(' '); + if (message.includes('Step') || message.includes('Budget used')) { + const step = parseInt(message.match(/Step (\d+)/)?.[1] || '0'); + const budgetPercentage = message.match(/Budget used ([\d.]+)%/)?.[1]; + const budgetInfo = budgetPercentage ? { + used: tokenTracker.getTotalUsage(), + total: budget || 1_000_000, + percentage: budgetPercentage + } : undefined; + + if (thisStep && thisStep.action && thisStep.thoughts) { + emitStep({...thisStep, totalStep: step}); + } else { + eventEmitter.emit(`progress-${requestId}`, { + type: 'progress', + data: message, + step, + budget: budgetInfo + }); + } } }; const result = await getResponse(q, budget, maxBadAttempt); + thisStep = result; + await storeTaskResult(requestId, result); eventEmitter.emit(`progress-${requestId}`, { type: 'answer', data: result }); } catch (error: any) { eventEmitter.emit(`progress-${requestId}`, { type: 'error', data: error?.message || 'Unknown error' }); @@ -77,6 +152,31 @@ app.post('/api/v1/query', (async (req: QueryRequest, res: Response) => { } }) as RequestHandler); +async function storeTaskResult(requestId: string, result: StepAction) { + try { + const taskDir = path.join(process.cwd(), 'tasks'); + await fs.mkdir(taskDir, { recursive: true }); + await fs.writeFile( + path.join(taskDir, `${requestId}.json`), + JSON.stringify(result, null, 2) + ); + } catch (error) { + console.error('Task storage failed:', error); + } +} + +// GET endpoint to fetch task results +app.get('/api/v1/task/:requestId', (async (req: Request, res: Response) => { + const requestId = req.params.requestId; + try { + const taskPath = path.join(process.cwd(), 'tasks', `${requestId}.json`); + const taskData = await fs.readFile(taskPath, 'utf-8'); + res.json(JSON.parse(taskData)); + } catch (error) { + res.status(404).json({ error: 'Task not found' }); + } +}) as RequestHandler); + app.listen(port, () => { console.log(`Server running at http://localhost:${port}`); }); diff --git a/src/types.ts b/src/types.ts index 5c34bff..209bd9a 100644 --- a/src/types.ts +++ b/src/types.ts @@ -134,3 +134,14 @@ export type ResponseSchema = { properties: Record; required: string[]; }; + +export interface StreamMessage { + type: 'progress' | 'answer' | 'error'; + data: string | StepAction; + step?: number; + budget?: { + used: number; + total: number; + percentage: string; + }; +}