Kafka使用Elasticsearch Service Sink Connector直接傳輸topic數據到Elasticsearch

鏈接:Elasticsearch Service Sink Connector for Confluent Platform | Confluent Documentation

鏈接:Apache Kafka

一、搭建測試環境

下載Elasticsearch Service Sink Connector

https://file.zjwlyy.cn/confluentinc-kafka-connect-elasticsearch-15.0.0.zip

為了方便,使用docker搭建kafka和elasticsearch。

docker run -d --name elasticsearch ? -e "discovery.type=single-node" ? -e ES_JAVA_OPTS="-Xms512m -Xmx512m" ? -p 9200:9200 -p 9300:9300 ? docker.elastic.co/elasticsearch/elasticsearch:7.17.1

docker run --user root -d --name kafka -p 9092:9092 -p 8083:8083 apache/kafka:3.9.1

confluentinc-kafka-connect-elasticsearch-15.0.0.zip文件復制到kafka容器里

docker cp?confluentinc-kafka-connect-elasticsearch-15.0.0.zip kafka:/opt/connectors ??

進入kafka的容器

docker exec -it?kafka /bin/bash

修改配置文件

vi /opt/kafka/config/connect-standalone.propertiesplugin.path=/opt/connectors   #修改為zip解壓路徑

解壓zip

unzip?confluentinc-kafka-connect-elasticsearch-15.0.0.zip

修改配置文件

vi /opt/connectors/confluentinc-kafka-connect-elasticsearch-15.0.0/etc/quickstart-elasticsearch.properties
# 基礎配置
name=t-elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=3  # 根據分區數調整
topics=t-elasticsearch-sink
key.ignore=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false# ES連接配置
connection.url=http://192.168.1.1:9200  # 多節點負載均衡
type.name=_doc
index.name=t-elasticsearch-sink
#index.auto.create=true  # 自動創建索引(或手動預創建)
schema.ignore=true# 容錯與錯誤處理
errors.tolerance=all
#errors.deadletterqueue.topic.name=dlq_t4_elasticsearch  # 必須配置DLQ
#errors.deadletterqueue.context.headers.enable=true  # 保留錯誤上下文
behavior.on.null.values=IGNORE  # 跳過空值消息# 性能優化
batch.size=2000  # 批量寫入提升吞吐
max.in.flight.requests=5  # 并發請求數
max.retries=10  # 失敗重試次數
retry.backoff.ms=5000  # 重試間隔
read.timeout.ms=10000  # 讀超時
connection.timeout.ms=10000  # 連接超時
flush.timeout.ms=30000  # 刷新超時[2](@ref)

?啟動Connector

#cd /opt/kafka/bin

#./connect-standalone.sh -daemon ../config/connect-standalone.properties /opt/connectors/confluentinc-kafka-connect-elasticsearch-15.0.0/etc/quickstart-elasticsearch.properties

二、查看Connector狀態

curl -XGET http://localhost:8083/connectors/t-elasticsearch-sink/status? #查看狀態

curl -XGET http://localhost:8083/connectors/t-elasticsearch-sink/config? ?#查看配置

curl -X DELETE http://localhost:8083/connectors/t-elasticsearch-sink/offsets? #清理偏移量

curl -X DELETE http://localhost:8083/connectors/t-elasticsearch-sink? ?#刪除此connectors

三、測試寫入

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list? ?#查看topics

./kafka-topics.sh --delete --topic t-elasticsearch-sink? ? #刪除topic

./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic t-elasticsearch-sink? #逐行寫入消息?

四、查看ES索引

curl http://127.0.0.1:9200/_cat/indices?v

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/85637.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/85637.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/85637.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

訊方“教學有方”平臺獲華為昇騰應用開發技術認證!

教學有方 華為昇騰應用開發技術認證 權威認證 彰顯實力 近日,訊方技術自研的教育行業大模型平臺——“教學有方”,成功獲得華為昇騰應用開發技術認證。這一認證不僅是對 “教學有方” 平臺技術實力的高度認可,更標志著訊方在智慧教育領域的…

保護你的Electron應用:深度解析asar文件與Virbox Protector的安全策略

在現代軟件開發中,Electron框架因其跨平臺特性而備受開發者青睞。然而,隨著Electron應用的普及,如何保護應用中的核心資源文件——asar文件,成為了開發者必須面對的問題。今天,我們將深入探討asar文件的特性&#xff0…

端口安全配置示例

組網需求 如圖所示,用戶PC1、PC2、PC3通過接入設備連接公司網絡。為了提高用戶接入的安全性,將接入設備Router的接口使能端口安全功能,并且設置接口學習MAC地址數的上限為接入用戶數,這樣其他外來人員使用自己帶來的PC無法訪問公…

零基礎RT-thread第四節:電容按鍵

電容按鍵 其實只需要理解,手指按上去后充電時間變長,我們可以利用定時器輸入捕獲功能計算充電時間,超過無觸摸時的充電時間一定的閾值就認為是有手指觸摸。 基本原理就是這樣,我們開始寫代碼: 其實,看過了…

SQL基礎操作:從增刪改查開始

