文章目錄
- 一、引言:問題背景
- 二、技術選型與項目架構
- 三、核心設計與實現
- 1. 初始化上傳 (`/init`)
- 2. 上傳分塊 (`/chunk`)
- 3. 完成上傳與合并 (`/complete`)
- 4. 查詢上傳進度 (`/progress`)
- 四、斷點續傳工作流程
- 五、方案優勢總結
- 六、拓展優化
- 七、方案優勢對比
一、引言:問題背景
在醫療問診等需要高可靠性數據記錄的場景中,我們的一項關鍵功能是:醫生與患者問答填寫問卷時,系統需要實時錄制音頻并與問卷綁定,以保證數據的真實性和有效性。然而,直接上傳完整的錄音文件(尤其是長時間問診產生的大文件)面臨兩個嚴峻的技術挑戰:
- 網絡穩定性:醫院網絡環境復雜,長時間上傳大文件極易因網絡波動而中斷,導致整個上傳失敗,用戶體驗極差。
- 文件與服務器壓力:大文件上傳占用服務器連接時間過長,接口響應慢,且服務端一次性處理大文件對內存和IO壓力巨大,容易導致文件損壞或上傳失敗。
為了解決這些問題,我們放棄了傳統的一次性上傳方案,轉而采用 分塊上傳與斷點續傳 相結合的技術方案。本文將詳細介紹如何基于 Spring Boot 3.0
、MyBatis-Plus
和 MinIO
對象存儲來實現這一穩健的上傳流程。
二、技術選型與項目架構
- 后端框架: Spring Boot 3.0
- ORM 框架: MyBatis-Plus 3.5
- 對象存儲: MinIO 8.2.2
- 狀態緩存: Redis (用于存儲上傳狀態和分塊索引)
- 核心思路: 將大文件分割成多個小塊,分別上傳。利用 MinIO 的分塊上傳能力和 Redis 的記錄功能,實現上傳中斷后可從中斷點繼續上傳,而非重新開始。
三、核心設計與實現
我們通過四個清晰的 RESTful 接口來串聯整個上傳流程:
init
: 初始化上傳,獲取本次上傳的唯一ID。chunk
: 上傳單個文件分塊。complete
: 通知服務端所有分塊已上傳完畢,執行合并操作。progress
: 查詢當前上傳進度。
**上傳任務狀態流轉圖:**本圖描述了一個上傳任務可能處于的各種狀態及其轉換條件。任務從“初始化”狀態進入“上傳中”子狀態,并隨著每個分塊的上傳成功而逐步推進。核心在于,當任務因異常進入“已中斷”狀態后,可以通過查詢進度重新回到“上傳中”狀態,繼續上傳剩余分塊,從而實現“續傳”。超時未完成的任務會被系統清理。
1. 初始化上傳 (/init
)
客戶端在上傳前,首先需要調用初始化接口。
Controller:
@Operation(summary = "初始化分片上傳")
@PostMapping("/init")
public CommonResult<FileUploadDTO.UploadInitResponse> initUpload(@RequestParam("fileName") String fileName,@RequestParam("fileSize") long fileSize) {return CommonResult.SUCCESS(minioSysFileServiceImpl.initUpload(fileName, fileSize));
}
Service:
@Override
public FileUploadDTO.UploadInitResponse initUpload(String fileName, long fileSize) {// 1. 生成唯一上傳ID,用于標識本次上傳任務String uploadId = UUID.randomUUID().toString();// 2. 根據預設的分塊大小,計算總分塊數int totalChunks = (int) Math.ceil((double) fileSize / chunkSize);// 3. 創建上傳狀態對象并存入Redis,設置過期時間以防僵尸任務UploadStatus status = new UploadStatus(uploadId, fileName, fileSize, totalChunks);redisTemplate.opsForValue().set(RedisKeyUtil.getUploadKey(uploadId), status, 24, TimeUnit.HOURS);// 4. 返回給客戶端:uploadId 和 總分塊數return new FileUploadDTO.UploadInitResponse(uploadId, totalChunks, chunkSize);
}
此接口的核心是生成一個全局唯一的 uploadId
,并將文件元信息(名稱、大小、分塊數)存入 Redis,為后續的分塊上傳和續傳奠定基礎。
2. 上傳分塊 (/chunk
)
客戶端根據初始化接口返回的 totalChunks
,將文件切分,并循環調用此接口上傳每一個分塊。
Controller:
@Operation(summary = "上傳分片")
@PostMapping("/chunk")
public CommonResult<Void> uploadChunk(@RequestParam("uploadId") String uploadId,@RequestParam("chunkNumber") int chunkNumber,MultipartFile chunk) {minioSysFileServiceImpl.uploadChunk(uploadId, chunkNumber, chunk);return CommonResult.SUCCESS();
}
Service:
@Override
public void uploadChunk(String uploadId, int chunkNumber, MultipartFile chunk){// 1. 為當前分塊生成在MinIO中的唯一對象名稱// 格式如:chunks/{uploadId}/{chunkNumber}String objectName = chunkObjectName(uploadId, chunkNumber);try (InputStream inputStream = chunk.getInputStream()) {// 2. 調用MinIO SDK上傳分塊PutObjectArgs args = PutObjectArgs.builder().bucket(minioConfig.getBucketName()).object(objectName).stream(inputStream, chunk.getSize(), -1).contentType(chunk.getContentType()).build();minioClient.putObject(args);} catch (Exception e) {log.error("上傳分塊 {} 失敗: {}", chunkNumber, e.getMessage());throw new BaseException(ResultCode.CHUNK_UPLOAD_FAILED); // 拋出自定義異常,由全局異常處理器處理}// 3. 上傳成功后,在Redis中標記該分塊已完成markChunkUploaded(uploadId, chunkNumber);
}private void markChunkUploaded(String uploadId, int chunkNumber) {// 使用Set結構存儲已上傳的分塊編號redisTemplate.opsForSet().add(RedisKeyUtil.getUploadKey(uploadId) + ":chunks", chunkNumber);
}
每個分塊都是獨立上傳的,一個分塊的失敗不會影響其他分塊。成功上傳后,其編號會被記錄到 Redis 的 Set 中,用于后續的進度查詢和完成校驗。
3. 完成上傳與合并 (/complete
)
當所有分塊都上傳完畢后,客戶端調用此接口。
Controller:
@Operation(summary = "完成上傳并合并文件")
@PostMapping("/complete")
public CommonResult<String> completeUpload(@RequestParam("uploadId") String uploadId){return CommonResult.SUCCESS(minioSysFileServiceImpl.completeUpload(uploadId));
}
Service:
@Override
public String completeUpload(String uploadId) {// 1. 從Redis獲取上傳狀態,并檢查所有分塊是否已上傳完成UploadStatus status = getUploadStatus(uploadId);if (!status.isComplete()) {throw new BaseException(ResultCode.CHUNK_UPLOAD_NOT_COMPLETE);}// 2. (業務邏輯)準備文件記錄String originalFilename = status.getFileName();FileRecord record = ... // 創建或更新文件記錄邏輯try {String finalObjectName = ... // 生成最終在MinIO中存儲的文件名// 3. 核心:合并分塊// 構建一個源分塊列表List<ComposeSource> sources = IntStream.range(0, status.getTotalChunks()).mapToObj(i -> ComposeSource.builder().bucket(minioConfig.getBucketName()).object(chunkObjectName(uploadId, i)) // 指向每個分塊.build()).collect(Collectors.toList());// 調用MinIO的composeObject API合并文件// 此操作在MinIO服務端進行,高效且不耗費應用服務器資源minioClient.composeObject(ComposeObjectArgs.builder().bucket(minioConfig.getBucketName()).object(finalObjectName).sources(sources).build());// 4. 合并成功后,清理臨時分塊文件for (int i = 0; i < status.getTotalChunks(); i++) {minioClient.removeObject(RemoveObjectArgs.builder().bucket(minioConfig.getBucketName()).object(chunkObjectName(uploadId, i)).build());}String fullUrl = minioConfig.getUrl() + "/" + minioConfig.getBucketName() + "/" + finalObjectName;record.setUrl(fullUrl);} catch (Exception e) {log.error("文件合并失敗:{}", e.getMessage());throw new BaseException("文件合并失敗");} finally {// 5. 清理Redis狀態redisTemplate.delete(RedisKeyUtil.getUploadKey(uploadId));redisTemplate.delete(RedisKeyUtil.getUploadKey(uploadId) + ":chunks");}fileRecordService.saveOrUpdate(record);return record.getUrl(); // 返回最終文件的訪問地址
}
這是最精妙的一步。合并操作 (composeObject
) 是在 MinIO 服務端完成的,它只是將各個分塊文件的元數據組合起來,形成一個邏輯上的完整文件,而不需要在應用服務器上進行耗時的二進制流合并,因此速度極快,資源消耗極低。
4. 查詢上傳進度 (/progress
)
在上傳過程中,客戶端可以定時調用此接口來獲取上傳進度,用于前端顯示進度條。
Service:
@Override
public FileUploadDTO.UploadProgressResponse getUploadProgress(String uploadId) {UploadStatus status = getUploadStatus(uploadId);return new FileUploadDTO.UploadProgressResponse(status.getUploadedChunks().size(), // 已上傳數status.getTotalChunks(), // 總數status.getUploadedChunks() // 已上傳的編號集合);
}private UploadStatus getUploadStatus(String uploadId) {// 從Redis獲取基礎狀態UploadStatus status = (UploadStatus) redisTemplate.opsForValue().get(RedisKeyUtil.getUploadKey(uploadId));if (status == null) {throw new BaseException(ResultCode.CHUNK_ID_NOT_EXIST);}// 從Redis Set中獲取已上傳的分塊編號,并設置到狀態對象中Set<Object> uploadedChunks = redisTemplate.opsForSet().members(RedisKeyUtil.getUploadKey(uploadId) + ":chunks");if (uploadedChunks != null) {status.setUploadedChunks(uploadedChunks.stream().map(o -> Integer.parseInt(o.toString())) // 注意類型轉換.collect(Collectors.toSet()));}return status;
}
四、斷點續傳工作流程
此方案如何實現斷點續傳?流程如下:
- 客戶端首次上傳文件前,調用
/init
獲取uploadId
。 - 開始上傳分塊。假設在上傳到第 50 個分塊時網絡中斷。
- 網絡恢復后,客戶端可以先調用
/progress?uploadId=xxx
查詢進度。 - 接口返回
{uploadedChunks: 49, totalChunks: 100, uploadedChunkNumbers: [0,1,2,...,48]}
。 - 客戶端得知前 50 個塊(0-49)中,第 49 塊(索引從0開始)還未成功上傳,于是從第 49 塊開始繼續上傳,而不需要重傳 0-48 塊。
- 所有分塊上傳完成后,調用
/complete
完成合并。
五、方案優勢總結
- 提升穩定性:網絡中斷后可從斷點繼續,避免重復勞動和流量浪費。
- 減輕服務器壓力:分塊上傳變小請求,降低服務器內存和IO壓力。MinIO 服務端合并,效率極高。
- 提升用戶體驗:前端可以實時顯示精確的上傳進度條。
- 清晰的責任分離:四個接口職責單一,邏輯清晰,易于維護和擴展。
六、拓展優化
- 分塊大小:需要根據實際網絡情況和文件大小調整
chunkSize
(例如 5MB 或 10MB),在減少請求次數和降低單次失敗成本之間取得平衡。 - 異常處理與重試:在
uploadChunk
方法中,可以實現更強大的重試機制,例如最多重試 3 次。 - 過期清理:需要有一個定時任務,清理 Redis 中超過一定時間(如 24 小時)仍未完成的
UploadStatus
以及 MinIO 中對應的臨時分塊文件,避免存儲資源浪費。 - 安全性:可以對
uploadId
進行校驗,確保用戶只能操作自己發起的上傳任務。
七、方案優勢對比
傳統方案 vs 分塊斷點續傳方案
方面 | 傳統單次上傳 (Traditional) | 分塊斷點續傳 (Chunked & Resumable) |
---|---|---|
網絡中斷 | 完全失敗,需從頭開始重傳 | 從中斷點繼續,僅需傳剩余分塊 |
進度反饋 | 難以實現精確的進度條 | 實時精確的進度反饋 |
服務器壓力 | 長時間占用連接,內存IO壓力大 | 分塊小請求,壓力分散,服務端合并高效 |
用戶體驗 | 差,失敗成本高 | 優,可控且可靠 |