一個簡單的spring+kafka生產者

1. pom

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2. 生產者

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.xxx.npi.module.common.msg.dto.MsgBase;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.List;@Service
public class MyMessageProducerService {@Value("${npi.default-url}")private String domain;private final KafkaTemplate<String, String> kafkaTemplate;public MyMessageProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public <T extends MsgBase> void sendMessage(String topicName, T msgObj) {List<T> list = new ArrayList<>();list.add(msgObj);if("https://npi.xxx.com".equals(domain)){kafkaTemplate.send(topicName, toJsonString(list));}}public <T extends MsgBase> void sendMessage(String topicName, List<T> list) {if("https://npi.xxx.com".equals(domain)){kafkaTemplate.send(topicName, toJsonString(list));}}private String toJsonString(Object obj) {return JSON.toJSONString(obj,SerializerFeature.WriteDateUseDateFormat,SerializerFeature.WriteMapNullValue,SerializerFeature.WriteNullListAsEmpty,SerializerFeature.WriteNullStringAsEmpty,SerializerFeature.DisableCircularReferenceDetect);}}

3. 配置

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${spring.kafka.producer.bootstrap-servers}")private String servers;@Value("${spring.kafka.producer.retries}")private int retries;@Value("${spring.kafka.producer.acks}")private String acks;@Value("${spring.kafka.producer.batch-size}")private int batchSize;@Value("${spring.kafka.producer.linger-ms}")private int lingerMs;@Value("${spring.kafka.producer.buffer-memory}")private int bufferMemory;@Value("${spring.kafka.producer.key-serializer}")private String keySerializer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;@Value("${spring.kafka.producer.security.protocol}")private String securityProtocol;@Value("${spring.kafka.producer.ssl.truststore.location}")private Resource sslTruststoreLocationResource;@Value("${spring.kafka.producer.ssl.truststore.password}")private String sslTruststorePassword;@Value("${spring.kafka.producer.sasl.mechanism}")private String saslMechanism;@Value("${spring.kafka.producer.sasl.jaas.config}")private String saslJaasConfig;@SuppressWarnings({"unchecked", "rawtypes"})@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate(producerFactory());}@SuppressWarnings("unchecked")@Beanpublic ProducerFactory<String, String> producerFactory() {@SuppressWarnings("rawtypes")DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs());// factory.transactionCapable();// factory.setTransactionIdPrefix("transaction-");return factory;}public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put("bootstrap.servers", servers);props.put("acks", acks);props.put("retries", retries);props.put("batch.size", batchSize);props.put("linger.ms", lingerMs);props.put("buffer.memory", bufferMemory);props.put("key.serializer", keySerializer);props.put("value.serializer", valueSerializer);props.put("security.protocol", securityProtocol);props.put("sasl.mechanism", saslMechanism);props.put("sasl.jaas.config", saslJaasConfig);// 如果需要更低級別的消息丟失防護,可以啟用冪等性props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);// SSL配置props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");try {// 將類路徑資源轉換為臨時文件路徑InputStream inputStream = sslTruststoreLocationResource.getInputStream();File tempFile = File.createTempFile("client_truststore", ".jks");Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tempFile.getAbsolutePath());props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword);} catch (IOException e) {throw new RuntimeException("Failed to locate truststore file", e);}return props;}
}

4. application

spring:kafka:producer:bootstrap-servers: n2.ikt.xxx.com:9092, n3.ikt.xxx.com:9092, n4.ikt.xxx.com:9092, n5.ikt.xxx.com:9092, n6.ikt.xxx.com:9092acks: allretries: 3batch-size: 16384linger-ms: 1buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializersecurity.protocol: SASL_SSLssl.truststore.location: classpath:client_truststore.jksssl.truststore.password: pwdsasl.mechanism: SCRAM-SHA-512sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf-username' password='pwd';topic:br: mdscinpi.mdscinpi-data.tstmem: mdscinpi.msdcinpi-data.tstfbr: mdscinpi.inpi-data.tstcr: mdscinpi.npi-data.tst

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/42097.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/42097.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/42097.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

https 自簽證書相關生成csr文件、p12文件、crt文件、jks文件、key文件、pem文件

