RagFlow優化代碼解析(一)

引子

前文寫到RagFlow的環境搭建&推理測試,感興趣的童鞋可以移步(RagFlow環境搭建&推理測試-CSDN博客)。前文也寫過RagFLow參數配置&測試的文檔,詳見(RagFlow環境搭建&推理測試-CSDN博客)。很少寫關于具體代碼的blog,這個不涉密,OK,那我們開始吧。

一、RagFlow檢索優化--ES替換為Infitity

在上一篇文章中,我嘗試新建了兩個知識庫,一個知識庫中有兩個文檔,其中一個比較大,另外一個知識庫上傳的時QA對的excel表格。我在聊天設置中選擇了兩個知識庫,我提出文檔,到得到答案差不多要3mins,這個。。。呃,需要排查下。那我們就采用控制變量法來找到問題原因吧,既然時兩個知識庫,那我們先刪除一個QA對知識庫。。。好家伙,提出一個問題,依然需要3mins才出答案。那我們繼續,剩下的一個知識庫中兩個文檔,那我們先禁用一個長的文檔,來看看效果。呃。。時間略快一點,還是要2mins。

根據上一篇博客里所講,我這里選擇了reranker模型,那么在混合查詢中的向量相似度部分將被rerank打分代替。那就去掉rerank,我們再測試下,emm 。。。時間略有縮短,可以看到顯示搜索中這個過程十分耗時。當然有可能是我的機器配置比較差的緣故,但進一步分析,目前設置是使用關鍵詞相似度與向量余弦相似度相結合的混合查詢方式。采用的是ES數據庫的查詢結果計算的。

剛好看到RagFlow中配置文檔中有替換Infinity的部分,那就先來了解下Infinity到底是什么。

開源 AI 原生數據庫 Infinity,23年12月 正式開源發布,提供了 2 種新數據類型:稀疏向量 Sparse Vector 和 張量 Tensor,在此前的全文搜索和向量搜索之外, Infinity 提供了更多的召回手段,如下圖所示,用戶可以采用任意 N 路召回(N ≥ 2)進行混合搜索,這是目前功能最強大的 RAG 專用數據庫。

我們知道,僅僅依靠向量搜索(默認情況下,它用來特指稠密向量)并不總能提供令人滿意的結果。當用戶問題中的特定關鍵詞與存儲的數據不準確匹配時,這種問題尤為明顯。這是因為向量本身不具備精確語義表征能力:一個詞,一句話,乃至一篇文章,都可以只用一個向量來表示,這時向量本質上表達的是這段文字的“語義”,也就是這段文字跟其他文字在一個上下文窗口內共同出現概率的壓縮表示 ,因此向量天然無法表示精確的查詢。例如如果用戶詢問“2024 年 3 月我們公司財務計劃包含哪些組合”,那么很可能得到的結果是其他時間段的數據,或者得到運營計劃,營銷管理等其他類型的數據。

因此,在一種好的解決方案是,利用基于關鍵詞的全文搜索提供精確查詢,它跟向量搜索共同工作,這就是全文搜索 + 向量搜索 的 2 路召回,又被稱為混合搜索(hybrid search)。

多了 那么多我們來看下ES和Infinity執行效率上的對比吧。如下圖:

我們可以看到infinity的執行效率是Especially的40倍左右,那我們就替換下試試。

關閉docker容器

docker compose -f docker-compose-base.yml -f docker-compose-gpu.yml down -v

修改參數

vi .env

重啟容器

docker compose -f docker-compose-base.yml -f docker-compose-gpu.yml up -d

會去拉取infinity的docker鏡像,更新之后,速度果然大幅度提升,幾秒內響應。

二、代碼解析

1、整體架構

