目錄
一、docker安裝和配置Kafka
1.拉取 Zookeeper 的 Docker 鏡像
2.運行 Zookeeper 容器
3.拉取 Kafka 的 Docker 鏡像
4.運行 Kafka 容器
5.下載 Kafdrop
6.運行 Kafdrop
7.如果docker pull wurstmeister/zookeeper或docker pull wurstmeister/kafka下載很慢,可以找一臺網絡比較好的機器,輸入這兩個命令進行下載,下載后使用docker save -o保存為tar文件,然后將tar文件傳輸到目標機器后,使用docker load -i加載tar文件為docker鏡像文件
8.使用 Kafka 自帶的工具來創建一個名為 users 的主題
9.驗證 Kafka,可以使用 Kafka 自帶的工具來驗證 Kafka 是否正常工作。例如,啟動一個 Kafka 消費者來監聽 users 主題:
二、在Spring Boot項目中集成和使用Kafka
1. 添加依賴
2. 配置Kafka
3. 創建消息對象
4. 創建生產者
5. 創建消費者
6. 測試
三、web訪問Kafdrop
一、docker安裝和配置Kafka
1.拉取 Zookeeper 的 Docker 鏡像
docker pull wurstmeister/zookeeper
2.運行 Zookeeper 容器
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
3.拉取 Kafka 的 Docker 鏡像
docker pull wurstmeister/kafka
4.運行 Kafka 容器
docker run -d --name kafka -p 9092:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.7.46:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --link zookeeper:zookeeper wurstmeister/kafka
5.下載 Kafdrop
docker pull obsidiandynamics/kafdrop
6.運行 Kafdrop
docker run -p 9000:9000 -d --name kafdrop -e KAFKA_BROKERCONNECT=192.168.7.46:9092 -e JVM_OPTS="-Xms16M -Xmx48M" obsidiandynamics/kafdrop
7.如果docker pull wurstmeister/zookeeper或docker pull wurstmeister/kafka下載很慢,可以找一臺網絡比較好的機器,輸入這兩個命令進行下載,下載后使用docker save -o保存為tar文件,然后將tar文件傳輸到目標機器后,使用docker load -i加載tar文件為docker鏡像文件
下載:
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
(kafdrop是一個kafka的web圖形管理界面)
docker pull obsidiandynamics/kafdrop
打包:
docker save -o ./zookeeper.tar wurstmeister/zookeeper
docker save -o ./kafka.tar wurstmeister/kafka
docker save -o ./kafdrop.tar obsidiandynamics/kafdrop
傳輸:
scp kafka.tar root@192.168.7.46:/usr/root/kafka?回車后輸入密碼即可
scp zookeeper.tar root@192.168.7.46:/usr/root/kafka?回車后輸入密碼即可
scp kafdrop.tar root@192.168.7.46:/usr/root/kafka?回車后輸入密碼即可
目標機加載成docker鏡像
docker load -i /usr/root/kafka/kafka.tar
docker load -i /usr/root/kafka/zookeeper.tar
docker load -i /usr/root/kafka/kafdrop.tar
查看鏡像列表
docker images
8.使用 Kafka 自帶的工具來創建一個名為 users 的主題
docker exec -it kafka kafka-topics.sh --create --topic users --partitions 1 --replication-factor 1 --bootstrap-server 192.168.7.46:9092
9.驗證 Kafka,可以使用 Kafka 自帶的工具來驗證 Kafka 是否正常工作。例如,啟動一個 Kafka 消費者來監聽 users 主題:
docker exec -it kafka kafka-console-consumer.sh --topic users --from-beginning --bootstrap-server 192.168.7.46:9092
這個命令,會啟動一個額外的 Kafka 消費者來監聽 users 主題。這個消費者是通過 Kafka 自帶的 kafka-console-consumer.sh 工具啟動的,主要用于測試和驗證目的。它會持續監聽并打印出發送到 users 主題的所有消息。
二、在Spring Boot項目中集成和使用Kafka
1. 添加依賴
首先,在你的pom.xml文件中添加Kafka的依賴:
<dependency>
????<groupId>org.springframework.kafka</groupId>
????<artifactId>spring-kafka</artifactId>
</dependency>
2. 配置Kafka
在application.properties或application.yml文件中配置Kafka的相關屬性。這里以application.properties為例:
# Kafka broker地址
spring.kafka.bootstrap-servers=localhost:9092
# 生產者配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# 消費者配置
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
3. 創建消息對象
假設我們要發送和接收一個簡單的KafkaMsgs 對象:
public class KafkaMsgs {
????private String id;
????private String msg;
? ? private Long?date;
? ? // 構造函數、getter和setter省略
}
4. 創建生產者
創建一個生產者類來發送消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
????@Autowired
????private KafkaTemplate<String, KafkaMsgs> kafkaTemplate;
????public void sendMessage(String topic, KafkaMsgs kafkaMsgs) {
????????kafkaTemplate.send(topic, kafkaMsgs);
????}
}
5. 創建消費者
創建一個消費者類來接收消息:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
????@KafkaListener(topics = "users", groupId = "my-group")
????public void listen(KafkaMsgs?kafkaMsgs) {
????????System.out.println("Received message: " + kafkaMsgs);
????}
}
6. 測試
你可以創建一個簡單的測試類來驗證生產和消費是否正常工作:
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import com.esop.resurge.core.config.kafka.KafkaProducer;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.airbubble.kingdom.army.reponse.FeedBack;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;
@Api(tags="kafka數據控制器")
@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaController {
? ? @Autowired
? ? KafkaProducer kafkaProducer;? ? @ApiOperation(value = "測試發送數據到kafka", httpMethod = "GET")
? ? @GetMapping(value = "/sendKafkaData")
? ? public FeedBack<String> sendKafkaData(
? ? ? ? ? ? @ApiParam(value = "topic", required = true) @RequestParam(required = true,value = "topic") String topic,
? ? ? ? ? ? @ApiParam(value = "msg", required = true) @RequestParam(required = true,value = "msg") String msg
? ? ) throws Exception {
? ? ? ? kafkaProducer.sendMessage(topic, new com.esop.resurge.core.config.kafka.KafkaMsgs(
? ? ? ? ? ? ? ? IdUtil.fastUUID(),
? ? ? ? ? ? ? ? msg,
? ? ? ? ? ? ? ? Long.valueOf(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_FORMAT).replace(" ", "").replace(":", "").replace("-", ""))
? ? ? ? ));
? ? ? ? return FeedBack.getInstance("發送成功");
? ? }}
三、web訪問Kafdrop
?打開瀏覽器,訪問 http://192.168.7.46:9000,你應該能夠看到 Kafdrop 的 Web 界面