Docker搭建kafka+zookeeper以及Springboot集成kafka快速入門

參考文章

【Docker安裝部署Kafka+Zookeeper詳細教程】_linux arm docker安裝kafka-CSDN博客

Docker搭建kafka+zookeeper

打開我們的docker的鏡像源配置

vim?/etc/docker/daemon.json

配置

?{
? "registry-mirrors": ["https://widlhm9p.mirror.aliyuncs.com"]
}

?下面的那個insecure是我自己虛擬機的,不用理會

拉取鏡像

然后開始拉取我們的zookeeper鏡像和我們的kafka鏡像

這個是我們的zookeeper鏡像,沒有指定版本默認就是拉取最新的版本

docker pull zookeeper

kafka鏡像?

docker pull wurstmeister/kafka

因為我們的docker不同容器之間的網絡是互相隔開的,所以我們要創建一個共同使用的網絡

讓不同容器都加入這個網絡

docker network create創建我們的網絡

然后那個zookeeper_network是我們自定義的網絡名稱

docker network create --driver bridge zookeeper_network

kafka是依賴于zookeeper的所以我們要先安裝zookeeper

我們先用run來創建一個zookeeper容器

 docker run -d --name zookeeper1  --network zookeeper_network -p 2181:2181   zookeeper

-d 是后臺運行

--name 是我們自定義容器的名字? 我定義的名字是zookeeper1

--network?

是指定我們的網絡環境,我們剛剛創建的網絡環境名字叫zookeeper_network,所以我們要讓容器加入這個網絡

-p 是指定我們的容器暴露給外部的端口? 2181:2181是指虛擬機(或服務器)的2181端口與容器內部的2181端口做映射

最后面的那個zookeeper 是我們的使用的鏡像源的名稱

一般是zookeeper:xxx來執行使用鏡像源的版本,如果不指定版本默認用的就是最新版本

查看我們創建的網絡環境的地址

docker inspect zookeeper_network

那個IPv4就是我們的網絡環境的地址,這是我的網絡環境的地址

我的是12.21.0.2,這個ip地址是要記住方便后面使用的
?

創建一個kafka容器

這段代碼有點長,根子自己改吧

 # 啟動kafka
docker run -d --name kafka1  --network zookeeper_network -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<zookeeperIP地址>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<宿主機IP地址>:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092  wurstmeister/kafka

解釋?

KAFKA_ZOOKEEPER_CONNECT 后面寫的是我們的之前的網絡的地址

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT 我們的虛擬機(服務器)的本機的地址

不知道本機地址可以輸入 ip addr來查看本機地址

這樣子就搭建完成了


SpringBoot集成kafka

首先就是springboot和kafka的版本兼容了

Spring for Apache Kafka

然后我們引入兩個kafka的依賴?

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.2.0</version>
</dependency>

自己對著自己的版本來

自己看b站視頻,9分鐘就搞定了

63-kafka-集成-Java場景-SpringBoot_嗶哩嗶哩_bilibili

然后開始寫我們的application.yml配置文件

下面是配置文件的全部+解析

其實和普通mq差不多

也就是配置生產者和消費者和一些過期時間超時時間

重點在于那個missing-topics-fatal

主題不存在的話,我們是否還要成功啟動

我自己的寫的默認的主題是test,但是我還沒在kafka里面創建,kafka里面還沒有這個叫test的主題

所以我啟動的時候,報錯然后失敗了?

