使用 FastAPI 構建 Elasticsearch API

作者:來自 Elastic?Jeffrey Rengifo

學習如何使用 Pydantic 模式和 FastAPI 后臺任務,通過實際示例構建一個 Elasticsearch API。

想獲得 Elastic 認證嗎?查看下一期 Elasticsearch Engineer 培訓的時間!

Elasticsearch 擁有豐富的新功能,能幫助你為你的使用場景構建最佳的搜索解決方案。深入了解我們的示例筆記本,開始免費云試用,或立即在本地機器上體驗 Elastic。


從《Elasticsearch in JavaScript》這篇文章中我們了解到,你不應該將 Elasticsearch 實例暴露在互聯網上,而是應該構建一個 API 層。暴露集群的 URL、索引名稱或 API 密鑰會讓攻擊者更容易瞄準你的數據,并不必要地擴大你的攻擊面。即使你對請求進行了清洗,攻擊者仍然可以發送復雜或高負載的查詢使集群超載。因此,最好不僅隱藏集群本身,還要隱藏查詢邏輯 —— 只讓用戶控制必要的部分,比如搜索關鍵字,而不是整個查詢。

對于 JavaScript 開發者來說,可以選擇使用 Node.js,而對于 Python 開發者來說,FastAPI 是一個不錯的替代方案。FastAPI 因其簡潔性和開箱即用的高性能而受到廣泛歡迎。

在本文中,我們將使用 FastAPI 構建 Elasticsearch 與客戶端應用程序(通常是網頁瀏覽器)之間的 API 層。然后,我們將探索一些可以通過 FastAPI 原生功能實現的常見用例。

你可以在這里找到該應用程序的筆記本。

準備數據

本文將使用一個包含獸醫就診記錄的數據集。下面是一個示例文檔:

{"owner_name": "Marco Rivera","pet_name": "Milo","species": "Cat","breed": "Siamese","vaccination_history": ["Rabies","Feline Leukemia"],"visit_details": "Slight eye irritation, prescribed eye drops."
}

首先,我們需要安裝 Elasticsearch 客戶端,以便能夠查詢我們的集群:

pip install elasticsearch

現在,我們導入 Elasticsearch 客戶端、helpers 和 getpass,用于從終端捕獲環境變量。

from elasticsearch import Elasticsearch, helpers
from getpass import getpassos.environ["ELASTICSEARCH_ENDPOINT"] = getpass("Elasticsearch endpoint: ")
os.environ["ELASTICSEARCH_API_KEY"] = getpass("Elasticsearch api-key: ")

我們定義索引名稱,并使用 Elasticsearch 端點和 API 密鑰初始化 Elasticsearch 客戶端。

ES_INDEX = "vet-visits"es_client = Elasticsearch(hosts=[os.environ["ELASTICSEARCH_ENDPOINT"]],api_key=os.environ["ELASTICSEARCH_API_KEY"],
)

然后我們來創建映射:

es_client.indices.create(index=ES_INDEX,body={"mappings": {"properties": {"breed": {"type": "keyword"},"owner_name": {"type": "text","fields": {"keyword": {"type": "keyword"}},},"pet_name": {"type": "text","fields": {"keyword": {"type": "keyword"}},},"species": {"type": "keyword"},"vaccination_history": {"type": "keyword"},"visit_details": {"type": "text"},}}},)

最后,下載數據集并將其放在腳本所在的同一文件夾。然后,我們可以使用 Bulk API 將數據導入 Elasticsearch:

def build_data(ndjson_file, index_name):with open(ndjson_file, "r") as f:for line in f:doc = json.loads(line)yield {"_index": index_name, "_source": doc}try:success, errors = helpers.bulk(es_client, build_data("vet-visits.ndjson", ES_INDEX))print(f"{success} documents indexed successfully")if errors:print("Errors during indexing:", errors)
except Exception as e:print(f"Error: {str(e)}")

如果一切順利,你應該會看到以下信息:

10 documents indexed successfully

現在,數據已導入 Elasticsearch 并準備好使用。接下來,我們將構建 API 來展示 FastAPI 的功能。

Hello, world!

