基于Kafka實現簡單的延時隊列

生命無罪,健康萬歲,我是laity。

我曾七次鄙視自己的靈魂:

第一次,當它本可進取時,卻故作謙卑;

第二次,當它在空虛時,用愛欲來填充;

第三次,在困難和容易之間,它選擇了容易;

第四次,它犯了錯,卻借由別人也會犯錯來寬慰自己;

第五次,它自由軟弱,卻把它認為是生命的堅韌;

第六次,當它鄙夷一張丑惡的嘴臉時,卻不知那正是自己面具中的一副;

第七次,它側身于生活的污泥中,雖不甘心,卻又畏首畏尾。

基于Kafka實現簡單的延時隊列

業務場景:
listener kafka 中的指定topic,接收并處理其中的message,再基于websocket向前端推送數據,前端接收到數據后將數據放置到定時隊列中,進行5s的倒計時
,情況1:時間到了進行觸發下一步的接口(該操作為自動操作)
,情況2:時間未到有人為干預進行點擊進入下一步的接口(該操作為人工操作)。
問題:當前端頁面進行切換頁面后,前端是無法將定時隊列中的數據進行存儲,從而進行清空;
解決方案:kafka的延時隊列解決切換頁面后未處理的message;
1.解決流程:
(1)listener 到 message 并處理后直接進行走一遍自動操作,
(2)并將存入庫中的saveId進行返回至websocket推向前端的JSON數據中,
(3)再通過寫好的send方法將JSONObject發送至kafka的消息隊列中,
(4)之后不論人工還是自動都進行update操作(基于saveId可以去查詢到’自動處理’時的留痕,判斷update_time的是否存在,不在則進行update(因為觸發update只有當前端時間到了或kafka延時隊列的listener))。

實現流程

1.引入pom依賴

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><optional>true</optional>
</dependency>

2.yaml進行自動配置

spring:application:# 應用名稱name: youServerNamekafka:bootstrap-servers: youKafkaIp:9092consumer:enable-auto-commit: falsevalue-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializerkey-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

3.各各類的實現

DelayEnum.java

import lombok.AllArgsConstructor;
import lombok.Getter;/*** @author laity*/
@Getter
@AllArgsConstructor
public enum DelayEnum {FIVE_S(5, "topic_message_5s"),TEN_S(10, "topic_message_10s");private final int delay_time;private final String topic_name;
}

KafkaDelayMsg.java

import lombok.Data;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** @author laity */
@Data
public class KafkaDelayMsg implements Delayed {private String msg; // content - 可以換成對應的實體private DelayEnum delayEnum;private long time;public KafkaDelayMsg() {}public KafkaDelayMsg(String msg, DelayEnum delayEnum) {this.msg = msg;this.delayEnum = delayEnum;this.time = System.currentTimeMillis() + delayEnum.getDelay_time() * 1000;}/* 延時隊列實現的關鍵 */@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {return Long.compare(this.time, ((KafkaDelayMsg) o).time);}
}