我們從官方的架構圖入手,我們可以看到從左往右,是我們在實際在線應用的時候的流程架構,從右到左,是知識庫離線生成的流程架構。很明顯,這張圖中把知識庫部分畫的很大,彰顯它在整個RagFlow項目中的核心地位。于此同時,最右側詳細介紹了文件解析的各種手段,比如?OCR,?Document Layout Analyze?等,這些在常規的 RAG 中可能會作為一個不起眼的?Unstructured Loader?包含進去,可以猜到 RagFlow 的一個核心能力在于文件的解析環節。在官方文檔中也反復強調?Quality in, quality out, 反映出 RAGFlow 的獨到之處在于細粒度文檔解析。另外文檔中提到其沒有使用任何 RAG 中間件,而是完全重新研發了一套智能文檔理解系統,并以此為依托構建 RAG 任務編排體系,也可以理解文檔的解析是其 RagFlow 的核心亮點。

2、代碼結構

我們來看看代碼結構(版本:v0.18.0)

agent:RagFlow新增的一個模塊,即工作流(注:實際上工作流和agent不是一個概念,agent可以作為workflow的一部分),通過“Graph"是一個由節點和邊組成的數學概念。它被用來構建復雜的工作流或代理。

agentic_reasoning:代理推理

api:后端的 API

conf :配置信息

deepdoc: 文件解析模塊

docker:docker配置安裝啟動部署文件

docs:文檔

example:案例

graphrag:圖rag

helm:打包管理工具

intergreations:集成插件工具

mcp:模型上下文協議

web:對應的是前端頁面,TypeScript?開發

其他的一些技術中間件

Web 服務:Flask

業務數據庫:Mysql

向量數據庫:?ElasticSearch?(常規關鍵詞搜索用的也是它),前文已經替換?infinity?

文件存儲:?MinIO,支持分布式存儲

緩存中間件:valkey/valkey:8?是從 Redis 7.2.4 fork 而來,旨在作為 Redis 的開源替代品,特別是在 Redis Labs 更改了 Redis 的源碼使用協議之后。它保持了與 Redis 的兼容性,同時引入了許多性能和功能上的改進。在網絡應用中,Valkey 可以用作緩存、消息隊列、會話存儲等多種用途,適用于需要快速數據訪問和低延遲的場景。

3、源碼解析

(1)加載文件

常規的 RAG 服務都是在上傳時進行文件的加載和解析,但是 RAGFlow 的上傳僅僅包含上傳至 MinIO,需要手工點擊觸發文件的解析。根據實際體驗,RAGFlow 的解析相當慢,資源開銷也比較大,所以這就是采取二次手工確認的產品方案的原因吧。

實際的文件解析通過接口?/v1/document/run?進行觸發的,實際的處理是在?api/db/services/task_service.py?中的?queue_tasks()?中完成的,此方法會根據文件創建一個或多個異步任務,方便異步執行。實現如下所示:

