芝法醬躺平攻略(21)——kafka安裝和使用

本節內容比較初級,故接著躺平攻略寫

一、官網的下載

1.1 下載解壓

首先,去官網下載jar包,放進linux中,解壓到對應位置。
我的位置放在/WORK/MIDDLEWARE/kafka/4.0

1.2 常見配置

# 每個topic默認的分片數
num.properties=4
# 數據被刪除的時間
log.retention.hours=168
# 文件存儲路徑,注意,這不是日志,而是數據
log.dirs=/WORK/MIDDLEWARE/kafka/4.0/kraft-combined-logs
# 這個地方一定要修改,不然客戶端無法連通
# 這里要寫成ip
advertised.listeners=PLAINTEXT://192.168.0.64:9092,CONTROLLER://192.168.0.64:9093

1.3 自啟動

創建 /etc/systemd/system/kafka.service

[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/bin/bash -c 'source /etc/profile && /WORK/MIDDLEWARE/kafka/4.0/bin/kafka-server-start.sh /WORK/MIDDLEWARE/kafka/4.0/config/server.properties'
ExecStop=/bin/bash -c 'source /etc/profile && /WORK/MIDDLEWARE/kafka/4.0/bin/kafka-server-stop.sh'
Restart=on-failure[Install]
WantedBy=multi-user.target                       

啟用

systemctl daemreload
systemctl enable kafka

1.4 創建topic

bin/kafka-topics.sh --create --topic my-test-topic --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic my-test-topic --bootstrap-server localhost:9092

描述信息展示如下:

        Topic: my-test-topic    Partition: 0    Leader: 1       Replicas: 1     Isr: 1  Elr:    LastKnownElr:Topic: my-test-topic    Partition: 1    Leader: 1       Replicas: 1     Isr: 1  Elr:    LastKnownElr:Topic: my-test-topic    Partition: 2    Leader: 1       Replicas: 1     Isr: 1  Elr:    LastKnownElr:Topic: my-test-topic    Partition: 3    Leader: 1       Replicas: 1     Isr: 1  Elr:    LastKnownElr:

畢竟我們是學習環境,搭的單機節點,對于每個分區沒有做副本。生產環境下,注意把副本分配到不同的節點上
使用參數如下:

--replica-assignment "<partition0>:<brokerA>,<brokerB>,…;<partition1>:<brokerC>,<brokerD>,…;…"
#如:
--replica-assignment "0:1,2;1:2,3;2:1,3

解釋一下,':‘前面的是分區的編號;’:'后面是這個分區的數據,分別放到哪個broker下

1.5 安裝kafka-ui

cd /WORK/MIDDLEWARE/kafka
mkdir kafka-ui
cd kafka-ui
vim docker-compose.yml

編輯docker-compose文件

services:kafka-ui:container_name: kafka-uiimage: provectuslabs/kafka-ui:latestports:- 9100:8080environment:DYNAMIC_CONFIG_ENABLED: 'true'

二、SpringBoot的生產者接入

2.1 pom引用

注意,我這里的indi.zhifa.engine-cloud:common-web-starter是自己寫的庫,便于快速創建web項目,大家可以去 我的碼云 下載

    <dependencies><dependency><groupId>indi.zhifa.engine-cloud</groupId><artifactId>common-web-starter</artifactId><version>${zhifa-engine.version}</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.1.10</version></dependency></dependencies>

2.2 生產者java核心代碼:

service

@Slf4j
@Component
public class KafkaSendDataImpl implements IKafkaSendData {private final KafkaTemplate<String, String> mKafkaTemplate;private final FastJsonConfig mFastJsonConfig;public KafkaSendDataImpl(KafkaTemplate<String, String> pKafkaTemplate,@Qualifier("simple-fastJson-config") FastJsonConfig pFastJsonConfig) {mKafkaTemplate = pKafkaTemplate;mFastJsonConfig = pFastJsonConfig;}@Overridepublic void sendAsync(String topic,KafkaData pKafkaData) {String str = JSON.toJSONString(pKafkaData);try{mKafkaTemplate.send(topic,pKafkaData.getName(),str);}catch (Exception e){log.error("發送kafka時發成錯誤,錯誤信息是"+ e.getMessage());}}
}

controller

@Slf4j
@Validated
@RequiredArgsConstructor
@Tag(name = "生產者")
@ZhiFaRestController
@RequestMapping("/kafka/produce")
public class KafkaProduceController {final IKafkaSendData mKafkaSendData;@PostMapping("/{topic}")public void sendAsync(@PathVariable("topic") String pTopic, @RequestBody KafkaData pKafkaData){mKafkaSendData.sendAsync(pTopic,pKafkaData);}
}

配置:


server:# 服務端口port: 8083springdoc:swagger-ui:path: /swagger-ui.htmltags-sorter: alphaoperations-sorter: alphaapi-docs:path: /v3/api-docsgroup-configs:- group: "管理接口"paths-to-match: '/**'packages-to-scan:- indi.zhifa.study2025.test.kafka.producer.controllerzhifa:enum-memo:enabled: trueenum-packages:- indi.zhifa.**.enumsuri: /api/enumweb:enabled: truespring:profiles:active: localkafka:bootstrap-servers: 192.168.0.64:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializercompression-type: zstd#0 的時候,吞吐量最高,不管是否成功#1 leader收到后才響應#-1 要求所有的follow都寫成功#通常iot項目,日志采集等,該值設為0.僅僅用來解耦時,比如訂單處理業務,一般設成all,避免丟失,并且在回調監控。并且會自動開啟冪等性。acks: all# 重試次數retries: 3

我們創建幾條消息,觀察現象:
在這里插入圖片描述
打開swagger-ui,看到確實有消息數量了
在這里插入圖片描述

2.3 key的作用

額外解釋一點,發送時,指定消息的key。kafka默認會把同一個key放在一個partition(分區)中。我這里用name做key,可以保證同一個name的消息被順序消費。

三、SpringBoot的消費者接入

消費者非常簡單,這里略寫

3.1 java核心代碼

@Component
public class KafkaConsumerListener {private Map<String,Long> mMsgIdx;public KafkaConsumerListener() {mMsgIdx = new ConcurrentHashMap<>();}@KafkaListener(topics = "my-test-topic", groupId = "my-group")public void listen(ConsumerRecord<String, String> record) {String key = record.key();           // 獲取消息的 keyString value = record.value();       // 獲取消息的 valueString topic = record.topic();       // 獲取消息的 topicint partition = record.partition(); // 獲取消息的分區long offset = record.offset();      // 獲取消息的偏移量long timestamp = record.timestamp(); // 獲取消息的時間戳// 處理消息(這里我們只是打印消息)System.out.println("Consumed record: ");System.out.println("Key: " + key);System.out.println("Value: " + value);System.out.println("Topic: " + topic);System.out.println("Partition: " + partition);System.out.println("Offset: " + offset);System.out.println("Timestamp: " + timestamp);if(StringUtils.hasText(key)){Long idx = mMsgIdx.get(key);if(idx == null){idx = 0l;}idx = idx + 1;mMsgIdx.put(key, idx);System.out.println(key+"的第"+idx+"個消息");}}
}

3.2 配置

spring:profiles:active: localkafka:bootstrap-servers: 192.168.0.64:9092consumer:group-id: my-group   # 消費者組IDauto-offset-reset: earliest   # 消費者從頭開始讀取(如果沒有已提交的偏移量)key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 設置key的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/77965.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/77965.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/77965.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

AutoSAR從概念到實踐系列之MCAL篇(二)——Mcu模塊配置及代碼詳解(上)

歡迎大家學習我的《AutoSAR從概念到實踐系列之MCAL篇》系列課程,我是分享人M哥,目前從事車載控制器的軟件開發及測試工作。 學習過程中如有任何疑問,可底下評論! 如果覺得文章內容在工作學習中有幫助到你,麻煩點贊收藏評論+關注走一波!感謝各位的支持! 根據上一篇內容中…

easypoi 實現word模板導出

特此非常致謝&#xff1a;easypoi實現word模板 基礎的可以參考上文&#xff1b; 但是我的需求有一點點不一樣。 這是我的模板&#xff1a;就是我的t.imgs 是個list 但是很難過的是easy poi 我弄了一天&#xff0c;我都沒有弄出來嵌套list循環怎么輸出顯示&#xff0c;更難過…

Unity中數據存儲_LitJson

文章目錄 LitJson一&#xff1a;介紹二&#xff1a;特點三&#xff1a;使用四&#xff1a;注意事項 LitJson 一&#xff1a;介紹 LitJson 是一個專為 .NET 設計的輕量級 JSON 處理庫&#xff0c;支持序列化和反序列化 JSON 數據。 二&#xff1a;特點 快速且輕量 無外部依賴…

2025年首屆人形機器人半程馬拉松比賽(附機器人照片)

2025年4月19日&#xff0c;北京亦莊半程馬拉松暨人形機器人半場馬拉松正式開賽&#xff0c;作為全球首屆人形機器人戶外跑步成功舉辦&#xff0c;21.0975公里的戶外路程對人形機器人來講&#xff0c;注定將成為歷史性開篇&#xff0c;如果賽事能夠持續舉辦&#xff0c;那舉辦意…

網絡安全職業技能大賽Server2003

通過本地PC中滲透測試平臺Kali對服務器場景Windows進?系統服務及版本掃描滲透測 試&#xff0c;并將該操作顯示結果中Telnet服務對應的端?號作為FLAG提交 使用nmap掃描發現目標靶機開放端口232疑似telnet直接進行連接測試成功 Flag&#xff1a;232 通過本地PC中滲透測試平臺…

[java八股文][Java基礎面試篇]I/O

Java怎么實現網絡IO高并發編程&#xff1f; 可以用 Java NIO &#xff0c;是一種同步非阻塞的I/O模型&#xff0c;也是I/O多路復用的基礎。 傳統的BIO里面socket.read()&#xff0c;如果TCP RecvBuffer里沒有數據&#xff0c;函數會一直阻塞&#xff0c;直到收到數據&#xf…

Python常用的第三方模塊之【jieba庫】支持三種分詞模式:精確模式、全模式和搜索引擎模式(提高召回率)

Jieba 是一個流行的中文分詞Python庫&#xff0c;它提供了三種分詞模式&#xff1a;精確模式、全模式和搜索引擎模式。精確模式嘗試將句子最精確地切分&#xff0c;適合文本分析&#xff1b;全模式則掃描文本中所有可能的詞語&#xff0c;速度快但存在冗余&#xff1b;搜索引擎…

QT6 源(37):界面組件的總基類 QWidget 的源碼閱讀(下,c++ 代碼部分)

&#xff08;1&#xff09; QT 在 c 的基礎上增加了自己的編譯器&#xff0c;以支持元對象系統和 UI 界面設計&#xff0c;有 MOC 、 UIC 等 QT 自己的編譯器。本節的源代碼里&#xff0c;為了減少篇幅&#xff0c;易于閱讀&#xff0c;去除了上篇中的屬性部分&#xff0c; 上篇…

rabbitmq-spring-boot-start版本優化升級

文章目錄 1.前言2.優化升級內容3.依賴4.使用4.1發送消息代碼示例4.2消費監聽代碼示例4.3 brock中的消息 5.RabbmitMq的MessageConverter消息轉換器5.1默認行為5.2JDK 序列化的缺點5.3使用 JSON 進行序列化 6.總結 1.前言 由于之前手寫了一個好用的rabbitmq-spring-boot-start啟…

git lfs下載大文件限額

起因是用 model.load_state_dict(torch.load())加載pt權重文件時&#xff0c;出現錯誤&#xff1a;_pickle.UnpicklingError: invalid load key, ‘v’. GPT告訴我&#xff1a;你的 pt 文件不是權重文件&#xff0c;而是模型整體保存&#xff08;或根本不是 PyTorch 文件&#…

什么是RAG?RAG的主要流程是什么?

**RAG(Retrieval-Augmented Generation)**是一種結合檢索與生成技術的框架,旨在通過引入外部知識增強生成模型的性能。其核心思想是:在生成文本時,先從外部知識庫中檢索相關信息,再將檢索結果與原始輸入結合,作為生成模型的輸入,從而提升生成內容的準確性、相關性和信息…

【Rust 精進之路之第13篇-生命周期·進階】省略規則與靜態生命周期 (`‘static`)

系列: Rust 精進之路:構建可靠、高效軟件的底層邏輯 作者: 碼覺客 發布日期: 2025年4月20日 引言:讓編譯器“讀懂”你的意圖——省略的藝術 在上一篇【生命周期入門】中,我們理解了生命周期的必要性——它是 Rust 編譯器用來確保引用有效性、防止懸垂引用的關鍵機制。我…

Python爬蟲實戰:獲取xie程網敦煌酒店數據并分析,為51出行做參考

一、引言 伴隨互聯網的飛速發展,在線旅游平臺成為人們出行預訂酒店的重要途徑。xie程網作為國內頗具知名度的在線旅游平臺,存有豐富的酒店信息。借助爬取xie程網的酒店數據并加以深入分析,能夠為用戶提供更為精準的酒店推薦,特別是在旅游旺季,如 51 出行期間。本研究致力…

第二十一講 XGBoost 回歸建模 + SHAP 可解釋性分析(利用R語言內置數據集)

下面我將使用 R 語言內置的 mtcars 數據集&#xff0c;模擬一個完整的 XGBoost 回歸建模 SHAP 可解釋性分析 實戰流程。我們將以預測汽車的油耗&#xff08;mpg&#xff09;為目標變量&#xff0c;構建 XGBoost 模型&#xff0c;并用 SHAP 來解釋模型輸出。 &#x1f697; 示例…

PyMC+AI提示詞貝葉斯項目反應IRT理論Rasch分析籃球比賽官方數據:球員能力與位置層級結構研究

全文鏈接&#xff1a;tecdat.cn/?p41666 在體育數據分析領域不斷發展的當下&#xff0c;數據科學家們致力于挖掘數據背后的深層價值&#xff0c;為各行業提供更具洞察力的決策依據。近期&#xff0c;我們團隊完成了一項極具意義的咨詢項目&#xff0c;旨在通過先進的數據分析手…

【android bluetooth 框架分析 03】【Bta 層詳解 1】【Bluetooth Application Laye 介紹】

藍牙協議棧中 Bluetooth Application Layer&#xff08;藍牙應用層&#xff09;是協議棧核心組成部分&#xff0c;它位于協議棧中間偏上的位置&#xff0c;主要負責將底層 Bluetooth Stack&#xff08;如 L2CAP、AVDTP、RFCOMM、SDP 等&#xff09;與上層 Profile 和 Android F…

單片機獲取真實時間的實現方法

單片機獲取真實時間&#xff08;即當前的年月日、時分秒等&#xff09;通常需要依賴外部時間源或模塊&#xff0c;因為單片機本身沒有內置的實時時鐘&#xff08;RTC&#xff09;功能。 在 C 語言環境下&#xff0c;單片機獲取真實時間通常需要依賴 外部硬件模塊&#xff08;如…

Linux——進程優先級/切換/調度

1.進程優先級 1.進程優先級是什么&#xff1a;進程獲取CPU資源的先后順序 2.為什么要有進程優先級&#xff1a;因為一般CPU只有一塊&#xff0c;資源短缺&#xff0c;所以就需要優先級來確定誰先誰后的問題 3.值越低 進程的優先級越高 ps -l進行查看 UID&#xff1a;user id …

鑄鐵劃線平板:多行業的精密測量工具(北重十字滑臺加工廠家)

鑄鐵劃線平板是一種用于精密測量和校準的工具&#xff0c;廣泛應用于各個行業。它通常由鑄鐵制成&#xff0c;表面經過精密加工&#xff0c;能夠保證較高的平整度和準確度。鑄鐵劃線平板的主要作用是用來檢驗工件的平整度和垂直度&#xff0c;也常用于劃線、校準和測量工件的平…

Excel/WPS表格中圖片鏈接轉換成對應的實際圖片

Excel 超鏈圖變助手&#xff08;點擊下載可免費試用&#xff09; 是一款將鏈接轉換成實際圖片&#xff0c;批量下載表格中所有圖片的轉換工具&#xff0c;無需安裝&#xff0c;雙擊打開即可使用。 表格中鏈接如下圖所示&#xff1a; 操作方法&#xff1a; 1、雙擊以下圖標&a…