文章目錄 前言https 自簽證書相關生成csr文件、p12文件、crt文件、jks文件、key文件、pem文件1, 檢查openssl的版本2. 生成私鑰和證書簽署請求 (CSR)3. 生成自簽名證書4. 將證書和私鑰轉換為 PKCS12 格式的密鑰庫5. 創建信任庫 (Truststore)6. 將 PKCS12 文件轉換為 JKS 文件7.…

IDEA安裝IDE Eval Reset插件,30天自動續期,無限激活

第一步&#xff1a; 下載idea 注意&#xff1a;版本要是2021.2.2以下 第二步&#xff1a;快捷鍵CtrlAlts打開設置 第三步&#xff1a;打開下圖中藍色按鈕 第四步&#xff1a;點擊彈窗的 “” &#xff0c;并輸入 plugins.zhile.io 點擊 “ok” 第五步&#xff1a;搜索IDE Ea…

前端必修技能:高手進階核心知識分享 - CSS mix-blend-mode 圖片混合模式詳解

標簽定義及使用說明 mix-blend-mode 屬性描述了元素的內容應該與元素的直系父元素的內容和元素的背景如何混合。 語法 mix-blend-mod: 使用mix-blend-mode 各種混合模式實例 注意: Internet Explorer 或 Edge 瀏覽器不支持 mix-blend-mode 屬性。 &#xff08;還是那個熟…

AJAX-個人版-思路步驟整理版

前置知識&#xff1a;老式的web創建工程方法就是創建項目然后添加web工件&#xff0c;然后添加lib依賴如&#xff1a;tomcat,servlet&#xff0c;等。 傳統請求 對于傳統請求操作&#xff1a;整體流程也就是創建靜態頁面&#xff0c; <!DOCTYPE html> <html lang&q…

CSS技巧:用CSS繪制超寫實的酷炫徽章緞帶效果,超漂亮,超酷炫

為什么要用CSS來畫個徽章&#xff1f;這貨腦子進水了吧&#xff01; 今天在電腦前設計&#xff0c;要做徽章效果。突然覺得可以嘗試用css實現近似的效果。說干就干&#xff0c;打開編輯器&#xff0c;讓我的手指頭活躍起來&#xff01; 技術要點 通過多個圓形嵌套和漸變屬性…

【Rust練習】1.變量綁定與解構

地址&#xff1a;https://practice-zh.course.rs/variables.html &#x1f31f; 變量只有在初始化后才能被使用 // 修復下面代碼的錯誤并盡可能少的修改 fn main() {let x: i32; // 未初始化&#xff0c;但被使用let y: i32; // 未初始化&#xff0c;也未被使用println!(&quo…

WIN32核心編程 - 線程操作(一) 線程信息 - 線程控制

公開視頻 -> 鏈接點擊跳轉公開課程博客首頁 -> 鏈接點擊跳轉博客主頁 目錄 Thread Thread Control 創建 - Create 執行 - Execute 掛起 - Suspend 恢復 - Resume 終止 - Terminate 遠程 - Remote Thread Info GetCurrentThread/Id GetThreadContext CreateToo…

Vue iview-ui 被tooltip包裹的標題,點擊跳轉后,提示框不消失

tooltip包裹的標題&#xff0c;點擊跳轉后&#xff0c;提示框不消失 就會有這種顯示問題 下面這種錯誤方法不可行&#xff0c;解決辦法往下翻 css寫得沒錯&#xff0c;問題出在Javascript當中的 getElementsByClassName(“xxabc”)&#xff0c; 這個方法得到的是一個由class&q…

【Android】【WIFI】檢查 SDIO 設備的狀態

檢查 SDIO 設備的狀態 要檢查 Android 設備上 SDIO 設備的狀態&#xff0c;可以使用 ADB 命令來獲取系統信息。以下是一些示例命令&#xff1a; 列出 SDIO 設備 adb shell cat /proc/devices | grep sdio檢查 SDIO 模塊是否加載 adb shell lsmod | grep sdio獲取 SDIO 相關的…

IDEA中使用Maven打包及碰到的問題

1. 項目打包 IDEA中&#xff0c;maven打包的方式有兩種&#xff0c;分別是 install 和 package &#xff0c;他們的區別如下&#xff1a; install 方式 install 打包時做了兩件事&#xff0c;① 將項目打包成 jar 或者 war&#xff0c;打包結果存放在項目的 target 目錄下。…