spring:kafka:bootstrap-servers: 192.168.88.130:9092  #Kafka 集群的地址和端口號producer:acks: all #生產者發送消息時, Kafka 集群需要確認的確認級別。all 表示需要所有 broker 確認消息已經寫入batch-size: 16384  #生產者在發送消息時, 會先緩存一些消息, 達到 batch-size 后再批量發送。這個參數設置了批量發送的大小。buffer-memory: 33554432  #生產者用于緩存消息的內存大小key-serializer: org.apache.kafka.common.serialization.StringSerializer  #定義了消息 key 和 value 的序列化方式。value-serializer: org.apache.kafka.common.serialization.StringSerializer #定義了消息 key 和 value 的序列化方式。retries: 0consumer:group-id: test #消費者組ID#消費方式: 在有提交記錄的時候,earliest與latest是一樣的,從提交記錄的下一條開始消費# earliest:無提交記錄,表示從最早的消息開始消費#latest:無提交記錄,從最新的消息的下一條開始消費auto-offset-reset: earliest  #當消費者沒有提交過 offset 時, 從何處開始消費消息enable-auto-commit: true #是否自動提交偏移量offsetauto-commit-interval: 1s #前提是 enable-auto-commit=true。自動提交 offset 的間隔時間key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  #定義了消息 key 和 value 的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #定義了消息 key 和 value 的反序列化方式max-poll-records: 2  #一次 poll 操作最多返回的消息數量properties:#如果在這個時間內沒有收到心跳,該消費者會被踢出組并觸發{組再平衡 rebalance}#消費者與 Kafka 服務端的會話超時時間session.timeout.ms: 120000#最大消費時間。此決定了獲取消息后提交偏移量的最大時間,超過設定的時間(默認5分鐘),服務端也會認為該消費者失效。踢出并再平衡#消費者調用 poll 方法的最大間隔時間max.poll.interval.ms: 300000#消費者發送請求到 Kafka 服務端的超時時間#配置控制客戶端等待請求響應的最長時間。#如果在超時之前沒有收到響應,客戶端將在必要時重新發送請求,#或者如果重試次數用盡,則請求失敗。request.timeout.ms: 60000#訂閱或分配主題時,允許自動創建主題。0.11之前,必須設置falseallow.auto.create.topics: true#消費者向協調器發送心跳的間隔時間。#poll方法向協調器發送心跳的頻率,為session.timeout.ms的三分之一heartbeat.interval.ms: 40000#每個分區里返回的記錄最多不超max.partitions.fetch.bytes 指定的字節#0.10.1版本后 如果 fetch 的第一個非空分區中的第一條消息大于這個限制#仍然會返回該消息,以確保消費者可以進行#每個分區最多拉取的消息字節數。#max.partition.fetch.bytes=1048576  #1Mlistener:#當enable.auto.commit的值設置為false時,該值會生效;為true時不會生效#manual_immediate:需要手動調用Acknowledgment.acknowledge()后立即提交#ack-mode: manual_immediate   手動 ACK 的方式。#如果監聽的主題不存在, 是否啟動失敗。missing-topics-fatal: false #如果至少有一個topic不存在,true啟動失敗。false忽略#消費方式, single 表示單條消費, batch 表示批量消費#type: single #單條消費?批量消費? #批量消費需要配合 consumer.max-poll-recordstype: batch#并發消費的線程數concurrency: 2 #配置多少,就為為每個消費者實例創建多少個線程。多出分區的線程空閑#默認的主題名稱template:default-topic: "test"#springboot啟動的端口號
server:port: 9999 #這個是java項目啟動的端口

基本案例

這是常量類

指定了一個topic和group

主題和分組id

groupid是消費者組的唯一標識

這個視頻9分鐘看懂kafka

小朋友也可以懂的Kafka入門教程,還不快來學_嗶哩嗶哩_bilibili

生產者

我們這個Autowired自動注入,會根據我們的配置文件的配置來自動注入

@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;

produces里面指定我們前端傳的是json格式

?我們往這個標題發送我們的消息,其實這個就是我們的常量類里面寫的"test"

