Ragflow 源碼:task_executor.py

目錄

    • 介紹
      • 主要功能
      • 核心組件
    • 流程圖
    • 核心代碼解釋
      • 1. 系統架構與核心組件
      • 2. 核心處理流程
      • 3. 高級處理能力
      • 4. 關鍵創新點
      • 5. 容錯與監控機制
      • 6. 性能優化技巧

介紹

task_executor.py 是RAGFlow系統中的任務執行器(Task Executor)核心部分,主要負責文檔的解析、分塊(chunking)、向量化(embedding)和索引(indexing)處理流程。

主要功能

  1. 文檔處理流水線

    • 從存儲系統(如MinIO)獲取文檔
    • 根據文檔類型選擇相應的解析器(parser)
    • 將文檔分塊處理
    • 生成向量表示(embeddings)
    • 存儲到向量數據庫中
  2. 高級處理能力

    • 支持RAPTOR(遞歸抽象處理)算法
    • 支持知識圖譜(GraphRAG)構建
    • 自動關鍵詞提取和問題生成
    • 內容標簽自動標注
  3. 任務管理

    • 從Redis隊列獲取任務
    • 任務狀態跟蹤和報告
    • 任務取消處理
    • 失敗任務恢復

核心組件

  1. 文檔解析器工廠(FACTORY)

    • 針對不同類型的文檔(論文、書籍、演示文稿、法律文件等)有不同的解析器
    • 使用注冊模式動態選擇解析器
  2. 并發控制

    • 使用Trio庫實現異步并發
    • 通過CapacityLimiter控制并發任務數
    • 分塊構建、MinIO操作等都有獨立的并發限制
  3. 錯誤處理和監控

    • 詳細的任務狀態跟蹤
    • 心跳報告機制
    • 內存監控和快照功能
    • 任務取消和超時處理

流程圖

請添加圖片描述

核心代碼解釋

1. 系統架構與核心組件

  • 并發控制體系

    • 使用trio異步框架實現高效I/O操作
    • 四級并發限制器:
      task_limiter = trio.CapacityLimiter(MAX_CONCURRENT_TASKS)  # 任務級并發(默認5)
      chunk_limiter = trio.CapacityLimiter(MAX_CONCURRENT_CHUNK_BUILDERS)  # 分塊處理并發(默認1)
      minio_limiter = trio.CapacityLimiter(MAX_CONCURRENT_MINIO)  # 存儲操作并發(默認10)
      kg_limiter = trio.CapacityLimiter(2)  # 知識圖譜處理并發
      
  • 文檔處理工廠模式

    FACTORY = {"general": naive,ParserType.PAPER.value: paper,  # 學術論文處理器ParserType.BOOK.value: book,    # 書籍處理器ParserType.TABLE.value: table,  # 表格處理器# ...其他15+種文檔類型處理器
    }
    

2. 核心處理流程

  • 任務處理主循環 (handle_task函數):

    async def handle_task():redis_msg, task = await collect()  # 從Redis獲取任務CURRENT_TASKS[task["id"]] = task  # 登記當前任務try:await do_handle_task(task)    # 執行實際處理redis_msg.ack()              # 確認任務完成except Exception as e:FAILED_TASKS += 1set_progress(task["id"], prog=-1, msg=f"[Exception]: {str(e)}")
    
  • 文檔處理三階段 (do_handle_task函數):

    # 階段1:文檔解析與分塊
    chunks = await build_chunks(task, progress_callback)# 階段2:向量化處理
    token_count, vector_size = await embedding(chunks, embedding_model)# 階段3:存儲索引
    await settings.docStoreConn.insert(chunks, index_name, kb_id)
    

3. 高級處理能力

  • RAPTOR算法實現

    raptor = Raptor(max_cluster=64,                 # 最大聚類數chat_model=chat_mdl,            # LLM模型embd_model=embd_mdl,            # 嵌入模型prompt=config["prompt"],        # 聚類提示詞max_token=config["max_token"],  # 最大token數threshold=config["threshold"]   # 相似度閾值
    )
    chunks = await raptor.process(original_chunks)
    
  • 知識圖譜構建

    await run_graphrag(task, language=task_language,with_resolution=True,      # 是否解析關系with_community=True,       # 是否構建社區chat_model=chat_mdl,embd_model=embd_mdl
    )
    

