java 中 main 方法使用 KafkaConsumer 拉取 kafka 消息如何禁止輸出 debug 日志

pom 依賴:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.14.RELEASE</version>
</dependency>

?或者

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.5.1</version>
</dependency>

?ps:前面的?spring-kafka 依賴中已經包含了后面的 kafka-clients

KafkaConsumerDemo.java:


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
import java.util.logging.Logger;public class KafkaConsumerDemo {static Map<String,Object> properties = new HashMap<String,Object>();private static KafkaConsumer kafkaConsumer = null;/**** windows 環境需要將下面 8 行添加到 "C:\Windows\System32\drivers\etc\hosts" 文件中:*      xxx.xxx.xxx.xxx1 xxx-data01*      xxx.xxx.xxx.xxx2 xxx-data02*      xxx.xxx.xxx.xxx3 xxx-data03*      xxx.xxx.xxx.xxx4 xxx-data04*      xxx.xxx.xxx.xxx5 xxx-data05*      xxx.xxx.xxx.xxx6 xxx-data06*      xxx.xxx.xxx.xxx7 xxx-data07*      xxx.xxx.xxx.xxx8 xxx-data08* @param args*/public static void main(String[] args) {// 禁止控制臺輸出一些 org.apache.kafka.xxx 相關的日志LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.ConsumerCoordinator").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.FetchSessionHandler").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.Fetcher").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.AbstractCoordinator").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.NetworkClient").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.common.network.Selector").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.Metadata").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.common.utils.AppInfoParser").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.ConsumerConfig").setLevel(Level.OFF);properties.put("bootstrap.servers","127.0.0.1:9192,127.0.0.1:9192,127.0.0.1:9192");  // 指定 Brokerproperties.put("group.id", "11111111111111111111111");              // 指定消費組群 ID,為防止自己啟動拉取消息導致其他生產環境的消費者無法消費該消息,請設置一個絕對不重復的值,以起到隔離的作用properties.put("max.poll.records", "1000");// todo 設置可批量拉取???properties.put("enable.auto.commit", "false");properties.put("key.deserializer", StringDeserializer.class); // 將 key 的字節數組轉成 Java 對象properties.put("value.deserializer", StringDeserializer.class);  // 將 value 的字節數組轉成 Java 對象kafkaConsumer = new KafkaConsumer<String, String>(properties);// List<String> topics = queryAllTopics( consumer );kafkaConsumer.subscribe( Collections.singletonList( "ods_carbon_rfid_device_record" ) );  // 訂閱主題 order-eventsnew Thread(new Runnable() {@Overridepublic void run() {receiveMessage();}}).start();}/*** 查詢全部的主題(topic)列表* @param kafkaConsumer* @return*/private static List<String> queryAllTopics(KafkaConsumer kafkaConsumer) {if( kafkaConsumer == null ){return null;}Map<String, List<PartitionInfo>> map = kafkaConsumer.listTopics();if( map == null ){return null;}return new ArrayList<String>( map.keySet() );}public static void receiveMessage() {try {while ( true ){synchronized (KafkaConsumerDemo.class) {// ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));// 30L 表示超時時間為 30秒,有消息立即返回,沒消息最多等 30 秒后返回SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(30L));String date = sdf.format(new Date());if( records == null ){System.out.println( date + " 本次未拉取到任何消息" );}else {System.out.println( date + " 本次拉取到 " + records.count() + " 條消息" );int i = 1;for (ConsumerRecord<String,String> record: records) {String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]", record.topic(), record.partition(), record.offset(), record.key(), record.value());System.out.println( "第" + i + "條消息:" + info );i++;}kafkaConsumer.commitSync();}/*** 當你用 KafkaConsumer從Kafka里讀取消息并且處理完后,commitSync 方法會幫你把這些消息的處理進度(也就是偏移量 offset )同步地告訴 Kafka 服務器。* 這樣,Kafka 就知道你已經處理到哪兒了。如果消費者(也就是讀取消息的程序)突然崩潰或者重啟,Kafka 就能根據最后一次提交的偏移量,讓你從上一次處理* 完的地方繼續開始,而不會漏掉或者重復處理消息。* 簡單來說,commitSync 方 法就是用來“保存進度”的,確保消息處理的可靠性和順序性。*/// Thread.sleep( 5000L );}}} catch (Exception e){e.printStackTrace();} finally {kafkaConsumer.close();}}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/65823.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/65823.shtml
英文地址,請注明出處:http://en.pswp.cn/web/65823.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

車聯網安全--TLS握手過程詳解

目錄 1. TLS協議概述 2. 為什么要握手 2.1 Hello 2.2 協商 2.3 同意 3.總共握了幾次手&#xff1f; 1. TLS協議概述 車內各ECU間基于CAN的安全通訊--SecOC&#xff0c;想必現目前多數通信工程師們都已經搞的差不多了&#xff08;不要再問FvM了&#xff09;&#xff1b;…

RuoYi Cloud項目解讀【四、項目配置與啟動】

四、項目配置與啟動 當上面環境全部準備好之后&#xff0c;接下來就是項目配置。需要將項目相關配置修改成當前相關環境。 1 后端配置 1.1 數據庫 創建數據庫ry-cloud并導入數據腳本ry_2024xxxx.sql&#xff08;必須&#xff09;&#xff0c;quartz.sql&#xff08;可選&…

C#對象池

一、資源管理的困境與破局 在軟件開發的征程中&#xff0c;我們時常陷入資源管理的泥沼。以一個繁忙餐廳為例&#xff0c;每個顧客都急需一個盤子盛美食&#xff0c;可盤子數量有限&#xff0c;如果每次顧客用完盤子后&#xff0c;都不假思索地去清洗一個全新的盤子來供下一位…

Vue.js組件開發-如何使用moment.js

在Vue.js組件開發中&#xff0c;需要處理日期和時間&#xff0c;moment.js 是一個非常有用的庫。moment.js 提供了豐富的API來解析、驗證、操作和顯示日期和時間。 步驟&#xff1a; 1. 安裝moment.js 首先&#xff0c;需要通過npm或yarn安裝moment.js。在項目根目錄下運行以…

微信小程序mp3音頻播放組件,僅需傳入url即可

// index.js // packageChat/components/audio-player/index.js Component({/*** 組件的屬性列表*/properties: {/*** MP3 文件的 URL*/src: {type: String,value: ,observer(newVal, oldVal) {if (newVal ! oldVal && newVal) {// 如果 InnerAudioContext 已存在&…

要避免除數絕對值遠遠小于被除數絕對值的除法

要避免除數絕對值遠遠小于被除數絕對值的除法 用絕對值小的數作除數&#xff0c;舍人誤差會增大&#xff0c;如計算 x y \frac xy yx?,若 0 < ∣ y ∣ < ∣ x ∣ 0<|y|<|x| 0<∣y∣<∣x∣&#xff0c;則可能對計算結果帶來嚴重影響&#xff0c;應盡量避免…

深入了解OpenStack中的隧道網絡

在OpenStack環境中&#xff0c;隧道網絡是一項關鍵技術&#xff0c;它確保了虛擬機之間以及虛擬機與外部網絡之間的安全通信。通過隧道機制&#xff0c;我們可以有效地隔離不同租戶的流量&#xff0c;并支持多租戶環境下的復雜網絡需求。之前我們介紹了隧道網絡&#xff0c;下面…

4. scala高階之隱式轉換與泛型

背景 上一節&#xff0c;我介紹了scala中的面向對象相關概念&#xff0c;還有一個特色功能&#xff1a;模式匹配。本文&#xff0c;我會介紹另外一個特別強大的功能隱式轉換&#xff0c;并在最后介紹scala中泛型的使用 1. 隱式轉換 Scala提供的隱式轉換和隱式參數功能&#…

pandas與sql對應關系【幫助sql使用者快速上手pandas】

本頁旨在提供一些如何使用pandas執行各種SQL操作的示例&#xff0c;來幫助SQL使用者快速上手使用pandas。 目錄 SQL語法一、選擇SELECT1、選擇2、添加計算列 二、連接JOIN ON1、內連接2、左外連接3、右外連接4、全外連接 三、過濾WHERE1、AND2、OR3、IS NULL4、IS NOT NULL5、B…

第432場周賽:跳過交替單元格的之字形遍歷、機器人可以獲得的最大金幣數、圖的最大邊權的最小值、統計 K 次操作以內得到非遞減子數組的數目

Q1、跳過交替單元格的之字形遍歷 1、題目描述 給你一個 m x n 的二維數組 grid&#xff0c;數組由 正整數 組成。 你的任務是以 之字形 遍歷 grid&#xff0c;同時跳過每個 交替 的單元格。 之字形遍歷的定義如下&#xff1a; 從左上角的單元格 (0, 0) 開始。在當前行中向…

《探索鴻蒙Next上開發人工智能游戲應用的技術難點》

在科技飛速發展的當下&#xff0c;鴻蒙Next系統為應用開發帶來了新的機遇與挑戰&#xff0c;開發一款運行在鴻蒙Next上的人工智能游戲應用更是備受關注。以下是在開發過程中可能會遇到的一些技術難點&#xff1a; 鴻蒙Next系統適配性 多設備協同&#xff1a;鴻蒙Next的一大特色…

Harry技術添加存儲(minio、aliyun oss)、短信sms(aliyun、模擬)、郵件發送等功能

Harry技術添加存儲&#xff08;minio、aliyun oss&#xff09;、短信sms&#xff08;aliyun、模擬&#xff09;、郵件發送等功能 基于SpringBoot3Vue3前后端分離的Java快速開發框架 項目簡介&#xff1a;基于 JDK 17、Spring Boot 3、Spring Security 6、JWT、Redis、Mybatis-P…

Vue2: el-table為每一行添加超鏈接,并實現光標移至文字上時改變形狀

為表格中的某一列添加超鏈接 一個表格通常有許多列,網上許多教程都可以實現為某一列添加超鏈接,如下,實現了當光標懸浮在“姓名”上時,改變為手形,點擊可實現跳轉。 <el-table :data="tableData"><el-table-column label="姓名" prop=&quo…

R數據分析:多分類問題預測模型的ROC做法及解釋

有同學做了個多分類的預測模型,結局有三個類別,做的模型包括多分類邏輯回歸、隨機森林和決策樹,多分類邏輯回歸是用ROC曲線并報告AUC作為模型評估的,后面兩種模型報告了混淆矩陣,審稿人就提出要統一模型評估指標。那么肯定是統一成ROC了,剛好借這個機會給大家講講ROC在多…

A3. Springboot3.x集成LLama3.2實戰

本文將介紹集成ollama官網提供的API在Springboot工程中進行整合。由于沒找到java-llama相關合適的sdk可以使用,因此只好對接官方給出的API開發一套RESTFull API服務。下面將從Ollama以下幾個API展開介紹,逐漸的了解其特性以及可以干些什么。具體llama API說明可參數我前面寫的…

面試:類模版中函數聲明在.h,定義在.cpp中,其他cpp引用引入這個頭文件,會有什么錯誤?

1、概述 類模版中函數聲明在.h&#xff0c;定義在.cpp中&#xff0c;其他cpp引用引入這個頭文件&#xff0c;會有什么錯誤?報編譯錯誤&#xff1a;error C2512: Demo<int>: no appropriate default constructor available 舉例如下代碼&#xff1a;demo.h 聲明模版類 …

記一次學習skynet中的C/Lua接口編程解析protobuf過程

1.引言 最近在學習skynet過程中發現在網絡收發數據的過程中數據都是裸奔&#xff0c;就想加入一種數據序列化方式&#xff0c;json、xml簡單好用&#xff0c;但我就是不想用&#xff0c;于是就想到了protobuf&#xff0c;對于protobuf C/C的使用個人感覺有點重&#xff0c;正好…

SQLAlchemy

https://docs.sqlalchemy.org.cn/en/20/orm/quickstart.htmlhttps://docs.sqlalchemy.org.cn/en/20/orm/quickstart.html 聲明模型 在這里&#xff0c;我們定義模塊級構造&#xff0c;這些構造將構成我們從數據庫中查詢的結構。這種結構被稱為 聲明式映射&#xff0c;它同時定…

Trimble自動化激光監測支持歷史遺產實現可持續發展【滬敖3D】

故事橋&#xff08;Story Bridge&#xff09;位于澳大利亞布里斯班&#xff0c;建造于1940年&#xff0c;全長777米&#xff0c;橫跨布里斯班河&#xff0c;可載汽車、自行車和行人往返于布里斯班的北部和南部郊區。故事橋是澳大利亞最長的懸臂橋&#xff0c;是全世界兩座手工建…

CentOS 和 Ubantu你該用哪個

文章目錄 **一、CentOS 和 Ubuntu 的詳細介紹****1. CentOS****1.1 基本信息****1.2 特點****1.3 缺點** **2. Ubuntu****2.1 基本信息****2.2 特點****2.3 缺點** **二、CentOS 和 Ubuntu 的異同****1. 相同點****2. 不同點****3. 使用體驗對比** **三、總結和選擇建議** Cent…