@@ -566,6 +566,213 @@ export async function deleteNoteEmbeddings(noteId: string, providerId?: string,
    }
}

/**
 * Get notes that have failed embedding generation
 *
 * @param limit - Maximum number of failed notes to return
 * @returns List of failed notes with their error information
 */
export async function getFailedEmbeddingNotes(limit: number = 100): Promise<any[]> {
    // Get notes with failed embedding attempts
    const failedQueueItems = await sql.getRows(`
        SELECT noteId, operation, attempts, lastAttempt, error
        FROM embedding_queue
        WHERE attempts > 0
        ORDER BY attempts DESC, lastAttempt DESC
        LIMIT ?`,
        [limit]
    ) as {noteId: string, operation: string, attempts: number, lastAttempt: string, error: string}[];
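    // Rows with attempts > 0 are whole-note embedding failures; per-chunk failures are
    // tracked separately via "<provider>FailedChunks" labels and merged in below.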

    // Add titles to the failed notes
    const failedNotesWithTitles = [];
    for (const item of failedQueueItems) {
        const note = becca.getNote(item.noteId);
        if (note) {
            failedNotesWithTitles.push({
                ...item,
                title: note.title,
                failureType: 'full' // This indicates a complete embedding failure
            });
        } else {
            failedNotesWithTitles.push({
                ...item,
                failureType: 'full'
            });
        }
    }

    // Now get notes with failed chunks
    // We need to search for labels that contain failed chunks data
    const notes = await sql.getRows(`
        SELECT noteId, name, value
        FROM attributes
        WHERE type = 'label' AND name LIKE '%FailedChunks'
    `) as {noteId: string, name: string, value: string}[];
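    // Label names follow the "<providerId>FailedChunks" convention used by
    // getFailedChunksData/updateFailedChunksData further down in this file.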

    // Process notes with failed chunks
    for (const item of notes) {
        try {
            const noteId = item.noteId;
            const note = becca.getNote(noteId);
            if (!note) continue;

            // Parse the failed chunks data
            const failedChunks = JSON.parse(item.value) as Record<string, {attempts: number, lastAttempt: string, error: string}>;
            const chunkCount = Object.keys(failedChunks).length;
            if (chunkCount === 0) continue;

            // Get the most recent failed chunk
            let latestAttempt = '';
            let totalAttempts = 0;
            let errorExample = '';

            for (const chunkId in failedChunks) {
                const chunk = failedChunks[chunkId];
                totalAttempts += chunk.attempts;

                if (!latestAttempt || chunk.lastAttempt > latestAttempt) {
                    latestAttempt = chunk.lastAttempt;
                    errorExample = chunk.error;
                }
            }

            // Add this to our list of failed notes
            failedNotesWithTitles.push({
                noteId,
                title: note.title,
                failureType: 'chunks',
                chunks: chunkCount,
                attempts: totalAttempts,
                lastAttempt: latestAttempt,
                error: `${chunkCount} chunks failed: ${errorExample}`
            });
        } catch (error) {
            log.error(`Error processing note with failed chunks: ${error}`);
        }
    }

    // Sort by latest attempt
    failedNotesWithTitles.sort((a, b) => {
        if (a.lastAttempt && b.lastAttempt) {
            return b.lastAttempt.localeCompare(a.lastAttempt);
        }
        return 0;
    });

    // Limit to the specified number
    return failedNotesWithTitles.slice(0, limit);
}

/**
 * Retry embedding generation for a specific failed note
 *
 * @param noteId - ID of the note to retry
 * @returns Success flag
 */
export async function retryFailedEmbedding(noteId: string): Promise<boolean> {
    let success = false;

    // First, check if the note is in the embedding queue with failed attempts
    const exists = await sql.getValue(
        "SELECT 1 FROM embedding_queue WHERE noteId = ? AND attempts > 0",
        [noteId]
    );

    if (exists) {
        // Reset the note in the queue
        const now = dateUtils.localNowDateTime();
        const utcNow = dateUtils.utcNowDateTime();

        await sql.execute(`
            UPDATE embedding_queue
            SET attempts = 0, error = NULL, dateQueued = ?, utcDateQueued = ?
            WHERE noteId = ?`,
            [now, utcNow, noteId]
        );
        success = true;
    }

    // Next, check for failed chunks in labels
    const note = becca.getNote(noteId);
    if (note) {
        // Look for any provider-specific failed chunks
        const labels = note.getLabels();
        const failedChunksLabels = labels.filter(label => label.name.endsWith('FailedChunks'));

        for (const label of failedChunksLabels) {
            // Remove the label - this will cause all chunks to be retried
            await note.removeLabel(label.name);
            success = true;
        }

        // If we had chunk failures but no queue entry, we need to add one
        if (failedChunksLabels.length > 0 && !exists) {
            await queueNoteForEmbedding(noteId, 'UPDATE');
        }
    }

    return success;
}

/**
 * Retry all failed embeddings
 *
 * @returns Number of notes queued for retry
 */
export async function retryAllFailedEmbeddings(): Promise<number> {
    let totalRetried = 0;

    // Get count of failed notes in queue
    const failedCount = await sql.getValue(
        "SELECT COUNT(*) FROM embedding_queue WHERE attempts > 0"
    ) as number;

    if (failedCount > 0) {
        // Reset all failed notes in the queue
        const now = dateUtils.localNowDateTime();
        const utcNow = dateUtils.utcNowDateTime();

        await sql.execute(`
            UPDATE embedding_queue
            SET attempts = 0, error = NULL, dateQueued = ?, utcDateQueued = ?
            WHERE attempts > 0`,
            [now, utcNow]
        );

        totalRetried += failedCount;
    }

    // Now find notes with failed chunks
    const notesWithFailedChunks = await sql.getRows(`
        SELECT DISTINCT noteId
        FROM attributes
        WHERE type = 'label' AND name LIKE '%FailedChunks'
    `) as {noteId: string}[];

    // Process each note with failed chunks
    for (const item of notesWithFailedChunks) {
        const noteId = item.noteId;
        const note = becca.getNote(noteId);

        if (note) {
            // Get all failed chunks labels
            const labels = note.getLabels();
            const failedChunksLabels = labels.filter(label => label.name.endsWith('FailedChunks'));

            for (const label of failedChunksLabels) {
                // Remove the label - this will cause all chunks to be retried
                await note.removeLabel(label.name);
            }

            // Make sure the note is in the queue
            await queueNoteForEmbedding(noteId, 'UPDATE');
            totalRetried++;
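            // A note that failed both at the queue level and at the chunk level is counted
            // once for each, so totalRetried can exceed the number of distinct notes.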
        }
    }

    return totalRetried;
}

/**
 * Process the embedding queue
 */
@@ -621,7 +828,10 @@ export async function processEmbeddingQueue() {
            const context = await getNoteEmbeddingContext(noteData.noteId);

            // Check if we should use chunking for large content
            const useChunking = context.content.length > 5000; // Use chunking for large notes by default
            const useChunking = context.content.length > 5000;

            // Track if all providers failed
            let allProvidersFailed = true;
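            // Flipped to false as soon as any provider stores an embedding; used below to
            // decide whether the queue entry can be removed or must be marked as failed.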

            // Process with each enabled provider
            for (const provider of enabledProviders) {
@@ -642,16 +852,35 @@ export async function processEmbeddingQueue() {
                            embedding
                        );
                    }
                    // At least one provider succeeded
                    allProvidersFailed = false;
                } catch (providerError: any) {
                    log.error(`Error generating embedding with provider ${provider.name} for note ${noteData.noteId}: ${providerError.message || 'Unknown error'}`);
                }
            }

            // Remove from queue on success
            await sql.execute(
                "DELETE FROM embedding_queue WHERE noteId = ?",
                [noteData.noteId]
            );
            // Only remove from queue on success if at least one provider succeeded
            if (!allProvidersFailed) {
                await sql.execute(
                    "DELETE FROM embedding_queue WHERE noteId = ?",
                    [noteData.noteId]
                );
            } else {
                // If all providers failed, mark as failed but keep in queue
                await sql.execute(`
                    UPDATE embedding_queue
                    SET attempts = attempts + 1,
                        lastAttempt = ?,
                        error = ?
                    WHERE noteId = ?`,
                    [dateUtils.utcNowDateTime(), "All providers failed to generate embeddings", noteData.noteId]
                );

                // Log a permanent failure once the retry limit is reached
                if (noteData.attempts + 1 >= 3) {
                    log.error(`Marked note ${noteData.noteId} as permanently failed after multiple embedding attempts`);
                }
            }
        } catch (error: any) {
            const noteData = note as unknown as QueueItem;
@@ -667,13 +896,10 @@ export async function processEmbeddingQueue() {
            log.error(`Error processing embedding for note ${noteData.noteId}: ${error.message || 'Unknown error'}`);

            // Remove from queue if too many attempts
            // Don't remove from queue even after multiple failures, just mark as failed
            // This allows manual retries later
            if (noteData.attempts + 1 >= 3) {
                await sql.execute(
                    "DELETE FROM embedding_queue WHERE noteId = ?",
                    [noteData.noteId]
                );
                log.error(`Removed note ${noteData.noteId} from embedding queue after multiple failures`);
                log.error(`Marked note ${noteData.noteId} as permanently failed after multiple embedding attempts`);
            }
        }
    }
@@ -857,40 +1083,151 @@ async function processNoteWithChunking(
        // Delete existing embeddings first to avoid duplicates
        await deleteNoteEmbeddings(noteId, provider.name, config.model);

        // Track successful and failed chunks
        let successfulChunks = 0;
        let failedChunks = 0;
        const totalChunks = chunks.length;

        // Get existing chunk failure data from the database
        // We'll store this in a special attribute on the note to track per-chunk failures
        const failedChunksData = await getFailedChunksData(noteId, provider.name);

        // Process each chunk with a slight delay to avoid rate limits
        for (let i = 0; i < chunks.length; i++) {
            const chunk = chunks[i];
            const chunkId = `chunk_${i + 1}_of_${chunks.length}`;

            // Create a modified context object with just this chunk's content
            const chunkContext: NoteEmbeddingContext = {
                ...context,
                content: chunk
            };
            // Skip chunks that have failed multiple times
            if (failedChunksData[chunkId] && failedChunksData[chunkId].attempts >= 3) {
                log.info(`Skipping chunk ${chunkId} for note ${noteId} after ${failedChunksData[chunkId].attempts} failed attempts`);
                failedChunks++;
                continue;
            }
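            // The per-chunk cap of three attempts mirrors the note-level retry cap used in processEmbeddingQueue.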

            // Generate embedding for this chunk
            const embedding = await provider.generateNoteEmbeddings(chunkContext);
            try {
                // Create a modified context object with just this chunk's content
                const chunkContext: NoteEmbeddingContext = {
                    ...context,
                    content: chunk
                };

                // Store with chunk information
                await storeNoteEmbedding(
                    noteId,
                    provider.name,
                    config.model,
                    embedding
                );
                // Generate embedding for this chunk
                const embedding = await provider.generateNoteEmbeddings(chunkContext);

                // Store with chunk information
                await storeNoteEmbedding(
                    noteId,
                    provider.name,
                    config.model,
                    embedding
                );

                successfulChunks++;

                // Remove this chunk from failed chunks if it was previously failed
                if (failedChunksData[chunkId]) {
                    delete failedChunksData[chunkId];
                    await updateFailedChunksData(noteId, provider.name, failedChunksData);
                }

                // Small delay between chunks to avoid rate limits
                if (i < chunks.length - 1) {
                    await new Promise(resolve => setTimeout(resolve, 100));
                }
            } catch (error: any) {
                // Track the failure for this specific chunk
                failedChunks++;

                if (!failedChunksData[chunkId]) {
                    failedChunksData[chunkId] = {
                        attempts: 1,
                        lastAttempt: dateUtils.utcNowDateTime(),
                        error: error.message || 'Unknown error'
                    };
                } else {
                    failedChunksData[chunkId].attempts++;
                    failedChunksData[chunkId].lastAttempt = dateUtils.utcNowDateTime();
                    failedChunksData[chunkId].error = error.message || 'Unknown error';
                }
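                // Each per-chunk record holds {attempts, lastAttempt, error}; getFailedEmbeddingNotes
                // parses this shape back out of the "<provider>FailedChunks" label when reporting failures.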

                // Update the failed chunks data in the database
                await updateFailedChunksData(noteId, provider.name, failedChunksData);

            // Small delay between chunks to avoid rate limits
            if (i < chunks.length - 1) {
                await new Promise(resolve => setTimeout(resolve, 100));
                log.error(`Error processing chunk ${chunkId} for note ${noteId}: ${error.message || 'Unknown error'}`);
            }
        }

        log.info(`Generated ${chunks.length} chunk embeddings for note ${noteId}`);
        // Log information about the processed chunks
        if (successfulChunks > 0) {
            log.info(`Generated ${successfulChunks} chunk embeddings for note ${noteId}`);
        }

        if (failedChunks > 0) {
            log.info(`Failed to generate ${failedChunks} chunk embeddings for note ${noteId}`);
        }

        // If all chunks failed, throw an error so the note will be marked as failed
        if (successfulChunks === 0 && failedChunks > 0) {
            throw new Error(`All ${failedChunks} chunks failed for note ${noteId}`);
        }
    } catch (error: any) {
        log.error(`Error in chunked embedding process for note ${noteId}: ${error.message || 'Unknown error'}`);
        throw error;
    }
}

/**
 * Get failed chunk data for a note
 * This is stored in a special attribute on the note so we can track per-chunk failures
 */
async function getFailedChunksData(noteId: string, providerId: string): Promise<Record<string, {attempts: number, lastAttempt: string, error: string}>> {
    try {
        const attributeName = `${providerId}FailedChunks`;
        const note = becca.getNote(noteId);

        if (!note) {
            return {};
        }

        const attr = note.getLabels().find(attr => attr.name === attributeName);

        if (!attr || !attr.value) {
            return {};
        }

        return JSON.parse(attr.value);
    } catch (e) {
        return {};
    }
}

/**
 * Update failed chunk data for a note
 */
async function updateFailedChunksData(noteId: string, providerId: string, data: Record<string, {attempts: number, lastAttempt: string, error: string}>): Promise<void> {
    try {
        const attributeName = `${providerId}FailedChunks`;
        const note = becca.getNote(noteId);

        if (!note) {
            return;
        }

        // Only store if there are failed chunks
        if (Object.keys(data).length > 0) {
            await note.setLabel(attributeName, JSON.stringify(data));
        } else {
            // If no failed chunks, remove the attribute if it exists
            const attr = note.getLabels().find(attr => attr.name === attributeName);
            if (attr) {
                await note.removeLabel(attributeName);
            }
        }
    } catch (e) {
        log.error(`Error updating failed chunks data for note ${noteId}: ${e}`);
    }
}

export function cleanupEmbeddings() {
    // Cleanup function implementation
}
@@ -910,5 +1247,8 @@ export default {
    setupEmbeddingBackgroundProcessing,
    initEmbeddings,
    reprocessAllNotes,
    getEmbeddingStats
    getEmbeddingStats,
    getFailedEmbeddingNotes,
    retryFailedEmbedding,
    retryAllFailedEmbeddings
};