開始之前,我們只需要安裝 FastAPI 和用于服務器創建的 Uvicorn,Pydantic 用于模式處理,以及 Elasticsearch 用于存儲和搜索數據。

pip install fastapi uvicorn elasticsearch pydantic -q

我們先導入庫并創建 FastAPI 服務器實例。

import asyncio
import json
import os
from typing import Listimport uvicorn
from fastapi import BackgroundTasks, Body, FastAPI, HTTPException, Response
from pydantic import BaseModelapp = FastAPI()

然后,我們可以創建一個 ping 端點來檢查服務器狀態。

@app.get("/ping")
async def ping():try:health = await es_client.cluster.health()return {"status": "success","message": "Connected to Elasticsearch","cluster_status": health["status"],"number_of_nodes": health["number_of_nodes"],"active_shards": health["active_shards"],}except Exception as e:status_code = getattr(e, "status_code", 500)raise HTTPException(status_code=status_code,detail=f"Error connecting to Elasticsearch: {str(e)}",)

請求:

curl -XGET "http://localhost:8000/ping"

響應:

{"status":"success","message":"Connected to Elasticsearch","cluster_status":"green","number_of_nodes":4,"active_shards":172}
@app.post("/search")
async def search(query: dict = Body(...)):try:result = await es_client.search(index=ES_INDEX, body=query)return resultexcept Exception as e:status_code = getattr(e, "status_code", 500)raise HTTPException(status_code=status_code, detail=str(e))

我們將嘗試使用 match_phrase 搜索牙齒清潔的就診記錄:

請求:

curl -X POST "http://localhost:8000/search" \-H "Content-Type: application/json" \-d '{"query": {"match_phrase": {"visit_details": "dental cleaning"}},"size": 10}'

響應:

{"took": 1,"timed_out": false,"_shards": {"total": 1,"successful": 1,"skipped": 0,"failed": 0},"hits": {"total": {"value": 1,"relation": "eq"},"max_score": 3.5869093,"hits": [{"_index": "vet-visits","_id": "VUjqWZYB8Z9CzAyMLmyB","_score": 3.5869093,"_source": {"owner_name": "Leo Martínez","pet_name": "Simba","species": "Cat","breed": "Maine Coon","vaccination_history": ["Rabies","Feline Panleukopenia"],"visit_details": "Dental cleaning. Minor tartar buildup removed."}}]}
}

輸入和響應的類型定義

FastAPI 的一個關鍵特性是與 Pydantic 的集成,用于數據模式處理,允許你使用類型注解定義類,并自動進行數據驗證。在大多數情況下,給數據加類型對構建健壯穩定的應用至關重要。它還能實現模型復用,使代碼更易讀、維護和文檔化。

不過,嚴格的類型和模式限制在需要處理高度動態或不可預測的數據結構時可能不太適用。

接下來,我們把搜索端點的請求和響應改成類型定義。

我們可以創建類來驗證字段類型(field types)、設置默認值(default values)、定義枚舉(enums)和列表(lists)等。這里我們將為用戶請求和 Elasticsearch 響應創建類,并應用部分這些概念。

現在,用戶只需發送搜索詞,和可選的結果數量限制,就會返回包含 owner_name 和 visit_details 的命中列表。

# Pydantic classes# Pydantic class for the request
class SearchRequest(BaseModel):term: strsize: int = 10# Pydantic class for the response
class SearchResponse(BaseModel):hits: List[SearchHit]total: int# Class to format for hits
class SearchHit(BaseModel):owner_name: str = ""visit_details: str = ""

現在這個端點對用戶來說更簡單,對我們來說更安全,因為用戶只能控制查詢的一部分,而不是全部。

@app.post("/search", response_model=SearchResponse)
async def search_v3(request: SearchRequest):try:query = {"query": {"match_phrase": {"visit_details": request.term}},"size": request.size,}result = await es_client.search(index=ES_INDEX, body=query)hits = result["hits"]["hits"]results = []for hit in hits:source = hit.get("_source", {})results.append(SearchHit(owner_name=source["owner_name"],visit_details=source["visit_details"],))return SearchResponse(hits=results, total=len(results))except Exception as e:status_code = getattr(e, "status_code", 500)raise HTTPException(status_code=status_code, detail=str(e))

