作者:來自 Elastic?Andre Luiz
Airbyte 是一個數據集成工具,可自動化并可擴展地將信息從各種來源傳輸到不同的目的地。它使你能夠從 API、數據庫和其他系統提取數據,并將其加載到 Elasticsearch 等平臺,以實現高級搜索和高效分析。
本文將介紹如何配置 Airbyte 將數據攝取到 Elasticsearch,涵蓋關鍵概念、先決條件和分步集成過程。
Airbyte 基本概念
Airbyte 在使用過程中涉及多個核心概念,主要包括:
- 來源(Sources): 定義要提取數據的來源。
- 目的地(Destinations): 定義數據的發送和存儲位置。
- 連接(Connections): 配置數據來源與目的地之間的關系,包括同步頻率。
Airbyte 與 Elasticsearch 的集成
在本次演示中,我們將進行一個集成操作,把存儲在 S3 存儲桶中的數據遷移到 Elasticsearch 索引。我們將展示如何在 Airbyte 中配置數據來源(S3)和目的地(Elasticsearch)。
前提條件
要完成本次演示,需要滿足以下前提條件:
- 在 AWS 中創建一個存儲桶,用于存放包含數據的 JSON 文件。
- 使用 Docker 在本地安裝 Airbyte。
- 在 Elastic Cloud 中創建一個 Elasticsearch 集群,用于存儲攝取的數據。
接下來,我們將詳細介紹每個步驟。
安裝 Airbyte
Airbyte 可以在本地使用 Docker 運行,也可以在云端運行(云端使用會產生費用)。本次演示將使用 Docker 運行本地版本。
安裝過程可能需要幾分鐘。按照安裝說明完成后,Airbyte 將可通過以下地址訪問:http://localhost:8000
登錄后,我們即可開始配置集成。
創建存儲桶
在此步驟中,你需要一個 AWS 賬戶來創建 S3 存儲桶。此外,必須設置正確的權限,通過創建策略和 IAM 用戶來允許訪問該存儲桶。
在存儲桶中,我們將上傳包含不同日志記錄的 JSON 文件,這些文件稍后將被遷移到 Elasticsearch。日志文件的內容如下:
{"timestamp": "2025-02-15T14:00:12Z","level": "INFO","service": "data_pipeline","message": "Pipeline execution started","details": {"pipeline_id": "abc123","source": "MySQL","destination": "Elasticsearch"}
}
以下是加載到存儲桶中的文件:
Elastic Cloud 配置
為了簡化演示,我們將使用 Elastic Cloud。如果你還沒有賬戶,可以在此創建免費試用賬戶:Elastic Cloud 注冊。
在 Elastic Cloud 配置部署后,你需要獲取以下信息:
- Elasticsearch 服務器的 URL。
- 訪問 Elasticsearch 的用戶。
要獲取 URL,請前往 Deployments > My deployment,在應用程序中找到 Elasticsearch,然后點擊 "Copy endpoint" 復制端點。
要創建用戶,請按照以下步驟操作:
- 訪問 Kibana > Stack Management > Users。
- 創建一個具有 superuser 角色的新用戶。
- 填寫相關字段以完成用戶創建。
現在我們已經完成所有設置,可以開始在 Airbyte 中配置連接器。
配置源連接器
在此步驟中,我們將為 S3 創建源連接器。為此,我們需要訪問 Airbyte 界面并在菜單中選擇 “Source” 選項。然后,搜索 S3 連接器。以下是配置連接器的詳細步驟:
- 訪問 Airbyte 并進入 “Sources” 菜單。
- 搜索并選擇 S3 連接器。
- 配置以下參數:
- Source Name:定義數據源的名稱。
- Delivery Method:選擇 “Replicate Records”(推薦用于結構化數據)。
- Data Format:選擇“JSON Format”。
- Stream Name:定義在 Elasticsearch 中的索引名稱。
- Bucket Name:輸入 AWS 中的存儲桶名稱。
- AWS Access Key 和 AWS Secret Key:輸入訪問憑證。
點擊 Set up source 并等待驗證完成。
配置目標連接器
在此步驟中,我們將配置目標連接器,即 Elasticsearch。為此,我們需要訪問 Airbyte 菜單并選擇 “Destination” 選項。然后,搜索 Elasticsearch 并點擊搜索結果。接下來,按照以下步驟進行配置:
- 訪問 Airbyte 并進入 “Destinations” 菜單。
- 搜索并選擇 Elasticsearch 連接器。
- 配置以下參數:
- Authentication Method:選擇 “Username/Password”。
- Username and Password:使用在 Kibana 中創建的憑證。
- Server Endpoint:粘貼從 Elastic Cloud 復制的 URL。
- 點擊 Set up destination 并等待驗證完成。
創建 Source 和 Destination 連接
一旦 Source 和 Destination 都已創建,我們就可以建立它們之間的連接,從而完成集成的創建。
以下是創建連接的步驟:
1)在菜單中,進入 Connections 并點擊 Create First Connection。
2)在接下來的頁面中,你可以選擇一個已有的 Source 或創建一個新的 Source。由于我們已經創建了 Source,因此選擇 Source S3。
3)下一步是選擇 Destination。由于我們已經創建了 Elasticsearch 連接器,因此選擇它以完成配置。
在下一步,需要定義 Sync Mode 以及使用的 schema。由于只創建了日志 schema,因此它將是唯一可供選擇的選項。
4)接下來進入“配置連接”步驟。在這里,我們可以定義連接的名稱以及集成執行的頻率。頻率可以通過以下三種方式進行配置:
- Cron:根據用戶定義的 cron 表達式運行同步(例如
0 0 15 * * ?
,表示每天 15:00 運行); - Scheduled:按指定的時間間隔運行同步(例如每 24 小時、每 2 小時執行一次);
- Manual:手動運行同步。
在本次演示中,我們將選擇 Manual 選項。
最后,點擊 Set up Connection,即可建立數據源與目標之間的連接。
從 S3 同步數據到 Elasticsearch
返回到 Connections(連接)界面后,你可以看到已創建的連接。要執行同步過程,只需點擊 Sync(同步)。此時,數據從 S3 遷移到 Elasticsearch 的過程將開始。
如果一切順利,你會看到狀態顯示為 synced(已同步)。
在 Kibana 中可視化數據
現在,我們將進入 Kibana 以分析數據并檢查其是否正確索引。在 Kibana 的 Discovery 部分,我們將創建一個名為 logs 的 Data View。這樣,我們就可以僅探索同步后創建的 logs 索引中的數據。
現在,我們可以可視化索引后的數據并對其進行分析。通過這種方式,我們驗證了使用 Airbyte 進行的整個遷移流程,其中包含從存儲桶加載數據并將其索引到 Elasticsearch。
結論
Airbyte 被證明是一種高效的數據集成工具,使我們能夠以自動化方式連接多個數據源和目標。在本教程中,我們演示了如何將 S3 存儲桶中的數據攝取到 Elasticsearch 索引中,并重點介紹了該過程的主要步驟。
這種方法有助于攝取大規模數據,并允許在 Elasticsearch 中進行分析,例如復雜搜索、聚合和數據可視化。
參考資料
Airbyte 快速入門:
https://docs.airbyte.com/using-airbyte/getting-started/oss-quickstart#part-1-install-abctl
核心概念:
https://docs.airbyte.com/using-airbyte/core-concepts/
想獲得 Elastic 認證?了解下一期 Elasticsearch 工程師培訓的時間!
Elasticsearch 擁有大量新功能,幫助你為特定用例構建最佳搜索解決方案。探索我們的示例筆記本以了解更多信息,開始免費云試用,或立即在本地嘗試 Elastic!
原文:How to ingest data to Elasticsearch through Airbyte - Elasticsearch Labs