Java項目之MQ
- 七. 消息存儲設計
- 設計思路
- 為什么要用文件存儲
- 文件存儲結構
- queue_data.txt ?件格式:
- queue_stat.txt ?件格式:
- 創建 MessageFileManager 類
- 定義一個內部類, 來表示該隊列的統計信息 Stat
- 實現統計?件Stat讀寫(文本文件的讀寫)
- InputStream——Scanner
- OutputStream——PrintWriter
- 實現創建隊列?錄
- 實現刪除隊列?錄
- 檢查隊列?件是否存在(發送消息時用到)
- 實現消息對象序列化/反序列化
- byte數組轉成對象 對象轉成byte數組
- ByteArrayOutputStream——ObjectOutputStream
- 實現寫?消息?件【信息寫入數據文件】
- 1. 檢查消息文件是否存在
- 2. 把Message對象轉成二進制數據
- 3. 隊列上鎖
- 4. 獲取數據文件,根據數據文件長度設置beg,end
- 5. 寫入文件
- 6. 更新stat
- 實現刪除消息
- 根據beg 找到消息
- 實現消息加載
- 實現垃圾回收(GC)
- 測試 MessageFileManager
七. 消息存儲設計
設計思路
為什么要用文件存儲
消息需要在硬盤上存儲. 但是并不直接放到數據庫中, ?是直接使??件存儲.
原因如下:
- 對于消息的操作并不需要復雜的 增刪改查 .
- 對于?件的操作效率?數據庫會?很多
主流 MQ 的實現(包括 RabbitMQ), 都是把消息存儲在?件中, ?不是數據庫中.
文件存儲結構
我們給每個隊列分配?個?錄. ?錄的名字為 data + 隊列名. 形如 ./data/testQueue
該?錄中包含兩個固定名字的?件
? queue_data.txt 消息數據?件, ?來保存消息內容.
? queue_stat.txt 消息統計?件, ?來保存消息統計信息.
queue_data.txt ?件格式:
使??進制?式存儲.
每個消息分成兩個部分:
? 前四個字節, 表? Message 對象的?度(字節數)
? 后?若?字節, 表? Message 內容.
? 消息和消息之間?尾相連.
每個 Message 基于 Java 標準庫的 ObjectInputStream / ObjectOutputStream 序列化.
Message 對象中的 offsetBeg 和 offsetEnd 正是?來描述每個消息體所在的位置
queue_stat.txt ?件格式:
使??本?式存儲.
?件中只包含??, ??包含兩列(都是整數), 使? \t 分割.
第?列表?當前總的消息數?. 第?列表?有效消息數?.
形如:
2000\t1500
創建 MessageFileManager 類
創建 mqserver.database.MessageFileManager
public class MessageFileManager {// 定義一個內部類, 來表示該隊列的統計信息// 有限考慮使用 static, 靜態內部類.static public class Stat {// 此處直接定義成 public, 就不再搞 get set 方法了.// 對于這樣的簡單的類, 就直接使用成員, 類似于 C 的結構體了.public int totalCount; // 總消息數量public int validCount; // 有效消息數量}public void init() {// 暫時不需要做啥額外的初始化工作, 以備后續擴展}// 預定消息文件所在的目錄和文件名// 這個方法, 用來獲取到指定隊列對應的消息文件所在路徑private String getQueueDir(String queueName) {return "./data/" + queueName;}// 這個方法用來獲取該隊列的消息數據文件路徑// 注意, 二進制文件, 使用 txt 作為后綴, 不太合適. txt 一般表示文本. 此處咱們也就不改.// .bin / .datprivate String getQueueDataPath(String queueName) {return getQueueDir(queueName) + "/queue_data.txt";}// 這個方法用來獲取該隊列的消息統計文件路徑private String getQueueStatPath(String queueName) {return getQueueDir(queueName) + "/queue_stat.txt";}
}
? 內部包含?個 Stat 類, ?來表?消息統計?件的內容.
? getQueueDir, getQueueDataPath, getQueueStatPath ?來表?這?個?件所在位置
定義一個內部類, 來表示該隊列的統計信息 Stat
// 定義一個內部類, 來表示該隊列的統計信息// 有限考慮使用 static, 靜態內部類.static public class Stat {// 此處直接定義成 public, 就不再搞 get set 方法了.// 對于這樣的簡單的類, 就直接使用成員, 類似于 C 的結構體了.public int totalCount; // 總消息數量public int validCount; // 有效消息數量}
實現統計?件Stat讀寫(文本文件的讀寫)
InputStream——Scanner
OutputStream——PrintWriter
private Stat readStat(String queueName) {// 由于當前的消息統計文件是文本文件, 可以直接使用 Scanner 來讀取文件內容Stat stat = new Stat();try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;}private void writeStat(String queueName, Stat stat) {// 使用 PrintWrite 來寫文件.// OutputStream 打開文件, 默認情況下, 會直接把原文件清空. 此時相當于新的數據覆蓋了舊的.try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount + "\t" + stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}}
實現創建隊列?錄
每個隊列都有??的?錄和配套的?件. 通過下列?法把?錄和?件先準備好
// 創建隊列對應的文件和目錄
public void createQueueFiles(String queueName) throws IOException {// 1. 先創建隊列對應的消息目錄File baseDir = new File(getQueueDir(queueName));if (!baseDir.exists()) {// 不存在, 就創建這個目錄boolean ok = baseDir.mkdirs();if (!ok) {throw new IOException("創建目錄失敗! baseDir=" + baseDir.getAbsolutePath());}}// 2. 創建隊列數據文件File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {boolean ok = queueDataFile.createNewFile();if (!ok) {throw new IOException("創建文件失敗! queueDataFile=" + queueDataFile.getAbsolutePath());}}// 3. 創建消息統計文件File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {boolean ok = queueStatFile.createNewFile();if (!ok) {throw new IOException("創建文件失敗! queueStatFile=" + queueStatFile.getAbsolutePath());}}// 4. 給消息統計文件, 設定初始值. 0\t0Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName, stat);
}
把上述約定的?件都創建出來, 并對消息統計?件進?初始化.
初始化 0\t0 這樣的初始值.
實現刪除隊列?錄
如果隊列需要刪除, 則隊列對應的?錄/?件也需要刪除.
// 刪除隊列的目錄和文件.// 隊列也是可以被刪除的. 當隊列刪除之后, 對應的消息文件啥的, 自然也要隨之刪除.public void destroyQueueFiles(String queueName) throws IOException {// 先刪除里面的文件, 再刪除目錄.File queueDataFile = new File(getQueueDataPath(queueName));boolean ok1 = queueDataFile.delete();File queueStatFile = new File(getQueueStatPath(queueName));boolean ok2 = queueStatFile.delete();File baseDir = new File(getQueueDir(queueName));boolean ok3 = baseDir.delete();if (!ok1 || !ok2 || !ok3) {// 有任意一個刪除失敗, 都算整體刪除失敗.throw new IOException("刪除隊列目錄和文件失敗! baseDir=" + baseDir.getAbsolutePath());}}
如果隊列需要刪除, 則隊列對應的?錄/?件也需要刪除
檢查隊列?件是否存在(發送消息時用到)
判定該隊列的消息?件和統計?件是否存在. ?旦出現缺失, 則不能進?后續?作
// 檢查隊列的目錄和文件是否存在.// 比如后續有生產者給 broker server 生產消息了, 這個消息就可能需要記錄到文件上(取決于消息是否要持久化)public boolean checkFilesExits(String queueName) {// 判定隊列的數據文件和統計文件是否都存在!!File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {return false;}File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {return false;}return true;}
實現消息對象序列化/反序列化
byte數組轉成對象 對象轉成byte數組
ByteArrayOutputStream——ObjectOutputStream
Message 對象需要轉成?進制寫??件. 并且也需要把?件中的?進制讀出來解析成 Message 對象. 此處針對這?的邏輯進?封裝.
創建 common.BinaryTool
// 下列的邏輯, 并不僅僅是 Message, 其他的 Java 中的對象, 也是可以通過這樣的邏輯進行序列化和反序列化的.
// 如果要想讓這個對象能夠序列化或者反序列化, 需要讓這個類能夠實現 Serializable 接口.
public class BinaryTool {// 把一個對象序列化成一個字節數組public static byte[] toBytes(Object object) throws IOException {// 這個流對象相當于一個變長的字節數組.// 就可以把 object 序列化的數據給逐漸的寫入到 byteArrayOutputStream 中, 再統一轉成 byte[]try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {// 此處的 writeObject 就會把該對象進行序列化, 生成的二進制字節數據, 就會寫入到// ObjectOutputStream 中.// 由于 ObjectOutputStream 又是關聯到了 ByteArrayOutputStream, 最終結果就寫入到 ByteArrayOutputStream 里了objectOutputStream.writeObject(object);}// 這個操作就是把 byteArrayOutputStream 中持有的二進制數據取出來, 轉成 byte[]return byteArrayOutputStream.toByteArray();}}// 把一個字節數組, 反序列化成一個對象public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object = null;try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {// 此處的 readObject, 就是從 data 這個 byte[] 中讀取數據并進行反序列化.object = objectInputStream.readObject();}}return object;}
}
? 使? ByteArrayInputStream / ByteArrayOutputStream 針對 byte[] 進?封裝, ?便后續操作. (這
兩個流對象是純內存的, 不需要進? close).
? 使? ObjectInputStream / ObjectOutputStream 進?序列化 / 反序列化操作. 通過內部的
readObject / writeObject 即可完成對應操作.
? 此處涉及到的序列化對象, 需要實現 Serializable 接?. 這?點咱們的 Message 對象已經實現過了.
對于 serialVersionUID , 此處咱們暫時不需要. ?家可以??了解 serialVersionUID 的?途
實現寫?消息?件【信息寫入數據文件】
1. 檢查消息文件是否存在
2. 把Message對象轉成二進制數據
3. 隊列上鎖
4. 獲取數據文件,根據數據文件長度設置beg,end
5. 寫入文件
6. 更新stat
// 這個方法用來把一個新的消息, 放到隊列對應的文件中.// queue 表示要把消息寫入的隊列. message 則是要寫的消息.public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {// 1. 檢查一下當前要寫入的隊列對應的文件是否存在.if (!checkFilesExits(queue.getName())) {throw new MqException("[MessageFileManager] 隊列對應的文件不存在! queueName=" + queue.getName());}// 2. 把 Message 對象, 進行序列化, 轉成二進制的字節數組.byte[] messageBinary = BinaryTool.toBytes(message);synchronized (queue) {// 3. 先獲取到當前的隊列數據文件的長度, 用這個來計算出該 Message 對象的 offsetBeg 和 offsetEnd// 把新的 Message 數據, 寫入到隊列數據文件的末尾. 此時 Message 對象的 offsetBeg , 就是當前文件長度 + 4// offsetEnd 就是當前文件長度 + 4 + message 自身長度.File queueDataFile = new File(getQueueDataPath(queue.getName()));// 通過這個方法 queueDataFile.length() 就能獲取到文件的長度. 單位字節.message.setOffsetBeg(queueDataFile.length() + 4);message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);// 4. 寫入消息到數據文件, 注意, 是追加寫入到數據文件末尾.try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {// 接下來要先寫當前消息的長度, 占據 4 個字節的~~dataOutputStream.writeInt(messageBinary.length);// 寫入消息本體dataOutputStream.write(messageBinary);}}// 5. 更新消息統計文件Stat stat = readStat(queue.getName());stat.totalCount += 1;stat.validCount += 1;writeStat(queue.getName(), stat);}}
- 慮線程安全, 按照隊列維度進?加鎖.
- 使? DataOutputStream 進??進制寫操作. ?原? OutputStream 要?便.
- 需要記錄 Message 對象在?件中的偏移量. 后續的刪除操作依賴這個偏移量定位到消息. offsetBeg是原有?件??的基礎上, 再 + 4. 4 個字節是存放消息??的空間. (參考上?的圖).
- 寫完消息, 要同時更新統計信息.
創建 common.MqException , 作為?定義異常類. 后續業務上出現問題, 都統?拋出這個異常
實踐中創建多個異常類, 分別表?不同異常種類是更好的做法. 此處我們只是偷懶了
public class MqException extends Exception {public MqException(String reason) {super(reason);}
}
實現刪除消息
根據beg 找到消息
此處的刪除只是 “邏輯刪除”, 即把 Message 類中的 isValid 字段設置為 0.
這樣刪除速度?較快. 實際的徹底刪除, 則通過我們??實現的 GC 來解決
// 這個是刪除消息的方法.// 這里的刪除是邏輯刪除, 也就是把硬盤上存儲的這個數據里面的那個 isValid 屬性, 設置成 0// 1. 先把文件中的這一段數據, 讀出來, 還原回 Message 對象;// 2. 把 isValid 改成 0;// 3. 把上述數據重新寫回到文件.// 此處這個參數中的 message 對象, 必須得包含有效的 offsetBeg 和 offsetEndpublic void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {synchronized (queue) {try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {// 1. 先從文件中讀取對應的 Message 數據.byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);// 2. 把當前讀出來的二進制數據, 轉換回成 Message 對象Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);// 3. 把 isValid 設置為無效.diskMessage.setIsValid((byte) 0x0);// 此處不需要給參數的這個 message 的 isValid 設為 0, 因為這個參數代表的是內存中管理的 Message 對象// 而這個對象馬上也要被從內存中銷毀了.// 4. 重新寫入文件byte[] bufferDest = BinaryTool.toBytes(diskMessage);// 雖然上面已經 seek 過了, 但是上面 seek 完了之后, 進行了讀操作, 這一讀, 就導致, 文件光標往后移動, 移動到// 下一個消息的位置了. 因此要想讓接下來的寫入, 能夠剛好寫回到之前的位置, 就需要重新調整文件光標.randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(bufferDest);// 通過上述這通折騰, 對于文件來說, 只是有一個字節發生改變而已了~~}// 不要忘了, 更新統計文件!! 把一個消息設為無效了, 此時有效消息個數就需要 - 1Stat stat = readStat(queue.getName());if (stat.validCount > 0) {stat.validCount -= 1;}writeStat(queue.getName(), stat);}}
? 使? RandomAccessFile 來隨機訪問到?件的內容.
? 根據 Message 中的 offsetBeg 和 offsetEnd 定位到消息在?件中的位置. 通過randomAccessFile.seek 操作?件指針偏移過去. 再讀取.
? 讀出的結果解析成 Message 對象, 修改 isValid 字段, 再重新寫回?件. 注意寫的時候要重新設定?件指針的位置. ?件指針會隨著上述的讀操作產?改變.
? 最后, 要記得更新統計?件, 把合法消息 - 1.
實現消息加載
把消息內容從?件加載到內存中. 這個功能在服務器重啟, 和垃圾回收的時候都很關鍵
// 使用這個方法, 從文件中, 讀取出所有的消息內容, 加載到內存中(具體來說是放到一個鏈表里)// 這個方法, 準備在程序啟動的時候, 進行調用.// 這里使用一個 LinkedList, 主要目的是為了后續進行頭刪操作.// 這個方法的參數, 只是一個 queueName 而不是 MSGQueue 對象. 因為這個方法不需要加鎖, 只使用 queueName 就夠了.// 由于該方法是在程序啟動時調用, 此時服務器還不能處理請求呢~~ 不涉及多線程操作文件.public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {// 這個變量記錄當前文件光標.long currentOffset = 0;// 一個文件中包含了很多消息, 此處勢必要循環讀取.while (true) {// 1. 讀取當前消息的長度, 這里的 readInt 可能會讀到文件的末尾(EOF)// readInt 方法, 讀到文件末尾, 會拋出 EOFException 異常. 這一點和之前的很多流對象不太一樣.int messageSize = dataInputStream.readInt();// 2. 按照這個長度, 讀取消息內容byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize) {// 如果不匹配, 說明文件有問題, 格式錯亂了!!throw new MqException("[MessageFileManager] 文件格式錯誤! queueName=" + queueName);}// 3. 把這個讀到的二進制數據, 反序列化回 Message 對象Message message = (Message) BinaryTool.fromBytes(buffer);// 4. 判定一下看看這個消息對象, 是不是無效對象.if (message.getIsValid() != 0x1) {// 無效數據, 直接跳過.// 雖然消息是無效數據, 但是 offset 不要忘記更新.currentOffset += (4 + messageSize);continue;}// 5. 有效數據, 則需要把這個 Message 對象加入到鏈表中. 加入之前還需要填寫 offsetBeg 和 offsetEnd// 進行計算 offset 的時候, 需要知道當前文件光標的位置的. 由于當下使用的 DataInputStream 并不方便直接獲取到文件光標位置// 因此就需要手動計算下文件光標.message.setOffsetBeg(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);messages.add(message);}} catch (EOFException e) {// 這個 catch 并非真是處理 "異常", 而是處理 "正常" 的業務邏輯. 文件讀到末尾, 會被 readInt 拋出該異常.// 這個 catch 語句中也不需要做啥特殊的事情System.out.println("[MessageFileManager] 恢復 Message 數據完成!");}}return messages;}
? 使? DataInputStream 讀取數據. 先讀 4 個字節為消息的?度, 然后再按照這個?度來讀取實際消
息內容.
? 讀取完畢之后, 轉換成 Message 對象.
? 同時計算出該對象的 offsetBeg 和 offsetEnd.
? 最終把結果整理成鏈表, 返回出去.
? 注意, 對于 DataInputStream 來說, 如果讀取到 EOF, 會拋出?個 EOFException , ?不是返回特定
值. 因此需要注意上述循環的結束條件.
實現垃圾回收(GC)
上述刪除操作, 只是把消息在?件上標記成了?效. 并沒有騰出硬盤空間. 最終?件??可能會越積越
多. 因此需要定期的進?批量清除.
此處使?類似于復制算法. 當總消息數超過 2000, 并且有效消息數?少于 50% 的時候, 就觸發 GC.
GC 的時候會把所有有效消息加載出來, 寫?到?個新的消息?件中, 使?新?件, 代替舊?件即可.
// 使用這個方法, 從文件中, 讀取出所有的消息內容, 加載到內存中(具體來說是放到一個鏈表里)// 這個方法, 準備在程序啟動的時候, 進行調用.// 這里使用一個 LinkedList, 主要目的是為了后續進行頭刪操作.// 這個方法的參數, 只是一個 queueName 而不是 MSGQueue 對象. 因為這個方法不需要加鎖, 只使用 queueName 就夠了.// 由于該方法是在程序啟動時調用, 此時服務器還不能處理請求呢~~ 不涉及多線程操作文件.public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {// 這個變量記錄當前文件光標.long currentOffset = 0;// 一個文件中包含了很多消息, 此處勢必要循環讀取.while (true) {// 1. 讀取當前消息的長度, 這里的 readInt 可能會讀到文件的末尾(EOF)// readInt 方法, 讀到文件末尾, 會拋出 EOFException 異常. 這一點和之前的很多流對象不太一樣.int messageSize = dataInputStream.readInt();// 2. 按照這個長度, 讀取消息內容byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize) {// 如果不匹配, 說明文件有問題, 格式錯亂了!!throw new MqException("[MessageFileManager] 文件格式錯誤! queueName=" + queueName);}// 3. 把這個讀到的二進制數據, 反序列化回 Message 對象Message message = (Message) BinaryTool.fromBytes(buffer);// 4. 判定一下看看這個消息對象, 是不是無效對象.if (message.getIsValid() != 0x1) {// 無效數據, 直接跳過.// 雖然消息是無效數據, 但是 offset 不要忘記更新.currentOffset += (4 + messageSize);continue;}// 5. 有效數據, 則需要把這個 Message 對象加入到鏈表中. 加入之前還需要填寫 offsetBeg 和 offsetEnd// 進行計算 offset 的時候, 需要知道當前文件光標的位置的. 由于當下使用的 DataInputStream 并不方便直接獲取到文件光標位置// 因此就需要手動計算下文件光標.message.setOffsetBeg(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);messages.add(message);}} catch (EOFException e) {// 這個 catch 并非真是處理 "異常", 而是處理 "正常" 的業務邏輯. 文件讀到末尾, 會被 readInt 拋出該異常.// 這個 catch 語句中也不需要做啥特殊的事情System.out.println("[MessageFileManager] 恢復 Message 數據完成!");}}return messages;}// 檢查當前是否要針對該隊列的消息數據文件進行 GCpublic boolean checkGC(String queueName) {// 判定是否要 GC, 是根據總消息數和有效消息數. 這兩個值都是在 消息統計文件 中的.Stat stat = readStat(queueName);if (stat.totalCount > 2000 && (double)stat.validCount / (double)stat.totalCount < 0.5) {return true;}return false;}private String getQueueDataNewPath(String queueName) {return getQueueDir(queueName) + "/queue_data_new.txt";}// 通過這個方法, 真正執行消息數據文件的垃圾回收操作.// 使用復制算法來完成.// 創建一個新的文件, 名字就是 queue_data_new.txt// 把之前消息數據文件中的有效消息都讀出來, 寫到新的文件中.// 刪除舊的文件, 再把新的文件改名回 queue_data.txt// 同時要記得更新消息統計文件.public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {// 進行 gc 的時候, 是針對消息數據文件進行大洗牌. 在這個過程中, 其他線程不能針對該隊列的消息文件做任何修改.synchronized (queue) {// 由于 gc 操作可能比較耗時, 此處統計一下執行消耗的時間.long gcBeg = System.currentTimeMillis();// 1. 創建一個新的文件File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));if (queueDataNewFile.exists()) {// 正常情況下, 這個文件不應該存在. 如果存在, 就是意外~~ 說明上次 gc 了一半, 程序意外崩潰了.throw new MqException("[MessageFileManager] gc 時發現該隊列的 queue_data_new 已經存在! queueName=" + queue.getName());}boolean ok = queueDataNewFile.createNewFile();if (!ok) {throw new MqException("[MessageFileManager] 創建文件失敗! queueDataNewFile=" + queueDataNewFile.getAbsolutePath());}// 2. 從舊的文件中, 讀取出所有的有效消息對象了. (這個邏輯直接調用上述方法即可, 不必重新寫了)LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());// 3. 把有效消息, 寫入到新的文件中.try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {for (Message message : messages) {byte[] buffer = BinaryTool.toBytes(message);// 先寫四個字節消息的長度dataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}// 4. 刪除舊的數據文件, 并且把新的文件進行重命名File queueDataOldFile = new File(getQueueDataPath(queue.getName()));ok = queueDataOldFile.delete();if (!ok) {throw new MqException("[MessageFileManager] 刪除舊的數據文件失敗! queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}// 把 queue_data_new.txt => queue_data.txtok = queueDataNewFile.renameTo(queueDataOldFile);if (!ok) {throw new MqException("[MessageFileManager] 文件重命名失敗! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()+ ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}// 5. 更新統計文件Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(), stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] gc 執行完畢! queueName=" + queue.getName() + ", time="+ (gcEnd - gcBeg) + "ms");}}
如果?件很?, 消息?常多, 可能?較低效, 這種就需要把?件做拆分和合并了.
Rabbitmq 本體是這樣實現的. 但是咱們此處為了實現簡單, 就不做這個了.
測試 MessageFileManager
創建 MessageFileManagerTests 編寫測試?例代碼.
? 創建兩個隊列, ?來輔助測試.
? 使? ReflectionTestUtils.invokeMethod 來調?私有?法.
package com.example.mq;import com.example.mq.common.MqException;
import com.example.mq.mqserver.core.MSGQueue;
import com.example.mq.mqserver.core.Message;
import com.example.mq.mqserver.datacenter.MessageFileManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.util.ReflectionTestUtils;
import sun.awt.image.ImageWatched;import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;@SpringBootTest
public class MessageFileManagerTests {private MessageFileManager messageFileManager = new MessageFileManager();private static final String queueName1 = "testQueue1";private static final String queueName2 = "testQueue2";// 這個方法是每個用例執行之前的準備工作@BeforeEachpublic void setUp() throws IOException {// 準備階段, 創建出兩個隊列, 以備后用messageFileManager.createQueueFiles(queueName1);messageFileManager.createQueueFiles(queueName2);}// 這個方法就是每個用例執行完畢之后的收尾工作@AfterEachpublic void tearDown() throws IOException {// 收尾階段, 就把剛才的隊列給干掉.messageFileManager.destroyQueueFiles(queueName1);messageFileManager.destroyQueueFiles(queueName2);}@Testpublic void testCreateFiles() {// 創建隊列文件已經在上面 setUp 階段執行過了. 此處主要是驗證看看文件是否存在.File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile1.isFile());File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");Assertions.assertEquals(true, queueStatFile1.isFile());File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile2.isFile());File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");Assertions.assertEquals(true, queueStatFile2.isFile());}@Testpublic void testReadWriteStat() {MessageFileManager.Stat stat = new MessageFileManager.Stat();stat.totalCount = 100;stat.validCount = 50;// 此處就需要使用反射的方式, 來調用 writeStat 和 readStat 了.// Java 原生的反射 API 其實非常難用~~// 此處使用 Spring 幫我們封裝好的 反射 的工具類.ReflectionTestUtils.invokeMethod(messageFileManager, "writeStat", queueName1, stat);// 寫入完畢之后, 再調用一下讀取, 驗證讀取的結果和寫入的數據是一致的.MessageFileManager.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);Assertions.assertEquals(100, newStat.totalCount);Assertions.assertEquals(50, newStat.validCount);System.out.println("測試 readStat 和 writeStat 完成!");}private MSGQueue createTestQueue(String queueName) {MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(true);queue.setAutoDelete(false);queue.setExclusive(false);return queue;}private Message createTestMessage(String content) {Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());return message;}@Testpublic void testSendMessage() throws IOException, MqException, ClassNotFoundException {// 構造出消息, 并且構造出隊列.Message message = createTestMessage("testMessage");// 此處創建的 queue 對象的 name, 不能隨便寫, 只能用 queueName1 和 queueName2. 需要保證這個隊列對象// 對應的目錄和文件啥的都存在才行.MSGQueue queue = createTestQueue(queueName1);// 調用發送消息方法messageFileManager.sendMessage(queue, message);// 檢查 stat 文件.MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);Assertions.assertEquals(1, stat.totalCount);Assertions.assertEquals(1, stat.validCount);// 檢查 data 文件LinkedList<Message> messages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(1, messages.size());Message curMessage = messages.get(0);Assertions.assertEquals(message.getMessageId(), curMessage.getMessageId());Assertions.assertEquals(message.getRoutingKey(), curMessage.getRoutingKey());Assertions.assertEquals(message.getDeliverMode(), curMessage.getDeliverMode());// 比較兩個字節數組的內容是否相同, 不能直接使用 assertEquals 了.Assertions.assertArrayEquals(message.getBody(), curMessage.getBody());System.out.println("message: " + curMessage);}@Testpublic void testLoadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {// 往隊列中插入 100 條消息, 然后驗證看看這 100 條消息從文件中讀取之后, 是否和最初是一致的.MSGQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 讀取所有消息LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(expectedMessages.size(), actualMessages.size());for (int i = 0; i < expectedMessages.size(); i++) {Message expectedMessage = expectedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("[" + i + "] actualMessage=" + actualMessage);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}@Testpublic void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {// 創建隊列, 寫入 10 個消息. 刪除其中的幾個消息. 再把所有消息讀取出來, 判定是否符合預期.MSGQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 10; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 刪除其中的三個消息messageFileManager.deleteMessage(queue, expectedMessages.get(7));messageFileManager.deleteMessage(queue, expectedMessages.get(8));messageFileManager.deleteMessage(queue, expectedMessages.get(9));// 對比這里的內容是否正確.LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(7, actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {Message expectedMessage = expectedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("[" + i + "] actualMessage=" + actualMessage);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}@Testpublic void testGC() throws IOException, MqException, ClassNotFoundException {// 先往隊列中寫 100 個消息. 獲取到文件大小.// 再把 100 個消息中的一半, 都給刪除掉(比如把下標為偶數的消息都刪除)// 再手動調用 gc 方法, 檢測得到的新的文件的大小是否比之前縮小了.MSGQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 獲取 gc 前的文件大小File beforeGCFile = new File("./data/" + queueName1 + "/queue_data.txt");long beforeGCLength = beforeGCFile.length();// 刪除偶數下標的消息for (int i = 0; i < 100; i += 2) {messageFileManager.deleteMessage(queue, expectedMessages.get(i));}// 手動調用 gcmessageFileManager.gc(queue);// 重新讀取文件, 驗證新的文件的內容是不是和之前的內容匹配LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(50, actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {// 把之前消息偶數下標的刪了, 剩下的就是奇數下標的元素了.// actual 中的 0 對應 expected 的 1// actual 中的 1 對應 expected 的 3// actual 中的 2 對應 expected 的 5// actual 中的 i 對應 expected 的 2 * i + 1Message expectedMessage = expectedMessages.get(2 * i + 1);Message actualMessage = actualMessages.get(i);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}// 獲取新的文件的大小File afterGCFile = new File("./data/" + queueName1 + "/queue_data.txt");long afterGCLength = afterGCFile.length();System.out.println("before: " + beforeGCLength);System.out.println("after: " + afterGCLength);Assertions.assertTrue(beforeGCLength > afterGCLength);}
}