[學成在線]06-視頻分片上傳

上傳視頻

需求分析

  1. 教學機構人員進入媒資管理列表查詢自己上傳的媒資文件。

點擊“媒資管理”

進入媒資管理列表頁面查詢本機構上傳的媒資文件。

  1. 教育機構用戶在"媒資管理"頁面中點擊 "上傳視頻" 按鈕。

點擊“上傳視頻”打開上傳頁面

  1. 選擇要上傳的文件,自動執行文件上傳。

  1. 視頻上傳成功會自動處理,處理完成可以預覽視頻。

斷點續傳

概念介紹

需求背景

通常視頻文件都比較大,所以對于媒資系統上傳文件的需求要滿足大文件的上傳要求。http協議本身對上傳文件大小沒有限制,但是客戶的網絡環境質量、電腦硬件環境等參差不齊,如果一個大文件快上傳完了網斷了沒有上傳完成,需要客戶重新上傳,用戶體驗非常差,所以對于大文件上傳的要求最基本的是斷點續傳。

什么是斷點續傳

引用百度百科:斷點續傳指的是在下載或上傳時,將下載或上傳任務(一個文件或一個壓縮包)人為的劃分為幾個部分,每一個部分采用一個線程進行上傳或下載,如果碰到網絡故障,可以從已經上傳或下載的部分開始繼續上傳下載未完成的部分,而沒有必要從頭開始上傳下載,斷點續傳可以提高節省操作時間,提高用戶體驗性。

斷點續傳流程如下圖:

流程如下:

1、前端上傳前先把文件分成塊

2、一塊一塊的上傳,上傳中斷后重新上傳,已上傳的分塊則不用再上傳

3、各分塊上傳完成最后在服務端合并文件

分塊與合并原理

為了更好的理解文件分塊上傳的原理,下邊用java代碼測試文件的分塊與合并。

文件分塊的流程如下:

1、獲取源文件長度

2、根據設定的分塊文件的大小計算出塊數

3、從源文件讀數據依次向每一個塊文件寫數據。

package com.xuecheng.media;
/*** @author Mr.M* @version 1.0* @description 大文件處理測試* @date 2022/9/13 9:21*/public class BigFileTest {//測試文件分塊方法@Testpublic void testChunk() throws IOException {File sourceFile = new File("C:\\Users\\Lenovo\\Desktop\\學成在線項目—視頻\\day06\\Day6-01.上傳視頻-什么是斷點續傳.mp4"); // 原始文件String chunkPath = "D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\chunk\\"; // 分塊文件存儲目錄File chunkFolder = new File(chunkPath);if (!chunkFolder.exists()) {chunkFolder.mkdirs();}//分塊大小long chunkSize = 1024 * 1024 * 1;//分塊數量long chunkNum = (long) Math.ceil(sourceFile.length() * 1.0 / chunkSize);System.out.println("分塊總數:"+chunkNum);//緩沖區大小byte[] b = new byte[1024];//使用RandomAccessFile訪問文件(變化流), "r"表示讀取流, "rw"表示寫入流RandomAccessFile raf_read = new RandomAccessFile(sourceFile, "r");//分塊for (int i = 0; i < chunkNum; i++) {//創建分塊文件File file = new File(chunkPath + i);if(file.exists()){file.delete();}boolean newFile = file.createNewFile();if (newFile) {//向分塊文件中寫數據RandomAccessFile raf_write = new RandomAccessFile(file, "rw");int len = -1;while ((len = raf_read.read(b)) != -1) {raf_write.write(b, 0, len);if (file.length() >= chunkSize) {break;}}raf_write.close();System.out.println("完成分塊"+i);}}raf_read.close();}
}

執行結果: 目標文件被分片的儲存在文件夾中

]文件合并流程:

1、找到要合并的文件并按文件合并的先后進行排序。

2、創建合并文件

3、依次從合并的文件中讀取數據向合并文件寫入數

package com.xuecheng.media;
/*** @author Mr.M* @version 1.0* @description 大文件處理測試* @date 2022/9/13 9:21*/public class BigFileTest {//測試文件合并方法@Testpublic void testMerge() throws IOException {//塊文件目錄File chunkFolder = new File("D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\chunk\\");//原始文件File originalFile = new File("C:\\Users\\Lenovo\\Desktop\\學成在線項目—視頻\\day06\\Day6-01.上傳視頻-什么是斷點續傳.mp4");//合并文件File mergeFile = new File("D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\Day6-01.上傳視頻-什么是斷點續傳.mp4");if (mergeFile.exists()) {mergeFile.delete();}//創建新的合并文件mergeFile.createNewFile();//用于寫文件RandomAccessFile raf_write = new RandomAccessFile(mergeFile, "rw");//指針指向文件頂端raf_write.seek(0);//緩沖區byte[] b = new byte[1024];//分塊列表File[] fileArray = chunkFolder.listFiles();// 轉成集合,便于排序List<File> fileList = Arrays.asList(fileArray);// 從小到大排序Collections.sort(fileList, new Comparator<File>() {@Overridepublic int compare(File o1, File o2) {return Integer.parseInt(o1.getName()) - Integer.parseInt(o2.getName());}});//開始合并文件for (File chunkFile : fileList) {RandomAccessFile raf_read = new RandomAccessFile(chunkFile, "rw");int len = -1;while ((len = raf_read.read(b)) != -1) {raf_write.write(b, 0, len);}raf_read.close();}raf_write.close();//校驗文件try (FileInputStream fileInputStream = new FileInputStream(originalFile);FileInputStream mergeFileStream = new FileInputStream(mergeFile);) {//取出原始文件的md5String originalMd5 = DigestUtils.md5Hex(fileInputStream);//取出合并文件的md5進行比較String mergeFileMd5 = DigestUtils.md5Hex(mergeFileStream);if (originalMd5.equals(mergeFileMd5)) {System.out.println("合并文件成功");} else {System.out.println("合并文件失敗");}}}
}

