仿 RabbitMQ 的消息隊列3(實戰項目)

七. 消息存儲設計

上一篇博客已經將消息統計文件的讀寫代碼實現了,下一步我們將實現創建隊列文件和目錄。

實現創建隊列文件和目錄

初始化 0\t0 這樣的初始值.

//創建隊列對應的文件和目錄:public void createQueueFile(String queueName) throws IOException, MqException {//先創建對應的目錄:File file = new File(getQueueDir(queueName));if(!file.exists()){boolean ok = file.mkdirs();if(!ok) throw new IOException("創建隊列目錄失敗。baseDir:"+file.getAbsolutePath());}else{throw new MqException("[createQueueFile] 隊列對應的目錄已經被創建過了,創建失敗");}//下面開始創建 數據文件:File dataFile = new File(getQueueDataDir(queueName));if(!dataFile.exists()){boolean ok = dataFile.createNewFile();if(!ok) throw new IOException("創建數據文件失敗。queuedataDir:"+dataFile.getAbsolutePath());}else{throw new MqException("[createQueueFile] 隊列對應的數據文件已經被創建過了,創建失敗");}//創建 統計文件:File statFile = new File(getQueueStatDir(queueName));if(!statFile.exists()){boolean ok = statFile.createNewFile();if(!ok) throw new IOException("創建統計文件失敗。queuestatDir:"+statFile.getAbsolutePath());}else{throw new MqException("[createQueueFile] 隊列對應的統計文件已經被創建過了,創建失敗");}//給消息統計文件設定初始值 0\t0, (消息數量:0,有效消息數量:0)// 目的:不用在今后使用的時候對空文件做一些特殊的判定Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;//再寫入:writeStat(queueName,stat);}

實現刪除文件或目錄

注意:File 類的 delete ?法只能刪除空?錄. 因此需要先把內部的?件先刪除掉,如果還存在多余文件,就會刪除失敗。

//刪除隊列的文件或目錄://隊列也是可以刪除的,當隊列刪除后,對應的消息文件啥的,也要隨之刪除。public void deleteQueueFile(String queueName) throws IOException{//先刪除 數據文件:File queueDataFile = new File(getQueueDataDir(queueName));boolean ok1 = queueDataFile.delete();//再刪除 統計文件:File queueStatFile = new File(getQueueStatDir(queueName));boolean ok2 = queueStatFile.delete();//再刪除目錄;File file = new File(getQueueDir(queueName));boolean ok3 = file.delete();if(!(ok1 && ok2 && ok3)){//任意一個刪除失敗,就失敗,拋出異常:throw new IOException("刪除隊列文件或目錄失敗");}}

檢查隊列?件是否存在

判定該隊列的消息?件和統計?件是否存在. ?旦出現缺失, 則不能進?后續?作.

//檢查隊列的 文件或目錄 是否存在: 目的:判斷是否隊列之前被 別人用過。//用處1:如果后續有生產者給 broker server 生產消息了,這個消息可能需要記錄到文件上,此時需要判斷文件是否存在(持久化的應用)。public boolean checkFilesExists(String queueName){//如果隊列的 數據文件和統計文件都存在,才存在:File queueDataFiles = new File(getQueueDataDir(queueName));if(!queueDataFiles.exists()) return false;File queueStatFiles = new File(getQueueStatDir(queueName));if(!queueStatFiles.exists()) return false;//都存在,則返回true;return true;}

實現消息對象序列化/反序列化

先創建工具類BinaryTool用與序列化/反序列化。
在這里插入圖片描述

  • 使? ByteArrayInputStream / ByteArrayOutputStream 針對 byte[] 進?封裝, ?便后續操作. (這兩個流對象是純內存的, 不需要進? close).
  • 使? ObjectInputStream / ObjectOutputStream 進?序列化 / 反序列化操作. 通過內部的readObject / writeObject 即可完成對應操作.
/*** 這個類用來序列化 與反序列化* 此處我們采用的是java標注庫里的 ObjectOutputStream 和ObjectInputStream 兩個流對象,但是序列化的對象必須要實現Serializable接口、** 由于將序列化,反序列化當做一個工具,很多數據都可能用到,所以我們將它的方法搞成靜態的**/
public class BinaryTool {//序列化:public static byte[] toBytes(Object object) throws IOException {//由于在try里面寫流對象能自動關閉省去我們不少事,所以,直接寫在try()里//這里 使用ByteArrayOutputStream是因為 未知的byte數組的長度,這個類能自動記錄。try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){//將byteArrayOutputStream 傳入ObjectOutputStream 就相當于將他們相互關聯了,當objectOutputStream調用writeObject方法//時會將這個對象寫入關聯的byteArrayOutputStream里,然后直接調用byteArrayOutputStream里的方法,將序列化的數據轉換成直接數組就行了//其實這個 ObjectOutputStream 不僅可以關聯數組,還可以是文件,網絡。關聯了文件就將對象序列化到文件里,關聯了網絡,就是網絡數據的傳輸socket。try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){objectOutputStream.writeObject(object);}//這個操作就是把byteArrayOutputStream中持有的二進制數據取出來,轉成byte[]return byteArrayOutputStream.toByteArray();}}//反序列化:public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){return objectInputStream.readObject();}}}}

實現寫入消息文件

