前幾天老板突然匆匆忙忙的過來說對賬系統最近越來越慢了,能不能快速優化一下?我了解了對賬系統的業務后,發現還是挺簡單的,用戶通過在線商城下單,會生成電子訂單,保存在訂單庫。之后物流會生成派送單給用戶發貨,派送單保存在派送單庫。為了防止漏送或重復派送,對賬系統每天還會校驗是否存在異常訂單。
對賬系統的處理邏輯很簡單,你可以參考下面的對賬系統流程圖。目前的對賬系統處理邏輯是,首先查詢訂單,然后查詢派送單,然后對比訂單和派送單,將差異寫入差異庫。
對賬系統的代碼抽象之后也很簡單,核心代碼如下,就是在一個單線程里循環查詢訂單、派送單,然后執行對賬,最后寫入差異庫。
while(存在未對賬的訂單){//查詢未對賬訂單pos = getPRders();//查詢派送單dos = getDOrder();//執行對賬操作diff = chech(pos,dos);//差異寫入差異庫save(diff);
}
利用并行優化對賬系統
老板要我優化性能,那我就首先要找到這個對賬系統的瓶頸所在。
目前的對賬系統由于訂單量和派送單量巨大,所以查詢對賬訂單getPOrders和查詢派送單getDOrders相對較慢,那有沒有什么辦法可以快速優化一下呢?目前對賬系統是單線程執行的。圖形化后是下面這個樣子,對于串行化的系統性能優化,首先想到的是能否利用多線程并行處理。
所以這里你應該所以,這里你應該能夠看出來這個對賬系統里的瓶頸,查詢未對賬訂單getPOrder和查詢派送訂單getDOrder是否可以并行處理呢?顯然是可以的,因為這兩個操作并沒有先后順序的依賴,這兩個最耗時的操作并行執行之后,執行過程如下圖所示。對比一下單線程的執行示意圖,你會發現同等時間里并行執行的吞吐量近乎單線程的兩倍。提升效果還是相對明顯的。
思路有了,下面我們再來看看如何利用代碼實現。在下面的代碼中,我們創建了兩個線程T1和T2,并行執行查詢未對賬訂單getPOrder和查詢派送訂單getDOrder這兩個操作。在主線程中執行對賬操作check和差異寫入save兩個操作,不過要注意的是,主線程需要等待線程T1和T2執行完才能執行check和save兩個操作。為此,我們通過調用T1.join和T2.join來實現等待。當T1和T2線程退出時。調用T1.join和T2.join的主線程就會從阻塞狀態被喚醒,從而執行之后的check和save。
while(存在未對賬訂單){//查詢未對賬訂單Thread T1 = new Thread(() ->{pos = getPOrder();});T1.start();//查詢派送訂單Thread T2 = new Thread(()->{dos = getDOrder();});T2.start();//等待T1 T2的結果T1.join();T2.join();//執行對賬操作diff = check(pos,dos);save(diff);
}
用CountDownLatch實現線程等待
經過上面的優化之后,基本可以和老板匯報收工了,但是有點美中不足,相信你也發現了,while循環里面每次都會創建新的線程,而創建線程可是個耗時的操作,所以最好是創建出來的線程能夠循環利用。估計這時候你已經想到線程池了,是的,線程池就能解決這個問題。
而下面的代碼就是利用線程池優化之后的,我們首先創建一個固定大小為2的線程池,之后在while循環里面重復利用,一切看上去都很順利。但是有個問題好像無解了,那就是主線程如何知道getPOrder和getDOrder這兩個操作什么時候執行完?前面的主線程通過調用線程T1和T2的join方法來等待線程T1和T2退出,但是在線程池的方案里,線程根本不會退出,所以join方法已經失效了。
//創建兩個線程的線程池
Executor executor = Executor.newFixedThreadPool(2);
while(存在未對賬訂單){//查詢未對賬訂單executor.execute(() ->{pos = getPOrder();});//查詢派送訂單executor.execute(()->{dos = getDOrder();});/* 如何實現線程等待呢?*///執行對賬操作diff = check(pos,dos);save(diff);
}
那如何解決這個問題呢?你可以開動腦筋想出很多辦法。最直接的辦法是弄一個計數器,初始值設置成2,當執行完pos = getPOrder();這個操作之后,計數器減1,執行完dos = getDOrder();之后,計數器也減1,在主線程里,等待計數器等于零,當計數器等于零時,說明這兩個查詢操作執行完了。等待計數器等于零其實就是一個條件變量,用管程實現起來也很簡單。
不過我并不建議你在實際項目中去實現上面的方案,因為Java并發包里已經提供了實現類似功能的工具類:CountDownLatch,這里我們可以直接使用。下面的代碼示例中,在while循環里面,我們首先創建了一個CountDownLatch,計數器的初始值等于2。之后在pos = getPOrder();和dos = getDOrder();兩個語句的后面對計數器執行減1操作。這個對計數器減1的操作是通過調用latch.countDown()來實現的。在主線程中,我們通過調用latch.await()來實現對計數器等于0的等待。
Executor executor = Executor.newFixedThreadPool(2);
while(存在未對賬訂單){//計數器初始化未2CountDownLatch latch = new CountDownLatch(2);//查詢未對賬訂單executor.execute(() ->{pos = getPOrder();latch.countDown();});//查詢派送訂單executor.execute(()->{dos = getDOrder();latch.countDown()});//等待連個查詢操作結束latch.await();//執行對賬操作diff = check(pos,dos);save(diff);
}
進一步優化性能
經過上面的重重優化之后,長出一口氣,終于可以交付了。不過在交付之前還需要再次審視一番,看還有沒有優化的余地,仔細看還是有的。
前面我們將getPOrder和getDOrder這兩個查詢操作并行了,但這兩個查詢操作和對賬操作check save之間還是串行的。很顯然,這兩個查詢操作和對賬操作也是可以并行的。也就是說,在執行對賬操作的時候,可以同時去執行下一輪的查詢操作,這個過程可以形象的描述為下面這幅圖。
那接下來我們再來思考一下如何實現這步優化,兩次查詢操作都能夠和對賬操作并行。對賬操作還依賴查詢操作的結果,這明顯有點生產者-消費者的意思。兩次查詢操作是生產者,對賬操作是消費者。既然是生產者-消費者模型,那就需要有個隊列來保存生產者生產數據,而消費者則從這個隊列消費數據。
不過針對對賬這個項目,我設計了兩個隊列,并且兩個隊列的元素之間還有對應關系,具體如下圖所示,查詢訂單查詢操作將訂單查詢結果插入訂單隊列,派送單查詢操作將派送單插入派送單隊列,這兩個隊列的元素之間是有一一對應的關系的。兩個隊列的好處是,對賬操作可以每次從訂單隊列出一個元素,從派送單隊列出一個元素,然后對這兩個元素執行對賬操作,這樣數據一定不會亂掉。
下面再來看看如何利用雙隊列來實現完全的并行。一個最直接的想法是,一個線程T1執行訂單的查詢工作,一個線程T2執行派送單的查詢工作,當線程T1和T2都各自生產完一條數據的時候,就通知線程T3執行對賬操作。這個想法雖然看上去很簡單,但其實還隱藏著一個條件,那就是線程T1和線程T2的工作步調要一致,不能一個跑的太快,一個跑的太慢。只有這樣才能做到各自生產完一條數據的時候,通知線程T3。
下面這幅圖形象的描述了上面的意圖,線程T1和線程T2只有都生產完了一條數據的時候,才能一起向下執行,也就是說線程T1和線程T2還要相互等待,步調要一致。同時線程T1和T2都生產完一條數據的時候,還要能夠通知線程T3執行對賬操作。
用CyclicBarrier實現線程同步
下面我們就來實現上面提到的方案,這兩個方案的難點有兩個,一個是線程T1和T2要做到步調一致,另一個是要能夠通知到線程T3。
你依然可以利用一個計數器來解決這兩個難點。計數器初始化為2,線程T1和T2生產完一條數據,都將計數器減1,如果計數器大于0,則線程T1或T2等待,如果計數器等于零,則通知線程T3,并喚醒等待的線程T1和T2。與此同時,將計數器重新置為2。這樣線程T1和T2生產下一條數據的時候,就可以繼續使用這個計數器了。
同樣,還是建議你不要在實際項目中這么做,因為Java并發包里也已經提供了相關的工具類:CyclicBarrier。下面的代碼中,我們首先創建了一個計數器初始值為2的CyclicBarrier,你需要注意的是創建CyclicBarrier的時候我們還需要傳入一個回調函數,當計數器減到0的時候會調用這個回調函數。
線程T1負責查詢訂單,當查詢出一條的時候調用barrier.await()來將計數器減1,同時等待計數器變成0。線程T2負責查詢派送訂單,當查詢出一條時也調用barrier.await()來將計數器減1,同時等待計數器變成0,當T1和T2都調用barrier.await()的時候,計數器會減到0,當T1和T2,此時T1和T2就可以執行下一條語句了,同時還會調用的回調函數來執行對賬操作。
非常值得一提的是,CyclicBarrier的計數器有自動重置功能,當減到零的時候會自動重置回你設置的初始值,這個功能看用起來實在太方便了。
//訂單隊列
Vector<P> pos;
//派送單隊列
Vector<P> dos;
//執行回調的線程池
Executor executor = Executor.newFixedThreadPool(1);
final CyclicBarrier barrier = new CyclicBarrier(2,()->{executor.execute(() -> check());});void check(){P p = pos.remove(0);D d = dos.remove(0);//執行對賬操作diff = check(p,d);save(diff);
}void checkAll(){//查詢未對賬訂單Thread T1 = new Thread(() ->{while(存在未對賬訂單){//查詢訂單庫pos.add(getPOrder());//等待barrier.await();}});T1.start();//查詢派送訂單Thread T2 = new Thread(() ->{while(存在未對賬訂單){//查詢訂單庫pos.add(getDOrder());//等待barrier.await();}});T2.start();
}
總結
CountDownLatch和CyclicBarrier是Java并發包提供的兩個非常應用的線程同步工具類,這兩個工具類的用法區別在這里還是有必要再強調一下的。CountDownLatch主要用于解決一個線程等待多個線程的場景可以類比于旅游團長要等待所有的旅客到齊才能去下一個景點,而CyclicBarrier是一組線程之間互相等待,更像是幾個驢友之間的不離不棄。