docker安裝kafka,并通過springboot快速集成kafka

目錄

一、docker安裝和配置Kafka

1.拉取 Zookeeper 的 Docker 鏡像

2.運行 Zookeeper 容器

3.拉取 Kafka 的 Docker 鏡像

4.運行 Kafka 容器

5.下載 Kafdrop

6.運行 Kafdrop

7.如果docker pull wurstmeister/zookeeper或docker pull wurstmeister/kafka下載很慢,可以找一臺網絡比較好的機器,輸入這兩個命令進行下載,下載后使用docker save -o保存為tar文件,然后將tar文件傳輸到目標機器后,使用docker load -i加載tar文件為docker鏡像文件

8.使用 Kafka 自帶的工具來創建一個名為 users 的主題

9.驗證 Kafka,可以使用 Kafka 自帶的工具來驗證 Kafka 是否正常工作。例如,啟動一個 Kafka 消費者來監聽 users 主題:

二、在Spring Boot項目中集成和使用Kafka

1. 添加依賴

2. 配置Kafka

3. 創建消息對象

4. 創建生產者

5. 創建消費者

6. 測試

三、web訪問Kafdrop


一、docker安裝和配置Kafka

1.拉取 Zookeeper 的 Docker 鏡像

docker pull wurstmeister/zookeeper

2.運行 Zookeeper 容器

docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper

3.拉取 Kafka 的 Docker 鏡像

docker pull wurstmeister/kafka

4.運行 Kafka 容器

docker run -d --name kafka -p 9092:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.7.46:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --link zookeeper:zookeeper wurstmeister/kafka

5.下載 Kafdrop

docker pull obsidiandynamics/kafdrop

6.運行 Kafdrop

docker run -p 9000:9000 -d --name kafdrop -e KAFKA_BROKERCONNECT=192.168.7.46:9092 -e JVM_OPTS="-Xms16M -Xmx48M" obsidiandynamics/kafdrop

7.如果docker pull wurstmeister/zookeeper或docker pull wurstmeister/kafka下載很慢,可以找一臺網絡比較好的機器,輸入這兩個命令進行下載,下載后使用docker save -o保存為tar文件,然后將tar文件傳輸到目標機器后,使用docker load -i加載tar文件為docker鏡像文件

下載:
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
(kafdrop是一個kafka的web圖形管理界面)
docker pull obsidiandynamics/kafdrop
打包:
docker save -o ./zookeeper.tar wurstmeister/zookeeper
docker save -o ./kafka.tar wurstmeister/kafka
docker save -o ./kafdrop.tar obsidiandynamics/kafdrop
傳輸:
scp kafka.tar root@192.168.7.46:/usr/root/kafka?回車后輸入密碼即可
scp zookeeper.tar root@192.168.7.46:/usr/root/kafka?回車后輸入密碼即可
scp kafdrop.tar root@192.168.7.46:/usr/root/kafka?回車后輸入密碼即可

目標機加載成docker鏡像
docker load -i /usr/root/kafka/kafka.tar
docker load -i /usr/root/kafka/zookeeper.tar
docker load -i /usr/root/kafka/kafdrop.tar
查看鏡像列表
docker images

8.使用 Kafka 自帶的工具來創建一個名為 users 的主題

docker exec -it kafka kafka-topics.sh --create --topic users --partitions 1 --replication-factor 1 --bootstrap-server 192.168.7.46:9092

9.驗證 Kafka,可以使用 Kafka 自帶的工具來驗證 Kafka 是否正常工作。例如,啟動一個 Kafka 消費者來監聽 users 主題:

docker exec -it kafka kafka-console-consumer.sh --topic users --from-beginning --bootstrap-server 192.168.7.46:9092

這個命令,會啟動一個額外的 Kafka 消費者來監聽 users 主題。這個消費者是通過 Kafka 自帶的 kafka-console-consumer.sh 工具啟動的,主要用于測試和驗證目的。它會持續監聽并打印出發送到 users 主題的所有消息。

