在奇麟大數據業務系統的開發及使用過程中,例如OBS對象存儲文件管理、流計算DSC依賴管理,經常會遇到上傳文件這樣的基礎需求,一般情況下,前端上傳文件就是new FormData,然后把文件 append 進去,然后post發送給后端就完事了,但是文件越大,上傳的文件也就越長,在上傳過程中,經常會遇到請求超時或者阻礙用戶繼續操作的情況,十分影響用戶體驗,因為把該文件直接在一個請求體中提交,會出現一些問題,以nginx為例:
其默認允許1MB以內的文件
超過1MB的文件,需要設置?
client_max_body_size
放開體積限制
一味地只放寬Nginx的限制,會出現一些問題,最明顯的問題是服務器的存儲和網絡帶寬壓力都會非常大。
以奇麟云存儲OBS對象存儲文件管理中使用的常規上傳解決方案為例展示
// 將目標文件轉為FormData數據流function?uploadRequest?(params)?{? ? ?const?file =?params.file? ? ?const?form =?new?FormData()? ? ?form.append('file', file)? ? ?const?config = {? ? ? ? ? ?headers: {? ? ? ? ? ? ??'Content-Type':?'multipart/form-data',? ? ? ? ? ?},? ? ? ? ? ?onUploadProgress: e => {? ? ? ? ? ? ??// 上傳進度百分比? ? ? ? ? ? ??if?(e.total >?0) {? ? ? ? ? ? ? ? ? e.percent = e.loaded / e.total *?100? ? ? ? ? ? ? }? ? ? ? ? ? ??params.onProgress(e)? ? ? ? ? ?},? ? ? ? ? ?timeout:?0,?// 上傳文件不設置超時時間? ? ?}? ? ?await?request({'XXXX','post', form, config}? ? ? ? ?.then(res => {? ? ? ? ??// 上傳成功? ? ? ? ? ? ?params.onSuccess(res)? ? ? ? ? })? ? ? ? ? ? .catch(err => {? ? ? ? ? ? ??params.onError(err)? ? ? ? ? ? })? ? ? ? ? ?.finally(() => {})}
上傳文件過大的情況,很容易出現網絡超時導致接口報錯的情況,且用戶在當前操作頁面無法終止,只能持續等待直至上傳操作完成
為了解決上述業務痛點,這里就需要尋求大文件上傳的解決方案,核心是利用?Blob.prototype.slice
?方法,和數組的 slice 方法相似,文件的 slice 方法可以返回?原文件的某個切片
提前定義好單個切片大小,將大文件切分為一個個小的切片,然后借助 http 的可并發性,同時上傳多個切片。這樣從原本傳一個大文件,變成了并發傳多個小的文件切片,可以大大減少上傳時間
因為由于是并發請求傳輸,所以傳輸到服務端的切片文件順序可能會發生變化,因此我們還需要給每個切片記錄順序
1、前端實現部分
前端使用 Vue 作為開發框架,組件庫使用 Element-Plus 作為 UI 框架
1.1 上傳控件
首先創建選擇文件的控件并監聽 change 事件,另外就是上傳按鈕
<template>? ?<div?class="upload-container">? ??<input?type="file"?@change="handleFileChange"?/>? ??<el-button?@click="handleUpload">upload</el-button>??</div></template><script?setup>import?{ ref }?from?'vue'const?container =?ref({??file:?null})function?handleFileChange(e) {? ? ?const?[file] = e.target.files;? ? ?if?(!file)?return;? ? ?container.value.file?= file;}async?handleUpload() {??// ...}</script>
1.2 請求邏輯
request({? ? ?url,? ? ?method =?"post",? ? ?data,? ? ?headers = {},? ? ?requestList? ?}) {? ? ?return?new?Promise(resolve?=>?{? ? ? ?const?xhr =?new?XMLHttpRequest();? ? ? ?xhr.open(method, url);? ? ? ?Object.keys(headers).forEach(key?=>? ? ? ? ?xhr.setRequestHeader(key, headers[key])? ? ? ?);? ? ? ?xhr.send(data);? ? ? ?xhr.onload?=?e?=>?{? ? ? ? ?resolve({? ? ? ? ? ?data: e.target.response? ? ? ? ?});? ? ? ?};? ? ?});}
1.3 上傳切片
接著就要實現核心的上傳功能,上傳需要做兩件事
對文件進行切片
將切片傳輸給服務端
<template>??<div>? ??<input?type="file"?@change="handleFileChange"?/>? ??<el-button?@click="handleUpload">上傳</el-button>??</div></template><script?setup>import?{ ref }?from?'vue';// 設置默認切片大小 10MB?const?SIZE?=?10?*?1024?*?1024;?const?data =?ref([])const?container =?ref({??file:?null})function?request() {? ?// ****}function?handleFileChange() {? ?// ***}??// 生成文件切片??function?createFileChunk(file, size = SIZE) {? ?const?fileChunkList = [];? ??let?cur =?0;? ??while?(cur < file.size) {? ? ? fileChunkList.push({?file: file.slice(cur, cur + size) });? ? ? cur += size;? ? }? ??return?fileChunkList;? }??// 上傳切片??async?function?uploadChunks() {? ??const?requestList = data.value? ? ? .map(({ chunk,hash }) =>?{? ? ? ??const?formData =?new?FormData();? ? ? ? formData.append("chunk", chunk);? ? ? ? formData.append("hash", hash);? ? ? ? formData.append("filename", container.value.file.name);? ? ? ??return?{ formData };? ? ? })? ? ? .map(({ formData }) =>? ? ? ??request({? ? ? ? ??url:?"http://localhost:8888",? ? ? ? ??data: formData? ? ? ? })? ? ? );? ? ?// 并發請求? ??await?Promise.all(requestList);?? }??async?function?handleUpload() {? ??if?(!container.value.file)?return;? ??const?fileChunkList =?createFileChunk(container.value.file);? ? data.value?= fileChunkList.map(({ file },index) =>?({? ? ??chunk: file,? ? ? ??// 文件名 + 數組下標? ? ??hash: container.value.file.name?+?"-"?+ index? ? }));? ??await?uploadChunks();? }</script>
用戶點擊上傳按鈕時,調用 createFileChunk方法 將文件切片,切片數量通過文件大小控制,這里設置 10MB,也就是說一個 100 MB 的文件會被分成 10 個 10MB 的切片
createFileChunk 內使用 while 循環和 slice 方法將切片放入 fileChunkList 數組中返回
在生成文件切片時,需要給每個切片一個標識作為 hash,這里暫時使用文件名 + 下標的標記方式,這樣后端可以知道當前切片是第幾個切片,用于之后的合并切片
隨后調用 uploadChunks 上傳所有的文件切片,將文件切片,切片 hash,以及文件名放入 formData 中,再調用上一步的 request 函數返回一個 proimise,最后調用 Promise.all 并發上傳所有的切片
1.4 發送合并請求
使用整體思路中提到的第二種合并切片的方式,即前端主動通知服務端進行合并
前端發送額外的合并請求,服務端接受到請求時合并切片
<template>??<div>? ??<input?type="file"?@change="handleFileChange"?/>? ??<el-button?@click="handleUpload">upload</el-button>??</div></template><script?setup>import?{ ref }?from?'vue';// 設置默認切片大小 10MB?const?SIZE?=?10?*?1024?*?1024;?const?data =?ref([])const?container =?ref({??file:?null})function?request() {??// ****}function?handleFileChange() {??// ***}// 生成文件切片function?createFileChunk(file, size = SIZE) {??const?fileChunkList = [];? ?let?cur =?0;? ?while?(cur < file.size) {? ? ?fileChunkList.push({?file: file.slice(cur, cur + size) });? ? ?cur += size;? ?}? ?return?fileChunkList;}// 上傳切片async?function?uploadChunks() {? ?const?requestList = data.value? ? ?.map(({ chunk,hash }) =>?{? ? ? ?const?formData =?new?FormData();? ? ? ?formData.append("chunk", chunk);? ? ? ?formData.append("hash", hash);? ? ? ?formData.append("filename", container.value.file.name);? ? ? ?return?{ formData };? ? ?})? ? ?.map(({ formData }) =>? ? ? ?request({? ? ? ? ?url:?"http://localhost:8888",? ? ? ? ?data: formData? ? ? ?})? ? ?);? ??// 并發請求? ?await?Promise.all(requestList)??// 合并切片? ?await?mergeRequest()}async?function?handleUpload() {? ?if?(!container.value.file)?return;? ?const?fileChunkList =?createFileChunk(container.value.file);? ?data.value?= fileChunkList.map(({ file },index) =>?({? ? ?chunk: file,? ? ??// 文件名 + 數組下標? ? ?hash: container.value.file.name?+?"-"?+ index? ?}));??await?uploadChunks()??async?function?mergeRequest() {? ? ?await?request({? ? ? ?url:?"http://localhost:8888/mergeFile",? ? ? ?headers: {? ? ? ? ?"content-type":?"application/json"? ? ? ?},? ? ? ?data:?JSON.stringify({? ? ? ? ?filename: container.value.file.name? ? ? ?})? ? ?})? ?}}</script>
2、服務端部分,以NodeJs為例
服務端負責接受前端傳輸的切片,并在接收到所有切片后合并所有切片
這里又引伸出兩個問題
服務端合并切片的時機,即切片什么時候傳輸完成?
如何進行切片合并,合并的規則是什么?
第一個問題需要前端配合,前端在每個切片中都攜 帶切片最大數量的信息,當服務端接受到這個數量的切片時自動合并。或者也可以額外發一個請求,主動通知服務端進行切片的合并
第二個問題,具體如何合并切片呢?這里可以使用 Nodejs 的 讀寫流(readStream/writeStream),將所有切片的流傳輸到最終文件的流里
使用 http 模塊搭建一個簡單服務端
const?http =?require("http");const?server = http.createServer();server.on("request",?async?(req, res) => {? res.setHeader("Access-Control-Allow-Origin",?"*");? res.setHeader("Access-Control-Allow-Headers",?"*");??if?(req.method?===?"OPTIONS") {? ? res.status?=?200;? ? res.end();? ??return;? }});server.listen(8888,?() =>?console.log("listening port 8888"));
2.1 接受切片
使用?multiparty
?處理前端傳來的 formData
在 multiparty.parse 的回調中,files 參數保存了 formData 中文件,fields 參數保存了 formData 中非文件的字段
const?http =?require("http");const?path =?require("path");const?fse =?require("fs-extra");const?multiparty =?require("multiparty");const?server = http.createServer();// 大文件上傳的臨時存儲目錄const?UPLOAD_DIR_NAME?= path.resolve(__dirname,?"..",?"target");server.on("request",?async?(req, res) => {? res.setHeader("Access-Control-Allow-Origin",?"*");? res.setHeader("Access-Control-Allow-Headers",?"*");??if?(req.method?===?"OPTIONS") {? ? res.status?=?200;? ? res.end();? ??return;? }??const?multipart =?new?multiparty.Form();? multipart.parse(req,?async?(err, fields, files) => {? ??if?(err) {? ? ??return;? ? }? ??const?[chunk] = files.chunk;? ??const?[hash] = fields.hash;? ??const?[filename] = fields.filename;? ??// 創建臨時文件夾用于臨時存儲 chunk? ??// 添加 chunkDir 前綴與文件名做區分? ??const?chunkDir = path.resolve(UPLOAD_DIR_NAME,?'chunkDir'? filename);? ??if?(!fse.existsSync(chunkDir)) {? ? ??await?fse.mkdirs(chunkDir);? ? }? ??// fs-extra 的 rename 方法 windows 平臺會有權限問題? ??// 參考: https://github.com/meteor/meteor/issues/7852#issuecomment-255767835? ??await?fse.move(chunk.path,?`${chunkDir}/${hash}`);? ? res.end("received file chunks");? });});server.listen(8888,?() =>?console.log("listening port 8888"));
查看 multiparty 處理后的 chunk 對象,path 是存儲臨時文件的路徑,size 是臨時文件大小,在 multiparty 文檔中提到可以使用 fs.rename(這里換成了 fs.remove, 因為 fs-extra 的 rename 方法在 windows 平臺存在權限問題)
在接受文件切片時,需要先創建臨時存儲切片的文件夾,以 chunkDir 作為前綴,文件名作為后綴
由于前端在發送每個切片時額外攜帶了唯一值 hash,所以以 hash 作為文件名,將切片從臨時路徑移動切片文件夾中,最后的結果如下
2.2 合并切片
在接收到前端發送的合并請求后,服務端將文件夾下的所有切片進行合并
const?http =?require("http");const?path =?require("path");const?fse =?require("fs-extra");const?server = http.createServer();const?UPLOAD_DIR_NAME?= path.resolve(__dirname,?"..",?"target");+?const?resolvePost?= req =>+ ??new?Promise(resolve?=>?{+ ? ??let?chunk =?"";+ ? ? req.on("data",?data?=>?{+ ? ? ? chunk += data;+ ? ? });+ ? ? req.on("end",?() =>?{+ ? ? ??resolve(JSON.parse(chunk));+ ? ? });+ ? });+?// 寫入文件流+?const?pipeStream?= (path, writeStream) =>+ ?new?Promise(resolve?=>?{+ ? ?const?readStream = fse.createReadStream(path);+ ? ?readStream.on("end",?() =>?{+ ? ? ?fse.unlinkSync(path);+ ? ? ?resolve();+ ? ?});+ ? ?readStream.pipe(writeStream);+ ?});// 合并切片+?const?mergeFileChunk?=?async?(filePath, filename, size) => {+ ??const?chunkDir = path.resolve(UPLOAD_DIR_NAME,?'chunkDir'?+ filename);+ ??const?chunkPaths =?await?fse.readdir(chunkDir);+ ??// 根據切片下標進行排序+ ??// 否則直接讀取目錄的獲得的順序會錯亂+ ? chunkPaths.sort((a, b) =>?a.split("-")[1] - b.split("-")[1]);+ ??// 并發寫入文件+ ??await?Promise.all(+ ? ? chunkPaths.map((chunkPath, index) =>+ ? ? ??pipeStream(+ ? ? ? ? path.resolve(chunkDir, chunkPath),+ ? ? ? ??// 根據 size 在指定位置創建可寫流+ ? ? ? ? fse.createWriteStream(filePath, {+ ? ? ? ? ??start: index * size,+ ? ? ? ? })+ ? ? ? )+ ? ? )+ ?);+ ?// 合并后刪除保存切片的目錄+ ?fse.rmdirSync(chunkDir);+};server.on("request",?async?(req, res) => {? res.setHeader("Access-Control-Allow-Origin",?"*");? res.setHeader("Access-Control-Allow-Headers",?"*");??if?(req.method?===?"OPTIONS") {? ? res.status?=?200;? ? res.end();? ??return;? }+ ??if?(req.url?===?"/mergeFile") {+ ? ??const?data =?await?resolvePost(req);+ ? ??const?{ filename,size } = data;+ ? ??const?filePath = path.resolve(UPLOAD_DIR_NAME,?`${filename}`);+ ? ??await?mergeFileChunk(filePath, filename);+ ? ? res.end(+ ? ? ??JSON.stringify({+ ? ? ? ??code:?0,+ ? ? ? ??message:?"file merged success"+ ? ? ? })+ ? ? );+ ? }});server.listen(8888,?() =>?console.log("listening port 8888"));
由于前端在發送合并請求時會攜帶文件名,服務端根據文件名可以找到上一步創建的切片文件夾,接著使用 fs.createWriteStream 創建一個可寫流,可寫流文件名就是上傳時的文件名,隨后遍歷整個切片文件夾,將切片通過 fs.createReadStream 創建可讀流,傳輸合并到目標文件中。
值得注意的是每次可讀流都會傳輸到可寫流的指定位置,這是通過 createWriteStream 的第二個參數 start 控制的,目的是能夠并發合并多個可讀流至可寫流中,這樣即使并發時流的順序不同,也能傳輸到正確的位置。所以還需要讓前端在請求的時候提供之前設定好的 size 給服務端,服務端根據 size 指定可讀流的起始位置
async?mergeRequest() {? ? ??await?request({? ? ? ??url:?"http://localhost:8888/mergeFile",? ? ? ??headers: {? ? ? ? ??"content-type":?"application/json"? ? ? ? },? ? ? ??data:?JSON.stringify({+ ? ? ? ??size:?SIZE,? ? ? ? ??filename: container.value.file.name? ? ? ? })? ? ? });? ? }
其實也可以等上一個切片合并完后再合并下個切片,這樣就不需要指定位置,但傳輸速度會降低,所以使用了并發合并的手段
接著只要保證每次合并完成后刪除這個切片,等所有切片都合并完畢后最后刪除切片文件夾即可
至此一個簡單的大文件上傳就完成了,接下來我們在此基礎上擴展一些額外的功能
2.3 總進度條
將每個切片已上傳的部分累加,除以整個文件的大小,就能得出當前文件的上傳進度,所以這里使用 Vue 的計算屬性
computed: {? ? ? ?uploadPercentage() {? ? ? ? ??if?(!container.value.file?|| !data.value.length)?return?0;? ? ? ? ??const?loaded = data.value? ? ? ? ? ? .map(item?=>?item.size?* item.percentage)? ? ? ? ? ? .reduce((acc, cur) =>?acc + cur);? ? ? ? ??return?parseInt((loaded / container.value.file.size).toFixed(2));? ? ? ? }?}
經過切片邏輯改造之后,切片的每塊文件都能分別統計到上傳進度,如下圖所示:
改造之后的大文件上傳效果展示:
3. 總結
大文件上傳
前端上傳大文件時使用 Blob.prototype.slice 將文件切片,并發上傳多個切片,最后發送一個合并的請求通知服務端合并切片
服務端接收切片并存儲,收到合并請求后使用流將切片合并到最終文件
原生 XMLHttpRequest 的 upload.onprogress 對切片上傳進度的監聽
使用 Vue 計算屬性根據每個切片的進度算出整個文件的上傳進度
用戶等待上傳時間大大縮減,改善用戶的體驗
更多技術干貨,
請關注“360智匯云開發者”👇
360智匯云官網:https://zyun.360.cn(復制在瀏覽器中打開)
更多好用又便宜的云產品,歡迎試用體驗~
添加工作人員企業微信👇,get更快審核通道+試用包哦~