SpringBoot3集成Kafka

標簽:Kafka3.Kafka-eagle3;

一、簡介

Kafka是一個開源的分布式事件流平臺,常被用于高性能數據管道、流分析、數據集成和關鍵任務應用,基于Zookeeper協調的處理平臺,也是一種消息系統,具有更好的吞吐量、內置分區、復制和容錯,這使得它成為大規模消息處理應用程序的一個很好的解決方案;

二、環境搭建

1、Kafka部署

1、下載安裝包:kafka_2.13-3.5.0.tgz2、配置環境變量open -e ~/.bash_profileexport KAFKA_HOME=/本地路徑/kafka3.5
export PATH=$PATH:$KAFKA_HOME/binsource ~/.bash_profile3、該目錄【kafka3.5/bin】啟動zookeeper
zookeeper-server-start.sh ../config/zookeeper.properties4、該目錄【kafka3.5/bin】啟動kafka
kafka-server-start.sh ../config/server.properties

2、Kafka測試

1、生產者
kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>id-1-message
>id-2-message2、消費者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
id-1-message
id-2-message3、查看topic列表
kafka-topics.sh --bootstrap-server localhost:9092 --list
test-topic4、查看消息列表
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --partition 0
id-1-message
id-2-message

3、可視化工具

配置和部署

1、下載安裝包:kafka-eagle-bin-3.0.2.tar.gz2、配置環境變量open -e ~/.bash_profileexport KE_HOME=/本地路徑/efak-web-3.0.2
export PATH=$PATH:$KE_HOME/binsource ~/.bash_profile3、修改配置文件:system-config.propertiesefak.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
efak.url=jdbc:mysql://127.0.0.1:3306/kafka-eagle4、本地新建數據庫:kafka-eagle,注意用戶名和密碼是否一致5、啟動命令
efak-web-3.0.2/bin/ke.sh start
命令語法: ./ke.sh {start|stop|restart|status|stats|find|gc|jdk|version|sdate|cluster}6、本地訪問【localhost:8048】 username:admin password:123456

KSQL語句測試

select * from `test-topic` where `partition` in (0)  order by `date` desc limit 5

select * from `test-topic` where `partition` in (0) and msg like '%5%' order by `date` desc limit 3

三、工程搭建

1、工程結構

2、依賴管理

這里關于依賴的管理就比較復雜了,首先spring-kafka組件選擇與boot框架中spring相同的依賴,即6.0.10版本,在spring-kafka最近的版本中3.0.8符合;

但是該版本使用的是kafka-clients組件的3.3.2版本,在Spring文檔的kafka模塊中,明確說明spring-boot:3.1要使用kafka-clients:3.4,所以從spring-kafka組件中排除掉,重新依賴kafka-clients組件;

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>${spring-kafka.version}</version><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka-clients.version}</version>
</dependency>

3、配置文件

配置kafka連接地址,監聽器的消息應答機制,消費者的基礎模式;

spring:# kafka配置kafka:bootstrap-servers: localhost:9092listener:missing-topics-fatal: falseack-mode: manual_immediateconsumer:group-id: boot-kafka-groupenable-auto-commit: falsemax-poll-records: 10properties:max.poll.interval.ms: 3600000

四、基礎用法

1、消息生產

模板類KafkaTemplate用于執行高級的操作,封裝各種消息發送的方法,在該方法中,通過topickey以及消息主體,實現消息的生產;

@RestController
public class ProducerWeb {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send/msg")public String sendMsg (){try {// 構建消息主體JsonMapper jsonMapper = new JsonMapper();String msgBody = jsonMapper.writeValueAsString(new MqMsg(7,"boot-kafka-msg"));// 發送消息kafkaTemplate.send("boot-kafka-topic","boot-kafka-key",msgBody);} catch (JsonProcessingException e) {e.printStackTrace();}return "OK" ;}
}

2、消息消費

編寫消息監聽類,通過KafkaListener注解控制監聽的具體信息,在實現消息生產和消費的方法測試后,使用可視化工具kafka-eagle查看topic和消息列表;

@Component
public class ConsumerListener {private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);@KafkaListener(topics = "boot-kafka-topic")public void listenUser (ConsumerRecord<?,String> record, Acknowledgment acknowledgment) {try {String key =  String.valueOf(record.key());String body = record.value();log.info("\n=====\ntopic:boot-kafka-topic,key{},body:{}\n=====\n",key,body);} catch (Exception e){e.printStackTrace();} finally {acknowledgment.acknowledge();}}
}

