Files
CloudFlare-ImgBed/functions/utils/indexManager.js

1455 lines
45 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/* 索引管理器 - D1数据库版本 */
import { getDatabase } from './databaseAdapter.js';
/**
* 文件索引结构D1数据库存储
*
* 文件表:
* - 直接存储在 files 表中,包含所有文件信息和元数据
*
* 索引元数据表:
* - 存储在 index_metadata 表中
* - 包含 lastUpdated, totalCount, lastOperationId 等信息
*
* 原子操作表:
* - 存储在 index_operations 表中
* - 包含 id, type, timestamp, data, processed 等字段
* - operation: {
* type: "add" | "remove" | "move" | "batch_add" | "batch_remove" | "batch_move",
* timestamp: 1640995200000,
* data: {
* fileId: "file_unique_id",
* metadata: {}
* }
* }
*/
const INDEX_KEY = 'manage@index';
const INDEX_META_KEY = 'manage@index@meta'; // 索引元数据键
const OPERATION_KEY_PREFIX = 'manage@index@operation_';
const INDEX_CHUNK_SIZE = 10000; // 索引分块大小
const KV_LIST_LIMIT = 1000; // KV 列出批量大小
const BATCH_SIZE = 10; // 批量处理大小
/**
* 添加文件到索引
* @param {Object} context - 上下文对象,包含 env 和其他信息
* @param {string} fileId - 文件 ID
* @param {Object} metadata - 文件元数据
*/
export async function addFileToIndex(context, fileId, metadata = null) {
const { env } = context;
try {
if (metadata === null) {
// 如果未传入metadata尝试从数据库中获取
const db = getDatabase(env);
const fileData = await db.getWithMetadata(fileId);
metadata = fileData.metadata || {};
}
// 记录原子操作
const operationId = await recordOperation(context, 'add', {
fileId,
metadata
});
console.log(`File ${fileId} add operation recorded with ID: ${operationId}`);
return { success: true, operationId };
} catch (error) {
console.error('Error recording add file operation:', error);
return { success: false, error: error.message };
}
}
/**
* 批量添加文件到索引
* @param {Object} context - 上下文对象,包含 env 和其他信息
* @param {Array} files - 文件数组,每个元素包含 { fileId, metadata }
* @param {Object} options - 选项
* @param {boolean} options.skipExisting - 是否跳过已存在的文件,默认为 false更新已存在的文件
* @returns {Object} 返回操作结果 { operationId, totalProcessed }
*/
export async function batchAddFilesToIndex(context, files, options = {}) {
try {
const { env } = context;
const { skipExisting = false } = options;
// 处理每个文件的metadata
const processedFiles = [];
for (const fileItem of files) {
const { fileId, metadata } = fileItem;
let finalMetadata = metadata;
// 如果没有提供metadata尝试从KV中获取
if (!finalMetadata) {
try {
const fileData = await getDatabase(env).getWithMetadata(fileId);
finalMetadata = fileData.metadata || {};
} catch (error) {
console.warn(`Failed to get metadata for file ${fileId}:`, error);
finalMetadata = {};
}
}
processedFiles.push({
fileId,
metadata: finalMetadata
});
}
// 记录批量添加操作
const operationId = await recordOperation(context, 'batch_add', {
files: processedFiles,
options: { skipExisting }
});
console.log(`Batch add operation recorded with ID: ${operationId}, ${files.length} files`);
return {
success: true,
operationId,
totalProcessed: files.length
};
} catch (error) {
console.error('Error recording batch add files operation:', error);
return {
success: false,
error: error.message,
totalProcessed: 0
};
}
}
/**
* 从索引中删除文件
* @param {Object} context - 上下文对象
* @param {string} fileId - 文件 ID
*/
export async function removeFileFromIndex(context, fileId) {
try {
// 记录删除操作
const operationId = await recordOperation(context, 'remove', {
fileId
});
console.log(`File ${fileId} remove operation recorded with ID: ${operationId}`);
return { success: true, operationId };
} catch (error) {
console.error('Error recording remove file operation:', error);
return { success: false, error: error.message };
}
}
/**
* 批量删除文件
* @param {Object} context - 上下文对象
* @param {Array} fileIds - 文件 ID 数组
*/
export async function batchRemoveFilesFromIndex(context, fileIds) {
try {
// 记录批量删除操作
const operationId = await recordOperation(context, 'batch_remove', {
fileIds
});
console.log(`Batch remove operation recorded with ID: ${operationId}, ${fileIds.length} files`);
return {
success: true,
operationId,
totalProcessed: fileIds.length
};
} catch (error) {
console.error('Error recording batch remove files operation:', error);
return {
success: false,
error: error.message,
totalProcessed: 0
};
}
}
/**
* 移动文件修改文件ID
* @param {Object} context - 上下文对象,包含 env 和其他信息
* @param {string} originalFileId - 原文件 ID
* @param {string} newFileId - 新文件 ID
* @param {Object} newMetadata - 新的元数据如果为null则获取原文件的metadata
* @returns {Object} 返回操作结果 { success, operationId?, error? }
*/
export async function moveFileInIndex(context, originalFileId, newFileId, newMetadata = null) {
try {
const { env } = context;
// 确定最终的metadata
let finalMetadata = newMetadata;
if (finalMetadata === null) {
// 如果没有提供新metadata尝试从KV中获取
try {
const fileData = await getDatabase(env).getWithMetadata(newFileId);
finalMetadata = fileData.metadata || {};
} catch (error) {
console.warn(`Failed to get metadata for new file ${newFileId}:`, error);
finalMetadata = {};
}
}
// 记录移动操作
const operationId = await recordOperation(context, 'move', {
originalFileId,
newFileId,
metadata: finalMetadata
});
console.log(`File move operation from ${originalFileId} to ${newFileId} recorded with ID: ${operationId}`);
return { success: true, operationId };
} catch (error) {
console.error('Error recording move file operation:', error);
return { success: false, error: error.message };
}
}
/**
* 批量移动文件
* @param {Object} context - 上下文对象,包含 env 和其他信息
* @param {Array} moveOperations - 移动操作数组,每个元素包含 { originalFileId, newFileId, metadata? }
* @returns {Object} 返回操作结果 { operationId, totalProcessed }
*/
export async function batchMoveFilesInIndex(context, moveOperations) {
try {
const { env } = context;
// 处理每个移动操作的metadata
const processedOperations = [];
for (const operation of moveOperations) {
const { originalFileId, newFileId, metadata } = operation;
// 确定最终的metadata
let finalMetadata = metadata;
if (finalMetadata === null || finalMetadata === undefined) {
// 如果没有提供新metadata尝试从KV中获取
try {
const fileData = await getDatabase(env).getWithMetadata(newFileId);
finalMetadata = fileData.metadata || {};
} catch (error) {
console.warn(`Failed to get metadata for new file ${newFileId}:`, error);
finalMetadata = {};
}
}
processedOperations.push({
originalFileId,
newFileId,
metadata: finalMetadata
});
}
// 记录批量移动操作
const operationId = await recordOperation(context, 'batch_move', {
operations: processedOperations
});
console.log(`Batch move operation recorded with ID: ${operationId}, ${moveOperations.length} operations`);
return {
success: true,
operationId,
totalProcessed: moveOperations.length
};
} catch (error) {
console.error('Error recording batch move files operation:', error);
return {
success: false,
error: error.message,
totalProcessed: 0
};
}
}
/**
* 合并所有挂起的操作到索引中
* @param {Object} context - 上下文对象
* @param {Object} options - 选项
* @param {boolean} options.cleanupAfterMerge - 合并后是否清理操作记录,默认为 true
* @returns {Object} 合并结果
*/
export async function mergeOperationsToIndex(context, options = {}) {
const { waitUntil } = context;
const { cleanupAfterMerge = true } = options;
try {
console.log('Starting operations merge...');
// 获取当前索引
const currentIndex = await getIndex(context);
if (currentIndex.success === false) {
console.error('Failed to get current index for merge');
return {
success: false,
error: 'Failed to get current index'
};
}
// 获取所有待处理的操作
const operations = await getAllPendingOperations(context, currentIndex.lastOperationId);
if (operations.length === 0) {
console.log('No pending operations to merge');
return {
success: true,
processedOperations: 0,
message: 'No pending operations'
};
}
console.log(`Found ${operations.length} pending operations to merge`);
// 按时间戳排序操作,确保按正确顺序应用
operations.sort((a, b) => a.timestamp - b.timestamp);
// 创建索引的副本进行操作
const workingIndex = currentIndex;
let operationsProcessed = 0;
let addedCount = 0;
let removedCount = 0;
let movedCount = 0;
let updatedCount = 0;
const processedOperationIds = [];
// 应用每个操作
for (const operation of operations) {
try {
switch (operation.type) {
case 'add':
const addResult = applyAddOperation(workingIndex, operation.data);
if (addResult.added) addedCount++;
if (addResult.updated) updatedCount++;
break;
case 'remove':
if (applyRemoveOperation(workingIndex, operation.data)) {
removedCount++;
}
break;
case 'move':
if (applyMoveOperation(workingIndex, operation.data)) {
movedCount++;
}
break;
case 'batch_add':
const batchAddResult = applyBatchAddOperation(workingIndex, operation.data);
addedCount += batchAddResult.addedCount;
updatedCount += batchAddResult.updatedCount;
break;
case 'batch_remove':
removedCount += applyBatchRemoveOperation(workingIndex, operation.data);
break;
case 'batch_move':
movedCount += applyBatchMoveOperation(workingIndex, operation.data);
break;
default:
console.warn(`Unknown operation type: ${operation.type}`);
continue;
}
operationsProcessed++;
processedOperationIds.push(operation.id);
// 增加协作点
if (operationsProcessed % 3 === 0) {
await new Promise(resolve => setTimeout(resolve, 0));
}
} catch (error) {
console.error(`Error applying operation ${operation.id}:`, error);
}
}
// 如果有任何修改,保存索引
if (operationsProcessed > 0) {
workingIndex.lastUpdated = Date.now();
workingIndex.totalCount = workingIndex.files.length;
// 记录最后处理的操作ID
if (processedOperationIds.length > 0) {
workingIndex.lastOperationId = processedOperationIds[processedOperationIds.length - 1];
}
// 保存更新后的索引元数据
const saveSuccess = await saveIndexMetadata(context, workingIndex);
if (!saveSuccess) {
console.error('Failed to save chunked index');
return {
success: false,
error: 'Failed to save index'
};
}
console.log(`Index updated: ${addedCount} added, ${updatedCount} updated, ${removedCount} removed, ${movedCount} moved`);
}
// 清理已处理的操作记录
if (cleanupAfterMerge && processedOperationIds.length > 0) {
waitUntil(cleanupOperations(context, processedOperationIds));
}
const result = {
success: true,
processedOperations: operationsProcessed,
addedCount,
updatedCount,
removedCount,
movedCount,
totalFiles: workingIndex.totalCount
};
console.log('Operations merge completed:', result);
return result;
} catch (error) {
console.error('Error merging operations:', error);
return {
success: false,
error: error.message
};
}
}
/**
* 读取文件索引,支持搜索和分页
* @param {Object} context - 上下文对象
* @param {Object} options - 查询选项
* @param {string} options.search - 搜索关键字
* @param {string} options.directory - 目录过滤
* @param {number} options.start - 起始位置
* @param {number} options.count - 返回数量,-1 表示返回所有
* @param {string} options.channel - 渠道过滤
* @param {string} options.listType - 列表类型过滤
* @param {boolean} options.countOnly - 仅返回总数
* @param {boolean} options.includeSubdirFiles - 是否包含子目录下的文件
*/
export async function readIndex(context, options = {}) {
try {
const {
search = '',
directory = '',
start = 0,
count = 50,
channel = '',
listType = '',
countOnly = false,
includeSubdirFiles = false
} = options;
// 处理目录满足无头有尾的格式,根目录为空
const dirPrefix = directory === '' || directory.endsWith('/') ? directory : directory + '/';
// 直接从数据库读取文件,不依赖索引
const { env } = context;
const db = getDatabase(env);
let allFiles = [];
let cursor = null;
// 分批获取所有文件
while (true) {
const response = await db.listFiles({
limit: 1000,
cursor: cursor
});
if (!response || !response.keys || !Array.isArray(response.keys)) {
break;
}
for (const item of response.keys) {
// 跳过管理相关的键和分块数据
if (item.name.startsWith('manage@') || item.name.startsWith('chunk_')) {
continue;
}
// 跳过没有元数据的文件
if (!item.metadata || !item.metadata.TimeStamp) {
continue;
}
allFiles.push({
id: item.name,
metadata: item.metadata
});
}
cursor = response.cursor;
if (!cursor) break;
}
let filteredFiles = allFiles;
// 目录过滤
if (directory) {
const normalizedDir = directory.endsWith('/') ? directory : directory + '/';
filteredFiles = filteredFiles.filter(file => {
const fileDir = file.metadata.Directory ? file.metadata.Directory : extractDirectory(file.id);
return fileDir.startsWith(normalizedDir) || file.metadata.Directory === directory;
});
}
// 渠道过滤
if (channel) {
filteredFiles = filteredFiles.filter(file =>
file.metadata.Channel.toLowerCase() === channel.toLowerCase()
);
}
// 列表类型过滤
if (listType) {
filteredFiles = filteredFiles.filter(file =>
file.metadata.ListType === listType
);
}
// 搜索过滤
if (search) {
const searchLower = search.toLowerCase();
filteredFiles = filteredFiles.filter(file => {
return file.metadata.FileName?.toLowerCase().includes(searchLower) ||
file.id.toLowerCase().includes(searchLower);
});
}
// 如果只需要总数
if (countOnly) {
return {
totalCount: filteredFiles.length,
indexLastUpdated: index.lastUpdated
};
}
// 分页处理
const totalCount = filteredFiles.length;
let resultFiles = filteredFiles;
// 如果不包含子目录文件,获取当前目录下的直接文件
if (!includeSubdirFiles) {
resultFiles = filteredFiles.filter(file => {
const fileDir = file.metadata.Directory ? file.metadata.Directory : extractDirectory(file.id);
return fileDir === dirPrefix;
});
}
if (count !== -1) {
const startIndex = Math.max(0, start);
const endIndex = startIndex + Math.max(1, count);
resultFiles = resultFiles.slice(startIndex, endIndex);
}
// 提取目录信息
const directories = new Set();
filteredFiles.forEach(file => {
const fileDir = file.metadata.Directory ? file.metadata.Directory : extractDirectory(file.id);
if (fileDir && fileDir.startsWith(dirPrefix)) {
const relativePath = fileDir.substring(dirPrefix.length);
const firstSlashIndex = relativePath.indexOf('/');
if (firstSlashIndex !== -1) {
const subDir = dirPrefix + relativePath.substring(0, firstSlashIndex);
directories.add(subDir);
}
}
});
return {
files: resultFiles,
directories: Array.from(directories),
totalCount: totalCount,
indexLastUpdated: Date.now(),
returnedCount: resultFiles.length,
success: true
};
} catch (error) {
console.error('Error reading index:', error);
return {
files: [],
directories: [],
totalCount: 0,
indexLastUpdated: Date.now(),
returnedCount: 0,
success: false,
};
}
}
/**
* 重建索引(从 KV 中的所有文件重新构建索引)
* @param {Object} context - 上下文对象
* @param {Function} progressCallback - 进度回调函数
*/
export async function rebuildIndex(context, progressCallback = null) {
const { env, waitUntil } = context;
try {
console.log('Starting index rebuild...');
let cursor = null;
let processedCount = 0;
const newIndex = {
files: [],
lastUpdated: Date.now(),
totalCount: 0,
lastOperationId: null
};
// 从D1数据库读取所有文件
const db = getDatabase(env);
const filesStmt = db.db.prepare('SELECT id, metadata FROM files WHERE timestamp IS NOT NULL ORDER BY timestamp DESC');
const fileResults = await filesStmt.all();
for (const row of fileResults) {
try {
const metadata = JSON.parse(row.metadata || '{}');
// 跳过没有时间戳的文件
if (!metadata.TimeStamp) {
continue;
}
// 构建文件索引项
const fileItem = {
id: row.id,
metadata: metadata
};
newIndex.files.push(fileItem);
processedCount++;
// 报告进度
if (progressCallback && processedCount % 100 === 0) {
progressCallback(processedCount);
}
} catch (error) {
console.warn(`Failed to parse metadata for file ${row.id}:`, error);
}
}
// 按时间戳倒序排序
newIndex.files.sort((a, b) => b.metadata.TimeStamp - a.metadata.TimeStamp);
newIndex.totalCount = newIndex.files.length;
// 保存新索引元数据
const saveSuccess = await saveIndexMetadata(context, newIndex);
if (!saveSuccess) {
console.error('Failed to save chunked index during rebuild');
return {
success: false,
error: 'Failed to save rebuilt index'
};
}
// 清除旧的操作记录和多余索引
waitUntil(deleteAllOperations(context));
waitUntil(clearChunkedIndex(context, true));
console.log(`Index rebuild completed. Processed ${processedCount} files, indexed ${newIndex.totalCount} files.`);
return {
success: true,
processedCount,
indexedCount: newIndex.totalCount
};
} catch (error) {
console.error('Error rebuilding index:', error);
return {
success: false,
error: error.message
};
}
}
/**
* 获取索引信息
* @param {Object} context - 上下文对象
*/
export async function getIndexInfo(context) {
try {
// 直接从数据库读取文件信息,不依赖索引
const { env } = context;
const db = getDatabase(env);
let allFiles = [];
let cursor = null;
// 分批获取所有文件
while (true) {
const response = await db.listFiles({
limit: 1000,
cursor: cursor
});
if (!response || !response.keys || !Array.isArray(response.keys)) {
break;
}
for (const item of response.keys) {
// 跳过管理相关的键和分块数据
if (item.name.startsWith('manage@') || item.name.startsWith('chunk_')) {
continue;
}
// 跳过没有元数据的文件
if (!item.metadata || !item.metadata.TimeStamp) {
continue;
}
allFiles.push({
id: item.name,
metadata: item.metadata
});
}
cursor = response.cursor;
if (!cursor) break;
}
// 如果没有文件,返回空结果
if (allFiles.length === 0) {
return {
success: true,
totalFiles: 0,
lastUpdated: Date.now(),
channelStats: {},
directoryStats: {},
typeStats: {},
oldestFile: null,
newestFile: null
};
}
// 统计各渠道文件数量
const channelStats = {};
const directoryStats = {};
const typeStats = {};
allFiles.forEach(file => {
// 渠道统计
let channel = file.metadata.Channel || 'Telegraph';
if (channel === 'TelegramNew') {
channel = 'Telegram';
}
channelStats[channel] = (channelStats[channel] || 0) + 1;
// 目录统计
const dir = file.metadata.Directory || extractDirectory(file.id) || '/';
directoryStats[dir] = (directoryStats[dir] || 0) + 1;
// 类型统计
let listType = file.metadata.ListType || 'None';
const label = file.metadata.Label || 'None';
if (listType !== 'White' && label === 'adult') {
listType = 'Block';
}
typeStats[listType] = (typeStats[listType] || 0) + 1;
});
// 找到最新和最旧的文件
let oldestFile = null;
let newestFile = null;
if (allFiles.length > 0) {
// 按时间戳排序
const sortedFiles = [...allFiles].sort((a, b) => {
const timeA = a.metadata.TimeStamp || 0;
const timeB = b.metadata.TimeStamp || 0;
return timeA - timeB;
});
oldestFile = sortedFiles[0];
newestFile = sortedFiles[sortedFiles.length - 1];
}
return {
success: true,
totalFiles: allFiles.length,
lastUpdated: Date.now(),
channelStats,
directoryStats,
typeStats,
oldestFile,
newestFile
};
} catch (error) {
console.error('Error getting index info:', error);
return null;
}
}
/* ============= 原子操作相关函数 ============= */
/**
* 生成唯一的操作ID
*/
function generateOperationId() {
const timestamp = Date.now();
const random = Math.random().toString(36).substring(2, 9);
return `${timestamp}_${random}`;
}
/**
* 记录原子操作
* @param {Object} context - 上下文对象,包含 env 和其他信息
* @param {string} type - 操作类型
* @param {Object} data - 操作数据
*/
async function recordOperation(context, type, data) {
const { env } = context;
const operationId = generateOperationId();
const operation = {
type,
timestamp: Date.now(),
data
};
const db = getDatabase(env);
await db.putIndexOperation(operationId, operation);
return operationId;
}
/**
* 获取所有待处理的操作
* @param {Object} context - 上下文对象
* @param {string} lastOperationId - 最后处理的操作ID
*/
async function getAllPendingOperations(context, lastOperationId = null) {
const { env } = context;
const operations = [];
let cursor = null;
try {
const db = getDatabase(env);
const allOperations = await db.listIndexOperations({
processed: false,
limit: 10000 // 获取所有未处理的操作
});
// 如果指定了lastOperationId过滤已处理的操作
for (const operation of allOperations) {
if (lastOperationId && operation.id <= lastOperationId) {
continue;
}
operations.push(operation);
}
} catch (error) {
console.error('Error getting pending operations:', error);
}
return operations;
}
/**
* 应用添加操作
* @param {Object} index - 索引对象
* @param {Object} data - 操作数据
*/
function applyAddOperation(index, data) {
const { fileId, metadata } = data;
// 检查文件是否已存在
const existingIndex = index.files.findIndex(file => file.id === fileId);
const fileItem = {
id: fileId,
metadata: metadata || {}
};
if (existingIndex !== -1) {
// 更新现有文件
index.files[existingIndex] = fileItem;
return { added: false, updated: true };
} else {
// 添加新文件
insertFileInOrder(index.files, fileItem);
return { added: true, updated: false };
}
}
/**
* 应用删除操作
* @param {Object} index - 索引对象
* @param {Object} data - 操作数据
*/
function applyRemoveOperation(index, data) {
const { fileId } = data;
const initialLength = index.files.length;
index.files = index.files.filter(file => file.id !== fileId);
return index.files.length < initialLength;
}
/**
* 应用移动操作
* @param {Object} index - 索引对象
* @param {Object} data - 操作数据
*/
function applyMoveOperation(index, data) {
const { originalFileId, newFileId, metadata } = data;
const originalIndex = index.files.findIndex(file => file.id === originalFileId);
if (originalIndex === -1) {
return false; // 原文件不存在
}
// 更新文件ID和元数据
index.files[originalIndex] = {
id: newFileId,
metadata: metadata || index.files[originalIndex].metadata
};
return true;
}
/**
* 应用批量添加操作
* @param {Object} index - 索引对象
* @param {Object} data - 操作数据
*/
function applyBatchAddOperation(index, data) {
const { files, options } = data;
const { skipExisting = false } = options || {};
let addedCount = 0;
let updatedCount = 0;
// 创建现有文件ID的映射以提高查找效率
const existingFilesMap = new Map();
index.files.forEach((file, idx) => {
existingFilesMap.set(file.id, idx);
});
for (const fileData of files) {
const { fileId, metadata } = fileData;
const fileItem = {
id: fileId,
metadata: metadata || {}
};
const existingIndex = existingFilesMap.get(fileId);
if (existingIndex !== undefined) {
if (!skipExisting) {
// 更新现有文件
index.files[existingIndex] = fileItem;
updatedCount++;
}
} else {
// 添加新文件
insertFileInOrder(index.files, fileItem);
// 更新映射
index.files.forEach((file, idx) => {
existingFilesMap.set(file.id, idx);
});
addedCount++;
}
}
return { addedCount, updatedCount };
}
/**
* 应用批量删除操作
* @param {Object} index - 索引对象
* @param {Object} data - 操作数据
*/
function applyBatchRemoveOperation(index, data) {
const { fileIds } = data;
const fileIdSet = new Set(fileIds);
const initialLength = index.files.length;
index.files = index.files.filter(file => !fileIdSet.has(file.id));
return initialLength - index.files.length;
}
/**
* 应用批量移动操作
* @param {Object} index - 索引对象
* @param {Object} data - 操作数据
*/
function applyBatchMoveOperation(index, data) {
const { operations } = data;
let movedCount = 0;
// 创建现有文件ID的映射以提高查找效率
const existingFilesMap = new Map();
index.files.forEach((file, idx) => {
existingFilesMap.set(file.id, idx);
});
for (const operation of operations) {
const { originalFileId, newFileId, metadata } = operation;
const originalIndex = existingFilesMap.get(originalFileId);
if (originalIndex !== undefined) {
// 更新映射
existingFilesMap.delete(originalFileId);
existingFilesMap.set(newFileId, originalIndex);
// 更新文件信息
index.files[originalIndex] = {
id: newFileId,
metadata: metadata || index.files[originalIndex].metadata
};
movedCount++;
}
}
return movedCount;
}
/**
* 清理已处理的操作记录
* @param {Object} context - 上下文对象
* @param {Array} operationIds - 要清理的操作ID数组
* @param {number} concurrency - 并发数量默认为10
*/
async function cleanupOperations(context, operationIds, concurrency = 10) {
const { env } = context;
try {
console.log(`Cleaning up ${operationIds.length} processed operations with concurrency ${concurrency}...`);
// 创建删除任务数组
const deleteTasks = operationIds.map(operationId => {
const operationKey = OPERATION_KEY_PREFIX + operationId;
return async () => {
try {
await getDatabase(env).delete(operationKey);
} catch (error) {
console.error(`Error deleting operation ${operationId}:`, error);
}
};
});
// 使用并发控制执行删除操作
await promiseLimit(deleteTasks, concurrency);
console.log(`Successfully cleaned up ${operationIds.length} operations`);
} catch (error) {
console.error('Error cleaning up operations:', error);
}
}
/**
* 删除所有原子操作记录
* @param {Object} context - 上下文对象,包含 env 和其他信息
* @returns {Object} 删除结果 { success, deletedCount, errors?, totalFound? }
*/
export async function deleteAllOperations(context) {
const { env } = context;
try {
console.log('Starting to delete all atomic operations...');
// 获取所有原子操作
const allOperationIds = [];
let cursor = null;
let totalFound = 0;
// 首先收集所有操作键
while (true) {
const response = await getDatabase(env).list({
prefix: OPERATION_KEY_PREFIX,
limit: KV_LIST_LIMIT,
cursor: cursor
});
// 检查响应格式
if (!response || !response.keys || !Array.isArray(response.keys)) {
console.error('Invalid response from database list in cleanupProcessedOperations:', response);
break;
}
for (const item of response.keys) {
allOperationIds.push(item.name.substring(OPERATION_KEY_PREFIX.length));
totalFound++;
}
cursor = response.cursor;
if (!cursor) break;
}
if (totalFound === 0) {
console.log('No atomic operations found to delete');
return {
success: true,
deletedCount: 0,
totalFound: 0,
message: 'No operations to delete'
};
}
console.log(`Found ${totalFound} atomic operations to delete`);
// 批量删除原子操作
await cleanupOperations(context, allOperationIds);
console.log(`Delete all operations completed`);
} catch (error) {
console.error('Error deleting all operations:', error);
}
}
/* ============= 工具函数 ============= */
/**
* 获取索引(内部函数)
* @param {Object} context - 上下文对象
*/
async function getIndex(context) {
const { waitUntil } = context;
try {
// 首先尝试加载索引
const index = await loadIndexFromDatabase(context);
if (index.success) {
return index;
} else {
// 如果加载失败,触发重建索引
waitUntil(rebuildIndex(context));
}
} catch (error) {
console.warn('Error reading index, creating new one:', error);
waitUntil(rebuildIndex(context));
}
// 返回空的索引结构
return {
files: [],
lastUpdated: Date.now(),
totalCount: 0,
lastOperationId: null,
success: false,
};
}
/**
* 从文件路径提取目录(内部函数)
* @param {string} filePath - 文件路径
*/
function extractDirectory(filePath) {
const lastSlashIndex = filePath.lastIndexOf('/');
if (lastSlashIndex === -1) {
return ''; // 根目录
}
return filePath.substring(0, lastSlashIndex + 1); // 包含最后的斜杠
}
/**
* 将文件按时间戳倒序插入到已排序的数组中
* @param {Array} sortedFiles - 已按时间戳倒序排序的文件数组
* @param {Object} fileItem - 要插入的文件项
*/
function insertFileInOrder(sortedFiles, fileItem) {
const fileTimestamp = fileItem.metadata.TimeStamp || 0;
// 如果数组为空或新文件时间戳比第一个文件更新,直接插入到开头
if (sortedFiles.length === 0 || fileTimestamp >= (sortedFiles[0].metadata.TimeStamp || 0)) {
sortedFiles.unshift(fileItem);
return;
}
// 如果新文件时间戳比最后一个文件更旧,直接添加到末尾
if (fileTimestamp <= (sortedFiles[sortedFiles.length - 1].metadata.TimeStamp || 0)) {
sortedFiles.push(fileItem);
return;
}
// 使用二分查找找到正确的插入位置
let left = 0;
let right = sortedFiles.length;
while (left < right) {
const mid = Math.floor((left + right) / 2);
const midTimestamp = sortedFiles[mid].metadata.TimeStamp || 0;
if (fileTimestamp >= midTimestamp) {
right = mid;
} else {
left = mid + 1;
}
}
// 在找到的位置插入文件
sortedFiles.splice(left, 0, fileItem);
}
/**
* 并发控制工具函数 - 限制同时执行的Promise数量
* @param {Array} tasks - 任务数组每个任务是一个返回Promise的函数
* @param {number} concurrency - 并发数量
* @returns {Promise<Array>} 所有任务的结果数组
*/
async function promiseLimit(tasks, concurrency = BATCH_SIZE) {
const results = [];
const executing = [];
for (let i = 0; i < tasks.length; i++) {
const task = tasks[i];
const promise = Promise.resolve().then(() => task()).then(result => {
results[i] = result;
return result;
}).finally(() => {
const index = executing.indexOf(promise);
if (index >= 0) {
executing.splice(index, 1);
}
});
executing.push(promise);
if (executing.length >= concurrency) {
await Promise.race(executing);
}
}
// 等待所有剩余的Promise完成
await Promise.all(executing);
return results;
}
/**
* 保存分块索引到KV存储
* @param {Object} context - 上下文对象,包含 env
* @param {Object} index - 完整的索引对象
* @returns {Promise<boolean>} 是否保存成功
*/
async function saveIndexMetadata(context, index) {
const { env } = context;
try {
const db = getDatabase(env);
// 保存索引元数据到index_metadata表
const stmt = db.db.prepare(`
INSERT OR REPLACE INTO index_metadata (key, last_updated, total_count, last_operation_id)
VALUES (?, ?, ?, ?)
`);
await stmt.bind(
'main_index',
index.lastUpdated,
index.totalCount,
index.lastOperationId
).run();
console.log(`Saved index metadata: ${index.totalCount} total files, last updated: ${index.lastUpdated}`);
return true;
} catch (error) {
console.error('Error saving index metadata:', error);
return false;
}
}
/**
* 从D1数据库加载索引
* @param {Object} context - 上下文对象,包含 env
* @returns {Promise<Object>} 完整的索引对象
*/
async function loadIndexFromDatabase(context) {
const { env } = context;
try {
const db = getDatabase(env);
// 首先获取元数据
const metadataStmt = db.db.prepare('SELECT * FROM index_metadata WHERE key = ?');
const metadata = await metadataStmt.bind('main_index').first();
if (!metadata) {
throw new Error('Index metadata not found');
}
// 从files表直接查询所有文件
const filesStmt = db.db.prepare('SELECT id, metadata FROM files ORDER BY timestamp DESC');
const fileResults = await filesStmt.all();
const files = fileResults.map(row => ({
id: row.id,
metadata: JSON.parse(row.metadata || '{}')
}));
const index = {
files,
lastUpdated: metadata.last_updated,
totalCount: metadata.total_count,
lastOperationId: metadata.last_operation_id,
success: true
};
console.log(`Loaded chunked index: ${metadata.chunkCount} chunks, ${files.length} total files`);
return index;
} catch (error) {
console.error('Error loading chunked index:', error);
// 返回空的索引结构
return {
files: [],
lastUpdated: Date.now(),
totalCount: 0,
lastOperationId: null,
success: false,
};
}
}
/**
* 清理分块索引
* @param {Object} context - 上下文对象,包含 env
* @param {boolean} onlyNonUsed - 是否仅清理未使用的分块索引,默认为 false
* @returns {Promise<boolean>} 是否清理成功
*/
export async function clearChunkedIndex(context, onlyNonUsed = false) {
const { env } = context;
try {
console.log('Starting chunked index cleanup...');
// 获取元数据
const metadataStr = await getDatabase(env).get(INDEX_META_KEY);
let chunkCount = 0;
if (metadataStr) {
const metadata = JSON.parse(metadataStr);
chunkCount = metadata.chunkCount || 0;
if (!onlyNonUsed) {
// 删除元数据
await getDatabase(env).delete(INDEX_META_KEY).catch(() => {});
}
}
// 删除分块
const recordedChunks = []; // 现有的索引分块键
let cursor = null;
while (true) {
const response = await getDatabase(env).list({
prefix: INDEX_KEY,
limit: KV_LIST_LIMIT,
cursor: cursor
});
// 检查响应格式
if (!response || !response.keys || !Array.isArray(response.keys)) {
console.error('Invalid response from database list in getIndexStorageStats:', response);
break;
}
for (const item of response.keys) {
recordedChunks.push(item.name);
}
cursor = response.cursor;
if (!cursor) break;
}
const reservedChunks = [];
if (onlyNonUsed) {
// 如果仅清理未使用的分块索引,保留当前在使用的分块
for (let chunkId = 0; chunkId < chunkCount; chunkId++) {
reservedChunks.push(`${INDEX_KEY}_${chunkId}`);
}
}
const deletePromises = [];
for (let chunkKey of recordedChunks) {
if (reservedChunks.includes(chunkKey) || !chunkKey.startsWith(INDEX_KEY + '_')) {
// 保留的分块和非分块键不删除
continue;
}
deletePromises.push(
getDatabase(env).delete(chunkKey).catch(() => {})
);
}
if (recordedChunks.includes(INDEX_KEY)) {
deletePromises.push(
getDatabase(env).delete(INDEX_KEY).catch(() => {})
);
}
await Promise.all(deletePromises);
console.log(`Chunked index cleanup completed. Attempted to delete ${chunkCount} chunks.`);
return true;
} catch (error) {
console.error('Error during chunked index cleanup:', error);
return false;
}
}
/**
* 获取索引的存储统计信息
* @param {Object} context - 上下文对象,包含 env
* @returns {Object} 存储统计信息
*/
export async function getIndexStorageStats(context) {
const { env } = context;
try {
// 获取元数据
const metadataStr = await getDatabase(env).get(INDEX_META_KEY);
if (!metadataStr) {
return {
success: false,
error: 'No chunked index metadata found',
isChunked: false
};
}
const metadata = JSON.parse(metadataStr);
// 检查各个分块的存在情况
const chunkChecks = [];
for (let chunkId = 0; chunkId < metadata.chunkCount; chunkId++) {
const chunkKey = `${INDEX_KEY}_${chunkId}`;
chunkChecks.push(
getDatabase(env).get(chunkKey).then(data => ({
chunkId,
exists: !!data,
size: data ? data.length : 0
}))
);
}
const chunkResults = await Promise.all(chunkChecks);
const stats = {
success: true,
isChunked: true,
metadata,
chunks: chunkResults,
totalChunks: metadata.chunkCount,
existingChunks: chunkResults.filter(c => c.exists).length,
totalSize: chunkResults.reduce((sum, c) => sum + c.size, 0)
};
return stats;
} catch (error) {
console.error('Error getting index storage stats:', error);
return {
success: false,
error: error.message,
isChunked: false
};
}
}