SpringBoot 實現整合kafka的簡單使用

1、引入kafka的依賴

        <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency>

2、配置kafka

spring:kafka:bootstrap-servers: 156.65.20.76:9092,156.65.20.77:9092,156.65.20.78:9092 #指定Kafka集群的地址,這里有三個地址,用逗號分隔。listener:ack-mode: manual_immediate #設置消費者的確認模式為manual_immediate,表示消費者在接收到消息后立即手動確認。concurrency: 3  #設置消費者的并發數為5missing-topics-fatal: false  #設置為false,表示如果消費者訂閱的主題不存在,不會拋出異常。producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 設置消息鍵的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer #設置消息值的序列化器acks: 1  #一般就是選擇1,兼顧可靠性和吞吐量 ,如果想要更高的吞吐量設置為0,如果要求更高的可靠性就設置為-1consumer:auto-offset-reset: earliest #設置為"earliest"表示將從最早的可用消息開始消費,即從分區的起始位置開始讀取消息。enable-auto-commit: false #禁用了自動提交偏移量的功能,為了避免出現重復數據和數據丟失,一般都是手動提交key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 設置消息鍵的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #設置消息值的反序列化器

3、創建主題

  • 自動創建(不推薦)

    在kafka的安裝目錄conf目錄下找到該配置文件server.properties,添加如下配置:
    num.partitions=3 #默認3個分區
    auto.create.topics.enable=true #開啟自動創建主題
    default.replication.factor=3 #默認3個副本
    
  • 手動創建

在kafka的安裝目錄bin目錄下,執行如下命令:
//創建一個有三個分區和三個副本,名為zhuoye的主題
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic xxxx

4、生產者代碼