二、在Spring Boot項目中集成和使用Kafka

1. 添加依賴

首先,在你的pom.xml文件中添加Kafka的依賴:

<dependency>

????<groupId>org.springframework.kafka</groupId>

????<artifactId>spring-kafka</artifactId>

</dependency>

2. 配置Kafka

在application.properties或application.yml文件中配置Kafka的相關屬性。這里以application.properties為例:

# Kafka broker地址

spring.kafka.bootstrap-servers=localhost:9092

# 生產者配置

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# 消費者配置

spring.kafka.consumer.group-id=my-group

spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.consumer.properties.spring.json.trusted.packages=*

3. 創建消息對象

假設我們要發送和接收一個簡單的KafkaMsgs 對象:

public class KafkaMsgs {

????private String id;

????private String msg;

? ? private Long?date;

? ? // 構造函數、getter和setter省略

}

4. 創建生產者

創建一個生產者類來發送消息:

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Service;

@Service

public class KafkaProducer {

????@Autowired

????private KafkaTemplate<String, KafkaMsgs> kafkaTemplate;

????public void sendMessage(String topic, KafkaMsgs kafkaMsgs) {

????????kafkaTemplate.send(topic, kafkaMsgs);

????}

}

5. 創建消費者

創建一個消費者類來接收消息:

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Service;

@Service

public class KafkaConsumer {

????@KafkaListener(topics = "users", groupId = "my-group")

????public void listen(KafkaMsgs?kafkaMsgs) {

????????System.out.println("Received message: " + kafkaMsgs);

????}

}

6. 測試

你可以創建一個簡單的測試類來驗證生產和消費是否正常工作:

import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import com.esop.resurge.core.config.kafka.KafkaProducer;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.airbubble.kingdom.army.reponse.FeedBack;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;


@Api(tags="kafka數據控制器")
@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaController {
? ? @Autowired
? ? KafkaProducer kafkaProducer;

? ? @ApiOperation(value = "測試發送數據到kafka", httpMethod = "GET")
? ? @GetMapping(value = "/sendKafkaData")
? ? public FeedBack<String> sendKafkaData(
? ? ? ? ? ? @ApiParam(value = "topic", required = true) @RequestParam(required = true,value = "topic") String topic,
? ? ? ? ? ? @ApiParam(value = "msg", required = true) @RequestParam(required = true,value = "msg") String msg
? ? ) throws Exception {
? ? ? ? kafkaProducer.sendMessage(topic, new com.esop.resurge.core.config.kafka.KafkaMsgs(
? ? ? ? ? ? ? ? IdUtil.fastUUID(),
? ? ? ? ? ? ? ? msg,
? ? ? ? ? ? ? ? Long.valueOf(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_FORMAT).replace(" ", "").replace(":", "").replace("-", ""))
? ? ? ? ));
? ? ? ? return FeedBack.getInstance("發送成功");
? ? }

}

三、web訪問Kafdrop

?打開瀏覽器,訪問 http://192.168.7.46:9000,你應該能夠看到 Kafdrop 的 Web 界面

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/70155.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/70155.shtml
英文地址,請注明出處:http://en.pswp.cn/web/70155.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

C++ 與 Java 的對比分析:除法運算中的錯誤處理

博客主頁&#xff1a; [小????????] 本文專欄: Java 文章目錄 &#x1f4af;前言&#x1f4af;C中的除法錯誤處理&#x1f4af;Java中的除法錯誤處理&#x1f4af;C與Java錯誤處理的對比&#x1f4af;錯誤處理的優化和實踐&#x1f4af;小結 &#x1f4af;前言 在…

LLM之循環神經網絡(RNN)

在人工智能的領域中&#xff0c;神經網絡是推動技術發展的核心力量。今天&#xff0c;讓我們深入探討循環神經網絡&#xff08;RNN&#xff09; 一、神經網絡基礎 &#xff08;1&#xff09;什么是神經網絡 神經網絡&#xff0c;又稱人工神經網絡&#xff0c;其設計靈感源于人…