def queue_tasks(doc: dict, bucket: str, name: str, priority: int):def new_task():return {"id": get_uuid(), "doc_id": doc["id"], "progress": 0.0, "from_page": 0, "to_page": 100000000}parse_task_array = []# pdf 文件的解析,根據不同的類型設置單個任務最多處理的頁數# 默認單個任務處理 12 頁 pdf,paper 類型的 pdf 一個任務處理 22 頁,其他 pdf 不分頁if doc["type"] == FileType.PDF.value:file_bin = STORAGE_IMPL.get(bucket, name)do_layout = doc["parser_config"].get("layout_recognize", "DeepDOC")pages = PdfParser.total_page_number(doc["name"], file_bin)page_size = doc["parser_config"].get("task_page_size", 12)if doc["parser_id"] == "paper":page_size = doc["parser_config"].get("task_page_size", 22)if doc["parser_id"] in ["one", "knowledge_graph"] or do_layout != "DeepDOC":page_size = 10 ** 9page_ranges = doc["parser_config"].get("pages") or [(1, 10 ** 5)]for s, e in page_ranges:s -= 1s = max(0, s)e = min(e - 1, pages)for p in range(s, e, page_size):task = new_task()task["from_page"] = ptask["to_page"] = min(p + page_size, e)parse_task_array.append(task)# 表格數據單個任務處理 3000 行elif doc["parser_id"] == "table":file_bin = STORAGE_IMPL.get(bucket, name)rn = RAGFlowExcelParser.row_number(doc["name"], file_bin)for i in range(0, rn, 3000):task = new_task()task["from_page"] = itask["to_page"] = min(i + 3000, rn)parse_task_array.append(task)else:parse_task_array.append(new_task())chunking_config = DocumentService.get_chunking_config(doc["id"])# 任務插入 Redis 消息隊列,方便異步處理for task in parse_task_array:hasher = xxhash.xxh64()for field in sorted(chunking_config.keys()):if field == "parser_config":for k in ["raptor", "graphrag"]:if k in chunking_config[field]:del chunking_config[field][k]hasher.update(str(chunking_config[field]).encode("utf-8"))for field in ["doc_id", "from_page", "to_page"]:hasher.update(str(task.get(field, "")).encode("utf-8"))task_digest = hasher.hexdigest()task["digest"] = task_digesttask["progress"] = 0.0task["priority"] = priorityprev_tasks = TaskService.get_tasks(doc["id"])ck_num = 0if prev_tasks:for task in parse_task_array:ck_num += reuse_prev_task_chunks(task, prev_tasks, chunking_config)TaskService.filter_delete([Task.doc_id == doc["id"]])chunk_ids = []for task in prev_tasks:if task["chunk_ids"]:chunk_ids.extend(task["chunk_ids"].split())if chunk_ids:settings.docStoreConn.delete({"id": chunk_ids}, search.index_name(chunking_config["tenant_id"]),chunking_config["kb_id"])DocumentService.update_by_id(doc["id"], {"chunk_num": ck_num})bulk_insert_into_db(Task, parse_task_array, True)DocumentService.begin2parse(doc["id"])unfinished_task_array = [task for task in parse_task_array if task["progress"] < 1.0]for unfinished_task in unfinished_task_array:assert REDIS_CONN.queue_product(get_svr_queue_name(priority), message=unfinished_task), "Can't access Redis. Please check the Redis' status."

從上面的實現來看,文件的解析是根據內容拆分為多個任務,通過 Redis 消息隊列進行暫存(生產者),之后就可以離線異步處理。直接查看對應的消息隊列的消費模塊(消費者),對應在?rag/svr/task_executor.py?中的?main()?方法中。實現如下所示:

async def main():logging.info(r"""______           __      ______                     __/_  __/___ ______/ /__   / ____/  _____  _______  __/ /_____  _____/ / / __ `/ ___/ //_/  / __/ | |/_/ _ \/ ___/ / / / __/ __ \/ ___// / / /_/ (__  ) ,<    / /____>  </  __/ /__/ /_/ / /_/ /_/ / /
/_/  \__,_/____/_/|_|  /_____/_/|_|\___/\___/\__,_/\__/\____/_/""")logging.info(f'TaskExecutor: RAGFlow version: {get_ragflow_version()}')settings.init_settings()print_rag_settings()if sys.platform != "win32":signal.signal(signal.SIGUSR1, start_tracemalloc_and_snapshot)signal.signal(signal.SIGUSR2, stop_tracemalloc)TRACE_MALLOC_ENABLED = int(os.environ.get('TRACE_MALLOC_ENABLED', "0"))if TRACE_MALLOC_ENABLED:start_tracemalloc_and_snapshot(None, None)signal.signal(signal.SIGINT, signal_handler)signal.signal(signal.SIGTERM, signal_handler)threading.Thread(name="RecoverPendingTask", target=recover_pending_tasks).start()async with trio.open_nursery() as nursery:nursery.start_soon(report_status)while not stop_event.is_set():async with task_limiter:nursery.start_soon(handle_task)logging.error("BUG!!! You should not reach here!!!")

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/82756.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/82756.shtml
英文地址,請注明出處:http://en.pswp.cn/web/82756.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

永磁同步電機控制算法--模糊PI轉速控制器

一、原理介紹 在常規的PID控制系統的基礎上提出了一種模糊PID以及矢量變換方法相結合的控制系統&#xff0c;經過仿真分析對比證明&#xff1a; 模糊PID控制系統能夠有效的提高永磁同步電機的轉速響應速度&#xff0c;降低轉矩脈動&#xff0c;增強了整體控制系統的抗干擾能力…

