perf: async parallel

Han Xiao 2025-04-14 14:16:39 +08:00
parent 098564eb81
commit 30e5558bb8
5 changed files with 125 additions and 128 deletions
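All five files apply the same refactor: sequential await-in-a-loop batch processing becomes a Promise.all fan-out, so batches run concurrently instead of one after another. A minimal sketch of the before/after pattern (generic names, not the repo's actual signatures):

// Before: each batch waits for the previous one to finish.
async function processSequentially<T, R>(batches: T[], work: (batch: T) => Promise<R>): Promise<R[]> {
  const results: R[] = [];
  for (const batch of batches) {
    results.push(await work(batch)); // serial: total time is the sum of all batch times
  }
  return results;
}

// After: all batches start immediately and are awaited together.
function processInParallel<T, R>(batches: T[], work: (batch: T) => Promise<R>): Promise<R[]> {
  return Promise.all(batches.map(batch => work(batch))); // total time is roughly the slowest batch
}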

View File

@@ -3,7 +3,7 @@ import {Reference, TrackerContext, WebContent} from "../types";
import {rerankDocuments} from "./jina-rerank";
import {Schemas} from "../utils/schemas";
-// New function to calculate Jaccard similarity as fallback
+// Jaccard similarity function for fallback
function calculateJaccardSimilarity(text1: string, text2: string): number {
// Convert texts to lowercase and tokenize by splitting on non-alphanumeric characters
const tokens1 = new Set(text1.toLowerCase().split(/\W+/).filter(t => t.length > 0));
@@ -19,7 +19,7 @@ function calculateJaccardSimilarity(text1: string, text2: string): number {
return union.size === 0 ? 0 : intersection.size / union.size;
}
-// New function to perform fallback similarity ranking
+// Fallback similarity ranking
async function fallbackRerankWithJaccard(query: string, documents: string[]): Promise<{ results: { index: number, relevance_score: number }[] }> {
const results = documents.map((doc, index) => {
const score = calculateJaccardSimilarity(query, doc);
@@ -89,7 +89,7 @@ export async function buildReferences(
validAnswerChunks.push(i);
-// Create a reranking task (handling batch size constraint later)
+// Create a reranking task
rerankTasks.push({
index: i,
chunk: answerChunk,
@@ -97,42 +97,17 @@ export async function buildReferences(
});
}
-// Fixed batch size of 512 as suggested
-const BATCH_SIZE = 512;
-// Process all reranking tasks in parallel with fixed batch size
-const processTaskWithBatches = async (task: any) => {
+// Process all reranking tasks in parallel using the updated rerankDocuments function
+const processTask = async (task: any) => {
try {
-// Create batches of web content chunks
-const batches = [];
-for (let i = 0; i < allWebContentChunks.length; i += BATCH_SIZE) {
-batches.push(allWebContentChunks.slice(i, i + BATCH_SIZE));
-}
-// Process all batches in parallel
-const batchPromises = batches.map(async (batch, batchIndex) => {
-const batchOffset = batchIndex * BATCH_SIZE;
-const result = await rerankDocuments(task.chunk, batch, context.tokenTracker);
-// Adjust indices to account for batching
-return result.results.map(item => ({
-index: item.index + batchOffset,
-relevance_score: item.relevance_score
-}));
-});
-// Wait for all batch processing to complete
-const batchResults = await Promise.all(batchPromises);
-// Combine and sort all results
-const combinedResults = batchResults.flat();
-combinedResults.sort((a, b) => b.relevance_score - a.relevance_score);
+// Use rerankDocuments directly - it now handles batching internally
+const result = await rerankDocuments(task.chunk, allWebContentChunks, context.tokenTracker);
return {
answerChunkIndex: task.index,
answerChunk: task.chunk,
answerChunkPosition: task.position,
-results: combinedResults
+results: result.results
};
} catch (error) {
console.error('Reranking failed, falling back to Jaccard similarity', error);
@@ -148,7 +123,7 @@ export async function buildReferences(
};
// Process all tasks in parallel
-const taskResults = await Promise.all(rerankTasks.map(processTaskWithBatches));
+const taskResults = await Promise.all(rerankTasks.map(processTask));
// Collect and flatten all matches
const allMatches = [];
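The Jaccard fallback keeps reference building alive when the rerank API fails, and because the try/catch lives inside each task, a single failed rerank degrades to Jaccard scoring instead of rejecting the whole Promise.all, which is fail-fast. A quick worked example of the similarity function as defined above:

// 'the quick fox' tokenizes to {the, quick, fox}; 'the lazy fox' to {the, lazy, fox}
// intersection = {the, fox} -> 2; union = {the, quick, fox, lazy} -> 4
calculateJaccardSimilarity('the quick fox', 'the lazy fox'); // 2 / 4 = 0.5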

View File

@@ -51,12 +51,8 @@ export async function cherryPick(question: string, longContext: string, options:
console.log(`Total length ${longContext.length} split ${chunks.length} chunks into ${chunkBatches.length} batches of ~${chunksPerBatch} chunks each`);
-// Process each batch and collect the embeddings
-const allChunkEmbeddings: number[][] = [];
-let totalTokensUsed = 0;
-for (let batchIndex = 0; batchIndex < chunkBatches.length; batchIndex++) {
-const batch = chunkBatches[batchIndex];
+// Process all batches in parallel
+const batchPromises = chunkBatches.map(async (batch, batchIndex) => {
console.log(`Processing batch ${batchIndex + 1}/${chunkBatches.length} with ${batch.length} chunks`);
// Get embeddings for the current batch
@@ -90,12 +86,25 @@ export async function cherryPick(question: string, longContext: string, options:
// Extract embeddings from this batch
const batchEmbeddings = batchEmbeddingResponse.data.data.map((item: any) => item.embedding);
-allChunkEmbeddings.push(...batchEmbeddings);
-// Track token usage
-const batchTokens = batchEmbeddingResponse.data.usage?.total_tokens || 0;
-totalTokensUsed += batchTokens;
-}
+// Return both embeddings and token usage
+return {
+embeddings: batchEmbeddings,
+tokens: batchEmbeddingResponse.data.usage?.total_tokens || 0
+};
+});
+// Wait for all batch processing to complete
+const batchResults = await Promise.all(batchPromises);
+// Collect all embeddings and total token usage
+const allChunkEmbeddings: number[][] = [];
+let totalTokensUsed = 0;
+batchResults.forEach(result => {
+allChunkEmbeddings.push(...result.embeddings);
+totalTokensUsed += result.tokens;
+});
// Get embedding for the question
const questionEmbeddingResponse = await axios.post(
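A property this rewrite leans on: Promise.all resolves to results in the order of its input array, not in completion order, so concatenating batchResults keeps every embedding aligned with its original chunk index. A minimal sketch with hypothetical delays (runs inside any async function):

const delays = [30, 10, 20];
const order = await Promise.all(
  delays.map(async (ms, i) => {
    await new Promise(resolve => setTimeout(resolve, ms));
    return i; // tasks finish out of order...
  })
);
console.log(order); // [0, 1, 2] -- ...but results arrive in input order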

View File

@@ -26,57 +26,74 @@ interface JinaRerankResponse {
};
}
/**
* Reranks a list of documents based on relevance to a query
* @param query The query to rank documents against
* @param documents Array of documents to be ranked
* @param topN Number of top results to return
* @param tracker Optional token tracker for usage monitoring
* @returns Array of reranked documents with their scores
*/
export async function rerankDocuments(
query: string,
documents: string[],
-tracker?: TokenTracker
-): Promise<{ results: Array<{index: number, relevance_score: number, document: {text: string}}> }> {
+tracker?: TokenTracker,
+batchSize = 2000
+): Promise<{ results: Array<{ index: number, relevance_score: number, document: { text: string } }> }> {
try {
if (!JINA_API_KEY) {
throw new Error('JINA_API_KEY is not set');
}
-if (documents.length > 2000) {
-console.error(`Reranking ${documents.length} documents, which exceeds the recommended limit of 2000. This may lead to performance issues.`);
-documents = documents.slice(0, 2000);
+// No need to slice - we'll process all documents in batches
+const batches: string[][] = [];
+for (let i = 0; i < documents.length; i += batchSize) {
+batches.push(documents.slice(i, i + batchSize));
}
-const request: JinaRerankRequest = {
-model: 'jina-reranker-v2-base-multilingual',
-query,
-top_n: documents.length,
-documents
-};
+console.log(`Processing ${documents.length} documents in ${batches.length} batches of up to ${batchSize} each`);
-const response = await axios.post<JinaRerankResponse>(
-JINA_API_URL,
-request,
-{
-headers: {
-'Content-Type': 'application/json',
-'Authorization': `Bearer ${JINA_API_KEY}`
-}
-}
+// Process all batches in parallel
+const batchResults = await Promise.all(
+batches.map(async (batchDocuments, batchIndex) => {
+const startIdx = batchIndex * batchSize;
+const request: JinaRerankRequest = {
+model: 'jina-reranker-v2-base-multilingual',
+query,
+top_n: batchDocuments.length,
+documents: batchDocuments
+};
+const response = await axios.post<JinaRerankResponse>(
+JINA_API_URL,
+request,
+{
+headers: {
+'Content-Type': 'application/json',
+'Authorization': `Bearer ${JINA_API_KEY}`
+}
+}
+);
+// Track token usage from this batch
+(tracker || new TokenTracker()).trackUsage('rerank', {
+promptTokens: response.data.usage.total_tokens,
+completionTokens: 0,
+totalTokens: response.data.usage.total_tokens
+});
+// Add the original document index to each result
+return response.data.results.map(result => ({
+...result,
+originalIndex: startIdx + result.index // Map back to the original index
+}));
+})
+);
-// Track token usage from the API
-(tracker || new TokenTracker()).trackUsage('rerank', {
-promptTokens: response.data.usage.total_tokens,
-completionTokens: 0,
-totalTokens: response.data.usage.total_tokens
-});
+// Flatten and sort all results by relevance score
+const allResults = batchResults.flat().sort((a, b) => b.relevance_score - a.relevance_score);
-return {
-results: response.data.results
-};
+// Keep the original document indices in the results
+const finalResults = allResults.map(result => ({
+index: result.originalIndex, // Original document index
+relevance_score: result.relevance_score,
+document: result.document
+}));
+return {results: finalResults};
} catch (error) {
console.error('Error in reranking documents:', error);
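Since each batch is reranked independently, the API returns batch-local indices; startIdx + result.index maps each hit back to its position in the full documents array. A small illustration, assuming batchSize = 3 and five documents:

// documents = [d0, d1, d2, d3, d4], batchSize = 3
// batch 0 = [d0, d1, d2] (startIdx 0); batch 1 = [d3, d4] (startIdx 3)
const batchSize = 3;
const batchIndex = 1;  // second batch
const localIndex = 1;  // top hit within that batch
const originalIndex = batchIndex * batchSize + localIndex; // 4 -> d4

Note that the final global sort assumes relevance scores from separate batches are on a comparable scale, a reasonable assumption for a pointwise reranker scoring every document against the same query.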

View File

@@ -3,22 +3,6 @@ import {TokenTracker} from "../utils/token-tracker";
import {JINA_API_KEY} from "../config";
import {TrackerContext} from "../types";
-/**
-* Segments text into chunks, handling text of arbitrary length by batching
-* @param content Text to segment
-* @param tracker Context for tracking token usage
-* @param maxChunkLength Maximum length of each chunk (passed to Jina API)
-* @param returnChunks Whether to return chunks in the API response
-* @returns Object containing chunks and their positions
-*/
-/**
-* Segments text into chunks, handling text of arbitrary length by batching
-* @param content Text to segment
-* @param tracker Context for tracking token usage
-* @param maxChunkLength Maximum length of each chunk (passed to Jina API)
-* @param returnChunks Whether to return chunks in the API response
-* @returns Object containing chunks and chunk_positions matching Jina API format
-*/
export async function segmentText(
content: string,
tracker: TrackerContext,
@@ -38,19 +22,20 @@ export async function segmentText(
// Maximum size to send in a single API request (slightly under 64K to be safe)
const MAX_BATCH_SIZE = 60000;
-// Final results
-const allChunks = [];
-const allChunkPositions = [];
-let totalTokens = 0;
// Split content into batches
const batches = splitTextIntoBatches(content, MAX_BATCH_SIZE);
console.log(`Split content into ${batches.length} batches`);
-// Process each batch sequentially
+// Calculate offsets for each batch upfront
+const batchOffsets: number[] = [];
let currentOffset = 0;
-for (let i = 0; i < batches.length; i++) {
-const batch = batches[i];
+for (const batch of batches) {
+batchOffsets.push(currentOffset);
+currentOffset += batch.length;
+}
+// Process all batches in parallel
+const batchPromises = batches.map(async (batch, i) => {
console.log(`Processing batch ${i + 1}/${batches.length} (size: ${batch.length})`);
try {
@@ -81,33 +66,43 @@ export async function segmentText(
tokenizer: data.tokenizer
});
-// Add chunks from this batch to the results
-if (data.chunks && returnChunks) {
-allChunks.push(...data.chunks);
-}
+// Get the batch offset
+const offset = batchOffsets[i];
// Adjust chunk positions to account for the offset of this batch
-if (data.chunk_positions) {
-const adjustedPositions = data.chunk_positions.map((position: [number, number]) => {
-// The API returns chunk_positions as arrays of [start, end]
-return [
-position[0] + currentOffset,
-position[1] + currentOffset
-] as [number, number];
-});
-allChunkPositions.push(...adjustedPositions);
-}
-// Track token usage
-const batchTokens = data.usage?.tokens || 0;
-totalTokens += batchTokens;
-// Update the current offset for the next batch
-currentOffset += batch.length;
+const adjustedPositions = data.chunk_positions
+? data.chunk_positions.map((position: [number, number]) => {
+return [
+position[0] + offset,
+position[1] + offset
+] as [number, number];
+})
+: [];
+return {
+chunks: data.chunks || [],
+positions: adjustedPositions,
+tokens: data.usage?.tokens || 0
+};
} catch (error) {
handleSegmentationError(error);
}
+});
+// Wait for all batches to complete
+const batchResults = await Promise.all(batchPromises);
+// Aggregate results
+const allChunks = [];
+const allChunkPositions = [];
+let totalTokens = 0;
+for (const result of batchResults) {
+if (returnChunks) {
+allChunks.push(...result.chunks);
+}
+allChunkPositions.push(...result.positions);
+totalTokens += result.tokens;
+}
// Track total token usage for all batches
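The enabling trick in this file is removing the loop-carried dependency: currentOffset used to be accumulated inside the sequential loop, so no batch could start before the previous one finished. Precomputing every batch's offset up front (offsets depend only on batch lengths, never on API responses) lets all segmentation requests run concurrently. A self-contained sketch of the precomputation, assuming batches are split by character length as above (illustrative helper, not part of the commit):

function computeBatchOffsets(batches: string[]): number[] {
  const offsets: number[] = [];
  let offset = 0;
  for (const batch of batches) {
    offsets.push(offset);   // character position where this batch starts
    offset += batch.length; // next batch starts right after this one
  }
  return offsets;
}

// computeBatchOffsets(['abcde', 'fgh', 'ij']) -> [0, 5, 8]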

View File

@@ -721,7 +721,8 @@ export function repairMarkdownFinal(markdown: string): string {
// remove any '<27>'
repairedMarkdown = repairedMarkdown.replace(/<2F>/g, '');
+// remove any <center> tags
+repairedMarkdown = repairedMarkdown.replace(/<\/?center>/g, '');
// Step 1: Handle <hr> and <br> tags outside tables
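For reference, the optional-slash pattern /<\/?center>/g strips both the opening and the closing tag in a single pass:

'<center>Title</center>'.replace(/<\/?center>/g, ''); // 'Title'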