參考文章
【Docker安裝部署Kafka+Zookeeper詳細教程】_linux arm docker安裝kafka-CSDN博客
Docker搭建kafka+zookeeper
打開我們的docker的鏡像源配置
vim?/etc/docker/daemon.json
配置
?{
? "registry-mirrors": ["https://widlhm9p.mirror.aliyuncs.com"]
}
?下面的那個insecure是我自己虛擬機的,不用理會
拉取鏡像
然后開始拉取我們的zookeeper鏡像和我們的kafka鏡像
這個是我們的zookeeper鏡像,沒有指定版本默認就是拉取最新的版本
docker pull zookeeper
kafka鏡像?
docker pull wurstmeister/kafka
因為我們的docker不同容器之間的網絡是互相隔開的,所以我們要創建一個共同使用的網絡
讓不同容器都加入這個網絡
docker network create創建我們的網絡
然后那個zookeeper_network是我們自定義的網絡名稱
docker network create --driver bridge zookeeper_network
kafka是依賴于zookeeper的所以我們要先安裝zookeeper
我們先用run來創建一個zookeeper容器
docker run -d --name zookeeper1 --network zookeeper_network -p 2181:2181 zookeeper
-d 是后臺運行
--name 是我們自定義容器的名字? 我定義的名字是zookeeper1
--network?
是指定我們的網絡環境,我們剛剛創建的網絡環境名字叫zookeeper_network,所以我們要讓容器加入這個網絡
-p 是指定我們的容器暴露給外部的端口? 2181:2181是指虛擬機(或服務器)的2181端口與容器內部的2181端口做映射
最后面的那個zookeeper 是我們的使用的鏡像源的名稱
一般是zookeeper:xxx來執行使用鏡像源的版本,如果不指定版本默認用的就是最新版本
查看我們創建的網絡環境的地址
docker inspect zookeeper_network
那個IPv4就是我們的網絡環境的地址,這是我的網絡環境的地址
我的是12.21.0.2,這個ip地址是要記住方便后面使用的
?
創建一個kafka容器
這段代碼有點長,根子自己改吧
# 啟動kafka
docker run -d --name kafka1 --network zookeeper_network -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<zookeeperIP地址>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<宿主機IP地址>:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
解釋?
KAFKA_ZOOKEEPER_CONNECT 后面寫的是我們的之前的網絡的地址
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT 我們的虛擬機(服務器)的本機的地址
不知道本機地址可以輸入 ip addr來查看本機地址
這樣子就搭建完成了
SpringBoot集成kafka
首先就是springboot和kafka的版本兼容了
Spring for Apache Kafka
然后我們引入兩個kafka的依賴?
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.2.0</version> </dependency>
自己對著自己的版本來
自己看b站視頻,9分鐘就搞定了
63-kafka-集成-Java場景-SpringBoot_嗶哩嗶哩_bilibili
然后開始寫我們的application.yml配置文件
下面是配置文件的全部+解析
其實和普通mq差不多
也就是配置生產者和消費者和一些過期時間超時時間
重點在于那個missing-topics-fatal
主題不存在的話,我們是否還要成功啟動
我自己的寫的默認的主題是test,但是我還沒在kafka里面創建,kafka里面還沒有這個叫test的主題
所以我啟動的時候,報錯然后失敗了?
spring:kafka:bootstrap-servers: 192.168.88.130:9092 #Kafka 集群的地址和端口號producer:acks: all #生產者發送消息時, Kafka 集群需要確認的確認級別。all 表示需要所有 broker 確認消息已經寫入batch-size: 16384 #生產者在發送消息時, 會先緩存一些消息, 達到 batch-size 后再批量發送。這個參數設置了批量發送的大小。buffer-memory: 33554432 #生產者用于緩存消息的內存大小key-serializer: org.apache.kafka.common.serialization.StringSerializer #定義了消息 key 和 value 的序列化方式。value-serializer: org.apache.kafka.common.serialization.StringSerializer #定義了消息 key 和 value 的序列化方式。retries: 0consumer:group-id: test #消費者組ID#消費方式: 在有提交記錄的時候,earliest與latest是一樣的,從提交記錄的下一條開始消費# earliest:無提交記錄,表示從最早的消息開始消費#latest:無提交記錄,從最新的消息的下一條開始消費auto-offset-reset: earliest #當消費者沒有提交過 offset 時, 從何處開始消費消息enable-auto-commit: true #是否自動提交偏移量offsetauto-commit-interval: 1s #前提是 enable-auto-commit=true。自動提交 offset 的間隔時間key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #定義了消息 key 和 value 的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #定義了消息 key 和 value 的反序列化方式max-poll-records: 2 #一次 poll 操作最多返回的消息數量properties:#如果在這個時間內沒有收到心跳,該消費者會被踢出組并觸發{組再平衡 rebalance}#消費者與 Kafka 服務端的會話超時時間session.timeout.ms: 120000#最大消費時間。此決定了獲取消息后提交偏移量的最大時間,超過設定的時間(默認5分鐘),服務端也會認為該消費者失效。踢出并再平衡#消費者調用 poll 方法的最大間隔時間max.poll.interval.ms: 300000#消費者發送請求到 Kafka 服務端的超時時間#配置控制客戶端等待請求響應的最長時間。#如果在超時之前沒有收到響應,客戶端將在必要時重新發送請求,#或者如果重試次數用盡,則請求失敗。request.timeout.ms: 60000#訂閱或分配主題時,允許自動創建主題。0.11之前,必須設置falseallow.auto.create.topics: true#消費者向協調器發送心跳的間隔時間。#poll方法向協調器發送心跳的頻率,為session.timeout.ms的三分之一heartbeat.interval.ms: 40000#每個分區里返回的記錄最多不超max.partitions.fetch.bytes 指定的字節#0.10.1版本后 如果 fetch 的第一個非空分區中的第一條消息大于這個限制#仍然會返回該消息,以確保消費者可以進行#每個分區最多拉取的消息字節數。#max.partition.fetch.bytes=1048576 #1Mlistener:#當enable.auto.commit的值設置為false時,該值會生效;為true時不會生效#manual_immediate:需要手動調用Acknowledgment.acknowledge()后立即提交#ack-mode: manual_immediate 手動 ACK 的方式。#如果監聽的主題不存在, 是否啟動失敗。missing-topics-fatal: false #如果至少有一個topic不存在,true啟動失敗。false忽略#消費方式, single 表示單條消費, batch 表示批量消費#type: single #單條消費?批量消費? #批量消費需要配合 consumer.max-poll-recordstype: batch#并發消費的線程數concurrency: 2 #配置多少,就為為每個消費者實例創建多少個線程。多出分區的線程空閑#默認的主題名稱template:default-topic: "test"#springboot啟動的端口號 server:port: 9999 #這個是java項目啟動的端口
基本案例
這是常量類
指定了一個topic和group
主題和分組id
groupid是消費者組的唯一標識
這個視頻9分鐘看懂kafka
小朋友也可以懂的Kafka入門教程,還不快來學_嗶哩嗶哩_bilibili
生產者
我們這個Autowired自動注入,會根據我們的配置文件的配置來自動注入
@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;
produces里面指定我們前端傳的是json格式
?我們往這個標題發送我們的消息,其實這個就是我們的常量類里面寫的"test"
消費者
@KafkaListener(topics = SpringBootKafkaConfig.TOPIC_TEST, groupId = SpringBootKafkaConfig.GROUP_ID)public void topic_test(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {for (String message : messages) {//因為這個String是Json,所以我們可以轉回Object對象,其實是轉成JsonObject對象final JSONObject entries = JSONUtil.parseObj(message);System.out.println(SpringBootKafkaConfig.GROUP_ID + " 消費了: Topic:" + topic + ",Message:" + entries.getStr("name"));//ack.acknowledge();}}
我們用List<String>來接收,因為可能一個消費者接收多條消息
指定消費者監聽的主題topic
以及指定消費者的唯一標識GROUP_ID
這些其實都是自己在常量類里面自己寫好的
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic
?這個是得到我們的主題topic的名字
我用apifox調試之后,成功執行了
kafka的圖形化工具
這里介紹一個免費的開源項目KafkaKing
Releases · Bronya0/Kafka-King (github.com)
里面還能指定中文