Implement retry mechanism for missing embeddings in Jina API responses

This commit is contained in:
Han Xiao 2025-04-17 22:06:15 +08:00
parent 94cb43a26a
commit a8f7124e6d

View File

@ -4,6 +4,7 @@ import axios, {AxiosError} from "axios";
const BATCH_SIZE = 128;
const API_URL = "https://api.jina.ai/v1/embeddings";
const MAX_RETRIES = 3; // Maximum number of retries for missing embeddings
// Modified to support different embedding tasks and dimensions
export async function getEmbeddings(
@ -37,10 +38,60 @@ export async function getEmbeddings(
const currentBatch = Math.floor(i / BATCH_SIZE) + 1;
console.log(`[embeddings] Processing batch ${currentBatch}/${batchCount} (${batchTexts.length} texts)`);
// Get embeddings for the batch with retry logic for missing indices
const { batchEmbeddings, batchTokens } = await getBatchEmbeddingsWithRetry(
batchTexts,
options,
currentBatch,
batchCount
);
allEmbeddings.push(...batchEmbeddings);
totalTokens += batchTokens;
console.log(`[embeddings] Batch ${currentBatch} complete. Tokens used: ${batchTokens}, total so far: ${totalTokens}`);
}
// Track token usage if tracker is provided
if (tokenTracker) {
tokenTracker.trackUsage('embeddings', {
promptTokens: totalTokens,
completionTokens: 0,
totalTokens: totalTokens
});
}
console.log(`[embeddings] Complete. Generated ${allEmbeddings.length} embeddings using ${totalTokens} tokens`);
return {embeddings: allEmbeddings, tokens: totalTokens};
}
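// Helper: fetches embeddings for one batch, re-requesting only the texts whose indices are missing
// from the response (at most MAX_RETRIES attempts); texts still missing afterwards get zero vectors.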
async function getBatchEmbeddingsWithRetry(
batchTexts: string[],
options: {
task?: "text-matching" | "retrieval.passage" | "retrieval.query",
dimensions?: number,
late_chunking?: boolean,
embedding_type?: string
},
currentBatch: number,
batchCount: number
): Promise<{ batchEmbeddings: number[][], batchTokens: number }> {
let batchEmbeddings: number[][] = [];
let batchTokens = 0;
let retryCount = 0;
let textsToProcess = [...batchTexts]; // Copy the original texts
let indexMap = new Map<number, number>(); // Map to keep track of original indices
// Initialize indexMap with original indices
textsToProcess.forEach((_, idx) => {
indexMap.set(idx, idx);
});
while (textsToProcess.length > 0 && retryCount < MAX_RETRIES) {
const request: JinaEmbeddingRequest = {
model: "jina-embeddings-v3",
task: options.task || "text-matching",
input: batchTexts,
input: textsToProcess,
truncate: true
};
@ -61,58 +112,119 @@ export async function getEmbeddings(
}
);
// Prepare embeddings, handling any missing indices
let batchEmbeddings: number[][];
if (!response.data.data || response.data.data.length !== batchTexts.length) {
console.error('Invalid response from Jina API:', response.data.data?.length, batchTexts.length);
// Find missing indices and complete with zero vectors
const receivedIndices = new Set(response.data.data?.map(item => item.index) || []);
const dimensionSize = response.data.data?.[0]?.embedding?.length || options.dimensions || 1024;
batchEmbeddings = [];
for (let idx = 0; idx < batchTexts.length; idx++) {
if (receivedIndices.has(idx)) {
// Find the item with this index
const item = response.data.data.find(d => d.index === idx);
batchEmbeddings.push(item!.embedding);
} else {
// Create a zero vector for missing index
console.error(`Missing embedding for index ${idx}: [${batchTexts[idx]}]`);
batchEmbeddings.push(new Array(dimensionSize).fill(0));
if (!response.data.data) {
console.error('No data returned from Jina API');
if (retryCount === MAX_RETRIES - 1) {
// On last retry, create placeholder embeddings
const dimensionSize = options.dimensions || 1024;
const placeholderEmbeddings = textsToProcess.map(text => {
console.error(`Failed to get embedding after all retries: [${text.substring(0, 50)}...]`);
return new Array(dimensionSize).fill(0);
});
// Add embeddings in correct order
for (let i = 0; i < textsToProcess.length; i++) {
const originalIndex = indexMap.get(i)!;
while (batchEmbeddings.length <= originalIndex) {
batchEmbeddings.push([]);
}
batchEmbeddings[originalIndex] = placeholderEmbeddings[i];
}
}
} else {
// All indices present, just sort by index
batchEmbeddings = response.data.data
.sort((a, b) => a.index - b.index)
.map(item => item.embedding);
retryCount++;
continue;
}
allEmbeddings.push(...batchEmbeddings);
totalTokens += response.data.usage?.total_tokens || 0;
console.log(`[embeddings] Batch ${currentBatch} complete. Tokens used: ${response.data.usage?.total_tokens || 0}, total so far: ${totalTokens}`);
const receivedIndices = new Set(response.data.data.map(item => item.index));
const dimensionSize = response.data.data[0]?.embedding?.length || options.dimensions || 1024;
// Process successful embeddings
const successfulEmbeddings: number[][] = [];
const remainingTexts: string[] = [];
const newIndexMap = new Map<number, number>();
for (let idx = 0; idx < textsToProcess.length; idx++) {
if (receivedIndices.has(idx)) {
// Find the item with this index
const item = response.data.data.find(d => d.index === idx)!;
// Get the original index and store in the result array
const originalIndex = indexMap.get(idx)!;
while (batchEmbeddings.length <= originalIndex) {
batchEmbeddings.push([]);
}
batchEmbeddings[originalIndex] = item.embedding;
successfulEmbeddings.push(item.embedding);
} else {
// Add to retry list
const newIndex = remainingTexts.length;
newIndexMap.set(newIndex, indexMap.get(idx)!);
remainingTexts.push(textsToProcess[idx]);
console.log(`Missing embedding for index ${idx}, will retry: [${textsToProcess[idx].substring(0, 50)}...]`);
}
}
// Add tokens
batchTokens += response.data.usage?.total_tokens || 0;
// Update for next iteration
textsToProcess = remainingTexts;
indexMap = newIndexMap;
// If all embeddings were successfully processed, break out of the loop
if (textsToProcess.length === 0) {
break;
}
// Increment retry count and log
retryCount++;
console.log(`[embeddings] Batch ${currentBatch}/${batchCount} - Retrying ${textsToProcess.length} texts (attempt ${retryCount}/${MAX_RETRIES})`);
} catch (error) {
console.error('Error calling Jina Embeddings API:', error);
if (error instanceof AxiosError && error.response?.status === 402) {
return {embeddings: [], tokens: 0};
return { batchEmbeddings: [], batchTokens: 0 };
}
// On last retry, create placeholder embeddings
if (retryCount === MAX_RETRIES - 1) {
const dimensionSize = options.dimensions || 1024;
for (let idx = 0; idx < textsToProcess.length; idx++) {
const originalIndex = indexMap.get(idx)!;
console.error(`Failed to get embedding after all retries for index ${originalIndex}: [${textsToProcess[idx].substring(0, 50)}...]`);
while (batchEmbeddings.length <= originalIndex) {
batchEmbeddings.push([]);
}
batchEmbeddings[originalIndex] = new Array(dimensionSize).fill(0);
}
}
retryCount++;
if (retryCount < MAX_RETRIES) {
console.log(`[embeddings] Batch ${currentBatch}/${batchCount} - Retry attempt ${retryCount}/${MAX_RETRIES} after error`);
// Wait before retrying to avoid overwhelming the API
await new Promise(resolve => setTimeout(resolve, 1000));
} else {
throw error; // If we've exhausted retries, re-throw the error
}
throw error;
}
}
// Track token usage if tracker is provided
if (tokenTracker) {
tokenTracker.trackUsage('embeddings', {
promptTokens: totalTokens,
completionTokens: 0,
totalTokens: totalTokens
});
// Handle any remaining missing embeddings after max retries
if (textsToProcess.length > 0) {
console.error(`[embeddings] Failed to get embeddings for ${textsToProcess.length} texts after ${MAX_RETRIES} retries`);
const dimensionSize = options.dimensions || 1024;
for (let idx = 0; idx < textsToProcess.length; idx++) {
const originalIndex = indexMap.get(idx)!;
console.error(`Creating zero embedding for index ${originalIndex} after all retries failed`);
while (batchEmbeddings.length <= originalIndex) {
batchEmbeddings.push([]);
}
batchEmbeddings[originalIndex] = new Array(dimensionSize).fill(0);
}
}
console.log(`[embeddings] Complete. Generated ${allEmbeddings.length} embeddings using ${totalTokens} tokens`);
return {embeddings: allEmbeddings, tokens: totalTokens};
return { batchEmbeddings, batchTokens };
}