RokcetMQ的介紹和基礎知識見這篇博客——RocketMQ學習(1) 快速入門
本篇為上一篇的深入學習,很多基礎知識不再贅述。
目錄
- 消息重復消費問題(去重;冪等)
- 布隆過濾器
- 重試機制
- 死信消息
- SpringBoot集成RocketMQ
- 集成SpringBoot發送不同消息模式(同步消息)
- 異步消息
- 單向消息
- 延遲消息
- 順序消息
- 批量消息
- 事務消息
- 發送對象消息和集合消息
- 消息標簽,tag
- 消息key
- 負載均衡和廣播,兩種消息消費模式
- 消息堆積問題
- 消息丟失問題
- 消息安全問題
- 底層設計小探
消息重復消費問題(去重;冪等)
為什么會出現重復消費問題呢?
消息重復消費的主要原因有兩個:
- 生產者多次投遞:同一條消息可能被生產者多次發送。
- 消費者方重試機制:消費者在擴容或其他情況下會重試消費。
廣播模式(BROADCASTING)
在廣播模式下,所有注冊的消費者都會收到并處理消息。通常,這些消費者是集群部署的微服務,因此每臺機器都會消費同一條消息。雖然這是根據需求選擇的模式,但會導致多臺機器重復消費相同的消息。
負載均衡模式(CLUSTERING)
在負載均衡模式下,如果一個topic被多個consumer group消費,也會出現重復消費的情況。即使在同一個consumer group中,一個隊列只會分配給一個消費者,但以下情況仍可能導致重復消費:
- 負載均衡重新分配:當一個新的消費者加入或一個現有的消費者下線時,同組的所有消費者需要重新進行負載均衡。新的消費者需要獲取之前的消費偏移量(offset)。如果之前的消費者已經消費了一條消息但尚未提交offset,那么新的消費者可能會重新消費這條消息。
- 順序消費模式(Orderly):在順序消費模式下,前一個消費者解鎖后,新的消費者加鎖再進行消費。雖然這種方式比并發消費(concurrently)更嚴格,但由于加鎖的線程和提交offset的線程不同,極端情況下仍會出現重復消費。
- 批量消息處理:當發送批量消息時,整個批量消息會被當作一條消息處理。如果批量中的部分消息處理成功而其他消息失敗,重新消費時會導致已經成功處理的消息被再次消費。
如何避免重復消費?
在負載均衡模式下,并且在同一個消費者組中,如果不希望消息被重復消費,可以進行去重操作。具體方法如下:
消息唯一標識:為每條消息設置唯一的標識(如msgId或自定義的唯一key)。
去重邏輯:在消費者端實現去重邏輯,通過檢查消息的唯一標識來判斷消息是否已經被消費過。
查看官方文檔可知:
RocketMQ無法避免消息重復(Exactly-Once),所以如果業務對消費重復非常敏感,務必要在業務層面進行去重處理。可以借助關系數據庫進行去重。首先需要確定消息的唯一鍵,可以是msgId,也可以是消息內容中的唯一標識字段,例如訂單Id等。在消費之前判斷唯一鍵是否在關系數據庫中存在。如果不存在則插入,并消費,否則跳過。(實際過程要考慮原子性問題,判斷是否存在可以嘗試插入,如果報主鍵沖突,則插入失敗,直接跳過)
msgId一定是全局唯一標識符,但是實際使用中,可能會存在相同的消息有兩個不同msgId的情況(消費者主動重發、因客戶端重投機制導致的重復等),這種情況就需要使業務字段進行重復消費。
還是強烈推薦學習一下官方文檔
其中實際過程要考慮原子性問題”是指確保判斷消息是否存在和插入消息這兩個操作要作為一個不可分割的整體進行,這樣才能避免并發情況下的競態條件,確保數據的一致性。
為了解決這個問題,我們需要確保“判斷消息是否存在”和“插入消息”這兩個操作是原子的,可以通過數據庫的唯一約束(如唯一鍵)和原子操作來實現。
利用唯一約束和插入操作:
可以使用數據庫的唯一鍵約束和插入操作的組合來確保原子性。如果插入操作因唯一鍵約束失敗,則說明消息已經存在,避免重復消費。
唯一標識如何存儲?
想法很好,但是消息的體量是非常大的,可能在生產環境中會到達上千萬甚至上億條,那么我們該如何選擇一個容器來保存所有消息的標識,并且又可以快速的判斷是否存在呢?
內存里的map可以嗎?不行,內存重啟就沒了,且集群之間不共享內存。
redis可以嗎?mysql可以嗎?
Mysql可以使用去重表,在數據庫操作中使用唯一約束,確保相同的操作不會被執行多次。而Redis的setnx命令天然就支持冪等。
使用redis的優點:高性能、數據結構豐富、分布式特性、TTL (過期時間)支持
使用redis的缺點:內存大小限制、持久化機制不如傳統關系型數據庫、數據丟失風險
使用MySQL 的優點:持久化存儲、查詢能力強、事務支持
使用MySQL 的缺點:性能瓶頸、擴展性差不如Redis、維護成本高
所以對于高并發大流量場景可以使用Redis,對于數據持久化要求高、歷史數據分析需求強的場景可以使用Mysql
在實際項目中,可以根據具體需求,將兩者結合使用
這里貼一下使用Mysql的方法:
生產者
@Test
void repeatProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("repeat-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);producer.start();String key = UUID.randomUUID().toString();System.out.println(key);// 測試 發兩個key一樣的消息Message m1 = new Message("repeatTopic", null, key, "扣減庫存-1".getBytes());Message m1Repeat = new Message("repeatTopic", null, key, "扣減庫存-1".getBytes());producer.send(m1);producer.send(m1Repeat);System.out.println("發送成功");producer.shutdown();
}
消費者
/*** 冪等性(mysql的唯一索引, redis(setnx) )* 多次操作產生的影響均和第一次操作產生的影響相同* 新增:普通的新增操作 是非冪等的,唯一索引的新增,是冪等的* 修改:看情況* 查詢: 是冪等操作* 刪除:是冪等操作* ---------------------* 我們設計一個去重表 對消息的唯一key添加唯一索引* 每次消費消息的時候 先插入數據庫 如果成功則執行業務邏輯 [如果業務邏輯執行報錯 則刪除這個去重表記錄]* 如果插入失敗 則說明消息來過了,直接簽收了** @throws Exception*/@Testvoid repeatConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("repeatTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {// 先拿keyMessageExt messageExt = msgs.get(0);String keys = messageExt.getKeys();// 原生方式操作Connection connection = null;try {connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8&useSSL=false", "root", "123456");} catch (SQLException e) {e.printStackTrace();}PreparedStatement statement = null;try {// 插入數據庫 因為我們 key做了唯一索引statement = connection.prepareStatement("insert into order_oper_log(`type`, `order_sn`, `user`) values (1,'" + keys + "','123')");} catch (SQLException e) {e.printStackTrace();}try {// 新增 要么成功 要么報錯 修改 要么成功,要么返回0 要么報錯statement.executeUpdate();} catch (SQLException e) {System.out.println("executeUpdate");if (e instanceof SQLIntegrityConstraintViolationException) {// 唯一索引沖突異常// 說明消息來過了System.out.println("該消息來過了");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}e.printStackTrace();}// 處理業務邏輯// 如果業務報錯 則刪除掉這個去重表記錄 delete order_oper_log where order_sn = keys;// 要不然下次來的時候它依然還在里面 就沒法去重試了System.out.println(new String(messageExt.getBody()));System.out.println(keys);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();}
布隆過濾器
還以選擇布隆過濾器(BloomFilter)
布隆過濾器(Bloom Filter)是1970年由布隆提出的。它實際上是一個很長的二進制向量和一系列隨機映射函數。布隆過濾器可以用于檢索一個元素是否在一個集合中。它的優點是空間效率和查詢時間都比一般的算法要好的多,缺點是有一定的誤識別率和刪除困難。
在hutool的工具中我們可以直接使用,當然你自己使用redis的bitmap類型手寫一個也是可以的。
在微服務架構中,各個服務實例通常運行在不同的機器或容器上,并且內存不共享,此時可以將布隆過濾器存儲在分布式緩存系統中,比如 Redis 或 Memcached。這些緩存系統支持跨多個實例的數據共享,并且可以提供高效的讀取和寫入操作。比如使用 Redis 的布隆過濾器模塊(如 RedisBloom)來創建和管理布隆過濾器。
還將布隆過濾器存儲在一個共享的外部存儲系統中,例如將布隆過濾器序列化并存儲在數據庫或共享文件系統中。這樣所有的微服務實例都可以訪問相同的布隆過濾器。
生產者
@Test
public void testRepeatProducer() throws Exception {// 創建默認的生產者DefaultMQProducer producer = new DefaultMQProducer("test-group");// 設置nameServer地址producer.setNamesrvAddr("localhost:9876");// 啟動實例producer.start();// 我們可以使用自定義key當做唯一標識String keyId = UUID.randomUUID().toString();System.out.println(keyId);Message msg = new Message("TopicTest", "tagA", keyId, "我是一個測試消息".getBytes());SendResult send = producer.send(msg);System.out.println(send);// 關閉實例producer.shutdown();
}
添加hutool的依賴
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.11</version>
</dependency>
消費者
/*** 在boot項目中可以使用@Bean在整個容器中放置一個單例對象*/
public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100);@Test
public void testRepeatConsumer() throws Exception {// 創建默認消費者組DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");consumer.setMessageModel(MessageModel.BROADCASTING);// 設置nameServer地址consumer.setNamesrvAddr("localhost:9876");// 訂閱一個主題來消費 表達式,默認是*consumer.subscribe("TopicTest", "*");// 注冊一個消費監聽 MessageListenerConcurrently是并發消費// 默認是20個線程一起消費,可以參看 consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {// 拿到消息的keyMessageExt messageExt = msgs.get(0);String keys = messageExt.getKeys();// 判斷是否存在布隆過濾器中if (bloomFilter.contains(keys)) {// 直接返回了 不往下處理業務return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 這個處理業務,然后放入過濾器中// do sth...bloomFilter.add(keys);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
重試機制
生產者重試:
@Test
public void retryProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("retry-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);producer.start();// 生產者發送消息 重試次數 一般也不會去改它 情況比較少見// 一般連接也不會斷開 斷開了重試也沒用producer.setRetryTimesWhenSendFailed(2); // 同步消息重試次數 默認2次producer.setRetryTimesWhenSendAsyncFailed(2); // 異步消息重試次數 默認2次String key = UUID.randomUUID().toString();System.out.println(key);Message message = new Message("retryTopic", "vip1", key, "我是vip666的文章".getBytes());producer.send(message);System.out.println("發送成功");producer.shutdown();
}
// 失敗的情況重發3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S內沒有發送成功,就會重試
producer.send(msg, 1000);
在消費者放return ConsumeConcurrentlyStatus.RECONSUME_LATER;后就會執行重試,下面代碼中說明了,我們在實際生產過程中,一般重試3-5次,如果還沒有消費成功,則可以把消息簽收了,通知人工等處理。
消息分為消息體和消息頭,消息頭里有延遲等級、隊列di、broker名字、發送時間、重試次數等信息,如下所示我們就在業務邏輯里打出了重試次數
死信消息定義見下面代碼注釋,死信消息會被存放在一個死信主題中去 主題的名稱:%DLQ%retry-consumer-group
,即%DLQ%消費者組名稱
/*** 重試的時間間隔* 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h* 默認重試16次* 1.能否自定義重試次數 consumer.setMaxReconsumeTimes(2);* 2.如果重試了16次(并發模式) 順序模式下(int最大值次)都是失敗的? 是一個死信消息 則會放在一個死信主題中去 主題的名稱:%DLQ%retry-consumer-group* 3.當消息處理失敗的時候 該如何正確的處理? 再寫一個消費者專門去消費死信消息* --------------* 重試的次數 一般的業務 5次就夠了* @throws Exception*/
@Test
public void retryConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");// 設定重試次數consumer.setMaxReconsumeTimes(2);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);System.out.println(new Date());// 打印重試次數System.out.println(messageExt.getReconsumeTimes());System.out.println(new String(messageExt.getBody()));// 業務報錯了 返回null 返回 RECONSUME_LATER 都會重試return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});consumer.start();System.in.read();
}
測試效果:
死信消息
上面的案例我們知道當一條消息初次消費失敗,RocketMQ會自動進行消息重試,達到最大重試次數后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息。此時,該消息不會立刻被丟棄,而是將其發送到該消費者對應的特殊隊列中,這類消息稱為死信消息(Dead-Letter Message),存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue),死信隊列是死信Topic下分區數唯一的單獨隊列。如果產生了死信消息,那對應的ConsumerGroup的死信Topic名稱為%DLQ%ConsumerGroupName
,死信隊列的消息將不會再被消費。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查詢到對應死信消息的信息。我們也可以去監聽死信隊列,然后進行自己的業務上的邏輯
死信隊列監聽邏輯
/// 直接監聽死信主題的消息 消費是消費不了了 那么就記錄下 通知人工接入處理
@Test
public void retryDeadConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("%DLQ%retry-consumer-group", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);System.out.println(new Date());System.out.println(new String(messageExt.getBody()));System.out.println("記錄到特別的位置 文件 or mysql 通知人工處理");// 業務報錯了 返回null 返回 RECONSUME_LATER 都會重試return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
但是現實場景里可能有幾十個主題,那么就有幾十個死信主題,那么寫幾十個消費者都去監聽嗎?不切實際了,所以現實開發一般這樣寫,在業務處理邏輯中對重試次數進行判斷,如果超過某個次數了,就直接在業務邏輯里進行死信的記錄、人工的通知邏輯等。有的小公司都不寫這個邏輯,直接讓運維去dashBoard定期檢查死信主題里有沒有消息,然后進行人工操作。
第二種方案 用法比較多@Test
public void retryConsumer2() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);System.out.println(new Date());// 業務處理try {handleDb();} catch (Exception e) {// 重試int reconsumeTimes = messageExt.getReconsumeTimes();if (reconsumeTimes >= 3) {// 不要重試了System.out.println("記錄到特別的位置 文件 mysql 通知人工處理");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return ConsumeConcurrentlyStatus.RECONSUME_LATER;}// 業務報錯了 返回null 返回 RECONSUME_LATER 都會重試return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}private void handleDb() {int i = 10 / 0;
} //模擬現實的業務處理 比如一些orderService userService的操作
SpringBoot集成RocketMQ
首先來搭建rocketmq-producer(消息生產者)
完整的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.11</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.powernode</groupId><artifactId>b-rocketmq-boot-p</artifactId><version>0.0.1-SNAPSHOT</version><name>b-rocketmq-boot-p</name><description>b-rocketmq-boot-p</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.25</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>
修改配置文件application.yml
spring:application:name: rocketmq-producer
rocketmq:name-server: 你的服務ip:9876 # rocketMq的nameServer地址producer:group: powernode-group # 生產者組別send-message-timeout: 3000 # 消息發送的超時時間 默認就是3sretry-times-when-send-async-failed: 2 # 異步消息發送失敗重試次數 默認就是2max-message-size: 4194304 # 消息的最大長度 默認就是這個 4M大小
我們在測試類里面測試發送消息
/*** 注入rocketMQTemplate,我們使用它來操作mq*/
@Autowired
private RocketMQTemplate rocketMQTemplate;/*** 測試發送簡單的消息** @throws Exception*/
@Test
public void testSimpleMsg() throws Exception {// 往powernode的主題里面以同步的方式發送一個簡單的字符串消息SendResult sendResult = rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一個消息");// 拿到消息的發送狀態System.out.println(sendResult.getSendStatus());// 拿到消息的idSystem.out.println(sendResult.getMsgId());
}
這里rocketMQTemplate的send方法有很多,可以自己去嘗試,然后看看,syncSend表示同步發送,asyncSend表示異步發送,sendOneWay表示單向發,sendOneWayOrderly表示單向順序發,syncSendOrderly表示同步順序發,asyncSendOrderly表示異步順序發等等。
此時查看控制臺可以看到這條消息。
然后搭建rocketmq-consumer(消息消費者)
完整的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.11</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.powernode</groupId><artifactId>c-rocketmq-boot-c</artifactId><version>0.0.1-SNAPSHOT</version><name>c-rocketmq-boot-c</name><description>c-rocketmq-boot-c</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.25</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>
修改配置文件application.yml
spring:application:name: rocketmq-consumer
server:port: 8081
rocketmq:name-server: 你的服務ip:9876 # rocketMq的nameServer地址# 一個boot項目中可以寫很多個消費者成像,但是一般在開發中一個boot項目只對應一個消費者# 主題名也一般不在這里指定 在listener里寫
添加一個監聽的類SimpleMsgListener
@Component // 需要交給IOC容器管理
// 注解里指定主題和消費者組
@RocketMQMessageListener(topic = "bootTestTopic", consumerGroup = "boot-test-consumer-group")
public class ABootSimpleMsgListener implements RocketMQListener<MessageExt> { /*** 這個方法就是消費者的方法 注意要實現接口RocketMQListener* 如果接口泛型制定了固定的類型 那么onMessage就會把消息體轉換成那個類型 比如String* 我們這里寫成MessageExt 這個類型是消息的所有內容* ------------------------* 方法也不需要return* 沒有報錯 就簽收了* 如果報錯了 就是拒收 就會重試** @param message*/@Overridepublic void onMessage(MessageExt message) {System.out.println(new String(message.getBody()));}
}
然后啟動rocketmq-consumer的啟動項,查看控制臺,發現我們已經監聽到消息了
集成SpringBoot發送不同消息模式(同步消息)
上面是經典的同步發送消息,理解為:消息由消費者發送到broker后,會得到一個確認,是具有可靠性的。這種可靠性同步地發送方式使用的比較廣泛,比如:重要的消息通知,短信通知等。
我們在上面的快速入門中演示的消息,就是同步消息,即
rocketMQTemplate.syncSend()
rocketMQTemplate.send()
rocketMQTemplate.convertAndSend()
這三種發送消息的方法,底層都是調用syncSend,發送的是同步消息
下面我們來玩一玩不同的消息模式
異步消息
rocketMQTemplate.asyncSend()
// 異步
rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一個異步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("成功");}@Overridepublic void onException(Throwable throwable) {System.out.println("失敗" + throwable.getMessage());}
});
// 測試一下異步的效果
System.out.println("誰先執行");
// 掛起jvm 不讓方法結束
System.in.read();
單向消息
這種方式主要用在不關心發送結果的場景,這種方式吞吐量很大,但是存在消息丟失的風險,例如日志信息的發送
// 單向
rocketMQTemplate.sendOneWay("bootOnewayTopic", "單向消息");
延遲消息
還記得我們使用rocketmq原生的api怎么發延遲消息的嗎,我們是將消息new出來后,通過消息來設置延遲時間,但是這里的Message對象和之前的其實不同了,之前是org.apache.rocketmq.common.message.Message;現在是org.springframework.messaging.Message;這個消息對象沒有給設置延遲時間的api,而是通過重載syncSend方法實現延遲時間的設置:
這里第三個參數是連接mq的超時時間,第四個是消息的延遲等級,這個delayLevel才對應我們之前的延遲消息發送設置。
// 延遲Message<String> msg = MessageBuilder.withPayload("我是一個延遲消息").build();rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 3);
順序消息
還是上一篇博客的訂單類案例,看不懂的兄弟一定要先去看第一篇博客,對應起來學就很容易了。
這里也注意api的使用,第一個參數訂閱主題,第二個參數對象,第三個參數傳入一個HashKey取到某一個消息的唯一標識,用在我們選擇哪一個隊列時,這里我們傳入訂單號,這樣同一個訂單就會被放入同一個隊列了,并且這里不需要我們手動判斷選擇哪個隊列了。
// 順序消息 發送者放 需要將一組消息 都發在同一個隊列中去 消費者 需要單線程消費
List<MsgModel> msgModels = Arrays.asList(new MsgModel("qwer", 1, "下單"),new MsgModel("qwer", 1, "短信"),new MsgModel("qwer", 1, "物流"),new MsgModel("zxcv", 2, "下單"),new MsgModel("zxcv", 2, "短信"),new MsgModel("zxcv", 2, "物流")
);
msgModels.forEach(msgModel -> {// 發送 第二個參數雖然是Object類型 但是一般都是以json的方式進行處理rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());
});
這里的消費者代碼因為與一般的代碼有點區別,所以再單獨寫一下
@Component
@RocketMQMessageListener(topic = "bootOrderlyTopic",consumerGroup = "boot-orderly-consumer-group",consumeMode = ConsumeMode.ORDERLY, // 順序消費模式默認是并發模式 因為是順序消息 要修改成單線程maxReconsumeTimes = 5 // 消費重試的次數
)
public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);System.out.println(msgModel);}
}
重啟rocketmq-consumer服務,可以看到是局部順序消費的:
批量消息
List<String> messages = Arrays.asList(new String("批量消息-消息1"),new String("批量消息-消息2"),new String("批量消息-消息3"),new String("批量消息-消息4")
);
List<Message<?>> messageList = messages.stream().map(msg -> MessageBuilder.withPayload(msg).build()).collect(Collectors.toList());// RocketMQ每批次消息的最大字節限制是4MB,需根據此限制進行拆分
int size = messageList.size();
int batchSize = 100; // 每批發送100條消息
for (int i = 0; i < size; i += batchSize) {int end = Math.min(size, i + batchSize);List<Message<?>> subList = messageList.subList(i, end);rocketMQTemplate.syncSend("bootBatchTopic", subList);
}
}
由于RocketMQ每批次消息的最大字節限制是4MB,因此需要對消息進行分批處理發送。這里簡單設定每批次發送100條消息,可以根據實際情況調整。
事務消息
這個可以先不學了,比較雞肋,基本不用,后面我們會專門學分布式事務seata,它是專門解決分布式事務問題的。這里也附上代碼,有興趣可以學一下。
生產者
/*** 測試事務消息* 默認是sync(同步的)* 事務消息會有確認和回查機制* 事務消息都會走到同一個監聽回調里面,所以我們需要使用tag或者key來區分過濾** @throws Exception*/
@Test
public void testTrans() throws Exception {// 構建消息體Message<String> message = MessageBuilder.withPayload("這是一個事務消息").build();// 發送事務消息(同步的) 最后一個參數才是消息主題TransactionSendResult transaction = rocketMQTemplate.sendMessageInTransaction("powernode", message, "消息的參數");// 拿到本地事務狀態System.out.println(transaction.getLocalTransactionState());// 掛起jvm,因為事務的回查需要一些時間System.in.read();
}
消費者
/*** 事務消息的監聽與回查* 類上添加注解@RocketMQTransactionListener 表示這個類是本地事務消息的監聽類* 實現RocketMQLocalTransactionListener接口* 兩個方法為執行本地事務,與回查本地事務*/
@Component
@RocketMQTransactionListener(corePoolSize = 4,maximumPoolSize = 8)
public class TmMsgListener implements RocketMQLocalTransactionListener {/*** 執行本地事務,這里可以執行一些業務* 比如操作數據庫,操作成功就return RocketMQLocalTransactionState.COMMIT;* 可以使用try catch來控制成功或者失敗;* @param msg* @param arg* @return*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 拿到消息參數System.out.println(arg);// 拿到消息頭System.out.println(msg.getHeaders());// 返回狀態COMMIT,UNKNOWNreturn RocketMQLocalTransactionState.UNKNOWN;}/*** 回查本地事務,只有上面的執行方法返回UNKNOWN時,才執行下面的方法 默認是1min回查* 此方法為回查方法,執行需要等待一會* xxx.isSuccess() 這里可以執行一些檢查的方法* 如果返回COMMIT,那么本地事務就算是提交成功了,消息就會被消費者看到** @param msg* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {System.out.println(msg);return RocketMQLocalTransactionState.COMMIT;}
}
測試發送事務,建議斷點啟動
- 消息會先到事務監聽類的執行方法,
- 如果返回狀態為COMMIT,則消費者可以直接監聽到
- 如果返回狀態為ROLLBACK,則消息發送失敗,直接回滾
- 如果返回狀態為UNKNOW,則過一會會走回查方法
- 如果回查方法返回狀態為UNKNOW或者ROLLBACK,則消息發送失敗,直接回滾
- 如果回查方法返回狀態為COMMIT,則消費者可以直接監聽到
發送對象消息和集合消息
這個上面已經說過了其實,主要就是監聽的時候泛型中寫對象的類型即可
生產者
/*** 測試發送對象消息** @throws Exception*/
@Test
public void testObjectMsg() throws Exception {Order order = new Order();order.setOrderId(UUID.randomUUID().toString());order.setOrderName("我的訂單");order.setPrice(998D);order.setCreateTime(new Date());order.setDesc("加急配送");// 往powernode-obj主題發送一個訂單對象rocketMQTemplate.syncSend("powernode-obj", order);
}
消費者也添加一個Order類(拷貝過來)
/*** 創建一個對象消息的監聽* 1.類上添加注解@Component和@RocketMQMessageListener* 2.實現RocketMQListener接口,注意泛型的使用*/
@Component
@RocketMQMessageListener(topic = "powernode-obj", consumerGroup = "powernode-obj-group")
public class ObjMsgListener implements RocketMQListener<Order> {/*** 消費消息的方法** @param message*/@Overridepublic void onMessage(Order message) {System.out.println(message);}
}
發送集合消息
和對象消息同理,創建一個Order的集合,發送出去,監聽方注意修改泛型中的類型為Object即可,這里就不做重復演示了
同時這里注意與發送批量消息進行區分,發送批量消息時,發送的類型是List<Message<?>>
,此時List里的消息批量發送到一個隊列,才會被一條一條的消費。
消息標簽,tag
我們從源碼注釋得知,tag帶在主題后面用:來攜帶,感謝注釋
我們往下去看源碼,在
org.apache.rocketmq.spring.support.RocketMQUtil 的getAndWrapMessage方法里面看到了具體細節,我們也知道了keys在消息頭里面攜帶
// topic:tag
rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一個帶tag的消息");
消費者:
@Component
@RocketMQMessageListener(topic = "bootTagTopic",consumerGroup = "boot-tag-consumer-group",selectorType = SelectorType.TAG,// 選擇類型 默認就是tag過濾模式selectorExpression = "tagA || tagB" // 默認"*"代表所有
// selectorType = SelectorType.SQL92,// sql92過濾模式
// selectorExpression = "a in (3,5,7)" // 相當于"3 || 5 || 7"
// SQL92需要broker.conf中開啟enbalePropertyFilter=true
)
public class CTagMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println(new String(message.getBody()));}
}
過濾模式也可以使用sql92模型,一般幾乎不用,默認也是不支持的
需要在配置文件broker.conf中開啟enbalePropertyFilter=true,查看源碼可知其語法:
比如主題是bootTagTopic:3,那么可以這樣寫過濾表達式:“a > 5”,消息過不來,這種寫法幾乎用不到
消息key
// key是寫帶在消息頭的
Message<String> message = MessageBuilder.withPayload("我是一個帶key的消息").setHeader(RocketMQHeaders.KEYS, "qwertasdafg").build();
rocketMQTemplate.syncSend("bootKeyTopic", message);
在消費者里也可以取key,在消息頭里,不再贅述。
負載均衡和廣播,兩種消息消費模式
之前說過Rocketmq消息消費的模式分為兩種:負載均衡模式和廣播模式
負載均衡模式表示多個消費者交替消費同一個主題里面的消息
廣播模式表示每個每個消費者都消費一遍訂閱的主題的消息
其中在注解中的messageModel屬性可以配置消息模式,默認就是MessageModel.CLUSTERING
,即集群模式(負載均衡),點進源碼可以看到另外一種模式是MessageModel.BROADCASTING
,即廣播模式
下面進行演示,建兩個類做兩個消費者監聽器DC1和DC2
/*** [CLUSTERING] 集群模式下 隊列會被消費者分攤, 注意隊列數量>=消費者數量 消息的消費位點 mq服務器會記錄處理* BROADCASTING 廣播模式下 消息會被每一個消費者都處理一次, mq服務器不會記錄消費點位,也不會重試*/
@Component
@RocketMQMessageListener(topic = "modeTopic",consumerGroup = "mode-consumer-group-a",messageModel = MessageModel.CLUSTERING, // 集群模式 負載均衡consumeThreadNumber = 40)
public class DC1 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("我是mode-consumer-group-a組的第一個消費者:" + message);}
}
@Component
@RocketMQMessageListener(topic = "modeTopic",consumerGroup = "mode-consumer-group-a",messageModel = MessageModel.CLUSTERING // 集群模式
)
public class DC2 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("我是mode-consumer-group-a組的第二個消費者:" + message);}
}
發送10條消息測試一下,發現消息是被均勻分配的。
/ 測試消息消費模式 集群模塊 廣播模式@Test
void modeTest() throws Exception {for (int i = 1; i <= 10; i++) {rocketMQTemplate.syncSend("modeTopic", "我是第" + i + "個消息");}
}
再加一個消費者3,發現第2個消費者消費了5條消息。
這里再強調一下上篇博客的內容,如果是廣播模式,那好說,一個組內所有的消費者都會拿到每個隊列里的消息,如果是負載均衡模式,假設有c1、c2兩個消費者,那么每個隊列是要固定聯系好消費者的,即比如隊列1、2的消息只會給c1,2、3的消息只會給c2,比如4個隊列都指定了消費者,組內再有一個消費者,那它永遠沒有消息。
所以最好隊列數量>=組內消費者數量
另外一般最好先啟動消費者再啟動生產者,假設先啟動生產者,然后有10條消息堆積,再啟動消費者,消費者組可能有多個消費者,如果某個消費者c1啟動的快,它可能認為該組只有它一個,就會去消費,等c2啟動好就沒有消息了。
再來玩一下廣播模式,我們再開3個消費者,到另一個消費者組去,也訂閱這個主題,兩個消費者組都能消費到消息,觀察一下它們的區別,DC4~DC6的代碼就貼個DC4:
@Component
@RocketMQMessageListener(topic = "modeTopic",consumerGroup = "mode-consumer-group-b",messageModel = MessageModel.BROADCASTING // 廣播模式
)
public class DC4 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("我是mode-consumer-group-b組的第一個消費者:" + message);}
}
然后我們發5條消息,觀察發現b組消費了15條消息,a組只消費了5條。
觀察dashboard也可發現,a組是負載均衡模式會分配著來,差值都為0,正常被消費,而b組的終端都是1個,且差值不對應,還有差值,因為廣播模式終端是不會幫你處理點位的,即廣播模式下,消息消費成功與否都不會重試!
消息堆積問題
當差值很大,即MQ里堆積了很多消息沒有消費,一般認為單條隊列消息差值>=10w時 算堆積問題。
這個問題如何解決?這個問題要分析問題發生的情況來分析
什么情況下會出現堆積:1. 生產太快了;2. 消費者消費出現問題
如果是生產太快了,那么
- 生產方可以做業務限流,常見的限流算法我這篇博客有寫
- 增加消費者數量,但是也得消費者數量<=隊列數量,適當的設置最大的消費線程數量(根據任務劃分,mq大部分處理的是IO密集型(2n)/如果是CPU計算型(n+1))。這個可以在
@RocketMQMessageListener
注解的consumeThreadNumber
屬性配置 - 動態擴容隊列數量,從而增加消費者數量(dashboard可以操作,注意別在消息還沒消費完的時候去縮容,要不然消息就沒了)
如果是消費者消費出現問題,那么就排查消費者程序的問題,還有就是優化消費邏輯,減少單條消息的處理時間。我這篇博客有寫已經上線的bug如何排查,還有這篇博客也寫了JVM的排查
除此之外要監控系統的消息堆積情況,及時擴容 Broker 或調整負載均衡策略。有些第三方監控工具專門用于監控消息中間件的狀態,例如 Prometheus、Grafana、Datadog 等,也可以自定義監控腳本。
PS:補充一些很少使用的操作
dashboard還提供了跳過堆積的操作,選中對應的消費者組,該堆積的消息將不會被該消費者消費了,即位點后移。
還有重置消費位點的操作,消費過的消息想再次消費,那么可以前移消費位點。可以選擇給某個消費者組的位點重置到前面的某個時間上。
消息丟失問題
broker刷盤丟失
這是最容易發生丟失問題的情況。
生產者把消息發送給broker后,broker要對消息進行持久化,持久化分為同步刷盤和異步刷盤。同步刷盤會在broker端開啟一塊磁盤區進行順序IO,等到持久化成功然后告訴生產者,同步刷盤比較安全一般不會出現丟失問題,但是性能不高。而異步刷盤有一個內存buffer,統一刷盤,性能高,但是容易出現消息丟失問題。
解決這個問題可以采用同步刷盤但是性能很低,大部分情況下不會使用同步刷盤,如果使用異步刷盤如何避免消息丟失,消息丟失不可怕,怕不知道哪一條丟了,那么我們可以在生產者端也進行持久化,發送消息后自己持久化log到文件或者mysql中。保存消息的key creatTime status(發送成功),消費者在處理完業務邏輯后,去更新這個消息的狀態,key handleTime status(處理成功)。然后我們可以通過定時任務定期檢查數據庫,哪些消息發送很久但還沒被消費,進行補發,同時結合冪等操作防止重復消費。
這個問題確實深挖起來還是有很多可以講的,收集了在網上一些博主對這個問題的看法,消息丟失問題可能因為這些原因
在消息發送端:
- Producer故障,在消息成功發送broker之前,Producer發送故障或者宕機
- 網絡中斷,在Producer與Broker之間的消息傳遞過程中,網絡連接中斷或延遲
- Broker未確認,接收到消息后由于某些原因未能向Producer發送確認
解決方案:增強發送端的可靠性
- 對于同步發送,確保消息發送成功后才繼續后續操作。
- 對于異步發送,設置回調函數,使用重試機制,Producer在發送消息的時候如果沒有收到Broker的確認,可以采用重試策略
- 事務消息,rocketmq支持事務消息,可以確保在特定業務的操作中消息發送的可靠性
在消息存儲端:
- 單點故障,如果消息只保存在Master Broker上,那么一旦Master發生故障,消息就可能丟失,
- 磁盤故障,存儲消息的磁盤損壞會導致數據丟失
解決方案:強化消息存儲的安全性
- Master-Slave同步,確保每一條消息都在Master和至少一個Slave上有保存
- 設置合理的刷盤策略,例如同步刷盤(SYNC_FLUSH)模式,確保消息寫入磁盤后才返回成功
- 使用RAID,在硬件層面采用RAID技術來增強磁盤的容錯能力。(RAID(Redundant Array of Independent Disks)技術是一種數據存儲技術,通過將多個獨立的硬盤組合起來,以提高數據的可靠性、性能或者兩者兼顧的存儲系統。)
在消息中間件Broker端:
- 同步延遲,Master與Slave之間的同步延遲會導致Master的節點未及時同步到Slave節點
- Slave故障,在同步的過程中Slave宕機或出現問題
解決方案:優化消息同步策略
- 選擇適合當前業務特性的Master-Slave同步機制,比如同步復制或者異步復制
- 增加監控的機制去實時監控Master和Slave的同步狀態,一旦出現延遲或者故障立刻觸發告警(Dashboard 可視化、編寫監控工具設計合理的監控指標進行歷史數據分析) 有些第三方監控工具專門用于監控消息中間件的狀態,例如 Prometheus、Grafana、Datadog 等。
在消息消費端:
- 消費者在處理消息時出現異常,消息未能正確處理,導致消息丟失
- 消費者在處理消息后未能正確提交 offset,導致重復消費或丟失消息
解決方案:強化消息消費的可靠性
- 在消費邏輯中手動增加異常處理機制,確保消息處理失敗時能夠重新消費,也能自定義去查消息體里的重試次數,避免自動重試過多,也避免額外的死信監聽
- 使用手動提交 offset 的方式,確保在消息成功處理后才提交 offset
- 使用冪等性設計,確保消息重復消費時不會造成業務錯誤
還有就是消息堆積導致丟失,這個可見上一小節。
消息軌跡監控
rocketmq提供了消息軌跡監控功能,會影響性能,一般不開。
1.在broker.conf中開啟消息追蹤 traceTopicEnable=true
2.重啟broker即可
3.生產者的yml配置文件開啟發送方消息軌跡 enable-msg-trace: true
原生的api直接在定義消費者組的時候有個參數enableMsgTrace可以指定是否開啟
4. 消費者開啟消息軌跡功能,可以給單獨的某一個消費者開啟,在注解的enableMsgTrace屬性中指定開啟 enableMsgTrace = true
在rocketmq的面板中可以查看消息軌跡(通過message_id或者key查看)
默認會將消息軌跡的數據存在 RMQ_SYS_TRACE_TOPIC 主題里面
消息安全問題
RocketMQ 的消息安全問題涉及到消息的傳輸安全、身份認證、權限控制等方面。以下是 RocketMQ 消息安全的幾個主要方面及其解決方案:
-
傳輸安全:
-
- 問題:在消息傳輸過程中,可能存在竊聽、篡改等安全威脅,導致消息泄露或被篡改。
-
- 解決方案:使用 TLS/SSL 加密協議對消息進行傳輸加密,確保消息在網絡傳輸過程中的機密性和完整性。
-
身份認證:
-
- 問題:在分布式系統中,需要對客戶端和服務端進行身份認證,以防止未經授權的用戶訪問消息系統。
-
- 解決方案:采用身份認證機制,如用戶名密碼、JWT(JSON Web Token)等方式對客戶端進行身份認證,確保只有經過授權的用戶能夠訪問消息系統。
-
訪問控制:
-
- 問題:需要對用戶訪問消息隊列的權限進行控制,以確保用戶只能訪問其有權訪問的隊列。
-
- 解決方案:實現訪問控制列表(ACL)機制,對消息隊列進行權限控制,包括讀取和寫入權限等。只有具有相應權限的用戶才能對隊列進行操作。
-
數據加密:
-
- 問題:在消息存儲和傳輸過程中,需要對消息內容進行加密,以保護消息的機密性。
-
- 解決方案:對消息內容進行加密處理,確保消息在存儲和傳輸過程中的機密性。可以使用對稱加密算法或非對稱加密算法對消息內容進行加密。
-
防止重放攻擊:
-
- 問題:在消息傳輸過程中,可能會受到重放攻擊,即攻擊者重復發送已經被記錄的有效消息,以達到非法訪問或篡改消息的目的。
-
- 解決方案:采用消息序列號、時間戳、令牌等方式對消息進行防重放處理,確保每條消息只能被處理一次。
-
日志審計:
-
- 問題:需要對消息系統的操作進行日志審計,以便追溯和監控用戶的操作行為。
-
- 解決方案:記錄用戶對消息系統的操作日志,包括用戶的登錄、訪問權限、消息發送和接收等操作,以便于后續審計和監控。
其中訪問控制具體操作如下:
一、開啟acl的控制 在broker.conf中開啟aclEnable=true
二、配置賬號密碼 修改plain_acl.yml(跟broker.conf同級目錄),文件內容如下所示
globalWhiteRemoteAddresses是全局白名單,accounts是賬號,密碼必須大于8位,默認給了兩個案例賬號,比如第一個賬號,admin:false說明不是管理員賬號,第二個賬號是,管理員是可以隨便操作的。再看第一個賬號,不是管理員賬號,defaultTopicPerm默認權限是拒絕,只能SUB(只讀),PUB是發表,他給一些主題設置了權限,主題A不能讀不能寫,B能讀能寫,C只讀。對group也設置了權限。
另外賬號2我們要修改一個地方,他的白名單是192.168.1.*,比如我們的ip是192.168.206.186,那么可以修改成白名單是192.168.*.*
三、dashboard的配置文件也要改,雖然你是通過nameserver監控broker,但是broker現在有acl了,你也得認證。
修改控制面板的配置文件 (控制面板就是個jar包,改里面的application.properties),放開52/53行 ,如果想把面板的登錄也弄個賬號密碼那就把49行改為true
把修改好的application.properties上傳到服務器的jar包平級目錄下即可 覆蓋內部的配置文件
配置文件的加載優先級:當前項目project所在目錄的/config目錄下 > 當前項目project所在目錄下 > classpath的/config目錄 > classpath的根目錄
重啟服務發現面板需要登錄賬號密碼了,面板的賬號密碼被定義在application.properties同級的users.properties下,管理員默認賬號密碼是admin和admin
生產者和消費者使用密碼也很簡單,注意跟dashboard面板的的賬號密碼不是同一個。在yml文件里配置,否則發消息會報錯沒有權限訪問,消費者也是
rocketmq:name-server: 你的ip:9876 # rocketMq的nameServer地址consumer:access-key: rocketmq2secret-key: 12345678
rocketmq:name-server: 你的ip:9876 # rocketMq的nameServer地址producer:group: boot-producer-group # 生產者組的名字access-key: rocketmq2secret-key: 12345678
底層設計小探
官網提供源碼版本的下載,以后學習某個新技術也是這個思路,先了解它是什么,然后快速上手,搭起來把簡單的api過一遍,先入門再說。然后再學習它的深入特性和原理,最后是源碼級別的學習。
官網提供的文檔其實都很詳細,也提供了源碼的下載:
解壓完之后是一個項目,里面是各種各樣的東西的實現,比如broker的實現,nameserver的實現,源碼的注釋比較少,但是結構很清晰,源碼的學習也是大牛的必經之路,
作為小白我們先不看它的源碼,先去看它的docs,大部分的問題,其實在文檔里都會有寫。最重要的第一步,先去看它的總體設計,即下圖的design.md,工作中更多的是根據自己的需要去查找相關的文檔及源碼。在這里偷懶,我就不貼閱讀體會了。