  • 考慮線程安全, 按照隊列維度進?加鎖.
  • 使? DataOutputStream 進??進制寫操作. ?原? OutputStream 要?便.
  • 需要記錄 Message 對象在?件中的偏移量. 后續的刪除操作依賴這個偏移量定位到消息,這也是message里的偏移量初始化的時候,就是發送消息的時候。offsetBeg是原有?件??的基礎上, 再 + 4. 4 個字節是存放消息大小的空間.
  • 寫完消息, 要同時更新統計信息.
 //該方法將傳來的一個新的消息放到對應的文件當中:新增消息public void sendMessage(MESGQueue queue, Message message) throws IOException, MqException {//先檢查文件是否存在:如果不存在怎拋出異常,這個異常可以自定義。if(!checkFilesExists(queue.getName())){throw new MqException("[MessageFileManager對應的文件不存在]!queueName"+queue.getName());}//先進行序列化:byte[] binaryMessage = BinaryTool.toBytes(message);//為了解決線程安全問題,我們引入鎖,如果此時的鎖對象 是同一個隊列,那就阻塞等待。synchronized (queue){//先將數據文件new出來,看看此時文件里已經寫入的數據長度,方便我們后續計算offsetbegin和offsetendFile file = new File(getQueueDataDir(queue.getName()));//在寫入消息的時候才對message里的offsetbegin和offsetend 進行賦值:message.setOffsetBeg(file.length()+4);message.setOffsetEnd(file.length()+4+binaryMessage.length);//由于我們的寫入是追加寫入,所以不要忘記 truetry (OutputStream outputStream = new FileOutputStream(file,true)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){//先寫入4個字節的消息長度:dataOutputStream.writeInt(binaryMessage.length);//再寫入offsetbegin 和 offsetenddataOutputStream.write(binaryMessage);}}//此時已經將消息數據文件寫完了,不要忘記消息統計文件:Stat stat = readStat(queue.getName());stat.validCount++;stat.totalCount++;writeStat(queue.getName(),stat);}}

創建異常類MqException

作為?定義異常類. 后續業務上出現問題, 都統?拋出這個異常
在這里插入圖片描述

public class MqException extends Exception{public MqException(String reason){super(reason);}
}

實現刪除消息

此處的刪除只是 “邏輯刪除”, 即把 Message 類中的 isValid 字段設置為 0.

  • 使? RandomAccessFile 來隨機訪問到?件的內容.(隨機訪問其實沒什么玄乎的,就像數組一樣,能通過下標快速訪問某個元素,這就是隨機訪問的原理。內存是支持隨機訪問的)。
  • 根據 Message 中的 offsetBeg 和 offsetEnd 定位到消息在?件中的位置. 通過randomAccessFile.seek 操作?件指針偏移過去. 再讀取.
  • 讀出的結果解析成 Message 對象, 修改 isValid 字段, 再重新寫回?件. 注意寫的時候要重新設定文件指針的位置. ?件指針會隨著上述的讀操作產?改變,所以要重新seek,將光標移動到開始。
  • 最后, 要記得更新統計?件, 把合法消息 - 1.
 //刪除消息:主要的操作歩奏:// 1,將消息讀出來//2,將消息里的isVail 改成0x0//3,將消息放回文件中public void deleteMessage(MESGQueue queue,Message message) throws IOException, ClassNotFoundException {//由于刪除消息的時候也可能收到線程安全問題,所以我們要加鎖:synchronized (queue){//先將消息讀出來://由于我們正常使用的FileInputStream,只能從頭開始讀。而此時的場景更傾向于 隨機讀取,所以我們使用到了RandomAccessFile進行隨機讀取//注意這個RandomAccessFile類的第二個參數:rw可讀可寫。try(RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataDir(queue.getName()),"rw")){//先準備一個byte數組用來放 讀出來的二進制數據:byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];//先將光標刷新到 offsetbeginrandomAccessFile.seek(message.getOffsetBeg());//將message二進制數據讀出來randomAccessFile.read(bufferSrc);//轉換成message對象:Message message1 = (Message) BinaryTool.fromBytes(bufferSrc);//將message里的isVail 改成0x0message1.setIsVail((byte) 0x0);//將新的message1 轉成二進制:byte[] bufferDest = BinaryTool.toBytes(message1);//由于上一次讀文件光標已經發生了變化,所以此時還要調整光標到offsetbeginrandomAccessFile.seek(message.getOffsetBeg());//將數據寫入文件:randomAccessFile.write(bufferDest);//此時已經將數據文件里的vail改成無效,那我們需不需要將這個內存中的 message對象里的vail也改成無效呢?//可以是可以,但是沒有必要:想象一下我們將一個文件標記成無效的場景是不是我們此時要刪掉這個文件的時候,//此時我們都要刪掉這個文件了,當然要連同文件里的數據和內存中的數據都刪了呀,文件里的數據可能需要一些歩奏,//但是在內存中刪一個對象實在太容易了,今后會有內存中的刪除消息操作。這就相當于讓一個將死之人多活幾秒,但他終究逃不過死亡//這就是message里的vail 其實不需要改動的原因。}//不要忘了統計文件也要更新, 由于我們此時已經將數據文件里的一個消息改成無效的,所以此時統計文件里的有效消息就要--了Stat stat = readStat(queue.getName());if(stat.validCount >0){stat.validCount --;}//再將更新后的統計信息 寫入文件writeStat(queue.getName(),stat);}}

實現消息加載

這個功能在服務器重啟, 和垃圾回收的時候都很關鍵