消費者

 @KafkaListener(topics = SpringBootKafkaConfig.TOPIC_TEST, groupId = SpringBootKafkaConfig.GROUP_ID)public void topic_test(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {for (String message : messages) {//因為這個String是Json,所以我們可以轉回Object對象,其實是轉成JsonObject對象final JSONObject entries = JSONUtil.parseObj(message);System.out.println(SpringBootKafkaConfig.GROUP_ID + " 消費了: Topic:" + topic + ",Message:" + entries.getStr("name"));//ack.acknowledge();}}

我們用List<String>來接收,因為可能一個消費者接收多條消息

指定消費者監聽的主題topic

以及指定消費者的唯一標識GROUP_ID

這些其實都是自己在常量類里面自己寫好的



@Header(KafkaHeaders.RECEIVED_TOPIC) String topic

?這個是得到我們的主題topic的名字

我用apifox調試之后,成功執行了


kafka的圖形化工具

這里介紹一個免費的開源項目KafkaKing

Releases · Bronya0/Kafka-King (github.com)

里面還能指定中文

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/42783.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/42783.shtml
英文地址,請注明出處:http://en.pswp.cn/web/42783.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

vue父子組件通信實現模糊搜索功能

我遇到的問題&#xff1a; 我的搜索框在父頁面&#xff0c;靜態數據都在子頁面。怎么實現模糊查詢數據&#xff1f; 昨天的嘗試&#xff1a;先把搜索的內容數據存到session里&#xff0c;然后從session里拿&#xff0c; 結果&#xff1a;存是存進去了&#xff0c;卻拿不到。應…

Django學習收尾

啟動項目命令 python manage.py runserver 文件上傳功能實現 title "Form上傳"if request.method "GET":form UpForm()return render(request, upload_form.html, {"form": form, "title": title})form UpForm(datarequest.POS…

Java對象創建究竟是在棧上還是堆上??

在 Java 中&#xff0c;對象的創建通常情況下是在堆上。 基本數據類型&#xff08;如 byte、short、int、long、float、double、char&#xff09;在方法內聲明時&#xff0c;其值會存儲在棧上。除了基本數據類型之外的所有對象&#xff0c;都是由 Java 虛擬機&#xff08;JVM&…

python入門基礎知識·二

""" # Python介紹 # Python注釋 # 單行注釋&#xff1a; # # 多行注釋&#xff1a; r """""" # Python輸出和輸入 # print: 輸出 # input: 輸入 ①會讓程序暫停&#xff0c;②得到的是字符串內容 int(&…

Linux Mac 安裝Higress 平替 Spring Cloud Gateway

Linux Mac 安裝Higress 平替 Spring Cloud Gateway Higress是什么?傳統網關分類Higress定位下載安裝包執行安裝命令執行腳本 安裝成功打開管理界面使用方法configure.shreset.shstartup.shshutdown.shstatus.shlogs.sh Higress官網 Higress是什么? Higress是基于阿里內部的…

Vue指令詳解與實操運用 - 編程魔法

在Vue.js的世界里&#xff0c;指令就像是一位魔法師&#xff0c;它們能夠賦予HTML元素以生命&#xff0c;讓網頁與用戶互動起來。今天&#xff0c;我們就來揭開這些指令的神秘面紗&#xff0c;看看它們是如何在我們的日常開發中發揮作用的。 1. v-text 和 v-html - 文字與內容的…

思考:Java內存模型和硬件內存模型

前言 前一陣在看volatile的原理&#xff0c;看到內存屏障和緩存一致性&#xff0c;發現再往底層挖就挖到了硬件和Java內存模型。這一塊是自己似懂非懂的知識區&#xff0c;我一般稱之為知識混沌區。因此整理這一篇文章。 什么是內存模型&#xff08;Memory Model&#xff09;…

CentOS6用文件配置IP模板

CentOS6用文件配置IP模板 到 CentOS6.9 , 默認還不能用 systemctl , 能用 service chkconfig sshd on 對應 systemctl enable sshd 啟用,開機啟動該服務 ### chkconfig sshd on 對應 systemctl enable sshd 啟用,開機啟動該服務 sudo chkconfig sshd onservice sshd start …

未羽研發測試管理平臺

突然有一些覺悟&#xff0c;程序猿不能只會吭哧吭哧的低頭做事&#xff0c;應該學會怎么去展示自己&#xff0c;怎么去宣傳自己&#xff0c;怎么把自己想做的事表述清楚。 于是&#xff0c;這兩天一直在整理自己的作品&#xff0c;也為接下來的找工作多做點準備。接下來…

LT7911UX 國產原裝 一拖三 edp 轉LVDS 可旋轉 可縮放

2.一般說明 該LT7911UX是一種高性能Type-C/DP1.4a到MIPI或LVDS芯片的VR/顯示應用。HDCP RX作為HDCP轉發器的上游&#xff0c;可以與其他芯片的HDCP TX配合實現轉發器功能。 對于DP1.4a輸入&#xff0c;LT7911UX可配置為1/2/4通道。自適應均衡使其適用于長電纜應用&#xff0c;最…

Junior.Crypt.2024 CTF Web方向 題解WirteUp 全

Buy a cat 題目描述&#xff1a;Buy a cat 開題 第一思路是抓包改包 Very Secure App 題目描述&#xff1a;All secrets become clear 開題 亂輸一個密碼就登陸成功了&#xff08;不是弱口令&#xff09; 但是回顯Your role is: user 但是有jwt&#xff01;&#xff01;&a…

深入理解基本數據結構:鏈表詳解

引言 在計算機科學中&#xff0c;數據結構是存儲、組織和管理數據的方式。鏈表是一種重要的線性數據結構&#xff0c;廣泛應用于各種編程場景。在這篇博客中&#xff0c;我們將詳細探討鏈表的定義、特點、操作及其在不同編程語言中的實現。 什么是鏈表&#xff1f; 鏈表是一種…

Mobile ALOHA前傳之VINN, Diffusion Policy和ACT對比

VINNDiffusion PolicyACT核心思想1.從離線數據中自監督學習獲得一個視覺編碼器&#xff1b;2.基于視覺編碼器&#xff0c;從采集的示例操作數據中檢索與當前觀測圖像最相似的N張圖像以及對應的動作&#xff1b;3.基于圖像編碼器的距離對各個動作進行加權平均&#xff0c;獲得最…

Open3D loss函數優化的ICP配準算法(精配準)

目錄 一、概述 1.1ICP的基本步驟 1.2損失函數的設計 二、代碼實現 2.1關鍵函數 2.2完整代碼 三、實現效果 3.1原始點云 3.2配準后點云 3.3計算數據 一、概述 ICP(Iterative Closest Point)配準算法是一種用于對齊兩個點云的經典算法。其目標是通過迭代優化…

Istio實戰教程:Service Mesh部署與流量管理

引言 Istio是一個開源的服務網格&#xff0c;它提供了一種統一的方法來連接、保護、控制和觀察服務。本教程將指導你從零開始部署Istio&#xff0c;并展示如何使用Istio進行基本的流量管理。 環境準備 Kubernetes集群&#xff1a;Istio運行在Kubernetes之上&#xff0c;確保…

W25Q64 Flash存儲器與STM32:硬件與軟件的完美結合案例

摘要 在嵌入式系統中&#xff0c;數據存儲是關鍵組成部分之一。W25Q64 Flash存儲器因其高容量、低功耗和高可靠性&#xff0c;成為STM32微控制器項目中優選的存儲解決方案。本文將展示W25Q64與STM32微控制器集成的案例&#xff0c;包括硬件設計、SPI通信協議實現和軟件編程策略…

記錄在Windows上安裝Docker

在Windows上安裝Docker時&#xff0c;可以選擇使用不同的后端。 其中兩個常見的選擇是&#xff1a;WSL 2&#xff08;Windows Subsystem for Linux 2&#xff09;和 Hyper-V 后端。此外&#xff0c;還可以選擇使用Windows容器。 三者的區別了解即可&#xff0c;推薦用WSL 2&…

我們公司落地大模型的路徑、方法和坑

我們公司落地大模型的路徑、方法和坑 李木子 AI大模型實驗室 2024年07月02日 18:35 北京 最近一年&#xff0c;LLM&#xff08;大型語言模型&#xff09;已經成熟到可以投入實際應用中了。預計到 2025 年&#xff0c;AI 領域的投資會飆升到 2000 億美元。現在&#xff0c;不只…

Thinking--在應用中添加動態水印,且不可刪除

Thinking系列&#xff0c;旨在利用10分鐘的時間傳達一種可落地的編程思想。 水印是一種用于保護版權和識別內容的技術&#xff0c;通常用于圖像、視頻或文檔中。它可以是文本、圖像或兩者的組合&#xff0c;通常半透明或以某種方式嵌入到內容中&#xff0c;使其不易被移除或篡改…

【Linux】多線程_2

文章目錄 九、多線程2. 線程的控制 未完待續 九、多線程 2. 線程的控制 主線程退出 等同于 進程退出 等同于 所有線程都退出。為了避免主線程退出&#xff0c;但是新線程并沒有執行完自己的任務的問題&#xff0c;主線程同樣要跟進程一樣等待新線程返回。 pthread_join 函數…