場景: 從redis 訂閱數據 調用線程來異步處理數據
直接上代碼
定義線程管理類
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import java.util.concurrent.*;/*** Created with IntelliJ IDEA.* @Description 線程池管理類*/
@Component
public class ThreadPoolManager implements BeanFactoryAware {private static Logger logger = LoggerFactory.getLogger(ThreadPoolManager.class);//用于從IOC里取對象private BeanFactory factory; //如果實現Runnable的類是通過spring的application.xml文件進行注入,可通過 factory.getBean()獲取,這里只是提一下// 線程池維護線程的最少數量 (根據環境而定)private final static int CORE_POOL_SIZE = 10;// 線程池維護線程的最大數量 (根據環境而定)private final static int MAX_POOL_SIZE = 50;// 線程池維護線程所允許的空閑時間private final static int KEEP_ALIVE_TIME = 0;// 線程池所使用的緩沖隊列大小 (此處隊列設置 需要考慮處理數據的效率 內存的大小)private final static int WORK_QUEUE_SIZE = 99999;@Overridepublic void setBeanFactory(BeanFactory beanFactory) throws BeansException {factory = beanFactory;}// 消息隊列public LinkedBlockingQueue<String> getMsgQueue() {return msgQueue;}LinkedBlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();/*** 當線程池的容量滿了,執行下面代碼,將推送數據存入到緩沖隊列*/final RejectedExecutionHandler handler = new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {String temp = ((MsgHandleThread) r).getRecord();if (StringUtils.isEmpty(temp)) {msgQueue.offer(temp);}}};/*** 創建線程池*/final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);/*** 將任務加入線程池---執行數據處理*/public void addPushRecord(String record) {MsgHandleThread subThread=new MsgHandleThread(record);threadPool.execute(subThread);}/*** 線程池的定時任務----> 稱為(調度線程池)。此線程池支持 定時以及周期性執行任務的需求。*/final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);/*** 檢查(調度線程池),每秒執行一次,查看訂單的緩沖隊列是否有 訂單記錄,則重新加入到線程池*/final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {//判斷緩沖隊列是否存在記錄if (!msgQueue.isEmpty()) {//當線程池的隊列容量少于WORK_QUEUE_SIZE,則開始把緩沖隊列的訂單 加入到 線程池if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {String record = msgQueue.poll();MsgHandleThread subThread=new MsgHandleThread(record);threadPool.execute(subThread);}}}}, 0, 1, TimeUnit.SECONDS);/*** 終止訂單線程池+調度線程池*/public void shutdown() {//true表示如果定時任務在執行,立即中止,false則等待任務結束后再停止scheduledFuture.cancel(false);scheduler.shutdown();threadPool.shutdown();}
}
任務處理類
/*** Created with IntelliJ IDEA.* @Description 訂閱數據 處理*/
@Component
@Scope("prototype")//spring 多例
public class MsgHandleThread implements Runnable {private Logger logger = LoggerFactory.getLogger(SubCheckDataThread.class);private IDataHandleService _serviceprivate String record;public SubCheckDataThread(String _record) {this.record = _record;}public String getRecord() {return record;}@Overridepublic void run() {try {if (StringUtils.isEmpty(this.record)) {return;}// 無法注入是采用此方法if (_service== null) {_service= ApplicationContextProvider.getBean(IDataHandleService .class);}//TODO 具體業務logger.info("消費完成",record);} catch (Exception e) {e.printStackTrace();}}
}
調用
import com.yicheng.common.properties.SetProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;/*** <p>* 訂閱redis消息* </p>** @Author: zhuYaqiang* @Date: 2024/06/12*/
@Component
public class SubscribeCheckData {@Autowiredprivate ThreadPoolManager threadPoolManager;/**** @Description: 查崗信息訂閱---redis* @Param: [message]* @return: void* @Author: zhuYaqiang* @Date: 2024/06/12*/public void receiveMessage(String message) {try {threadPoolManager.addPushRecord(message);} catch (Exception e) {e.printStackTrace();}}}
redis 訂閱消息后調用線程池處理數據
package com.yicheng.subscribeRedis;import com.yicheng.common.properties.SetProperties;
import com.yicheng.subscribeRedis.alarm.SubscribeAlarmNoticeData;
import com.yicheng.subscribeRedis.check.SubscribeCheckData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;/*** @title RedisSubscribeCHeck* @description* @create 2024/6/12 19:30*/
@Configuration
public class RedisMessageListener {@Autowiredprivate SetProperties setProperties;@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerCheckAdapter, MessageListenerAdapter listenerAlarmNoticeAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);listenerCheckAdapter.afterPropertiesSet();listenerAlarmNoticeAdapter.afterPropertiesSet();//訂閱了的通道// 訂閱查崗數據container.addMessageListener(listenerCheckAdapter, new PatternTopic(setProperties.getRedisCheckSub().getSubChannel()));//這個container 可以添加多個 messageListenerreturn container;}/*** 消息監聽器適配器,綁定消息處理器,利用反射技術調用消息處理器的業務方法* 監聽查崗消息* @param receiver* @return*/@BeanMessageListenerAdapter listenerCheckAdapter(SubscribeCheckData receiver) {return new MessageListenerAdapter(receiver, "receiveMessage");}/*** 消息監聽器適配器,綁定消息處理器,利用反射技術調用消息處理器的業務方法* 監聽報警通知信息* @param receiver* @return*/@BeanMessageListenerAdapter listenerAlarmNoticeAdapter(SubscribeAlarmNoticeData receiver) {return new MessageListenerAdapter(receiver, "receiveMessage");}}
以上代碼已在實際項目中使用,覺得有用的點贊收藏評論