  • 使? DataInputStream 讀取數據. 先讀 4 個字節為消息的?度, 然后再按照這個?度來讀取實際消息內容.
  • 讀取完畢之后, 轉換成 Message 對象.
  • 同時計算出該對象的 offsetBeg 和 offsetEnd.
  • 最終把結果整理成鏈表, 返回出去.
  • 注意, 對于 DataInputStream 來說, 如果讀取到 EOF, 會拋出?個 EOFException , ?不是返回特定值. 因此需要注意上述循環的結束條件.
//從消息數據文件當中讀出所有消息://由于是服務器剛啟動的時候才會調用這個方法,此時的隊列還不能處理各種請求,所以不需要考慮線程安全問題。public LinkedList<Message> loadAllMessagesFromQueueDataFile(String queueName) throws IOException, ClassNotFoundException, MqException {//先new出來一個linkedList來放所有消息:使用鏈表是因為要進行頭刪和尾刪等操作:LinkedList<Message> messages = new LinkedList<>();//創建流對象:try(InputStream inputStream = new FileInputStream(getQueueDataDir(queueName))){//與上面的DataOutputStream對應,此時用的是DataInputStreamtry(DataInputStream dataInputStream = new DataInputStream(inputStream)){//由于要讀的消息可能不止一條,所以用一個while循環://但是如果我們直接這樣寫會一直重復的讀一個消息,而DataInputStream不能控制光標的移動,所以要定義一個量來//記錄我們讀到哪里了,另外,這個量也為后續message對象的offsetbegin和offsetend的初始化提供便利long currentOffset = 0;//寫完大概得邏輯以后不知道不會不會有疑問,這個while的條件可是true啊,這不死循環了嘛,//其實這也是無奈之舉,主要原因是dataInputStream.readInt()讀到文件的末尾并不會返回-1,EOF啥的,而是//直接拋出 EOFException異常,直接結束循環,因此我們只用在外層catch住這個異常就行了,這是一個很特別的預料之內的循環結束方式while(true){//先讀4個字節,求出數據的長度:int messageLen = dataInputStream.readInt();//創建一個剛好能裝messageLen長度的字節數組:byte[] messageBinary = new byte[messageLen];//讀出消息數據:并且用變量接收,判斷讀出的數據是否和預期的數據長度一致,若不一致,說明格式不正確,錯亂了則拋出異常int realMessageLen = dataInputStream.read(messageBinary);if(realMessageLen != messageLen){throw new MqException("[MessageFileManager] 文件格式錯誤!!!queueName:"+queueName);}//將數組反序列化成message對象Message message = (Message) BinaryTool.fromBytes(messageBinary);//如果讀到的消息是無效的,則跳過這個無效消息,更新currentoffset:if(message.getIsVail() == 0x0){currentOffset+=(4+messageLen);continue;}//再將message里的offsetbegin和offsetend給初始化:message.setOffsetBeg(currentOffset+4);message.setOffsetEnd(currentOffset+4+messageLen);//正常讀完后,別忘了,將currentoffset更新currentOffset+=(4+messageLen);//再將消息加入到鏈表當中messages.add(message);}}catch (EOFException e){System.out.println("[MessageFileManager] 恢復Message 數據完成!!!");}}return messages;}

實現垃圾回收(GC)