MySQL基本操作(續)

第3章&#xff1a;MySQL基本操作&#xff08;續&#xff09; 3.3 表操作 表是關系型數據庫中存儲數據的基本結構&#xff0c;由行和列組成。在MySQL中&#xff0c;表操作包括創建表、查看表結構、修改表和刪除表等。本節將詳細介紹這些操作。 3.3.1 創建表 在MySQL中&#…

探索未知驚喜,盲盒抽卡機小程序系統開發新啟航

在消費市場不斷追求新鮮感與驚喜體驗的當下&#xff0c;盲盒抽卡機以其獨特的魅力&#xff0c;迅速成為眾多消費者熱衷的娛樂與消費方式。我們緊跟這一潮流趨勢&#xff0c;專注于盲盒抽卡機小程序系統的開發&#xff0c;致力于為商家和用戶打造一個充滿趣味與驚喜的數字化平臺…

89.實現添加收藏的功能的后端實現

實現完查看收藏列表之后&#xff0c;實現的是添加收藏的功能 我的設想是&#xff1a;在對話界面中&#xff0c;如果用戶認為AI的回答非常好&#xff0c;可以通過點擊該回答對應的氣泡中的圖標&#xff0c;對該內容進行添加 所以后端實現為&#xff1a; service類中添加&…

OD 算法題 B卷【猴子吃桃】

文章目錄 猴子吃桃 猴子吃桃 猴子喜歡吃桃&#xff0c;桃園有N棵桃樹&#xff0c;第i棵桃樹上有Ni個桃&#xff0c;看守將在H(>N)小時后回來&#xff1b;猴子可以決定吃桃的速度K(個/小時)&#xff0c;每個小時他會選擇一棵桃樹&#xff0c;從中吃掉K個桃&#xff0c;如果這…

ubuntu 端口復用

需求描述&#xff1a;復用服務器的 80端口&#xff0c;同時處理 ssh 和 http 請求&#xff0c;也就是 ssh 連接和 http 訪問服務器的時候都可以指定 80 端口&#xff0c;然后服務器可以正確分發請求給 ssh 或者 http。 此時&#xff0c;ssh 監聽的端口為 22&#xff0c;而 htt…

Hive中ORC存儲格式的優化方法

優化Hive中的ORC(Optimized Row Columnar)存儲格式可顯著提升查詢性能、降低存儲成本。以下是詳細的優化方法,涵蓋參數配置、數據組織、寫入優化及監控調優等維度: 一、ORC核心參數優化 1. 存儲與壓縮參數 SET orc.block.size=268435456; -- 塊大小(默認256MB)…

Vim 設置搜索高亮底色

在 Vim 中&#xff0c;默認搜索命中會高亮顯示&#xff0c;方便用戶快速定位關鍵字。但有些用戶希望自定義搜索匹配的底色或前景色&#xff0c;以適應不同的配色方案或提高可讀性。本文將詳細介紹如何修改 Vim 的搜索高亮顏色。 一、Vim 搜索高亮機制 Vim 用內置的高亮組&…

【計算機網絡】非阻塞IO——poll實現多路轉接

&#x1f525;個人主頁&#x1f525;&#xff1a;孤寂大仙V &#x1f308;收錄專欄&#x1f308;&#xff1a;計算機網絡 &#x1f339;往期回顧&#x1f339;&#xff1a;【計算機網絡】非阻塞IO——select實現多路轉接 &#x1f516;流水不爭&#xff0c;爭的是滔滔不息 一、…

vscode使用系列之快速生成html模板

一.歡迎來到我的酒館 vscode&#xff0c;yyds! 目錄 一.歡迎來到我的酒館二.vscode下載安裝1.關于vscode你需要知道2.開始下載安裝 三.vscode快速創建html模板 二.vscode下載安裝 1.關于vscode你需要知道 Q&#xff1a;為什么使用vscode? A&#xff1a;使用vscode寫…

