目錄
- 介紹
- 主要功能
- 核心組件
- 流程圖
- 核心代碼解釋
- 1. 系統架構與核心組件
- 2. 核心處理流程
- 3. 高級處理能力
- 4. 關鍵創新點
- 5. 容錯與監控機制
- 6. 性能優化技巧
介紹
task_executor.py
是RAGFlow系統中的任務執行器(Task Executor)核心部分,主要負責文檔的解析、分塊(chunking)、向量化(embedding)和索引(indexing)處理流程。
主要功能
-
文檔處理流水線:
- 從存儲系統(如MinIO)獲取文檔
- 根據文檔類型選擇相應的解析器(parser)
- 將文檔分塊處理
- 生成向量表示(embeddings)
- 存儲到向量數據庫中
-
高級處理能力:
- 支持RAPTOR(遞歸抽象處理)算法
- 支持知識圖譜(GraphRAG)構建
- 自動關鍵詞提取和問題生成
- 內容標簽自動標注
-
任務管理:
- 從Redis隊列獲取任務
- 任務狀態跟蹤和報告
- 任務取消處理
- 失敗任務恢復
核心組件
-
文檔解析器工廠(FACTORY):
- 針對不同類型的文檔(論文、書籍、演示文稿、法律文件等)有不同的解析器
- 使用注冊模式動態選擇解析器
-
并發控制:
- 使用Trio庫實現異步并發
- 通過CapacityLimiter控制并發任務數
- 分塊構建、MinIO操作等都有獨立的并發限制
-
錯誤處理和監控:
- 詳細的任務狀態跟蹤
- 心跳報告機制
- 內存監控和快照功能
- 任務取消和超時處理
流程圖
核心代碼解釋
1. 系統架構與核心組件
-
并發控制體系:
- 使用
trio
異步框架實現高效I/O操作 - 四級并發限制器:
task_limiter = trio.CapacityLimiter(MAX_CONCURRENT_TASKS) # 任務級并發(默認5) chunk_limiter = trio.CapacityLimiter(MAX_CONCURRENT_CHUNK_BUILDERS) # 分塊處理并發(默認1) minio_limiter = trio.CapacityLimiter(MAX_CONCURRENT_MINIO) # 存儲操作并發(默認10) kg_limiter = trio.CapacityLimiter(2) # 知識圖譜處理并發
- 使用
-
文檔處理工廠模式:
FACTORY = {"general": naive,ParserType.PAPER.value: paper, # 學術論文處理器ParserType.BOOK.value: book, # 書籍處理器ParserType.TABLE.value: table, # 表格處理器# ...其他15+種文檔類型處理器 }
2. 核心處理流程
-
任務處理主循環 (
handle_task
函數):async def handle_task():redis_msg, task = await collect() # 從Redis獲取任務CURRENT_TASKS[task["id"]] = task # 登記當前任務try:await do_handle_task(task) # 執行實際處理redis_msg.ack() # 確認任務完成except Exception as e:FAILED_TASKS += 1set_progress(task["id"], prog=-1, msg=f"[Exception]: {str(e)}")
-
文檔處理三階段 (
do_handle_task
函數):# 階段1:文檔解析與分塊 chunks = await build_chunks(task, progress_callback)# 階段2:向量化處理 token_count, vector_size = await embedding(chunks, embedding_model)# 階段3:存儲索引 await settings.docStoreConn.insert(chunks, index_name, kb_id)
3. 高級處理能力
-
RAPTOR算法實現:
raptor = Raptor(max_cluster=64, # 最大聚類數chat_model=chat_mdl, # LLM模型embd_model=embd_mdl, # 嵌入模型prompt=config["prompt"], # 聚類提示詞max_token=config["max_token"], # 最大token數threshold=config["threshold"] # 相似度閾值 ) chunks = await raptor.process(original_chunks)
-
知識圖譜構建:
await run_graphrag(task, language=task_language,with_resolution=True, # 是否解析關系with_community=True, # 是否構建社區chat_model=chat_mdl,embd_model=embd_mdl )
4. 關鍵創新點
-
智能分塊增強:
# 自動關鍵詞提取 d["important_kwd"] = keyword_extraction(chat_mdl, content)# 自動問題生成 d["question_kwd"] = question_proposal(chat_mdl, content)# 智能標簽系統 d[TAG_FLD] = content_tagging(chat_mdl, content, all_tags)
-
混合向量生成:
# 標題向量權重調整 title_w = parser_config.get("filename_embd_weight", 0.1) vects = (title_w * title_vectors + (1-title_w) * content_vectors)
5. 容錯與監控機制
-
分布式鎖管理:
with RedisDistributedLock("clean_task_executor"):# 清理超時workerREDIS_CONN.srem("TASKEXE", expired_workers)
-
內存監控系統:
def start_tracemalloc_and_snapshot():tracemalloc.start()snapshot = tracemalloc.take_snapshot()snapshot.dump(f"snapshot_{timestamp}.trace")logging.info(f"Peak memory: {peak / 10**6:.2f} MB")
-
心跳監測系統:
REDIS_CONN.zadd(CONSUMER_NAME, json.dumps({"pending": PENDING_TASKS,"current": CURRENT_TASKS,# ...其他狀態指標}), timestamp )
6. 性能優化技巧
-
批量處理策略:
# 向量化批量處理 for i in range(0, len(texts), batch_size=16):vectors = await mdl.encode(texts[i:i+16])
-
緩存機制:
# LLM結果緩存 cached = get_llm_cache(llm_name, text, "keywords") if not cached:cached = await keyword_extraction(llm, text)set_llm_cache(llm_name, text, cached)