使用 NV?Ingest、Unstructured 和 Elasticsearch 處理非結構化數據

作者:來自 Elastic?Ajay Krishnan Gopalan

了解如何使用 NV-Ingest、Unstructured Platform 和 Elasticsearch 為 RAG 應用構建可擴展的非結構化文檔數據管道。

Elasticsearch 原生集成了行業領先的生成式 AI 工具和提供商。查看我們的網絡研討會,了解如何超越 RAG 基礎,或使用 Elastic 向量數據庫構建可投入生產的應用。

為了為你的用例構建最佳搜索解決方案,現在就開始免費云試用,或在本地機器上試用 Elastic。


在這篇博客中,我們將討論如何使用 NV-Ingest、Unstructured Platform 和 Elasticsearch 實現一個可擴展的數據處理流水線。該流水線將來自數據源的非結構化數據轉換為結構化、可搜索的內容,為下游的 AI 應用(如 RAG)做好準備。檢索增強生成(RAG)是一種 AI 技術,它為大語言模型(LLMs)提供外部知識,以生成對用戶查詢的響應。這使得 LLM 的回答能夠根據特定上下文進行定制,從而使答案更準確、更相關。

在開始之前,讓我們先了解一下實現該流水線的關鍵組件,以及它們各自的作用。

流水線組件

NV-Ingest 是一組微服務,用于將非結構化文檔轉換為結構化內容和元數據。它可以大規模處理文檔解析、視覺結構識別和 OCR 處理。

Unstructured 是一個 ETL+ 平臺,用于協調整個非結構化數據處理流程:從從多個數據源中攝取非結構化數據、通過可配置的工作流引擎將原始非結構化文件轉換為結構化數據、使用附加轉換豐富數據,一直到將結果上傳到向量存儲、數據庫和搜索引擎。它提供了可視化 UI、API 和可擴展的后端基礎設施,在一個工作流中協調文檔解析、數據豐富和嵌入處理。

Elasticsearch 是業界領先的搜索和分析引擎,現在具備原生的向量搜索能力。它既可以作為傳統的文本數據庫,也可以作為向量數據庫,支持像 k-NN 相似度搜索這樣的功能,實現大規模語義搜索。

現在我們已經介紹了核心組件,接下來讓我們看看它們在典型工作流程中是如何協同工作的,然后再深入了解具體實現。

使用 NV-Ingest - Unstructured - Elasticsearch 實現 RAG

雖然這里我們只提供關鍵要點,你可以在此處查看完整的 notebook。

本博客分為三個部分:

  1. 設置源和目標連接器

  2. 使用 Unstructured API 設置工作流

  3. 基于處理后的數據進行 RAG

Unstructured 的工作流以 DAG(Directed Acyclic Graph - 有向無環圖)的形式表示,節點稱為連接器,用于控制數據的攝取來源以及處理結果的上傳目標。這些節點在任何工作流中都是必需的。源連接器配置原始數據從數據源的攝取,目標連接器配置處理后數據上傳到向量存儲、搜索引擎或數據庫。

在本博客中,我們將研究論文存儲在 Amazon S3 中,并希望將處理后的數據傳送到 Elasticsearch 用于下游用途。這意味著,在構建數據處理工作流之前,我們需要通過 Unstructured API 創建一個 Amazon S3 的源連接器和一個 Elasticsearch 的目標連接器。

步驟 1:設置 S3 源連接器

在創建源連接器時,你需要為其指定一個唯一名稱,明確其類型(例如 S3 或 Google Drive),并提供配置,通常包括你要連接的數據源的位置(例如 S3 bucket 的 URI 或 Google Drive 文件夾)以及身份驗證信息。

