在 RAGFlow 這樣的檢索增強生成(RAG)系統中,知識庫是其核心。用戶上傳的文檔如何高效、可靠地轉化為可檢索的知識,是系統穩定運行的關鍵。今天,我們就來深入探討 RAGFlow 中文件上傳到知識庫的完整流程,揭秘其背后的任務調度機制。
1. 概覽:文件上傳到知識庫的生命周期
RAGFlow 的文件上傳并非簡單的文件存儲,而是一個涉及前端交互、后端 API、消息隊列、后臺服務和多個數據存儲組件的復雜協作過程。它確保了文檔從原始文件到結構化知識的平穩轉換。
2. 核心參與者 (Actors & Components)
在深入流程之前,我們先認識一下這場“幕后之旅”中的主要參與者:
- 用戶 (User): 觸發文件上傳操作。
- 前端 (Frontend): RAGFlow 的 Web 界面,負責用戶交互和文件傳輸。
- 后端 API (Backend API): 基于 Flask 構建,提供各種 RESTful 接口,包括文件上傳。
document_app.py
: 處理文檔相關的 API 請求。file_service.py
: 負責文件的物理存儲和數據庫記錄。document_service.py
: 管理文檔的數據庫記錄和處理狀態。task_service.py
: 負責任務的創建、管理和排隊。
- Redis (Message Queue): 作為任務隊列,實現任務的異步處理和解耦。
- 任務執行器 (Task Executor -
task_executor.py
): 獨立的后臺服務,從 Redis 隊列中消費任務并執行實際的文檔處理工作。 - 對象存儲 (Object Storage - e.g., MinIO): 存儲原始上傳文件和處理過程中生成的圖片等二進制數據。
- 文檔存儲 (Document Store - e.g., Elasticsearch/Infinity): 存儲經過處理的文檔塊(chunks)及其向量,供檢索使用。
3. 詳細流程:一步步揭秘
步驟 1: 用戶發起文件上傳 (Frontend)
用戶在 RAGFlow 的 Web 界面上選擇文件并點擊上傳按鈕,同時指定文件要上傳到哪個知識庫(kb_id
)。
步驟 2: 文件上傳到后端 API (/v1/document/upload
)
前端將文件數據和知識庫 ID (kb_id
) 打包成 multipart/form-data
格式,通過 HTTP POST
請求發送到后端 API 的 /v1/document/upload
路由。
步驟 3: 文件存儲與文檔記錄創建 (FileService
)
后端 document_app.py
中的 /upload
路由接收到請求后,會調用 FileService.upload_document
。
FileService
會執行以下操作:
- 將上傳的原始文件存儲到對象存儲中(例如 MinIO)。
- 在數據庫中為該文件創建一條新的
Document
記錄,包含文件的元數據(如名稱、大小、類型、存儲位置等),并將其初始處理狀態設置為待處理(例如progress=0
)。
此時,后端 API 會立即響應前端上傳成功,用戶界面會顯示文件已上傳。但請注意,文件此時尚未被解析和索引。
步驟 4: 后臺任務調度 (ragflow_server.py
-> DocumentService.update_progress
)
這是整個流程中任務調度的核心。RAGFlow 的主服務器 (ragflow_server.py
) 啟動時,會啟動一個獨立的后臺線程,該線程會每隔 6 秒調用一次 DocumentService.update_progress()
方法。
DocumentService.update_progress()
的職責是:
- 掃描數據庫中所有狀態為“未完成” (
progress < 1
) 的文檔。 - 對于新上傳的文檔(其
Document
記錄已存在但尚未有對應的Task
記錄),它會識別出這些文檔需要進行初始處理。
步驟 5: 任務創建與排隊 (TaskService.queue_tasks
)
當 DocumentService.update_progress()
識別出需要處理的文檔時,它會(間接地)觸發 TaskService.queue_tasks
函數。
TaskService.queue_tasks
會根據文檔類型和配置,生成一個或多個具體的處理任務(Task
記錄),例如:
- 對于 PDF 文件,可能會根據頁碼范圍生成多個任務。
- 對于 Excel 文件,可能會根據行范圍生成多個任務。
- 這些
Task
記錄會被插入到數據庫中。 - 最關鍵的是,這些
Task
消息會被推送到 Redis 消息隊列中。
步驟 6: 任務消費與處理 (task_executor.py
)
獨立的 task_executor.py
服務會持續監聽 Redis 消息隊列。一旦有新的任務消息到達,它就會立即消費該任務。
步驟 7: 分塊、嵌入與索引
task_executor
消費任務后,會執行實際的文檔處理:
- 從對象存儲中獲取原始文件二進制數據。
- 根據文檔類型和解析器配置,將文件內容進行分塊 (Chunking)。
- 對每個文本塊進行嵌入 (Embedding),生成向量表示。
- 將處理后的文本塊、向量以及其他元數據(如關鍵詞、問題、標簽等)插入到文檔存儲中(例如 Elasticsearch 或 Infinity),使其可被檢索。
- 在處理過程中,
task_executor
會不斷更新Task
記錄的progress
和progress_msg
,將處理進度反饋回數據庫。
步驟 8: 可選:RAPTOR/GraphRAG 任務排隊
如果知識庫配置了更高級的解析方法(如 RAPTOR 或 GraphRAG),并且初始分塊任務已完成,DocumentService.update_progress()
在下一次掃描時會檢測到這一點,并觸發 TaskService.queue_raptor_o_graphrag_tasks
,將新的 RAPTOR 或 GraphRAG 任務推送到 Redis 隊列。task_executor
會再次消費這些任務并執行相應的復雜處理。
步驟 9: 狀態更新
task_executor
在完成每個任務后,會更新 Task
記錄的狀態。DocumentService.update_progress()
也會匯總所有相關任務的進度,最終標記文檔為“已完成”或“失敗”。
時序圖
sequenceDiagramactor Userparticipant Frontendparticipant BackendAPI as Backend API (Flask)participant FileServiceparticipant DocumentServiceparticipant Redisparticipant TaskExecutor as Task Executorparticipant ObjectStorage as Object Storage (MinIO)participant DocumentStore as Document Store (ES/Infinity)participant RagflowServer as Ragflow Server (Background Thread)User->>Frontend: 1. Upload File (file, kb_id)Frontend->>BackendAPI: 2. POST /v1/document/upload (file, kb_id)BackendAPI->>FileService: 3. upload_document(file, kb_id)FileService->>ObjectStorage: 3.1. Store FileFileService->>DocumentService: 3.2. insert(document_metadata)DocumentService-->>FileService: Document Record CreatedFileService-->>BackendAPI: Upload SuccessBackendAPI-->>Frontend: 4. Upload Success ResponseFrontend->>User: File Uploaded (UI Update)loop Every 6 secondsRagflowServer->>DocumentService: 5. update_progress()DocumentService->>DocumentService: 5.1. get_unfinished_docs()alt Document has no tasks (Newly Uploaded)DocumentService->>TaskService: 5.2. queue_tasks(doc, bucket, name, priority)TaskService->>Redis: 5.3. queue_product(initial_task_message)endendTaskExecutor->>Redis: 6. Consume Task MessageTaskExecutor->>ObjectStorage: 7.1. Get File BinaryTaskExecutor->>TaskExecutor: 7.2. Perform Chunking & EmbeddingTaskExecutor->>DocumentStore: 7.3. Insert Chunks & VectorsTaskExecutor->>DocumentService: 7.4. set_progress(task_id, progress, msg)DocumentService-->>TaskExecutor: Task Progress Updatedalt All initial tasks for document completedRagflowServer->>DocumentService: 8. update_progress() (next iteration)DocumentService->>DocumentService: 8.1. get_unfinished_docs()DocumentService->>TaskService: 8.2. queue_raptor_o_graphrag_tasks(doc, type, priority) (if configured)TaskService->>Redis: 8.3. queue_product(advanced_task_message)TaskExecutor->>Redis: 9. Consume Advanced Task MessageTaskExecutor->>TaskExecutor: 10. Perform RAPTOR/GraphRAG ProcessingTaskExecutor->>DocumentStore: 10.1. Insert Advanced Chunks/GraphTaskExecutor->>DocumentService: 10.2. set_progress(task_id, progress, msg)DocumentService-->>TaskExecutor: Advanced Task Progress UpdatedendDocumentService->>DocumentService: 11. Update Document Overall Status (e.g., DONE)
將代碼放入https://www.processon.com/mermaid
中,查看流程
總結
RAGFlow 的知識庫文件上傳流程是一個精心設計的異步系統。它將文件接收、存儲、任務調度和實際處理解耦,通過 Redis 消息隊列和后臺定時任務實現了高效、可擴展的文檔處理能力。這種架構不僅保證了用戶體驗的流暢性(上傳后立即響應),也確保了后臺處理的健壯性和可恢復性。
希望這篇博客能幫助您更清晰地理解 RAGFlow 的內部工作原理!