一、SSE技術深度解析
1.1 協議工作原理
1.2 與WebSocket對比
特性 | SSE | WebSocket |
---|---|---|
協議 | HTTP | WS/WSS |
方向 | 單向(服務端→客戶端) | 雙向 |
重連 | 自動 | 需手動實現 |
二進制 | 僅文本 | 支持二進制 |
復雜度 | 低 | 中高 |
二、Spring Boot服務端實現
2.1 增強型SSE控制器
@RestController
@RequestMapping("/api/sse")
public class EnhancedSseController {private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();private final ScheduledExecutorService scheduler =Executors.newScheduledThreadPool(4);@GetMapping("/subscribe/{clientId}")public SseEmitter subscribe(@PathVariable String clientId) {SseEmitter emitter = new SseEmitter(30_000L);// 心跳機制ScheduledFuture<?> heartbeat = scheduler.scheduleAtFixedRate(() -> sendHeartbeat(emitter),10, 10, TimeUnit.SECONDS);emitter.onCompletion(() -> {heartbeat.cancel(true);emitters.remove(clientId);});emitter.onTimeout(() -> {heartbeat.cancel(true);emitters.remove(clientId);});emitters.put(clientId, emitter);return emitter;}@PostMapping("/broadcast")public void broadcast(@RequestBody MessageDto message) {emitters.forEach((id, emitter) -> {try {emitter.send(SseEmitter.event().id(UUID.randomUUID().toString()).name("message").data(message).reconnectTime(5000L));} catch (IOException e) {emitter.complete();emitters.remove(id);}});}
}
2.2 消息實體設計
public class MessageDto {private MessageType type;private String from;private String content;private Instant timestamp;public enum MessageType {NOTIFICATION, ALERT, SYSTEM}
}
三、前端高級實現
3.1 帶重連機制的EventSource
class ResilientEventSource {constructor(url, options = {}) {this.url = url;this.retryDelay = options.retryDelay || 3000;this.maxRetries = options.maxRetries || 5;this.eventHandlers = {};this.connect();}connect() {this.es = new EventSource(this.url);this.retryCount = 0;this.es.onopen = () => {this.retryCount = 0;this.onOpen?.();};this.es.onerror = () => {this.es.close();if (this.retryCount++ < this.maxRetries) {setTimeout(() => this.connect(), this.retryDelay);} else {this.onError?.();}};Object.entries(this.eventHandlers).forEach(([type, handler]) => {this.es.addEventListener(type, handler);});}addEventListener(type, handler) {this.eventHandlers[type] = handler;if (this.es) this.es.addEventListener(type, handler);}
}
3.2 Vue3集成示例
<template><div class="sse-container"><div v-for="msg in messages" :key="msg.id":class="['message', msg.type]"><span class="timestamp">{{ formatTime(msg.timestamp) }}</span><span class="content">{{ msg.content }}</span></div></div>
</template><script setup>
import { ref, onMounted, onUnmounted } from 'vue';const messages = ref([]);
let eventSource;onMounted(() => {eventSource = new ResilientEventSource('/api/sse/subscribe/user123', {retryDelay: 5000,maxRetries: Infinity});eventSource.addEventListener('message', (e) => {messages.value.push(JSON.parse(e.data));});
});onUnmounted(() => {eventSource?.close();
});
</script>
四、生產環境最佳實踐
4.1 性能優化方案
- 連接池管理:限制最大連接數
- 消息批處理:合并短時間內的多個事件
- 壓縮傳輸:啟用gzip壓縮
- 負載均衡:Nginx配置SSE支持
4.2 Nginx配置示例
server {location /api/sse {proxy_pass <http://backend>;proxy_http_version 1.1;proxy_set_header Connection '';proxy_buffering off;proxy_cache off;proxy_read_timeout 24h;}
}
五、安全增強措施
5.1 認證授權
@GetMapping("/secure-subscribe")
public SseEmitter secureSubscribe(@RequestHeader("Authorization") String token) {if (!jwtUtil.validateToken(token)) {throw new SecurityException("Invalid token");}String userId = jwtUtil.extractUserId(token);return sseService.subscribe(userId);
}
5.2 消息加密
public String encryptMessage(String raw) {return Base64.getEncoder().encodeToString(aesCipher.doFinal(raw.getBytes(StandardCharsets.UTF_8)));
}
六、監控與運維
6.1 關鍵監控指標
指標 | 采集方式 | 告警閾值 |
---|---|---|
活躍連接數 | Micrometer | >1000 |
消息延遲 | Prometheus | >1s |
錯誤率 | ELK | >5% |
6.2 健康檢查端點
@GetMapping("/health")
public Map<String, Object> health() {return Map.of("status", "UP","connections", emitters.size(),"lastMessage", lastMessageTime);
}
七、擴展應用場景
7.1 實時日志監控
@Bean
public ApplicationListener<LoggingEvent> logListener() {return event -> {if (event.getLevel().isGreaterOrEqual(Level.WARN)) {sseService.broadcast(new MessageDto("SYSTEM",event.getLoggerName(),event.getFormattedMessage()));}};
}
7.2 股票行情推送
@Scheduled(fixedRate = 1_000)
public void pushStockQuotes() {stockService.getLatestQuotes().forEach(quote -> {sseService.sendToUser(quote.getUserId(),new MessageDto("STOCK",quote.getSymbol(),quote.getPrice().toString()));});
}
總結
本文實現的SSE實時消息推送系統具有以下優勢:
- 高效實時:毫秒級消息延遲
- 資源友好:單連接持續復用
- 彈性可靠:自動重連機制
- 易于擴展:支持水平擴展
在實際應用中建議:
- 根據業務需求選擇合適的消息格式(JSON/Protobuf)
- 實施完善的監控告警
- 定期進行壓力測試
- 考慮消息持久化方案
通過合理設計和優化,該方案可支持從中小規模到百萬級連接的消息推送場景。