feat: add agentic team mapreduce pattern

This commit is contained in:
Han Xiao 2025-06-11 13:18:00 -07:00
parent 147aa4befd
commit 3d10f25028
8 changed files with 156 additions and 124 deletions

View File

@ -39,9 +39,6 @@
"searchGrounding": {
"temperature": 0
},
"dedup": {
"temperature": 0.1
},
"evaluator": {
"temperature": 0.6,
"maxTokens": 200
@ -50,6 +47,7 @@
"queryRewriter": {
"temperature": 0.1
},
"researchPlanner": {},
"agent": {
"temperature": 0.7
},
@ -78,9 +76,7 @@
"searchGrounding": {
"temperature": 0
},
"dedup": {
"temperature": 0.1
},
"researchPlanner": {},
"evaluator": {},
"errorAnalyzer": {},
"queryRewriter": {

View File

@ -44,7 +44,6 @@
"model": "gemini-2.0-flash-lite"
},
"searchGrounding": {},
"dedup": {},
"evaluator": {
"maxTokens": 2000
},
@ -76,9 +75,6 @@
"searchGrounding": {
"temperature": 0
},
"dedup": {
"temperature": 0.1
},
"evaluator": {},
"errorAnalyzer": {},
"queryRewriter": {

View File

@ -45,6 +45,7 @@ import { formatDateBasedOnType, formatDateRange } from "./utils/date-tools";
import { reviseAnswer } from "./tools/md-fixer";
import { buildImageReferences, buildReferences } from "./tools/build-ref";
import { logInfo, logError, logDebug, logWarning } from './logging';
import { researchPlan } from './tools/research-planner';
async function wait(seconds: number) {
logDebug(`Waiting ${seconds}s...`);
@ -397,7 +398,8 @@ export async function getResponse(question?: string,
languageCode: string | undefined = undefined,
searchLanguageCode?: string,
searchProvider?: string,
with_images: boolean = false
withImages: boolean = false,
teamSize: number = 2
): Promise<{ result: StepAction; context: TrackerContext; visitedURLs: string[], readURLs: string[], allURLs: string[], allImages?: string[], relatedImages?: string[] }> {
let step = 0;
@ -458,6 +460,7 @@ export async function getResponse(question?: string,
const visitedURLs: string[] = [];
const badURLs: string[] = [];
const imageObjects: ImageObject[] = [];
let imageReferences: ImageReference[] = [];
const evaluationMetrics: Record<string, RepeatEvaluationType[]> = {};
// reserve the 10% final budget for the beast mode
const regularBudget = tokenBudget * 0.85;
@ -479,14 +482,12 @@ export async function getResponse(question?: string,
});
})
while (context.tokenTracker.getTotalUsage().totalTokens < regularBudget) {
// add 1s delay to avoid rate limiting
step++;
totalStep++;
const budgetPercentage = (context.tokenTracker.getTotalUsage().totalTokens / tokenBudget * 100).toFixed(2);
logDebug(`Step ${totalStep} / Budget used ${budgetPercentage}%`);
logDebug('Gaps:', { gaps });
logDebug(`Step ${totalStep} / Budget used ${budgetPercentage}%`, { gaps });
allowReflect = allowReflect && (gaps.length <= MAX_REFLECT_PER_STEP);
// rotating question from gaps
const currentQuestion: string = gaps[totalStep % gaps.length];
@ -787,6 +788,37 @@ But then you realized you have asked them before. You decided to to think out of
const soundBites = newKnowledge.map(k => k.answer).join(' ');
if (teamSize > 1) {
const subproblems = await researchPlan(question, teamSize, soundBites, context, SchemaGen);
// parallel call getResponse for each subproblem with exact same parameters from the current step, but their teamSize is 1
const subproblemResponses = await Promise.all(subproblems.map(subproblem => getResponse(subproblem,
tokenBudget,
maxBadAttempts,
context,
messages,
numReturnedURLs,
noDirectAnswer,
boostHostnames,
badHostnames,
onlyHostnames,
maxRef,
minRelScore, languageCode, searchLanguageCode, searchProvider, withImages, 1)));
// convert current step to AnswerAction
thisStep = {
action: 'answer',
think: thisStep.think,
answer: subproblemResponses.map(r => (r.result as AnswerAction).answer).join('\n\n'),
mdAnswer: subproblemResponses.map(r => (r.result as AnswerAction).mdAnswer).join('\n\n'),
references: subproblemResponses.map(r => (r.result as AnswerAction).references).flat(),
isFinal: true,
isAggregated: true
} as AnswerAction;
// break the loop, move to final boxing
break;
}
// rewrite queries with initial soundbites
let keywordsQueries = await rewriteQuery(thisStep, soundBites, context, SchemaGen);
const qOnly = keywordsQueries.filter(q => q.q).map(q => q.q)
@ -872,7 +904,7 @@ You decided to think out of the box or cut from a completely different angle.
SchemaGen,
currentQuestion,
allWebContents,
with_images
withImages
);
diaryContext.push(success
@ -998,8 +1030,9 @@ But unfortunately, you failed to solve the issue. You need to think out of the b
const answerStep = thisStep as AnswerAction;
if (!trivialQuestion) {
if (trivialQuestion) {
answerStep.mdAnswer = buildMdFromAnswer(answerStep);
} else if (!answerStep.isAggregated) {
answerStep.answer = repairMarkdownFinal(
convertHtmlTablesToMd(
fixBadURLMdLinks(
@ -1030,20 +1063,15 @@ But unfortunately, you failed to solve the issue. You need to think out of the b
answerStep.references = references;
await updateReferences(answerStep, allURLs)
answerStep.mdAnswer = repairMarkdownFootnotesOuter(buildMdFromAnswer(answerStep));
} else {
answerStep.mdAnswer = buildMdFromAnswer(answerStep);
}
let imageReferences: ImageReference[] = [];
if (imageObjects.length && with_images) {
try {
imageReferences = await buildImageReferences(answerStep.answer, imageObjects, context, SchemaGen);
logDebug('Image references built:', { count: imageReferences.length });
} catch (error) {
logError('Error building image references:', {
error: error instanceof Error ? error.message : String(error)
});
imageReferences = [];
if (imageObjects.length && withImages) {
try {
imageReferences = await buildImageReferences(answerStep.answer, imageObjects, context, SchemaGen);
logDebug('Image references built:', { imageReferences });
} catch (error) {
logError('Error building image references:', { error });
imageReferences = [];
}
}
}
@ -1055,8 +1083,8 @@ But unfortunately, you failed to solve the issue. You need to think out of the b
visitedURLs: returnedURLs,
readURLs: visitedURLs.filter(url => !badURLs.includes(url)),
allURLs: weightedURLs.map(r => r.url),
allImages: with_images ? imageObjects.map(i => i.url) : undefined,
relatedImages: with_images ? imageReferences.map(i => i.url) : undefined,
allImages: withImages ? imageObjects.map(i => i.url) : undefined,
relatedImages: withImages ? imageReferences.map(i => i.url) : undefined,
};
}

View File

@ -587,7 +587,8 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
body.language_code,
body.search_language_code,
body.search_provider,
body.with_images
body.with_images,
body.team_size
)
let finalAnswer = (finalStep as AnswerAction).mdAnswer;

View File

@ -1,90 +0,0 @@
import { z } from 'zod';
import { TokenTracker } from "../utils/token-tracker";
import { ObjectGeneratorSafe } from "../utils/safe-generator";
import { logInfo, logError, logDebug, logWarning } from '../logging';
const responseSchema = z.object({
think: z.string().describe('Strategic reasoning about the overall deduplication approach').max(500),
unique_queries: z.array(z.string().describe('Unique query that passed the deduplication process, must be less than 30 characters'))
.describe('Array of semantically unique queries').max(3)
});
function getPrompt(newQueries: string[], existingQueries: string[]): string {
return `You are an expert in semantic similarity analysis. Given a set of queries (setA) and a set of queries (setB)
<rules>
Function FilterSetA(setA, setB, threshold):
filteredA = empty set
for each candidateQuery in setA:
isValid = true
// Check similarity with already accepted queries in filteredA
for each acceptedQuery in filteredA:
similarity = calculateSimilarity(candidateQuery, acceptedQuery)
if similarity >= threshold:
isValid = false
break
// If passed first check, compare with set B
if isValid:
for each queryB in setB:
similarity = calculateSimilarity(candidateQuery, queryB)
if similarity >= threshold:
isValid = false
break
// If passed all checks, add to filtered set
if isValid:
add candidateQuery to filteredA
return filteredA
</rules>
<similarity-definition>
1. Consider semantic meaning and query intent, not just lexical similarity
2. Account for different phrasings of the same information need
3. Queries with same base keywords but different operators are NOT duplicates
4. Different aspects or perspectives of the same topic are not duplicates
5. Consider query specificity - a more specific query is not a duplicate of a general one
6. Search operators that make queries behave differently:
- Different site: filters (e.g., site:youtube.com vs site:github.com)
- Different file types (e.g., filetype:pdf vs filetype:doc)
- Different language/location filters (e.g., lang:en vs lang:es)
- Different exact match phrases (e.g., "exact phrase" vs no quotes)
- Different inclusion/exclusion (+/- operators)
- Different title/body filters (intitle: vs inbody:)
</similarity-definition>
Now with threshold set to 0.2; run FilterSetA on the following:
SetA: ${JSON.stringify(newQueries)}
SetB: ${JSON.stringify(existingQueries)}`;
}
const TOOL_NAME = 'dedup';
export async function dedupQueries(
newQueries: string[],
existingQueries: string[],
tracker?: TokenTracker
): Promise<{ unique_queries: string[] }> {
try {
const generator = new ObjectGeneratorSafe(tracker);
const prompt = getPrompt(newQueries, existingQueries);
const result = await generator.generateObject({
model: TOOL_NAME,
schema: responseSchema,
prompt,
});
logInfo(TOOL_NAME, { unique_queries: result.object.unique_queries });
return { unique_queries: result.object.unique_queries };
} catch (error) {
logError(`Error in ${TOOL_NAME}`, { error });
throw error;
}
}

View File

@ -0,0 +1,83 @@
import { PromptPair, TrackerContext } from '../types';
import { ObjectGeneratorSafe } from "../utils/safe-generator";
import { Schemas } from "../utils/schemas";
import { logInfo, logError } from '../logging';
function getPrompt(question: string, teamSize: number = 3, soundBites: string): PromptPair {
const currentTime = new Date();
const currentYear = currentTime.getFullYear();
const currentMonth = currentTime.getMonth() + 1;
return {
system: `
You are a Principal Research Lead managing a team of ${teamSize} junior researchers. Your role is to break down a complex research topic into focused, manageable subproblems and strategically assign them to your team members.
User give you a research topic and some soundbites about the topic, and you follow this systematic approach:
<approach>
First, analyze the main research topic and identify:
- Core research questions that need to be answered
- Key domains/disciplines involved
- Critical dependencies between different aspects
- Potential knowledge gaps or challenges
Then decompose the topic into ${teamSize} distinct, focused subproblems using these ORTHOGONALITY & DEPTH PRINCIPLES:
</approach>
<requirements>
Orthogonality Requirements:
Each subproblem must address a fundamentally different aspect/dimension of the main topic
Use different decomposition axes (e.g., temporal, methodological, stakeholder-based, technical layers)
Minimize content overlap - if two subproblems share >20% of their scope, redesign them
Apply the "substitution test": removing any single subproblem should create a significant gap in understanding
Depth Requirements:
Each subproblem should require 15-25 hours of focused research to properly address
Must go beyond surface-level information to explore underlying mechanisms, theories, or implications
Should generate insights that require synthesis of multiple sources and original analysis
Include both "what" and "why/how" questions to ensure analytical depth
Validation Checks: Before finalizing assignments, verify:
Orthogonality Matrix: Create a 2D matrix showing overlap between each pair of subproblems - aim for <20% overlap
Depth Assessment: Each subproblem should have 4-6 layers of inquiry (surface mechanisms implications future directions)
Coverage Completeness: The union of all subproblems should address 90%+ of the main topic's scope
</requirements>
The current time is ${currentTime.toISOString()}. Current year: ${currentYear}, current month: ${currentMonth}.
Structure your response as valid JSON matching this exact schema. Do not include any text like (this subproblem is about ...) in the subproblems, use second person to describe the subproblems.
Now proceed with decomposing and assigning the research topic.
`,
user:
`
${question}
<soundbite
${soundBites}
</soundbites>
<think>`
};
}
const TOOL_NAME = 'researchPlanner';
export async function researchPlan(question: string, teamSize: number, soundBites: string, trackers: TrackerContext, schemaGen: Schemas): Promise<string[]> {
try {
const generator = new ObjectGeneratorSafe(trackers.tokenTracker);
const prompt = getPrompt(question, teamSize, soundBites);
const result = await generator.generateObject({
model: TOOL_NAME,
schema: schemaGen.getResearchPlanSchema(),
system: prompt.system,
prompt: prompt.user,
});
trackers?.actionTracker.trackThink(result.object.think);
const subproblems = result.object.subproblems;
logInfo(TOOL_NAME, { subproblems });
return subproblems;
} catch (error) {
logError(TOOL_NAME, { error });
throw error;
}
}

View File

@ -41,6 +41,7 @@ export type AnswerAction = BaseAction & {
references: Array<Reference>;
isFinal?: boolean;
mdAnswer?: string;
isAggregated?: boolean;
};
@ -274,6 +275,7 @@ export interface ChatCompletionRequest {
language_code?: string;
search_language_code?: string;
search_provider?: string;
team_size?: number;
}
export interface URLAnnotation {

View File

@ -1,7 +1,7 @@
import { z } from "zod";
import { ObjectGeneratorSafe } from "./safe-generator";
import { EvaluationType, PromptPair } from "../types";
import { logInfo, logError, logDebug, logWarning } from '../logging';
import { logDebug } from '../logging';
export const MAX_URLS_PER_STEP = 5
export const MAX_QUERIES_PER_STEP = 5
@ -157,6 +157,22 @@ export class Schemas {
});
}
getResearchPlanSchema(teamSize: number = 3): z.ZodObject<any> {
return z.object({
think: z.string()
.describe('Explain your decomposition strategy and how you ensured orthogonality between subproblems')
.max(300),
subproblems: z.array(
z.string()
.describe('Complete research plan containing: title, scope, key questions, methodology')
.max(500)
)
.length(teamSize)
.describe(`Array of exactly ${teamSize} orthogonal research plans, each focusing on a different fundamental dimension of the main topic`)
});
}
getQueryRewriterSchema(): z.ZodObject<any> {
return z.object({
think: z.string().describe(`Explain why you choose those search queries. ${this.getLanguagePrompt()}`).max(500),