五、參考源碼

文檔倉庫:
https://gitee.com/cicadasmile/butte-java-note源碼倉庫:
https://gitee.com/cicadasmile/butte-spring-parent

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/43169.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/43169.shtml
英文地址,請注明出處:http://en.pswp.cn/news/43169.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

跟著美團學設計模式(感處)

讀了著篇文章之后發現真的是&#xff0c;你的思想&#xff0c;你的思維是真的比比你擁有什么技術要強的。 注 開閉原則 開閉原則&#xff08;Open-Closed Principle&#xff09;是面向對象設計中的基本原則之一&#xff0c;它的定義是&#xff1a;一個軟件實體應該對擴展開放…

python生成旗幟--比如美國國旗生成

目錄 1、解釋說明&#xff1a; 2、使用示例&#xff1a; 3、注意事項&#xff1a; 1、解釋說明&#xff1a; 在Python中&#xff0c;生成國旗可以通過使用第三方庫或者自定義函數來實現。通常&#xff0c;我們可以使用Pillow庫來處理圖像&#xff0c;以及使用matplotlib庫來…

python爬蟲7:實戰1

python爬蟲7&#xff1a;實戰1 前言 ? python實現網絡爬蟲非常簡單&#xff0c;只需要掌握一定的基礎知識和一定的庫使用技巧即可。本系列目標旨在梳理相關知識點&#xff0c;方便以后復習。 申明 ? 本系列所涉及的代碼僅用于個人研究與討論&#xff0c;并不會對網站產生不好…

carla中lka實現(二)

前言&#xff1a; 首先計算之前檢測出來的車道線的中線與輸入圖像的中線進行計算距離&#xff0c;&#xff0c;并設置不同的閾值對于不同的方向進行相關的調整。 一、車輛中心線 一般而言將攝像頭架設在車輛的正中心軸上&#xff0c;所獲得的圖像的中間線極為車輛的中心。 …

QGraphicsView 實例3地圖瀏覽器

主要介紹Graphics View框架&#xff0c;實現地圖的瀏覽、放大、縮小&#xff0c;以及顯示各個位置的視圖、場景和地圖坐標 效果圖: mapwidget.h #ifndef MAPWIDGET_H #define MAPWIDGET_H #include <QLabel> #include <QMouseEvent> #include <QGraphicsView&…

WSL2 ubuntu子系統OpenCV調用本機攝像頭的RTSP視頻流做開發測試

文章目錄 前言一、Ubuntu安裝opencv庫二、啟動 Windows 本機的 RTSP 視頻流下載解壓 EasyDarwin查看本機攝像頭設備開始推流 三、在ubuntu 終端編寫代碼創建目錄及文件創建CMakeLists.txt文件啟動 cmake 配置并構建 四、結果展示啟動圖形界面在圖形界面打開終端找到 rtsp_demo運…

linux系統服務學習(二)linux下yum源配置實戰

文章目錄 Linux下yum源配置實戰一、Linux下軟件包的管理1、軟件安裝方式2、源碼安裝的配置過程3、詳解源碼安裝的配置過程&#xff08;定制&#xff09;4、詳解編譯過程5、安裝過程6、axel多線程下載軟件源碼安裝7、使用軟鏈接解決command not found8、使用環境變量解決command…

軟考A計劃-系統集成項目管理工程師-收尾管理

點擊跳轉專欄>Unity3D特效百例點擊跳轉專欄>案例項目實戰源碼點擊跳轉專欄>游戲腳本-輔助自動化點擊跳轉專欄>Android控件全解手冊點擊跳轉專欄>Scratch編程案例點擊跳轉>軟考全系列點擊跳轉>藍橋系列 &#x1f449;關于作者 專注于Android/Unity和各種游…

字符串的無重復排列組合

題目描述&#xff1a; 無重復字符串的排列組合。編寫一種方法&#xff0c;計算某字符串的所有排列組合&#xff0c;字符串每個字符均不相同。 示例1: 輸入&#xff1a;S "qwe" 輸出&#xff1a;["qwe", "qew", "wqe", "weq&q…