SQL sever數據導入導出實驗

1.創建數據庫TCP-H &#xff08;1&#xff09;右鍵“數據庫”&#xff0c;點擊“新建數據庫”即可 &#xff08;2&#xff09;用sql語言創建&#xff0c;此處以創建數據庫DB_test為例&#xff0c;代碼如下&#xff1a; use master;go--檢查在當前服務器系統中的所有數據里面…

讓編程變成一種享受-明基RD320U顯示器

引言 作為一名有著多年JAVA開發經驗的從業者&#xff0c;在工作過程中&#xff0c;顯示器的重要性不言而喻。它不僅是我們與代碼交互的窗口&#xff0c;更是影響工作效率和體驗的關鍵因素。在多年的編程生涯中&#xff0c;我遇到過各種各樣的問題。比如&#xff0c;在進行代碼…

計算機網絡(涵蓋OSI,TCP/IP,交換機,路由器,局域網)

一、網絡通信基礎 &#xff08;一&#xff09;網絡通信的概念 網絡通信是指終端設備之間通過計算機網絡進行的信息傳遞與交流。它類似于現實生活中的物品傳遞過程&#xff1a;數據&#xff08;物品&#xff09;被封裝成報文&#xff08;包裹&#xff09;&#xff0c;通過網絡…

圖像處理篇---基本OpenMV圖像處理

文章目錄 前言1. 灰度化&#xff08;Grayscale&#xff09;2. 二值化&#xff08;Thresholding&#xff09;3. 掩膜&#xff08;Mask&#xff09;4. 腐蝕&#xff08;Erosion&#xff09;5. 膨脹&#xff08;Dilation&#xff09;6. 縮放&#xff08;Scaling&#xff09;7. 旋轉…

SpringMVC重定向接口,參數暴露在url中解決方案!RedirectAttributes

OK&#xff0c;首先描述下業務場景&#xff0c;終端數量限制登錄 1.首先訪問項目login的get接口 2.輸入賬號密碼點擊登錄后&#xff0c;會請求login的POST接口 3.后臺對終端數量邏輯處理不允許登錄跳回到登錄頁面 4.因代碼原因需在后臺進行多次重定向接口&#xff0c;最后跳…

Spring Boot01(注解、)---java八股

Spring Boot中常用注解及其底層實現 1、SpringBootApplication注解&#xff1a; SpringBootApplication注解&#xff1a;這個注解標識了一個SpringBoot工程&#xff0c;它實際上是另外三個注解的組合&#xff0c;這三個注解是&#xff1a; aSpringBootConfiguration&#xff1a…

?2.快速了解HTML5的標簽類型

??HTML5 的標簽類型豐富多樣&#xff0c;每種類型都有其獨特的功能和用途&#xff0c;以下是一些常見的 HTML5 標簽類型介紹&#xff1a; &#x1f98b;結構標簽 &#x1faad;<html>&#xff1a;它是 HTML 文檔的根標簽&#xff0c;所有其他標簽都包含在這個標簽內&am…

eNSP防火墻綜合實驗

一、實驗拓撲 二、ip和安全區域配置 1、防火墻ip和安全區域配置 新建兩個安全區域 ip配置 Client1 Client2 電信DNS 百度web-1 聯通DNS 百度web-2 R2 R1 三、DNS透明代理相關配置 1、導入運營商地址庫 2、新建鏈路接口 3、配置真實DNS服務器 4、創建虛擬DNS服務器 5、配置D…

Linux 配置交換空間(Swap)解決內存不足

&#x1f680; 作者主頁&#xff1a; 有來技術 &#x1f525; 開源項目&#xff1a; youlai-mall ︱vue3-element-admin︱youlai-boot︱vue-uniapp-template &#x1f33a; 倉庫主頁&#xff1a; GitCode︱ Gitee ︱ Github &#x1f496; 歡迎點贊 &#x1f44d; 收藏 ?評論 …

