緩存擊穿
緩存擊穿問題也稱熱點key問題,就是一個高并發訪問(該key訪問頻率高,訪問次數多)并且緩存重建業務比較復雜的key突然失效了,大量的請求訪問會在瞬間給數據庫帶來巨大的沖擊。
緩存重建業務比較復雜:
緩存在redis數據庫中存儲,在一定時間后被清除,緩存失效,失效以后需要重新從數據庫中查詢寫入redis,在實際開發中,從數據庫中查詢并且構建數據并不是查到什么就存儲進redis,有些業務比較復雜,需要多表查詢的,甚至是要去各種各樣的表關聯的運算最終得到的結果將其緩存進數據庫。這樣的業務耗時就比較長,在該時間段內,相當于redis中一直沒有緩存,而在這一時間段內,無數請求就無法命中緩存,就直接到達數據庫。
常見解決方案:
-
互斥鎖
當線程1查詢緩存未命中時,去獲取互斥鎖,獲取成功后查詢數據庫重建緩存數據,在寫入緩存之后釋放鎖。這樣做的話,在其他線程來發起請求后,未命中緩存則嘗試去獲取互斥鎖,獲取互斥鎖失敗,則讓其進入自旋狀態(讓線程循環執行搶鎖的過程),直到前一個線程釋放鎖之后,在發起請求,若緩存命中,則返回,若未命中,則獲取互斥鎖,循環往復。
如圖所示:
問題:互相等待,就比如同一時刻中有一千個線程發起請求,但只有一個線程在構建,其他線程都在進行自旋,如果構造時間過久,其他線程只能自旋,而長時間的自旋會讓CPU一直在空轉,CPU沒有辦法去執行其他任務,會浪費CPU,性能較差。
-
邏輯過期
可以認為是永不過期,即當下往Redis中存儲數據時,不設置過期時間,而是在設置value時添加一個expire字段(在當前時間基礎加上一個過期時間),該字段的意義在于提醒我們何時銷毀該key,即在邏輯意義上維護的過期時間,而該key在redis中沒有過期時間,再加上在redis配置的合適的內存淘汰策略,只要該key寫入redis,就一定可以查到,不會出現緩存未命中的情況。
適用情況:在舉辦活動時添加,活動結束之間將其移除即可。
注意事項:需要判斷邏輯時間有無過期
使用詳情:
當線程1查詢緩存發現邏輯時間已經過期,則嘗試獲取互斥鎖,獲取成功,則新開一線程進行查詢數據庫重建緩存數據,在寫入緩存重置邏輯過期時間,最后釋放鎖,而線程1在此之前將過期數據返回。
在重建緩存期間,如果有新線程發起請求,發現邏輯時間過期,則嘗試獲取互斥鎖,如果獲取互斥鎖失敗,則直接返回過期數據。
如圖所示:
方案對比
解決方案 | 優點 | 缺點 |
---|---|---|
互斥鎖(考慮數據一致性) | 沒有額外的內存消耗 ,保證一致性,實現簡單 | 線程需要等待,性能受影響,可能有死鎖風險 |
邏輯過期(考慮性能) | 線程無需等待,性能較好 | 不保證一致性,有額外內存消耗,實現復雜 |
小結:這兩種方案都是在解決緩存重建這一段時間內產生的并發問題。
互斥鎖:在緩存重建的這段時間內讓這些并發的線程串行執行或者相互等待,從而確保安全。確保數據一致性,犧牲了服務的可用性
邏輯過期:在緩存重建這段時間內保證了可用性,犧牲的是數據的一致性(可能訪問的是舊數據)。
案例展示:基于互斥鎖方式解決緩存擊穿問題
需求:修改根據ID查詢商鋪的業務,基于互斥鎖方式來解決緩存擊穿問題。
分析業務流程變化
原先業務流程:
修改步驟,在判斷緩存是否命中之后,如果未命中,需要先去嘗試獲取互斥鎖,判斷是否拿到,如果沒有拿到鎖,說明已經有線程在更新,不應該繼續往下執行,需要休眠一段時間,在重新嘗試。如果拿到互斥鎖,執行緩存重建,就可以去查詢數據庫,將查詢到的數據寫入Redis,隨后釋放互斥鎖,最后返回結果。
新的業務流程如圖所示:
注意事項:在該業務流程中使用的所并不是我們平時使用的鎖,我們平時使用synchronized或者lock,這種鎖的執行邏輯就是拿到鎖執行,沒拿到鎖等待,而該業務流程中的鎖的執行邏輯是自定義的,因此要采用自定義的互斥鎖(在多個線程并行執行時,只有一條線程成功,其余線程失敗),而在學習redis中string類型時中 setnx
命令的效果與之相近,當該key不存在的時候存入,如果存在就不存入,這就是一種互斥效果,在大量線程并發訪問時,只有一條線程可以成功。因此獲取鎖為賦值命令,而釋放鎖則是刪除命令(del
)。
為了防止程序出故障導致遲遲沒有執行刪除命令,因此在設置setnx時通常都會為其設置有效期,來防止鎖一直不釋放,造成死鎖,導致業務故障。
這與真正的互斥鎖還是有所差距的,但是在這里夠用了。
前置代碼:
聲明兩個方法代表獲取鎖以及釋放鎖
// 嘗試獲取鎖
private boolean tryLock(String key){//設置有效期時間取決于業務執行時間,一般比業務時間長一些即可。Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);//建議不要直接返回flag,防止返回空指針,因為Boolean是boolean的包裝類,需要進行拆箱操作,可能導致空指針 網絡問題或者鍵不存在但Redis未響應,可能會返回null,因此需要實用工具類判斷。改成BooleanUtil.isTrue(flag)。return BooleanUtil.isTrue(flag);
}
// 釋放鎖
private void unlock(String key){stringRedisTemplate.delete(key);
}
流程說明:
進行方法封裝,在queryWithMutex()
方法中進行緩存重建的業務代碼。
代碼展示:
?
public Result queryById(Long id) {//緩存穿透// ? ? ? Shop shop = queryWithPassThrough(id);//互斥鎖解決緩存擊穿Shop shop = queryWithMutex(id);if (shop == null) {return Result.fail("店鋪不存在");}// 7.返回return Result.ok(shop);}//互斥鎖解決緩存穿透public Shop queryWithMutex(Long id) {//1.從redis中查詢商鋪緩存String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);Shop shop = JSONUtil.toBean(shopJson, Shop.class);// 2.判斷是否存在if (StrUtil.isNotBlank(shopJson)) {// 3.存在,直接返回return shop;}// 4.判斷命中的是否是空值if (shopJson != null) {// 返回錯誤信息return null;}//4.實現緩存重建//4.1 獲取互斥鎖String lock = LOCK_SHOP_KEY + id;try {boolean isLock = tryLock(lock);//4.2.判斷是否獲取成功if (!isLock) {//4.3.失敗,則休眠并重試Thread.sleep(50);return queryWithMutex(id);}// 4.2.獲取鎖成功,再次檢查redis緩存 判斷是否為空,如果存在shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);if (StrUtil.isNotBlank(shopJson)) {// 3.存在,直接返回shop = JSONUtil.toBean(shopJson, Shop.class);return shop;}//4.4.成功,根據id查詢數據庫shop = getById(id);//模擬重建的延時Thread.sleep(200);if (shop == null) {//將空值返回redisstringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, "", CACHE_NULL_TTL, TimeUnit.MINUTES);// 5.不存在,返回錯誤return null;}// 6.存在,寫入redisstringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);// 7.釋放互斥鎖}catch (InterruptedException e) {throw new RuntimeException(e);}finally {unlock(lock);}//8.返回return shop;?}// 邏輯過期解決緩存擊穿public Shop queryWithPassThrough(Long id){//1.從redis中查詢商鋪緩存String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY+id);// 2.判斷是否存在if (StrUtil.isNotBlank(shopJson)){// 3.存在,直接返回Shop shop = JSONUtil.toBean(shopJson, Shop.class);return shop;}// 4.判斷命中的是否是空值if (shopJson != null){// 返回錯誤信息return null;}// 4.不存在,根據id查詢數據庫Shop shop = getById(id);if (shop == null) {//將空值返回redisstringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id, "", CACHE_NULL_TTL, TimeUnit.MINUTES);// 5.不存在,返回錯誤return null;}// 6.存在,寫入redisstringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);// 7.返回return shop;?}
借助Jmeter工具進行高并發環境模擬
測試結果如下。
案例展示:基于邏輯過期方式解決緩存擊穿問題
需求:修改根據ID查詢商鋪的業務,基于邏輯過期方式來解決緩存擊穿問題。
邏輯過期并不是真的過期,他要求存儲數據到redis中的時候,額外的添加一個過期時間的字段。
key本身不需要設置TTL,他的過期時間不由redis控制,而是由業務代碼判斷是否過期,這樣在業務上就會復雜很多。要修改業務流程。
修改詳情:首先前端去提交商鋪的ID到服務端,服務端在通過ID在Redis中查詢緩存,而在邏輯過期方式中,緩存不會出現未命中的情況。
原因:key沒有過期時間,一旦key添加在緩存中,就會永久存在,除非活動結束,在人工刪除,而像這種熱點key一般是參加活動的一些商品,或者是一些其他的東西,會提前加入緩存,并設置邏輯過期時間。
因此,在理論情況下,所有的熱點key都會提前添加好,并一直存在,直到活動結束,人工刪除。
因此可以不用判斷是否命中緩存,如果緩存不存在,則說明該key并不在活動中,所以在流程中象征性的判斷一下即可,若未命中就直接返回空。
而核心邏輯就在于默認命中之后,在命中后需要判斷是否過期,也就是邏輯過期時間。
如果未過期,就直接返回即可。
如果數據過期,說明需要重新加載,需要去做緩存重建。
但是也不能讓所有線程都去重建,因此還是需要爭搶,即先嘗試獲取互斥鎖,然后判斷是否獲取到,如果獲取失敗,說明之前已經有線程在進行更新緩存,這時可以直接返回舊數據。
如果搶奪成功,就需要去執行緩存重建,而且并不是在本線程執行,而是新建線程去執行緩存重建,而本線程先返回舊數據,由該獨立線程執行數據重建,查詢數據庫,將數據寫入緩存,并且設置邏輯過期時間,再去釋放鎖即可。
流程如圖所示:
代碼實現:
考慮問題:
將數據寫入redis的時候,我們要設置一個邏輯過期時間,那邏輯過期時間如何添加數據里?
可以直接找到實體類,在實體類中添加邏輯過期時間字段,但這種方案并不友好,因為對原來的代碼和業務邏輯進行了修改,這里有兩個方案,
一:在工具類中新建對象RedisData,在該類中新建LocalDatatime類型的字段expireTime,然后讓實體類shop集繼承redisData類,
二:在RedisData中去添加Object類型的Data,也就是說redisData自己帶有過期時間并且里面帶有數據,這個數據就是要存入redis的熱點key,是一個萬能的存儲對象。第二種方案不會對原來的代碼有任何的修改。
在這里選擇第二種方案。而像這種熱點key的數據需要提前導入進去,在實際開發中,可能會有一個后臺管理系統,可以將某一些熱點的數據在后臺提前的添加緩存中,但是該項目沒有,因此只能基于單元測試的方式,將店鋪數據加入緩存中,相當于做一個緩存的預設。
代碼展示:
向Redis存入熱點數據以及設置邏輯過期時間
?private void saveShop2Redis(Long id, Long expireSeconds) {// 1.查詢店鋪數據Shop shop = getById(id);// 2.封裝邏輯過期時間RedisData redisData = new RedisData();redisData.setData(shop);// 設置邏輯過期時間 現在時間加上過期時間redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));// 3.寫入redisstringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));}
編寫單元測試:
?@Resourceprivate ShopServiceImpl shopService;@Testvoid testSaveShop() {shopService.saveShop2Redis(1L, 10L);}
運行,測試通過,查看redis數據庫
數據預熱完成
開始解決緩存擊穿問題
代碼展示:
?public Shop queryWithLogicalExpire(Long id){//1.從redis中查詢商鋪緩存String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY+id);// 2.判斷是否存在if (StrUtil.isBlank(shopJson)){// 3.不存在,直接返回空return null;}//4.命中,需要把json反序列化為對象RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);// 4.存在,判斷緩存是否過期//Data實際上是jsonObject對象Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);LocalDateTime expireTime = redisData.getExpireTime();?// 5.判斷是否過期if(expireTime.isAfter(LocalDateTime.now())){// 5.1.未過期,直接返回店鋪信息return shop;}// 5.2.過期,需要緩存重建// 6.重建緩存// 6.1.獲取互斥鎖String lock = LOCK_SHOP_KEY + id;boolean isLock = tryLock(lock);// 6.2.判斷是否獲取鎖成功if (isLock) {//再次檢測redis緩存是否過期redisData = JSONUtil.toBean(stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id), RedisData.class);if (redisData.getExpireTime().isAfter(LocalDateTime.now())) {// 6.1.未過期,直接返回。return JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);}// 6.3.成功,開啟獨立線程,實現緩存重建CACHE_REBUILD_EXECUTOR.submit(() -> {try {// 重建緩存this.saveShop2Redis(id, 20L);} catch (Exception e) {throw new RuntimeException(e);}finally {// 釋放鎖unlock(lock);}});}
注意事項:在開啟新線程時建議使用線程池,新建線程經常要創建與銷毀,十分浪費性能,使用線程池可以做到線程復用。
因此新建線程池執行器。
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
為了模擬緩存有一定的延遲,可以在數據存入時休眠200ms
?public void saveShop2Redis(Long id, Long expireSeconds) throws InterruptedException {// 1.查詢店鋪數據Shop shop = getById(id);// 模擬重建的延時Thread.sleep(200);// 2.封裝邏輯過期時間RedisData redisData = new RedisData();redisData.setData(shop);// 設置邏輯過期時間 現在時間加上過期時間redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));// 3.寫入redisstringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));}
延遲越長,越容易出現這種線程安全問題。
開始測試,在高并發的情況下,會不會出現大量線程重建的情況(并發的安全問題),以及一致性問題,在緩存重建完成之前查詢到的是舊的數據還是新的數據(將數據庫中的數據修改一下進行對比)
新建100個線程并發執行。
檢查成果:
測試成功,在前一半線程中,數據為舊數據,后一半線程數據為新數據。
而在數據庫只進行了一次數據查詢,證明并發安全,只會有一次重建,但是數據一致性會有一些問題。
拓展知識:線程池的具體流程以及如何實現線程復用
線程核心類是ThreadPoolExecutor,它也有很多基于threadpoolexecutor包裝的類, ThreadPoolExecutor的七大參數
-
corePoolSize (核心線程):來一個任務開一個線程,直到達到我們的corePoorSize,如果到了這個線程,就不能去開新的線程,將其放在隊列中,即workQueue(阻塞隊列)
-
workQueue(阻塞隊列/任務隊列):當核心線程達到以后,之后的任務就放在阻塞隊列中 。
-
maximumPoolSize(最大線程數): 當阻塞隊列滿了以后,需要求其他的線程幫忙,所以會去開額外的線程,但是,corePoolsize加上來幫忙的線程不能超過這個最大線程數。
-
keepAliveTime(線程保持活躍時間):代表任務消費完之后,剛剛幫忙的線程可能要回收,也可以設置corePoolSize的線程要回收,KeepAliveTime就是設置要回收的時間,過多久以后回收線程。
-
TimeUnit:時間單位
-
ThreadFactory(線程工廠):線程池的核心是線程復用,要去創建線程,這個工廠就代表你如何創建線程,以及你在創建線程他的一些屬性設置。
-
RejectedExecutionHandler(接口)(拒絕策略),當線程數滿了以后,包括開始開啟的線程,,包括后面幫你的線程 還有任務隊列也滿了的時候,不清楚該怎么做,就提供這個接口給你,只要實現RejectedExecution方法就可以了。
執行流程
在初始化線程以后,執行execute方法,execute方法傳的是一個任務task(runnable),這個任務是抽象的(接口). 源碼解析:
?
? //ctl:是一個原子類型,用于保存當前線程池的線程數以及線程狀態。//有三位是來保證狀態的,還有二十九位用來保存線程數。private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;??int c = ctl.get(); //首先,先拿到當前線程池的線程數 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) //如果說小于corePoolSize(核心線程),則調用addworker方法。(添加線程來幫助執行任務),也就是說,只要線程數小于核心線程,就會添加一個線程。return;//直接返回c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {//當大于corePoolSize時,則將添加到阻塞隊列中(workqueue),如果if條件成立,則說明阻塞隊列添加成功int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false)) // 當我的線程數大于corePoolSize并且阻塞隊列也是滿的,則在調用addworker方法,增加線程,不過此次增加線程的參數為false,false的意思是這個添加的線程是非核心線程,相當于額外的線程reject(command);//如果額外的線程也沒有添加成功,就直接拒絕,調用RejectedExecution方法即可(拒絕策略)}
細致解析addworker方法
?
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c = ctl.get();;) {// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;?for (;;) {// ? 三元運算符 如果傳入的core為true,說明要添加核心線程,那么就去和core對比,如果不為true,則和maximumPoolSize(最大線程數)對比if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); ?// Re-read ctlif (runStateAtLeast(c, SHUTDOWN))continue retry;// else CAS failed due to workerCount change; retry inner loop}}//以上代碼用于線程判斷,判斷線程是否關閉,以及線程狀態是否正常?boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask); // 開啟線程,去找workerfinal Thread t = w.thread; // 開啟線程if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();?if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {if (t.getState() != Thread.State.NEW)throw new IllegalThreadStateException();workers.add(w);workerAdded = true;int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;}} finally {mainLock.unlock();}if (workerAdded) {container.start(t);//啟動線程workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
找到worker構造器
?Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this); //使用線程工廠來創建線程}public void run() {runWorker(this);// 調用run方法}
runworker方法執行任務
?
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask; // task 是我們傳的參數 w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {// 除了傳進來的的以外,我們還要去阻塞隊列中去拿,通過getTask()方法w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {task.run(); //執行我們的任務afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
getTask()方法
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out??for (;;) {int c = ctl.get();?// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}?int wc = workerCountOf(c);?// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //當允許核心線程回收的時候或線程數大于核心線程數,這是可以回收線程, 怎么回收?run方法結束,線程就回收了,run方法什么時候結束??if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}?try {//當到達超時時間以后,允許核心線程回收時,就可以回收,如果不允許線程回收或者現在線程數小于核心線程數,就調用take()方法(一直阻塞)。這也是線程池做到線程復用的關鍵地方,線程池中的線程不會回收,只會通過阻塞隊列阻塞Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();// 怎么拿到的?take()方法或poll方法;poll()方法是超時阻塞,poll是一直阻塞,如果沒有任務,要么就是超時阻塞,要么就是一直阻塞,如果有任務就去拿任務if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
以上就是源碼中的關鍵源碼。以及相應的解釋
具體流程
在初始化線程以后,執行execute方法,execute方法傳的是一個任務task。
當執行task時,首先判斷是否大于corePoolSize(核心線程),如果是,直接丟給workqueue(阻塞隊列)。
如果不是,就通過work類中的ThreadFactory(線程工廠)開啟線程,執行start方法去Thread中回調run方法。
如果workqueue(阻塞隊列)也滿了之后,會再次經過一次判斷,如果此時線程數小于maximumPoolSize(最大線程數),則再次通過work類中的ThreadFactory(線程工廠)去開啟線程。
如果大于maximumPoolSize(最大線程數),則是拒絕策略(RejectedExecutorHandler接口中的RejectedExecution()方法),
此時run方法有兩種選擇,當前有task時,執行當前拿到的task,如果沒有,自旋去workqueue(阻塞隊列)中,會去執行take方法或者poll方法。
take方法是無限制的阻塞,而poll方法則是有超時時間的限制的阻塞,超時時間也可以控制。
當workqueue為空,且允許關閉核心線程 或者當前的線程數大于核心線程數boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
,可以進行線程回收(調用poll方法)。
否則調用take方法,阻塞在take方法中,等待workqueue里面存在task。
這樣的話,線程就一直不會被回收,只要代碼中 allowCoreThreadTimeOut為false且線程數剛好小于核心線程數
如圖所示: