業務場景
業務開發過程中,我們經常會需要判斷遠程終端是否在線,當終端離線的時候我們需要發送消息告知相應的系統,
環形隊列
?
1.創建一個index從0到30的環形隊列(本質是個數組)
2.環上每一個slot是一個Set,任務集合
3.同時還有一個Map<uid, index>,記錄uid落在環上的哪個slot里
4.啟動一個timer,每隔1s,在上述環形隊列中移動一格,0->1->2->3…->29->30->0…
5.有一個Current Index指針來標識剛檢測過的slot
6.接收到設備心跳后將尋找到原來uid的位置然后移動到當前指針的后一位,并刪除原來slot里的uid
7.這樣就可以快速獲取超時的設備uid
環形隊列實現
package com.zngx.admin.circle;import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/*** @Author : zhiying* @Date : 2022-11-22 15:17* @Desc : 環形隊列 - 設備離線判定* 1、預設一個長度為10000的數組(按實際業務定義長度)* 2、每個數組存放一個Set集合* 3、維護一個游標cur,從0到9999遞增,到達9999時,重置為0(啟動一個線程執行)* 4、維護一個map,記錄所有設備ID存放的數組位置,方便查找* 5、監聽到設備心跳時,先將原來的數據從指定位置的集合中刪除,通過計算當前游標位置和keepAlive尋找合適的位置將設備ID放入* 6、當游標指向某個位置a時,a位置的集合中的所有設備全部判定為離線,并清空該位置的集合**/public class CircleQueue<T> {//線程安全鎖Lock lock = new ReentrantLock();//初始環形隊列大小private int capacity = 10000;//當前環形隊列所在節點private volatile int currentIndex = 0;//數據所在節點private Map<T,Integer> dataIndex = new HashMap<>();//環形隊列private Set<T>[] array;public CircleQueue(){array = new HashSet[capacity];}public CircleQueue(int capacity){this.capacity = capacity;array = new HashSet[capacity];}/*** 向環形隊列中添加元素* @param t* @param offset 偏移量,基于游標*/public void add(T t, int offset){int index = currentIndex + offset;if(index >= capacity){index = index - capacity;}try {lock.lock();//判斷數據是否存在if(dataIndex.containsKey(t)){Set<T> old = array[dataIndex.get(t)];old.remove(t);}//獲取當前節點的隊列Set<T> set = array[index];if(null == set){set = new HashSet<>();array[index] = set;}set.add(t);//更新新的節點位置dataIndex.put(t,index);}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}}/*** 下移一格,到9999重新置為0*/public void next(){int cur = currentIndex + 1;if(cur >= capacity){cur = cur - capacity;}currentIndex = cur;System.out.println("當前游標位置:" + currentIndex);}/*** 獲取當前游標指向的元素集合* @return*/public Set<T> getAndDeleteData(){Set<T> set = null;try {lock.lock();set = array[currentIndex];return set;}finally {// 將集合中所有的元素移除array[currentIndex] = new HashSet<>();if(set != null && set.size()>0){set.forEach(t -> {dataIndex.remove(t);});}lock.unlock();}}public int getIndex(T t){if(dataIndex.containsKey(t)){return dataIndex.get(t);}return -1;}
}
測試代碼?
@Testpublic void circleTest(){CircleQueue<String> circleQueue = new CircleQueue<>();for (int i=0;i<1000;i++){String uuid = String.valueOf(i+1);int offset = (int) Math.round(Math.random()*10);circleQueue.add(uuid, offset);}checkTimeout(circleQueue);insertDataRandom(circleQueue);try {Thread.sleep(600000);}catch (Exception e){e.printStackTrace();}}private void checkTimeout(CircleQueue<String> circleQueue){service.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {Set<String> set = circleQueue.getAndDeleteData();if(set == null || set.isEmpty()) {System.out.println("本次沒有設備離線");}else{System.out.println("這些設備離線啦:" + Joiner.on(",").join(set));}circleQueue.next();}},2,1, TimeUnit.SECONDS);}private void insertDataRandom(CircleQueue<String> circleQueue){service.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {String deviceId = String.valueOf(Math.round(Math.random()*100000));int offset = (int) Math.round(Math.random()*10);circleQueue.add(deviceId, offset);System.out.println("插入設備["+deviceId+"], " + offset + "秒后離線");}},3,3, TimeUnit.SECONDS);}
?測試結果
148aed53-a083-40a6-82d3-ede14e5e39c9初始時間1585571543739
當前位置:0數據大小:0
當前位置:1數據大小:0
當前位置:2數據大小:0
當前位置:3數據大小:0
當前位置:4數據大小:0
148aed53-a083-40a6-82d3-ede14e5e39c9不移動位置29
當前位置:5數據大小:0
當前位置:6數據大小:0
當前位置:7數據大小:0
當前位置:8數據大小:0
當前位置:9數據大小:0
當前位置:10數據大小:0
當前位置:11數據大小:0
當前位置:12數據大小:0
當前位置:13數據大小:0
當前位置:14數據大小:0
148aed53-a083-40a6-82d3-ede14e5e39c9不移動位置29
當前位置:15數據大小:0
當前位置:16數據大小:0
當前位置:17數據大小:0
當前位置:18數據大小:0
當前位置:19數據大小:0
當前位置:20數據大小:0
當前位置:21數據大小:0
當前位置:22數據大小:0
當前位置:23數據大小:0
當前位置:24數據大小:0
148aed53-a083-40a6-82d3-ede14e5e39c9不移動位置29
當前位置:25數據大小:0
當前位置:26數據大小:0
當前位置:27數據大小:0
當前位置:28數據大小:0
148aed53-a083-40a6-82d3-ede14e5e39c9過期
超時時間30005