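Integrating Redis with Spring Boot for Publish/Subscribe (with an ACK Mechanism): The Complete Workflow
I. Overall Architecture
II. Implementation Steps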
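Step 1: Add the Maven dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- Required on the classpath for the lettuce.pool settings in Step 2 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>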
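Step 2: Configure the Redis connection
# application.yml
# Note: Spring Boot 3.x reads these keys under spring.data.redis instead of spring.redis
spring:
  redis:
    host: localhost
    port: 6379
    lettuce:
      pool:
        max-active: 16
        max-idle: 8
# Redis Stream settings
app:
  redis:
    stream: app-events
    group: app-group
    consumer: consumer-${random.int(1000)}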
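Step 3: Create the consumer group
@Configuration
public class RedisConfig {

    @Value("${app.redis.stream}")
    private String streamKey;

    @Value("${app.redis.group}")
    private String groupName;

    // A @Bean method cannot return void, so the group is created in an
    // InitializingBean callback that Spring invokes during startup.
    @Bean
    public InitializingBean createConsumerGroup(StringRedisTemplate redisTemplate) {
        return () -> {
            try {
                redisTemplate.opsForStream().createGroup(streamKey, groupName);
            } catch (Exception e) {
                // Usually a BUSYGROUP error, meaning the group already exists
                System.out.println("Consumer group not created (it may already exist): " + groupName);
            }
        };
    }
}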
Step 4: Configure the message listener container
// These beans belong in the same RedisConfig class as Step 3; merge the two snippets into one file.
@Configuration
public class RedisConfig {

    // Thread pool used by the listener container for polling
    @Bean(name = "redisStreamTaskExecutor")
    public ThreadPoolTaskExecutor redisStreamTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setThreadNamePrefix("redis-stream-");
        return executor;
    }

    // Create the message listener container
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamContainer(
            RedisConnectionFactory factory,
            @Qualifier("redisStreamTaskExecutor") ThreadPoolTaskExecutor executor) {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .pollTimeout(Duration.ofSeconds(1)) // block for up to 1 second per poll
                        .executor(executor)
                        .batchSize(10)                      // read at most 10 records per poll
                        .build();
        return StreamMessageListenerContainer.create(factory, options);
    }
}
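Step 5: Register the message listener
@Component
public class StreamListenerRegistrar {

    @Value("${app.redis.stream}")
    private String streamKey;

    @Value("${app.redis.group}")
    private String groupName;

    @Value("${app.redis.consumer}")
    private String consumerName;

    private final StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;
    private final RedisMessageProcessor processor;

    // A @PostConstruct method cannot take parameters, so the collaborators are injected through the constructor.
    public StreamListenerRegistrar(
            StreamMessageListenerContainer<String, MapRecord<String, String, String>> container,
            RedisMessageProcessor processor) {
        this.container = container;
        this.processor = processor;
    }

    @PostConstruct
    public void registerListener() {
        StreamMessageListenerContainer.StreamReadRequest<String> readRequest =
                StreamMessageListenerContainer.StreamReadRequest
                        .builder(StreamOffset.create(streamKey, ReadOffset.lastConsumed()))
                        .consumer(Consumer.from(groupName, consumerName))
                        .autoAcknowledge(false) // manual ACK
                        .build();
        container.register(readRequest, processor);
    }
}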
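Step 6: Implement the message processor
@Component
public class RedisMessageProcessor implements StreamListener<String, MapRecord<String, String, String>> {

    @Value("${app.redis.stream}")
    private String streamKey;

    @Value("${app.redis.group}")
    private String groupName;

    private final StringRedisTemplate redisTemplate;

    public RedisMessageProcessor(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Override
    public void onMessage(MapRecord<String, String, String> record) {
        CompletableFuture.runAsync(() -> {
            try {
                // Business logic
                processBusiness(record);
                // Send the ACK only after processing succeeds
                redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
            } catch (Exception e) {
                // No ACK is sent, so the message stays in the Pending Entries List for the retry job in Step 7
            }
        });
    }

    // Public because PendingMessageProcessor (Step 7) calls it when retrying
    public void processBusiness(MapRecord<String, String, String> record) throws Exception {
        String eventType = record.getValue().get("eventType");
        String payload = record.getValue().get("payload");
        if (eventType == null) {
            // switch on a null String would throw NullPointerException
            throw new IllegalArgumentException("Missing eventType in record " + record.getId());
        }
        // Dispatch by event type
        switch (eventType) {
            case "ORDER_CREATED": handleOrder(payload); break;
            case "PAYMENT_PROCESSED": handlePayment(payload); break;
            default: break; // unknown event types are skipped and then acknowledged
        }
    }

    // Placeholders: the article does not show the bodies of these handlers
    private void handleOrder(String payload) { }

    private void handlePayment(String payload) { }
}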
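The switch in processBusiness grows with every new event type. As an alternative, the routing can be kept in a lookup table. The sketch below is plain Java with no Spring or Redis dependency; EventDispatcher and its methods are hypothetical names introduced here for illustration, and the field names eventType and payload are taken from the record layout used above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical helper: routes a record's fields to a handler keyed by eventType. */
final class EventDispatcher {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    /** Registers the handler for one event type, for example "ORDER_CREATED". */
    EventDispatcher on(String eventType, Consumer<String> handler) {
        handlers.put(eventType, handler);
        return this;
    }

    /**
     * Dispatches one record body. Returns true if a handler ran, and false if the
     * eventType field is missing or unknown, so the caller can decide whether to
     * ACK, retry, or dead-letter the record.
     */
    boolean dispatch(Map<String, String> fields) {
        String eventType = fields.get("eventType");
        if (eventType == null) {
            return false;
        }
        Consumer<String> handler = handlers.get(eventType);
        if (handler == null) {
            return false;
        }
        handler.accept(fields.get("payload"));
        return true;
    }

    public static void main(String[] args) {
        EventDispatcher dispatcher = new EventDispatcher()
                .on("ORDER_CREATED", payload -> System.out.println("order: " + payload))
                .on("PAYMENT_PROCESSED", payload -> System.out.println("payment: " + payload));
        dispatcher.dispatch(Map.of("eventType", "ORDER_CREATED", "payload", "{\"id\":1}"));
        dispatcher.dispatch(Map.of("payload", "record without an event type"));
    }
}
```

Returning a boolean instead of throwing makes the "unknown event" case explicit, which matters here because an unacknowledged record will be redelivered by the pending-message job.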
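Step 7: Implement the pending message processor
@Component
@Slf4j
public class PendingMessageProcessor {

    @Value("${app.redis.stream}")
    private String streamKey;

    @Value("${app.redis.group}")
    private String groupName;

    @Value("${app.redis.consumer}")
    private String consumerName;

    private final StringRedisTemplate redisTemplate;
    private final RedisMessageProcessor messageProcessor;

    public PendingMessageProcessor(StringRedisTemplate redisTemplate, RedisMessageProcessor messageProcessor) {
        this.redisTemplate = redisTemplate;
        this.messageProcessor = messageProcessor;
    }

    // Typed view of the stream operations so that records come back as MapRecord<String, String, String>
    private StreamOperations<String, String, String> streamOps() {
        return redisTemplate.opsForStream();
    }

    // Process pending messages once a minute (requires @EnableScheduling on a configuration class)
    @Scheduled(fixedRate = 60000)
    public void processPendingMessages() {
        // 1. Query up to 100 pending messages of the group
        PendingMessages pending = streamOps().pending(streamKey, groupName, Range.unbounded(), 100);
        pending.forEach(this::handlePendingMessage);
    }

    private void handlePendingMessage(PendingMessage pending) {
        try {
            // 2. Claim the message for this consumer if it has been idle for at least 30 seconds
            List<MapRecord<String, String, String>> records = streamOps().claim(
                    streamKey, groupName, consumerName, Duration.ofSeconds(30), pending.getId());
            if (!records.isEmpty()) {
                MapRecord<String, String, String> record = records.get(0);
                // 3. Retry the business logic
                messageProcessor.processBusiness(record);
                // 4. Send the ACK after the retry succeeds
                streamOps().acknowledge(streamKey, groupName, record.getId());
            }
        } catch (Exception e) {
            log.warn("Retry failed for message {}", pending.getId(), e);
            // 5. Move the message to the dead-letter queue once it has been delivered more than 3 times
            if (pending.getTotalDeliveryCount() > 3) {
                moveToDeadLetterQueue(pending);
            }
        }
    }

    private void moveToDeadLetterQueue(PendingMessage pending) {
        // Fetch the message body by its exact ID
        String id = pending.getId().getValue();
        List<MapRecord<String, String, String>> records = streamOps().range(streamKey, Range.closed(id, id));
        if (!records.isEmpty()) {
            // Append the body to the dead-letter stream
            streamOps().add("dead-letter:" + streamKey, records.get(0).getValue());
            // Acknowledge the original message so it leaves the pending list
            streamOps().acknowledge(streamKey, groupName, pending.getId());
        }
    }
}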
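The retry job above decides between retrying and dead-lettering only after a retry has failed. One way to make that rule easier to test is to isolate it in a pure function of the delivery count and idle time. The sketch below is plain Java and assumes the same thresholds as the code above (more than 3 deliveries, 30 seconds idle); PendingRetryPolicy and Action are hypothetical names, not part of Spring Data Redis.

```java
import java.time.Duration;

/** Hypothetical policy object that classifies one pending entry. */
final class PendingRetryPolicy {
    enum Action { SKIP, RETRY, DEAD_LETTER }

    private final long maxDeliveries;
    private final Duration minIdle;

    PendingRetryPolicy(long maxDeliveries, Duration minIdle) {
        this.maxDeliveries = maxDeliveries;
        this.minIdle = minIdle;
    }

    /**
     * DEAD_LETTER when the entry was delivered more than maxDeliveries times,
     * SKIP when it has not been idle long enough to be claimed yet,
     * RETRY otherwise.
     */
    Action decide(long deliveryCount, Duration idle) {
        if (deliveryCount > maxDeliveries) {
            return Action.DEAD_LETTER;
        }
        if (idle.compareTo(minIdle) < 0) {
            return Action.SKIP;
        }
        return Action.RETRY;
    }

    public static void main(String[] args) {
        PendingRetryPolicy policy = new PendingRetryPolicy(3, Duration.ofSeconds(30));
        System.out.println(policy.decide(1, Duration.ofSeconds(45)));
        System.out.println(policy.decide(4, Duration.ofSeconds(45)));
    }
}
```

In the scheduled job, the two inputs would come from PendingMessage.getTotalDeliveryCount() and PendingMessage.getElapsedTimeSinceLastDelivery(). Checking the count before retrying, rather than in the catch block, avoids one extra processing attempt for messages that are already over the limit.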
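Step 8: Implement the message producer
@Service
public class RedisMessageProducer {

    @Value("${app.redis.stream}")
    private String streamKey;

    private final StringRedisTemplate redisTemplate;

    public RedisMessageProducer(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // Appends the event to the stream and returns the ID that Redis generated for it
    public String sendMessage(String eventType, String payload) {
        Map<String, String> message = Map.of(
                "eventType", eventType,
                "payload", payload,
                "timestamp", String.valueOf(System.currentTimeMillis()));
        return redisTemplate.opsForStream().add(streamKey, message).getValue();
    }
}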
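Step 9: Create the REST endpoint
@RestController
@RequestMapping("/messages")
public class MessageController {

    private final RedisMessageProducer producer;

    // The final field needs a constructor so that Spring can inject the producer
    public MessageController(RedisMessageProducer producer) {
        this.producer = producer;
    }

    // Publishes the event and returns the stream record ID
    @PostMapping
    public String sendMessage(@RequestBody MessageRequest request) {
        return producer.sendMessage(request.getEventType(), request.getPayload());
    }

    @Data
    public static class MessageRequest {
        private String eventType;
        private String payload;
    }
}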
III. Message Lifecycle Flowcharts
1. Normal message processing flow
2. Pending message processing flow
3. How the ACK mechanism works
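As a rough illustration of the ACK mechanism, the sketch below models one consumer group's Pending Entries List (PEL) in memory. It is plain Java, not Redis: the class PendingListModel and its methods are hypothetical, entry IDs are simplified, and the idle-time check that XCLAIM performs is left out. It only mirrors three documented behaviours: reading a new entry through the group (XREADGROUP with ">") records it as pending, XACK removes it, and XCLAIM changes the owner and increments the delivery count.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified in-memory model of a consumer group's pending entries list. */
final class PendingListModel {
    /** One pending entry: its current owner and how often it has been delivered. */
    private static final class Entry {
        String owner;
        int deliveryCount = 1;
        Entry(String owner) { this.owner = owner; }
    }

    private final List<String> stream = new ArrayList<>();            // entry IDs in append order
    private final Map<String, Entry> pending = new LinkedHashMap<>(); // the PEL
    private int nextUndelivered = 0;                                   // first entry never delivered to the group

    /** Models XADD: appends an entry and returns its (simplified) ID. */
    String add() {
        String id = (stream.size() + 1) + "-0";
        stream.add(id);
        return id;
    }

    /** Models XREADGROUP with ">": delivers the next new entry and marks it pending; null if none. */
    String readNew(String consumer) {
        if (nextUndelivered >= stream.size()) {
            return null;
        }
        String id = stream.get(nextUndelivered++);
        pending.put(id, new Entry(consumer));
        return id;
    }

    /** Models XACK: removes the entry from the PEL; false if it was not pending. */
    boolean ack(String id) {
        return pending.remove(id) != null;
    }

    /** Models XCLAIM without the idle-time check: new owner, delivery count + 1. */
    boolean claim(String id, String newOwner) {
        Entry entry = pending.get(id);
        if (entry == null) {
            return false;
        }
        entry.owner = newOwner;
        entry.deliveryCount++;
        return true;
    }

    int pendingCount() { return pending.size(); }

    int deliveryCount(String id) {
        Entry entry = pending.get(id);
        return entry == null ? 0 : entry.deliveryCount;
    }

    String ownerOf(String id) {
        Entry entry = pending.get(id);
        return entry == null ? null : entry.owner;
    }

    public static void main(String[] args) {
        PendingListModel group = new PendingListModel();
        String id = group.add();
        group.readNew("consumer-1");     // delivered, now pending
        group.claim(id, "consumer-2");   // consumer-1 never acknowledged, consumer-2 takes over
        System.out.println(group.ownerOf(id) + " deliveries=" + group.deliveryCount(id));
        group.ack(id);                   // processing succeeded
        System.out.println("pending=" + group.pendingCount());
    }
}
```

The point of the model is that a message enters the pending list when it is delivered, not when processing fails; a failure simply means the ACK never arrives, so the entry stays there until another consumer claims it or it is acknowledged.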
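IV. Production Recommendations
Consumer naming strategy
@Value("${app.redis.consumer}")
private String consumerName;

// Overrides the configured default at application startup.
// Assumes an injected Spring Environment field named environment.
@PostConstruct
public void initConsumerName() throws UnknownHostException {
    String hostName = InetAddress.getLocalHost().getHostName();
    String port = environment.getProperty("server.port");
    consumerName = "consumer-" + hostName + "-" + port;
}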
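The snippet above can produce a name ending in "-null" when server.port is not set, and it fails if the host name cannot be resolved. A more defensive variant might look like the following sketch. ConsumerNames is a hypothetical helper introduced here; the fallback port 8080 is Spring Boot's default, and the random suffix for an unresolvable host is an assumption made for this example.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;

/** Hypothetical helper that builds a stable, per-instance consumer name. */
final class ConsumerNames {
    private ConsumerNames() { }

    /** Builds "consumer-<host>-<port>", substituting fallbacks for missing parts. */
    static String build(String hostName, String port) {
        String host = (hostName == null || hostName.isBlank())
                ? "unknown-" + UUID.randomUUID()   // keeps names unique when the host is unknown
                : hostName;
        String resolvedPort = (port == null || port.isBlank()) ? "8080" : port;
        return "consumer-" + host + "-" + resolvedPort;
    }

    /** Resolves the local host name, falling back instead of failing on lookup errors. */
    static String forLocalHost(String port) {
        try {
            return build(InetAddress.getLocalHost().getHostName(), port);
        } catch (UnknownHostException e) {
            return build(null, port);
        }
    }

    public static void main(String[] args) {
        System.out.println(forLocalHost(System.getProperty("server.port")));
    }
}
```

A name that stays the same across restarts lets an instance pick up its own pending entries again, whereas a random name on every start leaves them to be claimed by the pending-message job.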
Configurable retry policy
app:
  pending:
    max_retry: 5
    retry_interval: 30000 # 30 seconds
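Dead-letter queue monitoring
@Scheduled(fixedRate = 3600000) // check once an hour
public void checkDeadLetterQueue() {
    Long size = redisTemplate.opsForStream().size("dead-letter:" + streamKey);
    // size can be null (for example inside a pipeline or transaction), so guard before unboxing
    if (size != null && size > 0) {
        alertService.sendAlert("Dead-letter queue holds " + size + " unprocessed messages");
    }
}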
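Message retention (stream length cap)
// Redis Streams have no per-message TTL; retention is controlled by capping the stream length.
public String sendMessage(String eventType, String payload) {
    MapRecord<String, String, String> record = ...;
    RecordId id = redisTemplate.opsForStream().add(record);
    // Trim to roughly the newest 10,000 entries (XTRIM MAXLEN ~ 10000)
    redisTemplate.opsForStream().trim(streamKey, 10000, true);
    return id.getValue();
}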
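V. Summary
This article walked through a complete approach to integrating Redis with Spring Boot for publish/subscribe messaging with an ACK mechanism:
Event-driven architecture: a Redis Stream listener container delivers published events to the consumers in a group
Reliable ACK mechanism: manual acknowledgement ensures a message is marked as done only after it has been processed
Automatic recovery: the pending message processor claims and retries messages that were never acknowledged
Dead-letter queue: messages that keep failing are isolated so that they do not block the rest of the stream
Production readiness: covers multi-instance deployment, configurable retries, and monitoring with alerts
The approach suits scenarios that need highly reliable message delivery, such as order processing, payment systems, and event sourcing. Because an unacknowledged message is redelivered, handlers should tolerate receiving the same message more than once.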