鏈接:Elasticsearch Service Sink Connector for Confluent Platform | Confluent Documentation
鏈接:Apache Kafka
一、搭建測試環境
下載Elasticsearch Service Sink Connector
https://file.zjwlyy.cn/confluentinc-kafka-connect-elasticsearch-15.0.0.zip
為了方便,使用docker搭建kafka和elasticsearch。
docker run -d --name elasticsearch ? -e "discovery.type=single-node" ? -e ES_JAVA_OPTS="-Xms512m -Xmx512m" ? -p 9200:9200 -p 9300:9300 ? docker.elastic.co/elasticsearch/elasticsearch:7.17.1
docker run --user root -d --name kafka -p 9092:9092 -p 8083:8083 apache/kafka:3.9.1
confluentinc-kafka-connect-elasticsearch-15.0.0.zip文件復制到kafka容器里
docker cp?confluentinc-kafka-connect-elasticsearch-15.0.0.zip kafka:/opt/connectors ??
進入kafka的容器
docker exec -it?kafka /bin/bash
修改配置文件
vi /opt/kafka/config/connect-standalone.propertiesplugin.path=/opt/connectors #修改為zip解壓路徑
解壓zip
unzip?confluentinc-kafka-connect-elasticsearch-15.0.0.zip
修改配置文件
vi /opt/connectors/confluentinc-kafka-connect-elasticsearch-15.0.0/etc/quickstart-elasticsearch.properties
# 基礎配置
name=t-elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=3 # 根據分區數調整
topics=t-elasticsearch-sink
key.ignore=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false# ES連接配置
connection.url=http://192.168.1.1:9200 # 多節點負載均衡
type.name=_doc
index.name=t-elasticsearch-sink
#index.auto.create=true # 自動創建索引(或手動預創建)
schema.ignore=true# 容錯與錯誤處理
errors.tolerance=all
#errors.deadletterqueue.topic.name=dlq_t4_elasticsearch # 必須配置DLQ
#errors.deadletterqueue.context.headers.enable=true # 保留錯誤上下文
behavior.on.null.values=IGNORE # 跳過空值消息# 性能優化
batch.size=2000 # 批量寫入提升吞吐
max.in.flight.requests=5 # 并發請求數
max.retries=10 # 失敗重試次數
retry.backoff.ms=5000 # 重試間隔
read.timeout.ms=10000 # 讀超時
connection.timeout.ms=10000 # 連接超時
flush.timeout.ms=30000 # 刷新超時[2](@ref)
?啟動Connector
#cd /opt/kafka/bin
#./connect-standalone.sh -daemon ../config/connect-standalone.properties /opt/connectors/confluentinc-kafka-connect-elasticsearch-15.0.0/etc/quickstart-elasticsearch.properties
二、查看Connector狀態
curl -XGET http://localhost:8083/connectors/t-elasticsearch-sink/status? #查看狀態
curl -XGET http://localhost:8083/connectors/t-elasticsearch-sink/config? ?#查看配置
curl -X DELETE http://localhost:8083/connectors/t-elasticsearch-sink/offsets? #清理偏移量
curl -X DELETE http://localhost:8083/connectors/t-elasticsearch-sink? ?#刪除此connectors
三、測試寫入
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list? ?#查看topics
./kafka-topics.sh --delete --topic t-elasticsearch-sink? ? #刪除topic
./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic t-elasticsearch-sink? #逐行寫入消息?
四、查看ES索引
curl http://127.0.0.1:9200/_cat/indices?v