讓用戶完全訪問 _search 查詢體被認為是安全風險,因為惡意用戶可能發送超載集群的查詢。采用這種方式,用戶只能設置 match_phrase 子句內的內容,使端點更安全。

響應也是如此。我們可以返回更簡潔的結果,隱藏 _id、_score、_index 等字段。

請求:

curl -X POST "http://localhost:8000/search" \-H "Content-Type: application/json" \-d '{"term": "dental cleaning"}'

響應:

{"hits":[{"owner_name":"Leo Martínez","visit_details":"Dental cleaning. Minor tartar buildup removed."}],"total":1}

后臺任務

FastAPI 的另一個功能是支持后臺任務。利用后臺任務,你可以立即返回結果給用戶,同時在后臺繼續執行任務。這個功能對長時間運行的任務特別有用。

在 Elasticsearch 中,我們使用 wait_for_completion=false 參數獲取任務 ID 并關閉連接,而不是等待任務完成。然后可以用 tasks API 查詢任務狀態。一些示例有 _reindex、_update_by_query 和 _delete_by_query。

假設你想讓用戶觸發基于字段值刪除幾百萬條文檔的操作,并在完成后通知他們。你可以結合 FastAPI 的后臺任務和 Elasticsearch 的 wait_for_completion 實現。

我們先創建一個函數,每隔 2 秒查詢 tasks API 檢查任務狀態。

async def check_task(es_client, task_id):try:while True:status = await es_client.tasks.get(task_id=task_id)if status.get("completed", False):print(f"Task {task_id} completed.")# Here you can add the logic to send the notificationbreakawait asyncio.sleep(2)except Exception as e:print(f"Error checking task {task_id}: {e}")

現在,我們可以創建一個端點,接收用作過濾條件的值來刪除文檔。調用 _delete_by_query API,設置 wait_for_completion=false,并使用返回的任務 ID 創建一個后臺任務,調用我們之前寫的 check_task 方法。

background_tasks.add_task 的第一個參數是要執行的函數,后面的參數是該函數所需的參數。

# Background task endpoint
@app.post("/delete-by-query")
async def delete_by_query(request: SearchRequest = Body(...), background_tasks: BackgroundTasks = None
):try:body = {"query": {"term": {"pet_name.keyword": request.term}}}response = await es_client.delete_by_query(index=ES_INDEX, body=body, wait_for_completion=False)task_id = response.get("task")if task_id:background_tasks.add_task(check_task, es_async_client, task_id)return Response(status_code=200,content=json.dumps({"message": "Delete by query. The response will be send by email when the task is completed.","task_id": task_id,}),media_type="application/json",)except Exception as e:status_code = getattr(e, "status_code", 500)raise HTTPException(status_code=status_code, detail=str(e))

下面是刪除所有 pet_name.keyword 字段中包含 “Buddy” 文檔的示例:

curl -X POST "http://localhost:8000/delete-by-query" \-H "Content-Type: application/json" \-d '{"term": "Buddy"}'

響應:

{"message": "Delete by query. The response will be send by email when the task is completed.", "task_id": "191ALShERbucSkcFTGpOCg:34822095"}

后臺日志:

INFO:     127.0.0.1:58804 - "POST /delete-by-query HTTP/1.1" 200 OK
Task 191ALShERbucSkcFTGpOCg:34822095 completed.

運行 API

添加以下代碼塊,將服務器暴露在 8000 端口:

if __name__ == "__main__":uvicorn.run(app, host="0.0.0.0", port=8000)

使用以下命令運行 FastAPI 應用:

uvicorn app:app --reload

總結

FastAPI 讓構建安全且簡潔的 Elasticsearch API 層變得簡單。它內置類型檢查、異步支持和后臺任務,能輕松處理常見用例,且開銷不大。

這些示例只是起點——你可以根據需要添加認證、分頁,甚至 websockets。關鍵是保持集群安全,API 清晰且易于維護。

原文:Building Elasticsearch APIs with FastAPI - Elasticsearch Labs

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/92580.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/92580.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/92580.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

[2025CVPR-目標檢測方向]FSHNet:一種用于3D物體檢測的全稀疏混合網絡。