4. 關鍵創新點

  • 智能分塊增強

    # 自動關鍵詞提取
    d["important_kwd"] = keyword_extraction(chat_mdl, content)# 自動問題生成
    d["question_kwd"] = question_proposal(chat_mdl, content)# 智能標簽系統
    d[TAG_FLD] = content_tagging(chat_mdl, content, all_tags)
    
  • 混合向量生成

    # 標題向量權重調整
    title_w = parser_config.get("filename_embd_weight", 0.1)
    vects = (title_w * title_vectors + (1-title_w) * content_vectors)
    

5. 容錯與監控機制

  • 分布式鎖管理

    with RedisDistributedLock("clean_task_executor"):# 清理超時workerREDIS_CONN.srem("TASKEXE", expired_workers)
    
  • 內存監控系統

    def start_tracemalloc_and_snapshot():tracemalloc.start()snapshot = tracemalloc.take_snapshot()snapshot.dump(f"snapshot_{timestamp}.trace")logging.info(f"Peak memory: {peak / 10**6:.2f} MB")
    
  • 心跳監測系統

    REDIS_CONN.zadd(CONSUMER_NAME, json.dumps({"pending": PENDING_TASKS,"current": CURRENT_TASKS,# ...其他狀態指標}), timestamp
    )
    

6. 性能優化技巧

  • 批量處理策略

    # 向量化批量處理
    for i in range(0, len(texts), batch_size=16):vectors = await mdl.encode(texts[i:i+16])
    
  • 緩存機制

    # LLM結果緩存
    cached = get_llm_cache(llm_name, text, "keywords")
    if not cached:cached = await keyword_extraction(llm, text)set_llm_cache(llm_name, text, cached)
    

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/86191.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/86191.shtml
英文地址,請注明出處:http://en.pswp.cn/web/86191.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

創客匠人聯盟生態:重構家庭教育知識變現的底層邏輯

在《家庭教育促進法》推動行業剛需化的背景下,單一個體 IP 的增長天花板日益明顯。創客匠人提出的 “聯盟生態思維”,正推動家庭教育行業從 “單打獨斗” 轉向 “矩陣作戰”,其核心在于通過工具整合資源,將 “同行競爭” 轉化為 “…

【Docker基礎】Docker容器管理:docker stop詳解

目錄 1 Docker容器生命周期概述 2 docker stop命令深度解析 2.1 命令基本語法 2.2 命令執行流程 2.3 stop與kill的區別 3 docker stop的工作原理 3.1 工作流程 3.2 詳細工作流程 3.3 信號處理機制 4 docker stop的使用場景與最佳實踐 4.1 典型使用場景 場景1&#…

rules寫成動態

拖拽排序和必填校驗聯動(rules寫到computed里) computed: {rules() {const rules {};this.form.feedList.forEach((item, idx) > {rules[feedList.${idx}] [{ required: true, message: 路線評價動態${idx 1}待填寫,請填寫完畢提交, trigger: change }];});re…

The Open Group開放流程自動化? 論壇(OPAF)發布組織最新進展報告

除埃克森美孚(ExxonMobil)的成就外,開放流程自動化? 論壇(OPAF)的最新論壇報告顯示,該組織其他成員也在多個領域取得進展。 “我們祝賀埃克森美孚,因為他們證明了在前線、創收的工藝操作中部署…

線程的基本控制

線程終止 exit是危險的 如果進程中的任意一個線程調用了exit,那么整個進程終止。 不終止進程的退出方式 普通單個線程的退出方法,以下方法退出不會導致進程終止: (1)從啟動例程中返回,返回值是線程的退出…

DeepSeek+WinForm串口通訊實戰

前言 在現代軟件開發中,串口通訊仍然是工業自動化、物聯網設備和嵌入式系統的重要通信方式。隨著.NET技術的發展,特別是.NET 5/.NET 6的跨平臺能力,傳統的WinForm應用現在可以通過現代UI框架實現真正的跨平臺串口通訊。本文將深入探討三種主…

針對數據倉庫方向的大數據算法工程師面試經驗總結

?? 一、技術核心考察點 數據建模能力 星型 vs 雪花模型:面試官常要求對比兩種模型。星型模型(事實表冗余維度表)查詢性能高但存儲冗余;雪花模型(規范化維度表)減少冗余但增加JOIN復雜度。需結合場景選擇&…

Nuxt3 Cannot read properties of undefined (reading ‘createElement‘)

你遇到的 TypeError: Cannot read properties of undefined (reading createElement) 這個報錯,通常是由于在 Nuxt3 或 Vue3 項目中,某些地方嘗試訪問 document.createElement 或類似 DOM API,但此時 document 還未定義(比如在服務…

正則表達式匹配實現

直接上代碼 using Microsoft.AspNetCore.Mvc; using System.Text.RegularExpressions;namespace SaaS.OfficialWebSite.Web.Controllers {public class RegController : Controller{public IActionResult Index(){return View();}[HttpPost]public IActionResult TestRegex([F…

API測試工具Parasoft SOAtest:應對API變化,優化測試執行

API頻繁變更給測試工作帶來諸多挑戰,如手動排查變更影響耗時費力、測試用例維護繁瑣易出錯等。Parasoft SOAtest作為一款企業級API測試工具,通過自動掃描API接口、智能分析變更影響、優化測試,執行以及支持測試用例共享與版本控制等功能&…

mysql 數據庫連接 -h localhost 和 -h 127.0.0.1 區別是什么

對于 mysql 數據庫, 在 my.conf 中指定的client 端口是 3358,實際的mysql server 的端口監聽在 3306, mysql -h localhost 可以居然可以連接成功; mysql -h 127.0.0.1 連接失敗提示Can’t connect to MySQL server on 127.0.0.1&a…

Educational Codeforces Round 180 (Rated for Div. 2) A-D

A.Race 題目大意 給你兩個x,y,終點會在二點之間隨機出現,alice在點a,假設alice和bob有相同的速度(距離更短者用時更少),問對于bob是否存在一點,無論終點是x還是y,他都能比alice更快到達 思路 如果alice在…

python requests post請求

在Python中,使用requests庫進行POST請求是一種常見的操作,用于向服務器發送數據。下面是如何使用requests庫進行POST請求的步驟: 安裝requests庫 如果你還沒有安裝requests庫,可以通過pip安裝: pip install requests…

Postman中設置定時自動運行接口測試

?創建測試集合? 將需每日運行的接口組織到Collection中,并配置好測試腳本和斷言。 ?配置定時運行? 打開目標Collection → 點擊 ?Run? 按鈕在Collection Runner頁面底部選擇 ?Schedule runs?關鍵配置: Frequency: Daily // 選擇每日執行 Time…

multiprocessing.pool和multiprocessing.Process

在CPU密集型任務中,Python的multiprocessing模塊是突破GIL限制的關鍵工具。multiprocessing.Pool(進程池)和multiprocessing.Process(獨立進程)是最常用的兩種并行化方案,但其設計思想和適用場景截然不同。…

容器技術技術入門與 Docker 環境部署

目錄 一:Docker概述 1、 Docker的優勢: (1)環境一致性 (2)隔離性 (3)資源高效 (4)便捷性和可擴展性 2、Docker容器與傳統虛擬機的區別 3、Docker的應用…

Oracle獲取執行計劃之DBMS_XPLAN 技術詳解1

在 Oracle 數據庫的管理與優化工作中,深入了解 SQL 語句的執行計劃是至關重要的一環。DBMS_XPLAN 包作為 Oracle 提供的強大工具,能夠幫助數據庫管理員(DBAs)和開發人員清晰地查看和分析 SQL 語句的執行計劃,從而實現對…

【Python】VScode配置Python教程

文章目錄 【Python】VScode配置Python教程下載Python安裝插件解決亂碼徹底運行vscode安裝python庫 【Python】VScode配置Python教程 前言: 當「Python 編程潛力」遇上「VSCode 開發神器」,會點燃怎樣的效率革命?試想這樣的場景:你…

PowerBI HtmlContent生成表格

假設有銷量表: 1.PowerBI 導入 Html Content對象&#xff0c;并拖入報表 2.新建度量值: 度量值 VAR colCount DISTINCTCOUNT(銷量[產品]) VAR ColumnHeaders "<tr><th styleborder:1px solid black; padding:5px; text-align:center; colspan"&col…

【人工智能與機器人研究】基于運動數據時空特征提取的人類運動片段分割方法

導讀 動作示教方法是非專家用戶對人形機器人進行控制的可靠形式&#xff0c;而對人類動作數據的運動分割與理解是其前提。利用現有方法對所捕獲人類運動原始數據進行關鍵幀提取與運動分割時&#xff0c;由于數據特征不明確&#xff0c;導致難以準確定位運動起始幀、結束幀及分…