目錄
1.消息持久化-創建MessageFileManger類
1.1 創建一個類
1.2?創建關于路徑的方法
1.3?定義內部類
1.4 實現消息統計文件讀寫
1.5?實現創建消息目錄和文件
1.6 實現刪除消息目錄和文件
1.7 實現消息序列化
1. 消息序列化的一些概念:
2. 方案選擇:
3.驗證版本
4.序列化代碼
?1.8?把消息寫入文件
1.寫入消息長度
2.線程安全問題
3.加鎖代碼
1.9?刪除文件中的消息
1.隨機訪問
2.刪除代碼
1.10?加載文件中的所有消息
1.消息持久化-創建MessageFileManger類
1.1 創建一個類
1.2?創建關于路徑的方法
獲取消息文件所在的目錄的方法
// 預定消息文件所在的目錄和文件名// 這個方法, 用在獲取到 指定隊列 對應的 消息文件所在路徑private String getQueueDir(String queueName) {return "./data/" + queueName;}// 這個方法用來獲取 該隊列的 消息數據文件路徑// 注意:二進制文件,使用 txt 作為后綴, 不太合適. txt一般表示文本。// .bin .datprivate String getQueueDataPath(String queueName) {return getQueueDir(queueName) + "/queue_data.txt";}// 這個方法用來獲取 該隊列的 消息統計文件路徑private String getQueueStatPath(String queueName) {return getQueueDir(queueName) + "/queue_stat.txt";}
1.3?定義內部類
定義一個內部類, 來表示該隊列的統計信息
// 定義一個內部類, 來表示該隊列的統計信息
// 優先考慮使用 static, 靜態內部類.
static public class Stat {
// 此處直接定義成 public, 就不再搞 get set方法
// 對于這樣的簡單類,直接使用成員,類似于 C的結構體
public int totalCount; // 總消息數量
public int validCount; // 有效消息數量
}
1.此處采用靜態內部類:
靜態內部類 不會依賴上面外部類的 this ,這樣解耦更徹底
如果確實需要訪問外部類的屬性,這時不使用static。這種想在內部類使用外部類非靜態屬性的,只有外部類有 this 的情況下才行。
此處很顯然不會使用外部類的非靜態屬性,所以使用static,限制更少,更方便。
3.使用public:
雖然此類只在這里使用,但是為了測試方便,所以使用public,更方便訪問。
1.4 實現消息統計文件讀寫
1.由于 消息統計文件是文本格式,所以可以直接使用Scanner來讀取文件
2.需要注意的是寫文件時,使用的時FileOutputStream,它的第二個參數可以控制:
(默認)false:直接覆蓋原文
true:追加文本
private Stat readStat(String queueName) {// 由于當前的消息統計文件是文本文件, 可以直接使用 Scanner 來讀取文件內容Stat stat = new Stat();try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))){Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;}private void writeStat(String queueName, Stat stat) {// 使用 PrintWriter 來寫文件// OutputStream 打開文件, 默認情況下會直接把原文件清空.此時相當于新的數據覆蓋了舊的.try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))){PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount + "\t" + stat.validCount);printWriter.flush();} catch (IOException e) {throw new RuntimeException(e);}}
1.5?實現創建消息目錄和文件
此方法 創建對應的文件和目錄
包括:
? ? ? ? 隊列對應的消息目錄 testQueue1
? ? ? ? 消息數據文件 queue_data.txt
? ? ? ? 消息統計文件 queue_stat.txt
并且為消息統計文件設置初始值 0\t0
// 創建隊列對應的文件和目錄private void createQueueFiles(String queueName) throws IOException {// 創建隊列對應的消息目錄File baseDir = new File(getQueueDir(queueName));if(!baseDir.exists()) {// 不存在就創建這個目錄boolean ok = baseDir.mkdirs();if (!ok) {throw new IOException("創建目錄失敗! baseDir = " + baseDir.getAbsolutePath());}}// 創建消息數據文件File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {boolean ok = queueDataFile.createNewFile();if (!ok) {throw new IOException("創建文件失敗! queueDataFile = " + queueDataFile.getAbsolutePath());}}// 創建消息統計文件File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {boolean ok = queueStatFile.createNewFile();if (!ok) {throw new IOException("創建文件失敗! queueStatFile = " + queueStatFile.getAbsolutePath());}}// 給消息統計文件 設定初始值 0\t0Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName, stat);}
1.6 實現刪除消息目錄和文件
此方法用于刪除 1.5 方法創建的消息目錄和文件
注意:
1. 先刪除文件,在刪除目錄
2. 一個刪除失敗就整體失敗
3.并且給出一個檢查目錄文件是否存在的方法:
? ? ? ? 后續有生產者給 Broker Server 生產消息,這個消息可能需要記錄到文件上,要保證對應的目錄文件是存在的。
// 刪除隊列的目錄和文件// 隊列也是可以被刪除的, 當隊列刪除目錄之后, 對應的消息文件自然而然也要刪除public void destroyQueueFiles(String queueName) throws IOException {// 先刪除文件 再刪除目錄File queueStatFile = new File(getQueueStatPath(queueName));boolean ok1 = queueStatFile.delete();File queueDataFile = new File(getQueueDataPath(queueName));boolean ok2 = queueDataFile.delete();File baseDir = new File(getQueueDir(queueName));boolean ok3 = baseDir.delete();if (!ok1 || !ok2 || !ok3) {// 有任意一個刪除失敗 算整體刪除失敗throw new IOException("刪除隊列目錄和文件失敗!baseDir = " + baseDir.getAbsolutePath());}}// 檢查隊列的目錄和文件是否存在// 比如后續有生產者給 Broker Server 生產消息了 這個消息可能需要記錄到文件上(取決于消息是否需要持久化)public boolean checkFilesExist(String queueName) {File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {return false;}File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {return false;}return true;}
1.7 實現消息序列化
此處抽出寫一篇文章,主要介紹 Java中的序列化和反序列化是什么?
1. 消息序列化的一些概念:
消息序列化:
? ? ? ? 是什么: 簡單來說就是 -> 把一個對象(結構化數據)轉成一個字符串/字節數組(可以和 降維打擊聯想在一起)
? ? ? ? 為什么:序列化之后,方便存儲和傳輸。
? ? ? ? ? ? ? ? ? ? ? ? 存儲:一般就是存儲在文件中(就像此處消息存到文件里),因為文件只能存字符串/二進制數據,不能直接存對象
? ? ? ? ? ? ? ? ? ? ? ? 傳輸:通過網絡傳輸。
2. 方案選擇:
方案一:
? ? ? ? 使用 JSON 來完成序列化,反序列化:jackson,ObjectMapper(就像把arguments字段存到數據庫中一樣,把(集合)結構化數據轉成字符串)
? ? 不可行:因為 Message,里面存儲的 body 部分,是二進制數據,不太方便使用 JSON 進行序列化。JSON序列化得到的結果是文本數據,無法存儲二進制。
有關JSON序列化的一些小記:
1.為什么JSON無法存儲二進制
????????JSON 格式中有很多特殊符號, : " {} 這些符號會影響 JSON 格式的解析。如果存文本,你的鍵值對中不會包含上述特殊符號。如果存二進制,某個二進制的字節正好就和上述特殊符號ASCII對應,此時可能會引起JSON解析格式錯誤。
?2.如果就想使用 JSON序列化,而且以二進制存儲怎么辦?
? ? ? ? 可以針對二進制數據進行base64編碼,base64作用就是用 4 個字節,表示三個字節的信息,會保證4個字節 都是使用 文本字符。
????????(像HTML 中如果嵌入一個圖片,圖片是二進制數據,就可以把圖片的二進制 base64 編碼然后就可以 直接以文本的形式 嵌入 HTML中)
? ? ? ? 但是這種方法效率底,有額外的轉碼開銷,同時,還會使空間變大。
關于base64的使用,我會寫一篇博文
針對二進制序列化有很多種解決方案
方案二 Java標準庫就提供了序列化的方案 ObjectInputStream 和 ObjectOutputStream
方案三 Hessian也是一個解決方案
方案四?protobuffer
方案五 thrift
此處使用第二個方案,好處:不必引入額外的依賴。
3.驗證版本
此處?private static final long serialVersionUID = 1L;
是用來驗證版本的。實際中開發代碼是不斷修改更新的。
1. 將Message信息序列化存儲到文件中
2.更新了Message類結構
3.針對以前的舊數據進行反序列化,大概率失敗
4.所以,通過此驗證版本,一旦發現版本不一致,直接報錯,不允許反序列化,提醒程序員數據有問題。
4.序列化代碼
package com.xj.mq.common;import java.io.*;/*** Created with IntelliJ IDEA* Description 序列化工具類* 并不僅僅是 Message 其他的Java對象 也是可以通過這樣的邏輯 進行序列化和反序列化* 如果要讓這個對象能序列化和反序列化 需要讓它的類實現 Serializable 接口* User: 王杰* Date: 2025-05-20* Time: 9:56*/
public class BinaryTool {// 把 一個對象 序列化 成一個字節數組public static byte[] toBytes(Object object) throws IOException {// 這個流對象相當于一個變長的字節數組// 就可以把 object 序列化的數據給逐漸的寫入到 byteArrayOutputStream 中 再統一轉成 byte[]try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){// 此處的 writeObject 就會把該對象進行序列化 生成的二進制字節數據 就會寫入到// ObjectOutputStream 中// 由于 ObjectOutputStream 又關聯到了 ByteArrayOutputStream 最終結果就寫入到 ByteArrayOutputStream 里了objectOutputStream.writeObject(object);}// 這個操作就是把 byteArrayOutputStream 中持有的二進制數據取出來 轉成 byte[]return byteArrayOutputStream.toByteArray();}}// 把 一個字節數組 反序列化 成一個對象public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object = null;try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream()){try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){// 此處的 readObject 就是從 data 這個 byte[] 中讀取數據并且反序列化object = objectInputStream.readObject();}return object;}}
}
?1.8?把消息寫入文件
實現把消息寫入文件
1.寫入消息長度
我們需要往這個文件中,先寫入消息的長度(四個字節),是需要把 長度這個 int 的四個字節,一次寫入到文件中:
如果選擇 outputStream.write(messageBinary.length)會出現一些問題
雖然這個 write的參數類型是 int類型,但是實際上只能寫一個字節
在流對象中,經常會涉及到,使用 int 表示 byte 的情況,具體該怎么做呢?
可以把 int 的四個字節分別取出來,一個字節一個字節的寫:
很顯然不用我們親自寫這些,Java標準庫已經提供了現成的類,幫我們封裝好了上述操作(讓我們感受Java的魅力吧!)
DataInputStream/DataOutputStream
// 寫入消息到數據文件 注意 是追加寫入到數據文件末尾
try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {// 接下來要先寫出來 當前消息的長度 占據 4個字節的dataOutputStream.writeInt(messageBinary.length);// 寫入消息本體dataOutputStream.write(messageBinary);}
}
2.線程安全問題
問題如下圖所示:
怎么辦?加鎖!
確定鎖對象
寫到?synchronized() 里的對象,當前就以 queue 隊列對象 進行加鎖即可。
如果兩個線程,是往同一個隊列中寫消息,此時就阻塞等待
如果兩個線程,往不同的隊列里寫消息,此時不需要阻塞等待
3.加鎖代碼
// 把一個新的消息 放到隊列對應的文件中// queue 表示要把消息寫入的隊列, massage 表示要寫的消息public void sendMessage(MessageQueue queue, Message message) throws MqException, IOException {// 檢查一下 當前要寫入的隊列 對應的文件 是否存在if (!checkFilesExist(queue.getName())) {throw new MqException("[MessageFileManager] 隊列對應的文件不存在!queueName = " + queue.getName());}// 把Message對象 進行序列化 轉成二進制的字節數組byte[] messageBinary = BinaryTool.toBytes(message);synchronized (queue) {// 先獲取到當前的隊列數據文件的長度 用這個來計算 該Message 對象的 offsetBeg 和 offsetEnd// 把新的 Message 數據 寫入到隊列數據文件的末尾 此時Message對象的 offsetBeg 就是文件長度 + 4// offsetEnd 就是當前文件長度 + 4 + message自身長度File queueDataFile = new File(getQueueDataPath(queue.getName()));// 通過 queueDataFile.length() 就能獲取到文件的長度 單位字節message.setOffsetBeg(queueDataFile.length() + 4);message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);// 寫入消息到數據文件 注意 是追加寫入到數據文件末尾try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)){try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){// 接下來要先寫出來 當前消息的長度 占據 4個字節的dataOutputStream.writeInt(messageBinary.length);// 寫入消息本體dataOutputStream.write(messageBinary);}}// 更新 消息統計文件Stat stat = readStat(queue.getName());stat.totalCount += 1;stat.validCount += 1;writeStat(queue.getName(), stat);}}
1.9?刪除文件中的消息
1.隨機訪問
之前用的 FileInputStream 和 FileOutputStream 都是從文件頭讀寫的。而此處我們需要的是,在文件中的指定位置進行讀寫 ,隨機訪問。此處用到的類是 RandomAccessFile
三個方法:
read:讀取時會移動光標
write:寫入時會移動光標
seek:調整當前的文件光標(當前要讀寫的位置)
2.刪除代碼
// 刪除消息的方法// 這里的刪除時 邏輯刪除 也就是把硬盤上存儲這個數據里邊的哪個 isValid 屬性 設置成 0// 先把文件中的這一段數據 讀出來 還原回 Message 對象// 把 isValid 改成 0// 把上述數據重新寫回到文件// 此處這個參數中的 message 對象 必須得包含有效的 offsetBeg 和 offsetEndpublic void deleteMessage(MessageQueue queue, Message message) throws IOException, ClassNotFoundException {synchronized (queue) {try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")){// 先從文件中讀取對應的 Message 數據byte[] bufferSrc = new byte[(int)(message.getOffsetEnd() - message.getOffsetBeg())];randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);// 把當前讀出來的二進制數據 轉換回 Message 對象Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);// 把 isValid 設置為無效message.setIsValid((byte) 0x0);// 此處不需要 給參數 message的 isValid 設為0 因為這個參數代表的是內存中管理的 Message 對象// 而這個對象 馬上也要被從內存中銷毀// 重新寫入文件byte[] bufferDest = BinaryTool.toBytes(diskMessage);// 雖然上面已經 seek過了 但是上面 seek完了 進行了讀操作 這導致 文件光標往后移動// 移動到下一個消息的位置 因此想讓接下來的寫入 能夠剛好寫回到之前的位置 就需要重新調整光標位置randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(bufferDest);// 通過上述代碼 只是改變了一個重要的有效標記位 (一個字節)}// 不要忘了 更新統計文件 把一個消息設為無效 此時有效消息個數要 -1Stat stat = readStat(queue.getName());if (stat.validCount > 0) {stat.validCount -= 1;}writeStat(queue.getName(), stat);}}
1.10?加載文件中的所有消息
讀取文件中所有的消息內容:
? ? ? ? 此處注意這個方法是在程序啟動時調用 此時服務器還不能處理請求 不涉及多線程操作文件
// 使用這個方法 從文件中 讀取出所有的消息內容 加載到內存中 (具體來說就是放在一個鏈表里)// 使用這個方法 準備在程序啟動的時候 進行調用// 這里使用了一個 LinkedList 主要目的是為了后續進行頭刪操作// 這個方法參數 只是一個queueName 而不是 MessageQueue對象 因為這個方法不需要加鎖 只使用 queueName 就夠了// 由于該方法是在程序啟動時調用 此時服務器還不能處理請求 不涉及多線程操作文件public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){try (DataInputStream dataInputStream = new DataInputStream(inputStream)){// 這個變量記錄當前文件光標long currentOffset = 0;// 一個文件 包含了很多信息 此處勢必 循環讀取while (true) {// 讀取當前消息的長度// readInt 方法讀到文件末尾 會拋出 EOFException 異常 這一點和以前的流對象不同int messageSize = dataInputStream.readInt();// 按照這個長度 讀取消息內容byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize) {// 如果不匹配 說明文件有問題 格式錯亂了throw new MqException("[MessageFileManager] 文件格式錯誤!" + queueName);}// 當讀到這個二進制數據 反序列化回 Message 對象Message message = (Message)BinaryTool.fromBytes(buffer);// 判斷一下看看這個消息對象 是不是無效對象if (message.getIsValid() != 0x1) {// 無效數據 直接跳過// 雖然消息是無效數據 但是 offset 不要忘記更新currentOffset += (4 + messageSize);continue;}// 有效數據 則需要把這個 Message對象加入到鏈表中 加入之前還需要填寫 offsetBeg 和 offsetEnd// 進行計算 offset的時候 需要知道當前文件光標的位置 由于當前使用的是 dataInputStream 并不方便直接獲取文件光標// 因此就需要手動計算下文件光標message.setOffsetBeg(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);messages.add(message);}}catch (EOFException e) {// 這個 catch 并非真是處理 “異常” 而是處理 “正常” 的業務邏輯 文件讀到末尾 會被 readInt 拋出該異常// 這個 catch 語句中也不需要做什么System.out.println("[MessageFileManager] 恢復 Message 數據完成");}}return messages;}