1. ?簡介? 論文提出了FSHNet(Fully Sparse Hybrid Network),一種用于3D物體檢測的全稀疏混合網絡。FSHNet旨在解決現有稀疏3D檢測器的兩大核心問題:長距離交互能力弱和網絡優化困難。稀疏檢測器(如VoxelNeXt和SAFDN…

MySql 8.0.42 zip版安裝教程和使用

今天要裝個MySQL,就按照自己以前的教程來做,不知道是不是版本更新了的原因,又遇到了一點小阻礙,于是再記錄一下吧。 下載MySQL 下載鏈接:MySQL :: Download MySQL Community Serverhttps://dev.mysql.com/downloads/…

【lucene】實現knn

在 Lucene 中,可以通過 KnnFloatVectorQuery 和 KnnFloatVectorField 來實現 KNN(k-Nearest Neighbors)搜索。以下是具體介紹:1. 功能原理KnnFloatVectorQuery 是 Lucene 用于執行最近鄰搜索的查詢類,它可以在一個字段…

RabbitMQ實踐學習筆記

RabbitMQ實踐 以下是關于RabbitMQ實踐的整理,涵蓋常見場景和示例代碼(基于Markdown格式)。內容按模塊分類,避免步驟詞匯,直接提供可操作的方法: 基礎連接與隊列聲明 使用Python的pika庫建立連接并聲明隊列: import pikaconnection = pika.BlockingConnection(pika.C…

量子生成對抗網絡:量子計算與生成模型的融合革命

引言:當生成對抗網絡遇上量子計算在人工智能與量子計算雙重浪潮的交匯處,量子生成對抗網絡(Quantum Generative Adversarial Networks, QGAN)正成為突破經典算力瓶頸的關鍵技術。傳統生成對抗網絡(GAN)在圖…

VBA 多個選項,將選中的選項錄入當前選中的單元格

1、使用LISTBOX插件&#xff0c;選中后回車錄入 維護好數據&#xff0c;并新增一個activeX列表框插件 Private Sub Worksheet_SelectionChange(ByVal Target As Range)If Target.Count > 1 Then Exit SubIf Target.Row > 2 And Target.Row < 10 And Target.Column 2…

【NLP輿情分析】基于python微博輿情分析可視化系統(flask+pandas+echarts) 視頻教程 - 主頁-微博點贊量Top6實現

大家好&#xff0c;我是java1234_小鋒老師&#xff0c;最近寫了一套【NLP輿情分析】基于python微博輿情分析可視化系統(flaskpandasecharts)視頻教程&#xff0c;持續更新中&#xff0c;計劃月底更新完&#xff0c;感謝支持。今天講解主頁-微博點贊量Top6實現 視頻在線地址&…

SAP調用外部API

SAP需求將中文字符轉化為對應的拼音具體思路,由于sap中沒有將中文字符轉化為拼音的函數或方法類,則以http請求訪問外部服務器發布的API服務,然后獲取其返回值即可1.調用外部網站上提供的api缺點:免費次數有限,后需要充值這里是用www格式的json報文*&----------------------…

(12)機器學習小白入門YOLOv:YOLOv8-cls 模型微調實操

YOLOv8-cls 模型微調實操 (1)機器學習小白入門YOLOv &#xff1a;從概念到實踐 (2)機器學習小白入門 YOLOv&#xff1a;從模塊優化到工程部署 (3)機器學習小白入門 YOLOv&#xff1a; 解鎖圖片分類新技能 (4)機器學習小白入門YOLOv &#xff1a;圖片標注實操手冊 (5)機器學習小…

基于Matlab傳統圖像處理技術的車輛車型識別與分類方法研究

隨著計算機視覺和圖像處理技術的發展&#xff0c;車輛檢測與識別已經成為智能交通系統中的一個重要研究方向。傳統圖像處理方法通過對圖像進行預處理、特征提取、分類與識別&#xff0c;提供了一種無需復雜深度學習模型的解決方案。本研究基于MATLAB平臺&#xff0c;采用傳統圖…

未來趨勢:LeafletJS 與 Web3/AI 的融合

引言 LeafletJS 作為一個輕量、靈活的 JavaScript 地圖庫&#xff0c;以其模塊化設計和高效渲染能力在 Web 地圖開發中占據重要地位。隨著 Web3 和人工智能&#xff08;AI&#xff09;的興起&#xff0c;地圖應用的開發范式正在發生變革。Web3 技術&#xff08;如區塊鏈、去中…

Spring AI 系列之二十一 - EmbeddingModel

之前做個幾個大模型的應用&#xff0c;都是使用Python語言&#xff0c;后來有一個項目使用了Java&#xff0c;并使用了Spring AI框架。隨著Spring AI不斷地完善&#xff0c;最近它發布了1.0正式版&#xff0c;意味著它已經能很好的作為企業級生產環境的使用。對于Java開發者來說…

LFU算法及優化

繼上一篇的LRU算法的實現和講解&#xff0c;這一篇來講述LFU最近使用頻率高的數據很大概率將會再次被使用,而最近使用頻率低的數據,將來大概率不會再使用。做法&#xff1a;把使用頻率最小的數據置換出去。這種算法更多是從使用頻率的角度&#xff08;但是當緩存滿時&#xff0…

關于原車一鍵啟動升級手機控車的核心信息及注意事項

想知道如何給原車已經有一鍵啟動功能的車輛加裝手機遠程啟動。這是個很實用的汽車改裝需求&#xff0c;尤其適合想在冬天提前熱車、夏天提前開空調的車主。一、適配方案與核心功能 ?升級專車專用4G手機控車模塊?&#xff0c;推薦安裝「移動管家YD361-3」系統&#xff0c;該方…

數據結構與算法:類C語言有關操作補充

數據結構與算法:類C語言操作補充 作為老師,我將詳細講解類C語言(如C或C++)中的關鍵操作,包括動態內存分配和參數傳遞。這些內容在數據結構與算法中至關重要,例如在實現動態數組、鏈表或高效函數調用時。我會用通俗易懂的語言和代碼示例逐步解釋,確保你輕松掌握。內容基…

Go 并發(協程,通道,鎖,協程控制)

一.協程&#xff08;Goroutine&#xff09;并發&#xff1a;指程序能夠同時執行多個任務的能力&#xff0c;多線程程序在一個核的cpu上運行&#xff0c;就是并發。并行&#xff1a;多線程程序在多個核的cpu上運行&#xff0c;就是并行。并發主要由切換時間片來實現"同時&q…

圖機器學習(15)——鏈接預測在社交網絡分析中的應用

圖機器學習&#xff08;15&#xff09;——鏈接預測在社交網絡分析中的應用0. 鏈接預測1. 數據處理2. 基于 node2vec 的鏈路預測3. 基于 GraphSAGE 的鏈接預測3.1 無特征方法3.2 引入節點特征4. 用于鏈接預測的手工特征5. 結果對比0. 鏈接預測 如今&#xff0c;社交媒體已成為…

每日一算:華為-批薩分配問題

題目描述"吃貨"和"饞嘴"兩人到披薩店點了一份鐵盤&#xff08;圓形&#xff09;披薩&#xff0c;并囑咐店員將披薩按放射狀切成大小相同的偶數個小塊。但是粗心的服務員將披薩切成了每塊大小都完全不同的奇數塊&#xff0c;且肉眼能分辨出大小。由于兩人都…

Transfusion,Show-o and Show-o2論文解讀

目錄 一、Transfusion 1、概述 2、方法 二、Show-o 1、概述 2、方法 3、訓練 三、Show-o2 1、概述 2、模型架構 3、訓練方法 4、實驗 一、Transfusion 1、概述 Transfusion模型應該是Show系列&#xff0c;Emu系列的前傳&#xff0c;首次將文本和圖像生成統一到單…

聊聊 Flutter 在 iOS 真機 Debug 運行出現 Timed out *** to update 的問題

最近剛好有人在問&#xff0c;他的 Flutter 項目在升級之后出現 Error starting debug session in Xcode: Timed out waiting for CONFIGURATION_BUILD_DIR to update 問題&#xff0c;也就是真機 Debug 時始終運行不了的問題&#xff1a; 其實這已經是一個老問題了&#xff0c…