使用SSE解決獲取狀態不一致問題
- 1. 問題描述
- 2. SSE介紹
- 2.1 SSE 的工作原理
- 2.2 SSE 的事件格式規范
- 2.3 SSE與其他技術對比
- 2.4 SSE 的優缺點
- 3. 實戰代碼
1. 問題描述
目前做的一個功能是上傳多個文件,這個上傳文件是整體功能的一部分,文件在上傳的過程中需要輪詢文件上傳的狀態的接口,還有要調用整個任務狀態的接口,因為我們的Mysql是單例的,有多個大文件上傳的時候會出現任務狀態不一致的問題,經過調研有WebSocket與SSE兩種方式,但是我只需要把信息推給前端,所以使用個更輕量的SSE。
2. SSE介紹
SSE(Server-Sent Events) 是 HTML5 標準中的一項技術,它允許服務器通過單向連接持續地將數據推送給客戶端(通常是瀏覽器)。
- 與傳統的 HTTP 請求響應不同,SSE 是一種服務器主動推送數據的機制。
- 與 WebSocket 不同,它是單向通信:服務器 -> 客戶端。
SSE 使用標準的 HTTP 協議 和 文本/event-stream 格式,通常用于實時消息推送,如股票行情、社交動態通知、在線狀態更新等。
2.1 SSE 的工作原理
- 客戶端通過普通的 HTTP 請求發起連接:
GET /events HTTP/1.1
Accept: text/event-stream
- 服務器響應:
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
- 事件流
event: message
id: 123
data: 這是要推送的數據
- 客戶端自動處理每一條數據,并在連接斷開時自動重連。
2.2 SSE 的事件格式規范
每條消息由一組字段組成,字段以冒號:
分隔,每條消息之間由兩個換行符 \n\n
分隔:
示例:
id: 1001
event: update
data: {"progress": 50}
2.3 SSE與其他技術對比
2.4 SSE 的優缺點
? 優點:
- 使用標準 HTTP 協議,兼容性好;
- 實現簡單,開發成本低;
- 支持自動重連機制;
- 支持事件類型、多行數據等;
- 適用于需要實時更新、但無需客戶端發送大量消息的場景。
? 缺點:
- 只支持單向通信;
- 不支持二進制數據;
- 不支持所有瀏覽器(如 IE);
- 有并發連接數限制(部分瀏覽器每個域名默認最大連接數限制);
- 需配置好心跳機制、防止代理服務器中斷連接。
3. 實戰代碼
@GetMapping("/file-upload-status")
@Operation(summary = "文件上傳狀態SSE")
public SseEmitter fileUploadStatus(@RequestParam("taskCode") String taskCode,@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {
log.info("文件上傳狀態SSE, 請求參數: {}, Last-Event-ID: {}", taskCode, lastEventId);try {return dataConfigService.fileUploadStatus(taskCode, lastEventId);
} catch (Exception e) {log.error("文件上傳狀態SSE,失敗,錯誤信息:{}", e.getMessage());throw e;}
}/*** 用于存儲每個任務對應的 SseEmitter 實例。* key 為任務唯一標識 taskCode,value 為與該任務關聯的 SseEmitter。* 使用 ConcurrentHashMap 是為了支持多線程環境下的并發讀寫,確保線程安全。* <p>* 場景:* - 客戶端發起 SSE 連接時,將對應的 SseEmitter 存入此 Map。* - 后臺定時任務或上傳進度更新時可以通過 taskCode 拿到對應的 emitter 推送數據。* - 當連接斷開(如超時、關閉、出錯)時,從 Map 中移除對應的 emitter,避免內存泄漏。*/private final Map<String, SseEmitter> emitterMap = new ConcurrentHashMap<>();/*** 定時任務線程池,用于定期推送每個任務的上傳狀態到前端客戶端(通過 SseEmitter)。* 使用 Runtime.getRuntime().availableProcessors() 獲取當前機器的可用處理器核心數,* 并以此設置線程池大小,合理利用系統資源。* <p>* 特點:* - ScheduledExecutorService 支持定時或周期性任務調度,適合做周期性狀態推送。* - 使用線程池而不是單線程可以支持多個任務并發狀態推送。* - 可根據系統并發量和任務復雜度調整線程池大小。*/private final ScheduledExecutorService scheduledExecutorService =Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());/*** 緩存最近一次推送的狀態(用于斷線續傳)*/private final Map<String, DataFileUploadStatusVO> lastEventDataMap = new ConcurrentHashMap<>();
@Overridepublic SseEmitter fileUploadStatus(String taskCode, String lastEventId) {// 檢查任務代碼是否為空if (StringUtils.isBlank(taskCode)) {throw new ServiceException(400, "請求參數不能為空");}// 創建 SseEmitter 實例,設置超時時間為 1 小時(單位:毫秒)SseEmitter emitter = new SseEmitter(60 * 60 * 1000L);// 將當前任務的 emitter 注冊到全局 emitterMap 中,用于后續狀態推送emitterMap.put(taskCode, emitter);// 定義清理邏輯(連接關閉時自動移除 emitter,避免內存泄漏)Runnable cleanup = () -> {emitterMap.remove(taskCode);lastEventDataMap.remove(taskCode);};// 注冊回調:客戶端主動關閉連接時調用emitter.onCompletion(cleanup);// 注冊回調:連接超時后自動關閉并清理emitter.onTimeout(cleanup);// 注冊回調:出現異常時進行日志記錄并清理資源emitter.onError(e -> {log.error("文件上傳狀態SSE 錯誤,taskCode:{}, 錯誤信息:{}", taskCode, e.getMessage(), e);cleanup.run();});// 嘗試斷點續傳(只在客戶端提供 Last-Event-ID 的情況下)if (StringUtils.isNotBlank(lastEventId)) {DataFileUploadStatusVO lastData = lastEventDataMap.get(taskCode);if (lastData != null) {try {emitter.send(SseEmitter.event().id(lastEventId).name("upload-status").reconnectTime(5000L).data(lastData, MediaType.APPLICATION_JSON));log.info("文件上傳狀態SSE,斷點續傳成功,taskCode:{}, lastEventId:{}, 數據:{}", taskCode, lastEventId,JSON.toJSONString(lastData));} catch (IOException e) {log.error("文件上傳狀態SSE,斷點續傳失敗,taskCode:{}, 錯誤:{}", taskCode, e.getMessage(), e);}}}// 創建一個原子容器用于持有定時任務引用,以便后續取消任務AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>();// 啟動定時任務ScheduledFuture<?> future = scheduledExecutorService.scheduleAtFixedRate(() -> {try {// 從全局映射中獲取當前連接的 emitter,若為空說明連接已關閉,停止執行SseEmitter currentEmitter = emitterMap.get(taskCode);if (currentEmitter == null) {return;}// 查詢文件上傳狀態List<DataConfigRespVO> dataConfigList = getDataConfigList(taskCode);// 查詢任務信息TaskRespVO taskRespVO = taskService.getTask(taskCode);// 將數據庫實體轉換為狀態對象列表List<DataFileUploadStatusVO.FileStatus> fileStatusList = dataConfigList.stream().map(item -> {DataFileUploadStatusVO.FileStatus fileStatus = new DataFileUploadStatusVO.FileStatus();fileStatus.setId(item.getId());fileStatus.setCode(item.getCode());fileStatus.setImportMethod(item.getImportMethod());fileStatus.setFilename(item.getFilename());fileStatus.setFileUrl(item.getFileUrl());fileStatus.setSize(item.getSize());fileStatus.setUploadStatusCode(item.getUploadStatusCode());return fileStatus;}).collect(Collectors.toList());// 構建完整狀態返回對象DataFileUploadStatusVO statusVO = new DataFileUploadStatusVO();statusVO.setTaskCode(taskCode);statusVO.setCurrentTime(TimeUtil.getCurrentTime());statusVO.setFileStatusList(fileStatusList);statusVO.setName(taskRespVO.getName());statusVO.setDataType(taskRespVO.getDataType());statusVO.setDataTypeStr(taskRespVO.getDataTypeStr());statusVO.setImportType(taskRespVO.getImportType());statusVO.setCurrentLinkCode(taskRespVO.getCurrentLinkCode());statusVO.setCurrentLink(taskRespVO.getCurrentLink());statusVO.setCurrentLinkStatusCode(taskRespVO.getCurrentLinkStatusCode());statusVO.setCurrentLinkStatus(taskRespVO.getCurrentLinkStatus());String eventId = String.valueOf(System.currentTimeMillis());// 推送狀態try {// 設置狀態statusVO.setFinished(fileUploadFished(statusVO));// 保存最后一次推送的數據,用于斷線續傳lastEventDataMap.put(taskCode, statusVO);currentEmitter.send(SseEmitter.event()// 事件 ID,供斷線續傳.id(eventId)// 事件名.name("upload-status")// 告訴客戶端:斷線后5秒再重連.reconnectTime(5000L)// 推送數據為 JSON.data(statusVO, MediaType.APPLICATION_JSON));log.info("文件上傳狀態SSE,當前時間:{},taskCode:{},推送數據:{}", TimeUtil.getCurrentTime(), taskCode,JSON.toJSONString(statusVO));} catch (IOException ioException) {// 客戶端斷開連接或傳輸異常,主動清理資源并中止定時任務log.error("文件上傳狀態SSE,錯誤:{},taskCode:{}", ioException.getMessage(), taskCode);currentEmitter.completeWithError(ioException);cleanup.run();futureRef.get().cancel(true);return;}// 判斷任務是否完成if (fileUploadFished(statusVO)) {// 設置任務完成statusVO.setFinished(true);currentEmitter.send(SseEmitter.event().name("upload-status").data(statusVO, MediaType.APPLICATION_JSON));// 主動關閉連接currentEmitter.complete();// 清理資源并取消定時任務cleanup.run();futureRef.get().cancel(true);log.info("文件上傳狀態SSE,任務已完成,taskCode:{},當前時間:{},emitterMap:{},大小:{}", taskCode,TimeUtil.getCurrentTime(), JSON.toJSONString(emitterMap), emitterMap.size());}} catch (Exception e) {log.error("文件上傳狀態SSE, SSE推送失敗,taskCode:{},錯誤信息:{}", taskCode, e.getMessage(), e);SseEmitter failedEmitter = emitterMap.get(taskCode);if (failedEmitter != null) {failedEmitter.completeWithError(e);}cleanup.run();futureRef.get().cancel(true);}// 每2秒執行一次}, 0, 2, TimeUnit.SECONDS);// 記錄定時任務引用futureRef.set(future);// 返回 emitter 給前端,保持連接return emitter;}
event:upload-status
data:{"finished":false,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:27","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件導入"},"filename":"新聞文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新聞文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件導入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件導入"},"filename":"新聞文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新聞文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":1}],"name":"測試文件上傳123","dataType":2,"dataTypeStr":"非結構化數據","importType":"file","currentLinkCode":2,"currentLink":"數據配置","currentLinkStatusCode":1,"currentLinkStatus":"執行中"}event:upload-status
data:{"finished":false,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:29","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件導入"},"filename":"新聞文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新聞文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件導入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件導入"},"filename":"新聞文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新聞文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":1}],"name":"測試文件上傳123","dataType":2,"dataTypeStr":"非結構化數據","importType":"file","currentLinkCode":2,"currentLink":"數據配置","currentLinkStatusCode":1,"currentLinkStatus":"執行中"}event:upload-status
data:{"finished":false,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:31","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件導入"},"filename":"新聞文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新聞文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件導入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件導入"},"filename":"新聞文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新聞文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":1}],"name":"測試文件上傳123","dataType":2,"dataTypeStr":"非結構化數據","importType":"file","currentLinkCode":2,"currentLink":"數據配置","currentLinkStatusCode":1,"currentLinkStatus":"執行中"}event:upload-status
data:{"finished":false,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:33","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件導入"},"filename":"新聞文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新聞文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件導入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件導入"},"filename":"新聞文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新聞文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":1}],"name":"測試文件上傳123","dataType":2,"dataTypeStr":"非結構化數據","importType":"file","currentLinkCode":2,"currentLink":"數據配置","currentLinkStatusCode":1,"currentLinkStatus":"執行中"}event:upload-status
data:{"finished":true,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:35","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件導入"},"filename":"新聞文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新聞文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件導入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件導入"},"filename":"新聞文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新聞文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2}],"name":"測試文件上傳123","dataType":2,"dataTypeStr":"非結構化數據","importType":"file","currentLinkCode":2,"currentLink":"數據配置","currentLinkStatusCode":2,"currentLinkStatus":"已完成"}event:upload-status
data:{"finished":true,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:35","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件導入"},"filename":"新聞文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新聞文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件導入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件導入"},"filename":"新聞文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新聞文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2}],"name":"測試文件上傳123","dataType":2,"dataTypeStr":"非結構化數據","importType":"file","currentLinkCode":2,"currentLink":"數據配置","currentLinkStatusCode":2,"currentLinkStatus":"已完成"}