目錄
一.消息隊列背景
二.需求分析
核心概念:
BrokerServer:
BrokerServer的核心API:
交換機Exchange:
持久化:
網絡通信:
消息應答:
三、模塊劃分
四、創建項目
五、創建核心類
Exchange:
MSGQueue:
Binding:
Message:
六.數據庫設計
七.實現DataBaseManager類:
DataBaseManager類:
編寫DataBaseManager類的測試用例:
八.消息存儲設置
九.創建MessageFileManager類
創建MessageFileManager類:
實現該類的功能:
1.創建數據文件:統計文件的目錄,文件
2.實現讀寫統計文件功能:
3.實現讀寫數據文件功能:
4.實現讀寫消息方法:
5.加載文件中的所有有效消息;
6.實現垃圾清除功能:GC:
編寫MessageFileManager測試用例:
十.整合數據庫和文件數據
一.消息隊列背景
我們學習過阻塞隊列(BlockingQueue), 阻塞隊列最?的?途,就是?來實現 ?產者消費 者模型.??產者消費者模型,存在諸多好處,是后端開發的常?編程?式.
? 解耦合
? 削峰填?
在實際的后端開發中,尤其是分布式系統?,跨主機之間使??產者消費者模型,也是?常普遍的需求.因此,我們通常會把阻塞隊列,封裝成?個獨?的服務器程序,并且賦予其更豐富的功能.這樣的程序我們就稱為消息隊列(Message Queue,MQ).
市面上成熟的消息隊列有很多:kafka,RabbitMQ,RocketMQ.....
接下來我們就以RabbitMQ為模版,實現他的核心功能.
二.需求分析
核心概念:
生產者Producter,消費者Consumer,中間人Broker,發布publisher,訂閱Subscribe.
?個?產者,?個消費者
多個?產者,多個消費者
生產者和消費者存在1對1和多對多等多種關系模式.
生產者發布消息,到達Broker,Broker將消息發送給訂閱該消息的消費者.Broker是消息隊列的核心,負責存儲和轉發消息.
BrokerServer:
在broker內部,又存在一下概念:
虛擬機VirtualHost,交換機Exchange,隊列Queue,綁定關系Binding
虛擬機類似于數據庫的database,起到數據分割和管理的作用,一個BrokerServer內部可以有多個虛擬機.
交換機Exchange:生產者將消息發送到Broker的交換機上,交換機根據不同的路由規則,將消息分發到不同的隊列上。
隊列Queue:接收并存儲交換機分發的消息,根據消費者的訂閱隊列的情況,將消息推送給消息者。(消費者自己決定從哪個隊列上讀取消息)
綁定關系Binding:這個綁定關系,指的是交換機和隊列的綁定關系。(交換機和隊列存在多對多的綁定關系)
消息Message:真正存儲的數據。
BrokerServer的核心API:
對于BrokerServer來說,要實現以下核心API,通過這些API來實現消息隊列的基本功能:
1. 創建隊列 (queueDeclare)
2. 銷毀隊列 (queueDelete)
3. 創建交換機 (exchangeDeclare)
4. 銷毀交換機 (exchangeDelete)
5. 創建綁定 (queueBind)
6. 解除綁定 (queueUnbind)
7. 發布消息 (basicPublish)
8. 訂閱消息 (basicConsume)
9. 確認消息 (basicAck)
生產者和消費者通過遠程調用這些API,實現生產者 消費者模型。
交換機Exchange:
對于RabbitMQ來說,支持4種交換機類型:
直接交換機Direct,扇出交換機 Fanout ,主題交換機Topic ,首部交換機Header
Header交換機比較復雜且實用場景較少,目前我們僅實現前3種。
直接交換機Direct:?產者發送消息時,直接指定被該交換機綁定的隊列名。(發放專屬紅包)
扇出交換機 Fanout :?產者發送的消息會被復制到該交換機的所有隊列中. (發一個紅包,所有人都能領,且領到的是該紅包的所有金額)
主題交換機Topic:綁定隊列到交換機上時, 指定?個字符串為 bindingKey. 發送消息指定?個字符串為routingKey. 當 routingKey 和 bindingKey 滿??定的匹配條件的時候, 則把消息投遞到指定隊列。(發一個口令紅包,回答出對應的口令,才可以領紅包,且領到的是該紅包的所有金額)
持久化:
要將交換機,隊列,消息都設置持久化功能,防止當BrokerServer出現異常宕機時,重啟后,存儲在上面的數據丟失。(將其都存儲在磁盤和內存上各一份)
網絡通信:
生產者和消費者都是客戶端,BrokerServer作為服務端,通過網絡進行通信,客戶端要提供對應的API,來實現對服務器的調用。
1. 創建 Connection
2. 關閉 Connection
3. 創建 Channel
4. 關閉 Channel
5. 創建隊列 (queueDeclare)
6. 銷毀隊列 (queueDelete)
7. 創建交換機 (exchangeDeclare)
8. 銷毀交換機 (exchangeDelete)
9. 創建綁定 (queueBind)
10. 解除綁定 (queueUnbind)
11. 發布消息 (basicPublish)
12. 訂閱消息 (basicConsume)
13. 確認消息 (basicAck)
在 broker 的基礎上, 客?端還要增加 Connection 操作和 Channel 操作.
Connection 對應?個 TCP 連接.
Channel 則是 Connection 中的邏輯通道.?個 Connection 中可以包含多個 Channel.
Channel 和 Channel 之間的數據是獨?. 不會相互?擾。這樣的設定主要是為了能夠更好的復? TCP 連接, 達到?連接的效果, 避免頻繁的創建關閉 TCP 連接。
消息應答:
被消費的消息,要進行消息應答,當消費者的應答方式為自動時,只要消息從隊列中分發出去,就會將其從隊列中刪除;若為手動應答模式,需要消費者自己調用確認接收消息并返回確認響應,才會將消息從隊列中刪除。
三、模塊劃分
四、創建項目
創建springboot項目,引入springWeb和MyBatis,LomBok類。
五、創建核心類
Exchange,MSGQueue,Binding,Message:
Exchange:
/*** 創建交換機:*/
@Data
public class Exchange {//交換機的唯一標識private String name;//交換機類型:默認為直接交換件private ExchangeType type = ExchangeType.DIRECT;//是否持久化 ,默認持久化private boolean durable = true;//在交換機不再使用的時候,是否自動刪除 ,默認不自動刪除private boolean autoDelete = false;//選填參數private Map<String,Object> args;
}
關于args屬性, 由于數據庫沒有對應的Map類型,在寫入數據庫時,要將其轉換成String類型,從數據庫中讀取數據時,再將其轉換成Map類型,因此,要重寫args的set和get方法:
//這里的args的數據類型是Map,數據庫中沒有對應的數據類型,// 存入數據庫時,要轉換成String類型,//從數據庫中讀取時,要轉換成Map類型public String getArgs(){ObjectMapper mapper = new ObjectMapper();try {return mapper.writeValueAsString(args);} catch (JsonProcessingException e) {
// throw new RuntimeException(e);e.printStackTrace();}return "{}";}public void setArgs(String args) throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();// 將字符串轉換為HashMap類型,若轉換為基本數據類型時直接填就行,// 這里轉換的是HashMap集合類型,比較復雜,需要借助匿名內部類TypeReference類實現this.args = mapper.readValue(args, new TypeReference<HashMap<String,Object>>() {});}
/*** 使用枚舉類 聲明交換機類型*/
public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;private ExchangeType(int type){this.type = type;}public int getType(){return this.type;}
}
MSGQueue:
/*** 創建隊列類*/
@Data
public class MSGQueue {//隊列唯一標識private String name;//是否持久化 ,默認持久化private boolean durable = true;//是否自動刪除 ,默認不自動刪除private boolean autoDelete = false;//選填參數private Map<String,Object> args;
}
這里的args有和Excahnge同樣的問題,要重寫args的set和get方法:
//這里的args的數據類型是Map,數據庫中沒有對應的數據類型,// 存入數據庫時,要轉換成String類型,//從數據庫中讀取時,要轉換成Map類型public String getArgs(){ObjectMapper mapper = new ObjectMapper();try {return mapper.writeValueAsString(args);} catch (JsonProcessingException e) {
// throw new RuntimeException(e);e.printStackTrace();}return "{}";}public void setArgs(String args) throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();this.args = mapper.readValue(args, new TypeReference<HashMap<String,Object>>() {});}
Binding:
/*** 創建綁定關系 交換機->隊列*/
@Data
public class Binding {//要綁定的交換機名private String exchangeName;//要綁定的隊列名private String queueName;//綁定關系(只有交換機為Topic類型時,該綁定才起作用)private String bindingKey;
}
Message:
/*** 創建消息* 后面傳遞消息時,需要將消息進行序列化,此處要實現Serializable接口才可以被序列化*/
@Data
public class Message implements Serializable {//消息中的真正數據private byte[] body;//消息本身的一些屬性:private BasicProperties basicProperties = new BasicProperties();//消息在文件中的起始位置,[offsetBeg,offsetEnd]//由于消息要持久化存儲,因此要存儲在磁盤中,為了區分每一條消息,記錄每條消息的起始位置//使用transient修改,避免被序列化private transient long offsetBeg = 0;private transient long offsetEnd = 0;//消息在磁盤中是否有效,該屬性用于刪除磁盤消息使用,使用邏輯刪除//0x1:有效 0x0:無效private byte isVaild = 0x1;//創建消息 同時給消息分配一個id,通過UUID,創建唯一ID,消息Id以"M-"開頭//routingKey:以參數為準,當原來的消息存在routingKey時,參數會將其覆蓋//此處相當于使用了工廠方法創建消息對象,而沒有使用構造方法,因為這樣可以通過方法名獲取到相關信息public Message createMessageById(String routingKey,BasicProperties basicProperties,byte[] body){Message message = new Message();message.setMessageId("M-"+ UUID.randomUUID());message.setRoutingKey(routingKey);if(basicProperties!=null){message.setBasicProperties(basicProperties);}message.setBody(body);return message;}//為了方便引用,將BasicProperties中的屬性在這里調用一下,方便直接通過Message調用其基本屬性public String getMessageId(){return basicProperties.getMessageId();}public void setMessageId(String messageId){basicProperties.setMessageId(messageId);}public String getRoutingKey(){return basicProperties.getRoutingKey();}public void setRoutingKey(String routingKey){basicProperties.setRoutingKey(routingKey);}public int getDeliveryMode(){return basicProperties.getDeliveryMode();}public void setDeliveryMode(int deliveryMode){basicProperties.setDeliveryMode(deliveryMode);}
}
/*** 保存消息的屬性信息* 為保證消息可以被序列化,此處也要實現Serializable接口*/
@Data
public class BasicProperties implements Serializable {//消息的唯一標識private String messageId;//消息的RoutingKey,用于消息轉發private String routingKey;//是否持久化//1:持久化 0:非持久化private int deliveryMode = 1;
}
六.數據庫設計
要將數據持久化,就要存儲到磁盤上,常用的數據庫有MySQL,....
此處我們使用SQList來存儲數據,SQList較于MySQL更加輕量級,使用起來也更簡單.
SQLite 只是?個動態庫(當然, 官?也提供了可執?程序 exe), 我們在 Java 中直接引? SQLite 依賴, 即可直接使?, 不必安裝其他的軟件
引入pom.xml依賴:
<!-- 引入 SQLite數據庫--><!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc --><dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.49.1.0</version>
</dependency>
配置數據源applicaton.yml:
spring:datasource:url: jdbc:sqlite:./data/meta.db #約定將數據庫文件放到 ./data/meta.db中username: #由于sqlite不是針對客戶端服務的,因此無需設置用戶名和密碼password:driver-class-name: org.sqlite.JDBC
mybatis:mapper-locations: classpath:mapper/**Mapper.xml
創建表,刪除表,查詢數據:
/*** 創建表*/
@Mapper
public interface MetaMapper {//創建交換機 隊列 綁定關系表void createExchangeTable();void createQueueTable();void createBindingTable();//新增數據void insertExchange(Exchange exchange);void insertQueue(MSGQueue queue);void insertBinding(Binding binding);//刪除數據void deleteExchange(String exchangeName);void deleteQueue(String queueName);void deleteBinding(String exchangeName,String queueName);//查詢數據List<Exchange> selectAllExchanges();List<MSGQueue> selectAllQueues();List<Binding> selectAllBindings();
}
由于MyBatis僅針對MySQL / Oracle ?持執?多個 SQL 語句的, 但是針對 SQLite 是不?持的, 只能寫成多個?法.
MetaMapper.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.rabbitmq.mq02.mqServer.mapper.MetaMapper">
<!-- 創建表--><update id="createExchangeTable">create table if not exists exchange(name varchar(64) promary key,type int,durable boolean,autoDelete boolean,args varchar(1024))</update><update id="createQueueTable">create table if not exists queue(name varchar(64) primary key,durable boolean,autoDelete boolean,args varchar(1024))</update><update id="createBindingTable">create table if not exists binding(exchangeName varchar(64),queueName varchar(64),bindingKey varchar(64))</update>
<!-- 插入數據--><insert id="insertExchange" parameterType="org.rabbitmq.mq02.mqServer.core.Exchange">insert into exchange values(#{name},#{type},#{durable},#{autoDelete},#{args});</insert><insert id="insertQueue" parameterType="org.rabbitmq.mq02.mqServer.core.MSGQueue">insert into queue values(#{name},#{durable},#{autoDelete},#{args})</insert><insert id="insertBinding" parameterType="org.rabbitmq.mq02.mqServer.core.Binding">insert into binding values(#{exchangeName},#{queueName},#{bindingKey})</insert>
<!-- 刪除數據--><delete id="deleteExchange" parameterType="java.lang.String">delete from exchange where name = #{name};</delete><delete id="deleteQueue" parameterType="java.lang.String">delete from queue where name = #{name};</delete><delete id="deleteBinding" parameterType="java.lang.String">delete from binding where bindingKey = #{bindingKey};</delete>
<!-- 查找數據--><select id="selectAllExchanges" resultType="org.rabbitmq.mq02.mqServer.core.Exchange">select * from exchange;</select><select id="selectAllQueues" resultType="org.rabbitmq.mq02.mqServer.core.MSGQueue">select * from queue;</select><select id="selectAllBindings" resultType="org.rabbitmq.mq02.mqServer.core.Binding">select * from binding;</select>
</mapper>
七.實現DataBaseManager類:
通過這個類來封裝針對數據庫的操作.整理數據庫中的數據
MeatMapper要通過手動管理,而非spring注入,修改MqApplication類,新增一個屬性:
@SpringBootApplication
public class Mq02Application {public static ConfigurableApplicationContext context;public static void main(String[] args) {context = SpringApplication.run(Mq02Application.class, args);}
}
DataBaseManager類:
/*** 通過這個類來封裝針對數據庫的操作.*/
public class DataBaseManager {
//通過手動管理,非spring注入private MetaMapper meatMapper;public void init(){//手動管理數據meatMapper = Mq02Application.context.getBean(MetaMapper.class);//建庫建表//1.先查詢數據庫是否已經存在if(!checkDBExists()){//不存在時,先建庫建表File file = new File("./data/meta.db");//先創建目錄file.mkdirs();//創建表createTable();//初始化,主要初始化 交換機的類型createDefaultExchange();System.out.println("[DataBaseManager] 數據庫初始化完成!");}else{System.out.println("[DataBaseManager] 數據庫以存在!");}}//初始化交換機private void createDefaultExchange() {Exchange exchange = new Exchange();exchange.setType(ExchangeType.DIRECT);exchange.setName("");exchange.setAutoDelete(false);exchange.setDurable(false);meatMapper.insertExchange(exchange);System.out.println("[DataBaseManager] 初始化交換機成功!");}private void createTable() {meatMapper.createExchangeTable();meatMapper.createQueueTable();meatMapper.createBindingTable();System.out.println("[DataBaseManager] 建表成功!");}//查詢數據庫是否已經創建private boolean checkDBExists() {File file = new File("./data/meta.db");return file.exists();}//封裝其他數據庫操作:對交換機 隊列 綁定關系的 增刪查public void insertExchange(Exchange exchange){meatMapper.insertExchange(exchange);
}public List<Exchange> selectAllExchanges(){return meatMapper.selectAllExchanges();}public void deleteExchange(String exchangeName){meatMapper.deleteExchange(exchangeName);}public void insertQueue(MSGQueue queue){meatMapper.insertQueue(queue);}public List<MSGQueue> selectAllQueues(){return meatMapper.selectAllQueues();}public void deleteQueue(String queueName){meatMapper.deleteQueue(queueName);}public void insertBinding(Binding binding){meatMapper.insertBinding(binding);}public List<Binding> selectAllBindings(){return meatMapper.selectAllBindings();}public void deleteBindings(String bindingKey){meatMapper.deleteBinding(bindingKey);}
}
編寫DataBaseManager類的測試用例:
在實現項目的過程中,每實現一個階段的功能,不可能一下就全部正確實現,可能會出現各種錯誤.
就要及時編寫測試用例,對當前代碼功能進行測試,防止寫完了整個項目再測試,出現各種錯誤.
做到有錯即時查找修改.
private DataBaseManager dataBaseManager = new DataBaseManager();/*** 有些測試用例產生的數據會影響之后的測試結果,在每次測試之前,還要進行數據清除,比較麻煩* 這個方法,在執行每一個測試前,要執行該方法,用于初始化數據,構造好數據*/@BeforeEachvoid setUp() {Mq02Application.context = SpringApplication.run(MetaMapper.class);//在執行每個測試之前,進行數據庫的創建和舒適化工作:dataBaseManager.init();}/*** 在執行每個測試用例之后,都要執行該方法,用于數據的清除和資源的釋放,*/@AfterEachvoid tearDown() {//釋放資源,刪除數據庫Mq02Application.context.close();dataBaseManager.deleteTable();}
再在DataBaseManager類中添加刪除數據庫及目錄方法:
//刪除數據庫及創建的目錄:public void deleteTable() {File file = new File("./data/meta.db");//1.刪除數據庫:boolean ok = file.delete();if(ok){System.out.println("[DataBaseManager] 刪除數據庫成功!");}else{System.out.println("[DataBaseManager] 刪除數據庫失敗!");}//2.刪除目錄:
//注意,在windows系統上,只有目錄為空時,才能刪除成功File file1 = new File("./data");ok = file1.delete();if(ok){System.out.println("[DataBaseManager] 刪除目錄成功!");}else{System.out.println("[DataBaseManager] 刪除目錄失敗!");}}
測試init()方法:
@Testvoid init(){//初始化方法已經在setUp方法中調用過了,這里只要判斷數據是否正確:List<Exchange> exchanges = dataBaseManager.selectAllExchanges();List<MSGQueue> msgQueues = dataBaseManager.selectAllQueues();List<Binding> bindings = dataBaseManager.selectAllBindings();//判斷創建結果是否正確,可以通過打印結果,自己觀察比對,//還可以調用專門的測試方法,直接與期望值進行對比,判斷是否正確//assertEquals方法的第一個參數默認為期望值,第二個參數為實際值//判斷期望值與實際值是否一致,一致返回true,不一致返回false//交換機在初始化的時候已經初始化了一個,因此,查詢到的交換機的個數應為1個,隊列和綁定為0個Assertions.assertEquals(1,exchanges.size());Assertions.assertEquals(0,msgQueues.size());Assertions.assertEquals(0,bindings.size());}
對交換機,隊列,綁定的插入刪除測試:
//對交換機的測試 //先創建一個交換機實例:private Exchange createExchange(String exchangeName){Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);
// exchange.setArgs(null);return exchange;}@Testvoid insertExchange() {Exchange exchange = createExchange("exchangeTest");dataBaseManager.insertExchange(exchange);List<Exchange> exchanges = dataBaseManager.selectAllExchanges();Assertions.assertEquals(2,exchanges.size());Exchange e = exchanges.get(1);Assertions.assertEquals("exchangeTest",e.getName());Assertions.assertEquals(ExchangeType.DIRECT,e.getType());Assertions.assertTrue(e.isDurable());Assertions.assertFalse(e.isAutoDelete());}@Testvoid deleteExchange() {Exchange exchange = createExchange("exchangeTest");dataBaseManager.insertExchange(exchange);List<Exchange> exchanges = dataBaseManager.selectAllExchanges();Assertions.assertEquals(2,exchanges.size());dataBaseManager.deleteExchange("exchangeTest");exchanges = dataBaseManager.selectAllExchanges();Assertions.assertEquals(1,exchanges.size());}
//對隊列的測試private MSGQueue createQueue(String queueName){MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(true);queue.setAutoDelete(false);return queue;}@Testvoid insertQueue() {MSGQueue queue = createQueue("queueTest");dataBaseManager.insertQueue(queue);List<MSGQueue> queues = dataBaseManager.selectAllQueues();Assertions.assertEquals(1,queues.size());Assertions.assertEquals("queueTest",queues.get(0).getName());Assertions.assertTrue(queues.get(0).isDurable());Assertions.assertFalse(queues.get(0).isAutoDelete());}@Testvoid deleteQueue() {MSGQueue queue = createQueue("queueTest");dataBaseManager.insertQueue(queue);List<MSGQueue> queues = dataBaseManager.selectAllQueues();Assertions.assertEquals(1,queues.size());dataBaseManager.deleteQueue("queueTest");queues = dataBaseManager.selectAllQueues();Assertions.assertEquals(0,queues.size());}//對綁定的測試:private Binding createBinding(String bindingKey){Binding binding = new Binding();binding.setExchangeName("exchangeTest");binding.setQueueName("queueTest");binding.setBindingKey(bindingKey);return binding;}@Testvoid insertBinding() {//創建綁定之前,要先創建交換機和隊列Exchange exchange = createExchange("exchangeTest");dataBaseManager.insertExchange(exchange);List<Exchange> exchanges = dataBaseManager.selectAllExchanges();Assertions.assertEquals(2,exchanges.size());MSGQueue queue = createQueue("queueTest");dataBaseManager.insertQueue(queue);List<MSGQueue> queues = dataBaseManager.selectAllQueues();Assertions.assertEquals(1,queues.size());Binding binding = createBinding("bindingKeyTest");dataBaseManager.insertBinding(binding);List<Binding> bindings = dataBaseManager.selectAllBindings();Assertions.assertEquals(1,bindings.size());Binding b = bindings.get(0);Assertions.assertEquals("bindingKeyTest",b.getBindingKey());Assertions.assertEquals("exchangeTest",b.getExchangeName());Assertions.assertEquals("queueTest",b.getQueueName());}@Testvoid deleteBindings() {//創建綁定之前,要先創建交換機和隊列Exchange exchange = createExchange("exchangeTest");dataBaseManager.insertExchange(exchange);List<Exchange> exchanges = dataBaseManager.selectAllExchanges();Assertions.assertEquals(2,exchanges.size());MSGQueue queue = createQueue("queueTest");dataBaseManager.insertQueue(queue);List<MSGQueue> queues = dataBaseManager.selectAllQueues();Assertions.assertEquals(1,queues.size());Binding binding = createBinding("bindingKeyTest");dataBaseManager.insertBinding(binding);List<Binding> bindings = dataBaseManager.selectAllBindings();Assertions.assertEquals(1,bindings.size());dataBaseManager.deleteBindings("bindingKeyTest");bindings = dataBaseManager.selectAllBindings();Assertions.assertEquals(0,bindings.size());}
八.消息存儲設置
消息要存儲在磁盤中,從文件中讀寫,消息較于交換機隊列來說,查詢次數較少,僅需要讀和取消息.
因此從文件中讀取比從數據庫中查找要快很多.
根據消息所在不同的隊列,為每個隊列命名一個目錄,為?data/隊列名.eg: ./data/testQueue.
每個隊列所目錄中包含兩個文件: 消息數據文件,消息統計文件.?
數據文件queue_data.txt : 存儲消息體相關數據.
規定文件格式:使用二進制存儲(節省空間).每個消息分成2部分,第一部分存儲該消息對象的長度,規定占4字節; 第二部分存儲消息的數據部分.消息和消息之間首尾相連.
統計文件queue_stat.txt: 存儲該數據文件中的總消息數和有效消息數,
規定使用文本格式存儲, 僅存儲一行兩列數據,第一列為數據文件中的總消息數,第二行為有效消息數中間用\t進行分隔.eg: 100\t50.
九.創建MessageFileManager類
創建MessageFileManager類:
/*** 用于管理消息在文件中的存儲* 該類中藥完成的事情:1.創建數據文件,統計文件的目錄,文件* 2.讀寫統計文件* 3.讀寫數據文件* 4.讀取文件中所有的有效消息* 5.對文件中的消息進行垃圾回收*/
public class MessageFileManager {/*** 創建一個內部類,表示消息的統計文件類*/public static class Stat {//消息總數public int totalCount;//有效消息數public int validCount;}//這里目前沒有要初始化的public void init(){}
}
實現該類的功能:
1.創建數據文件:統計文件的目錄,文件
//獲取隊列文件目錄名:public String getQueueDir(String queueName){return "./data/"+queueName;}//獲取消息 數據文件 路徑名public String getDataFilePath(String queueName){return getQueueDir(queueName)+"/queue_data.txt";}//獲取消息 統計文件 路徑名public String getStatFilePath(String queueName){return getQueueDir(queueName)+"/queue_stat.txt";}//創建隊列文件public void createQueueFile(String queueName) throws MqException, IOException {File file = new File(getQueueDir(queueName));//當目錄不存在時,創建目錄if (!file.exists()) {boolean ok = file.mkdirs();if (!ok) {throw new MqException("[MessageFileManager] 創建目錄失敗!");}}//創建文件File dataFile = new File(getDataFilePath(queueName));if (!dataFile.exists()) {//數據文件不存在時,創建數據文件:boolean ok = dataFile.createNewFile();if (!ok) {throw new MqException("[MessageFileManager] 創建數據文件失敗!");}}File statFile = new File(getStatFilePath(queueName));if (!statFile.exists()) {boolean ok = statFile.createNewFile();if (!ok) {throw new MqException("[MessageFileManager] 創建統計文件失敗!");}}//初始化統計文件數據Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName,stat);System.out.println("[MessageFileManager] 創建目錄及文件成功!");}//銷毀文件和目錄//刪除指定隊列的目錄和文件//當隊列被刪除后,對應的文件和目錄也就要被刪除了public void destoryQueueFile(String queueName) throws IOException {//1.刪除數據文件File dataFile = new File(getDataFilePath(queueName));boolean ok1 = dataFile.delete();//2.刪除統計文件File statFile = new File(getStatFilePath(queueName));boolean ok2 = statFile.delete();//3.刪除總目錄File baseDir = new File(getQueueDir(queueName));boolean ok3 = baseDir.delete();if(!ok1 || !ok2 || !ok3){throw new IOException("刪除隊列和文件失敗! baseDir: "+baseDir.getAbsolutePath());}}
自定義一個MqException異常,
用于處理拋出消息隊列過程中的邏輯異常.
/*** 與項目有關的異常*/
public class MqException extends Exception {public MqException(String reason){super(reason);}
}
2.實現讀寫統計文件功能:
//從統計文件中讀取消息private Stat readStat(String queueName){Stat stat = new Stat();try (FileInputStream fileInputStream = new FileInputStream(getStatFilePath(queueName))){//通過Scanner來讀取文件中的數據Scanner scan = new Scanner(fileInputStream);stat.totalCount = scan.nextInt();stat.validCount = scan.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;}//寫消息到統計文件中private void writeStat(String queueName,Stat stat){//注意:此處打開文件,默認是清空文件的,可以通過設置第二個參數為true:拼接文件形式打開文件,不會清空數據try(FileOutputStream outputStream = new FileOutputStream(getStatFilePath(queueName))){PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount+"\t"+stat.validCount);//注意:要刷新緩沖區,將寫入的數據從緩沖區都放到磁盤中printWriter.flush();} catch (IOException e) {throw new RuntimeException(e);}}
3.實現讀寫數據文件功能:
將消息存儲到文件中是以二進制形式存儲的,要先將消息進行序列化,然后再進行寫入.讀取消息數據的時候,也要將二進制文件進行反序列化,才能看懂文件內容.
這里先實現一個序列化和反序列工具類:
/*** 消息序列化和飯系列化工具,將所有方法都用static修飾,直接通過類名調用方法* 序列化:將對象轉化為字節數組* 反序列化:將字節數組轉化為對象*/
public class BinaryTool {//序列化: 將對象轉為字節數組//將Object序列化的數據逐漸寫到ByteArrayOutputStream中,再轉換為byte[]public static byte[] toByte(Object object) throws IOException {try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){//writeObject方法就會將對象進行序列換,轉換為二進制數據寫入到ObjectOutputStream中,//由于ObjectOutputStream又關聯了ByteArrayOutputStream,//于是最終就寫入到了ByteArrayOutputStream中objectOutputStream.writeObject(object);}return byteArrayOutputStream.toByteArray();}}//反序列化: 將字節數組轉化為對象public static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {Object object = null;try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes)) {try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){object = objectInputStream.readObject();}}return object;}
}
4.實現讀寫消息方法:
//向數據文件中發送消息//1.判斷文件是否存在//2.將消息序列化//3.寫入數據文件//4.更新統計文件數據//考慮線程安全問題://當多個線程同時發送消息時,可能會發送線程安全問題,這里以隊列為維度加鎖,因此要傳入參數隊列對象public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {//1.判斷文件是否存在:boolean ok = checkFileExists(queue.getName());if(!ok){throw new MqException("[MessageFileManager] 數據文件不存在,發送消息到數據文件失敗 queueName: "+queue.getName());}synchronized(queue){//2.將消息先進行序列化:byte[] messageByte = BinaryTool.toByte(message);//3.寫消息到文件中:File file = new File(getDataFilePath(queue.getName()));//設置 消息在文件中的初始位置屬性:message.setOffsetBeg(file.length()+4);message.setOffsetEnd(file.length()+4+messageByte.length);//這里打開文件要以追加的方式打開,不能清空文件中的內容try(FileOutputStream outputStream = new FileOutputStream(file,true);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){//先寫前4個字節:記錄消息的長度,//注意,這里規定的就是用4字節的長度,不能單使用write寫入,這樣還是只寫入1個字節dataOutputStream.writeInt(messageByte.length);dataOutputStream.write(messageByte);}//更新統計文件數據Stat stat = readStat(queue.getName());stat.totalCount+=1;stat.validCount+=1;writeStat(queue.getName(),stat);}}private boolean checkFileExists(String queueName) throws MqException {File baseFile = new File(getQueueDir(queueName));if(!baseFile.exists()){System.out.println("[MessageFileManager] 隊列對應的目錄不存在 queueName:"+queueName);return false;}File dataFile = new File(getDataFilePath(queueName));if(!dataFile.exists()){System.out.println("[MessageFileManager] 隊列對應的數據文件不存在 queueName:"+queueName);return false;}File statFile = new File(getStatFilePath(queueName));if(!statFile.exists()){System.out.println("[MessageFileManager] 隊列對應的統計文件不存在 queueName:"+queueName);return false;}return true;}//將一條消息從文件中刪除(邏輯刪除,將isValid置為無效:0x0//1.先讀取到消息//2.將消息的isValid設為無效//3.再重新寫入到文件原來位置//4.修改配置文件數據//刪除文件也存在線程安全問題,要以隊列為維度進行上鎖public void deleteMessageFromFile(MSGQueue queue,Message message){synchronized(queue){//要找消息的位置,采用隨機讀取文件方法,且以可讀可寫的方式打開文件try(RandomAccessFile randomAccessFile = new RandomAccessFile(getDataFilePath(queue.getName()),"rw")){int len = (int)(message.getOffsetEnd() - message.getOffsetBeg());byte[] data = new byte[len];//定位光標到消息的起始位置:randomAccessFile.seek(message.getOffsetBeg());//從消息的起始位置讀取文件randomAccessFile.read(data);//將數據反序列化:Message newMessage = (Message) BinaryTool.fromBytes(data);//修改為無效消息newMessage.setIsVaild((byte)0x0);//再將消息寫入到文件中://先進行序列化消息byte[] payload = BinaryTool.toByte(newMessage);//要寫到原來位置,在讀取的時候,光標會變化,要重新設置光標,randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(payload);//更新統計文件數據Stat stat = readStat(queue.getName());if(stat.validCount>0){stat.validCount-=1;}writeStat(queue.getName(),stat);} catch (IOException | ClassNotFoundException e) {throw new RuntimeException(e);}}}
5.加載文件中的所有有效消息;
用于服務器重啟時恢復數據使用:
//獲取文件中的所有有效數據,該方法用戶服務器宕機重啟時,恢復數據public List<Message> loadAllMessage(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try(FileInputStream fileInputStream = new FileInputStream(getDataFilePath(queueName));DataInputStream dataInputStream = new DataInputStream(fileInputStream)){long currentOffset = 0;while(true){int len = dataInputStream.readInt();//讀取數據byte[] data = new byte[len];int n = dataInputStream.read(data);if(n!=len){throw new MqException("[MessageFileManager] 讀取消息格式有誤! queueName:"+queueName);}//進行反序列化Message message = (Message) BinaryTool.fromBytes(data);if(message.getIsVaild()==0x0){//為無效消息,直接跳過currentOffset+=(4+len);continue;}//設置消息的起始位置message.setOffsetBeg(currentOffset+4);message.setOffsetEnd(currentOffset+4+len);currentOffset+=(4+len);messages.add(message);}} catch (EOFException e){//這個異常是讀取到文件末尾,拋出的異常,屬于正常邏輯System.out.println("[MessageFileManager] 加載消息完成 !");}return messages;}
6.實現垃圾清除功能:GC:
//進行垃圾回收:當統計文件中記錄的有效數據數量占總數據數量<50%時,進行垃圾回收//垃圾回收:新創建一個文件,將有效消息復制到新文件中,復制完后,將源文件刪除,將新文件名改為源文件名public void GC(MSGQueue queue) throws IOException, MqException, ClassNotFoundException {//創建一個新的文件:File newFile = new File(getNewDataFilePath(queue.getName()));if(newFile.exists()){throw new MqException("[MessageFileManager] 上次GC未執行成功, queueName:"+queue.getName());}boolean ok = newFile.createNewFile();if(!ok){throw new MqException("[MessageFileManager] 創建新文件失敗! queueName:"+queue.getName());}//將原來文件中的消息都取出來List<Message> messages = loadAllMessage(queue.getName());//將消息進行序列化,再存入新文件中try(FileOutputStream outputStream = new FileOutputStream(newFile,true);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){for(Message m:messages){byte[] payload = BinaryTool.toByte(m);dataOutputStream.writeInt(payload.length);dataOutputStream.write(payload);}}//刪除舊文件String oldDataFilePath = getDataFilePath(queue.getName());File oldFile = new File(oldDataFilePath);ok = oldFile.delete();if(!ok){throw new MqException("[MessageFileManager] 舊文件刪除失敗! oldFilePath:"+oldFile.getAbsolutePath());}//更新新文件名:ok = newFile.renameTo(oldFile);if(!ok){throw new MqException("[MessageFileManager] 更新文件名失敗 newFilePath"+newFile.getAbsolutePath());}//更新統計文件數據Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(),stat);}
//創建一個新的文件路徑:private String getNewDataFilePath(String queueName) {return getQueueDir(queueName)+"newQueue_data.txt";}
//查看當前隊列是否需要進行GC掃描public boolean checkGC(String queueName){Stat stat = readStat(queueName);int t1 = stat.validCount;int t2 = stat.totalCount;if(t2>=2000 && t1*1.0 /t2*1.0<0.5){return true;}return false;}
編寫MessageFileManager測試用例:
@SpringBootTest
class MessageFileManagerTest {private MessageFileManager messageFileManager = new MessageFileManager();private static String queueName1 = "queueTest1";private static String queueName2 = "queueTest2";//創建2個文件@BeforeEachvoid setUp() throws IOException, MqException {messageFileManager.createQueueFile(queueName1);messageFileManager.createQueueFile(queueName2);System.out.println("2個文件創建成功!");}
//銷毀文件@AfterEachvoid tearDown() throws IOException {messageFileManager.destoryQueueFile(queueName1);messageFileManager.destoryQueueFile(queueName2);System.out.println("2個文件銷毀成功!");}@Testvoid createQueueFile() {//在setUp方法中已經調用了該方法了,這里僅進行結果測試對比了File dataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");File dataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");File statFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");File statFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");Assertions.assertTrue(dataFile1.exists());Assertions.assertTrue(dataFile2.exists());Assertions.assertTrue(statFile1.exists());Assertions.assertTrue(statFile2.exists());}@Testvoid readWriteStat() {MessageFileManager.Stat stat = new MessageFileManager.Stat();stat.totalCount = 100;stat.validCount = 50;//由于對統計文件的讀寫訪問方法都是私有方法,無法直接進行調用//但又必須進行測試,可以通過反射的方法進行訪問//注意:這里不能通過先修改督學stat方法的訪問權限,進行測試,然后再將訪問權限改過來,//因為對原代碼測試后再修改后的測試就是無效的,不知道修改后的代碼是否還會出現別的問題// 參數說明: 要反射的方法的類對象 要反射的方法名 方法的參數ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);MessageFileManager.Stat s = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);Assertions.assertEquals(100,s.totalCount);Assertions.assertEquals(50,s.validCount);}private MSGQueue createQueue(){MSGQueue queue = new MSGQueue();queue.setName(queueName1);queue.setDurable(true);queue.setAutoDelete(false);return queue;}private Message createMessage(String body){Message message = new Message();return message.createMessageById("routingKeyTest",null,body.getBytes());}@Testvoid sendMessage() throws IOException, MqException, ClassNotFoundException {MSGQueue queue = createQueue();Message message = createMessage("hello");messageFileManager.sendMessage(queue,message);//測試stat文件MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);Assertions.assertEquals(1,stat.validCount);Assertions.assertEquals(1,stat.totalCount);//測試data文件List<Message> messages = messageFileManager.loadAllMessage(queueName1);Assertions.assertEquals(1,messages.size());Message m = messages.get(0);Assertions.assertEquals(message.getMessageId(),m.getMessageId());Assertions.assertEquals(message.getRoutingKey(),m.getRoutingKey());Assertions.assertEquals(message.getDeliveryMode(),m.getDeliveryMode());Assertions.assertArrayEquals(message.getBody(),m.getBody());System.out.println(m);}@Testvoid deleteMessageFromFile() throws IOException, MqException, ClassNotFoundException {MSGQueue queue = createQueue();List<Message> list = new LinkedList<>();//向隊列中發送10條消息for(int i=0;i<5;i++){Message message = createMessage("hello" + i);messageFileManager.sendMessage(queue,message);list.add(message);}Assertions.assertEquals(5,list.size());//刪除后3條messageFileManager.deleteMessageFromFile(queue,list.get(3));messageFileManager.deleteMessageFromFile(queue,list.get(4));messageFileManager.deleteMessageFromFile(queue,list.get(2));List<Message> messages = messageFileManager.loadAllMessage(queueName1);Assertions.assertEquals(2,messages.size());for(int i=0;i<2;i++){Message m = messages.get(i);Message m2 = list.get(i);Assertions.assertEquals(m2.getRoutingKey(),m.getRoutingKey());Assertions.assertEquals(m2.getDeliveryMode(),m.getDeliveryMode());Assertions.assertEquals(m2.getMessageId(),m.getMessageId());Assertions.assertArrayEquals(m2.getBody(),m.getBody());Assertions.assertEquals(m2.getIsVaild(),m.getIsVaild());}}@Testvoid loadAllMessage() throws IOException, MqException, ClassNotFoundException {MSGQueue queue = createQueue();List<Message> list = new LinkedList<>();//向隊列中添加100條消息for(int i=0;i<100;i++){Message message = createMessage("hello"+i);messageFileManager.sendMessage(queue,message);list.add(message);}List<Message> messages = messageFileManager.loadAllMessage(queue.getName());Assertions.assertEquals(100,messages.size());for(int i=0;i<100;i++){Message m1 = messages.get(i);Message m2 = list.get(i);Assertions.assertEquals(m2.getRoutingKey(),m1.getRoutingKey());Assertions.assertEquals(m2.getDeliveryMode(),m1.getDeliveryMode());Assertions.assertEquals(m2.getMessageId(),m1.getMessageId());Assertions.assertArrayEquals(m2.getBody(),m1.getBody());Assertions.assertEquals(m2.getIsVaild(),m1.getIsVaild());}}@Testvoid GC() throws IOException, MqException, ClassNotFoundException {MSGQueue queue = createQueue();List<Message> list = new LinkedList<>();//向隊列中發送10條消息for(int i=0;i<100;i++){Message message = createMessage("hello" + i);messageFileManager.sendMessage(queue,message);list.add(message);}File file = new File("./data/" + queue.getName() + "/queue_data.txt");System.out.println("GC前文件大小: "+file.length());Assertions.assertEquals(100,list.size());//刪除偶數消息for(int i=0;i<100;i+=2){Message message = list.get(i);messageFileManager.deleteMessageFromFile(queue,message);}messageFileManager.GC(queue);System.out.println("GC后文件大小: "+file.length());List<Message> messages = messageFileManager.loadAllMessage(queueName1);for(int i=0;i<messages.size();i++){Message m1 = messages.get(i);Message m2 = list.get(2*i+1);Assertions.assertEquals(m2.getRoutingKey(),m1.getRoutingKey());Assertions.assertEquals(m2.getDeliveryMode(),m1.getDeliveryMode());Assertions.assertEquals(m2.getMessageId(),m1.getMessageId());Assertions.assertArrayEquals(m2.getBody(),m1.getBody());Assertions.assertEquals(m2.getIsVaild(),m1.getIsVaild());}}
}
十.整合數據庫和文件數據
上面的代碼中,使用數據庫存儲了Exchange,Queue,Binding,使用文件存儲了Message,
下面對數據庫和文件中的數據進行整合.進行統一管理.
見下一篇博客.