source_connector_response = unstructured_client.sources.create_source(request=CreateSourceRequest(create_source_connector=CreateSourceConnector(name="demo_source1",type=SourceConnectorType.S3,config=S3SourceConnectorConfigInput(key=os.environ['S3_AWS_KEY'],secret=os.environ['S3_AWS_SECRET'],remote_url=os.environ["S3_REMOTE_URL"],recursive=False #True/False)))
)pretty_print_model(source_connector_response.source_connector_information)

步驟 2:設置 Elasticsearch 目標連接器

接下來,我們來設置 Elasticsearch 目標連接器。你使用的 Elasticsearch 索引必須具有與 Unstructured 為你生成的文檔架構兼容的架構 —— 你可以在文檔中找到所有詳細信息。

destination_connector_response = unstructured_client.destinations.create_destination(request=CreateDestinationRequest(create_destination_connector=CreateDestinationConnector(name="demo_dest-3",type=DestinationConnectorType.ELASTICSEARCH,config=ElasticsearchConnectorConfigInput(hosts=[os.environ['es_host']],es_api_key=os.environ['es_api_key'],index_name="demo-index")))
)

步驟 3:使用 Unstructured 創建工作流

一旦你擁有了源連接器和目標連接器,就可以創建一個新的數據處理工作流。我們將通過以下節點構建工作流 DAG:

  • NV-Ingest 用于文檔分區

  • Unstructured 的 Image Summarizer、Table Summarizer 和 Named Entity Recognition 節點用于內容豐富

  • Chunker 和 Embedder 節點用于使內容準備好進行相似性搜索

from unstructured_client.models.shared import (WorkflowNode,WorkflowNodeType,WorkflowType,Schedule
)# Partition the content by using NV-Ingest
parition_node = WorkflowNode(name="Ingest",subtype="nvingest",type="partition",settings={"nvingest_host":  userdata.get('NV-Ingest-host-address')},)# Summarize each detected image.
image_summarizer_node = WorkflowNode(name="Image summarizer",subtype="openai_image_description",type=WorkflowNodeType.PROMPTER,settings={}
)# Summarize each detected table.
table_summarizer_node = WorkflowNode(name="Table summarizer",subtype="anthropic_table_description",type=WorkflowNodeType.PROMPTER,settings={}
)# Label each recognized named entity.
named_entity_recognizer_node = WorkflowNode(name="Named entity recognizer",subtype="openai_ner",type=WorkflowNodeType.PROMPTER,settings={"prompt_interface_overrides": None}
)# Chunk the partitioned content.
chunk_node = WorkflowNode(name="Chunker",subtype="chunk_by_title",type=WorkflowNodeType.CHUNK,settings={"unstructured_api_url": None,"unstructured_api_key": None,"multipage_sections": False,"combine_text_under_n_chars": 0,"include_orig_elements": True,"max_characters": 1537,"overlap": 160,"overlap_all": False,"contextual_chunking_strategy": None}
)# Generate vector embeddings.
embed_node = WorkflowNode(name="Embedder",subtype="azure_openai",type=WorkflowNodeType.EMBED,settings={"model_name": "text-embedding-3-large"}
)response = unstructured_client.workflows.create_workflow(request={"create_workflow": {"name": f"s3-to-es-NV-Ingest-custom-workflow","source_id": source_connector_response.source_connector_information.id,"destination_id": "a72838a4-bb72-4e93-972d-22dc0403ae9e","workflow_type": WorkflowType.CUSTOM,"workflow_nodes": [parition_node,image_summarizer_node,table_summarizer_node,named_entity_recognizer_node,chunk_node,embed_node],}}
)workflow_id = response.workflow_information.id
pretty_print_model(response.workflow_information)job = unstructured_client.workflows.run_workflow(request={"workflow_id": workflow_id,}
)pretty_print_model(job.job_information)

一旦這個工作流的任務完成,數據將被上傳到 Elasticsearch,我們就可以繼續構建一個基礎的 RAG 應用程序。

步驟 4:RAG 設置

讓我們繼續設置一個簡單的檢索器,它將連接到數據,接收用戶查詢,使用與原始數據嵌入相同的模型對其進行嵌入,并計算余弦相似度以檢索前 3 個文檔。

from langchain_elasticsearch import ElasticsearchStore
from langchain.embeddings import OpenAIEmbeddings
import osembeddings = OpenAIEmbeddings(model="text-embedding-3-large",openai_api_key=os.environ['OPENAI_API_KEY'])vector_store = ElasticsearchStore(es_url=os.environ['es_host'],index_name="demo-index",embedding=embeddings,es_api_key=os.environ['es_api_key'],query_field="text",vector_query_field="embeddings",distance_strategy="COSINE"
)retriever = vector_store.as_retriever(search_type="similarity",search_kwargs={"k": 3}  # Number of results to return
)

然后,讓我們設置一個工作流來接收用戶查詢,從 Elasticsearch 中獲取相似文檔,并使用這些文檔作為上下文來回答用戶的問題。

from openai import OpenAIclient = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))def generate_answer(question: str, documents: str):prompt = """You are an assistant that can answer user questions given provided context.Your answer should be thorough and technical.If you don't know the answer, or no documents are provided, say 'I do not have enough context to answer the question.'"""augmented_prompt = (f"{prompt}"f"User question: {question}\n\n"f"{documents}")response = client.chat.completions.create(messages=[{'role': 'system', 'content': 'You answer users questions.'},{'role': 'user', 'content': augmented_prompt},],model="gpt-4o-2024-11-20",temperature=0,)return response.choices[0].message.contentdef format_docs(docs):seen_texts = set()useful_content = [doc.page_content for doc in docs]return  "\nRetrieved documents:\n" + "".join([f"\n\n===== Document {str(i)} =====\n" + docfor i, doc in enumerate(useful_content)])
def rag(query):docs = retriever.invoke(query)documents = format_docs(docs)answer = generate_answer(query, documents)return documents, answer

將所有內容組合在一起,我們得到:

query = "How did the response lengths change with training?"docs, answer = rag(query)print(answer)

和一個響應:

Based on the provided context, the response lengths during training for the DeepSeek-R1-Zero model showed a clear trend of increasing as the number of training steps progressed. This is evident from the graphs described in Document 0 and Document 1, which both depict the "average length per response" on the y-axis and training steps on the x-axis.### Key Observations:
1. **Increasing Trend**: The average response length consistently increased as training steps advanced. This suggests that the model naturally learned to allocate more "thinking time" (i.e., generate longer responses) as it improved its reasoning capabilities during the reinforcement learning (RL) process.2. **Variability**: Both graphs include a shaded area around the average response length, indicating some variability in response lengths during training. However, the overall trend remained upward.3. **Quantitative Range**: The y-axis for response length ranged from 0 to 12,000 tokens, and the graphs show a steady increase in the average response length over the course of training, though specific numerical values at different steps are not provided in the descriptions.### Implications:
The increase in response length aligns with the model's goal of solving reasoning tasks more effectively. Longer responses likely reflect the model's ability to provide more detailed and comprehensive reasoning, which is critical for tasks requiring complex problem-solving.In summary, the response lengths increased during training, indicating that the model adapted to allocate more resources (in terms of response length) to improve its reasoning performance.

Elasticsearch 提供了多種增強搜索的策略,包括混合搜索,這是近似語義搜索和基于關鍵字的搜索的結合。

這種方法可以提高作為上下文使用的 RAG 架構中的 top 文檔的相關性。要啟用此功能,您需要按照以下方式修改 vector_store 初始化:

from langchain_elasticsearch import DenseVectorStrategyvector_store = ElasticsearchStore(es_url=os.environ['es_host'],index_name="demo-index",embedding=embeddings,es_api_key=os.environ['es_api_key'],query_field="text",vector_query_field="embeddings",strategy=DenseVectorStrategy(hybrid=True) // <-- here the change
)

結論

良好的 RAG 從準備充分的數據開始,而 Unstructured 簡化了這一關鍵的第一步。通過 NV-Ingest 啟用文檔分區、對非結構化數據進行元數據豐富并高效地將其攝取到 Elasticsearch,它確保了您的 RAG 管道建立在堅實的基礎上,為所有下游任務釋放其全部潛力。

原文:Unstructured data processing with NV?Ingest, Unstructured, and Elasticsearch - Elasticsearch Labs

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/82549.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/82549.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/82549.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Android 13 使能user版本進recovery

在 debug 版本上&#xff0c;可以在關機狀態下&#xff0c;同時按 電源鍵 和 音量加鍵 進 recovery 。 user 版本上不行。 參考 使用 build 變體 debug 版本和 user 版本的差別之一就是 ro.debuggable 屬性不同。 順著這個思路追蹤&#xff0c;找到 bootable/recovery/reco…

每日算法刷題計劃

這是我每天堅持刷算法題的倉庫&#xff0c;每天刷1-3道&#xff0c;時間30-40min&#xff0c;加油! 目前考慮leetcode洛谷形式&#xff0c;c和python3語言&#xff0c;leetcode主要學核心思想&#xff0c;洛谷學會輸入輸出格式 每日打卡:markdowncsdn打卡 刷題策略: 按分類刷…

紅黑樹():

1. 紅黑樹&#xff1a; 紅黑樹從根節點開始的最長的路徑不會超過最短路徑的2倍。 紅黑樹的話&#xff0c;他的結點的分布沒有我們的AVL樹的結點的分布均衡&#xff0c;但是效率也不錯&#xff0c;AVL樹的結點分布的那么均勻&#xff0c;其實也是在進行了旋轉&#xff0c;付出了…

【AI智能推薦系統】第六篇:隱私保護與聯邦學習在推薦系統中的平衡之道

第六篇:隱私保護與聯邦學習在推薦系統中的平衡之道 提示語:?? “數據不出域,推薦更精準!深度揭秘騰訊、螞蟻集團如何用聯邦學習打造合規推薦系統,隱私計算技術全景解析與工業級實現方案!” 目錄 隱私保護的行業挑戰隱私計算技術體系 2.1 聯邦學習基礎架構2.2 差分隱私…

【Qt/C++】深入理解 Lambda 表達式與 `mutable` 關鍵字的使用

【Qt/C】深入理解 Lambda 表達式與 mutable 關鍵字的使用 在 Qt 開發中&#xff0c;我們常常會用到 lambda 表達式來編寫簡潔的槽函數。今天通過一個實際代碼示例&#xff0c;詳細講解 lambda 的語法、變量捕獲方式&#xff0c;特別是 mutable 的作用。 示例代碼 QPushButto…

記錄 ubuntu 安裝中文語言出現 software database is broken

搜索出來的結果是 sudo apt-get install language-pack-zh-han* 然而,無效,最后手動安裝如下 apt install language-pack-zh-hans apt install language-pack-zh-hans-base apt install language-pack-gnome-zh-hans apt install fonts-arphic-uming apt install libreoffic…

[虛幻官方教程學習筆記]深入理解實時渲染(An In-Depth Look at Real-Time Rendering)

原英文教程地址深入理解實時渲染&#xff08;An In-Depth Look at Real-Time Rendering&#xff09; 文章目錄 1.Intro to An In-Depth Look at Real-Time RenderingCPU VS GPUDeferred VS Forward 2. Before Rendering and OcclusionCulling計算的步驟使用console command:fre…

Linux進程間信號

目錄 信號入門 生活角度中的信號 技術應用角度的信號 信號的發送與記錄 信號處理常見方式概述 產生信號 通過終端按鍵產生 通過系統函數向進程發信號 由軟件條件產生信號 由硬件異常產生信號 阻塞信號 信號其他相關常見概念 在內核中的表示 sigset_t 信號集操作…

Git簡介和發展

Git 簡介 Git是一個開源的分布式版本控制系統&#xff0c;跨平臺&#xff0c;支持Windows、Linux、MacOS。主要是用于項目的版本管理&#xff0c;是由林納斯托瓦茲(Linux Torvalds)在2005年為Linux內核開發而創建。 起因 在2002年至2005年間&#xff0c;Linux內核開發團隊使…

Perspective,數據可視化的超級引擎!

Perspective 是一個強大的交互式數據分析和可視化庫&#xff0c;它允許你創建高度可配置的報告、儀表板、筆記本和應用程序。給用戶提供了一個新的視角來看待數據。 Stars 數9125Forks 數1217 主要特點 高效流式查詢引擎&#xff1a;Perspective使用C編寫&#xff0c;并編譯為…

MySQL COUNT(*) 查詢優化詳解!

目錄 前言1. COUNT(*) 為什么慢&#xff1f;—— InnoDB 的“計數煩惱” &#x1f914;2. MySQL 執行 COUNT(*) 的方式 (InnoDB)3. COUNT(*) 優化策略&#xff1a;快&#xff01;準&#xff01;狠&#xff01;策略一&#xff1a;利用索引優化帶 WHERE 子句的 COUNT(*) (最常見且…

如何在postman使用時間戳

1. 使用 Pre-request Script 動態轉換? 在發送請求前&#xff0c;將日期字符串轉為時間戳并存儲為環境變量/全局變量。 ?示例代碼? // 將日期字符串&#xff08;如 "2023-10-01"&#xff09;轉為時間戳&#xff08;毫秒&#xff09; const dateString "2…

嵌入式學習筆記 - 運算放大器的共模抑制比

一 定義 共模抑制比&#xff08;Common Mode Rejection Ratio, ?CMRR?&#xff09;是衡量差分放大器&#xff08;或差分電路&#xff09;抑制共模信號能力的關鍵指標。它在電子工程中尤為重要&#xff0c;特別是在需要處理微弱信號或對抗環境噪聲的場景中。 核心概念 ?共…

成龍電影中的三菱汽車

帕杰羅、 Lancer Evolution、 3000GT Mitsubishi Lancer Evo ll 1995 附錄 Mercedes-Benz 280SL&#xff08;W113&#xff09;&#xff0c;俗稱“Pagoda”&#xff08;帕格達&#xff09;

Spring 項目無法連接 MySQL:Nacos 配置誤區排查與解決

在開發過程中&#xff0c;我們使用 Nacos 來管理 Spring Boot 項目的配置&#xff0c;其中包括數據庫連接配置。然而&#xff0c;在實際操作中&#xff0c;由于一些概念的混淆&#xff0c;我們遇到了一些連接問題。本文將分享我的故障排查過程&#xff0c;幫助大家避免類似的錯…

LabVIEW與 IMAQ Vision 機器視覺應用

在工業生產及諸多領域&#xff0c;精確高效的檢測至關重要。基于 LabVIEW 與 IMAQ Vision 的機器視覺應用&#xff0c;深入剖析其原理、系統構成、軟件設計及優勢&#xff0c;為相關領域工程師提供全面技術參考。 ? 一、技術原理 &#xff08;一&#xff09;機器視覺技術基礎…

【STM32 學習筆記】USART串口

注意&#xff1a;在串口助手的接收模式中有文本模式和HEX模式兩種模式&#xff0c;那么它們有什么區別&#xff1f; ??文本模式和Hex模式是兩種不同的文件編輯或瀏覽模式&#xff0c;不是完全相同的概念。文本模式通常是指以ASCII編碼格式表示文本文件的編輯或瀏覽模式。在文…

【WPS】怎么解決“word的復制表格”粘貼到“excel的單元格”變多行單元格的問題

把 word文檔復制表格到這個excel表格上面的話&#xff0c;會出現由單個單元格變成多行單元格的情況。 現在&#xff0c;就這個問題怎么解決&#xff0c;提出了一個方案&#xff0c;就是先查找是什么導致了這個換行&#xff0c;然后再將換行的這個字符進行一個整體的替換&#x…

嵌入式開發面試題詳解:STM32 與嵌入式開發核心知識全面解析

一、STM32 共有幾種基本時鐘信號&#xff1f; 題目 STM32 共有幾種基本時鐘信號&#xff1f; 解答 STM32 包含 4 種基本時鐘信號&#xff0c;分別為 HSI&#xff08;內部高速時鐘&#xff09;、HSE&#xff08;外部高速時鐘&#xff09;、LSI&#xff08;內部低速時鐘&…

華為策略路由

路由策略&#xff1a;是對路由條目進行控制&#xff0c;通告控制路由條目影響報文的轉發路徑。路由策略為控制平面。 策略路由&#xff1a;是根據報文特征&#xff0c;認為的控制報文從某個即可轉發出去&#xff0c;不修改路由表。即策略路由為在轉發平面。 路由策略 策略路由…