惰性隊列工作原理
惰性隊列通過盡可能多地將消息存儲到磁盤上來減少內存的使用。與傳統隊列相比,惰性隊列不會主動將消息加載到內存中,而是盡量讓消息停留在磁盤上,從而降低內存占用。盡管如此,它并不保證所有操作都是同步寫入磁盤的。這意味著消息可能會先被緩存到操作系統的緩沖區中,然后由操作系統決定何時將其真正寫入磁盤。
- 優點:適合處理大量消息且對內存壓力敏感的場景。
- 缺點:由于頻繁的磁盤I/O操作,性能可能不如傳統隊列。
同步刷盤的概念
同步刷盤意味著每次寫入操作都會等待數據完全寫入磁盤后才返回確認信息。雖然這種方式提供了更強的數據持久性保證,但它也顯著增加了寫入操作的延遲。對于RabbitMQ而言,可以通過設置消息為持久化來增加數據的安全性,但對于極端情況下的數據安全性要求,還需要結合其他策略如調整操作系統參數或使用文件系統級別的同步寫入配置。
延遲插件的工作原理
RabbitMQ本身沒有內置的延遲隊列功能,但可以通過安裝rabbitmq_delayed_message_exchange
插件實現這一功能。該插件允許創建一個自定義交換機類型,該交換機能夠根據消息頭中的延遲時間屬性來延遲消息的傳遞。
在Spring Boot中集成RabbitMQ惰性隊列和延遲消息
1.?項目初始化
首先,確保你的Spring Boot項目中包含必要的依賴:
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
</dependencies>
2.?配置RabbitMQ連接
在application.yml
中配置RabbitMQ連接信息:
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest
3.?定義惰性隊列
創建一個配置類來定義惰性隊列:
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMqConfig {/*** 定義惰性模式的隊列* @return 返回惰性隊列實例*/@Beanpublic Queue lazyQueue() {Map<String, Object> args = new HashMap<>();// 設置隊列為惰性模式args.put("x-queue-mode", "lazy");return new Queue("my_lazy_queue", true, false, false, args); // durable=true for queue durability}
}
4.?發送持久化消息
創建一個服務類用于發送消息,并確保消息是持久化的:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 發送一條持久化消息到惰性隊列* @param message 要發送的消息內容*/public void sendMessage(String message) {rabbitTemplate.convertAndSend("my_lazy_queue", message);System.out.println(" [x] Sent '" + message + "'");}
}
確保消息持久化可以在application.yml
中設置如下:
spring:rabbitmq:template:exchange: ''routing-key: 'my_lazy_queue'mandatory: truepublisher-confirms: truepublisher-returns: true
5.?接收消息
創建一個監聽器來接收消息:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MessageReceiver {/*** 監聽并接收來自惰性隊列的消息* @param message 接收到的消息內容*/@RabbitListener(queues = "my_lazy_queue")public void receiveMessage(String message) {System.out.println(" [x] Received '" + message + "'");}
}
6.?使用延遲插件發送延遲消息
首先,在RabbitMqConfig
中定義延遲交換機:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMqConfig {/*** 定義延遲交換機* @return 返回延遲交換機實例*/@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);}/*** 綁定延遲隊列到延遲交換機* @param delayedQueue 延遲隊列* @param delayExchange 延遲交換機* @return 返回綁定實例*/@Beanpublic Binding binding(Queue delayedQueue, CustomExchange delayExchange) {return new Binding("delayed_queue", Binding.DestinationType.QUEUE, "delayed_exchange", "routing.key", Collections.emptyMap());}/*** 定義延遲隊列* @return 返回延遲隊列實例*/@Beanpublic Queue delayedQueue() {return new Queue("delayed_queue");}
}
然后,創建一個服務類來發送延遲消息:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;@Service
public class DelayedMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 發送帶有延遲的消息* @param message 要發送的消息內容* @param delayTime 延遲時間(毫秒)*/public void sendDelayedMessage(String message, int delayTime) {MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setHeader("x-delay", delayTime);return message;};rabbitTemplate.convertAndSend("delayed_exchange", "routing.key", message, messagePostProcessor);System.out.println(" [x] Sent '" + message + "' with delay.");}
}
最后,創建一個監聽器來接收延遲消息:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class DelayedMessageReceiver {/*** 監聽并接收來自延遲隊列的消息* @param message 接收到的消息內容*/@RabbitListener(queues = "delayed_queue")public void receiveDelayedMessage(String message) {System.out.println(" [x] Received delayed message '" + message + "'");}
}
高級特性和最佳實踐
-
發布確認機制:為了提高可靠性,可以開啟發布確認機制,以確保消息確實被RabbitMQ服務器接受。
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("Message acknowledged");} else {System.err.println("Message not acknowledged due to: " + cause);}
});
-
預取計數(Prefetch Count):通過設置預取計數限制每個消費者同時處理的消息數量,有助于防止消費者被過多未處理的消息壓垮。
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMqConnectionConfig {@Beanpublic CachingConnectionFactory cachingConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setChannelCacheSize(25);connectionFactory.getRabbitConnectionFactory().setRequestedChannelMax(200);return connectionFactory;}
}
可以在application.yml
中設置:
spring:rabbitmq:listener:simple:prefetch: 10