1:BlockingQueue繼承關系
??java.util.concurrent 包里的?BlockingQueue是一個接口,?繼承Queue接口,Queue接口繼承?Collection
?
??BlockingQueue----->Queue-->Collection
?圖:
?
隊列的特點是:先進先出(FIFO)
?
2:BlockingQueue的方法
BlockingQueue 具有 4 組不同的方法用于插入、移除以及對隊列中的元素進行檢查。如果請求的操作不能得到立即執行的話,每個方法的表現也不同。這些方法如下:
?
?
? | 拋出異常 | 特殊值 | 阻塞 | 超時 |
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
檢查 | element() | peek() | 不可用 | 不可用 ? |
?
?
四組不同的行為方式解釋:
1(異常)
如果試圖的操作無法立即執行,拋一個異常。
2(特定值)?
如果試圖的操作無法立即執行,返回一個特定的值(常常是 true / false)。
3(阻塞)?
如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行。
4(超時)?
如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。
??
1.首先是springBoot的項目框架如下:
2.業務測試流程涉及的類,如下
BusinessThread 類
package com.springboot.demo.Threads;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
/**
?* Created by Administrator on 2018/5/9.
?*/
@Component
@Scope("prototype")//spring 多例
public class BusinessThread implements Runnable{
? ? private String acceptStr;
? ? public BusinessThread(String acceptStr) {
? ? ? ? this.acceptStr = acceptStr;
? ? }
? ? public String getAcceptStr() {
? ? ? ? return acceptStr;
? ? }
? ? public void setAcceptStr(String acceptStr) {
? ? ? ? this.acceptStr = acceptStr;
? ? }
? ? @Override
? ? public void run() {
? ? ? ? //業務操作
? ? ? ? System.out.println("多線程已經處理訂單插入系統,訂單號:"+acceptStr);
? ? ? ? //線程阻塞
? ? ? ? /*try {
? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? ? ? System.out.println("多線程已經處理訂單插入系統,訂單號:"+acceptStr);
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }*/
? ? }
}
TestThreadPoolManager 類
package com.springboot.demo.Threads;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.*;
/**
?* Created by Administrator on 2018/5/10.
?*/
@Component
public class TestThreadPoolManager implements BeanFactoryAware {
? ? //用于從IOC里取對象
? ? private BeanFactory factory; //如果實現Runnable的類是通過spring的application.xml文件進行注入,可通過 factory.getBean()獲取,這里只是提一下
? ? // 線程池維護線程的最少數量
? ? private final static int CORE_POOL_SIZE = 2;
? ? // 線程池維護線程的最大數量
? ? private final static int MAX_POOL_SIZE = 10;
? ? // 線程池維護線程所允許的空閑時間
? ? private final static int KEEP_ALIVE_TIME = 0;
? ? // 線程池所使用的緩沖隊列大小
? ? private final static int WORK_QUEUE_SIZE = 50;
? ? @Override
? ? public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
? ? ? ? factory = beanFactory;
? ? }
? ? /**
? ? ?* 用于儲存在隊列中的訂單,防止重復提交,在真實場景中,可用redis代替 驗證重復
? ? ?*/
? ? Map<String, Object> cacheMap = new ConcurrentHashMap<>();
? ? /**
? ? ?* 訂單的緩沖隊列,當線程池滿了,則將訂單存入到此緩沖隊列
? ? ?*/
? ? Queue<Object> msgQueue = new LinkedBlockingQueue<Object>();
? ? /**
? ? ?* 當線程池的容量滿了,執行下面代碼,將訂單存入到緩沖隊列
? ? ?*/
? ? final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
? ? ? ? @Override
? ? ? ? public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
? ? ? ? ? ? //訂單加入到緩沖隊列
? ? ? ? ? ? msgQueue.offer(((BusinessThread) r).getAcceptStr());
? ? ? ? ? ? System.out.println("系統任務太忙了,把此訂單交給(調度線程池)逐一處理,訂單號:" + ((BusinessThread) r).getAcceptStr());
? ? ? ? }
? ? };
? ? /**創建線程池*/
? ?final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);
? ? /**將任務加入訂單線程池*/
? ? public void addOrders(String orderId){
? ? ? ? System.out.println("此訂單準備添加到線程池,訂單號:" + orderId);
? ? ? ? //驗證當前進入的訂單是否已經存在
? ? ? ? if (cacheMap.get(orderId) == null) {
? ? ? ? ? ? cacheMap.put(orderId, new Object());
? ? ? ? ? ? BusinessThread businessThread = new BusinessThread(orderId);
? ? ? ? ? ? threadPool.execute(businessThread);
? ? ? ? }
? ? }
? ? /**
? ? ?* 線程池的定時任務----> 稱為(調度線程池)。此線程池支持 定時以及周期性執行任務的需求。
? ? ?*/
? ? final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
? ? /**
? ? ?* 檢查(調度線程池),每秒執行一次,查看訂單的緩沖隊列是否有 訂單記錄,則重新加入到線程池
? ? ?*/
? ? final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {
? ? ? ? @Override
? ? ? ? public void run() {
? ? ? ? ? ? //判斷緩沖隊列是否存在記錄
? ? ? ? ? ? if(!msgQueue.isEmpty()){
? ? ? ? ? ? ? ? //當線程池的隊列容量少于WORK_QUEUE_SIZE,則開始把緩沖隊列的訂單 加入到 線程池
? ? ? ? ? ? ? ? if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
? ? ? ? ? ? ? ? ? ? String orderId = (String) msgQueue.poll();
? ? ? ? ? ? ? ? ? ? BusinessThread businessThread = new BusinessThread(orderId);
? ? ? ? ? ? ? ? ? ? threadPool.execute(businessThread);
? ? ? ? ? ? ? ? ? ? System.out.println("(調度線程池)緩沖隊列出現訂單業務,重新添加到線程池,訂單號:"+orderId);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }, 0, 1, TimeUnit.SECONDS);
? ? /**獲取消息緩沖隊列*/
? ? public Queue<Object> getMsgQueue() {
? ? ? ? return msgQueue;
? ? }
? ? /**終止訂單線程池+調度線程池*/
? ? public void shutdown() {
? ? ? ? //true表示如果定時任務在執行,立即中止,false則等待任務結束后再停止
? ? ? ? System.out.println("終止訂單線程池+調度線程池:"+scheduledFuture.cancel(false));
? ? ? ? scheduler.shutdown();
? ? ? ? threadPool.shutdown();
? ? }
}
TestController 類
package com.springboot.demo;
import com.springboot.demo.Threads.TestThreadPoolManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.Queue;
import java.util.UUID;
/**
?* Created by Administrator on 2018/5/9.
?*/
@RestController
public class TestController {
? ? @Autowired
? ? TestThreadPoolManager testThreadPoolManager;
? ? /**
? ? ?* 測試模擬下單請求 入口
? ? ?* @param id
? ? ?* @return
? ? ?*/
? ? @GetMapping("/start/{id}")
? ? public String start(@PathVariable Long id) {
? ? ? ? //模擬的隨機數
? ? ? ? String orderNo = System.currentTimeMillis() + UUID.randomUUID().toString();
? ? ? ? testThreadPoolManager.addOrders(orderNo);
? ? ? ? return "Test ThreadPoolExecutor start";
? ? }
? ? /**
? ? ?* 停止服務
? ? ?* @param id
? ? ?* @return
? ? ?*/
? ? @GetMapping("/end/{id}")
? ? public String end(@PathVariable Long id) {
? ? ? ? testThreadPoolManager.shutdown();
? ? ? ? Queue q = testThreadPoolManager.getMsgQueue();
? ? ? ? System.out.println("關閉了線程服務,還有未處理的信息條數:" + q.size());
? ? ? ? return "Test ThreadPoolExecutor start";
? ? }
}
??