執行結果: 分片文件被合并為正常文件

視頻上傳流程

下圖是上傳視頻的整體流程:

1、前端對文件進行分塊。

2、前端上傳分塊文件前請求媒資服務檢查文件是否存在,如果已經存在則不再上傳。

3、如果分塊文件不存在則前端開始上傳

4、前端請求媒資服務上傳分塊。

5、媒資服務將分塊上傳至MinIO。

6、前端將分塊上傳完畢請求媒資服務合并分塊。

7、媒資服務判斷分塊上傳完成則請求MinIO合并文件。

8、合并完成校驗合并后的文件是否完整,如果完整則上傳完成,否則刪除文件。

minio合并文件測試

1、將分塊文件上傳至minio, minio限制每個分片文件不小于5M

/*** @description 測試MinIO*/
public class MinioTest {static MinioClient minioClient =MinioClient.builder().endpoint("http://192.168.101.65:9000").credentials("minioadmin", "minioadmin").build();// 將分塊文件上傳至minio@Testpublic void uploadChunk() {String chunkFolderPath = "D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\chunk\\";File chunkFolder = new File(chunkFolderPath);//分塊文件File[] files = chunkFolder.listFiles();//將分塊文件上傳至miniofor (int i = 0; i < files.length; i++) {try {UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder().bucket("testbucket").object("chunk/" + i).filename(files[i].getAbsolutePath()).build();minioClient.uploadObject(uploadObjectArgs);System.out.println("上傳分塊成功" + i);} catch (Exception e) {e.printStackTrace();}}}}

2、通過minio的合并文件

/*** @description 測試MinIO*/
public class MinioTest {static MinioClient minioClient =MinioClient.builder().endpoint("http://192.168.101.65:9000").credentials("minioadmin", "minioadmin").build();//合并文件,要求分塊文件最小5M@Testpublic void test_merge() throws Exception {// 分塊文件集合(傳統方式)
//        List<ComposeSource> sources = new ArrayList<>();
//        for (int i = 0; i < 10; i++) {
//            // 構建文件信息
//            ComposeSource composeSource = ComposeSource.builder().bucket("testbucket").object("chunk/".concat(Integer.toString(i))).build();
//            sources.add(composeSource);
//        }// 分塊文件集合(steam流)List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(10).map(i -> ComposeSource.builder().bucket("testbucket").object("chunk/".concat(Integer.toString(i))).build()).collect(Collectors.toList());// 指定合并后的文件名// 通過sources指定源文件ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder().bucket("testbucket").object("01.上傳視頻-什么是斷點續傳.mp4").sources(sources).build();// 合并文件minioClient.composeObject(composeObjectArgs);}
}

3、分塊文件使用后就沒用了, 清除分塊文件

/*** @description 測試MinIO*/
public class MinioTest {static MinioClient minioClient =MinioClient.builder().endpoint("http://192.168.101.65:9000").credentials("minioadmin", "minioadmin").build();//清除分塊文件@Testpublic void test_removeObjects() {//合并分塊完成將分塊文件清除List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i).limit(10).map(i -> new DeleteObject("chunk/".concat(Integer.toString(i)))).collect(Collectors.toList());//構建參數RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("testbucket").objects(deleteObjects).build();//執行刪除Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);results.forEach(r -> {DeleteError deleteError = null;try {deleteError = r.get();} catch (Exception e) {e.printStackTrace();}});}}

接口定義

根據上傳視頻流程,定義接口,與前端的約定是操作成功返回{code:0}否則返回{code:-1}

從課程資料中拷貝RestResponse.java類到base工程下的model包下。

/*** @author Mr.M* @version 1.0* @description 通用結果類型* @date 2022/9/13 14:44*/@Data
@ToString
public class RestResponse<T> {/*** 響應編碼,0為正常,-1錯誤*/private int code;/*** 響應提示信息*/private String msg;/*** 響應內容*/private T result;public RestResponse() {this(0, "success");}public RestResponse(int code, String msg) {this.code = code;this.msg = msg;}/*** 錯誤信息的封裝** @param msg* @param <T>* @return*/public static <T> RestResponse<T> validfail(String msg) {RestResponse<T> response = new RestResponse<T>();response.setCode(-1);response.setMsg(msg);return response;}public static <T> RestResponse<T> validfail(T result, String msg) {RestResponse<T> response = new RestResponse<T>();response.setCode(-1);response.setResult(result);response.setMsg(msg);return response;}/*** 添加正常響應數據(包含響應內容)** @return RestResponse Rest服務封裝相應數據*/public static <T> RestResponse<T> success(T result) {RestResponse<T> response = new RestResponse<T>();response.setResult(result);return response;}public static <T> RestResponse<T> success(T result, String msg) {RestResponse<T> response = new RestResponse<T>();response.setResult(result);response.setMsg(msg);return response;}/*** 添加正常響應數據(不包含響應內容)** @return RestResponse Rest服務封裝相應數據*/public static <T> RestResponse<T> success() {return new RestResponse<T>();}public Boolean isSuccessful() {return this.code == 0;}}

定義接口如下:

/*** @author Mr.M* @version 1.0* @description 大文件上傳接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上傳接口", tags = "大文件上傳接口")
@RestController
public class BigFilesController {@ApiOperation(value = "文件上傳前檢查文件")@PostMapping("/upload/checkfile")public RestResponse<Boolean> checkfile(@RequestParam("fileMd5") String fileMd5) throws Exception {return null;}@ApiOperation(value = "分塊文件上傳前的檢測")@PostMapping("/upload/checkchunk")public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {return null;}@ApiOperation(value = "上傳分塊文件")@PostMapping("/upload/uploadchunk")public RestResponse uploadchunk(@RequestParam("file") MultipartFile file,@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {return null;}@ApiOperation(value = "合并文件")@PostMapping("/upload/mergechunks")public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5,@RequestParam("fileName") String fileName,@RequestParam("chunkTotal") int chunkTotal) throws Exception {return null;}}

service開發

校驗方法

首先實現檢查文件方法和檢查分塊方法, 在MediaFileService中定義service接口如下

/*** @author Mr.M* @version 1.0* @description 媒資文件管理業務類* @date 2022/9/10 8:55*/
public interface MediaFileService {/*** @description 檢查文件是否存在* @param fileMd5 文件的md5* @return com.xuecheng.base.model.RestResponse<java.lang.Boolean> false不存在,true存在* @author Mr.M* @date 2022/9/13 15:38*/public RestResponse<Boolean> checkFile(String fileMd5);/*** @description 檢查分塊是否存在* @param fileMd5  文件的md5* @param chunkIndex  分塊序號* @return com.xuecheng.base.model.RestResponse<java.lang.Boolean> false不存在,true存在* @author Mr.M* @date 2022/9/13 15:39*/public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex);}

service接口實現方法

package com.xuecheng.media.api;import com.xuecheng.base.model.RestResponse;
import com.xuecheng.media.mapper.MediaFilesMapper;
import com.xuecheng.media.service.MediaFileService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;/*** @author Mr.M* @version 1.0* @description 大文件上傳接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上傳接口", tags = "大文件上傳接口")
@RestController
public class BigFilesController {@AutowiredMediaFileService mediaFileService;@ApiOperation(value = "文件上傳前檢查文件")@PostMapping("/upload/checkfile")public RestResponse<Boolean> checkfile(@RequestParam("fileMd5") String fileMd5) throws Exception {return mediaFileService.checkFile(fileMd5);}@ApiOperation(value = "分塊文件上傳前的檢測")@PostMapping("/upload/checkchunk")public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {RestResponse<Boolean> booleanRestResponse = mediaFileService.checkChunk(fileMd5, chunk);return booleanRestResponse;}@ApiOperation(value = "上傳分塊文件")@PostMapping("/upload/uploadchunk")public RestResponse uploadchunk(@RequestParam("file") MultipartFile file,@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {return null;}@ApiOperation(value = "合并文件")@PostMapping("/upload/mergechunks")public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5,@RequestParam("fileName") String fileName,@RequestParam("chunkTotal") int chunkTotal) throws Exception {return null;}}

在接口中調用service提供的檢查文件方法和檢查分塊方法

/*** @author Mr.M* @version 1.0* @description 大文件上傳接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上傳接口", tags = "大文件上傳接口")
@RestController
public class BigFilesController {@AutowiredMediaFileService mediaFileService;@ApiOperation(value = "文件上傳前檢查文件")@PostMapping("/upload/checkfile")public RestResponse<Boolean> checkfile(@RequestParam("fileMd5") String fileMd5) throws Exception {return mediaFileService.checkFile(fileMd5);}@ApiOperation(value = "分塊文件上傳前的檢測")@PostMapping("/upload/checkchunk")public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {RestResponse<Boolean> booleanRestResponse = mediaFileService.checkChunk(fileMd5, chunk);return booleanRestResponse;}
}
上傳方法

定義service接口

/*** @author Mr.M* @version 1.0* @description 媒資文件管理業務類* @date 2022/9/10 8:55*/
public interface MediaFileService {/*** @description 上傳分塊* @param fileMd5  文件md5* @param chunk  分塊序號* @param localChunkFilePath  分塊文件本地路徑* @return com.xuecheng.base.model.RestResponse* @author Mr.M* @date 2022/9/13 15:50*/public RestResponse uploadChunk(String fileMd5,int chunk,String localChunkFilePath);}

接口實現:

/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/10 8:58*/
@Service
@Slf4j
public class MediaFileServiceImpl implements MediaFileService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMinioClient minioClient;//普通文件桶@Value("${minio.bucket.files}")private String bucket_mediafiles;//視頻文件桶@Value("${minio.bucket.videofiles}")private String bucket_video;/*** @param localFilePath 文件地址* @param bucket        桶* @param objectName    對象名稱* @return void* @description 將文件寫入minIO* @author Mr.M* @date 2022/10/12 21:22*/public boolean addMediaFilesToMinIO(String localFilePath, String mimeType, String bucket, String objectName) {try {// 構建文件參數UploadObjectArgs testbucket = UploadObjectArgs.builder().bucket(bucket).object(objectName).filename(localFilePath).contentType(mimeType).build();// 執行上傳操作minioClient.uploadObject(testbucket);log.debug("上傳文件到minio成功,bucket:{},objectName:{}", bucket, objectName);System.out.println("上傳成功");return true;} catch (Exception e) {e.printStackTrace();log.error("上傳文件到minio出錯,bucket:{},objectName:{},錯誤原因:{}", bucket, objectName, e.getMessage(), e);XueChengPlusException.cast("上傳文件到文件系統失敗");}return false;}// 根據文件擴展名取出mimeTypeprivate String getMimeType(String extension) {if (extension == null)extension = "";//根據擴展名取出mimeTypeContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension);//通用mimeType,字節流String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE;if (extensionMatch != null) {mimeType = extensionMatch.getMimeType();}return mimeType;}//得到分塊文件的目錄private String getChunkFileFolderPath(String fileMd5) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";}@Overridepublic RestResponse uploadChunk(String fileMd5, int chunk, String localChunkFilePath) {//得到分塊文件的目錄路徑String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);//得到分塊文件的路徑String chunkFilePath = chunkFileFolderPath + chunk;//mimeTypeString mimeType = getMimeType(null);//將文件存儲至minIOboolean b = addMediaFilesToMinIO(localChunkFilePath, mimeType, bucket_video, chunkFilePath);if (!b) {log.debug("上傳分塊文件失敗:{}", chunkFilePath);return RestResponse.validfail(false, "上傳分塊失敗");}log.debug("上傳分塊文件成功:{}",chunkFilePath);return RestResponse.success(true);}}

完善接口

/*** @author Mr.M* @version 1.0* @description 大文件上傳接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上傳接口", tags = "大文件上傳接口")
@RestController
public class BigFilesController {@AutowiredMediaFileService mediaFileService;@ApiOperation(value = "上傳分塊文件")@PostMapping("/upload/uploadchunk")public RestResponse uploadchunk(@RequestParam("file") MultipartFile file,@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {// 創建一個臨時文件File tempFile = File.createTempFile("minio", "temp");file.transferTo(tempFile);// 獲取文件路徑String localFilePath = tempFile.getAbsolutePath();RestResponse restResponse = mediaFileService.uploadChunk(fileMd5, chunk, localFilePath);return restResponse;}}

接口測試

  1. 更新前端文件
  • 將uploadtools.ts文件覆蓋前端工程src/utils 目錄下的同名文件,
  • 把前端切換文件增大到10M

  • 將 media-add-dialog.vue文件覆蓋前端工程src\module-organization\pages\media-manage\components目錄下的同名文件
  1. 修改后端配置
  • 前端對文件分塊的大小為5MB,SpringBoot web默認上傳文件的大小限制為1MB,這里需要在media-api工程修改配置如下:
spring:servlet:multipart:max-file-size: 50MBmax-request-size: 50MB

  • max-file-size: 單個文件的大小限制
  • Max-request-size: 單次請求的大小限制
  1. 啟動前后端服務, 聯調

合并方法

定義service接口

/*** @author Mr.M* @version 1.0* @description 媒資文件管理業務類* @date 2022/9/10 8:55*/
public interface MediaFileService {/*** @description 合并分塊* @param companyId  機構id* @param fileMd5  文件md5* @param chunkTotal 分塊總和* @param uploadFileParamsDto 文件信息* @return com.xuecheng.base.model.RestResponse* @author Mr.M* @date 2022/9/13 15:56*/public RestResponse mergechunks(Long companyId,String fileMd5,int chunkTotal,UploadFileParamsDto uploadFileParamsDto);
}

接口實現:

/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/10 8:58*/
@Service
@Slf4j
public class MediaFileServiceImpl implements MediaFileService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMinioClient minioClient;//普通文件桶@Value("${minio.bucket.files}")private String bucket_mediafiles;//視頻文件桶@Value("${minio.bucket.videofiles}")private String bucket_video;@AutowiredMediaFileService currentProxy;/*** 合并分塊** @param companyId           機構id* @param fileMd5             文件md5* @param chunkTotal          分塊總和* @param uploadFileParamsDto 文件信息* @return*/@Overridepublic RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) {// 1.找到分塊文件, 調用minio的sdk進行文件合并// 分塊文件目錄String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);// 1.1分塊文件集合(steam流)List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(chunkTotal).map(i -> ComposeSource.builder().bucket(bucket_video).object(chunkFileFolderPath.concat(Integer.toString(i))).build()).collect(Collectors.toList());//源文件名稱String fileName = uploadFileParamsDto.getFilename();//文件擴展名String extension = fileName.substring(fileName.lastIndexOf("."));//合并后文件的objectNameString objectName = getFilePathByMd5(fileMd5, extension);// 1.2指定合并后的文件信息ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder().bucket(bucket_video).object(objectName) // 合并后的文件objectName.sources(sources)   // 通過sources指定源文件.build();// 1.3合并文件try {minioClient.composeObject(composeObjectArgs);} catch (Exception e) {e.printStackTrace();log.debug("合并文件失敗,fileMd5:{},異常:{}", fileMd5, e.getMessage(), e);return RestResponse.validfail(false, "合并文件失敗。");}// 2.校驗合并后的文件和源文件是否一致// 先下載合并后的文件File file = downloadFileFromMinIO(bucket_video, objectName);try (FileInputStream fileInputStream = new FileInputStream(file)) {//計算合并后文件的md5值String mergeFile_md5 = DigestUtils.md5Hex(fileInputStream);//比較原始文件和合并后文件的MD5值if (!fileMd5.equals(mergeFile_md5)) {log.error("校驗合并文件md5值不一致,原始文件:{},合并文件:{}", fileMd5, mergeFile_md5);return RestResponse.validfail(false, "文件合并校驗失敗");}//保存文件大小uploadFileParamsDto.setFileSize(file.length());} catch (Exception e) {e.printStackTrace();return RestResponse.validfail(false, "文件合并校驗失敗");}// 3.將文件信息入庫MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_video, objectName);if (mediaFiles == null) {return RestResponse.validfail(false, "文件入庫失敗");}// 4.清理分塊文件clearChunkFiles(chunkFileFolderPath, chunkTotal);return RestResponse.success(true);}//得到分塊文件的目錄private String getChunkFileFolderPath(String fileMd5) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";}/*** 得到合并后的文件的地址** @param fileMd5 文件id即md5值* @param fileExt 文件擴展名* @return*/private String getFilePathByMd5(String fileMd5, String fileExt) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt;}/*** 從minio下載文件** @param bucket     桶* @param objectName 對象名稱* @return 下載后的文件*/public File downloadFileFromMinIO(String bucket, String objectName) {//臨時文件File minioFile = null;FileOutputStream outputStream = null;try {InputStream stream = minioClient.getObject(GetObjectArgs.builder().bucket(bucket).object(objectName).build());//創建臨時文件minioFile = File.createTempFile("minio", ".merge");outputStream = new FileOutputStream(minioFile);IOUtils.copy(stream, outputStream);return minioFile;} catch (Exception e) {e.printStackTrace();} finally {if (outputStream != null) {try {outputStream.close();} catch (IOException e) {e.printStackTrace();}}}return null;}/*** 清除分塊文件** @param chunkFileFolderPath 分塊文件路徑* @param chunkTotal          分塊文件總數*/private void clearChunkFiles(String chunkFileFolderPath, int chunkTotal) {try {//待刪除分塊文件列表List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i).limit(chunkTotal).map(i -> new DeleteObject(chunkFileFolderPath.concat(Integer.toString(i)))).collect(Collectors.toList());// 分塊文件信息RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("video").objects(deleteObjects).build();// 清除分塊文件Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);// 真正刪除分塊文件results.forEach(r -> {DeleteError deleteError = null;try {deleteError = r.get();} catch (Exception e) {e.printStackTrace();log.error("清除分塊文件失敗,objectname:{}", deleteError.objectName(), e);}});} catch (Exception e) {e.printStackTrace();log.error("清除分塊文件失敗,chunkFileFolderPath:{}", chunkFileFolderPath, e);}}}

controller完善

/*** @author Mr.M* @version 1.0* @description 大文件上傳接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上傳接口", tags = "大文件上傳接口")
@RestController
public class BigFilesController {@AutowiredMediaFileService mediaFileService;@ApiOperation(value = "合并文件")@PostMapping("/upload/mergechunks")public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5,@RequestParam("fileName") String fileName,@RequestParam("chunkTotal") int chunkTotal) throws Exception {// todo: 機構idLong companyId = 1232141425L;UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto();uploadFileParamsDto.setFileType("001002");uploadFileParamsDto.setTags("課程視頻");uploadFileParamsDto.setRemark("");uploadFileParamsDto.setFilename(fileName);return mediaFileService.mergechunks(companyId, fileMd5, chunkTotal, uploadFileParamsDto);}}

功能測試: 下邊進行前后端聯調

  1. 上傳一個視頻測試合并分塊的執行邏輯

進入service方法逐行跟蹤。

  1. 斷點續傳測試

上傳一部分后,停止刷新瀏覽器再重新上傳,通過瀏覽器日志發現已經上傳過的分塊不再重新上傳

  1. 文件分片上傳后合并分片, 合并完成后刪除分片文件

update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=?

多個線程同時執行上邊的sql只會有一個線程執行成功。

  1. 什么是樂觀鎖、悲觀鎖?
  • synchronized是一種悲觀鎖,在執行被synchronized包裹的代碼時需要首先獲取鎖,沒有拿到鎖則無法執行,是總悲觀的認為別的線程會去搶,所以要悲觀鎖。
  • 樂觀鎖的思想是它不認為會有線程去爭搶,盡管去執行,如果沒有執行成功就再去重試。

定義mapper

package com.xuecheng.media.mapper;/*** <p>*  Mapper 接口* </p>** @author itcast*/
public interface MediaProcessMapper extends BaseMapper<MediaProcess> {/*** 開啟一個任務* @param id 任務id* @return 更新記錄數*/@Update("update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=#{id}")int startTask(@Param("id") long id);}

service方法

package com.xuecheng.media.service;/*** @author Mr.M* @version 1.0* @description 媒資文件處理業務方法* @date 2022/9/10 8:55*/
public interface MediaFileProcessService {/***  開啟一個任務* @param id 任務id* @return true開啟任務成功,false開啟任務失敗*/public boolean startTask(long id);}
package com.xuecheng.media.service.impl;/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/14 14:41*/
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMediaProcessMapper mediaProcessMapper;//實現如下public boolean startTask(long id) {int result = mediaProcessMapper.startTask(id);return result<=0?false:true;}}

更新任務狀態

任務處理完成需要更新任務處理結果,任務執行成功更新視頻的URL、及任務處理結果,將待處理任務記錄刪除,同時向歷史任務表添加記錄。

在MediaFileProcessService接口添加方法

package com.xuecheng.media.service;/*** @author Mr.M* @version 1.0* @description 媒資文件處理業務方法* @date 2022/9/10 8:55*/
public interface MediaFileProcessService {/*** @description 保存任務結果* @param taskId  任務id* @param status 任務狀態* @param fileId  文件id* @param url url* @param errorMsg 錯誤信息* @return void* @author Mr.M* @date 2022/10/15 11:29*/void saveProcessFinishStatus(Long taskId,String status,String fileId,String url,String errorMsg);}

service接口方法實現如下:

package com.xuecheng.media.service.impl;/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/14 14:41*/
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMediaProcessMapper mediaProcessMapper;@AutowiredMediaProcessHistoryMapper mediaProcessHistoryMapper;@Transactional@Overridepublic void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) {//查出任務,如果不存在則直接返回MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId);if(mediaProcess == null){return ;}//處理失敗,更新任務處理結果LambdaQueryWrapper<MediaProcess> queryWrapperById = new LambdaQueryWrapper<MediaProcess>().eq(MediaProcess::getId, taskId);//處理失敗if(status.equals("3")){MediaProcess mediaProcess_u = new MediaProcess();mediaProcess_u.setStatus("3");mediaProcess_u.setErrormsg(errorMsg);mediaProcess_u.setFailCount(mediaProcess.getFailCount()+1);mediaProcessMapper.update(mediaProcess_u,queryWrapperById);log.debug("更新任務處理狀態為失敗,任務信息:{}",mediaProcess_u);return ;}//任務處理成功MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId);if(mediaFiles!=null){//更新媒資文件中的訪問urlmediaFiles.setUrl(url);mediaFilesMapper.updateById(mediaFiles);}//處理成功,更新url和狀態mediaProcess.setUrl(url);mediaProcess.setStatus("2");mediaProcess.setFinishDate(LocalDateTime.now());mediaProcessMapper.updateById(mediaProcess);//添加到歷史記錄MediaProcessHistory mediaProcessHistory = new MediaProcessHistory();BeanUtils.copyProperties(mediaProcess, mediaProcessHistory);mediaProcessHistoryMapper.insert(mediaProcessHistory);//刪除mediaProcessmediaProcessMapper.deleteById(mediaProcess.getId());}}

視頻處理

視頻采用并發處理,每個視頻使用一個線程去處理,每次處理的視頻數量不要超過cpu核心數。

所有視頻處理完成結束本次執行,為防止代碼異常出現無限期等待則添加超時設置,到達超時時間還沒有處理完成仍結束任務。

定義任務類VideoTask 如下:

package com.xuecheng.media.jobhandler;/*** 視頻處理任務類** @author Mr.M* @version 1.0* @description TODO* @date 2022/10/15 11:58*/
@Slf4j
@Component
public class VideoTask {@AutowiredMediaFileProcessService mediaFileProcessService;@AutowiredMediaFileService mediaFileService;@Value("${videoprocess.ffmpegpath}")private String ffmpeg_path;@XxlJob("videoJobHandler")public void videoJobHandler() throws Exception {// 分片參數int shardIndex = XxlJobHelper.getShardIndex(); // 執行器序號,從0開始int shardTotal = XxlJobHelper.getShardTotal();  // 執行器總數//取出cpu核心數作為一次處理數據的條數int processors = Runtime.getRuntime().availableProcessors();//查詢待處理的任務List<MediaProcess> mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, processors);//實際查到的任務數int size = mediaProcessList.size();log.debug("取到視頻處理任務數:" + size);if (size <= 0) {return;}//創建線程池ExecutorService executorService = Executors.newFixedThreadPool(size);// 使用的計數器CountDownLatch countDownLatch = new CountDownLatch(size);mediaProcessList.forEach(mediaProcess -> {// 將任務加入線程池executorService.execute(() -> {try {Long taskId = mediaProcess.getId(); // 任務id// 開啟任務boolean b = mediaFileProcessService.startTask(taskId);if (!b) {log.debug("搶占任務失敗, 任務id: {}", taskId);return;}// 準備參數String bucket = mediaProcess.getBucket();  //桶String filePath = mediaProcess.getFilePath();  //存儲路徑String fileId = mediaProcess.getFileId(); //原始視頻的md5值//將要處理的文件下載到服務器上File originalFile = mediaFileService.downloadFileFromMinIO(bucket, filePath);if (originalFile == null) {log.debug("下載待處理文件失敗,originalFile:{}", mediaProcess.getBucket().concat(mediaProcess.getFilePath()));mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "下載待處理文件失敗");return;}//處理結束的視頻文件File mp4File = null;try {mp4File = File.createTempFile("mp4", ".mp4");} catch (IOException e) {log.error("創建mp4臨時文件失敗");mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "創建mp4臨時文件失敗");return;}//視頻處理結果String result = "";try {//開始處理視頻Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path, originalFile.getAbsolutePath(), mp4File.getName(), mp4File.getAbsolutePath());//開始視頻轉換,成功將返回successresult = videoUtil.generateMp4();} catch (Exception e) {e.printStackTrace();log.error("處理視頻文件:{},出錯:{}", mediaProcess.getFilePath(), e.getMessage());}if (!result.equals("success")) {//記錄錯誤信息log.error("處理視頻失敗,視頻地址:{},錯誤信息:{}", bucket + filePath, result);mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, result);return;}//將mp4上傳至minio//mp4在minio的存儲路徑String objectName = getFilePath(fileId, ".mp4");//訪問urlString url = "/" + bucket + "/" + objectName;try {mediaFileService.addMediaFilesToMinIO(mp4File.getAbsolutePath(), "video/mp4", bucket, objectName);//將url存儲至數據,并更新狀態為成功,并將待處理視頻記錄刪除存入歷史mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "2", fileId, url, null);} catch (Exception e) {log.error("上傳視頻失敗或入庫失敗,視頻地址:{},錯誤信息:{}", bucket + objectName, e.getMessage());//最終還是失敗了mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "處理后視頻上傳或入庫失敗");}} finally {// 計數器減1countDownLatch.countDown();}});});//等待,給一個充裕的超時時間,防止無限等待,到達超時時間還沒有處理完成則結束任務countDownLatch.await(30, TimeUnit.MINUTES);}private String getFilePath(String fileMd5, String fileExt) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt;}}

測試

基本測試

進入xxl-job調度中心添加執行器和視頻處理任務

  1. 添加執行器

  1. 視頻處理任務

  • 在xxl-job配置任務調度策略:
    • 1)配置阻塞處理策略為:丟棄后續調度。
    • 2)配置視頻處理調度時間間隔不用根據視頻處理時間去確定,可以配置的小一些,如:5分鐘,即使到達調度時間如果視頻沒有處理完會丟棄調度請求。

  1. 配置完成開始測試視頻處理:
  • 首先上傳至少4個視頻,非mp4格式。

  • 在xxl-job啟動視頻處理任務

  • 觀察媒資管理服務后臺日志

失敗測試

1、先停止調度中心的視頻處理任務。

2、上傳視頻,手動修改待處理任務表中file_path字段為一個不存在的文件地址

3、啟動任務

觀察任務處理失敗后是否會重試,并記錄失敗次數。

搶占任務測試

1、修改調度中心中視頻處理任務的阻塞處理策略為“覆蓋之間的調度”

2、在搶占任務代碼處打斷點并選擇支持多線程方式

3、在搶占任務代碼處的下邊兩行代碼分別打上斷點,避免觀察時代碼繼續執行。

4、啟動任務

此時多個線程執行都停留在斷點處

依次放行,觀察同一個任務只會被一個線程搶占成功。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/73325.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/73325.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/73325.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Maven安裝與環境配置

首先我們先介紹一些關于Maven的知識&#xff0c;如果著急直接看下面的安裝教程。 目錄 Maven介紹 Maven模型 Maven倉庫 Maven安裝 下載 安裝步驟 Maven介紹 Apache Maven是一個項目管理和構建工具&#xff0c;它基于項目對象模型(Project Object Model , 簡稱: POM)的概念…

【新能源汽車溫度采集與控制系統設計深度解析】

面向汽車行業研發與測試測量設備從業者的技術指南 一、硬件架構設計 新能源汽車的溫度采集與控制系統是保障電池、電機、電控等核心部件安全運行的核心技術之一。其硬件架構需兼顧高精度、抗干擾、可靠性與集成化&#xff0c;以下從信號調理電路、ADC模塊、隔離設計三個維度展…

AI Tokenization

AI Tokenization 人工智能分詞初步了解 類似現在這個&#xff0c;一格子 一格子&#xff0c;拼接出來的&#xff0c;一行或者一句&#xff0c;像不像&#xff0c;我們人類思考的時候組裝出來的話&#xff0c;并用嘴說出來了呢。

React(四)setState原理-性能優化-ref

setState詳解 實現原理 開發中我們并不能直接修改State來重新渲染界面&#xff1a; 因為修改State之后&#xff0c;希望React根據最新的State來重新渲染界面&#xff0c;但這種方式的修改React并不知道數據發生了變化&#xff1b; React并沒有類似于Vue2中的Object.defineP…

SSH密鑰認證 + 文件系統權限控制 + Git倉庫配置+封存與解封GIT倉庫

在本地服務器上實現多個用戶僅通過git push操作修改倉庫、禁止其他改寫方式的需求&#xff0c;可以通過以下步驟實現&#xff1a; 方法概述 通過SSH密鑰認證 文件系統權限控制 Git倉庫配置&#xff0c;確保用戶僅能通過git push命令提交修改&#xff0c;而無法通過直接操作服…

全文通讀:126頁華為IPD集成產品開發與DFX實戰【文末附可編輯PPT下載鏈接】

綁定資料內容: 12023華為流程體系及落地實施【108頁 PPT】.pptx22024版基于華為IPD與質量管理體系融合的研發質量管理【63頁】.pptx

//TODO 動態代理的本質?

待解決 //TODO 面試題 為啥mybatis的mapper只有接口沒有實現類&#xff0c;但它卻能工作&#xff1f;?(ai參考,待深究源碼) 1. 動態代理生成代理對象 MyBatis 使用 JDK 動態代理 為每個 Mapper 接口生成代理對象&#xff1a; ? 核心類&#xff1a;MapperProxy&#xff08;…

C++11中智能指針的使用(shared_ptr、unique_ptr、weak_ptr)

C11中智能指針的使用(shared_ptr、unique_ptr、weak_ptr) 一、shared_ptr原理 shared_ptr 是另一種智能指針&#xff0c;用于實現多個 shared_ptr 實例共享同一個對象的所有權。它通過內部的控制塊&#xff08;通常是一個包含計數器和指向對象的指針的結構&#xff09;來管理…

2024年認證杯SPSSPRO杯數學建模B題(第二階段)神經外科手術的定位與導航全過程文檔及程序

2024年認證杯SPSSPRO杯數學建模 B題 神經外科手術的定位與導航 原題再現&#xff1a; 人的大腦結構非常復雜&#xff0c;內部交織密布著神經和血管&#xff0c;所以在大腦內做手術具有非常高的精細和復雜程度。例如神經外科的腫瘤切除手術或血腫清除手術&#xff0c;通常需要…

嘗試在軟考62天前開始成為軟件設計師-信息系統安全

安全屬性 保密性:最小授權原則(能干活的最小權限)、防暴露(隱藏)、信息加密、物理保密完整性(防篡改):安全協議、校驗碼、密碼校驗、數字簽名、公證 可用性:綜合保障( IP過濾、業務流控制、路由選擇控制、審計跟蹤)不可抵賴性:數字簽名 對稱加密 DES :替換移位 3重DESAESR…

Rocky9.5基于sealos快速部署k8s集群

首先需要下載 Sealos 命令行工具&#xff0c;sealos 是一個簡單的 Golang 二進制文件&#xff0c;可以安裝在大多數 Linux 操作系統中。 以下是一些基本的安裝要求&#xff1a; 每個集群節點應該有不同的主機名。主機名不要帶下劃線。 所有節點的時間需要同步。 需要在 K8s …

G口服務器和普通服務器之間的區別

今天小編主要來為大家介紹一下G口服務器和普通服務器之間的區別&#xff01; 首先&#xff0c;從硬件配置上看&#xff0c;普通服務器通常都會配備中央處理器、內存和硬盤等基本的硬件配置&#xff0c;能夠適用于各種應用程序和服務&#xff1b;G口服務器除了基礎的硬件配置還增…

Cursor軟件如何刷新機器碼流程

一.退出Cursor軟件賬號 打開Cursor軟件&#xff0c;點擊設置-->General-->Account-->Log out,現將Cursor軟件上登錄的賬戶退出。 二.將Cursor官網上登錄的Cursor賬戶也清空掉 點擊頭像--> ACCOUNT SETTINGS -->Account-->Advanced-->Delete Account-->…

類與對象(中)(詳解)

【本節目標】 1. 類的6個默認成員函數 2. 構造函數 3. 析構函數 4. 拷貝構造函數 5. 賦值運算符重載 6. const成員函數 7. 取地址及const取地址操作符重載 1.類的6個默認成員函數 如果一個類中什么成員都沒有&#xff0c;簡稱為空類。 空類中真的什么都沒有嗎&…

開發語言漫談-groovy

groovy是一門腳本語言&#xff0c;在前期的腳本語言中簡單介紹了下。現在再深入介紹下&#xff0c;因為它是本平臺上選用的腳本語言。所謂腳本語言就是不用編譯&#xff0c;直接執行。這種特色非常適合做嵌入編程&#xff0c;即編即用。我們知道平臺后臺的業務開發語言是Java&a…

React+Ant Design的Layout布局實現暗黑模式切換

目錄 效果預覽完整代碼我遇到的BUG問題代碼BUG1&#xff1a;暗黑模式下內容區不變成深色BUG2&#xff1a;光亮模式下的左右區域是深色 補充知識ConfigProvider是什么&#xff1f;Ant Design中的theme如何使用&#xff1f;theme 配置的常見字段主題算法通過 useToken 獲取主題 效…

TCP 三次握手與四次揮手過程

TCP 作為一種面向連接的、可靠的傳輸層協議&#xff0c;其連接管理機制對于保障數據的可靠傳輸至關重要。 三次握手&#xff08;建立連接&#xff09; 三次握手是 TCP 建立連接時所采用的機制&#xff0c;其目的在于確保客戶端和服務器雙方都具備發送和接收數據的能力&#x…

【線程安全的單例模式和STL是否是線程安全/智能指針是否是線程安全】

文章目錄 一、單例模式的特點二、餓漢模式實現單例三、懶漢模式實現單例四、STL線程安全嗎&#xff1f;五、智能指針線程安全嗎&#xff1f; 一、單例模式的特點 一個類&#xff0c;只應該實例化了一個對象&#xff0c;就是單例。 二、餓漢模式實現單例 舉個餓漢模式的例子&…

力扣DAY24 | 熱100 | 回文鏈表

前言 簡單 √ 是反轉鏈表的衍生題&#xff0c;很快寫完了。不過沒考慮到恢復鏈表結構的問題。 題目 給你一個單鏈表的頭節點 head &#xff0c;請你判斷該鏈表是否為回文鏈表。如果是&#xff0c;返回 true &#xff1b;否則&#xff0c;返回 false 。 示例 1&#xff1a; 輸…

【GL010】C++

1.C中的const關鍵字有哪些用法&#xff1f; 1.修飾變量&#xff1a;表示變量的值不可修改。 const int a 10; 2.修飾指針&#xff1a; const int* p&#xff1a; // 指針指向的內容不可修改。 int* const p&#xff1a; // 指針本身不可修改。 const int* const…