上傳視頻
需求分析
- 教學機構人員進入媒資管理列表查詢自己上傳的媒資文件。
點擊“媒資管理”
進入媒資管理列表頁面查詢本機構上傳的媒資文件。
- 教育機構用戶在"媒資管理"頁面中點擊 "上傳視頻" 按鈕。
點擊“上傳視頻”打開上傳頁面
- 選擇要上傳的文件,自動執行文件上傳。
- 視頻上傳成功會自動處理,處理完成可以預覽視頻。
斷點續傳
概念介紹
需求背景
通常視頻文件都比較大,所以對于媒資系統上傳文件的需求要滿足大文件的上傳要求。http協議本身對上傳文件大小沒有限制,但是客戶的網絡環境質量、電腦硬件環境等參差不齊,如果一個大文件快上傳完了網斷了沒有上傳完成,需要客戶重新上傳,用戶體驗非常差,所以對于大文件上傳的要求最基本的是斷點續傳。
什么是斷點續傳
引用百度百科:斷點續傳指的是在下載或上傳時,將下載或上傳任務(一個文件或一個壓縮包)人為的劃分為幾個部分,每一個部分采用一個線程進行上傳或下載,如果碰到網絡故障,可以從已經上傳或下載的部分開始繼續上傳下載未完成的部分,而沒有必要從頭開始上傳下載,斷點續傳可以提高節省操作時間,提高用戶體驗性。
斷點續傳流程如下圖:
流程如下:
1、前端上傳前先把文件分成塊
2、一塊一塊的上傳,上傳中斷后重新上傳,已上傳的分塊則不用再上傳
3、各分塊上傳完成最后在服務端合并文件
分塊與合并原理
為了更好的理解文件分塊上傳的原理,下邊用java代碼測試文件的分塊與合并。
文件分塊的流程如下:
1、獲取源文件長度
2、根據設定的分塊文件的大小計算出塊數
3、從源文件讀數據依次向每一個塊文件寫數據。
package com.xuecheng.media;
/*** @author Mr.M* @version 1.0* @description 大文件處理測試* @date 2022/9/13 9:21*/public class BigFileTest {//測試文件分塊方法@Testpublic void testChunk() throws IOException {File sourceFile = new File("C:\\Users\\Lenovo\\Desktop\\學成在線項目—視頻\\day06\\Day6-01.上傳視頻-什么是斷點續傳.mp4"); // 原始文件String chunkPath = "D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\chunk\\"; // 分塊文件存儲目錄File chunkFolder = new File(chunkPath);if (!chunkFolder.exists()) {chunkFolder.mkdirs();}//分塊大小long chunkSize = 1024 * 1024 * 1;//分塊數量long chunkNum = (long) Math.ceil(sourceFile.length() * 1.0 / chunkSize);System.out.println("分塊總數:"+chunkNum);//緩沖區大小byte[] b = new byte[1024];//使用RandomAccessFile訪問文件(變化流), "r"表示讀取流, "rw"表示寫入流RandomAccessFile raf_read = new RandomAccessFile(sourceFile, "r");//分塊for (int i = 0; i < chunkNum; i++) {//創建分塊文件File file = new File(chunkPath + i);if(file.exists()){file.delete();}boolean newFile = file.createNewFile();if (newFile) {//向分塊文件中寫數據RandomAccessFile raf_write = new RandomAccessFile(file, "rw");int len = -1;while ((len = raf_read.read(b)) != -1) {raf_write.write(b, 0, len);if (file.length() >= chunkSize) {break;}}raf_write.close();System.out.println("完成分塊"+i);}}raf_read.close();}
}
執行結果: 目標文件被分片的儲存在文件夾中
]文件合并流程:
1、找到要合并的文件并按文件合并的先后進行排序。
2、創建合并文件
3、依次從合并的文件中讀取數據向合并文件寫入數
package com.xuecheng.media;
/*** @author Mr.M* @version 1.0* @description 大文件處理測試* @date 2022/9/13 9:21*/public class BigFileTest {//測試文件合并方法@Testpublic void testMerge() throws IOException {//塊文件目錄File chunkFolder = new File("D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\chunk\\");//原始文件File originalFile = new File("C:\\Users\\Lenovo\\Desktop\\學成在線項目—視頻\\day06\\Day6-01.上傳視頻-什么是斷點續傳.mp4");//合并文件File mergeFile = new File("D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\Day6-01.上傳視頻-什么是斷點續傳.mp4");if (mergeFile.exists()) {mergeFile.delete();}//創建新的合并文件mergeFile.createNewFile();//用于寫文件RandomAccessFile raf_write = new RandomAccessFile(mergeFile, "rw");//指針指向文件頂端raf_write.seek(0);//緩沖區byte[] b = new byte[1024];//分塊列表File[] fileArray = chunkFolder.listFiles();// 轉成集合,便于排序List<File> fileList = Arrays.asList(fileArray);// 從小到大排序Collections.sort(fileList, new Comparator<File>() {@Overridepublic int compare(File o1, File o2) {return Integer.parseInt(o1.getName()) - Integer.parseInt(o2.getName());}});//開始合并文件for (File chunkFile : fileList) {RandomAccessFile raf_read = new RandomAccessFile(chunkFile, "rw");int len = -1;while ((len = raf_read.read(b)) != -1) {raf_write.write(b, 0, len);}raf_read.close();}raf_write.close();//校驗文件try (FileInputStream fileInputStream = new FileInputStream(originalFile);FileInputStream mergeFileStream = new FileInputStream(mergeFile);) {//取出原始文件的md5String originalMd5 = DigestUtils.md5Hex(fileInputStream);//取出合并文件的md5進行比較String mergeFileMd5 = DigestUtils.md5Hex(mergeFileStream);if (originalMd5.equals(mergeFileMd5)) {System.out.println("合并文件成功");} else {System.out.println("合并文件失敗");}}}
}
執行結果: 分片文件被合并為正常文件
視頻上傳流程
下圖是上傳視頻的整體流程:
1、前端對文件進行分塊。
2、前端上傳分塊文件前請求媒資服務檢查文件是否存在,如果已經存在則不再上傳。
3、如果分塊文件不存在則前端開始上傳
4、前端請求媒資服務上傳分塊。
5、媒資服務將分塊上傳至MinIO。
6、前端將分塊上傳完畢請求媒資服務合并分塊。
7、媒資服務判斷分塊上傳完成則請求MinIO合并文件。
8、合并完成校驗合并后的文件是否完整,如果完整則上傳完成,否則刪除文件。
minio合并文件測試
1、將分塊文件上傳至minio, minio限制每個分片文件不小于5M
/*** @description 測試MinIO*/
public class MinioTest {static MinioClient minioClient =MinioClient.builder().endpoint("http://192.168.101.65:9000").credentials("minioadmin", "minioadmin").build();// 將分塊文件上傳至minio@Testpublic void uploadChunk() {String chunkFolderPath = "D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\chunk\\";File chunkFolder = new File(chunkFolderPath);//分塊文件File[] files = chunkFolder.listFiles();//將分塊文件上傳至miniofor (int i = 0; i < files.length; i++) {try {UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder().bucket("testbucket").object("chunk/" + i).filename(files[i].getAbsolutePath()).build();minioClient.uploadObject(uploadObjectArgs);System.out.println("上傳分塊成功" + i);} catch (Exception e) {e.printStackTrace();}}}}
2、通過minio的合并文件
/*** @description 測試MinIO*/
public class MinioTest {static MinioClient minioClient =MinioClient.builder().endpoint("http://192.168.101.65:9000").credentials("minioadmin", "minioadmin").build();//合并文件,要求分塊文件最小5M@Testpublic void test_merge() throws Exception {// 分塊文件集合(傳統方式)
// List<ComposeSource> sources = new ArrayList<>();
// for (int i = 0; i < 10; i++) {
// // 構建文件信息
// ComposeSource composeSource = ComposeSource.builder().bucket("testbucket").object("chunk/".concat(Integer.toString(i))).build();
// sources.add(composeSource);
// }// 分塊文件集合(steam流)List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(10).map(i -> ComposeSource.builder().bucket("testbucket").object("chunk/".concat(Integer.toString(i))).build()).collect(Collectors.toList());// 指定合并后的文件名// 通過sources指定源文件ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder().bucket("testbucket").object("01.上傳視頻-什么是斷點續傳.mp4").sources(sources).build();// 合并文件minioClient.composeObject(composeObjectArgs);}
}
3、分塊文件使用后就沒用了, 清除分塊文件
/*** @description 測試MinIO*/
public class MinioTest {static MinioClient minioClient =MinioClient.builder().endpoint("http://192.168.101.65:9000").credentials("minioadmin", "minioadmin").build();//清除分塊文件@Testpublic void test_removeObjects() {//合并分塊完成將分塊文件清除List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i).limit(10).map(i -> new DeleteObject("chunk/".concat(Integer.toString(i)))).collect(Collectors.toList());//構建參數RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("testbucket").objects(deleteObjects).build();//執行刪除Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);results.forEach(r -> {DeleteError deleteError = null;try {deleteError = r.get();} catch (Exception e) {e.printStackTrace();}});}}
接口定義
根據上傳視頻流程,定義接口,與前端的約定是操作成功返回{code:0}否則返回{code:-1}
從課程資料中拷貝RestResponse.java類到base工程下的model包下。
/*** @author Mr.M* @version 1.0* @description 通用結果類型* @date 2022/9/13 14:44*/@Data
@ToString
public class RestResponse<T> {/*** 響應編碼,0為正常,-1錯誤*/private int code;/*** 響應提示信息*/private String msg;/*** 響應內容*/private T result;public RestResponse() {this(0, "success");}public RestResponse(int code, String msg) {this.code = code;this.msg = msg;}/*** 錯誤信息的封裝** @param msg* @param <T>* @return*/public static <T> RestResponse<T> validfail(String msg) {RestResponse<T> response = new RestResponse<T>();response.setCode(-1);response.setMsg(msg);return response;}public static <T> RestResponse<T> validfail(T result, String msg) {RestResponse<T> response = new RestResponse<T>();response.setCode(-1);response.setResult(result);response.setMsg(msg);return response;}/*** 添加正常響應數據(包含響應內容)** @return RestResponse Rest服務封裝相應數據*/public static <T> RestResponse<T> success(T result) {RestResponse<T> response = new RestResponse<T>();response.setResult(result);return response;}public static <T> RestResponse<T> success(T result, String msg) {RestResponse<T> response = new RestResponse<T>();response.setResult(result);response.setMsg(msg);return response;}/*** 添加正常響應數據(不包含響應內容)** @return RestResponse Rest服務封裝相應數據*/public static <T> RestResponse<T> success() {return new RestResponse<T>();}public Boolean isSuccessful() {return this.code == 0;}}
定義接口如下:
/*** @author Mr.M* @version 1.0* @description 大文件上傳接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上傳接口", tags = "大文件上傳接口")
@RestController
public class BigFilesController {@ApiOperation(value = "文件上傳前檢查文件")@PostMapping("/upload/checkfile")public RestResponse<Boolean> checkfile(@RequestParam("fileMd5") String fileMd5) throws Exception {return null;}@ApiOperation(value = "分塊文件上傳前的檢測")@PostMapping("/upload/checkchunk")public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {return null;}@ApiOperation(value = "上傳分塊文件")@PostMapping("/upload/uploadchunk")public RestResponse uploadchunk(@RequestParam("file") MultipartFile file,@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {return null;}@ApiOperation(value = "合并文件")@PostMapping("/upload/mergechunks")public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5,@RequestParam("fileName") String fileName,@RequestParam("chunkTotal") int chunkTotal) throws Exception {return null;}}
service開發
校驗方法
首先實現檢查文件方法和檢查分塊方法, 在MediaFileService中定義service接口如下
/*** @author Mr.M* @version 1.0* @description 媒資文件管理業務類* @date 2022/9/10 8:55*/
public interface MediaFileService {/*** @description 檢查文件是否存在* @param fileMd5 文件的md5* @return com.xuecheng.base.model.RestResponse<java.lang.Boolean> false不存在,true存在* @author Mr.M* @date 2022/9/13 15:38*/public RestResponse<Boolean> checkFile(String fileMd5);/*** @description 檢查分塊是否存在* @param fileMd5 文件的md5* @param chunkIndex 分塊序號* @return com.xuecheng.base.model.RestResponse<java.lang.Boolean> false不存在,true存在* @author Mr.M* @date 2022/9/13 15:39*/public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex);}
service接口實現方法
package com.xuecheng.media.api;import com.xuecheng.base.model.RestResponse;
import com.xuecheng.media.mapper.MediaFilesMapper;
import com.xuecheng.media.service.MediaFileService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;/*** @author Mr.M* @version 1.0* @description 大文件上傳接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上傳接口", tags = "大文件上傳接口")
@RestController
public class BigFilesController {@AutowiredMediaFileService mediaFileService;@ApiOperation(value = "文件上傳前檢查文件")@PostMapping("/upload/checkfile")public RestResponse<Boolean> checkfile(@RequestParam("fileMd5") String fileMd5) throws Exception {return mediaFileService.checkFile(fileMd5);}@ApiOperation(value = "分塊文件上傳前的檢測")@PostMapping("/upload/checkchunk")public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {RestResponse<Boolean> booleanRestResponse = mediaFileService.checkChunk(fileMd5, chunk);return booleanRestResponse;}@ApiOperation(value = "上傳分塊文件")@PostMapping("/upload/uploadchunk")public RestResponse uploadchunk(@RequestParam("file") MultipartFile file,@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {return null;}@ApiOperation(value = "合并文件")@PostMapping("/upload/mergechunks")public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5,@RequestParam("fileName") String fileName,@RequestParam("chunkTotal") int chunkTotal) throws Exception {return null;}}
在接口中調用service提供的檢查文件方法和檢查分塊方法
/*** @author Mr.M* @version 1.0* @description 大文件上傳接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上傳接口", tags = "大文件上傳接口")
@RestController
public class BigFilesController {@AutowiredMediaFileService mediaFileService;@ApiOperation(value = "文件上傳前檢查文件")@PostMapping("/upload/checkfile")public RestResponse<Boolean> checkfile(@RequestParam("fileMd5") String fileMd5) throws Exception {return mediaFileService.checkFile(fileMd5);}@ApiOperation(value = "分塊文件上傳前的檢測")@PostMapping("/upload/checkchunk")public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {RestResponse<Boolean> booleanRestResponse = mediaFileService.checkChunk(fileMd5, chunk);return booleanRestResponse;}
}
上傳方法
定義service接口
/*** @author Mr.M* @version 1.0* @description 媒資文件管理業務類* @date 2022/9/10 8:55*/
public interface MediaFileService {/*** @description 上傳分塊* @param fileMd5 文件md5* @param chunk 分塊序號* @param localChunkFilePath 分塊文件本地路徑* @return com.xuecheng.base.model.RestResponse* @author Mr.M* @date 2022/9/13 15:50*/public RestResponse uploadChunk(String fileMd5,int chunk,String localChunkFilePath);}
接口實現:
/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/10 8:58*/
@Service
@Slf4j
public class MediaFileServiceImpl implements MediaFileService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMinioClient minioClient;//普通文件桶@Value("${minio.bucket.files}")private String bucket_mediafiles;//視頻文件桶@Value("${minio.bucket.videofiles}")private String bucket_video;/*** @param localFilePath 文件地址* @param bucket 桶* @param objectName 對象名稱* @return void* @description 將文件寫入minIO* @author Mr.M* @date 2022/10/12 21:22*/public boolean addMediaFilesToMinIO(String localFilePath, String mimeType, String bucket, String objectName) {try {// 構建文件參數UploadObjectArgs testbucket = UploadObjectArgs.builder().bucket(bucket).object(objectName).filename(localFilePath).contentType(mimeType).build();// 執行上傳操作minioClient.uploadObject(testbucket);log.debug("上傳文件到minio成功,bucket:{},objectName:{}", bucket, objectName);System.out.println("上傳成功");return true;} catch (Exception e) {e.printStackTrace();log.error("上傳文件到minio出錯,bucket:{},objectName:{},錯誤原因:{}", bucket, objectName, e.getMessage(), e);XueChengPlusException.cast("上傳文件到文件系統失敗");}return false;}// 根據文件擴展名取出mimeTypeprivate String getMimeType(String extension) {if (extension == null)extension = "";//根據擴展名取出mimeTypeContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension);//通用mimeType,字節流String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE;if (extensionMatch != null) {mimeType = extensionMatch.getMimeType();}return mimeType;}//得到分塊文件的目錄private String getChunkFileFolderPath(String fileMd5) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";}@Overridepublic RestResponse uploadChunk(String fileMd5, int chunk, String localChunkFilePath) {//得到分塊文件的目錄路徑String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);//得到分塊文件的路徑String chunkFilePath = chunkFileFolderPath + chunk;//mimeTypeString mimeType = getMimeType(null);//將文件存儲至minIOboolean b = addMediaFilesToMinIO(localChunkFilePath, mimeType, bucket_video, chunkFilePath);if (!b) {log.debug("上傳分塊文件失敗:{}", chunkFilePath);return RestResponse.validfail(false, "上傳分塊失敗");}log.debug("上傳分塊文件成功:{}",chunkFilePath);return RestResponse.success(true);}}
完善接口
/*** @author Mr.M* @version 1.0* @description 大文件上傳接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上傳接口", tags = "大文件上傳接口")
@RestController
public class BigFilesController {@AutowiredMediaFileService mediaFileService;@ApiOperation(value = "上傳分塊文件")@PostMapping("/upload/uploadchunk")public RestResponse uploadchunk(@RequestParam("file") MultipartFile file,@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {// 創建一個臨時文件File tempFile = File.createTempFile("minio", "temp");file.transferTo(tempFile);// 獲取文件路徑String localFilePath = tempFile.getAbsolutePath();RestResponse restResponse = mediaFileService.uploadChunk(fileMd5, chunk, localFilePath);return restResponse;}}
接口測試
- 更新前端文件
- 將uploadtools.ts文件覆蓋前端工程src/utils 目錄下的同名文件,
- 把前端切換文件增大到10M
- 將 media-add-dialog.vue文件覆蓋前端工程src\module-organization\pages\media-manage\components目錄下的同名文件
- 修改后端配置
- 前端對文件分塊的大小為5MB,SpringBoot web默認上傳文件的大小限制為1MB,這里需要在media-api工程修改配置如下:
spring:servlet:multipart:max-file-size: 50MBmax-request-size: 50MB
- max-file-size: 單個文件的大小限制
- Max-request-size: 單次請求的大小限制
- 啟動前后端服務, 聯調
合并方法
定義service接口
/*** @author Mr.M* @version 1.0* @description 媒資文件管理業務類* @date 2022/9/10 8:55*/
public interface MediaFileService {/*** @description 合并分塊* @param companyId 機構id* @param fileMd5 文件md5* @param chunkTotal 分塊總和* @param uploadFileParamsDto 文件信息* @return com.xuecheng.base.model.RestResponse* @author Mr.M* @date 2022/9/13 15:56*/public RestResponse mergechunks(Long companyId,String fileMd5,int chunkTotal,UploadFileParamsDto uploadFileParamsDto);
}
接口實現:
/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/10 8:58*/
@Service
@Slf4j
public class MediaFileServiceImpl implements MediaFileService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMinioClient minioClient;//普通文件桶@Value("${minio.bucket.files}")private String bucket_mediafiles;//視頻文件桶@Value("${minio.bucket.videofiles}")private String bucket_video;@AutowiredMediaFileService currentProxy;/*** 合并分塊** @param companyId 機構id* @param fileMd5 文件md5* @param chunkTotal 分塊總和* @param uploadFileParamsDto 文件信息* @return*/@Overridepublic RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) {// 1.找到分塊文件, 調用minio的sdk進行文件合并// 分塊文件目錄String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);// 1.1分塊文件集合(steam流)List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(chunkTotal).map(i -> ComposeSource.builder().bucket(bucket_video).object(chunkFileFolderPath.concat(Integer.toString(i))).build()).collect(Collectors.toList());//源文件名稱String fileName = uploadFileParamsDto.getFilename();//文件擴展名String extension = fileName.substring(fileName.lastIndexOf("."));//合并后文件的objectNameString objectName = getFilePathByMd5(fileMd5, extension);// 1.2指定合并后的文件信息ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder().bucket(bucket_video).object(objectName) // 合并后的文件objectName.sources(sources) // 通過sources指定源文件.build();// 1.3合并文件try {minioClient.composeObject(composeObjectArgs);} catch (Exception e) {e.printStackTrace();log.debug("合并文件失敗,fileMd5:{},異常:{}", fileMd5, e.getMessage(), e);return RestResponse.validfail(false, "合并文件失敗。");}// 2.校驗合并后的文件和源文件是否一致// 先下載合并后的文件File file = downloadFileFromMinIO(bucket_video, objectName);try (FileInputStream fileInputStream = new FileInputStream(file)) {//計算合并后文件的md5值String mergeFile_md5 = DigestUtils.md5Hex(fileInputStream);//比較原始文件和合并后文件的MD5值if (!fileMd5.equals(mergeFile_md5)) {log.error("校驗合并文件md5值不一致,原始文件:{},合并文件:{}", fileMd5, mergeFile_md5);return RestResponse.validfail(false, "文件合并校驗失敗");}//保存文件大小uploadFileParamsDto.setFileSize(file.length());} catch (Exception e) {e.printStackTrace();return RestResponse.validfail(false, "文件合并校驗失敗");}// 3.將文件信息入庫MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_video, objectName);if (mediaFiles == null) {return RestResponse.validfail(false, "文件入庫失敗");}// 4.清理分塊文件clearChunkFiles(chunkFileFolderPath, chunkTotal);return RestResponse.success(true);}//得到分塊文件的目錄private String getChunkFileFolderPath(String fileMd5) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";}/*** 得到合并后的文件的地址** @param fileMd5 文件id即md5值* @param fileExt 文件擴展名* @return*/private String getFilePathByMd5(String fileMd5, String fileExt) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt;}/*** 從minio下載文件** @param bucket 桶* @param objectName 對象名稱* @return 下載后的文件*/public File downloadFileFromMinIO(String bucket, String objectName) {//臨時文件File minioFile = null;FileOutputStream outputStream = null;try {InputStream stream = minioClient.getObject(GetObjectArgs.builder().bucket(bucket).object(objectName).build());//創建臨時文件minioFile = File.createTempFile("minio", ".merge");outputStream = new FileOutputStream(minioFile);IOUtils.copy(stream, outputStream);return minioFile;} catch (Exception e) {e.printStackTrace();} finally {if (outputStream != null) {try {outputStream.close();} catch (IOException e) {e.printStackTrace();}}}return null;}/*** 清除分塊文件** @param chunkFileFolderPath 分塊文件路徑* @param chunkTotal 分塊文件總數*/private void clearChunkFiles(String chunkFileFolderPath, int chunkTotal) {try {//待刪除分塊文件列表List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i).limit(chunkTotal).map(i -> new DeleteObject(chunkFileFolderPath.concat(Integer.toString(i)))).collect(Collectors.toList());// 分塊文件信息RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("video").objects(deleteObjects).build();// 清除分塊文件Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);// 真正刪除分塊文件results.forEach(r -> {DeleteError deleteError = null;try {deleteError = r.get();} catch (Exception e) {e.printStackTrace();log.error("清除分塊文件失敗,objectname:{}", deleteError.objectName(), e);}});} catch (Exception e) {e.printStackTrace();log.error("清除分塊文件失敗,chunkFileFolderPath:{}", chunkFileFolderPath, e);}}}
controller完善
/*** @author Mr.M* @version 1.0* @description 大文件上傳接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上傳接口", tags = "大文件上傳接口")
@RestController
public class BigFilesController {@AutowiredMediaFileService mediaFileService;@ApiOperation(value = "合并文件")@PostMapping("/upload/mergechunks")public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5,@RequestParam("fileName") String fileName,@RequestParam("chunkTotal") int chunkTotal) throws Exception {// todo: 機構idLong companyId = 1232141425L;UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto();uploadFileParamsDto.setFileType("001002");uploadFileParamsDto.setTags("課程視頻");uploadFileParamsDto.setRemark("");uploadFileParamsDto.setFilename(fileName);return mediaFileService.mergechunks(companyId, fileMd5, chunkTotal, uploadFileParamsDto);}}
功能測試: 下邊進行前后端聯調
- 上傳一個視頻測試合并分塊的執行邏輯
進入service方法逐行跟蹤。
- 斷點續傳測試
上傳一部分后,停止刷新瀏覽器再重新上傳,通過瀏覽器日志發現已經上傳過的分塊不再重新上傳
- 文件分片上傳后合并分片, 合并完成后刪除分片文件
update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=?
多個線程同時執行上邊的sql只會有一個線程執行成功。
- 什么是樂觀鎖、悲觀鎖?
- synchronized是一種悲觀鎖,在執行被synchronized包裹的代碼時需要首先獲取鎖,沒有拿到鎖則無法執行,是總悲觀的認為別的線程會去搶,所以要悲觀鎖。
- 樂觀鎖的思想是它不認為會有線程去爭搶,盡管去執行,如果沒有執行成功就再去重試。
定義mapper
package com.xuecheng.media.mapper;/*** <p>* Mapper 接口* </p>** @author itcast*/
public interface MediaProcessMapper extends BaseMapper<MediaProcess> {/*** 開啟一個任務* @param id 任務id* @return 更新記錄數*/@Update("update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=#{id}")int startTask(@Param("id") long id);}
service方法
package com.xuecheng.media.service;/*** @author Mr.M* @version 1.0* @description 媒資文件處理業務方法* @date 2022/9/10 8:55*/
public interface MediaFileProcessService {/*** 開啟一個任務* @param id 任務id* @return true開啟任務成功,false開啟任務失敗*/public boolean startTask(long id);}
package com.xuecheng.media.service.impl;/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/14 14:41*/
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMediaProcessMapper mediaProcessMapper;//實現如下public boolean startTask(long id) {int result = mediaProcessMapper.startTask(id);return result<=0?false:true;}}
更新任務狀態
任務處理完成需要更新任務處理結果,任務執行成功更新視頻的URL、及任務處理結果,將待處理任務記錄刪除,同時向歷史任務表添加記錄。
在MediaFileProcessService接口添加方法
package com.xuecheng.media.service;/*** @author Mr.M* @version 1.0* @description 媒資文件處理業務方法* @date 2022/9/10 8:55*/
public interface MediaFileProcessService {/*** @description 保存任務結果* @param taskId 任務id* @param status 任務狀態* @param fileId 文件id* @param url url* @param errorMsg 錯誤信息* @return void* @author Mr.M* @date 2022/10/15 11:29*/void saveProcessFinishStatus(Long taskId,String status,String fileId,String url,String errorMsg);}
service接口方法實現如下:
package com.xuecheng.media.service.impl;/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/14 14:41*/
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMediaProcessMapper mediaProcessMapper;@AutowiredMediaProcessHistoryMapper mediaProcessHistoryMapper;@Transactional@Overridepublic void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) {//查出任務,如果不存在則直接返回MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId);if(mediaProcess == null){return ;}//處理失敗,更新任務處理結果LambdaQueryWrapper<MediaProcess> queryWrapperById = new LambdaQueryWrapper<MediaProcess>().eq(MediaProcess::getId, taskId);//處理失敗if(status.equals("3")){MediaProcess mediaProcess_u = new MediaProcess();mediaProcess_u.setStatus("3");mediaProcess_u.setErrormsg(errorMsg);mediaProcess_u.setFailCount(mediaProcess.getFailCount()+1);mediaProcessMapper.update(mediaProcess_u,queryWrapperById);log.debug("更新任務處理狀態為失敗,任務信息:{}",mediaProcess_u);return ;}//任務處理成功MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId);if(mediaFiles!=null){//更新媒資文件中的訪問urlmediaFiles.setUrl(url);mediaFilesMapper.updateById(mediaFiles);}//處理成功,更新url和狀態mediaProcess.setUrl(url);mediaProcess.setStatus("2");mediaProcess.setFinishDate(LocalDateTime.now());mediaProcessMapper.updateById(mediaProcess);//添加到歷史記錄MediaProcessHistory mediaProcessHistory = new MediaProcessHistory();BeanUtils.copyProperties(mediaProcess, mediaProcessHistory);mediaProcessHistoryMapper.insert(mediaProcessHistory);//刪除mediaProcessmediaProcessMapper.deleteById(mediaProcess.getId());}}
視頻處理
視頻采用并發處理,每個視頻使用一個線程去處理,每次處理的視頻數量不要超過cpu核心數。
所有視頻處理完成結束本次執行,為防止代碼異常出現無限期等待則添加超時設置,到達超時時間還沒有處理完成仍結束任務。
定義任務類VideoTask 如下:
package com.xuecheng.media.jobhandler;/*** 視頻處理任務類** @author Mr.M* @version 1.0* @description TODO* @date 2022/10/15 11:58*/
@Slf4j
@Component
public class VideoTask {@AutowiredMediaFileProcessService mediaFileProcessService;@AutowiredMediaFileService mediaFileService;@Value("${videoprocess.ffmpegpath}")private String ffmpeg_path;@XxlJob("videoJobHandler")public void videoJobHandler() throws Exception {// 分片參數int shardIndex = XxlJobHelper.getShardIndex(); // 執行器序號,從0開始int shardTotal = XxlJobHelper.getShardTotal(); // 執行器總數//取出cpu核心數作為一次處理數據的條數int processors = Runtime.getRuntime().availableProcessors();//查詢待處理的任務List<MediaProcess> mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, processors);//實際查到的任務數int size = mediaProcessList.size();log.debug("取到視頻處理任務數:" + size);if (size <= 0) {return;}//創建線程池ExecutorService executorService = Executors.newFixedThreadPool(size);// 使用的計數器CountDownLatch countDownLatch = new CountDownLatch(size);mediaProcessList.forEach(mediaProcess -> {// 將任務加入線程池executorService.execute(() -> {try {Long taskId = mediaProcess.getId(); // 任務id// 開啟任務boolean b = mediaFileProcessService.startTask(taskId);if (!b) {log.debug("搶占任務失敗, 任務id: {}", taskId);return;}// 準備參數String bucket = mediaProcess.getBucket(); //桶String filePath = mediaProcess.getFilePath(); //存儲路徑String fileId = mediaProcess.getFileId(); //原始視頻的md5值//將要處理的文件下載到服務器上File originalFile = mediaFileService.downloadFileFromMinIO(bucket, filePath);if (originalFile == null) {log.debug("下載待處理文件失敗,originalFile:{}", mediaProcess.getBucket().concat(mediaProcess.getFilePath()));mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "下載待處理文件失敗");return;}//處理結束的視頻文件File mp4File = null;try {mp4File = File.createTempFile("mp4", ".mp4");} catch (IOException e) {log.error("創建mp4臨時文件失敗");mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "創建mp4臨時文件失敗");return;}//視頻處理結果String result = "";try {//開始處理視頻Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path, originalFile.getAbsolutePath(), mp4File.getName(), mp4File.getAbsolutePath());//開始視頻轉換,成功將返回successresult = videoUtil.generateMp4();} catch (Exception e) {e.printStackTrace();log.error("處理視頻文件:{},出錯:{}", mediaProcess.getFilePath(), e.getMessage());}if (!result.equals("success")) {//記錄錯誤信息log.error("處理視頻失敗,視頻地址:{},錯誤信息:{}", bucket + filePath, result);mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, result);return;}//將mp4上傳至minio//mp4在minio的存儲路徑String objectName = getFilePath(fileId, ".mp4");//訪問urlString url = "/" + bucket + "/" + objectName;try {mediaFileService.addMediaFilesToMinIO(mp4File.getAbsolutePath(), "video/mp4", bucket, objectName);//將url存儲至數據,并更新狀態為成功,并將待處理視頻記錄刪除存入歷史mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "2", fileId, url, null);} catch (Exception e) {log.error("上傳視頻失敗或入庫失敗,視頻地址:{},錯誤信息:{}", bucket + objectName, e.getMessage());//最終還是失敗了mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "處理后視頻上傳或入庫失敗");}} finally {// 計數器減1countDownLatch.countDown();}});});//等待,給一個充裕的超時時間,防止無限等待,到達超時時間還沒有處理完成則結束任務countDownLatch.await(30, TimeUnit.MINUTES);}private String getFilePath(String fileMd5, String fileExt) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt;}}
測試
基本測試
進入xxl-job調度中心添加執行器和視頻處理任務
- 添加執行器
- 視頻處理任務
- 在xxl-job配置任務調度策略:
-
- 1)配置阻塞處理策略為:丟棄后續調度。
- 2)配置視頻處理調度時間間隔不用根據視頻處理時間去確定,可以配置的小一些,如:5分鐘,即使到達調度時間如果視頻沒有處理完會丟棄調度請求。
- 配置完成開始測試視頻處理:
- 首先上傳至少4個視頻,非mp4格式。
- 在xxl-job啟動視頻處理任務
- 觀察媒資管理服務后臺日志
失敗測試
1、先停止調度中心的視頻處理任務。
2、上傳視頻,手動修改待處理任務表中file_path字段為一個不存在的文件地址
3、啟動任務
觀察任務處理失敗后是否會重試,并記錄失敗次數。
搶占任務測試
1、修改調度中心中視頻處理任務的阻塞處理策略為“覆蓋之間的調度”
2、在搶占任務代碼處打斷點并選擇支持多線程方式
3、在搶占任務代碼處的下邊兩行代碼分別打上斷點,避免觀察時代碼繼續執行。
4、啟動任務
此時多個線程執行都停留在斷點處
依次放行,觀察同一個任務只會被一個線程搶占成功。