好的!SQL(Structured Query Language)是用于管理關系型數據庫的標準語言。讓我們從最基礎的增刪改查(CRUD)?? 操作開始學習,我會用簡單易懂的方式講解每個操作。 🛠 準備工作(建表…

vim 編輯模式/命令模式/視圖模式常用命令

以下是一份 Vim 命令大全,涵蓋 編輯模式(Insert Mode)、命令模式(Normal Mode) 和 視圖模式(Visual Mode) 的常用操作,適合初學者和進階用戶使用。 🧾 Vim 模式簡介 Vim…

每天看一個Fortran文件(10)

今天來看下MCV模式調用物理過程的相關代碼。我想改進有關于海氣邊界層方面的內容,因此我尋找相關的代碼,發現在physics目錄下有一個sfc_ocean.f的文件。 可以看見這個文件是在好多好多年前更新的了,里面內容不多,總共146行。是計算…

python打卡day37

疏錦行 知識點回顧: 1. 過擬合的判斷:測試集和訓練集同步打印指標 2. 模型的保存和加載 a. 僅保存權重 b. 保存權重和模型 c. 保存全部信息checkpoint,還包含訓練狀態 3. 早停策略 作業:對信貸數據集訓練后保存權重&#xf…

【Spark征服之路-2.9-Spark-Core編程(五)】

RDD行動算子: 行動算子就是會觸發action的算子,觸發action的含義就是真正的計算數據。 1. reduce ? 函數簽名 def reduce(f: (T, T) > T): T ? 函數說明 聚集 RDD 中的所有元素,先聚合分區內數據,再聚合分區間數據 val…

【入門】【練17.3 】比大小

| 時間限制:C/C 1000MS,其他語言 2000MS 內存限制:C/C 64MB,其他語言 128MB 難度:中等 分數:100 OI排行榜得分:12(0.1分數2難度) 出題人:root | 描述 試編一個程序,輸入…

CppCon 2017 學習:Free Your Functions!

“Free Your Functions!” 這句話在C設計中有很深的含義,意思是: “Free Your Functions!” 的理解 “解放你的函數”,鼓勵程序員: 不要把所有的函數都綁在類的成員函數里,優先考慮寫成自由函數(non-mem…

日常運維問題匯總-19

60. OVF3維護成本中心與訂貨原因之間的對應關系時,報錯提示,SYST: 不期望的日期 00/00/0000。消息號 FGV004,如下圖所示: OVF3往右邊拉動,有一個需要填入的字段“有效期自”,此字段值必須在成本中心定義的有…

2025SCA工具推薦︱基于多模態SCA的新一代開源供應鏈風險審查與治理平臺

近年來,隨著開源軟件在企業數字化轉型中的廣泛應用,開源供應鏈攻擊事件頻發,企業普遍面臨三大突出難題:一是不清楚自身引入了哪些開源組件,二是不掌握組件中潛在的安全漏洞和合規風險,三是缺乏自動化、全流…

CppCon 2017 學習:Migrating a C++03 library to C++11 case study

這段內容是在介紹 Wt(發音類似 “witty”) —— 一個用于 C 的 Web UI 框架。總結如下: 什么是 Wt? Wt 是一個 用 C 編寫的 widget(控件)驅動的 Web 框架。類似于桌面 GUI 框架(比如 Qt&#…

coding習慣 + Bug記錄整理

📖 清單 1、包裝類型導致的NPE2、xxApiWrapper命名3、see注釋4、MySQL模糊匹配特殊字符bug 整理些平時不好的coding習慣導致的bug📝 1、包裝類型導致的NPE 處理項目的一個bug,看日志是發生了空指針,相關代碼如下: D…

機器學習項目微服務離線移植

機器學習項目微服務離線移植 引言:為什么需要Docker化機器學習項目? 在當今的機器學習工程實踐中,項目部署與移植是一個常見但極具挑戰性的任務。傳統部署方式面臨著"在我機器上能運行"的困境——開發環境與生產環境的不一致導致…

JS紅寶書筆記 8.4 類

與函數類型相似,定義類也有兩種主要方式:類聲明和類表達式,這兩種方式都使用class關鍵字加大括號 與函數表達式類似,類表達式在它們被求值前也不能引用,不過與函數定義不同的是,雖然函數聲明可以提升&…

專題:2025游戲科技與市場趨勢報告|附130+份報告PDF匯總下載

原文鏈接:https://tecdat.cn/?p42733 2024年全球游戲市場規模突破1877億美元,中國以37.5%的全球占比成為核心增長引擎。生成式AI以52%的企業采用率重塑開發流程,混合休閑游戲實現37%的收入增長,跨端互通產品貢獻42%增量。玩家行為…

【沉浸式解決問題】Property ‘sqlSessionFactory‘ or ‘sqlSessionTemplate‘ are required

目錄 一、問題描述二、場景還原1. 測試mapper2. 測試service 三、原因分析四、解決方案1. DemoApplicationTests2. DemoApplication 后記 一、問題描述 在Application文件中加了ComponentScan注解,此后運行任何測試方法均報錯 java.lang.IllegalStateException: Fa…

Ubuntu 和 CentOS 中配置靜態 IP

在 Ubuntu 和 CentOS 中配置靜態 IP 的方法有所不同,主要因為兩者使用的網絡管理工具不同。以下是詳細步驟: Ubuntu(18.04 及更新版本,使用 netplan) 1. 查看網卡名稱 ip a記錄網卡名稱(如 ens33、eth0&a…