springboot整合kafka多數據源

整合kafka多數據源

  • 項目背景
  • 依賴
  • 配置
  • 生產者
  • 消費者
  • 消息體

項目背景

在很多與第三方公司對接的時候,或者處在不同的網絡環境下,比如在互聯網和政務外網的分布部署服務的時候,我們需要對接多臺kafka來達到我們的業務需求,那么當kafka存在多數據源的情況,就與單機的情況有所不同。

依賴

    implementation 'org.springframework.kafka:spring-kafka:2.8.2'

配置

單機的情況
如果是單機的kafka我們直接通過springboot自動配置的就可以使用,例如在yml里面直接引用

spring:kafka:producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerbootstrap-servers: server001.bbd:9092

在使用的時候直接注入,然后就可以使用里面的方法了

    @Resourceprivate KafkaTemplate<String, String> kafkaTemplate;

在這里插入圖片描述

多數據源情況下

本篇文章主要講的是在多數據源下的使用,和單機的有所不同,我也看了網上的一些博客,但是當我去按照網上的配置的時候,總是會報錯 kafakTemplate這個bean找不到,所以沒辦法只有按照springboot自動配置里面的來改
在這里插入圖片描述

package com.ddb.zggz.config;import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;import java.io.IOException;@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfiguration {private final KafkaProperties properties;private final KafkaSecondProperties kafkaSecondProperties;public KafkaConfiguration(KafkaProperties properties, KafkaSecondProperties kafkaSecondProperties) {this.properties = properties;this.kafkaSecondProperties = kafkaSecondProperties;}@Bean("kafkaTemplate")@Primarypublic KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,ProducerListener<Object, Object> kafkaProducerListener,ObjectProvider<RecordMessageConverter> messageConverter) {KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);messageConverter.ifUnique(kafkaTemplate::setMessageConverter);kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate;}@Bean("kafkaSecondTemplate")public KafkaTemplate<?, ?> kafkaSecondTemplate(@Qualifier("kafkaSecondProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory,@Qualifier("kafkaSecondProducerListener") ProducerListener<Object, Object> kafkaProducerListener,ObjectProvider<RecordMessageConverter> messageConverter) {KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);messageConverter.ifUnique(kafkaTemplate::setMessageConverter);kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate;}@Bean("kafkaProducerListener")@Primarypublic ProducerListener<Object, Object> kafkaProducerListener() {return new LoggingProducerListener<>();}@Bean("kafkaSecondProducerListener")public ProducerListener<Object, Object> kafkaSecondProducerListener() {return new LoggingProducerListener<>();}@Bean("kafkaConsumerFactory")@Primarypublic ConsumerFactory<Object, Object> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));return factory;}@Bean("kafkaSecondConsumerFactory")public ConsumerFactory<Object, Object> kafkaSecondConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(this.kafkaSecondProperties.buildConsumerProperties());customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));return factory;}@Bean("zwKafkaContainerFactory")KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> zwKafkaContainerFactory(@Qualifier(value = "kafkaSecondConsumerFactory") ConsumerFactory<Object, Object> kafkaSecondConsumerFactory) {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(kafkaSecondConsumerFactory);factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Bean("kafkaProducerFactory")@Primarypublic ProducerFactory<Object, Object> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(this.properties.buildProducerProperties());String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();if (transactionIdPrefix != null) {factory.setTransactionIdPrefix(transactionIdPrefix);}customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));return factory;}@Bean("kafkaSecondProducerFactory")public ProducerFactory<Object, Object> kafkaSecondProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(this.kafkaSecondProperties.buildProducerProperties());String transactionIdPrefix = this.kafkaSecondProperties.getProducer().getTransactionIdPrefix();if (transactionIdPrefix != null) {factory.setTransactionIdPrefix(transactionIdPrefix);}customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));return factory;}@Bean@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {return new KafkaTransactionManager<>(producerFactory);}@Bean@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();KafkaProperties.Jaas jaasProperties = this.properties.getJaas();if (jaasProperties.getControlFlag() != null) {jaas.setControlFlag(jaasProperties.getControlFlag());}if (jaasProperties.getLoginModule() != null) {jaas.setLoginModule(jaasProperties.getLoginModule());}jaas.setOptions(jaasProperties.getOptions());return jaas;}@Bean("kafkaAdmin")@Primarypublic KafkaAdmin kafkaAdmin() {KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());return kafkaAdmin;}}

生產者


package com.ddb.zggz.event;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;import com.ddb.zggz.config.ApplicationConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import javax.annotation.Resource;@Component
@Slf4j
public class KafkaPushEvent {@Resourceprivate KafkaTemplate<String, String> kafkaSecondTemplate;@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;@Autowiredprivate ApplicationConfiguration configuration;public void pushEvent(PushParam param) {ListenableFuture<SendResult<String, String>> sendResultListenableFuture = null;if ("zw".equals(configuration.getEnvironment())){sendResultListenableFuture = kafkaSecondTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));}if ("net".equals(configuration.getEnvironment())){sendResultListenableFuture = kafkaTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));}if (sendResultListenableFuture == null){throw new IllegalArgumentException("kakfa發送消息失敗");}sendResultListenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable ex) {log.error("kafka發送的message報錯,發送數據:{}", param);}@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("kafka發送的message成功,發送數據:{}", param);}});}}

