Java項目之消息隊列(手寫java模擬實現mq)【三、MQ的核心類-消息類的存儲(用文件存儲消息)】? ★

Java項目之MQ

  • 七. 消息存儲設計
    • 設計思路
      • 為什么要用文件存儲
      • 文件存儲結構
        • queue_data.txt ?件格式:
        • queue_stat.txt ?件格式:
    • 創建 MessageFileManager 類
      • 定義一個內部類, 來表示該隊列的統計信息 Stat
      • 實現統計?件Stat讀寫(文本文件的讀寫)
        • InputStream——Scanner
        • OutputStream——PrintWriter
      • 實現創建隊列?錄
      • 實現刪除隊列?錄
      • 檢查隊列?件是否存在(發送消息時用到)
    • 實現消息對象序列化/反序列化
        • byte數組轉成對象 對象轉成byte數組
        • ByteArrayOutputStream——ObjectOutputStream
      • 實現寫?消息?件【信息寫入數據文件】
        • 1. 檢查消息文件是否存在
        • 2. 把Message對象轉成二進制數據
        • 3. 隊列上鎖
        • 4. 獲取數據文件,根據數據文件長度設置beg,end
        • 5. 寫入文件
        • 6. 更新stat
      • 實現刪除消息
        • 根據beg 找到消息
      • 實現消息加載
      • 實現垃圾回收(GC)
    • 測試 MessageFileManager

七. 消息存儲設計

設計思路

為什么要用文件存儲

消息需要在硬盤上存儲. 但是并不直接放到數據庫中, ?是直接使??件存儲.
原因如下:

  1. 對于消息的操作并不需要復雜的 增刪改查 .
  2. 對于?件的操作效率?數據庫會?很多

主流 MQ 的實現(包括 RabbitMQ), 都是把消息存儲在?件中, ?不是數據庫中.

文件存儲結構

我們給每個隊列分配?個?錄. ?錄的名字為 data + 隊列名. 形如 ./data/testQueue
該?錄中包含兩個固定名字的?件
? queue_data.txt 消息數據?件, ?來保存消息內容.
? queue_stat.txt 消息統計?件, ?來保存消息統計信息.

在這里插入圖片描述

queue_data.txt ?件格式:

使??進制?式存儲.
每個消息分成兩個部分:
? 前四個字節, 表? Message 對象的?度(字節數)
? 后?若?字節, 表? Message 內容.
? 消息和消息之間?尾相連.
每個 Message 基于 Java 標準庫的 ObjectInputStream / ObjectOutputStream 序列化.
在這里插入圖片描述
Message 對象中的 offsetBeg 和 offsetEnd 正是?來描述每個消息體所在的位置

queue_stat.txt ?件格式:

使??本?式存儲.
?件中只包含??, ??包含兩列(都是整數), 使? \t 分割.
第?列表?當前總的消息數?. 第?列表?有效消息數?.
形如:

2000\t1500

創建 MessageFileManager 類

創建 mqserver.database.MessageFileManager

public class MessageFileManager {// 定義一個內部類, 來表示該隊列的統計信息// 有限考慮使用 static, 靜態內部類.static public class Stat {// 此處直接定義成 public, 就不再搞 get set 方法了.// 對于這樣的簡單的類, 就直接使用成員, 類似于 C 的結構體了.public int totalCount;  // 總消息數量public int validCount;  // 有效消息數量}public void init() {// 暫時不需要做啥額外的初始化工作, 以備后續擴展}// 預定消息文件所在的目錄和文件名// 這個方法, 用來獲取到指定隊列對應的消息文件所在路徑private String getQueueDir(String queueName) {return "./data/" + queueName;}// 這個方法用來獲取該隊列的消息數據文件路徑// 注意, 二進制文件, 使用 txt 作為后綴, 不太合適. txt 一般表示文本. 此處咱們也就不改.// .bin / .datprivate String getQueueDataPath(String queueName) {return getQueueDir(queueName) + "/queue_data.txt";}// 這個方法用來獲取該隊列的消息統計文件路徑private String getQueueStatPath(String queueName) {return getQueueDir(queueName) + "/queue_stat.txt";}
}

? 內部包含?個 Stat 類, ?來表?消息統計?件的內容.
? getQueueDir, getQueueDataPath, getQueueStatPath ?來表?這?個?件所在位置

定義一個內部類, 來表示該隊列的統計信息 Stat

    // 定義一個內部類, 來表示該隊列的統計信息// 有限考慮使用 static, 靜態內部類.static public class Stat {// 此處直接定義成 public, 就不再搞 get set 方法了.// 對于這樣的簡單的類, 就直接使用成員, 類似于 C 的結構體了.public int totalCount;  // 總消息數量public int validCount;  // 有效消息數量}

實現統計?件Stat讀寫(文本文件的讀寫)

