寫在文章開頭
近期對一些比較老的項目進行代碼走查,碰到一些極端的并發編程惡習,所以筆者就基于此文演示這類問題以及面對并發編程時我們應該需要了解一些常見套路。
Hi,我是sharkChili,是個不斷在硬核技術上作死的java coder,是CSDN的博客專家,也是開源項目Java Guide的維護者之一,熟悉Java也會一點Go,偶爾也會在C源碼邊緣徘徊。寫過很多有意思的技術博客,也還在研究并輸出技術的路上,希望我的文章對你有幫助,非常歡迎你關注我的公眾號:寫代碼的SharkChili,實時獲取筆者最新的技術推文同時還能和筆者進行深入交流。
提出一個需求
基于筆者近期走查的案例筆者以一個類似的需求進行演示,這個需求是通過一個定時的任務調度線程從任務表中獲取任務項,通過這個任務項得到要到data
表查詢對應任務的數據集并進行數據推送。
此時如果用戶通過頁面點擊暫停,這些正在發送的數據在數據庫中的狀態就會被更新為暫停,完成后再將這個定時調度的線程暫停。
整體流程如下圖所示,理想情況下,兩個線程的工作過程為:
- 線程1從數據庫找到任務,并通過這個任務找到數據表找到要發送的數據集,存入內存中。
- 線程1更新數據集狀態為待發送,不斷發送數據。
- 系統收到用戶頁面的暫停操作,創建一個線程2,從內存中找到要發送的數據,將這些數據集的狀態更新為已暫停。
- 線程完成數據暫停后將線程1的執行打斷。
問題復現
基于這個需求,筆者給出下面這樣一個錯誤的例子,首先我們定義一下要發送的數據類,可以看到這個類包含id、數據和數據發送狀態:
@Data
@AllArgsConstructor
public class SendData {private int id;private String data;/*** 0 未開始* 1 發送中* 2 已完成* 3 暫停*/private int status;
}
然后我們再給出任務的封裝,如下所示,我們通過任務表可以查到任務的id和名稱,通過id就可以到數據表定位到當前任務的數據集,并將其添加到sendDataLinkedList
中:
@Data
@AllArgsConstructor
public class TaskInfo {private int taskId;private String taskName;//數據集private LinkedList<SendData> sendDataLinkedList;//若sendDataLinkedList不為空則彈出第一個元素public SendData popSendData() {if (CollUtil.isNotEmpty(sendDataLinkedList)) {return sendDataLinkedList.pop();}return null;}//將數據添加到sendDataLinkedList中public void addSendData(SendData sendData) {sendDataLinkedList.add(sendData);}
}
然后我們給出模擬數據,可以看到筆者用taskInfoMap
模擬任務表中的數據,用mysqlSendDataList
模擬數據庫中對應task
要發送的數據集:
private static List<SendData> mysqlSendDataList = new ArrayList<>();private static Map<Integer, TaskInfo> taskInfoMap = new HashMap<>();static {//模擬其他線程查到要執行的任務,并存入內存taskInfoMap.put(1, new TaskInfo(1, "任務1", new LinkedList<>()));//模擬任務1在mysql表中要發送的電話號碼mysqlSendDataList.add(new SendData(1, "數據1", 0));mysqlSendDataList.add(new SendData(2, "數據2", 0));mysqlSendDataList.add(new SendData(3, "數據3", 0));mysqlSendDataList.add(new SendData(4, "數據4", 0));mysqlSendDataList.add(new SendData(5, "數據5", 0));mysqlSendDataList.add(new SendData(6, "數據6", 0));mysqlSendDataList.add(new SendData(7, "數據7", 0));mysqlSendDataList.add(new SendData(8, "數據8", 0));mysqlSendDataList.add(new SendData(9, "數據9", 0));mysqlSendDataList.add(new SendData(10, "數據10", 0));}
對應的線程代碼如下,可以看到線程1會從數據庫中讀取數據并更新為發送中然后進行發送,并在完成后更新數據庫狀態。
而線程2則是模擬收到用戶狀態請求后,從內存中的任務集找到任務1,然后定位到正在發送的數據集將其數據庫狀態更新為暫停,然后將線程1暫停(這里用stop模擬打斷定時任務)
。
public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(() -> {//模擬查任務TaskInfo taskInfo = taskInfoMap.get(1);//模擬從數據庫中取出待發送的數據log.info("線程1更新狀態為發送中");List<SendData> dataList = mysqlSendDataList.stream().filter(s -> s.getStatus() == 0).collect(Collectors.toList());//更新狀態為發送中mysqlSendDataList.stream().forEach(d -> d.setStatus(1));//將數據存入鏈表中dataList.forEach(taskInfo::addSendData);while (true) {SendData sendData = taskInfo.popSendData();if (sendData == null) {break;}log.info("發送數據:{} 成功", JSONUtil.toJsonStr(sendData));}//更新狀態為發送完成mysqlSendDataList.stream().forEach(d -> d.setStatus(2));});Thread t2 = new Thread(() -> {//模擬從內存中找到任務,然后從內存中找到正在發送的號碼,并將其數據庫狀態更新為待發送TaskInfo taskInfo = taskInfoMap.get(1);for (SendData sendData : taskInfo.getSendDataLinkedList()) {SendData mysqlSendData = mysqlSendDataList.stream().filter(s -> s.getId() == sendData.getId()).findFirst().get();mysqlSendData.setStatus(3);log.info("暫停任務:{}", JSONUtil.toJsonStr(mysqlSendData));}//打斷正在工作的線程try {t1.wait();t1.interrupt();} catch (InterruptedException e) {e.printStackTrace();}log.info("打斷t1線程,暫停發送任務");});t1.setName("t1");t1.start();t2.setName("t2");t2.start();System.out.println("執行結束");}
正常情況下,這種代碼因為多線程操作單一數據集進行動態迭代刪除時是會拋出ConcurrentModificationException
的,但是筆者在走查類似上文這種例子時并為發現這個問題,經過對于流程和場景梳理時得出了答案。
筆者發現這個啟動和暫停任務的場景執行的數據量非常大,因為龐大的數據量,被暫停了任務基本都會在排隊或者剛剛完成數據集狀態更新為發送中就被類似于線程2的代碼完美暫停掉。
但是也不免出現一些比較極端的場景:
- 任務1正好被執行。
- 執行過程中收到暫停信號,線程2讀取內存中任務1的數據集,更新數據庫狀態。
- 任務2正準備打斷任務1,CPU又切回線程1,因為線程2暫停數據時并沒有將內存中的數據集刪除,導致這些在數據庫中已經被暫停的數據集仍然被發送了。
最終很可能導致同樣的一批數據被重復發送兩次。
對應的現象也就像下面這段代碼一樣,
00:17:43.052 [t1] INFO com.sharkChili.LinkListThreadSafeApplication - 線程1更新狀態為發送中
00:17:49.093 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":1,"data":"數據1","status":3}
00:17:49.716 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":2,"data":"數據2","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":3,"data":"數據3","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":4,"data":"數據4","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":5,"data":"數據5","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":6,"data":"數據6","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":7,"data":"數據7","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":8,"data":"數據8","status":3}
00:17:50.422 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":9,"data":"數據9","status":3}
00:17:50.422 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":10,"data":"數據10","status":3}
00:17:50.422 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 打斷t1線程,暫停發送任務
解決方案
對于此類并發問題的重構并解決的套路考慮的基本要考慮如下兩個點:
- 保持原有的業務邏輯
- 線程互斥保持在一個維度。
- 選用合適的并發容器。
我們都知道重構代碼對于測試的回歸,邏輯的扭轉變化都存在很大的風險點,所以筆者在對這段代碼重構時非常明確的梳理的任務執行的數據流,明確了業務邏輯,這位作者意圖是想在任務暫停時及時更新任務狀態且讓線程1不執行被暫停的任務,所以為了保證暫停的數據集不被線程1發送,首先就需要保證兩個線程操作的集合處于一個維度,而不是像上面的代碼一樣線程1用pop
方法,線程2用get
加遍歷的方式。
所以筆者改動的第一步,就是像容器安全化,將數據集存儲容器改為ConcurrentLinkedDeque
,然后彈出元素的函數改為pollFirst
。
//數據集private ConcurrentLinkedDeque<SendData> sendDataLinkedList;//若sendDataLinkedList不為空則彈出第一個元素public SendData popSendData() {return sendDataLinkedList.pollFirst();}
這里我們也給出pollFirst
的源碼,可以看到它進行元素彈出時會通過CAS
確定彈出的item
是否和操作直線得到的一致,只有compare and set
成功之后才能彈出。
public E pollFirst() {for (Node<E> p = first(); p != null; p = succ(p)) {E item = p.item;//只有cas成功才能彈出元素if (item != null && p.casItem(item, null)) {unlink(p);return item;}}//若為空直接返回nullreturn null;}
其次為了保證兩個線程操作處于一個維度,筆者將getter
容器方法私有化,確保兩者操作都是用同一個pop
方法操作:
private ConcurrentLinkedDeque<SendData> getSendDataLinkedList() {return sendDataLinkedList;}
這樣線程2的暫停邏輯就改為實時pop
出線程1正在發送的數據再暫停,保證了暫停的數據線程1不會發送:
Thread t2 = new Thread(() -> {//模擬從內存中找到任務,然后從內存中找到正在發送的號碼,并將其數據庫狀態更新為待發送TaskInfo taskInfo = taskInfoMap.get(1);SendData sendData = null;while ((sendData = taskInfo.popSendData()) != null) {SendData finalSendData = sendData;SendData mysqlSendData = mysqlSendDataList.stream().filter(s -> s.getId() == finalSendData.getId()).findFirst().get();mysqlSendData.setStatus(3);log.info("暫停任務:{}", JSONUtil.toJsonStr(mysqlSendData));}//打斷正在工作的線程try {log.info("打斷t1線程,暫停發送任務");t1.stop();} catch (Exception e) {e.printStackTrace();}});
此時再看輸出結果,可以看到線程1發送了一個數據之后,線程2暫停了其余的數據,調度回到線程1,線程1停止了發送,問題解決:
00:50:18.336 [t1] INFO com.sharkChili.LinkListThreadSafeApplication - 線程1更新狀態為發送中
00:50:23.090 [t1] INFO com.sharkChili.LinkListThreadSafeApplication - 發送數據:{"id":1,"data":"數據1","status":1} 成功
00:50:26.242 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":2,"data":"數據2","status":3}
00:50:28.200 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":3,"data":"數據3","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":4,"data":"數據4","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":5,"data":"數據5","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":6,"data":"數據6","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":7,"data":"數據7","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":8,"data":"數據8","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":9,"data":"數據9","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暫停任務:{"id":10,"data":"數據10","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 打斷t1線程,暫停發送任務
小結
總的來說這是一段比較基礎的并發編程問題,本篇文章更著重的是讓讀者了解并發編程時如何復現以及考慮問題的維度,不難看出筆者進行并發編程問題的解決思路就是三步:
- 理清數據流和并發代碼邏輯。
- 確定合適的容器。
- 確保多線程操作互斥在同一個維度。
我是sharkchili,CSDN Java 領域博客專家,開源項目—JavaGuide contributor,我想寫一些有意思的東西,希望對你有幫助,如果你想實時收到我寫的硬核的文章也歡迎你關注我的公眾號:
寫代碼的SharkChili,同時我的公眾號也有我精心整理的并發編程、JVM、MySQL數據庫個人專欄導航。