自閉癥在生活中的典型表現

自閉癥&#xff0c;這個看似遙遠卻又悄然存在于我們周圍的疾病&#xff0c;其影響深遠且復雜。在日常生活中&#xff0c;自閉癥患者的典型表現往往讓人印象深刻&#xff0c;這些表現不僅揭示了他們內心的世界&#xff0c;也提醒我們要以更加包容和理解的心態去面對他們。 首先…

R語言4.3.0保姆級安裝教程,包含安裝包

[軟件名稱]&#xff1a;R語言4.3.0 R是用于統計分析、繪圖的語言和操作環境。R是屬于GNU系統的一個自由、免費、源代碼開放的軟件&#xff0c;它是一個用于統計計算和統計制圖的優秀工具。 獲取鏈接: https://pan.quark.cn/s/180306f47179 安裝步驟: 1.解壓壓縮包。 2.進入…

EtherCAT轉Profinet網關配置說明第二講:上位機軟件配置

EtherCAT協議轉Profinet協議網關模塊&#xff08;XD-ECPNS20&#xff09;&#xff0c;不僅可以實現數據之間的通信&#xff0c;還可以實現不同系統之間的數據共享。EtherCAT協議轉Profinet協議網關模塊&#xff08;XD-ECPNS20&#xff09;具有高速傳輸的特點&#xff0c;因此通…

iOS開發語言基礎與Xcode工具初探

在iOS開發的世界里&#xff0c;Swift語言和Xcode開發工具是每個開發者旅程的起點。Swift&#xff0c;一種由Apple設計的編程語言&#xff0c;以其簡潔的語法和強大的性能&#xff0c;成為了iOS開發的首選語言。而Xcode&#xff0c;則是Apple官方提供的集成開發環境&#xff08;…

Spring的核心概念理解案列

IDEA開發的簡單“登陸成功”小項目 IDEA項目結構&#xff1a; 每一部分代碼和相應的解讀&#xff1a; com.itTony文件下有dao&#xff08;實體&#xff09;層&#xff0c;service&#xff08;服務&#xff09;層&#xff0c;編寫的2個類&#xff08;HelloSpring和TestSpring&…

docker容器相關命令1(小記)

docker run 只在第一次運行時使用&#xff0c;將鏡像放到容器中&#xff0c;以后再次啟動這個容器時&#xff0c;只需要使用命令docker start即可。 docker run -it … /bin/bash &#xff1a;表示創建并啟動容器直接進入容器的命令行&#xff0c;命令行中exit就是退出容器&…

運維鍋總詳解CPU

本文從CPU簡介、衡量CPU性能指標、單核及多核CPU工作流程、如何平衡 CPU 性能和防止CPU過載、為什么計算密集型任務要選擇高頻率CPU、超線程技術、CPU歷史演進及摩爾定律等方面對CPU進行詳細分析。希望對您有所幫助&#xff01; 一、CPU簡介 CPU&#xff08;中央處理器&#…

要想貴人相助,首先自己得先成為貴人!

點擊上方△騰陽 關注 轉載請聯系授權 在金庸江湖里&#xff0c;有兩位大俠&#xff0c;一個是蕭峰&#xff0c;一個是郭靖。 郭靖在《射雕英雄傳》里是絕對的主角&#xff0c;在《神雕俠侶》當中也是重要的配角&#xff0c;甚至可以說是第二主角。 談起郭靖&#xff0c;很多…

昇思MindSpore學習入門-評價指標

當訓練任務結束&#xff0c;常常需要評價函數&#xff08;Metrics&#xff09;來評估模型的好壞。不同的訓練任務往往需要不同的Metrics函數。例如&#xff0c;對于二分類問題&#xff0c;常用的評價指標有precision&#xff08;準確率&#xff09;、recall&#xff08;召回率&…

20240706 每日AI必讀資訊

&#x1f680;Meta 發布 AI 重磅炸彈&#xff1a;多標記預測模型現已開放研究 - 新技術采用多標記預測方法&#xff0c;有望提高性能并縮短訓練時間。 - 模型同時預測多個未來單詞&#xff0c;可能改善語言結構和上下文理解。 - multi-token prediction模型是Facebook基于大…