  • 上述刪除操作, 只是把消息在?件上標記成了?效. 并沒有騰出硬盤空間. 最終?件??可能會越積越多. 因此需要定期的進?批量清除.
  • 此處使?類似于復制算法. 當總消息數超過 2000, 并且有效消息數?少于 50% 的時候, 就觸發 GC。GC 的時候會把所有有效消息加載出來, 寫?到?個新的消息?件中, 使?新?件, 代替舊?件即可.
 public void gc(MESGQueue queue) throws MqException, IOException, ClassNotFoundException {//根據以前的寫代碼經驗,次GC過程可能有線程安全問題,所以我們直接加鎖://其實這也是為什么形參傳入的是一個隊列,而不是隊列的名字的其中一個原因。synchronized (queue){//由于GC的執行時間可能很慢,我們手動的將時間計算出來,如果將來服務器運行半天無響應了,如果是GC的問題//我們也能知道long gcBegin = System.currentTimeMillis();//先創建新的文件:File newQueueFile = new File(getQueueDataNewPath(queue.getName()));//如果文件已經存在了,可能上一次gc有殘留,這是不正常的,所以拋出異常if(newQueueFile.exists()){throw new MqException("[MessageFilemanager] 隊列的queue_new_data.txt已經存在!!queueName:"+queue.getName());}//如果執行到這,說明文件不存在,則創建新文件:boolean ok = newQueueFile.createNewFile();//如果創建文件失敗,則拋出異常:if(!ok){throw new MqException("[MessageFileManager] 創建文件失敗!!newQueueDataFile:"+newQueueFile.getAbsolutePath());}//先創建一個鏈表用來存儲從原來的文件中取出來的message對象:此處可以用到之前的方法://取出原來文件里的所有有效文件:LinkedList<Message> messages = loadAllMessagesFromQueueDataFile(queue.getName());//new出相應的流對象用來寫入新文件://這里我寫錯了,將queue.getName()傳入了,但是明明是一個不存在的路徑,他竟然還能正常寫?底層也不拋出異常,//我真是又驚訝,又驚嚇。//之后我又去查了查資料:原來是FileOutputStream的問題啊,FileOutputStream太nb了,//如果傳入的字符串對應的路徑不存在,FileOutputStream會自動給你創建一個文件用于寫入,這這這也太貼心了吧,//不過我還是希望他能直接拋異常,畢竟我找bug也找了這么久了,況且天知道他會把我的數據寫到哪里://其實也知道:如果是絕對路徑,他會自動創建路徑下的文件;如果是相對路徑,他會在當前工作空間創建一個文件。//找了一圈以后發下,在我的mq路徑下,就存在一個queuetest1的文件,里面正好是之前我寫入的數據,嗚嗚嗚,要哭了。
//            try(OutputStream outputStream = new FileOutputStream(queue.getName())){try(OutputStream outputStream = new FileOutputStream(newQueueFile)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){//循環讀取messages,將對象重新寫入新文件:for(Message m : messages){//先將消息序列化:一個字節數組:byte[] buffer = BinaryTool.toBytes(m);//將二進制數組寫入新的文件:注意遵循之前的約定:dataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}//刪除舊文件,這里以前傳入的舊文件的路徑寫錯了,直接傳成了名字,所以寫代碼一定要細心啊。
//            File oldQueueFile = new File(queue.getName());File oldQueueFile = new File(getQueueDataDir(queue.getName()));boolean ok2 = oldQueueFile.delete();System.out.println("[ok2]oldQueueFile 文件刪除:"+ok2);//如果刪除失敗,可能是沒有權限之類的,拋出異常:if(!ok2){throw new MqException("MessageFileManager 刪除舊文件失敗!! oldDataQueueFile:"+oldQueueFile.getAbsolutePath());}//重命名新文件:boolean ok3 = newQueueFile.renameTo(oldQueueFile);//如果重命名失敗,拋出異常:if(!ok3) {throw new MqException("[MessageFileManager] 新文件重命名失敗!!oldDataQueueFile:"+oldQueueFile.getAbsolutePath()+" , newDataQueueFile="+newQueueFile.getAbsolutePath());}//不要忘記更新統計文件:Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(),stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] GC執行完畢!!! 執行的時間:"+(gcEnd - gcBegin)+"ms");}}

測試MessageFileManager

創建MessageFileManagerTest類用于測試:
在這里插入圖片描述

測試前的準備:

  • 創建兩個隊列, ?來輔助測試.
  • 使? ReflectionTestUtils.invokeMethod 來調?私有?法(這就是傳說中的反射,注意它的參數,用法)。
  •     ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);
    

這個反射的參數:
第一個參數:類的實例。
第二個參數:你想調用的方法
后面的參數就是不定參數了(數量不確定),能確定的是:后面的參數的就是你想調用的方法的參數。


@SpringBootTest
public class MessageFileManagerTest {private MessageFileManager messageFileManager = new MessageFileManager();private static final String queueName1 = "queuetest1";private static final String queueName2 = "queuetest2";@BeforeEachpublic void setUp() throws IOException, MqException {//由于我們要測試的是隊列,所以準備工作就是先創建隊列文件:messageFileManager.createQueueFile(queueName1);messageFileManager.createQueueFile(queueName2);}//這個@AfterEach注解我試過了,即使測試方法執行過程中拋出了異常,這個方法還是在每次執行完測試單元以后該執行他還是執行他,//無關乎異常,真nb@AfterEachpublic void tearDown() throws IOException {//首尾工作,將剛才創建的隊列文件刪掉:messageFileManager.deleteQueueFile(queueName1);messageFileManager.deleteQueueFile(queueName2);}}

測試代碼:

@Testpublic void testCreateFile(){//其實就測試創建的隊列文件是否存在://由于我們在MessageFileManager里的get路徑方法是 private修飾的,所以不能直接調用get路徑方法,只能手動寫上//檢驗 隊列數據文件是否存在:File queueDataFile1 = new File("./data/"+queueName1+"/queue_data.txt");//此處用的方法是isFile 而不是exists,因為要判定這是個文件,并不是只是存在就行,存在了也可能是個目錄。Assertions.assertEquals(true,queueDataFile1.isFile());//檢驗 隊列統計文件是否存在:File queueStatFile1 = new File("./data/"+queueName1+"/queue_stat.txt");Assertions.assertEquals(true,queueStatFile1.isFile());//檢驗 隊列數據文件是否存在:File queueDataFile2 = new File("./data/"+queueName2+"/queue_data.txt");Assertions.assertEquals(true,queueDataFile2.isFile());//檢驗 隊列統計文件是否存在:File queueStatFile2 = new File("./data/"+queueName2+"/queue_stat.txt");Assertions.assertEquals(true,queueStatFile2.isFile());System.out.println("[CreateFileText] 測試創建隊列文件成功!!!");}@Testpublic void testReadAndWriteStat(){//先創建出stat類,由于他是內部類,所以要類名. 調用出來:MessageFileManager.Stat stat = new MessageFileManager.Stat();stat.totalCount = 200;stat.validCount = 100;//此時寫入stat 到統計文件當中:但是如果直接用messageFileManager. 由于writeStat是private修飾,所以肯定調用不出來,//此時就要用spring帶的 反射方法了:
//        messageFileManager.//用反射將 stat寫入統計文件:ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);//用反射將 寫入的統計文件讀出來:MessageFileManager.Stat statNew = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);//判斷讀出來的stat和我們設定的stat是否一樣Assertions.assertEquals(200,statNew.totalCount);Assertions.assertEquals(100,statNew.validCount);System.out.println("[testReadAndWriteStat] 測試成功!!!");}//要想測試發送消息,首先要有隊列和消息吧,所以,我們先寫創建隊列和消息的方法:private MESGQueue createQueue(){MESGQueue queue = new MESGQueue();//這里的隊列名字不能隨便取,因為隨便取的隊列名字也沒有對應的文件啊,要用就要用已經創建了文件的隊列名,//考慮到這個隊列要與文件交互,而我們只創建了queuename1和queuename2兩個名字對應的文件,所以只能用這兩個名字的一個。queue.setName(queueName1);queue.setDurable(true);queue.setExclusive(false);queue.setAutoDelete(false);HashMap<String, Object> hashMap = new HashMap<>();hashMap.put("aaa", "111");hashMap.put("bbb", "222");queue.setArguments(hashMap);return queue;}private Message createMessage(String context){//此時能用到我們之前在message里寫的創建 message的工廠類了:Message message = Message.createMessageWithId("testRoutingKey",null,context.getBytes());return message;}@Testpublic void testSendMessage() throws IOException, MqException, ClassNotFoundException {//先創建隊列與消息:MESGQueue queue = createQueue();Message message = createMessage("abcdefghijklmnopqrstuvwxyz");//發送消息:messageFileManager.sendMessage(queue,message);//驗證stat文件:MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);Assertions.assertEquals(1,stat.totalCount);Assertions.assertEquals(1,stat.validCount);//驗證data文件:使用loadAllMessagesFromQueueDataFile讀取文件內容:LinkedList<Message> messages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);//驗證:Assertions.assertEquals(1,messages.size());Message message1 = messages.get(0);//判斷這個message1和我們之前的消息message是否一樣:Assertions.assertEquals(message.getMessageId(),message1.getMessageId());Assertions.assertEquals(message.getRoutingKey(),message1.getRoutingKey());Assertions.assertEquals(message.getDeliverMode(),message1.getDeliverMode());Assertions.assertArrayEquals(message.getBody(),message1.getBody());System.out.println("[testSendMessage] 測試成功!!!");}//雖然上一個testSendMessage 已經間接測試過這個方法,但是為了求穩,再測試一遍@Testpublic void testLoadAllMessagesFromQueueDataFile() throws IOException, MqException, ClassNotFoundException {//我們需要準備200條數據用來加載://先創建一個隊列用來存放消息:注意這個方法使用的是queueName1創建的隊列MESGQueue queue = createQueue();//先創建一個鏈表用來保存消息,和后面新加載的消息作對比:LinkedList<Message> expectedMessages = new LinkedList<>();//使用for循環創建消息:for(int i =0;i<200;i++){Message message = createMessage("testMessage"+i);//將消息寫入文件:messageFileManager.sendMessage(queue,message);//記錄消息:expectedMessages.add(message);}//調用loadAllMessagesFromQueueDataFile取出所有消息:LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);//先驗證隊列的數目是否一致:Assertions.assertEquals(200,realMessages.size());//驗證基本屬性:for(int i= 0;i<realMessages.size();i++){Message realMessage = realMessages.get(i);Message expectMessage = expectedMessages.get(i);System.out.println("["+i+"]"+realMessage.toString());Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());}System.out.println("[testLoadAllMessagesFromQueueDataFile] 測試成功!!!");}//測試刪除消息:@Testpublic void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {//先創建一個隊列:MESGQueue queue = createQueue();//創建一個鏈表用來保存預期消息:LinkedList<Message> expectedMessages = new LinkedList<>();//再將20條消息都寫入隊列:for(int i =0;i<20;i++){Message message = createMessage("testMessage"+i);//將消息寫入隊列文件:messageFileManager.sendMessage(queue,message);//記錄消息:expectedMessages.add(message);}//這里 就以刪除前三條消息為例:messageFileManager.deleteMessage(queue,expectedMessages.get(0));messageFileManager.deleteMessage(queue,expectedMessages.get(1));messageFileManager.deleteMessage(queue,expectedMessages.get(2));//讀出消息,對比:LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);//先判斷個數:Assertions.assertEquals(17,realMessages.size());for(int i =3;i<20;i++){Message realMessage = realMessages.get(i-3);Message expectMessage = expectedMessages.get(i);System.out.println("["+i+"]"+realMessage.toString());Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());}System.out.println("[testDeleteMessage] 測試成功!!!");}//測試GC,這里的GC其實只是測試,不用管消息總數是否大于2000或有效消息占比不到50%,因為那是業務上的判定,會有專門的類來進一步封裝//而此處我們只進行測試GC這個方法//計劃將100條消息都存入隊列,然后將奇數下標的消息都刪除,然后執行GC,驗證現在的文件是否比原來的文件小:@Testpublic void testGC() throws IOException, MqException, ClassNotFoundException {//創建一個隊列:MESGQueue queue = createQueue();//創建一個鏈表用來記錄消息:LinkedList<Message> expectedMessages = new LinkedList<>();//先發送100條消息:for(int i =0;i<100;i++){Message message = createMessage("testMessage"+i);//發送到隊列:messageFileManager.sendMessage(queue,message);//記錄expectedMessages.add(message);}//刪除奇數下標的消息:for(int i =1;i<100;i+=2){messageFileManager.deleteMessage(queue,expectedMessages.get(i));}//先記錄執行GG之前的文件大小:File oldFile = new File("./data/"+queueName1+"/queue_data.txt");long oldFileLength = oldFile.length();//執行GCmessageFileManager.gc(queue);//記錄執行完GC之后的文件大小:File newFile = new File("./data/"+queueName1+"/queue_data.txt");long newFileLength = newFile.length();//取出真實的消息:LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);//先驗證消息數量是否對的上:Assertions.assertEquals(50,realMessages.size());//挨個驗證消息:for(int i = 0;i<50;i++){Message realMessage = realMessages.get(i);Message expectMessage = expectedMessages.get(i*2);System.out.println("["+i+"]"+realMessage.toString());Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());}//驗證文件大小://這個驗證的原理其實是://刪除一個文件并不是直接刪除,而是邏輯刪除,通過標記統計文件里的vail來標識的,此時的數據文件即使有很多無效的文件,但是他的大小依舊是total//而非vail有效文件的大小。但是如果進行了文件的GC遷移,此時的新文件的大小就是舊文件的vail有效文件的大小了。所以,新文件會小于舊文件的大小。System.out.println("[oldFileLength]:"+oldFileLength);System.out.println("[newFileLength]:"+newFileLength);Assertions.assertTrue(newFileLength<oldFileLength);System.out.println("[testGC] 測試成功!!!");}

測試結果:沒問題

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/66898.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/66898.shtml
英文地址,請注明出處:http://en.pswp.cn/web/66898.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

HTTP 配置與應用(局域網)

想做一個自己學習的有關的csdn賬號&#xff0c;努力奮斗......會更新我計算機網絡實驗課程的所有內容&#xff0c;還有其他的學習知識^_^&#xff0c;為自己鞏固一下所學知識&#xff0c;下次更新HTTP 配置與應用&#xff08;不同網段&#xff09;。 我是一個萌新小白&#xf…

root用戶Linux銀河麒麟服務器安裝vnc服務

安裝必要桌面環境組件 yum install mate-session-manager -y mate-session #確定是否安裝成功安裝vnc服務器 yum install tigervnc-server -y切換到root為root得vnc設置密碼 su root vncpasswd給root用戶設置vnc服務器文件 vi /etc/systemd/system/vncserver:1.service [Un…

理解深度學習pytorch框架中的線性層

文章目錄 1. 數學角度&#xff1a; y W x b \displaystyle y W\,x b yWxb示例 2. 編程實現角度&#xff1a; y x W T b \displaystyle y x\,W^T b yxWTb3. 常見錯誤與易混點解析4. 小結參考鏈接 在神經網絡或機器學習的線性層&#xff08;Linear Layer / Fully Connect…

C#Object類型的索引,序列化和反序列化

前言 最近在編寫一篇關于標準Mes接口框架的文章。其中有一個非常需要考究的內容時如果實現數據靈活和可使用性強。因為考慮數據靈活性&#xff0c;所以我一開始選取了Object類型作為數據類型&#xff0c;Object作為數據Value字段&#xff0c;String作為數據Key字段&#xff0c…

大模型應用與部署 技術方案

大模型應用與部署 技術方案 一、引言 人工智能蓬勃發展,Qwen 大模型在自然語言處理領域地位關鍵,其架構優勢盡顯,能處理文本創作等多類復雜任務,提供優質交互。Milvus 向量數據庫則是向量數據存儲檢索利器,有高效索引算法(如 IVF_FLAT、HNSWLIB 等)助力大規模數據集相似…

【Prometheus】Prometheus如何監控Haproxy

?? 歡迎大家來到景天科技苑?? &#x1f388;&#x1f388; 養成好習慣&#xff0c;先贊后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者簡介&#xff1a;景天科技苑 &#x1f3c6;《頭銜》&#xff1a;大廠架構師&#xff0c;華為云開發者社區專家博主&#xff0c;…

C# 控制打印機:從入門到實踐

在開發一些涉及打印功能的應用程序時&#xff0c;使用 C# 控制打印機是一項很實用的技能。這篇文章就來詳細介紹下如何在 C# 中實現對打印機的控制。 一、準備工作 安裝相關庫&#xff1a;在 C# 中操作打印機&#xff0c;我們可以借助System.Drawing.Printing命名空間&#x…

Go語言中的值類型和引用類型特點

一、值類型 值類型的數據直接包含值&#xff0c;當它們被賦值給一個新的變量或者作為參數傳遞給函數時&#xff0c;實際上是創建了原值的一個副本。這意味著對新變量的修改不會影響原始變量的值。 Go中的值類型包括&#xff1a; 基礎類型&#xff1a;int&#xff0c;float64…

GPT 結束語設計 以nanogpt為例

GPT 結束語設計 以nanogpt為例 目錄 GPT 結束語設計 以nanogpt為例 1、簡述 2、分詞設計 3、結束語斷點 1、簡述 在手搓gpt的時候&#xff0c;可能會遇到一些性能問題&#xff0c;即關于是否需要全部輸出或者怎么節約資源。 在輸出語句被max_new_tokens 限制&#xff0c…

《探秘:人工智能如何為鴻蒙Next元宇宙網絡傳輸與延遲問題破局》

在元宇宙的宏大愿景中&#xff0c;流暢的網絡傳輸和低延遲是保障用戶沉浸式體驗的關鍵。鴻蒙Next結合人工智能技術&#xff0c;為解決這些問題提供了一系列創新思路和方法。 智能網絡監測與預測 人工智能可以實時監測鴻蒙Next元宇宙中的網絡狀況&#xff0c;包括帶寬、延遲、…

深入MapReduce——計算模型設計

引入 通過引入篇&#xff0c;我們可以總結&#xff0c;MapReduce針對海量數據計算核心痛點的解法如下&#xff1a; 統一編程模型&#xff0c;降低用戶使用門檻分而治之&#xff0c;利用了并行處理提高計算效率移動計算&#xff0c;減少硬件瓶頸的限制 優秀的設計&#xff0c…

macOS安裝Gradle環境

文章目錄 說明安裝JDK安裝Gradle 說明 gradle8.5最高支持jdk21&#xff0c;如果使用jdk22建議使用gradle8.8以上版本 安裝JDK mac系統安裝最新&#xff08;截止2024.9.13&#xff09;Oracle JDK操作記錄 安裝Gradle 下載Gradle&#xff0c;解壓將其存放到資源java/env目錄…

五國十五校聯合巨獻!仿人機器人運動與操控:控制、規劃與學習的最新突破與挑戰

作者&#xff1a; Zhaoyuan Gu, Junheng Li, Wenlan Shen, Wenhao Yu, Zhaoming Xie, Stephen McCrory, Xianyi Cheng, Abdulaziz Shamsah, Robert Griffin, C. Karen Liu, Abderrahmane Kheddar, Xue Bin Peng, Yuke Zhu, Guanya Shi, Quan Nguyen, Gordon Cheng, Huijun Gao,…

CVPR 2024 無人機/遙感/衛星圖像方向總匯(航空圖像和交叉視角定位)

1、UAV、Remote Sensing、Satellite Image(無人機/遙感/衛星圖像) Unleashing Unlabeled Data: A Paradigm for Cross-View Geo-Localization ?codeRethinking Transformers Pre-training for Multi-Spectral Satellite Imagery ?codeAerial Lifting: Neural Urban Semantic …

【BQ3568HM開發板】如何在OpenHarmony上通過校園網的上網認證

引言 前面已經對BQ3568HM開發板進行了初步測試&#xff0c;后面我要實現MQTT的工作&#xff0c;但是遇到一個問題&#xff0c;就是開發板無法通過校園網的認證操作。未認證的話會&#xff0c;學校使用的深瀾軟件系統會屏蔽所有除了認證用的流量。好在我們學校使用的認證系統和…

(Java版本)基于JAVA的網絡通訊系統設計與實現-畢業設計

源碼 論文 下載地址&#xff1a; ????c??????c基于JAVA的網絡通訊系統設計與實現(源碼系統論文&#xff09;https://download.csdn.net/download/weixin_39682092/90299782https://download.csdn.net/download/weixin_39682092/90299782 第1章 緒論 1.1 課題選擇的…

kafka學習筆記4-TLS加密 —— 筑夢之路

1. 準備證書文件 mkdir /opt/kafka/pkicd !$# 生成CA證書 openssl req -x509 -nodes -days 3650 -newkey rsa:4096 -keyout ca.key -out ca.crt -subj "/CNKafka-CA"# 生成私鑰 openssl genrsa -out kafka.key 4096# 生成證書簽名請求 (CSR) openssl req -new -key …

Node.js NativeAddon 構建工具:node-gyp 安裝與配置完全指南

Node.js NativeAddon 構建工具&#xff1a;node-gyp 安裝與配置完全指南 node-gyp Node.js native addon build tool [這里是圖片001] 項目地址: https://gitcode.com/gh_mirrors/no/node-gyp 項目基礎介紹及主要編程語言 Node.js NativeAddon 構建工具&#xff08;node-gyp…

SpringCloud微服務Gateway網關簡單集成Sentinel

Sentinel是阿里巴巴開源的一款面向分布式服務架構的輕量級流量控制、熔斷降級組件。Sentinel以流量為切入點&#xff0c;從流量控制、熔斷降級、系統負載保護等多個維度來幫助保護服務的穩定性。 官方文檔&#xff1a;https://sentinelguard.io/zh-cn/docs/introduction.html …

vscode環境中用倉頡語言開發時調出覆蓋率的方法

在vscode中倉頡語言想得到在idea中利用junit和jacoco的覆蓋率&#xff0c;需要如下幾個步驟&#xff1a; 1.在vscode中搭建倉頡語言開發環境&#xff1b; 2.在源代碼中右鍵運行[cangjie]coverage. 思路1&#xff1a;編寫了測試代碼的情況&#xff08;包管理工具&#xff09; …