mirror of
https://github.com/jina-ai/node-DeepResearch.git
synced 2026-03-22 07:29:35 +08:00
chore: first commit
This commit is contained in:
150
src/server.ts
150
src/server.ts
@@ -1,11 +1,13 @@
|
|||||||
import express, { Request, Response, RequestHandler } from 'express';
|
import express, {Request, Response, RequestHandler} from 'express';
|
||||||
import cors from 'cors';
|
import cors from 'cors';
|
||||||
import { EventEmitter } from 'events';
|
import {EventEmitter} from 'events';
|
||||||
import { getResponse } from './agent';
|
import {getResponse} from './agent';
|
||||||
import {StepAction, StreamMessage} from './types';
|
import {StepAction, StreamMessage} from './types';
|
||||||
import { TrackerContext } from './types/tracker';
|
import {TrackerContext} from './types/tracker';
|
||||||
import fs from 'fs/promises';
|
import fs from 'fs/promises';
|
||||||
import path from 'path';
|
import path from 'path';
|
||||||
|
import {TokenTracker} from "./utils/token-tracker";
|
||||||
|
import {ActionTracker} from "./utils/action-tracker";
|
||||||
|
|
||||||
const app = express();
|
const app = express();
|
||||||
const port = process.env.PORT || 3000;
|
const port = process.env.PORT || 3000;
|
||||||
@@ -37,36 +39,6 @@ async function checkRequestExists(requestId: string): Promise<boolean> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SSE endpoint with request validation
|
|
||||||
app.get('/api/v1/stream/:requestId', (async (req: Request, res: StreamResponse) => {
|
|
||||||
const requestId = req.params.requestId;
|
|
||||||
|
|
||||||
// Check if request exists before setting up SSE
|
|
||||||
const exists = await checkRequestExists(requestId);
|
|
||||||
if (!exists) {
|
|
||||||
res.status(404).json({ error: 'Request ID not found' });
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
res.setHeader('Content-Type', 'text/event-stream');
|
|
||||||
res.setHeader('Cache-Control', 'no-cache');
|
|
||||||
res.setHeader('Connection', 'keep-alive');
|
|
||||||
|
|
||||||
const listener = (data: StreamMessage) => {
|
|
||||||
res.write(`data: ${JSON.stringify(data)}\n\n`);
|
|
||||||
};
|
|
||||||
|
|
||||||
eventEmitter.on(`progress-${requestId}`, listener);
|
|
||||||
|
|
||||||
// Handle client disconnection
|
|
||||||
req.on('close', () => {
|
|
||||||
eventEmitter.removeListener(`progress-${requestId}`, listener);
|
|
||||||
});
|
|
||||||
|
|
||||||
// Send initial connection confirmation
|
|
||||||
res.write(`data: ${JSON.stringify({ type: 'connected', requestId })}\n\n`);
|
|
||||||
}) as RequestHandler);
|
|
||||||
|
|
||||||
function createProgressEmitter(requestId: string, budget: number | undefined, context: TrackerContext) {
|
function createProgressEmitter(requestId: string, budget: number | undefined, context: TrackerContext) {
|
||||||
return () => {
|
return () => {
|
||||||
const state = context.actionTracker.getState();
|
const state = context.actionTracker.getState();
|
||||||
@@ -78,41 +50,131 @@ function createProgressEmitter(requestId: string, budget: number | undefined, co
|
|||||||
|
|
||||||
eventEmitter.emit(`progress-${requestId}`, {
|
eventEmitter.emit(`progress-${requestId}`, {
|
||||||
type: 'progress',
|
type: 'progress',
|
||||||
data: { ...state.thisStep, totalStep: state.totalStep },
|
data: {...state.thisStep, totalStep: state.totalStep},
|
||||||
step: state.totalStep,
|
step: state.totalStep,
|
||||||
budget: budgetInfo
|
budget: budgetInfo,
|
||||||
|
trackers: {
|
||||||
|
tokenUsage: context.tokenTracker.getTotalUsage(),
|
||||||
|
actionState: context.actionTracker.getState()
|
||||||
|
}
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function cleanup(requestId: string) {
|
||||||
|
const context = trackers.get(requestId);
|
||||||
|
if (context) {
|
||||||
|
context.actionTracker.removeAllListeners();
|
||||||
|
context.tokenTracker.removeAllListeners();
|
||||||
|
trackers.delete(requestId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function emitTrackerUpdate(requestId: string, context: TrackerContext) {
|
||||||
|
const trackerData = {
|
||||||
|
tokenUsage: context.tokenTracker.getTotalUsage(),
|
||||||
|
tokenBreakdown: context.tokenTracker.getUsageBreakdown(),
|
||||||
|
actionState: context.actionTracker.getState().thisStep,
|
||||||
|
step: context.actionTracker.getState().totalStep,
|
||||||
|
badAttempts: context.actionTracker.getState().badAttempts,
|
||||||
|
gaps: context.actionTracker.getState().gaps
|
||||||
|
};
|
||||||
|
|
||||||
|
eventEmitter.emit(`progress-${requestId}`, {
|
||||||
|
type: 'progress',
|
||||||
|
trackers: trackerData
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store the trackers for each request
|
||||||
|
const trackers = new Map<string, TrackerContext>();
|
||||||
|
|
||||||
app.post('/api/v1/query', (async (req: QueryRequest, res: Response) => {
|
app.post('/api/v1/query', (async (req: QueryRequest, res: Response) => {
|
||||||
const { q, budget, maxBadAttempt } = req.body;
|
const {q, budget, maxBadAttempt} = req.body;
|
||||||
if (!q) {
|
if (!q) {
|
||||||
return res.status(400).json({ error: 'Query (q) is required' });
|
return res.status(400).json({error: 'Query (q) is required'});
|
||||||
}
|
}
|
||||||
|
|
||||||
const requestId = Date.now().toString();
|
const requestId = Date.now().toString();
|
||||||
res.json({ requestId });
|
|
||||||
|
// Create new trackers for this request
|
||||||
|
const context: TrackerContext = {
|
||||||
|
tokenTracker: new TokenTracker(),
|
||||||
|
actionTracker: new ActionTracker()
|
||||||
|
};
|
||||||
|
trackers.set(requestId, context);
|
||||||
|
|
||||||
|
// Set up listeners immediately for both trackers
|
||||||
|
context.actionTracker.on('action', () => emitTrackerUpdate(requestId, context));
|
||||||
|
// context.tokenTracker.on('usage', () => emitTrackerUpdate(requestId, context));
|
||||||
|
|
||||||
|
res.json({requestId});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const { result, context } = await getResponse(q, budget, maxBadAttempt);
|
const {result} = await getResponse(q, budget, maxBadAttempt, context);
|
||||||
const emitProgress = createProgressEmitter(requestId, budget, context);
|
const emitProgress = createProgressEmitter(requestId, budget, context);
|
||||||
context.actionTracker.on('action', emitProgress);
|
context.actionTracker.on('action', emitProgress);
|
||||||
await storeTaskResult(requestId, result);
|
await storeTaskResult(requestId, result);
|
||||||
eventEmitter.emit(`progress-${requestId}`, { type: 'answer', data: result });
|
eventEmitter.emit(`progress-${requestId}`, {
|
||||||
|
type: 'answer',
|
||||||
|
data: result,
|
||||||
|
trackers: {
|
||||||
|
tokenUsage: context.tokenTracker.getTotalUsage(),
|
||||||
|
actionState: context.actionTracker.getState()
|
||||||
|
}
|
||||||
|
});
|
||||||
|
cleanup(requestId);
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
eventEmitter.emit(`progress-${requestId}`, {
|
eventEmitter.emit(`progress-${requestId}`, {
|
||||||
type: 'error',
|
type: 'error',
|
||||||
data: error?.message || 'Unknown error',
|
data: error?.message || 'Unknown error',
|
||||||
status: 500
|
status: 500,
|
||||||
|
trackers: {
|
||||||
|
tokenUsage: context.tokenTracker.getTotalUsage(),
|
||||||
|
actionState: context.actionTracker.getState()
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
cleanup(requestId);
|
||||||
}
|
}
|
||||||
}) as RequestHandler);
|
}) as RequestHandler);
|
||||||
|
|
||||||
|
app.get('/api/v1/stream/:requestId', (async (req: Request, res: StreamResponse) => {
|
||||||
|
const requestId = req.params.requestId;
|
||||||
|
const context = trackers.get(requestId);
|
||||||
|
|
||||||
|
res.setHeader('Content-Type', 'text/event-stream');
|
||||||
|
res.setHeader('Cache-Control', 'no-cache');
|
||||||
|
res.setHeader('Connection', 'keep-alive');
|
||||||
|
|
||||||
|
const listener = (data: StreamMessage) => {
|
||||||
|
// The trackers are now included in all event types
|
||||||
|
// We don't need to add them here as they're already part of the data
|
||||||
|
res.write(`data: ${JSON.stringify(data)}\n\n`);
|
||||||
|
};
|
||||||
|
|
||||||
|
eventEmitter.on(`progress-${requestId}`, listener);
|
||||||
|
|
||||||
|
// Handle client disconnection
|
||||||
|
req.on('close', () => {
|
||||||
|
eventEmitter.removeListener(`progress-${requestId}`, listener);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Send initial connection confirmation with tracker state
|
||||||
|
const initialData = {
|
||||||
|
type: 'connected',
|
||||||
|
requestId,
|
||||||
|
trackers: context ? {
|
||||||
|
tokenUsage: context.tokenTracker.getTotalUsage(),
|
||||||
|
actionState: context.actionTracker.getState()
|
||||||
|
} : null
|
||||||
|
};
|
||||||
|
res.write(`data: ${JSON.stringify(initialData)}\n\n`);
|
||||||
|
}) as RequestHandler);
|
||||||
|
|
||||||
async function storeTaskResult(requestId: string, result: StepAction) {
|
async function storeTaskResult(requestId: string, result: StepAction) {
|
||||||
try {
|
try {
|
||||||
const taskDir = path.join(process.cwd(), 'tasks');
|
const taskDir = path.join(process.cwd(), 'tasks');
|
||||||
await fs.mkdir(taskDir, { recursive: true });
|
await fs.mkdir(taskDir, {recursive: true});
|
||||||
await fs.writeFile(
|
await fs.writeFile(
|
||||||
path.join(taskDir, `${requestId}.json`),
|
path.join(taskDir, `${requestId}.json`),
|
||||||
JSON.stringify(result, null, 2)
|
JSON.stringify(result, null, 2)
|
||||||
@@ -130,7 +192,7 @@ app.get('/api/v1/task/:requestId', (async (req: Request, res: Response) => {
|
|||||||
const taskData = await fs.readFile(taskPath, 'utf-8');
|
const taskData = await fs.readFile(taskPath, 'utf-8');
|
||||||
res.json(JSON.parse(taskData));
|
res.json(JSON.parse(taskData));
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
res.status(404).json({ error: 'Task not found' });
|
res.status(404).json({error: 'Task not found'});
|
||||||
}
|
}
|
||||||
}) as RequestHandler);
|
}) as RequestHandler);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user