作者:來自 Elastic?Ajay Krishnan Gopalan
了解如何使用 NV-Ingest、Unstructured Platform 和 Elasticsearch 為 RAG 應用構建可擴展的非結構化文檔數據管道。
Elasticsearch 原生集成了行業領先的生成式 AI 工具和提供商。查看我們的網絡研討會,了解如何超越 RAG 基礎,或使用 Elastic 向量數據庫構建可投入生產的應用。
為了為你的用例構建最佳搜索解決方案,現在就開始免費云試用,或在本地機器上試用 Elastic。
在這篇博客中,我們將討論如何使用 NV-Ingest、Unstructured Platform 和 Elasticsearch 實現一個可擴展的數據處理流水線。該流水線將來自數據源的非結構化數據轉換為結構化、可搜索的內容,為下游的 AI 應用(如 RAG)做好準備。檢索增強生成(RAG)是一種 AI 技術,它為大語言模型(LLMs)提供外部知識,以生成對用戶查詢的響應。這使得 LLM 的回答能夠根據特定上下文進行定制,從而使答案更準確、更相關。
在開始之前,讓我們先了解一下實現該流水線的關鍵組件,以及它們各自的作用。
流水線組件
NV-Ingest 是一組微服務,用于將非結構化文檔轉換為結構化內容和元數據。它可以大規模處理文檔解析、視覺結構識別和 OCR 處理。
Unstructured 是一個 ETL+ 平臺,用于協調整個非結構化數據處理流程:從從多個數據源中攝取非結構化數據、通過可配置的工作流引擎將原始非結構化文件轉換為結構化數據、使用附加轉換豐富數據,一直到將結果上傳到向量存儲、數據庫和搜索引擎。它提供了可視化 UI、API 和可擴展的后端基礎設施,在一個工作流中協調文檔解析、數據豐富和嵌入處理。
Elasticsearch 是業界領先的搜索和分析引擎,現在具備原生的向量搜索能力。它既可以作為傳統的文本數據庫,也可以作為向量數據庫,支持像 k-NN 相似度搜索這樣的功能,實現大規模語義搜索。
現在我們已經介紹了核心組件,接下來讓我們看看它們在典型工作流程中是如何協同工作的,然后再深入了解具體實現。
使用 NV-Ingest - Unstructured - Elasticsearch 實現 RAG
雖然這里我們只提供關鍵要點,你可以在此處查看完整的 notebook。
本博客分為三個部分:
-
設置源和目標連接器
-
使用 Unstructured API 設置工作流
-
基于處理后的數據進行 RAG
Unstructured 的工作流以 DAG(Directed Acyclic Graph - 有向無環圖)的形式表示,節點稱為連接器,用于控制數據的攝取來源以及處理結果的上傳目標。這些節點在任何工作流中都是必需的。源連接器配置原始數據從數據源的攝取,目標連接器配置處理后數據上傳到向量存儲、搜索引擎或數據庫。
在本博客中,我們將研究論文存儲在 Amazon S3 中,并希望將處理后的數據傳送到 Elasticsearch 用于下游用途。這意味著,在構建數據處理工作流之前,我們需要通過 Unstructured API 創建一個 Amazon S3 的源連接器和一個 Elasticsearch 的目標連接器。
步驟 1:設置 S3 源連接器
在創建源連接器時,你需要為其指定一個唯一名稱,明確其類型(例如 S3 或 Google Drive),并提供配置,通常包括你要連接的數據源的位置(例如 S3 bucket 的 URI 或 Google Drive 文件夾)以及身份驗證信息。
source_connector_response = unstructured_client.sources.create_source(request=CreateSourceRequest(create_source_connector=CreateSourceConnector(name="demo_source1",type=SourceConnectorType.S3,config=S3SourceConnectorConfigInput(key=os.environ['S3_AWS_KEY'],secret=os.environ['S3_AWS_SECRET'],remote_url=os.environ["S3_REMOTE_URL"],recursive=False #True/False)))
)pretty_print_model(source_connector_response.source_connector_information)
步驟 2:設置 Elasticsearch 目標連接器
接下來,我們來設置 Elasticsearch 目標連接器。你使用的 Elasticsearch 索引必須具有與 Unstructured 為你生成的文檔架構兼容的架構 —— 你可以在文檔中找到所有詳細信息。
destination_connector_response = unstructured_client.destinations.create_destination(request=CreateDestinationRequest(create_destination_connector=CreateDestinationConnector(name="demo_dest-3",type=DestinationConnectorType.ELASTICSEARCH,config=ElasticsearchConnectorConfigInput(hosts=[os.environ['es_host']],es_api_key=os.environ['es_api_key'],index_name="demo-index")))
)
步驟 3:使用 Unstructured 創建工作流
一旦你擁有了源連接器和目標連接器,就可以創建一個新的數據處理工作流。我們將通過以下節點構建工作流 DAG:
-
NV-Ingest 用于文檔分區
-
Unstructured 的 Image Summarizer、Table Summarizer 和 Named Entity Recognition 節點用于內容豐富
-
Chunker 和 Embedder 節點用于使內容準備好進行相似性搜索
from unstructured_client.models.shared import (WorkflowNode,WorkflowNodeType,WorkflowType,Schedule
)# Partition the content by using NV-Ingest
parition_node = WorkflowNode(name="Ingest",subtype="nvingest",type="partition",settings={"nvingest_host": userdata.get('NV-Ingest-host-address')},)# Summarize each detected image.
image_summarizer_node = WorkflowNode(name="Image summarizer",subtype="openai_image_description",type=WorkflowNodeType.PROMPTER,settings={}
)# Summarize each detected table.
table_summarizer_node = WorkflowNode(name="Table summarizer",subtype="anthropic_table_description",type=WorkflowNodeType.PROMPTER,settings={}
)# Label each recognized named entity.
named_entity_recognizer_node = WorkflowNode(name="Named entity recognizer",subtype="openai_ner",type=WorkflowNodeType.PROMPTER,settings={"prompt_interface_overrides": None}
)# Chunk the partitioned content.
chunk_node = WorkflowNode(name="Chunker",subtype="chunk_by_title",type=WorkflowNodeType.CHUNK,settings={"unstructured_api_url": None,"unstructured_api_key": None,"multipage_sections": False,"combine_text_under_n_chars": 0,"include_orig_elements": True,"max_characters": 1537,"overlap": 160,"overlap_all": False,"contextual_chunking_strategy": None}
)# Generate vector embeddings.
embed_node = WorkflowNode(name="Embedder",subtype="azure_openai",type=WorkflowNodeType.EMBED,settings={"model_name": "text-embedding-3-large"}
)response = unstructured_client.workflows.create_workflow(request={"create_workflow": {"name": f"s3-to-es-NV-Ingest-custom-workflow","source_id": source_connector_response.source_connector_information.id,"destination_id": "a72838a4-bb72-4e93-972d-22dc0403ae9e","workflow_type": WorkflowType.CUSTOM,"workflow_nodes": [parition_node,image_summarizer_node,table_summarizer_node,named_entity_recognizer_node,chunk_node,embed_node],}}
)workflow_id = response.workflow_information.id
pretty_print_model(response.workflow_information)job = unstructured_client.workflows.run_workflow(request={"workflow_id": workflow_id,}
)pretty_print_model(job.job_information)
一旦這個工作流的任務完成,數據將被上傳到 Elasticsearch,我們就可以繼續構建一個基礎的 RAG 應用程序。
步驟 4:RAG 設置
讓我們繼續設置一個簡單的檢索器,它將連接到數據,接收用戶查詢,使用與原始數據嵌入相同的模型對其進行嵌入,并計算余弦相似度以檢索前 3 個文檔。
from langchain_elasticsearch import ElasticsearchStore
from langchain.embeddings import OpenAIEmbeddings
import osembeddings = OpenAIEmbeddings(model="text-embedding-3-large",openai_api_key=os.environ['OPENAI_API_KEY'])vector_store = ElasticsearchStore(es_url=os.environ['es_host'],index_name="demo-index",embedding=embeddings,es_api_key=os.environ['es_api_key'],query_field="text",vector_query_field="embeddings",distance_strategy="COSINE"
)retriever = vector_store.as_retriever(search_type="similarity",search_kwargs={"k": 3} # Number of results to return
)
然后,讓我們設置一個工作流來接收用戶查詢,從 Elasticsearch 中獲取相似文檔,并使用這些文檔作為上下文來回答用戶的問題。
from openai import OpenAIclient = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))def generate_answer(question: str, documents: str):prompt = """You are an assistant that can answer user questions given provided context.Your answer should be thorough and technical.If you don't know the answer, or no documents are provided, say 'I do not have enough context to answer the question.'"""augmented_prompt = (f"{prompt}"f"User question: {question}\n\n"f"{documents}")response = client.chat.completions.create(messages=[{'role': 'system', 'content': 'You answer users questions.'},{'role': 'user', 'content': augmented_prompt},],model="gpt-4o-2024-11-20",temperature=0,)return response.choices[0].message.contentdef format_docs(docs):seen_texts = set()useful_content = [doc.page_content for doc in docs]return "\nRetrieved documents:\n" + "".join([f"\n\n===== Document {str(i)} =====\n" + docfor i, doc in enumerate(useful_content)])
def rag(query):docs = retriever.invoke(query)documents = format_docs(docs)answer = generate_answer(query, documents)return documents, answer
將所有內容組合在一起,我們得到:
query = "How did the response lengths change with training?"docs, answer = rag(query)print(answer)
和一個響應:
Based on the provided context, the response lengths during training for the DeepSeek-R1-Zero model showed a clear trend of increasing as the number of training steps progressed. This is evident from the graphs described in Document 0 and Document 1, which both depict the "average length per response" on the y-axis and training steps on the x-axis.### Key Observations:
1. **Increasing Trend**: The average response length consistently increased as training steps advanced. This suggests that the model naturally learned to allocate more "thinking time" (i.e., generate longer responses) as it improved its reasoning capabilities during the reinforcement learning (RL) process.2. **Variability**: Both graphs include a shaded area around the average response length, indicating some variability in response lengths during training. However, the overall trend remained upward.3. **Quantitative Range**: The y-axis for response length ranged from 0 to 12,000 tokens, and the graphs show a steady increase in the average response length over the course of training, though specific numerical values at different steps are not provided in the descriptions.### Implications:
The increase in response length aligns with the model's goal of solving reasoning tasks more effectively. Longer responses likely reflect the model's ability to provide more detailed and comprehensive reasoning, which is critical for tasks requiring complex problem-solving.In summary, the response lengths increased during training, indicating that the model adapted to allocate more resources (in terms of response length) to improve its reasoning performance.
Elasticsearch 提供了多種增強搜索的策略,包括混合搜索,這是近似語義搜索和基于關鍵字的搜索的結合。
這種方法可以提高作為上下文使用的 RAG 架構中的 top 文檔的相關性。要啟用此功能,您需要按照以下方式修改 vector_store 初始化:
from langchain_elasticsearch import DenseVectorStrategyvector_store = ElasticsearchStore(es_url=os.environ['es_host'],index_name="demo-index",embedding=embeddings,es_api_key=os.environ['es_api_key'],query_field="text",vector_query_field="embeddings",strategy=DenseVectorStrategy(hybrid=True) // <-- here the change
)
結論
良好的 RAG 從準備充分的數據開始,而 Unstructured 簡化了這一關鍵的第一步。通過 NV-Ingest 啟用文檔分區、對非結構化數據進行元數據豐富并高效地將其攝取到 Elasticsearch,它確保了您的 RAG 管道建立在堅實的基礎上,為所有下游任務釋放其全部潛力。
原文:Unstructured data processing with NV?Ingest, Unstructured, and Elasticsearch - Elasticsearch Labs