中間件(二)dubbo負載均衡介紹

一、負載均衡概述 支持輪詢、隨機、一致性hash和最小活躍數等。 1、輪詢 ① sequences&#xff1a;內部的序列計數器 ② 服務器接口方法權重一樣&#xff1a;&#xff08;sequences1&#xff09;%服務器的數量&#xff08;決定調用&#xff09;哪個服務器的服務。 ③ 服務器…

opencv直方圖與模板匹配

import cv2 #opencv讀取的格式是BGR import numpy as np import matplotlib.pyplot as plt#Matplotlib是RGB %matplotlib inline def cv_show(img,name):cv2.imshow(name,img)cv2.waitKey()cv2.destroyAllWindows() 直方圖 cv2.calcHist(images,channels,mask,histSize,ran…

Spring中Bean的生命周期以及Bean的單例與多例模式

一. Bean的生命周期 bean的生命周期可以表達為&#xff1a;bean的定義?bean的初始化?bean的使用?bean的銷毀 Bean的初始化過程 1&#xff09;通過XML、Java annotation&#xff08;注解&#xff09;以及Java Configuration&#xff08;配置類&#xff09; 等方式加載Bea…

2023+HuggingGPT: Solving AI Tasks with ChatGPT and itsFriends in Hugging Face

摘要&#xff1a; 語言是llm(例如ChatGPT)連接眾多AI模型(例如hugs Face)的接口&#xff0c;用于解決復雜的AI任務。在這個概念中&#xff0c;llms作為一個控制器&#xff0c;管理和組織專家模型的合作。LLM首先根據用戶請求規劃任務列表&#xff0c;然后為每個任務分配專家模…

Unity 鼠標實現對物體的移動、縮放、旋轉

文章目錄 1. 代碼2. 測試場景 1. 代碼 using UnityEngine;public class ObjectManipulation : MonoBehaviour {// 縮放比例限制public float MinScale 0.2f;public float MaxScale 3.0f;// 縮放速率private float scaleRate 1f;// 新尺寸private float newScale;// 射線pri…

【Windows系統編程】03.遠線程注入ShellCode

shellcode&#xff1a;本質上也是一段普通的代碼&#xff0c;只不過特殊的編程手法&#xff0c;可以在任意環境下&#xff0c;不依賴于原有的依賴庫執行。 遠程線程 #include <iostream> #include <windows.h> #include <TlHelp32.h>int main(){HANDLE hPr…

Educational Codeforces Round 153 (Rated for Div. 2)ABC

Educational Codeforces Round 153 (Rated for Div. 2) 目錄 A. Not a Substring題目大意思路核心代碼 B. Fancy Coins題目大意思想核心代碼 C. Game on Permutation題目大意思想核心代碼 A. Not a Substring 題目大意 給定一個只包含“&#xff08;”和“&#xff09;”這兩…

react-native-webview RN和html雙向通信

rn登錄后得到的token需要傳遞給網頁&#xff0c;js獲取到的瀏覽器信息需要傳遞給rn RN Index.js: import React from react import { WebView } from react-native-webview import useList from ./useListexport default function Index(props) {const { uri, jsCode, webVie…

iPhone刪除的照片能恢復嗎?不小心誤刪了照片怎么找回?

iPhone最近刪除清空了照片還能恢復嗎&#xff1f;大家都知道&#xff0c;照片對于我們來說是承載著美好回憶的一種形式。它記錄著我們的平淡生活&#xff0c;也留住了我們的美好瞬間&#xff0c;具有極其重要的紀念價值。 照片不小心誤刪是一件非常難受的事&#xff0c;那么iP…

android TextView 超出長度使用省略號

在Android中最常見的需求&#xff0c;就是在在外部展示信息時&#xff0c;需要簡要展示內容。TextView僅需在靜態布局文件中設置以下幾個屬性&#xff1a; android:maxWidth“100dp” // 寬度是多少才算超出 android:maxLines"2" // 高度多少才算超出 android:elli…

React下載文件的兩種方式

React下載文件的兩種方式 - 代碼先鋒網 不知道有用沒用看著挺整齊 沒試過 1、GET類型下載 download url > {const eleLink document.createElement(a);eleLink.style.display none;// eleLink.target "_blank"eleLink.href url;// eleLink.href record;d…