@Slf4j
@Component
public class ALiYunServiceImpl implents IALiYunService {@Autowiredprivate KafkaTemplate kafkaTemplate;@Autowiredprivate ExecutorService executorService;String topicName = "xxxx";@Overridepublic void queryInfo() {List<Message> messages = Collections.synchronizedList(new ArrayList<>());boolean flag = true;//獲取上次查詢時間Long startTime = Long.valueOf(queryTimeRecordMapper.selectTimeByBelongId(3)) * 1000;Long endTime = System.currentTimeMillis();try {List<CloudInstanceAssetDto> cloudInstances = cloudInstanceAssetMapper.queryAllRunningInstance(1, "Running");if (CollectionUtils.isEmpty(cloudInstances)) {return;}//定義計數器CountDownLatch latch = new CountDownLatch(cloudInstances.size());//遍歷查詢for (CloudInstanceAssetDto instance : cloudInstances) {executorService.submit(() -> {try {dealMetricDataToMessage(ALiYunConstant.ECS_INTRANET_OUT_RATE, ALiYunConstant.INTRANET_OUT_RATE_NAME, ALiYunConstant.LW_INTRANET_OUT_RATE_CODE,startTime, endTime, instance, messages);} catch (Exception e) {} finally {latch.countDown();}});}//等待任務執行完畢latch.await();//將最終的消息集合發送到kafkaif (CollectionUtils.isNotEmpty(messages)) {for (int i = 0; i < messages.size(); i++) {if (StringUtils.isNotBlank(messages.get(i).getValue())&& "noSuchInstance".equals(messages.get(i).getValue())) {continue;}kafkaTemplate.send(topicName,  messages.get(i));}}} catch (Exception e) {flag = false;}}

這個時候,如果你想看有沒有把消息發送到kafka的指定主題可以使用如下命令:

kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic xxxx

5、消費者代碼

@Slf4j
@Component
public class KafkaConsumer {// 消費監聽@KafkaListener(topics = "xxxx",groupId ="aliyunmetric")public void consumeExtractorChangeMessage(ConsumerRecord<String, String> record, Acknowledgment ack){try {String value = record.value();//處理數據,存入openTsDb.................................ack.acknowledge();//手動提交}catch (Exception e){log.error("kafa-topic【zhuoye】消費阿里云指標源消息【失敗】");log.error(e.getMessage());}}
}

6、常用Kafka的命令

//創建主題
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic zhuoye
//查看kafka是否接收對應的消息kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic xxxx
// 修改kafka-topic分區數
./kafka-topics.sh --zookeeper localhost:2181 -alter --partitions 6 --topic xxxx
// 查看topic分區數
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic xxxx
// 查看用戶組消費情況
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group zhuoye-aliyunmetric --describe

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/45945.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/45945.shtml
英文地址,請注明出處:http://en.pswp.cn/web/45945.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Linux系統下weblogic10.3.6版本打補丁步驟

linux系統 weblogic補丁壓縮包&#xff1a;p35586779_1036_Generic.zip 鏈接&#xff1a;https://pan.baidu.com/s/1EEz_zPX-VHp5EU5LLxfxjQ 提取碼&#xff1a;XXXX &#xff08;補丁壓縮包中包含以下東西&#xff09; 打補丁步驟&#xff1a; 1.備份原weblogic(需要先確保服…

Langchain[3]:Langchain架構演進與功能擴展:流式事件處理、事件過濾機制、回調傳播策略及裝飾器應用

Langchain[3]:Langchain架構演進與功能擴展&#xff1a;流式事件處理、事件過濾機制、回調傳播策略及裝飾器應用 1. Langchain的演變 v0.1: 初始版本&#xff0c;包含基本功能。 從0.1~0.2完成的特性&#xff1a; 通過事件流 API 提供更好的流式支持。標準化工具調用支持Tool…

【linux 100條命令】

以下介紹一些常見的 Linux 命令&#xff1a; 1. ls &#xff1a;用于列出目錄中的內容。 - 常用選項&#xff1a; - -l &#xff1a;以長格式顯示詳細信息&#xff0c;包括文件權限、所有者、所屬組、文件大小、修改時間等。 - -a &#xff1a;顯示所有文件&#xff0c;包…

哪些基于 LLMs 的產品值得開發?從用戶體驗和市場接受度的角度探討

編者按&#xff1a;在大語言模型&#xff08;LLMs&#xff09;相關技術高速發展的今天&#xff0c;哪些基于 LLMs 的產品真正值得我們投入精力開發&#xff1f;如何從用戶體驗和市場接受度的角度評估這些產品的潛力&#xff1f; 今天為大家分享的這篇文章&#xff0c;作者的核心…

從代理模式到注解開發

代理模式 package org.example.proxy;public class ProxyClient {public static void main(String[] args) {ProxyBuilder proxyBuilder new ProxyBuilder();proxyBuilder.build();} }interface BuildDream {void build(); }class CustomBuilder implements BuildDream {Over…

visual studio開發C++項目遇到的坑

文章目錄 1.安裝的時候&#xff0c;順手安裝了C模板&#xff0c;導致新建項目執行出問題2.生成的exe&#xff0c;打開閃退問題3.項目里宏的路徑不對&#xff0c;導致后面編譯沒有輸出4. vs編譯ui&#xff0c;warning跳過&#xff0c;未成功5.vs編譯.h&#xff0c;warning跳過&a…

K8S 中的 CRI、OCI、CRI shim、containerd

K8S 如何創建容器&#xff1f; 下面這張圖&#xff0c;就是經典的 K8S 創建容器的步驟&#xff0c;可以說是冗長復雜&#xff0c;至于為什么設計成這樣的架構&#xff0c;繼續往下讀。 前半部分 CRI&#xff08;Container Runtime Interface&#xff0c;容器運行時接口&#xf…

避免海外業務中斷,TikTok養號注意事項

TikTok已成為企業和個人拓展海外業務的重要平臺。然而&#xff0c;由于平臺規則嚴格&#xff0c;賬號被封禁或限制訪問的風險始終存在。為了確保用戶在TikTok上的業務順利進行&#xff0c;著重說一些養號的注意事項。 文章分為三個部分&#xff0c;分別是遵守平臺規則、養號策略…

Qt判定鼠標是否在該多邊形的線條上

要判斷鼠標是否在由QPainterPath或一系列QPointF點定義的多邊形的線條上&#xff0c;你可以使用以下步驟&#xff1a; 獲取鼠標當前位置&#xff1a;在鼠標事件中&#xff0c;使用QMouseEvent的pos()方法獲取鼠標的當前位置。 檢查點與線段的距離&#xff1a;遍歷多邊形的每條…

面試高級 Java 工程師:2024 年的見聞與思考

面試高級 Java 工程師&#xff1a;2024 年的見聞與思考 由于公司業務拓展需要&#xff0c;公司招聘一名高級java工程研發工程師&#xff0c;主要負責新項目的研發及老項目的維護升級。我作為一名技術面試官&#xff0c;參與招聘高級 Java 工程師&#xff0c;我見證了技術領域的…

LATEX格式的高等數學題庫(導數和概率論與數理統計)

\documentclass{ctexart} \usepackage{amsmath,amssymb,amsfonts,hyperref} \usepackage{CJKutf8} \usepackage{enumitem} % 引入宏包 \usepackage [colorlinkstrue] {} \begin{document}\begin{CJK}{UTF8}{gkai}%正文放在此行下與\end{CJK}之間就行\tableofcontents\newpage\s…

F1-score(標準度量)

什么是F1-score&#xff1f; F1分數&#xff08;F1-score&#xff09;是分類問題的一個衡量指標。一些多分類問題的機器學習競賽&#xff0c;常常將F1-score作為最終測評的方法。它是精確率和召回率的調和平均數&#xff0c;最大為1&#xff0c;最小為0&#xff0c;如公式1所示…

高效轉換:CSV 轉 JSON 數組 API

在日常數據處理和分析中&#xff0c;CSV 和 JSON 是兩種常見的數據格式。無論是開發者還是數據科學家&#xff0c;經常需要在這兩種格式之間轉換。我們提供的 CSV 轉 JSON 數組 API 可以幫助您輕松完成這一任務。 功能特點&#xff1a; 多種輸入方式&#xff1a;支持直接粘貼…

使用GPT3.5,LangChain,FAISS和python構建一個本地知識庫

引言 介紹本地知識庫的概念和用途 在現代信息時代&#xff0c;我們面臨著海量的數據和信息&#xff0c;如何有效地管理和利用這些信息成為一項重要的任務。本地知識庫是一種基于本地存儲的知識管理系統&#xff0c;旨在幫助用戶收集、組織和檢索大量的知識和信息。它允許用戶…

C語言-->指針詳解

提示&#xff1a;本系列文章是C語言的重難點–>指針 C語言-->指針詳解 前言一、什么是指針&#xff1f;二、指針的聲明與初始化三、指針的解引用四、指針與數組五、指針與函數六、動態內存分配七、常見錯誤與注意事項總結我是將軍我一直都在&#xff0c;。&#xff01; 前…

Oracle或MySQL數據遷移到國產數據庫后的注意事項

一、人大金倉Kingbase 1、初始化后兼容 創建sysdate()方法兼容原生MySQL模式下不具備sysdate()的問題&#xff1a; create or replace function sysdate() returns timestamp with time zone as select current_timestamp; language sql; 2. 執行語句收集統計信息&#xff…

1.5-協程基礎與關鍵知識:連接線程的世界-回調型 API 協作

文章目錄 線程 API 轉換成掛起函數&#xff1a;suspendCoroutine支持取消的 suspendCoroutine&#xff1a;suspendCancellableCoroutine總結 線程 API 轉換成掛起函數&#xff1a;suspendCoroutine 在實際項目中即使已經使用協程了&#xff0c;可是要完全避免跟傳統的線程 API…

Excel 學習手冊 - 精進版(包括各類復雜函數及其嵌套使用)

作為程序員從未想過要去精進一下 Excel 辦公軟件的使用方法&#xff0c;以前用到某功能都是直接百度&#xff0c;最近這兩天跟著嗶哩嗶哩上的戴戴戴師兄把 Excel 由里到外學了一遍&#xff0c;收獲良多。程序員要想掌握這些內容可以說是手拿把掐&#xff0c;對后續 Excel 的運用…

linux的學習(七):讀取,函數,正則表達式,文本處理工具cut和awk

##簡介 shell編程中的讀取&#xff0c;函數&#xff0c;正則表達式&#xff0c;文本處理工具的簡單使用 read read&#xff1a;讀取控制臺的輸入 參數&#xff1a; -p&#xff1a;指定讀取時的提示符-t&#xff1a;等待讀取的時間 腳本例子 編寫i.sh腳本&#xff0c;enter…

算法實驗3:貪心算法的應用

實驗內容 &#xff08;1&#xff09;活動安排問題 設有n個活動的集合E{1, 2, …, n}&#xff0c;其中每個活動都要求使用同一資源&#xff0c;而在同一時間內只有一個活動能使用這一資源。每個活動i都有一個要求使用該資源的起始時間si和一個結束時間fi&#xff0c;且si <f…