KafkaUtil.java

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;import java.util.HashMap;
import java.util.Map;/*** @author laity*/
@Slf4j
public class KafkaUtil {KafkaProducer<String, String> kafkaProducer = null;private static final Map<String, Object> map = new HashMap<>();public KafkaUtil() {map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.x.x:9092");map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());}// sendpublic void send(KafkaDelayMsg msg) {KafkaProducer producer = new KafkaProducer(map);if (msg.getDelayEnum().getDelay_time() == DelayEnum.FIVE_S.getDelay_time()) {KafkaDelayQueue.FIVE_S.add(msg);}KafkaDelayQueue.run(producer);}
}

KafkaDelayQueue.java

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @author laity 設計隊列*/
@Slf4j
public class KafkaDelayQueue {public static DelayQueue<KafkaDelayMsg> FIVE_S = new DelayQueue<>();// 實際sendMsg的functionpublic static void run(KafkaProducer<String, String> producer) {ExecutorService executorService = Executors.newFixedThreadPool(5);executorService.execute(() -> {log.info("=============== 開始推送執行 ==================");while (true) {try {KafkaDelayMsg take = FIVE_S.take();// 推送數據RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(take.getDelayEnum().getTopic_name(), take.getMsg())).get();log.info("============================= CONTENT:" + take.toString() + "-" + recordMetadata.topic() + "-" + recordMetadata.partition() + " ===============================");} catch (Exception e) {e.printStackTrace();}}});}
}

KafkaConsumer.java - 業務代碼

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Date;/*** @author laity 消費*/@Component
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class KafkaConsumer {private final KelCjTaskResultMapper mapper;// beanRef的功能public String getTopicName() {return DelayEnum.FIVE_S.getTopic_name();}// @KafkaListener(topics = "send_message_5s", groupId = "consumer-laity")@KafkaListener(topics = "#{__listener.getTopicName()}", groupId = "consumer-laity")public void listen(String message) {// 1.實現你的業務}
}

總結

年輕人,你的職責是平整土地,而非焦慮時光。你做三四月的事,在八九月自有答案。我是Laity,正在前進的Laity。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/93015.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/93015.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/93015.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

OceanBase 4.3.5 解析:DDL性能診斷

背景DDL操作通常耗時較長&#xff0c;特別是涉及補數據流程的DDL語句。在執行過程中&#xff0c;用戶面臨兩個主要痛點&#xff1a;一是無法實時獲取DDL執行進度&#xff0c;難以區分長時間運行是正常現象還是由內部異常導致的停滯&#xff1b;二是執行效率經常低于預期&#x…

幸福網咖訂座點餐小程序的設計與實現

文章目錄前言詳細視頻演示具體實現截圖后端框架SpringBoot微信小程序持久層框架MyBaits成功系統案例&#xff1a;參考代碼數據庫源碼獲取前言 博主介紹:CSDN特邀作者、985高校計算機專業畢業、現任某互聯網大廠高級全棧開發工程師、Gitee/掘金/華為云/阿里云/GitHub等平臺持續…

C語言————練習題冊(答案版)

目錄 每日更新5-10題&#xff0c;感興趣可以訂閱 一.理解函數、操作符、占位符 1.1 歡迎來到C語言的世界 1.2 輸入和輸出 1.3 浮點數的打印 1.4 字符串的打印 1.14 I am iron man 1.5 求和運算 1.6 計算比例 1.7 求商求余 1.8 不同數位上的數字 1.8.1 求個位數 1.8…

haproxy配置詳解

1、haproxy簡介 HAProxy是法國開發者 威利塔羅(Willy Tarreau) 在2000年使用C語言開發的一個開源軟件 是一款具備高并發(萬級以上)、高性能的TCP和HTTP負載均衡器 支持基于cookie的持久性&#xff0c;自動故障切換&#xff0c;支持正則表達式及web狀態統計 企業版網站&#xff…

計網-TCP可靠傳輸

TCP&#xff08;傳輸控制協議&#xff09;的可靠傳輸是通過一系列機制保證數據準確、有序、不丟失地到達接收方。以下是TCP可靠傳輸的詳細過程及核心機制&#xff1a;1. 數據分塊與序列號&#xff08;Seq&#xff09;分塊&#xff1a;應用層數據被分割成適合傳輸的TCP報文段&am…

數智管理學(三十九)

第三章 數智化對管理理論的沖擊第三節 系統理論與生態化管理的強化系統理論作為理解企業運作與環境互動的重要框架&#xff0c;一直強調企業是一個由多個相互關聯子系統構成的整體&#xff0c;其核心要素包括整體性、開放性、動態性和反饋機制。在傳統管理視角下&#xff0c;這…

哈希表(c語言)

文章目錄哈希表哈希表知識點哈希表概念負載因子哈希表的優缺點哈希沖突哈希函數常見哈希函數處理哈希沖突開放定址法線性探測二次探測鏈地址法哈希表的實現哈希表的核心:HashMap核心函數&#xff1a;從創建到銷毀創建哈希表&#xff1a;hashmap_create()銷毀哈希表:hashmap_des…

【Canvas與旗幟】條紋版大明三辰旗

【成圖】【代碼】<!DOCTYPE html> <html lang"utf-8"> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/> <head><title>十三條紋版大明三辰旗 Draft1</title><style type"text/…

【Java】空指針(NullPointerException)異常深度攻堅:從底層原理到架構級防御,老司機的實戰經驗

寫Java代碼這些年&#xff0c;空指針異常&#xff08;NullPointerException&#xff09;就像甩不掉的影子。線上排查問題時&#xff0c;十次有八次最后定位到的都是某個對象沒處理好null值。但多數人解決問題只停留在加個if (obj ! null)的層面&#xff0c;沒從根本上想過為什么…

【NLP輿情分析】基于python微博輿情分析可視化系統(flask+pandas+echarts) 視頻教程 - 主頁-評論用戶時間占比環形餅狀圖實現

大家好&#xff0c;我是java1234_小鋒老師&#xff0c;最近寫了一套【NLP輿情分析】基于python微博輿情分析可視化系統(flaskpandasecharts)視頻教程&#xff0c;持續更新中&#xff0c;計劃月底更新完&#xff0c;感謝支持。今天講解主頁-評論用戶時間占比環形餅狀圖實現 視頻…

Redis面試精講 Day 5:Redis內存管理與過期策略

【Redis面試精講 Day 5】Redis內存管理與過期策略 開篇 歡迎來到"Redis面試精講"系列的第5天&#xff01;今天我們將深入探討Redis內存管理與過期策略&#xff0c;這是面試中經常被問及的核心知識點。對于后端工程師而言&#xff0c;理解Redis如何高效管理內存、處…

ICMPv6報文類型詳解表

一、錯誤報文類型&#xff08;Type 1-127&#xff09;Type值名稱Code范圍觸發條件示例典型用途1Destination Unreachable0-60: 無路由到目標1: 通信被管理員禁止2: 地址不可達3: 端口不可達4: 分片需要但DF標志設置5: 源路由失敗6: 目的地址不可達網絡故障診斷2Packet Too Big0…

配置nodejs

第一步確認 node.exe 和 npm 存在 例如安裝目錄D:\nodejs檢查是否存在以下文件&#xff1a; node.exenpm.cmdnpx.cmd 第二步&#xff1a;添加環境變量 PATH 圖形化操作步驟&#xff08;Windows&#xff09;&#xff1a; 右鍵「此電腦」→「屬性」點擊左側 「高級系統設置」彈出…

MySQL的命令行客戶端

MySQL中的一些程序&#xff1a;MySQL在安裝完成的時候&#xff0c;一般都會包含如下程序&#xff1a;在Linux系統下&#xff0c;通過/usr/bin目錄下&#xff0c;可以通過命令查看&#xff1a;以下是常用的MySQL程序&#xff1a;程序名作用mysqldMySQL的守護進程即MySQL服務器&a…

C# 值類型與引用類型的儲存方式_堆棧_

目錄 值類型 引用類型 修改stu3的值 stu也被修改了 為什么? &#xff08;對象之間&#xff09; 值類型中&#xff0c;值全在棧中單獨存儲&#xff0c;變量之間不會影響 結構體中&#xff0c;結構體全在棧中&#xff0c;結構體與結構體之間也不會相互影響 靜態資源區 值類…

解鎖永久會員的白噪音軟件:睡眠助手

如今的年輕人壓力普遍較大&#xff0c;學會解壓至關重要。這期就為大家推薦一款優秀的白噪音軟件&#xff0c;在壓力大時聽聽&#xff0c;能起到不錯的解壓效果。 睡眠助手 文末獲取 這款軟件的特別版本十分出色&#xff0c;知曉的人不多。它已解鎖永久會員&#xff0c;無需登…

uniapp使用css實現進度條帶動畫過渡效果

一、效果 二、實現原理 1.uni.createAnimation 動畫函數 2.初始化uni.createAnimation方法 3.監聽值的變化調用動畫執行方法 三、代碼 1.實現方式比較簡單&#xff0c;目前是vue3的寫法&#xff0c;vue2只需要稍微改動即可 <template><view class"layout_progre…

高級分布式系統調試:調試的科學與 USE 方法實戰

高級分布式系統調試:調試的科學與 USE 方法實戰 前言:從“救火”到“探案” 當一個復雜的分布式系統出現“灰色故障”——例如“服務有時會變慢”、“偶爾出現超時錯誤”——我們該從何處著手?隨機地查看 Grafana 儀表盤,或者漫無目的地 tail -f 日志,往往效率低下,甚至…

棧算法之【有效括號】

目錄 LeetCode-20題 LeetCode-20題 給定一個只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#xff0c;判斷字符串是否有效。 有效字符串需滿足&#xff1a; 左括號必須用相同類型的右括號閉合。 左括號必須以正確的順序閉合。 每…

大模型——Data Agent:超越 BI 與 AI 的邊界

Data Agent:超越 BI 與 AI 的邊界 1. 數據工具的演進路徑 在數據分析領域,技術工具經歷了多個階段的演進。這些演進不僅反映了技術的進步,也體現了用戶需求和使用場景的變化。 Excel 時代:告別手工作業,陷入“表格泥潭“,早期數據分析依賴 Excel,實現基礎數據記錄、計…