【C/C++】不同防止頭文件重復包含的措施

文章目錄 #pragma once vs #ifndef 文件宏1 原理層面區別&#xff08;core&#xff09;2 關鍵區別與優缺點分析3 總結與最佳實踐 #pragma once vs #ifndef 文件宏 在 C/C 中&#xff0c;#pragma once 和傳統的文件宏守衛 (#ifndef HEADER_H #define HEADER_H ... #endif) 都用…

java-springboot文件上傳校驗之只允許上傳excel文件,且檢查不能是腳本或者有害文件或可行性文件

四重驗證機制&#xff1a; 文件擴展名檢查&#xff08;.xlsx/.xls&#xff09;MIME類型檢查文件魔數驗證&#xff08;真實文件類型&#xff09;可執行文件特征檢測 防御措施&#xff1a; 使用try-with-resources確保流關閉限制文件大小防止DoS攻擊使用Apache POI的FileMagic進…

不確定性分析在LEAP能源-環境系統建模中的整合與應用

本內容突出與實例結合&#xff0c;緊密結合國家能源統計制度及《省級溫室氣體排放編制指南》&#xff0c;深入淺出地介紹針對不同級別研究對象時如何根據數據結構、可獲取性、研究目的&#xff0c;構建合適的能源生產、轉換、消費、溫室氣體排放&#xff08;以碳排放為主&#…

高性能分布式消息隊列系統(四)

八、客戶端模塊的實現 客戶端實現的總體框架 在 RabbitMQ 中&#xff0c;應用層提供消息服務的核心實體是 信道&#xff08;Channel&#xff09;。 用戶想要與消息隊列服務器交互時&#xff0c;通常不會直接操作底層的 TCP 連接&#xff0c;而是通過信道來進行各種消息的發布…

QPair 類說明

QPair 類說明 QPair 是一個模板類&#xff0c;用于存儲一對數據項。 頭文件&#xff1a; cpp #include <QPair> qmake 配置&#xff1a; QT core 所有成員列表&#xff08;包括繼承成員&#xff09; 公共類型 類型定義說明first_type第一個元素的類型&#xff…

4.大語言模型預備數學知識

大語言模型預備數學知識 復習一下在大語言模型中用到的矩陣和向量的運算&#xff0c;及概率統計和神經網絡中常用概念。 矩陣的運算 矩陣 矩陣加減法 條件&#xff1a;行數列數相同的矩陣才能做矩陣加減法 數值與矩陣的乘除法 矩陣乘法 條件&#xff1a;矩陣A的列數 矩陣…

uniapp 設置手機不息屏

在使用 UniApp 開發應用時&#xff0c;有時需要在設備長時間未操作時實現息屏保護功能&#xff0c;以節省電量和保護屏幕。以下是如何在 UniApp 中實現這一功能的步驟。 示例一 // 保持屏幕常亮 uni.setKeepScreenOn({keepScreenOn: true });// 監聽應用進入后臺事件 uni.onH…

智能推薦系統:協同過濾與深度學習結合

智能推薦系統&#xff1a;協同過濾與深度學習結合 系統化學習人工智能網站&#xff08;收藏&#xff09;&#xff1a;https://www.captainbed.cn/flu 文章目錄 智能推薦系統&#xff1a;協同過濾與深度學習結合摘要引言技術原理對比1. 協同過濾算法&#xff1a;基于相似性的推…

使用Python和OpenCV實現圖像識別與目標檢測

在計算機視覺領域&#xff0c;圖像識別和目標檢測是兩個非常重要的任務。圖像識別是指識別圖像中的內容&#xff0c;例如判斷一張圖片中是否包含某個特定物體&#xff1b;目標檢測則是在圖像中定位并識別多個物體的位置和類別。OpenCV是一個功能強大的開源計算機視覺庫&#xf…

《基于Apache Flink的流處理》筆記

思維導圖 1-3 章 4-7章 8-11 章 參考資料 源碼&#xff1a; https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚會及會議 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…