2019獨角獸企業重金招聘Python工程師標準>>>
一個countDown在多線程調度下使用不當的分享
1. 詭異的數據抖動
在一個需求開發過程中,由于有多角色需要獲取每個角色下的菜單;結果出現了單角色下拉去菜單沒問題,多角色情況下只有一個角色的菜單正常返回的問題。這個問題很憂傷,沒有菜單如何進功能頁面?
2. 懷疑是緩存
因為多角色下菜單采用了Redis緩存,故而懷疑是其中一個角色下的菜單是緩存失效,但是關閉掉緩存依然不起作用,排除緩存影響。
3. debug發現問題
通過增加日志輸出,在關閉掉緩存的實時模式下,依然存在菜單時而有,時而沒有的情況。證明應該是代碼有問題。
在一個多線程調度調度服務類中,發現一個問題,即在debug到如下代碼,會出現后續代碼未執行完全,接口結果即被返回的情況。
//經過查詢相關API,不會出現時序問題,可放心使用final CountDownLatch latch = new CountDownLatch(callableList.size());for(Callable callable :callableList){ListenableFuture<T> listenableFuture = threadPoolTaskExecutor.submitListenable(callable);listenableFuture.addCallback(new ListenableFutureCallback<T>() {@Overridepublic void onFailure(Throwable throwable) {//過早調用countDown BUGlatch.countDown();LogHelper.EXCEPTION.error("執行任務異常",throwable);if(futureCallback!=null){futureCallback.onFailure(throwable);}}@Overridepublic void onSuccess(T t) {//過早調用countDown BUGlatch.countDown();if(futureCallback!=null){futureCallback.onSuccess(t);}}});}
4. countDown調用時機不對
在調用遠程接口返回后,立即執行 lacth.countDown(); 會立刻造成主線程阻塞釋放,立即響應結果,丟失部分數據。如下圖所示,在第二個任務獲取數據處理完成后, 就立即調用latch.countDown(), 致使后續的回調還未執行。主線程在收到 latch的釋放阻塞后,返回了不完整的數據結果。
改造后,將countDown放在回調執行完成之后,并放置在 try {} finally { }代碼塊之中,保證一定得到執行,防止拋異常后,主線程阻塞。 如下所示:
final CountDownLatch latch = new CountDownLatch(callableList.size());for(Callable callable :callableList){ListenableFuture<T> listenableFuture = threadPoolTaskExecutor.submitListenable(callable);listenableFuture.addCallback(new ListenableFutureCallback<T>() {@Overridepublic void onFailure(Throwable throwable) {try{LogHelper.DEFAULT.info("多線程回調,latchCount="+latch.getCount());LogHelper.EXCEPTION.error("執行任務異常",throwable);if(futureCallback!=null){futureCallback.onFailure(throwable);}}finally {latch.countDown();}}@Overridepublic void onSuccess(T t) {try{LogHelper.DEFAULT.info("多線程回調成功,latchCount="+latch.getCount());if(futureCallback!=null){futureCallback.onSuccess(t);}}finally {latch.countDown();}}});}
5. 本次使用的多線程調度說明
使用Future阻塞模式,不會出現以上問題,使用future.get()的阻塞式獲取,不需要CountDownLatch工具類配合使用。而且本次其實也可以使用Future來實現同樣的功能。但是Future沒有提供 onFailure, onSucess 這樣的回調接口,考慮到易用性,采用了ListenableFuture;
本次采用的 spring的 ListenableFuture 方式回調來實現回調式聚合。在對CountDownLatch使用不當的情況下,出現了該問題。解決該問題后,功能運行正常。
本次提供的服務類 ThreadPoolExecutorService, 其主要的方法如下:
/*** 線程池服務類** @author David* @since 2018/5/15*/
public interface ThreadPoolExecutorService {/*** 執行多線程任務處理,并通過 futureCallback對結果進行回調處理* @param callableList 異步線程可執行的任務 List* @param futureCallback 異步回調* @param timeout 超時時間,毫秒* @param <T>*/<T> void execute(List<Callable<T>> callableList, ListenableFutureCallback<T> futureCallback, long timeout);/*** 執行多線程任務處理,并將結果聚合成一個List 進行返回** @param callableList 異步線程可執行的任務 List* @param failureCallback 失敗的回調方法* @param skipNull 是否忽略Null 結果,如果忽略 null 不會添加到List 之中* @param timeout 超時時間,毫秒* @param <T>*/<T> List<T> executeAndMerge(List<Callable<T>> callableList, FailureCallback failureCallback, boolean skipNull, long timeout);/*** 執行多線程任務處理,并將結果List,聚合成一個List 進行返回** @param callableList 異步線程可執行的任務 List* @param failureCallback 失敗的回調方法* @param timeout 超時時間,毫秒* @param <T>*/<T> List<T> executeAndMergeList(List<Callable<List<T>>> callableList, FailureCallback failureCallback, long timeout);
6. Future 和 ListenableFuture的區別
spring 或者 guava 的 ListenableFutrue其使用方式和機理應該是類似的,這里的說明是通用的。本次工具類使用的是spring自帶的ListenableFutrue。
6.1 區別說明
ListenableFuture顧名思義就是可以監聽的Future,它是對java原生Future的擴展增強。我們知道Future表示一個異步計算任務,當任務完成時可以得到計算結果。如果我們希望一旦計算完成就拿到結果展示給用戶或者做另外的計算,就必須使用另一個線程不斷的查詢計算狀態。這樣做,代碼復雜,而且效率低下。使用ListenableFuture幫我們檢測Future是否完成了,如果完成就自動調用回調函數,這樣可以讓主線程不必阻塞,減少并發程序的復雜度。
6.4 spring ListenableFuture使用示例
一個簡單的示例,如果要獲得 ListenableFuture,則需要一個對Java線程池進行修飾過的線程池執行器。如下所示:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><property name="corePoolSize" value="${threadpool.corePoolSize}" /><property name="keepAliveSeconds" value="${threadpool.keepAliveSeconds}" /><property name="maxPoolSize" value="${threadpool.maxPoolSize}" /><property name="queueCapacity" value="${threadpool.queueCapacity}" /><property name="rejectedExecutionHandler"><bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /></property></bean>
guava的請參考以下方式進行修飾,這里不進行詳細說明:
//Java線程池的類型是可選的【根據場景自行構建】 ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
得到修飾過的線程池執行器后,即可提交可Callable任務,得到 ListenableFuture 進行處理。
以下為計算1~5的3次方的結果并輸出,不需要聚合結果,每個線程計算完畢后,立刻輸出結果:
for(int i=1; i<=5; i++){final int num = i;ListenableFuture<Integer> listenableFuture = threadPoolTaskExecutor.submitListenable(new Callable<Integer>() {@Overridepublic Integer call(){return (int)Math.pow(num,3);}});listenableFuture.addCallback(new ListenableFutureCallback<Integer>() {@Overridepublic void onFailure(Throwable throwable) {LogHelper.EXCEPTION.error("處理失敗", throwable);}@Overridepublic void onSuccess(Integer result) {System.out.println(num + "的3次方為:"+ result);}});}
6.2 是否可以添加多個callback
答案是肯定的,因為ListenableFuture 的addCallback是添加到ListenableFutureCallbackRegistry 一個注冊中心。而注冊中心底層是支持多個回調的。在6.3會具體介紹回調方法注冊中心的處理邏輯。
public void addCallback(ListenableFutureCallback<? super T> callback) {this.callbacks.addCallback(callback);}
6.3 addCallback是否需要考慮時序
guava 和 spring的 ListenableFuture 均做了時序兼容,在listenableFuture執行的任意時刻調用 addCallback 均可準確的執行回調。這里就不得不說在調用addCallback的時候,其實將回到方法注冊到ListenableFutureCallbackRegistry(回調注冊中心)。
6.3.1 ListenableFutureCallbackRegistry的特性有:
a. 記錄了Callable的3種調度狀態:
NEW(新建,還未執行),SUCCESS(返回成功),FAILURE(拋異常,失敗)
b. 有兩個處理隊列,分別是:
successCallbacks:存儲執行成功的回調方法;
failureCallbacks: 存儲執行失敗的回調方法(拋異常)。
c. 存儲響應結果
將Callable<T> 返回的結果,也放在回調中心里面;
d. mutex 對象保證線程安全
有一個成員變量:private final Object mutex; 用來保證在添加回調任務,或者設置結果集的時候,注冊中心是線程安全的。
其在添加回調任務的時候,處理流程為:
synchronized(this.mutex) {...}
如上圖所示,在添加回調任務的時候,會通過synchronized先獲取 mutex 排它鎖,保證處理的線程安全。繼而判斷任務的狀態:
如果為NEW,標識任務還未執行完畢,這時候需要將任務先放入隊列,待任務執行完畢再根據狀態調用回調方法;
如果為SUCCESS,標識任務已經執行成功,不需要再放入隊列,而是在當前線程中,直接調用 onSuccess方法;
如果為FAILURE, 標識任務已經執行失敗,不需要再放入隊列,而是在當前線程中,直接調用 onFailure方法;
此外,還有一個流程,即在Callable任務調度完成,返回結果后,如果未拋異常:對successCallbacks隊列中的方法進行逐個回調;如果拋出異常,對failureCallbacks隊列中的方法進行逐個回調。因流程較簡單,這里只是簡單說明。
6.4 說明
本文只分析到 ListenableFuture 的使用方式, CountDownLatch的調用時機和ListenableFuture 的特性。
因時間和篇幅有限,具體spring 或 guava在submitListenable 任務之后,內部處理邏輯并沒有闡述。感興趣的同學可以具體到源碼查看其內部實行邏輯。