七. 消息存儲設計
上一篇博客已經將消息統計文件的讀寫代碼實現了,下一步我們將實現創建隊列文件和目錄。
實現創建隊列文件和目錄
初始化 0\t0 這樣的初始值.
//創建隊列對應的文件和目錄:public void createQueueFile(String queueName) throws IOException, MqException {//先創建對應的目錄:File file = new File(getQueueDir(queueName));if(!file.exists()){boolean ok = file.mkdirs();if(!ok) throw new IOException("創建隊列目錄失敗。baseDir:"+file.getAbsolutePath());}else{throw new MqException("[createQueueFile] 隊列對應的目錄已經被創建過了,創建失敗");}//下面開始創建 數據文件:File dataFile = new File(getQueueDataDir(queueName));if(!dataFile.exists()){boolean ok = dataFile.createNewFile();if(!ok) throw new IOException("創建數據文件失敗。queuedataDir:"+dataFile.getAbsolutePath());}else{throw new MqException("[createQueueFile] 隊列對應的數據文件已經被創建過了,創建失敗");}//創建 統計文件:File statFile = new File(getQueueStatDir(queueName));if(!statFile.exists()){boolean ok = statFile.createNewFile();if(!ok) throw new IOException("創建統計文件失敗。queuestatDir:"+statFile.getAbsolutePath());}else{throw new MqException("[createQueueFile] 隊列對應的統計文件已經被創建過了,創建失敗");}//給消息統計文件設定初始值 0\t0, (消息數量:0,有效消息數量:0)// 目的:不用在今后使用的時候對空文件做一些特殊的判定Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;//再寫入:writeStat(queueName,stat);}
實現刪除文件或目錄
注意:File 類的 delete ?法只能刪除空?錄. 因此需要先把內部的?件先刪除掉,如果還存在多余文件,就會刪除失敗。
//刪除隊列的文件或目錄://隊列也是可以刪除的,當隊列刪除后,對應的消息文件啥的,也要隨之刪除。public void deleteQueueFile(String queueName) throws IOException{//先刪除 數據文件:File queueDataFile = new File(getQueueDataDir(queueName));boolean ok1 = queueDataFile.delete();//再刪除 統計文件:File queueStatFile = new File(getQueueStatDir(queueName));boolean ok2 = queueStatFile.delete();//再刪除目錄;File file = new File(getQueueDir(queueName));boolean ok3 = file.delete();if(!(ok1 && ok2 && ok3)){//任意一個刪除失敗,就失敗,拋出異常:throw new IOException("刪除隊列文件或目錄失敗");}}
檢查隊列?件是否存在
判定該隊列的消息?件和統計?件是否存在. ?旦出現缺失, 則不能進?后續?作.
//檢查隊列的 文件或目錄 是否存在: 目的:判斷是否隊列之前被 別人用過。//用處1:如果后續有生產者給 broker server 生產消息了,這個消息可能需要記錄到文件上,此時需要判斷文件是否存在(持久化的應用)。public boolean checkFilesExists(String queueName){//如果隊列的 數據文件和統計文件都存在,才存在:File queueDataFiles = new File(getQueueDataDir(queueName));if(!queueDataFiles.exists()) return false;File queueStatFiles = new File(getQueueStatDir(queueName));if(!queueStatFiles.exists()) return false;//都存在,則返回true;return true;}
實現消息對象序列化/反序列化
先創建工具類BinaryTool用與序列化/反序列化。
- 使? ByteArrayInputStream / ByteArrayOutputStream 針對 byte[] 進?封裝, ?便后續操作. (這兩個流對象是純內存的, 不需要進? close).
- 使? ObjectInputStream / ObjectOutputStream 進?序列化 / 反序列化操作. 通過內部的readObject / writeObject 即可完成對應操作.
/*** 這個類用來序列化 與反序列化* 此處我們采用的是java標注庫里的 ObjectOutputStream 和ObjectInputStream 兩個流對象,但是序列化的對象必須要實現Serializable接口、** 由于將序列化,反序列化當做一個工具,很多數據都可能用到,所以我們將它的方法搞成靜態的**/
public class BinaryTool {//序列化:public static byte[] toBytes(Object object) throws IOException {//由于在try里面寫流對象能自動關閉省去我們不少事,所以,直接寫在try()里//這里 使用ByteArrayOutputStream是因為 未知的byte數組的長度,這個類能自動記錄。try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){//將byteArrayOutputStream 傳入ObjectOutputStream 就相當于將他們相互關聯了,當objectOutputStream調用writeObject方法//時會將這個對象寫入關聯的byteArrayOutputStream里,然后直接調用byteArrayOutputStream里的方法,將序列化的數據轉換成直接數組就行了//其實這個 ObjectOutputStream 不僅可以關聯數組,還可以是文件,網絡。關聯了文件就將對象序列化到文件里,關聯了網絡,就是網絡數據的傳輸socket。try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){objectOutputStream.writeObject(object);}//這個操作就是把byteArrayOutputStream中持有的二進制數據取出來,轉成byte[]return byteArrayOutputStream.toByteArray();}}//反序列化:public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){return objectInputStream.readObject();}}}}
實現寫入消息文件
- 考慮線程安全, 按照隊列維度進?加鎖.
- 使? DataOutputStream 進??進制寫操作. ?原? OutputStream 要?便.
- 需要記錄 Message 對象在?件中的偏移量. 后續的刪除操作依賴這個偏移量定位到消息,這也是message里的偏移量初始化的時候,就是發送消息的時候。offsetBeg是原有?件??的基礎上, 再 + 4. 4 個字節是存放消息大小的空間.
- 寫完消息, 要同時更新統計信息.
//該方法將傳來的一個新的消息放到對應的文件當中:新增消息public void sendMessage(MESGQueue queue, Message message) throws IOException, MqException {//先檢查文件是否存在:如果不存在怎拋出異常,這個異常可以自定義。if(!checkFilesExists(queue.getName())){throw new MqException("[MessageFileManager對應的文件不存在]!queueName"+queue.getName());}//先進行序列化:byte[] binaryMessage = BinaryTool.toBytes(message);//為了解決線程安全問題,我們引入鎖,如果此時的鎖對象 是同一個隊列,那就阻塞等待。synchronized (queue){//先將數據文件new出來,看看此時文件里已經寫入的數據長度,方便我們后續計算offsetbegin和offsetendFile file = new File(getQueueDataDir(queue.getName()));//在寫入消息的時候才對message里的offsetbegin和offsetend 進行賦值:message.setOffsetBeg(file.length()+4);message.setOffsetEnd(file.length()+4+binaryMessage.length);//由于我們的寫入是追加寫入,所以不要忘記 truetry (OutputStream outputStream = new FileOutputStream(file,true)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){//先寫入4個字節的消息長度:dataOutputStream.writeInt(binaryMessage.length);//再寫入offsetbegin 和 offsetenddataOutputStream.write(binaryMessage);}}//此時已經將消息數據文件寫完了,不要忘記消息統計文件:Stat stat = readStat(queue.getName());stat.validCount++;stat.totalCount++;writeStat(queue.getName(),stat);}}
創建異常類MqException
作為?定義異常類. 后續業務上出現問題, 都統?拋出這個異常
public class MqException extends Exception{public MqException(String reason){super(reason);}
}
實現刪除消息
此處的刪除只是 “邏輯刪除”, 即把 Message 類中的 isValid 字段設置為 0.
- 使? RandomAccessFile 來隨機訪問到?件的內容.(隨機訪問其實沒什么玄乎的,就像數組一樣,能通過下標快速訪問某個元素,這就是隨機訪問的原理。內存是支持隨機訪問的)。
- 根據 Message 中的 offsetBeg 和 offsetEnd 定位到消息在?件中的位置. 通過randomAccessFile.seek 操作?件指針偏移過去. 再讀取.
- 讀出的結果解析成 Message 對象, 修改 isValid 字段, 再重新寫回?件. 注意寫的時候要重新設定文件指針的位置. ?件指針會隨著上述的讀操作產?改變,所以要重新seek,將光標移動到開始。
- 最后, 要記得更新統計?件, 把合法消息 - 1.
//刪除消息:主要的操作歩奏:// 1,將消息讀出來//2,將消息里的isVail 改成0x0//3,將消息放回文件中public void deleteMessage(MESGQueue queue,Message message) throws IOException, ClassNotFoundException {//由于刪除消息的時候也可能收到線程安全問題,所以我們要加鎖:synchronized (queue){//先將消息讀出來://由于我們正常使用的FileInputStream,只能從頭開始讀。而此時的場景更傾向于 隨機讀取,所以我們使用到了RandomAccessFile進行隨機讀取//注意這個RandomAccessFile類的第二個參數:rw可讀可寫。try(RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataDir(queue.getName()),"rw")){//先準備一個byte數組用來放 讀出來的二進制數據:byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];//先將光標刷新到 offsetbeginrandomAccessFile.seek(message.getOffsetBeg());//將message二進制數據讀出來randomAccessFile.read(bufferSrc);//轉換成message對象:Message message1 = (Message) BinaryTool.fromBytes(bufferSrc);//將message里的isVail 改成0x0message1.setIsVail((byte) 0x0);//將新的message1 轉成二進制:byte[] bufferDest = BinaryTool.toBytes(message1);//由于上一次讀文件光標已經發生了變化,所以此時還要調整光標到offsetbeginrandomAccessFile.seek(message.getOffsetBeg());//將數據寫入文件:randomAccessFile.write(bufferDest);//此時已經將數據文件里的vail改成無效,那我們需不需要將這個內存中的 message對象里的vail也改成無效呢?//可以是可以,但是沒有必要:想象一下我們將一個文件標記成無效的場景是不是我們此時要刪掉這個文件的時候,//此時我們都要刪掉這個文件了,當然要連同文件里的數據和內存中的數據都刪了呀,文件里的數據可能需要一些歩奏,//但是在內存中刪一個對象實在太容易了,今后會有內存中的刪除消息操作。這就相當于讓一個將死之人多活幾秒,但他終究逃不過死亡//這就是message里的vail 其實不需要改動的原因。}//不要忘了統計文件也要更新, 由于我們此時已經將數據文件里的一個消息改成無效的,所以此時統計文件里的有效消息就要--了Stat stat = readStat(queue.getName());if(stat.validCount >0){stat.validCount --;}//再將更新后的統計信息 寫入文件writeStat(queue.getName(),stat);}}
實現消息加載
這個功能在服務器重啟, 和垃圾回收的時候都很關鍵
- 使? DataInputStream 讀取數據. 先讀 4 個字節為消息的?度, 然后再按照這個?度來讀取實際消息內容.
- 讀取完畢之后, 轉換成 Message 對象.
- 同時計算出該對象的 offsetBeg 和 offsetEnd.
- 最終把結果整理成鏈表, 返回出去.
- 注意, 對于 DataInputStream 來說, 如果讀取到 EOF, 會拋出?個 EOFException , ?不是返回特定值. 因此需要注意上述循環的結束條件.
//從消息數據文件當中讀出所有消息://由于是服務器剛啟動的時候才會調用這個方法,此時的隊列還不能處理各種請求,所以不需要考慮線程安全問題。public LinkedList<Message> loadAllMessagesFromQueueDataFile(String queueName) throws IOException, ClassNotFoundException, MqException {//先new出來一個linkedList來放所有消息:使用鏈表是因為要進行頭刪和尾刪等操作:LinkedList<Message> messages = new LinkedList<>();//創建流對象:try(InputStream inputStream = new FileInputStream(getQueueDataDir(queueName))){//與上面的DataOutputStream對應,此時用的是DataInputStreamtry(DataInputStream dataInputStream = new DataInputStream(inputStream)){//由于要讀的消息可能不止一條,所以用一個while循環://但是如果我們直接這樣寫會一直重復的讀一個消息,而DataInputStream不能控制光標的移動,所以要定義一個量來//記錄我們讀到哪里了,另外,這個量也為后續message對象的offsetbegin和offsetend的初始化提供便利long currentOffset = 0;//寫完大概得邏輯以后不知道不會不會有疑問,這個while的條件可是true啊,這不死循環了嘛,//其實這也是無奈之舉,主要原因是dataInputStream.readInt()讀到文件的末尾并不會返回-1,EOF啥的,而是//直接拋出 EOFException異常,直接結束循環,因此我們只用在外層catch住這個異常就行了,這是一個很特別的預料之內的循環結束方式while(true){//先讀4個字節,求出數據的長度:int messageLen = dataInputStream.readInt();//創建一個剛好能裝messageLen長度的字節數組:byte[] messageBinary = new byte[messageLen];//讀出消息數據:并且用變量接收,判斷讀出的數據是否和預期的數據長度一致,若不一致,說明格式不正確,錯亂了則拋出異常int realMessageLen = dataInputStream.read(messageBinary);if(realMessageLen != messageLen){throw new MqException("[MessageFileManager] 文件格式錯誤!!!queueName:"+queueName);}//將數組反序列化成message對象Message message = (Message) BinaryTool.fromBytes(messageBinary);//如果讀到的消息是無效的,則跳過這個無效消息,更新currentoffset:if(message.getIsVail() == 0x0){currentOffset+=(4+messageLen);continue;}//再將message里的offsetbegin和offsetend給初始化:message.setOffsetBeg(currentOffset+4);message.setOffsetEnd(currentOffset+4+messageLen);//正常讀完后,別忘了,將currentoffset更新currentOffset+=(4+messageLen);//再將消息加入到鏈表當中messages.add(message);}}catch (EOFException e){System.out.println("[MessageFileManager] 恢復Message 數據完成!!!");}}return messages;}
實現垃圾回收(GC)
- 上述刪除操作, 只是把消息在?件上標記成了?效. 并沒有騰出硬盤空間. 最終?件??可能會越積越多. 因此需要定期的進?批量清除.
- 此處使?類似于復制算法. 當總消息數超過 2000, 并且有效消息數?少于 50% 的時候, 就觸發 GC。GC 的時候會把所有有效消息加載出來, 寫?到?個新的消息?件中, 使?新?件, 代替舊?件即可.
public void gc(MESGQueue queue) throws MqException, IOException, ClassNotFoundException {//根據以前的寫代碼經驗,次GC過程可能有線程安全問題,所以我們直接加鎖://其實這也是為什么形參傳入的是一個隊列,而不是隊列的名字的其中一個原因。synchronized (queue){//由于GC的執行時間可能很慢,我們手動的將時間計算出來,如果將來服務器運行半天無響應了,如果是GC的問題//我們也能知道long gcBegin = System.currentTimeMillis();//先創建新的文件:File newQueueFile = new File(getQueueDataNewPath(queue.getName()));//如果文件已經存在了,可能上一次gc有殘留,這是不正常的,所以拋出異常if(newQueueFile.exists()){throw new MqException("[MessageFilemanager] 隊列的queue_new_data.txt已經存在!!queueName:"+queue.getName());}//如果執行到這,說明文件不存在,則創建新文件:boolean ok = newQueueFile.createNewFile();//如果創建文件失敗,則拋出異常:if(!ok){throw new MqException("[MessageFileManager] 創建文件失敗!!newQueueDataFile:"+newQueueFile.getAbsolutePath());}//先創建一個鏈表用來存儲從原來的文件中取出來的message對象:此處可以用到之前的方法://取出原來文件里的所有有效文件:LinkedList<Message> messages = loadAllMessagesFromQueueDataFile(queue.getName());//new出相應的流對象用來寫入新文件://這里我寫錯了,將queue.getName()傳入了,但是明明是一個不存在的路徑,他竟然還能正常寫?底層也不拋出異常,//我真是又驚訝,又驚嚇。//之后我又去查了查資料:原來是FileOutputStream的問題啊,FileOutputStream太nb了,//如果傳入的字符串對應的路徑不存在,FileOutputStream會自動給你創建一個文件用于寫入,這這這也太貼心了吧,//不過我還是希望他能直接拋異常,畢竟我找bug也找了這么久了,況且天知道他會把我的數據寫到哪里://其實也知道:如果是絕對路徑,他會自動創建路徑下的文件;如果是相對路徑,他會在當前工作空間創建一個文件。//找了一圈以后發下,在我的mq路徑下,就存在一個queuetest1的文件,里面正好是之前我寫入的數據,嗚嗚嗚,要哭了。
// try(OutputStream outputStream = new FileOutputStream(queue.getName())){try(OutputStream outputStream = new FileOutputStream(newQueueFile)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){//循環讀取messages,將對象重新寫入新文件:for(Message m : messages){//先將消息序列化:一個字節數組:byte[] buffer = BinaryTool.toBytes(m);//將二進制數組寫入新的文件:注意遵循之前的約定:dataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}//刪除舊文件,這里以前傳入的舊文件的路徑寫錯了,直接傳成了名字,所以寫代碼一定要細心啊。
// File oldQueueFile = new File(queue.getName());File oldQueueFile = new File(getQueueDataDir(queue.getName()));boolean ok2 = oldQueueFile.delete();System.out.println("[ok2]oldQueueFile 文件刪除:"+ok2);//如果刪除失敗,可能是沒有權限之類的,拋出異常:if(!ok2){throw new MqException("MessageFileManager 刪除舊文件失敗!! oldDataQueueFile:"+oldQueueFile.getAbsolutePath());}//重命名新文件:boolean ok3 = newQueueFile.renameTo(oldQueueFile);//如果重命名失敗,拋出異常:if(!ok3) {throw new MqException("[MessageFileManager] 新文件重命名失敗!!oldDataQueueFile:"+oldQueueFile.getAbsolutePath()+" , newDataQueueFile="+newQueueFile.getAbsolutePath());}//不要忘記更新統計文件:Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(),stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] GC執行完畢!!! 執行的時間:"+(gcEnd - gcBegin)+"ms");}}
測試MessageFileManager
創建MessageFileManagerTest類用于測試:
測試前的準備:
- 創建兩個隊列, ?來輔助測試.
- 使? ReflectionTestUtils.invokeMethod 來調?私有?法(這就是傳說中的反射,注意它的參數,用法)。
-
ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);
這個反射的參數:
第一個參數:類的實例。
第二個參數:你想調用的方法
后面的參數就是不定參數了(數量不確定),能確定的是:后面的參數的就是你想調用的方法的參數。
@SpringBootTest
public class MessageFileManagerTest {private MessageFileManager messageFileManager = new MessageFileManager();private static final String queueName1 = "queuetest1";private static final String queueName2 = "queuetest2";@BeforeEachpublic void setUp() throws IOException, MqException {//由于我們要測試的是隊列,所以準備工作就是先創建隊列文件:messageFileManager.createQueueFile(queueName1);messageFileManager.createQueueFile(queueName2);}//這個@AfterEach注解我試過了,即使測試方法執行過程中拋出了異常,這個方法還是在每次執行完測試單元以后該執行他還是執行他,//無關乎異常,真nb@AfterEachpublic void tearDown() throws IOException {//首尾工作,將剛才創建的隊列文件刪掉:messageFileManager.deleteQueueFile(queueName1);messageFileManager.deleteQueueFile(queueName2);}}
測試代碼:
@Testpublic void testCreateFile(){//其實就測試創建的隊列文件是否存在://由于我們在MessageFileManager里的get路徑方法是 private修飾的,所以不能直接調用get路徑方法,只能手動寫上//檢驗 隊列數據文件是否存在:File queueDataFile1 = new File("./data/"+queueName1+"/queue_data.txt");//此處用的方法是isFile 而不是exists,因為要判定這是個文件,并不是只是存在就行,存在了也可能是個目錄。Assertions.assertEquals(true,queueDataFile1.isFile());//檢驗 隊列統計文件是否存在:File queueStatFile1 = new File("./data/"+queueName1+"/queue_stat.txt");Assertions.assertEquals(true,queueStatFile1.isFile());//檢驗 隊列數據文件是否存在:File queueDataFile2 = new File("./data/"+queueName2+"/queue_data.txt");Assertions.assertEquals(true,queueDataFile2.isFile());//檢驗 隊列統計文件是否存在:File queueStatFile2 = new File("./data/"+queueName2+"/queue_stat.txt");Assertions.assertEquals(true,queueStatFile2.isFile());System.out.println("[CreateFileText] 測試創建隊列文件成功!!!");}@Testpublic void testReadAndWriteStat(){//先創建出stat類,由于他是內部類,所以要類名. 調用出來:MessageFileManager.Stat stat = new MessageFileManager.Stat();stat.totalCount = 200;stat.validCount = 100;//此時寫入stat 到統計文件當中:但是如果直接用messageFileManager. 由于writeStat是private修飾,所以肯定調用不出來,//此時就要用spring帶的 反射方法了:
// messageFileManager.//用反射將 stat寫入統計文件:ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);//用反射將 寫入的統計文件讀出來:MessageFileManager.Stat statNew = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);//判斷讀出來的stat和我們設定的stat是否一樣Assertions.assertEquals(200,statNew.totalCount);Assertions.assertEquals(100,statNew.validCount);System.out.println("[testReadAndWriteStat] 測試成功!!!");}//要想測試發送消息,首先要有隊列和消息吧,所以,我們先寫創建隊列和消息的方法:private MESGQueue createQueue(){MESGQueue queue = new MESGQueue();//這里的隊列名字不能隨便取,因為隨便取的隊列名字也沒有對應的文件啊,要用就要用已經創建了文件的隊列名,//考慮到這個隊列要與文件交互,而我們只創建了queuename1和queuename2兩個名字對應的文件,所以只能用這兩個名字的一個。queue.setName(queueName1);queue.setDurable(true);queue.setExclusive(false);queue.setAutoDelete(false);HashMap<String, Object> hashMap = new HashMap<>();hashMap.put("aaa", "111");hashMap.put("bbb", "222");queue.setArguments(hashMap);return queue;}private Message createMessage(String context){//此時能用到我們之前在message里寫的創建 message的工廠類了:Message message = Message.createMessageWithId("testRoutingKey",null,context.getBytes());return message;}@Testpublic void testSendMessage() throws IOException, MqException, ClassNotFoundException {//先創建隊列與消息:MESGQueue queue = createQueue();Message message = createMessage("abcdefghijklmnopqrstuvwxyz");//發送消息:messageFileManager.sendMessage(queue,message);//驗證stat文件:MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);Assertions.assertEquals(1,stat.totalCount);Assertions.assertEquals(1,stat.validCount);//驗證data文件:使用loadAllMessagesFromQueueDataFile讀取文件內容:LinkedList<Message> messages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);//驗證:Assertions.assertEquals(1,messages.size());Message message1 = messages.get(0);//判斷這個message1和我們之前的消息message是否一樣:Assertions.assertEquals(message.getMessageId(),message1.getMessageId());Assertions.assertEquals(message.getRoutingKey(),message1.getRoutingKey());Assertions.assertEquals(message.getDeliverMode(),message1.getDeliverMode());Assertions.assertArrayEquals(message.getBody(),message1.getBody());System.out.println("[testSendMessage] 測試成功!!!");}//雖然上一個testSendMessage 已經間接測試過這個方法,但是為了求穩,再測試一遍@Testpublic void testLoadAllMessagesFromQueueDataFile() throws IOException, MqException, ClassNotFoundException {//我們需要準備200條數據用來加載://先創建一個隊列用來存放消息:注意這個方法使用的是queueName1創建的隊列MESGQueue queue = createQueue();//先創建一個鏈表用來保存消息,和后面新加載的消息作對比:LinkedList<Message> expectedMessages = new LinkedList<>();//使用for循環創建消息:for(int i =0;i<200;i++){Message message = createMessage("testMessage"+i);//將消息寫入文件:messageFileManager.sendMessage(queue,message);//記錄消息:expectedMessages.add(message);}//調用loadAllMessagesFromQueueDataFile取出所有消息:LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);//先驗證隊列的數目是否一致:Assertions.assertEquals(200,realMessages.size());//驗證基本屬性:for(int i= 0;i<realMessages.size();i++){Message realMessage = realMessages.get(i);Message expectMessage = expectedMessages.get(i);System.out.println("["+i+"]"+realMessage.toString());Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());}System.out.println("[testLoadAllMessagesFromQueueDataFile] 測試成功!!!");}//測試刪除消息:@Testpublic void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {//先創建一個隊列:MESGQueue queue = createQueue();//創建一個鏈表用來保存預期消息:LinkedList<Message> expectedMessages = new LinkedList<>();//再將20條消息都寫入隊列:for(int i =0;i<20;i++){Message message = createMessage("testMessage"+i);//將消息寫入隊列文件:messageFileManager.sendMessage(queue,message);//記錄消息:expectedMessages.add(message);}//這里 就以刪除前三條消息為例:messageFileManager.deleteMessage(queue,expectedMessages.get(0));messageFileManager.deleteMessage(queue,expectedMessages.get(1));messageFileManager.deleteMessage(queue,expectedMessages.get(2));//讀出消息,對比:LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);//先判斷個數:Assertions.assertEquals(17,realMessages.size());for(int i =3;i<20;i++){Message realMessage = realMessages.get(i-3);Message expectMessage = expectedMessages.get(i);System.out.println("["+i+"]"+realMessage.toString());Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());}System.out.println("[testDeleteMessage] 測試成功!!!");}//測試GC,這里的GC其實只是測試,不用管消息總數是否大于2000或有效消息占比不到50%,因為那是業務上的判定,會有專門的類來進一步封裝//而此處我們只進行測試GC這個方法//計劃將100條消息都存入隊列,然后將奇數下標的消息都刪除,然后執行GC,驗證現在的文件是否比原來的文件小:@Testpublic void testGC() throws IOException, MqException, ClassNotFoundException {//創建一個隊列:MESGQueue queue = createQueue();//創建一個鏈表用來記錄消息:LinkedList<Message> expectedMessages = new LinkedList<>();//先發送100條消息:for(int i =0;i<100;i++){Message message = createMessage("testMessage"+i);//發送到隊列:messageFileManager.sendMessage(queue,message);//記錄expectedMessages.add(message);}//刪除奇數下標的消息:for(int i =1;i<100;i+=2){messageFileManager.deleteMessage(queue,expectedMessages.get(i));}//先記錄執行GG之前的文件大小:File oldFile = new File("./data/"+queueName1+"/queue_data.txt");long oldFileLength = oldFile.length();//執行GCmessageFileManager.gc(queue);//記錄執行完GC之后的文件大小:File newFile = new File("./data/"+queueName1+"/queue_data.txt");long newFileLength = newFile.length();//取出真實的消息:LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);//先驗證消息數量是否對的上:Assertions.assertEquals(50,realMessages.size());//挨個驗證消息:for(int i = 0;i<50;i++){Message realMessage = realMessages.get(i);Message expectMessage = expectedMessages.get(i*2);System.out.println("["+i+"]"+realMessage.toString());Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());}//驗證文件大小://這個驗證的原理其實是://刪除一個文件并不是直接刪除,而是邏輯刪除,通過標記統計文件里的vail來標識的,此時的數據文件即使有很多無效的文件,但是他的大小依舊是total//而非vail有效文件的大小。但是如果進行了文件的GC遷移,此時的新文件的大小就是舊文件的vail有效文件的大小了。所以,新文件會小于舊文件的大小。System.out.println("[oldFileLength]:"+oldFileLength);System.out.println("[newFileLength]:"+newFileLength);Assertions.assertTrue(newFileLength<oldFileLength);System.out.println("[testGC] 測試成功!!!");}