InputStream——Scanner
OutputStream——PrintWriter
    private Stat readStat(String queueName) {// 由于當前的消息統計文件是文本文件, 可以直接使用 Scanner 來讀取文件內容Stat stat = new Stat();try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;}private void writeStat(String queueName, Stat stat) {// 使用 PrintWrite 來寫文件.// OutputStream 打開文件, 默認情況下, 會直接把原文件清空. 此時相當于新的數據覆蓋了舊的.try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount + "\t" + stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}}

實現創建隊列?錄

每個隊列都有??的?錄和配套的?件. 通過下列?法把?錄和?件先準備好

// 創建隊列對應的文件和目錄
public void createQueueFiles(String queueName) throws IOException {// 1. 先創建隊列對應的消息目錄File baseDir = new File(getQueueDir(queueName));if (!baseDir.exists()) {// 不存在, 就創建這個目錄boolean ok = baseDir.mkdirs();if (!ok) {throw new IOException("創建目錄失敗! baseDir=" + baseDir.getAbsolutePath());}}// 2. 創建隊列數據文件File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {boolean ok = queueDataFile.createNewFile();if (!ok) {throw new IOException("創建文件失敗! queueDataFile=" + queueDataFile.getAbsolutePath());}}// 3. 創建消息統計文件File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {boolean ok = queueStatFile.createNewFile();if (!ok) {throw new IOException("創建文件失敗! queueStatFile=" + queueStatFile.getAbsolutePath());}}// 4. 給消息統計文件, 設定初始值. 0\t0Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName, stat);
}

把上述約定的?件都創建出來, 并對消息統計?件進?初始化.

初始化 0\t0 這樣的初始值.

實現刪除隊列?錄

如果隊列需要刪除, 則隊列對應的?錄/?件也需要刪除.

    // 刪除隊列的目錄和文件.// 隊列也是可以被刪除的. 當隊列刪除之后, 對應的消息文件啥的, 自然也要隨之刪除.public void destroyQueueFiles(String queueName) throws IOException {// 先刪除里面的文件, 再刪除目錄.File queueDataFile = new File(getQueueDataPath(queueName));boolean ok1 = queueDataFile.delete();File queueStatFile = new File(getQueueStatPath(queueName));boolean ok2 = queueStatFile.delete();File baseDir = new File(getQueueDir(queueName));boolean ok3 = baseDir.delete();if (!ok1 || !ok2 || !ok3) {// 有任意一個刪除失敗, 都算整體刪除失敗.throw new IOException("刪除隊列目錄和文件失敗! baseDir=" + baseDir.getAbsolutePath());}}

如果隊列需要刪除, 則隊列對應的?錄/?件也需要刪除

檢查隊列?件是否存在(發送消息時用到)

判定該隊列的消息?件和統計?件是否存在. ?旦出現缺失, 則不能進?后續?作

    // 檢查隊列的目錄和文件是否存在.// 比如后續有生產者給 broker server 生產消息了, 這個消息就可能需要記錄到文件上(取決于消息是否要持久化)public boolean checkFilesExits(String queueName) {// 判定隊列的數據文件和統計文件是否都存在!!File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {return false;}File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {return false;}return true;}

實現消息對象序列化/反序列化

byte數組轉成對象 對象轉成byte數組
ByteArrayOutputStream——ObjectOutputStream

Message 對象需要轉成?進制寫??件. 并且也需要把?件中的?進制讀出來解析成 Message 對象. 此處針對這?的邏輯進?封裝.
創建 common.BinaryTool

// 下列的邏輯, 并不僅僅是 Message, 其他的 Java 中的對象, 也是可以通過這樣的邏輯進行序列化和反序列化的.
// 如果要想讓這個對象能夠序列化或者反序列化, 需要讓這個類能夠實現 Serializable 接口.
public class BinaryTool {// 把一個對象序列化成一個字節數組public static byte[] toBytes(Object object) throws IOException {// 這個流對象相當于一個變長的字節數組.// 就可以把 object 序列化的數據給逐漸的寫入到 byteArrayOutputStream 中, 再統一轉成 byte[]try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {// 此處的 writeObject 就會把該對象進行序列化, 生成的二進制字節數據, 就會寫入到// ObjectOutputStream 中.// 由于 ObjectOutputStream 又是關聯到了 ByteArrayOutputStream, 最終結果就寫入到 ByteArrayOutputStream 里了objectOutputStream.writeObject(object);}// 這個操作就是把 byteArrayOutputStream 中持有的二進制數據取出來, 轉成 byte[]return byteArrayOutputStream.toByteArray();}}// 把一個字節數組, 反序列化成一個對象public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object = null;try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {// 此處的 readObject, 就是從 data 這個 byte[] 中讀取數據并進行反序列化.object = objectInputStream.readObject();}}return object;}
}

? 使? ByteArrayInputStream / ByteArrayOutputStream 針對 byte[] 進?封裝, ?便后續操作. (這
兩個流對象是純內存的, 不需要進? close).
? 使? ObjectInputStream / ObjectOutputStream 進?序列化 / 反序列化操作. 通過內部的
readObject / writeObject 即可完成對應操作.
? 此處涉及到的序列化對象, 需要實現 Serializable 接?. 這?點咱們的 Message 對象已經實現過了.
對于 serialVersionUID , 此處咱們暫時不需要. ?家可以??了解 serialVersionUID 的?途

實現寫?消息?件【信息寫入數據文件】

1. 檢查消息文件是否存在
2. 把Message對象轉成二進制數據
3. 隊列上鎖
4. 獲取數據文件,根據數據文件長度設置beg,end
5. 寫入文件
6. 更新stat
    // 這個方法用來把一個新的消息, 放到隊列對應的文件中.// queue 表示要把消息寫入的隊列. message 則是要寫的消息.public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {// 1. 檢查一下當前要寫入的隊列對應的文件是否存在.if (!checkFilesExits(queue.getName())) {throw new MqException("[MessageFileManager] 隊列對應的文件不存在! queueName=" + queue.getName());}// 2. 把 Message 對象, 進行序列化, 轉成二進制的字節數組.byte[] messageBinary = BinaryTool.toBytes(message);synchronized (queue) {// 3. 先獲取到當前的隊列數據文件的長度, 用這個來計算出該 Message 對象的 offsetBeg 和 offsetEnd// 把新的 Message 數據, 寫入到隊列數據文件的末尾. 此時 Message 對象的 offsetBeg , 就是當前文件長度 + 4// offsetEnd 就是當前文件長度 + 4 + message 自身長度.File queueDataFile = new File(getQueueDataPath(queue.getName()));// 通過這個方法 queueDataFile.length() 就能獲取到文件的長度. 單位字節.message.setOffsetBeg(queueDataFile.length() + 4);message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);// 4. 寫入消息到數據文件, 注意, 是追加寫入到數據文件末尾.try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {// 接下來要先寫當前消息的長度, 占據 4 個字節的~~dataOutputStream.writeInt(messageBinary.length);// 寫入消息本體dataOutputStream.write(messageBinary);}}// 5. 更新消息統計文件Stat stat = readStat(queue.getName());stat.totalCount += 1;stat.validCount += 1;writeStat(queue.getName(), stat);}}
  • 慮線程安全, 按照隊列維度進?加鎖.
  • 使? DataOutputStream 進??進制寫操作. ?原? OutputStream 要?便.
  • 需要記錄 Message 對象在?件中的偏移量. 后續的刪除操作依賴這個偏移量定位到消息. offsetBeg是原有?件??的基礎上, 再 + 4. 4 個字節是存放消息??的空間. (參考上?的圖).
  • 寫完消息, 要同時更新統計信息.

創建 common.MqException , 作為?定義異常類. 后續業務上出現問題, 都統?拋出這個異常

實踐中創建多個異常類, 分別表?不同異常種類是更好的做法. 此處我們只是偷懶了

public class MqException extends Exception {public MqException(String reason) {super(reason);}
}

實現刪除消息

根據beg 找到消息

此處的刪除只是 “邏輯刪除”, 即把 Message 類中的 isValid 字段設置為 0.

這樣刪除速度?較快. 實際的徹底刪除, 則通過我們??實現的 GC 來解決

    // 這個是刪除消息的方法.// 這里的刪除是邏輯刪除, 也就是把硬盤上存儲的這個數據里面的那個 isValid 屬性, 設置成 0// 1. 先把文件中的這一段數據, 讀出來, 還原回 Message 對象;// 2. 把 isValid 改成 0;// 3. 把上述數據重新寫回到文件.// 此處這個參數中的 message 對象, 必須得包含有效的 offsetBeg 和 offsetEndpublic void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {synchronized (queue) {try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {// 1. 先從文件中讀取對應的 Message 數據.byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);// 2. 把當前讀出來的二進制數據, 轉換回成 Message 對象Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);// 3. 把 isValid 設置為無效.diskMessage.setIsValid((byte) 0x0);// 此處不需要給參數的這個 message 的 isValid 設為 0, 因為這個參數代表的是內存中管理的 Message 對象// 而這個對象馬上也要被從內存中銷毀了.// 4. 重新寫入文件byte[] bufferDest = BinaryTool.toBytes(diskMessage);// 雖然上面已經 seek 過了, 但是上面 seek 完了之后, 進行了讀操作, 這一讀, 就導致, 文件光標往后移動, 移動到// 下一個消息的位置了. 因此要想讓接下來的寫入, 能夠剛好寫回到之前的位置, 就需要重新調整文件光標.randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(bufferDest);// 通過上述這通折騰, 對于文件來說, 只是有一個字節發生改變而已了~~}// 不要忘了, 更新統計文件!! 把一個消息設為無效了, 此時有效消息個數就需要 - 1Stat stat = readStat(queue.getName());if (stat.validCount > 0) {stat.validCount -= 1;}writeStat(queue.getName(), stat);}}

? 使? RandomAccessFile 來隨機訪問到?件的內容.
? 根據 Message 中的 offsetBeg 和 offsetEnd 定位到消息在?件中的位置. 通過randomAccessFile.seek 操作?件指針偏移過去. 再讀取.
? 讀出的結果解析成 Message 對象, 修改 isValid 字段, 再重新寫回?件. 注意寫的時候要重新設定?件指針的位置. ?件指針會隨著上述的讀操作產?改變.
? 最后, 要記得更新統計?件, 把合法消息 - 1.

實現消息加載

把消息內容從?件加載到內存中. 這個功能在服務器重啟, 和垃圾回收的時候都很關鍵

    // 使用這個方法, 從文件中, 讀取出所有的消息內容, 加載到內存中(具體來說是放到一個鏈表里)// 這個方法, 準備在程序啟動的時候, 進行調用.// 這里使用一個 LinkedList, 主要目的是為了后續進行頭刪操作.// 這個方法的參數, 只是一個 queueName 而不是 MSGQueue 對象. 因為這個方法不需要加鎖, 只使用 queueName 就夠了.// 由于該方法是在程序啟動時調用, 此時服務器還不能處理請求呢~~ 不涉及多線程操作文件.public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {// 這個變量記錄當前文件光標.long currentOffset = 0;// 一個文件中包含了很多消息, 此處勢必要循環讀取.while (true) {// 1. 讀取當前消息的長度, 這里的 readInt 可能會讀到文件的末尾(EOF)//    readInt 方法, 讀到文件末尾, 會拋出 EOFException 異常. 這一點和之前的很多流對象不太一樣.int messageSize = dataInputStream.readInt();// 2. 按照這個長度, 讀取消息內容byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize) {// 如果不匹配, 說明文件有問題, 格式錯亂了!!throw new MqException("[MessageFileManager] 文件格式錯誤! queueName=" + queueName);}// 3. 把這個讀到的二進制數據, 反序列化回 Message 對象Message message = (Message) BinaryTool.fromBytes(buffer);// 4. 判定一下看看這個消息對象, 是不是無效對象.if (message.getIsValid() != 0x1) {// 無效數據, 直接跳過.// 雖然消息是無效數據, 但是 offset 不要忘記更新.currentOffset += (4 + messageSize);continue;}// 5. 有效數據, 則需要把這個 Message 對象加入到鏈表中. 加入之前還需要填寫 offsetBeg 和 offsetEnd//    進行計算 offset 的時候, 需要知道當前文件光標的位置的. 由于當下使用的 DataInputStream 并不方便直接獲取到文件光標位置//    因此就需要手動計算下文件光標.message.setOffsetBeg(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);messages.add(message);}} catch (EOFException e) {// 這個 catch 并非真是處理 "異常", 而是處理 "正常" 的業務邏輯. 文件讀到末尾, 會被 readInt 拋出該異常.// 這個 catch 語句中也不需要做啥特殊的事情System.out.println("[MessageFileManager] 恢復 Message 數據完成!");}}return messages;}

? 使? DataInputStream 讀取數據. 先讀 4 個字節為消息的?度, 然后再按照這個?度來讀取實際消
息內容.
? 讀取完畢之后, 轉換成 Message 對象.
? 同時計算出該對象的 offsetBeg 和 offsetEnd.
? 最終把結果整理成鏈表, 返回出去.
? 注意, 對于 DataInputStream 來說, 如果讀取到 EOF, 會拋出?個 EOFException , ?不是返回特定
值. 因此需要注意上述循環的結束條件.

實現垃圾回收(GC)

上述刪除操作, 只是把消息在?件上標記成了?效. 并沒有騰出硬盤空間. 最終?件??可能會越積越
多. 因此需要定期的進?批量清除.
此處使?類似于復制算法. 當總消息數超過 2000, 并且有效消息數?少于 50% 的時候, 就觸發 GC.
GC 的時候會把所有有效消息加載出來, 寫?到?個新的消息?件中, 使?新?件, 代替舊?件即可.

    // 使用這個方法, 從文件中, 讀取出所有的消息內容, 加載到內存中(具體來說是放到一個鏈表里)// 這個方法, 準備在程序啟動的時候, 進行調用.// 這里使用一個 LinkedList, 主要目的是為了后續進行頭刪操作.// 這個方法的參數, 只是一個 queueName 而不是 MSGQueue 對象. 因為這個方法不需要加鎖, 只使用 queueName 就夠了.// 由于該方法是在程序啟動時調用, 此時服務器還不能處理請求呢~~ 不涉及多線程操作文件.public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {// 這個變量記錄當前文件光標.long currentOffset = 0;// 一個文件中包含了很多消息, 此處勢必要循環讀取.while (true) {// 1. 讀取當前消息的長度, 這里的 readInt 可能會讀到文件的末尾(EOF)//    readInt 方法, 讀到文件末尾, 會拋出 EOFException 異常. 這一點和之前的很多流對象不太一樣.int messageSize = dataInputStream.readInt();// 2. 按照這個長度, 讀取消息內容byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize) {// 如果不匹配, 說明文件有問題, 格式錯亂了!!throw new MqException("[MessageFileManager] 文件格式錯誤! queueName=" + queueName);}// 3. 把這個讀到的二進制數據, 反序列化回 Message 對象Message message = (Message) BinaryTool.fromBytes(buffer);// 4. 判定一下看看這個消息對象, 是不是無效對象.if (message.getIsValid() != 0x1) {// 無效數據, 直接跳過.// 雖然消息是無效數據, 但是 offset 不要忘記更新.currentOffset += (4 + messageSize);continue;}// 5. 有效數據, 則需要把這個 Message 對象加入到鏈表中. 加入之前還需要填寫 offsetBeg 和 offsetEnd//    進行計算 offset 的時候, 需要知道當前文件光標的位置的. 由于當下使用的 DataInputStream 并不方便直接獲取到文件光標位置//    因此就需要手動計算下文件光標.message.setOffsetBeg(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);messages.add(message);}} catch (EOFException e) {// 這個 catch 并非真是處理 "異常", 而是處理 "正常" 的業務邏輯. 文件讀到末尾, 會被 readInt 拋出該異常.// 這個 catch 語句中也不需要做啥特殊的事情System.out.println("[MessageFileManager] 恢復 Message 數據完成!");}}return messages;}// 檢查當前是否要針對該隊列的消息數據文件進行 GCpublic boolean checkGC(String queueName) {// 判定是否要 GC, 是根據總消息數和有效消息數. 這兩個值都是在 消息統計文件 中的.Stat stat = readStat(queueName);if (stat.totalCount > 2000 && (double)stat.validCount / (double)stat.totalCount < 0.5) {return true;}return false;}private String getQueueDataNewPath(String queueName) {return getQueueDir(queueName) + "/queue_data_new.txt";}// 通過這個方法, 真正執行消息數據文件的垃圾回收操作.// 使用復制算法來完成.// 創建一個新的文件, 名字就是 queue_data_new.txt// 把之前消息數據文件中的有效消息都讀出來, 寫到新的文件中.// 刪除舊的文件, 再把新的文件改名回 queue_data.txt// 同時要記得更新消息統計文件.public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {// 進行 gc 的時候, 是針對消息數據文件進行大洗牌. 在這個過程中, 其他線程不能針對該隊列的消息文件做任何修改.synchronized (queue) {// 由于 gc 操作可能比較耗時, 此處統計一下執行消耗的時間.long gcBeg = System.currentTimeMillis();// 1. 創建一個新的文件File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));if (queueDataNewFile.exists()) {// 正常情況下, 這個文件不應該存在. 如果存在, 就是意外~~ 說明上次 gc 了一半, 程序意外崩潰了.throw new MqException("[MessageFileManager] gc 時發現該隊列的 queue_data_new 已經存在! queueName=" + queue.getName());}boolean ok = queueDataNewFile.createNewFile();if (!ok) {throw new MqException("[MessageFileManager] 創建文件失敗! queueDataNewFile=" + queueDataNewFile.getAbsolutePath());}// 2. 從舊的文件中, 讀取出所有的有效消息對象了. (這個邏輯直接調用上述方法即可, 不必重新寫了)LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());// 3. 把有效消息, 寫入到新的文件中.try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {for (Message message : messages) {byte[] buffer = BinaryTool.toBytes(message);// 先寫四個字節消息的長度dataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}// 4. 刪除舊的數據文件, 并且把新的文件進行重命名File queueDataOldFile = new File(getQueueDataPath(queue.getName()));ok = queueDataOldFile.delete();if (!ok) {throw new MqException("[MessageFileManager] 刪除舊的數據文件失敗! queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}// 把 queue_data_new.txt => queue_data.txtok = queueDataNewFile.renameTo(queueDataOldFile);if (!ok) {throw new MqException("[MessageFileManager] 文件重命名失敗! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()+ ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}// 5. 更新統計文件Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(), stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] gc 執行完畢! queueName=" + queue.getName() + ", time="+ (gcEnd - gcBeg) + "ms");}}

如果?件很?, 消息?常多, 可能?較低效, 這種就需要把?件做拆分和合并了.
Rabbitmq 本體是這樣實現的. 但是咱們此處為了實現簡單, 就不做這個了.

測試 MessageFileManager

創建 MessageFileManagerTests 編寫測試?例代碼.
? 創建兩個隊列, ?來輔助測試.
? 使? ReflectionTestUtils.invokeMethod 來調?私有?法.

package com.example.mq;import com.example.mq.common.MqException;
import com.example.mq.mqserver.core.MSGQueue;
import com.example.mq.mqserver.core.Message;
import com.example.mq.mqserver.datacenter.MessageFileManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.util.ReflectionTestUtils;
import sun.awt.image.ImageWatched;import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;@SpringBootTest
public class MessageFileManagerTests {private MessageFileManager messageFileManager = new MessageFileManager();private static final String queueName1 = "testQueue1";private static final String queueName2 = "testQueue2";// 這個方法是每個用例執行之前的準備工作@BeforeEachpublic void setUp() throws IOException {// 準備階段, 創建出兩個隊列, 以備后用messageFileManager.createQueueFiles(queueName1);messageFileManager.createQueueFiles(queueName2);}// 這個方法就是每個用例執行完畢之后的收尾工作@AfterEachpublic void tearDown() throws IOException {// 收尾階段, 就把剛才的隊列給干掉.messageFileManager.destroyQueueFiles(queueName1);messageFileManager.destroyQueueFiles(queueName2);}@Testpublic void testCreateFiles() {// 創建隊列文件已經在上面 setUp 階段執行過了. 此處主要是驗證看看文件是否存在.File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile1.isFile());File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");Assertions.assertEquals(true, queueStatFile1.isFile());File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile2.isFile());File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");Assertions.assertEquals(true, queueStatFile2.isFile());}@Testpublic void testReadWriteStat() {MessageFileManager.Stat stat = new MessageFileManager.Stat();stat.totalCount = 100;stat.validCount = 50;// 此處就需要使用反射的方式, 來調用 writeStat 和 readStat 了.// Java 原生的反射 API 其實非常難用~~// 此處使用 Spring 幫我們封裝好的 反射 的工具類.ReflectionTestUtils.invokeMethod(messageFileManager, "writeStat", queueName1, stat);// 寫入完畢之后, 再調用一下讀取, 驗證讀取的結果和寫入的數據是一致的.MessageFileManager.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);Assertions.assertEquals(100, newStat.totalCount);Assertions.assertEquals(50, newStat.validCount);System.out.println("測試 readStat 和 writeStat 完成!");}private MSGQueue createTestQueue(String queueName) {MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(true);queue.setAutoDelete(false);queue.setExclusive(false);return queue;}private Message createTestMessage(String content) {Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());return message;}@Testpublic void testSendMessage() throws IOException, MqException, ClassNotFoundException {// 構造出消息, 并且構造出隊列.Message message = createTestMessage("testMessage");// 此處創建的 queue 對象的 name, 不能隨便寫, 只能用 queueName1 和 queueName2. 需要保證這個隊列對象// 對應的目錄和文件啥的都存在才行.MSGQueue queue = createTestQueue(queueName1);// 調用發送消息方法messageFileManager.sendMessage(queue, message);// 檢查 stat 文件.MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);Assertions.assertEquals(1, stat.totalCount);Assertions.assertEquals(1, stat.validCount);// 檢查 data 文件LinkedList<Message> messages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(1, messages.size());Message curMessage = messages.get(0);Assertions.assertEquals(message.getMessageId(), curMessage.getMessageId());Assertions.assertEquals(message.getRoutingKey(), curMessage.getRoutingKey());Assertions.assertEquals(message.getDeliverMode(), curMessage.getDeliverMode());// 比較兩個字節數組的內容是否相同, 不能直接使用 assertEquals 了.Assertions.assertArrayEquals(message.getBody(), curMessage.getBody());System.out.println("message: " + curMessage);}@Testpublic void testLoadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {// 往隊列中插入 100 條消息, 然后驗證看看這 100 條消息從文件中讀取之后, 是否和最初是一致的.MSGQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 讀取所有消息LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(expectedMessages.size(), actualMessages.size());for (int i = 0; i < expectedMessages.size(); i++) {Message expectedMessage = expectedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("[" + i + "] actualMessage=" + actualMessage);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}@Testpublic void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {// 創建隊列, 寫入 10 個消息. 刪除其中的幾個消息. 再把所有消息讀取出來, 判定是否符合預期.MSGQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 10; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 刪除其中的三個消息messageFileManager.deleteMessage(queue, expectedMessages.get(7));messageFileManager.deleteMessage(queue, expectedMessages.get(8));messageFileManager.deleteMessage(queue, expectedMessages.get(9));// 對比這里的內容是否正確.LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(7, actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {Message expectedMessage = expectedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("[" + i + "] actualMessage=" + actualMessage);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}@Testpublic void testGC() throws IOException, MqException, ClassNotFoundException {// 先往隊列中寫 100 個消息. 獲取到文件大小.// 再把 100 個消息中的一半, 都給刪除掉(比如把下標為偶數的消息都刪除)// 再手動調用 gc 方法, 檢測得到的新的文件的大小是否比之前縮小了.MSGQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 獲取 gc 前的文件大小File beforeGCFile = new File("./data/" + queueName1 + "/queue_data.txt");long beforeGCLength = beforeGCFile.length();// 刪除偶數下標的消息for (int i = 0; i < 100; i += 2) {messageFileManager.deleteMessage(queue, expectedMessages.get(i));}// 手動調用 gcmessageFileManager.gc(queue);// 重新讀取文件, 驗證新的文件的內容是不是和之前的內容匹配LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(50, actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {// 把之前消息偶數下標的刪了, 剩下的就是奇數下標的元素了.// actual 中的 0 對應 expected 的 1// actual 中的 1 對應 expected 的 3// actual 中的 2 對應 expected 的 5// actual 中的 i 對應 expected 的 2 * i + 1Message expectedMessage = expectedMessages.get(2 * i + 1);Message actualMessage = actualMessages.get(i);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}// 獲取新的文件的大小File afterGCFile = new File("./data/" + queueName1 + "/queue_data.txt");long afterGCLength = afterGCFile.length();System.out.println("before: " + beforeGCLength);System.out.println("after: " + afterGCLength);Assertions.assertTrue(beforeGCLength > afterGCLength);}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/22713.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/22713.shtml
英文地址,請注明出處:http://en.pswp.cn/web/22713.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

python爬蟲入門教程(一)

上一篇文章講了爬蟲的工作原理&#xff0c;這篇文章以后就要重點開始講編程序了。 簡單爬蟲的的兩個步驟&#xff1a; 使用HTTPRequest工具模擬HTTP請求&#xff0c;接收到返回的文本。用于請求的包有: requests、urllib等。 對接收的文本進行篩選,獲取想要的內容。用戶篩選文…

JavaScript-內存分配,關鍵字const

內存空間 內存分為棧和堆 棧&#xff1a;由操作系統自動釋放存放的變量值和函數值等。簡單數據類型存放在棧中 棧會由低到高先入后出 堆&#xff1a;存儲引用類型 &#xff08;數組&#xff0c;對象&#xff09; 對象會先將數據存放在堆里面&#xff0c;堆的地址放在棧里面 關鍵…

【ArcGISProSDK】獲取要素中某字段最大值

public static async Task<double> GetMaxValueFromField(string vectorFilePath, string fieldName){double maxValue = 0;//矢量所在文件夾var filePath = System.IO.Path.GetDirectoryName(vectorFilePath);//矢量名稱var fileName = System.IO.Path.GetFileNameWitho…

VMD-PSO-LSTM單維時序預測模型(單輸入單輸出)-附代碼

VMD-PSO-LSTM單維時序預測模型&#xff08;單輸入單輸出&#xff09; 1&#xff09;首先對原始單維數據進行VMD分解&#xff0c;分解為K個模態分量和1個殘差分量 2&#xff09;將各個模態分量輸入模型&#xff0c;建立模型進行預測 3&#xff09;將各個預測結果相加得到最終…

clickhouse(十五、存儲優化實踐)

文章目錄 背景問題定位優化方式排序鍵設計寫入順序壓縮算法 DoubleDeltaLowCardinality避免使用Nullable 總結 背景 clickhouse集群容量告警&#xff0c;項目中某些表占據大量的存儲空間&#xff0c;借此機會對ck的存儲優化進行實踐學習&#xff0c;并通過多種方式測試驗證優化…

設計模式相關更新中

詳見gitee: 更新中? ??????設計模式相關: 設計模式相關介紹 (gitee.com) 一.面向對象的設計原則 二.單例模式

React(五)useEffect、useRef、useImperativeHandle、useLayoutEffect

(一)useEffect useEffect – React 中文文檔 useEffect hook用于模擬以前的class組件的生命周期&#xff0c;但比原本的生命周期有著更強大的功能 1.類組件的生命周期 在類組件編程時&#xff0c;網絡請求&#xff0c;訂閱等操作都是在生命周期中完成 import React, { Com…

算法題day37日(補5.23日卡:貪心算法day4)

一、刷題&#xff1a; 1.leetcode題目 860. 檸檬水找零 - 力扣&#xff08;LeetCode&#xff09;&#xff08;easy&#xff09;&#xff1a; 我覺得我寫的代碼有點蠢 class Solution:def lemonadeChange(self, bills: List[int]) -> bool:dict_ {5:0,10:0}if bills[0] !…

Python降維基礎知識:深入探索與實戰應用

Python降維基礎知識&#xff1a;深入探索與實戰應用 在數據分析和機器學習的廣闊領域中&#xff0c;降維技術一直扮演著重要的角色。Python&#xff0c;作為數據處理和機器學習的首選語言&#xff0c;為我們提供了豐富的降維工具和算法。本文將從四個方面、五個方面、六個方面…

算法訓練營第四十九天 | LeetCode 139單詞拆分

LeetCode 139 單詞拆分 基本還是完全背包的思路&#xff0c;不過用了三重循環&#xff0c;第三重循環是用于判斷當前字符串尾部指定長度字符是否和列表中某一字符串相同&#xff0c;是的話可以將當前dp[j]或上當前下標減去該單詞長度后的下標值。 代碼如下&#xff1a; clas…

平滑值(pinghua)

平滑值 題目描述 一個數組的“平滑值”定義為&#xff1a;相鄰兩數差的絕對值的最大值。 具體的&#xff0c;數組a的平滑值定義為 f ( a ) m a x i 1 n ? 1 ∣ a i 1 ? a i ∣ f(a)max_{i1}^{n-1}|a_{i1}-a_i| f(a)maxi1n?1?∣ai1??ai?∣ 現在小紅拿到了一個數組…

【前端】響應式布局筆記——flex

二、Flex Flex(FlexiableBox:彈性盒子&#xff0c;用于彈性布局&#xff0c;配合rem處理尺寸的適配問題)。 1、flex-direction:子元素在父元素盒子中的排列方式。 父級元素添加&#xff1a;flex-direction: row; 父級元素添加&#xff1a;flex-direction: row-reverse; 父…

家政預約小程序13我的訂單

目錄 1 我的訂單頁面布局2 全部訂單頁面3 完善訂單狀態4 查詢訂單信息總結 現在我們已經完成了家政預約小程序主體功能的開發&#xff0c;包含服務的查看&#xff0c;在線預約已經登錄等功能。預約之后就需要家政公司的客服進行派單&#xff0c;由服務人員進行上門服務。在小程…

Hotcoin精彩亮相Consensus 2024 Austin,探索行業風向標

5 月 31 日&#xff0c;由CoinDesk主辦的“Consensus 2024”大會在德克薩斯州的奧斯汀市正式落下帷幕。作為全球規模最大、最具影響力的加密貨幣、區塊鏈、Web3盛會&#xff0c;本次Consensus 2024 Austin吸引來自 100 多個國家/地區的 15,000 多名與會者、6,800 家公司、850 多…

Linux 程序守護腳本

引言 程序是由代碼形成的&#xff0c;代碼是由人寫的。只要是人&#xff0c;都會有疏忽的時候&#xff0c;導致寫出的程序有bug&#xff0c;當然最嚴重的bug就是程序閃退。 本文旨在提供一個程序守護腳本&#xff0c;當監測到程序閃退后&#xff0c;立馬將程序再起啟動&#…

java 獲取文件的MIME類型

MIME類型簡介 MIME類型&#xff08;Multipurpose Internet Mail Extensions&#xff09;是一種標準&#xff0c;用于描述文檔、文件或字節流的性質和格式。它最初是為了在電子郵件中傳輸多媒體數據而設計的&#xff0c;但后來也被廣泛用于Web開發中。每種MIME類型都由一個唯一…

【C++】手動模擬String底層與深淺拷貝

在string類&#xff1a;版本、組件、構造、操作及應用和 C中string的一些超常用函數 (附習題)這兩篇文章中我們已經了解到了string&#xff0c;現在讓我們再來手動實現模擬一下吧~ 模擬實現string是為了更好的理解string函數的使用和深淺拷貝方面的知識~ 總體整理了兩張思維導…

【Python編程】【Jupyter Notebook】啟動時報錯:no available port could be found

一、報錯描述 在Jupyter Notebook中編寫程序&#xff0c;無法運行&#xff0c;提示由于沒有可供監聽的端口&#xff0c;無法啟動Jupyter服務器&#xff0c;如下圖所示&#xff1a; 二、原因分析 通過報錯信息&#xff0c;猜測大概是由于網絡環境的原因。首先&#xff0c;關閉…

多角度剖析事務和事件的區別

事務和事件這兩個概念在不同的領域有著不同的含義&#xff0c;尤其是在計算機科學、數據庫管理和軟件工程中。下面從多個角度來剖析事務和事件的區別&#xff1a; 計算機科學與數據庫管理中的事務 事務(Transaction)&#xff1a; 定義&#xff1a;在數據庫管理中&#xff0c…

C語言(結構體)

Hi~&#xff01;這里是奮斗的小羊&#xff0c;很榮幸各位能閱讀我的文章&#xff0c;誠請評論指點&#xff0c;歡迎歡迎~~ &#x1f4a5;個人主頁&#xff1a;小羊在奮斗 &#x1f4a5;所屬專欄&#xff1a;C語言 本系列文章為個人學習筆記&#xff0c;在這里撰寫成文一…