個人shell腳本分享

在周一到周五做增量備份&#xff0c;在周六周日做完全備份 #!/bin/bash定義變量 SRC“/path/to/source” # 源目錄 BKUP“/backup” # 備份主目錄 FUL“KaTeX parse error: Expected EOF, got # at position 22: …ull" #? 完全備份目錄 INC"BKUP/inc” # 增量備份…

Django 5 實用指南(一)安裝與配置

1.1 Django5的背景與發展 Django 自從2005年由Adrian Holovaty和Simon Willison在 Lawrence Journal-World 新聞網站上首次發布以來&#xff0c;Django 一直是 Web 開發領域最受歡迎的框架之一。Django 框架經歷了多個版本的演進&#xff0c;每次版本更新都引入了新功能、改進了…

百度搜索融合 DeepSeek 滿血版,開啟智能搜索新篇

百度搜索融合 DeepSeek 滿血版&#xff0c;開啟智能搜索新篇 &#x1f680; &#x1f539; 一、百度搜索全量接入 DeepSeek &#x1f539; 百度搜索迎來重要升級&#xff0c;DeepSeek 滿血版全面上線&#xff01;&#x1f389; 用戶在百度 APP 搜索后&#xff0c;點擊「AI」即…

RabbitMQ服務異步通信

消息隊列在使用過程中&#xff0c;面臨著很多實際問題需要思考&#xff1a; 1. 消息可靠性 消息從發送&#xff0c;到消費者接收&#xff0c;會經理多個過程&#xff1a; 其中的每一步都可能導致消息丟失&#xff0c;常見的丟失原因包括&#xff1a; 發送時丟失&#xff1a; 生…

【教程】MySQL數據庫學習筆記(七)——多表操作(持續更新)

寫在前面&#xff1a; 如果文章對你有幫助&#xff0c;記得點贊關注加收藏一波&#xff0c;利于以后需要的時候復習&#xff0c;多謝支持&#xff01; 【MySQL數據庫學習】系列文章 第一章 《認識與環境搭建》 第二章 《數據類型》 第三章 《數據定義語言DDL》 第四章 《數據操…

膠囊網絡動態路由算法:突破CNN空間局限性的數學原理與工程實踐

一、CNN的空間局限性痛點解析 傳統CNN的瓶頸&#xff1a; 池化操作導致空間信息丟失&#xff08;最大池化丟棄85%激活值&#xff09;無法建模層次空間關系&#xff08;旋轉/平移等變換不敏感&#xff09;局部感受野限制全局特征整合 示例對比&#xff1a; # CNN最大池化示例…

#滲透測試#批量漏洞挖掘#Apache Log4j反序列化命令執行漏洞

免責聲明 本教程僅為合法的教學目的而準備,嚴禁用于任何形式的違法犯罪活動及其他商業行為,在使用本教程前,您應確保該行為符合當地的法律法規,繼續閱讀即表示您需自行承擔所有操作的后果,如有異議,請立即停止本文章讀。 目錄 Apache Log4j反序列化命令執行漏洞 一、…

深入剖析Spring MVC

一、Spring MVC 概述 1. 什么是 Spring MVC&#xff1f; Spring MVC 是基于 Spring 框架的 Web 框架&#xff0c;它實現了 MVC 設計模式&#xff0c;將應用程序分為三個核心部分&#xff1a; Model&#xff1a;封裝應用程序的數據和業務邏輯。 View&#xff1a;負責渲染數據…

機器學習入門-讀書摘要

先看了《深度學習入門&#xff1a;基于python的理論和實踐》這本電子書&#xff0c;早上因為入迷還坐過站了。。 因為里面的反向傳播和鏈式法則特別難懂&#xff0c;又網上搜了相關內容進行進一步理解&#xff0c;參考的以下文章&#xff08;個人認為都講的都非常好&#xff0…