消費者

package com.ddb.zggz.event;import com.alibaba.fastjson.JSONObject;import com.ddb.zggz.config.ApplicationConfiguration;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.service.GzApprovalService;
import com.ddb.zggz.service.GzServiceService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;import java.util.ArrayList;
import java.util.List;
import java.util.Objects;@Component
@Slf4j
public class SendMessageListener {@Autowiredprivate GzApprovalService gzApprovalService;@Autowiredprivate GzServiceService gzServiceService;@KafkaListener(topics = "${application.config.push-topic}", groupId = "zggz",containerFactory = "zwKafkaContainerFactory")@RetryableTopic(include = {Exception.class},backoff = @Backoff(delay = 3000, multiplier = 1.5, maxDelay = 15000))public void listen(ConsumerRecord<?, ?> consumerRecord) {String value = (String) consumerRecord.value();PushParam pushParam = JSONObject.parseObject(value, PushParam.class);//版本提審if ("version-approval".equals(pushParam.getEvent())) {ApprovalDTO approvalDTO = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), ApprovalDTO.class);gzApprovalService.approval(approvalDTO);}//服務下架if (pushParam.getEvent().equals("server-OffShelf-gzt")) {OffShelfParam offShelfParam = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), OffShelfParam.class);gzServiceService.offShelfV1(offShelfParam.getReason(), null, offShelfParam.getUserName(), "ZGGZ", offShelfParam.getH5Id(), offShelfParam.getAppId(), offShelfParam.getVersion());}}@DltHandlerpublic void processMessage(String message) {}
}

消息體

package com.ddb.zggz.event;import com.alibaba.fastjson.annotation.JSONField;
import com.ddb.zggz.model.GzH5VersionManage;
import com.ddb.zggz.model.GzService;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.param.PublishParam;
import com.ddb.zggz.param.ReviewAndRollback;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;import java.io.Serializable;
import java.time.LocalDateTime;/*** @author bbd*/
@Data
public class PushParam implements Serializable {/*** 發送的消息數據*/private Object data;@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@JsonSerialize(using = LocalDateTimeSerializer.class)@JsonDeserialize(using = LocalDateTimeDeserializer.class)@JSONField(format = "yyyy-MM-dd HH:mm:ss")private LocalDateTime createTime = LocalDateTime.now();/*** 事件名稱,用于消費者處理相關業務*/private String event;/*** 保存版本參數*/public static PushParam toKafkaVersion(GzH5VersionManage gzH5VersionManage) {PushParam pushParam = new PushParam();pushParam.setData(gzH5VersionManage);pushParam.setEvent("save-version");return pushParam;}/*** 保存服務參數*/public static PushParam toKafkaServer(GzService gzService) {PushParam pushParam = new PushParam();pushParam.setData(gzService);pushParam.setEvent("save-server");return pushParam;}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/38162.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/38162.shtml
英文地址,請注明出處:http://en.pswp.cn/news/38162.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Vue-Router】路由過渡動效

在 Vue Router 中&#xff0c;你可以通過過渡動效&#xff08;Transition Effects&#xff09;為路由切換添加平滑的過渡效果&#xff0c;從而提升用戶體驗。過渡動效可以使用 Vue 的 <transition> 組件和 CSS 過渡來實現。 基本使用&#xff1a; 對導航使用動畫&#…

HTML-文本標簽

歷史上&#xff0c;網頁的主要功能是文本展示。所以&#xff0c;HTML 提供了大量的文本處理標簽。 <div> <div>是一個通用標簽&#xff0c;表示一個區塊&#xff08;division&#xff09;。它沒有語義&#xff0c;如果網頁需要一個塊級元素容器&#xff0c;又沒有…

leetcode 494. 目標和

2023.8.14 一杯茶&#xff0c;一包煙&#xff0c;一道dp做一天... ps&#xff1a;nums[i]均大于等于0。本題先轉化為0-1背包問題&#xff1a;將數組元素分成兩堆&#xff1a;一堆為正號&#xff0c;另一堆為負號。設正號堆的和為x&#xff0c;則負號堆的和為sum-x。&#xff08…

CentOS系統環境搭建(十)——CentOS7定時任務

centos系統環境搭建專欄&#x1f517;點擊跳轉 使用CentOS系統環境搭建&#xff08;九&#xff09;——centos系統下使用docker部署項目的項目做定時任務。 CentOS7定時任務 查看現有的定時任務 crontab -l編輯定時任務 crontab -e示例 每天凌晨兩點運行腳本 清理內存 0 2 *…

【Linux的開胃小菜】常用的RPM軟件包與YUM倉庫包管理器使用

一、系統初始化進程 systemd與System V init的區別以及作用&#xff1a; System V init運行級別systemd目標名稱systemd目標作用0poweroff.target關機1rescue.target單用戶模式2multi-user.target多用戶的文本界面3multi-user.target多用戶的文本界面4multi-user.target多用戶…

【SpringBoot】88、SpringBoot中使用Undertow替代Tomcat容器

SpringBoot 中我們既可以使用 Tomcat 作為 Http 服務,也可以用 Undertow 來代替。Undertow 在高并發業務場景中,性能優于 Tomcat。所以,如果我們的系統是高并發請求,不妨使用一下 Undertow,你會發現你的系統性能會得到很大的提升。 1、Tomcat 介紹 Tomcat是一個開源的Ja…

【數據結構】“單鏈表”的練習題(二)

&#x1f490; &#x1f338; &#x1f337; &#x1f340; &#x1f339; &#x1f33b; &#x1f33a; &#x1f341; &#x1f343; &#x1f342; &#x1f33f; &#x1f344;&#x1f35d; &#x1f35b; &#x1f364; &#x1f4c3;個人主頁 &#xff1a;阿然成長日記 …

Django框架 靚號管理(增刪改查)

Django框架 靚號管理&#xff08;增刪改查&#xff09; 新建一個項目 backend 使用pycharm創建app startapp app項目目錄 C:\code\backend ├── app | ├── admin.py | ├── apps.py | ├── migrations | ├── models.py | ├── tests.py | ├── views.…

關于微信臨時文件wxfile://tmp文件如何處理,微信小程序最新獲取頭像和昵稱

分享-2023年資深前端進階&#xff1a;前端登頂之巔-最全面的前端知識點梳理總結&#xff0c;前端之巔 *分享一個使用比較久的&#x1fa9c; 技術棧&#xff1a;taro框架 vue3版本 解決在微信小程序獲取微信頭像時控制臺報錯&#xff1a;找不著wxfile://tmp 文件路徑,失敗&…

java spring cloud 企業電子招標采購系統源碼:營造全面規范安全的電子招投標環境,促進招投標市場健康可持續發展 tbms

? 項目說明 隨著公司的快速發展&#xff0c;企業人員和經營規模不斷壯大&#xff0c;公司對內部招采管理的提升提出了更高的要求。在企業里建立一個公平、公開、公正的采購環境&#xff0c;最大限度控制采購成本至關重要。符合國家電子招投標法律法規及相關規范&#xff0c;以…

支持M1 Syncovery for mac 文件備份同步工具

Syncovery for Mac 是一款功能強大、易于使用的文件備份和同步軟件&#xff0c;適用于需要備份和同步數據的個人用戶和企業用戶。Syncovery 提供了一個直觀的用戶界面&#xff0c;使用戶可以輕松設置備份和同步任務。用戶可以選擇備份的文件類型、備份目錄、備份頻率等&#xf…

解讀2023年上半年財報:營收凈利雙增長,珀萊雅離高端還有多遠?

夏季炎熱&#xff0c;防曬類產品的銷量暴漲。根據千牛數據&#xff0c;防曬衣今年5月全網搜索人數同比增長15%&#xff0c;加購人數同比增長29.8%&#xff0c;訪問人數同比增加42%。消費者狂熱的防曬需求&#xff0c;孕育著巨大的商機&#xff0c;許多企業開始瞄準這一機會。而…

在Windows和MacOS環境下實現批量doc轉docx,xls轉xlsx

一、引言 Python中批量進行辦公文檔轉化是常見的操作&#xff0c;在windows狀態下我們可以利用changeOffice這個模塊很快進行批量操作。 二、在Windows環境下的解決文案 Windows環境下&#xff0c;如何把doc轉化為docx&#xff0c;xls轉化為xlsx&#xff1f; 首先&#xff…

mysql三大日志—— 二進制日志binlog

binlog用于記錄數據庫執行的寫入性操作&#xff0c;由服務層進行記錄&#xff0c;通過追加的方式以二進制的形式保存在磁盤中。 binlog主要用于主從復制和數據恢復。 主從復制&#xff1a;在主機端開啟binlog&#xff0c;然后將binlog發送到各個從機&#xff0c;從機存放binl…

sykwalking8.2和mysql5.7快速部署

1.SkyWalking 是什么&#xff1f; 分布式系統的應用程序性能監視工具&#xff0c;專為微服務、云原生架構和基于容器&#xff08;Docker、K8s、Mesos&#xff09;架構而設計。 提供分布式追蹤、服務網格遙測分析、度量聚合和可視化一體化解決方案。 2.SkyWalking 有哪些功能…

Spring Task入門案例

Spring Task 是Spring框架提供的任務調度工具&#xff0c;可以按照約定的時間自動執行某個代碼邏輯。 定位&#xff1a;定時任務框架 作用&#xff1a;定時自動執行某段Java代碼 強調&#xff1a;只要是需要定時處理的場景都可以使用Spring Task 1. cron表達式 cron表達式…

Java多線程線程間的通信—wait及notify方法

線程間的相互作用 線程間的相互作用:線程之間需要一些協調通信,來共同完成一件任務。 Object類中相關的方法有兩個notify方法和三個wait方法: Object (Java Platform SE 7 ) 因為wait和notify方法定義在Object類中,因此會被所有的類所繼承。 這些方法都是final的,即它們…

樹形dp模板

285. 沒有上司的舞會 - AcWing題庫 #include<iostream> #include<cstdio> #include<cstdlib> #include<string> #include<cstring> #include<cmath> #include<ctime> #include<algorithm> #include<utility> #include&…

Visual Studio 與QT ui文件

對.ui文件鼠標右鍵&#xff0c;然后單擊 Open with…在彈出的窗口中&#xff0c;選中左側的 Qt Designer&#xff0c;然后單擊右側的 Add 按鈕&#xff0c;隨后會彈出一個窗口&#xff0c;在 Program: 輸入框中輸入 Qt Designer 的路徑&#xff0c;最后單擊 OK找到 Qt Designer…

內網ip與外網ip

一、關于IP地址 我們平時直接接觸最多的是內網IP。而且還可以自己手動修改ip地址。而外網ip&#xff0c;我們很少直接接觸&#xff0c;都是間接接觸、因為外網ip一般都是運營商管理&#xff0c;而且是全球唯一的&#xff0c;一般我們自己是無法修改的。 內網IP和外網IP是指在…