java 中多線程、 隊列使用實例,處理大數據業務

場景: 從redis 訂閱數據 調用線程來異步處理數據

直接上代碼

定義線程管理類
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import java.util.concurrent.*;/*** Created with IntelliJ IDEA.* @Description 線程池管理類*/
@Component
public class ThreadPoolManager implements BeanFactoryAware {private static Logger logger = LoggerFactory.getLogger(ThreadPoolManager.class);//用于從IOC里取對象private BeanFactory factory; //如果實現Runnable的類是通過spring的application.xml文件進行注入,可通過 factory.getBean()獲取,這里只是提一下// 線程池維護線程的最少數量 (根據環境而定)private final static int CORE_POOL_SIZE = 10;// 線程池維護線程的最大數量 (根據環境而定)private final static int MAX_POOL_SIZE = 50;// 線程池維護線程所允許的空閑時間private final static int KEEP_ALIVE_TIME = 0;// 線程池所使用的緩沖隊列大小 (此處隊列設置 需要考慮處理數據的效率  內存的大小)private final static int WORK_QUEUE_SIZE = 99999;@Overridepublic void setBeanFactory(BeanFactory beanFactory) throws BeansException {factory = beanFactory;}// 消息隊列public LinkedBlockingQueue<String> getMsgQueue() {return msgQueue;}LinkedBlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();/*** 當線程池的容量滿了,執行下面代碼,將推送數據存入到緩沖隊列*/final RejectedExecutionHandler handler = new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {String  temp = ((MsgHandleThread) r).getRecord();if (StringUtils.isEmpty(temp)) {msgQueue.offer(temp);}}};/*** 創建線程池*/final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);/*** 將任務加入線程池---執行數據處理*/public void addPushRecord(String  record) {MsgHandleThread subThread=new MsgHandleThread(record);threadPool.execute(subThread);}/*** 線程池的定時任務----> 稱為(調度線程池)。此線程池支持 定時以及周期性執行任務的需求。*/final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);/*** 檢查(調度線程池),每秒執行一次,查看訂單的緩沖隊列是否有 訂單記錄,則重新加入到線程池*/final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {//判斷緩沖隊列是否存在記錄if (!msgQueue.isEmpty()) {//當線程池的隊列容量少于WORK_QUEUE_SIZE,則開始把緩沖隊列的訂單 加入到 線程池if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {String record = msgQueue.poll();MsgHandleThread subThread=new MsgHandleThread(record);threadPool.execute(subThread);}}}}, 0, 1, TimeUnit.SECONDS);/*** 終止訂單線程池+調度線程池*/public void shutdown() {//true表示如果定時任務在執行,立即中止,false則等待任務結束后再停止scheduledFuture.cancel(false);scheduler.shutdown();threadPool.shutdown();}
}
任務處理類
/*** Created with IntelliJ IDEA.* @Description 訂閱數據 處理*/
@Component
@Scope("prototype")//spring 多例
public class MsgHandleThread implements Runnable {private Logger logger = LoggerFactory.getLogger(SubCheckDataThread.class);private IDataHandleService _serviceprivate String record;public SubCheckDataThread(String  _record) {this.record = _record;}public String getRecord() {return record;}@Overridepublic void run() {try {if (StringUtils.isEmpty(this.record)) {return;}// 無法注入是采用此方法if (_service== null) {_service= ApplicationContextProvider.getBean(IDataHandleService .class);}//TODO 具體業務logger.info("消費完成",record);} catch (Exception e) {e.printStackTrace();}}
}
調用
import com.yicheng.common.properties.SetProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;/*** <p>* 訂閱redis消息* </p>** @Author: zhuYaqiang* @Date: 2024/06/12*/
@Component
public class SubscribeCheckData {@Autowiredprivate ThreadPoolManager threadPoolManager;/**** @Description:  查崗信息訂閱---redis* @Param: [message]* @return: void* @Author: zhuYaqiang* @Date: 2024/06/12*/public void receiveMessage(String message) {try {threadPoolManager.addPushRecord(message);} catch (Exception e) {e.printStackTrace();}}}
redis 訂閱消息后調用線程池處理數據
package com.yicheng.subscribeRedis;import com.yicheng.common.properties.SetProperties;
import com.yicheng.subscribeRedis.alarm.SubscribeAlarmNoticeData;
import com.yicheng.subscribeRedis.check.SubscribeCheckData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;/*** @title RedisSubscribeCHeck* @description* @create 2024/6/12 19:30*/
@Configuration
public class RedisMessageListener {@Autowiredprivate SetProperties setProperties;@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerCheckAdapter, MessageListenerAdapter listenerAlarmNoticeAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);listenerCheckAdapter.afterPropertiesSet();listenerAlarmNoticeAdapter.afterPropertiesSet();//訂閱了的通道// 訂閱查崗數據container.addMessageListener(listenerCheckAdapter, new PatternTopic(setProperties.getRedisCheckSub().getSubChannel()));//這個container 可以添加多個 messageListenerreturn container;}/*** 消息監聽器適配器,綁定消息處理器,利用反射技術調用消息處理器的業務方法* 監聽查崗消息* @param receiver* @return*/@BeanMessageListenerAdapter listenerCheckAdapter(SubscribeCheckData receiver) {return new MessageListenerAdapter(receiver, "receiveMessage");}/*** 消息監聽器適配器,綁定消息處理器,利用反射技術調用消息處理器的業務方法*   監聽報警通知信息* @param receiver* @return*/@BeanMessageListenerAdapter listenerAlarmNoticeAdapter(SubscribeAlarmNoticeData receiver) {return new MessageListenerAdapter(receiver, "receiveMessage");}}

以上代碼已在實際項目中使用,覺得有用的點贊收藏評論

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/66799.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/66799.shtml
英文地址,請注明出處:http://en.pswp.cn/web/66799.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【自動駕駛】4 智駕生態概述

目錄 1 智駕生態概述 ▲ 關鍵組成部分 ▲ 概述 2 關鍵技術 ▲ 傳感器 ▲ 感知 ▲ 數據閉環 3 未來市場 1 智駕生態概述 智能駕駛生態&#xff0c;簡稱智駕生態&#xff0c;是指圍繞智能駕駛技術的開發、應用、服務和支持所形成的產業體系和合作網絡。 涵蓋了從硬件設…

2025.1.20——一、[RCTF2015]EasySQL1 二次注入|報錯注入|代碼審計

題目來源&#xff1a;buuctf [RCTF2015]EasySQL1 目錄 一、打開靶機&#xff0c;整理信息 二、解題思路 step 1&#xff1a;初步思路為二次注入&#xff0c;在頁面進行操作 step 2&#xff1a;嘗試二次注入 step 3&#xff1a;已知雙引號類型的字符型注入&#xff0c;構造…

”彩色的驗證碼,使用pytesseract識別出來的驗證碼內容一直是空“的解決辦法

問題&#xff1a;彩色的驗證碼&#xff0c;使用pytesseract識別出來的驗證碼內容一直是空字符串 原因&#xff1a;pytesseract只識別黑色部分的內容 解決辦法&#xff1a;先把彩色圖片精確轉換成黑白圖片。再將黑白圖片進行反相&#xff0c;將驗證碼部分的內容變成黑色&#…

Unity3D項目開發中的資源加密詳解

前言 在Unity3D游戲開發中&#xff0c;保護游戲資源不被非法獲取和篡改是至關重要的一環。資源加密作為一種有效的技術手段&#xff0c;可以幫助開發者維護游戲的知識產權和安全性。本文將詳細介紹Unity3D項目中如何進行資源加密&#xff0c;并提供相應的技術詳解和代碼實現。…

RabbitMQ 在實際應用時要注意的問題

1. 冪等性保障 1.1 冪等性介紹 冪等性是數學和計算機科學中某些運算的性質,它們可以被多次應?,?不會改變初始應?的結果. 應?程序的冪等性介紹 在應?程序中,冪等性就是指對?個系統進?重復調?(相同參數),不論請求多少次,這些請求對系統的影響都是相同的效果. ?如數據庫…

AIGC視頻生成明星——Emu Video模型

大家好&#xff0c;這里是好評筆記&#xff0c;公主號&#xff1a;Goodnote&#xff0c;專欄文章私信限時Free。本文詳細介紹Meta的視頻生成模型Emu Video&#xff0c;作為Meta發布的第二款視頻生成模型&#xff0c;在視頻生成領域發揮關鍵作用。 &#x1f33a;優質專欄回顧&am…

Debian 上安裝PHP

1、安裝軟件源拓展工具 apt -y install software-properties-common apt-transport-https lsb-release ca-certificates 2、添加 Ond?ej Sur 的 PHP PPA 源&#xff0c;需要按一次回車&#xff1a; add-apt-repository ppa:ondrej/php 3、更新軟件源緩存&#xff1a; apt-g…

office 2019 關閉word窗口后卡死未響應

最近關閉word文件總是出現卡死未響應的狀態&#xff0c;必須從任務管理器才能殺掉word 進程&#xff0c;然后重新打開word再保存&#xff0c;很是麻煩。&#xff08;#其他特征&#xff0c;在word中打字會特別變慢&#xff0c;敲擊鍵盤半秒才出現字符。&#xff09; office官網…

SecureUtil.aes數據加密工具類

數據加密、解密工具類 包含map和vo的數據轉換 import cn.hutool.core.bean.BeanUtil; import cn.hutool.crypto.SecureUtil;import java.util.HashMap; import java.util.Map;/*** 數據解析**/ public class ParamUtils {/*** 數據解密** param params 參數* param secretKe…

機器學習:支持向量機

支持向量機&#xff08;Support Vector Machine&#xff09;是一種二類分類模型&#xff0c;其基本模型定義為特征空間上的間隔最大的廣義線性分類器&#xff0c;其學習策略便是間隔最大化&#xff0c;最終可轉化為一個凸二次規劃問題的求解。 假設兩類數據可以被 H x : w T x…

SQL-leetcode—1148. 文章瀏覽 I

1148. 文章瀏覽 I Views 表&#xff1a; ---------------------- | Column Name | Type | ---------------------- | article_id | int | | author_id | int | | viewer_id | int | | view_date | date | ---------------------- 此表可能會存在重復行。&#xff08;換句話說…

k8s資源預留

k8s資源預留 https://kubernetes.io/zh-cn/docs/tasks/administer-cluster/reserve-compute-resources/ vim /var/lib/kubelet/config.yamlenforceNodeAllocatable: - pods kubeReserved: # 配置 kube 資源預留cpu: 500mmemory: 1Giephemeral-storage: 1Gi systemReserved: #…

[STM32 HAL庫]串口空閑中斷+DMA接收不定長數據

一、空閑中斷 STM32的串口具有空閑中斷&#xff0c;什么叫做空閑呢&#xff1f;如何觸發空閑中斷呢&#xff1f; 空閑&#xff1a;串口發送的兩個字符之間間隔非常短&#xff0c;所以在兩個字符之間不叫空閑。空閑的定義是總線上在一個字節的時間內沒有再接收到數據。觸發條件…

Unity Line Renderer Component入門

Overview Line Renderer 組件是 Unity 中用于繪制連續線段的工具。它通過在三維空間中的兩個或兩個以上的點的數組&#xff0c;并在每個點之間繪制一條直線。可以繪制從簡單的直線到復雜的螺旋線等各種圖形。 1. 連續性和獨立線條 連續性&#xff1a;Line Renderer 繪制的線條…

純 Python、Django、FastAPI、Flask、Pyramid、Jupyter、dbt 解析和差異分析

一、純 Python 1.1 基礎概念 Python 是一種高級、通用、解釋型的編程語言&#xff0c;以其簡潔易讀的語法和豐富的標準庫而聞名。“純 Python” 在這里指的是不依賴特定的 Web 框架或數據分析工具&#xff0c;僅使用 Python 原生的功能和標準庫來開發應用程序或執行任務。 1.…

SQL記錄學習日志

刪除表 DROP TABLE&#xff1a;徹底刪除表和其數據&#xff0c;無法恢復。 DROP TABLE IF EXISTS&#xff1a;在刪除之前檢查表是否存在。 TRUNCATE TABLE&#xff1a;刪除所有數據&#xff0c;但保留表的結構。 DELETE&#xff1a;刪除表中的所有數據&#xff0c;但保留表的結…

QT:tftp client 和 Server

1.TFTP簡介 TFTP&#xff08;Trivial File Transfer Protocol,簡單文件傳輸協議&#xff09;是TCP/IP協議族中的一個用來在客戶機與服務器之間進行簡單文件傳輸的協議&#xff0c;提供不復雜、開銷不大的文件傳輸服務。端口號為69。 FTP是一個傳輸文件的簡單協議&#xff0c;…

WPF5-x名稱空間

1. x名稱空間2. x名稱空間內容3. x名稱空間內容分類 3.1. x:Name3.2. x:Key3.3. x:Class3.4. x:TypeArguments 4. 總結 1. x名稱空間 “x名稱空間”的x是映射XAML名稱空間時給它取的名字&#xff08;取XAML的首字母&#xff09;&#xff0c;里面的成員&#xff08;如x:Class、…

前端jquery 實現文本框輸入出現自動補全提示功能

git倉庫&#xff1a;web_study/some-demos/inputAutoFit at main Cong0925/web_study (github.com) 壓縮包&#xff1a;已綁定到指定資源 示例圖&#xff1a; 實現說明: 1.首先&#xff0c;html部分設置好相關的定位標簽如圖&#xff1a; 2.主要函數 3.默認數據

緩存之美:萬文詳解 Caffeine 實現原理(上)

由于社區最大字數限制&#xff0c;本文章將分為兩篇&#xff0c;第二篇文章為緩存之美&#xff1a;萬文詳解 Caffeine 實現原理&#xff08;下&#xff09; 大家好&#xff0c;我是 方圓。文章將采用“總-分-總”的結構對配置固定大小元素驅逐策略的 Caffeine 緩存進行介紹&…