springboot 分片上傳文件 - postgres(BLOB存儲)
- 方案一(推薦)
? 接收完整文件,后端自動分片并存儲(多線程 大文件)
/*** 接收完整文件,后端自動分片并存儲(多線程 大文件)* @param file* @return* @throws Exception*/public String uploadChunkFile(MultipartFile file) throws Exception {String uploadId = UUID.randomUUID().toString();long fileSize = file.getSize();long totalChunks = (long) Math.ceil((double) fileSize / CHUNK_SIZE);if (totalChunks <= 0) {return "文件大小異常,無法分片";}// 1. 創建臨時目錄存儲分片(大文件避免內存溢出)File tempDir = Files.createTempDirectory("file-chunk-").toFile();//設置JVM退出時自動刪除該目錄tempDir.deleteOnExit();try (InputStream inputStream = file.getInputStream()) {byte[] buffer = new byte[(int) CHUNK_SIZE];int bytesRead;int chunkIndex = 0;// 2. 先將所有分片寫入臨時文件(流式處理,不占大量內存)while ((bytesRead = inputStream.read(buffer)) != -1) {File chunkFile = new File(tempDir, uploadId + "-" + chunkIndex);try (FileOutputStream fos = new FileOutputStream(chunkFile)) {fos.write(buffer, 0, bytesRead); // 只寫實際讀取的字節}chunkIndex++;}// 3. 一次性提交所有分片任務,使用工具類等待完成ThreadPoolUtils.getNewInstance().submitBatchTasks((int) totalChunks, taskIndex -> {try {// 讀取臨時分片文件(每個任務只加載自己的分片數據)File chunkFile = new File(tempDir, uploadId + "-" + taskIndex);byte[] chunkData = Files.readAllBytes(chunkFile.toPath());// 存儲到數據庫FileUploadEntity entity = new FileUploadEntity();entity.setId(IdGenerator.nextId());entity.setUploadId(uploadId);entity.setChunkSize((long) chunkData.length);entity.setChunkNum(totalChunks);entity.setChunkFile(chunkData);entity.setChunkIndex(taskIndex);fileUploadMapper.insertFile(entity);} catch (IOException e) {throw new RuntimeException("分片" + taskIndex + "存儲失敗", e);}});} catch (Exception e) {log.error("文件分片上傳失敗", e);throw new RuntimeException("文件分片上傳失敗");} finally {// 4. 清理臨時文件deleteDir(tempDir);}return "文件已成功分片存儲,uploadId: " + uploadId;}// 遞歸刪除臨時目錄private boolean deleteDir(File dir) {if (dir.isDirectory()) {File[] children = dir.listFiles();if (children != null) {for (File child : children) {deleteDir(child);}}}return dir.delete();}
- 方案二
? 接收完整文件,后端自動分片并存儲(多線程 小文件)。。。 大文件可能會內存溢出
/*** 接收完整文件,后端自動分片并使用多線程存儲 (多線程 小文件)* @param file* @return* @throws IOException, InterruptedException*/
public String uploadChunkFile(MultipartFile file) throws IOException, InterruptedException {// 生成唯一上傳ID,用于標識同一文件的所有分片String uploadId = UUID.randomUUID().toString();String fileName = file.getOriginalFilename();long fileSize = file.getSize();// 計算總分片數long totalChunks = (long) Math.ceil((double) fileSize / CHUNK_SIZE);if (totalChunks <= 0) {return "文件大小異常,無法分片";}// 讀取文件所有分片數據到內存(小文件適用,大文件建議使用磁盤臨時文件)List<byte[]> chunkDataList = new ArrayList<>();try (InputStream inputStream = file.getInputStream()) {byte[] buffer = new byte[(int) CHUNK_SIZE];int bytesRead;while ((bytesRead = inputStream.read(buffer)) != -1) {byte[] chunkData = new byte[bytesRead];System.arraycopy(buffer, 0, chunkData, 0, bytesRead);chunkDataList.add(chunkData);}}// 獲取線程池工具類實例ThreadPoolUtils threadPool = ThreadPoolUtils.getNewInstance();// 提交批量分片任務并等待完成threadPool.submitBatchTasks((int) totalChunks, chunkIndex -> {byte[] currentChunkData = chunkDataList.get(chunkIndex);long currentChunkSize = currentChunkData.length;// 存儲分片數據到數據庫FileUploadEntity fileUpload = new FileUploadEntity();fileUpload.setId(IdGenerator.nextId());fileUpload.setUploadId(uploadId);fileUpload.setChunkSize(currentChunkSize);fileUpload.setChunkNum(totalChunks);fileUpload.setChunkFile(currentChunkData);fileUpload.setChunkIndex(chunkIndex);fileUploadMapper.insertFile(fileUpload);});return "文件已成功分片存儲,uploadId: " + uploadId;
}
- 方案三
? 接收完整文件,后端自動分片并存儲 (單線程) 。。。。上傳大文件時間太久
/*** 接收完整文件,后端自動分片并存儲* @param file* @return* @throws IOException*/public String uploadChunkFileBackup(MultipartFile file) throws IOException {// 生成唯一上傳ID,用于標識同一文件的所有分片String uploadId = UUID.randomUUID().toString();String fileName = file.getOriginalFilename();long fileSize = file.getSize();// 計算總分片數long totalChunks = (long) Math.ceil((double) fileSize / CHUNK_SIZE);List<FileUploadEntity> list = new ArrayList<>();try (InputStream inputStream = file.getInputStream()) {byte[] buffer = new byte[(int) CHUNK_SIZE];int bytesRead;int chunkIndex = 0;// 循環讀取文件并分片while ((bytesRead = inputStream.read(buffer)) != -1) {// 處理最后一個可能小于標準分片大小的分片byte[] chunkData = new byte[bytesRead];System.arraycopy(buffer, 0, chunkData, 0, bytesRead);// 獲取分片實際大小(字節數)long chunkActualSize = bytesRead; // 這就是當前分片的實際大小// 存儲當前分片
// saveChunk(uploadId, chunkIndex, totalChunks, chunkData, fileSize, fileName);// 存儲當前分片FileUploadEntity fileUpload = new FileUploadEntity();fileUpload.setId(IdGenerator.nextId());fileUpload.setUploadId(uploadId);fileUpload.setChunkSize(chunkActualSize);fileUpload.setChunkNum(totalChunks);fileUpload.setChunkFile(chunkData);fileUpload.setChunkIndex(chunkIndex);fileUploadMapper.insertFile(fileUpload);
// list.add(fileUpload);chunkIndex++;}}//批量添加
// int batchSize = 500;
// for (int i = 0; i < list.size(); i += batchSize) {
// int end = Math.min(i + batchSize, list.size());
// List<FileUploadEntity> subList = list.subList(i, end);
// fileUploadMapper.batchInsert(subList);
// }return "文件已成功分片存儲,uploadId: " + uploadId;}
-
方案四
接收完整文件,后端自動分片并使用 (多線程)線程池未封裝
/*** 接收完整文件,后端自動分片并使用 (多線程)線程池未封裝* @param file* @return* @throws IOException, InterruptedException*/
// @Overridepublic String uploadChunkFile(MultipartFile file) throws IOException, InterruptedException {// 生成唯一上傳ID,用于標識同一文件的所有分片String uploadId = UUID.randomUUID().toString();String fileName = file.getOriginalFilename();long fileSize = file.getSize();// 計算總分片數long totalChunks = (long) Math.ceil((double) fileSize / CHUNK_SIZE);// 創建線程池,核心線程數可根據服務器配置調整// 通常設置為CPU核心數 * 2 + 1int corePoolSize = Runtime.getRuntime().availableProcessors() * 2 + 1;ExecutorService executorService = Executors.newFixedThreadPool(corePoolSize);// 使用CountDownLatch等待所有線程完成CountDownLatch countDownLatch = new CountDownLatch((int) totalChunks);try (InputStream inputStream = file.getInputStream()) {byte[] buffer = new byte[(int) CHUNK_SIZE];int bytesRead;int chunkIndex = 0;// 循環讀取文件并分片while ((bytesRead = inputStream.read(buffer)) != -1) {// 處理最后一個可能小于標準分片大小的分片byte[] chunkData = new byte[bytesRead];System.arraycopy(buffer, 0, chunkData, 0, bytesRead);long chunkActualSize = bytesRead;// 捕獲當前變量的快照,避免線程安全問題final int currentChunkIndex = chunkIndex;final byte[] currentChunkData = chunkData;final long currentChunkSize = chunkActualSize;// 提交分片存儲任務到線程池executorService.submit(() -> {try {FileUploadEntity fileUpload = new FileUploadEntity();fileUpload.setId(IdGenerator.nextId());fileUpload.setUploadId(uploadId);fileUpload.setChunkSize(currentChunkSize);fileUpload.setChunkNum(totalChunks);fileUpload.setChunkFile(currentChunkData);fileUpload.setChunkIndex(currentChunkIndex);fileUploadMapper.insertFile(fileUpload);} finally {// 無論是否發生異常,都減少計數器countDownLatch.countDown();}});chunkIndex++;}// 等待所有分片處理完成countDownLatch.await();} finally {// 關閉線程池executorService.shutdown();}return "文件已成功分片存儲,uploadId: " + uploadId;}
-
方案五
大對象(Large Object)方案
/*** 大對象(Large Object)方案** PostgreSQL 的大對象(Large Object)機制要求:* 二進制數據通過LargeObjectManager寫入,返回一個OID(數字類型的對象 ID)* 表中只存儲這個OID,而不是直接存儲二進制數據* 讀取時通過OID從大對象管理器中獲取數據* @param file* @return*/@Overridepublic String uploadLargeObjectFile(MultipartFile file) {if (file.isEmpty()) {return "請選擇文件";}try {long fileSize = file.getSize();String fileName = file.getOriginalFilename();long largeObjectId = postgresLargeObjectUtil.createLargeObject(file.getInputStream());FileUploadEntity fileUpload = new FileUploadEntity();fileUpload.setId(IdGenerator.nextId());fileUpload.setUploadId(String.valueOf(largeObjectId));fileUpload.setChunkSize(fileSize);fileUpload.setChunkNum(fileSize);fileUpload.setChunkFile(null);fileUpload.setChunkIndex(2);fileUploadMapper.insertLargeObjectFile(fileUpload);return "大文件上傳成功!文件名:" + fileName + ",大小:" + fileSize + "字節";}catch (Exception e) {log.error("上傳大文件失敗:{}", e);return "上傳失敗:" + e.getMessage();}}//下載@Overridepublic void downloadFile(Long fileId, HttpServletResponse response) {FileUploadEntity fileEntity = fileUploadMapper.getFileById(fileId);long oid = Long.valueOf( fileEntity.getUploadId());try {response.reset();response.setContentType("application/octet-stream");String filename = "fileName.zip";response.addHeader("Content-Disposition", "attachment; filename=" + URLEncoder.encode(filename, "UTF-8"));ServletOutputStream outputStream = response.getOutputStream();postgresLargeObjectUtil.readLargeObject(oid, outputStream);}catch (Exception e) {log.error("下載文件失敗:{}", e);}}
-
方案六
文件字節上傳
/*** 文件字節上傳* @param file* @return*/@Overridepublic String uploadFileByte(MultipartFile file) {if (file.isEmpty()) {return "請選擇文件";}try {// 獲取文件信息String fileName = file.getOriginalFilename();long fileSize = file.getSize();byte[] fileData = file.getBytes(); // 小文件直接獲取字節數組// 執行插入(大文件建議用流:file.getInputStream())String sql = "INSERT INTO system_upload_test (id, upload_id, chunk_size, chunk_num, chunk_file, chunk_index) VALUES (?, ?, ?, ?, ?, ?)";jdbcTemplate.update(sql,111L,"2222",222L,3L,fileData,33L);return "文件上傳成功!";} catch (Exception e) {e.printStackTrace();return "文件上傳失敗:" + e.getMessage();}}// 大文件用流:file.getInputStream()public String uploadBigFile(MultipartFile file) throws Exception {// 1. 定義 SQL(注意:字段順序和占位符對應)String sql = "INSERT INTO user_qgcgk_app.system_upload_test " +"(id, upload_id, chunk_size, chunk_num, chunk_file, chunk_index) " +"VALUES (?, ?, ?, ?, ?, ?)";// 2. 準備參數(確保 InputStream 未關閉)Long id = 1795166209435262976L;String uploadId = "3333";Long chunkSize = 7068L;Long chunkNum = 7068L;InputStream chunkInputStream = file.getInputStream(); // 你的 InputStream(如 FileInputStream、ServletInputStream)Integer chunkIndex = 2;try {// 3. 執行 SQL:通過 PreparedStatementSetter 手動綁定參數jdbcTemplate.update(sql, new PreparedStatementSetter() {@Overridepublic void setValues(PreparedStatement ps) throws SQLException {// 綁定非流參數(按順序,類型匹配)ps.setLong(1, id); // 第1個參數:id(Long)ps.setString(2, uploadId); // 第2個參數:upload_id(String)ps.setLong(3, chunkSize); // 第3個參數:chunk_size(Long)ps.setLong(4, chunkNum); // 第4個參數:chunk_num(Long)// 關鍵:綁定 InputStream 到 bytea 字段(第5個參數)// 第三個參數傳 -1 表示“未知流長度”,PostgreSQL 支持此模式ps.setBinaryStream(5, chunkInputStream, file.getSize());ps.setInt(6, chunkIndex); // 第6個參數:chunk_index(Int)}});} finally {// 4. 執行完成后關閉流,釋放資源if (chunkInputStream != null) {chunkInputStream.close();}}return "上傳成功!";}
- 方案七
無臨時文件+多線(減少IO操作)
/*** 無臨時文件+多線程+批量插入的分片上傳*/public String uploadChunkFile(MultipartFile file) throws Exception {// 生成唯一上傳IDString uploadId = UUID.randomUUID().toString();long fileSize = file.getSize();long totalChunks = (long) Math.ceil((double) fileSize / CHUNK_SIZE);if (totalChunks <= 0) {return "文件大小異常,無法分片";}try (InputStream inputStream = file.getInputStream()) {byte[] buffer = new byte[(int) CHUNK_SIZE];int bytesRead;int chunkIndex = 0;// 批量插入緩沖區(每10個分片一批)List<FileUploadEntity> batchList = new ArrayList<>(10);// 計數器:等待所有批量任務完成CountDownLatch latch = new CountDownLatch((int) Math.ceil((double) totalChunks / 10));// 流式讀取并處理分片while ((bytesRead = inputStream.read(buffer)) != -1) {// 復制當前分片數據(避免buffer被覆蓋)byte[] chunkData = Arrays.copyOfRange(buffer, 0, bytesRead);// 創建分片實體FileUploadEntity entity = new FileUploadEntity();entity.setId(IdGenerator.nextId());entity.setUploadId(uploadId);entity.setChunkSize((long) chunkData.length);entity.setChunkNum(totalChunks);entity.setChunkFile(chunkData);entity.setChunkIndex(chunkIndex);batchList.add(entity);chunkIndex++;// 批量條件:滿10個分片或最后一個分片if (batchList.size() >= 10 || chunkIndex == totalChunks) {// 復制當前批次(避免線程安全問題)List<FileUploadEntity> currentBatch = new ArrayList<>(batchList);// 提交批量插入任務ThreadPoolUtils.getNewInstance().executor(() -> {try {fileUploadMapper.batchInsert(currentBatch);} finally {latch.countDown(); // 任務完成,計數器減1}});batchList.clear(); // 清空緩沖區}}// 等待所有批量任務完成(最多等待5分鐘)boolean allCompleted = latch.await(5, java.util.concurrent.TimeUnit.MINUTES);if (!allCompleted) {throw new BusinessException("文件分片上傳超時,請重試");}} catch (Exception e) {log.error("文件分片上傳失敗,uploadId:{}", uploadId, e);// 失敗時清理已上傳的分片(可選)
// fileUploadMapper.deleteByUploadId(uploadId);throw new BusinessException("文件分片上傳失敗:" + e.getMessage());}return "文件已成功分片存儲,uploadId: " + uploadId;}/*** 無臨時文件+多線程+單條插入的分片上傳*/public String uploadChunkFile(MultipartFile file) throws Exception {String uploadId = UUID.randomUUID().toString();long fileSize = file.getSize();long totalChunks = (long) Math.ceil((double) fileSize / CHUNK_SIZE);if (totalChunks <= 0) {return "文件大小異常,無法分片";}try (InputStream inputStream = file.getInputStream()) {byte[] buffer = new byte[(int) CHUNK_SIZE];int bytesRead;int chunkIndex = 0;// 用于等待所有分片完成CountDownLatch latch = new CountDownLatch((int) totalChunks);// 邊讀取邊提交分片任務,無需臨時文件while ((bytesRead = inputStream.read(buffer)) != -1) {// 復制當前分片數據(避免buffer被下一次read覆蓋)byte[] chunkData = Arrays.copyOfRange(buffer, 0, bytesRead);final int currentIndex = chunkIndex;// 提交異步任務ThreadPoolUtils.getNewInstance().executor(() -> {try {// 直接用內存中的分片數據寫入數據庫FileUploadEntity entity = new FileUploadEntity();entity.setId(IdGenerator.nextId());entity.setUploadId(uploadId);entity.setChunkSize((long) chunkData.length);entity.setChunkNum(totalChunks);entity.setChunkFile(chunkData);entity.setChunkIndex(currentIndex);fileUploadMapper.insertFile(entity);} catch (Exception e) {throw new RuntimeException("分片" + currentIndex + "存儲失敗", e);} finally {latch.countDown();}});chunkIndex++;}// 等待所有分片完成latch.await();} catch (Exception e) {log.error("文件分片上傳失敗", e);throw new BusinessException("文件分片上傳失敗");}return "文件已成功分片存儲,uploadId: " + uploadId;}
- 工具類PostgreSQL大對象工具類```java
import lombok.extern.slf4j.Slf4j;
import org.postgresql.largeobject.LargeObject;
import org.postgresql.largeobject.LargeObjectManager;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.SQLException;/*** PostgreSQL大對象工具類* @author: zrf* @date: 2025/08/25 16:09*/
@Slf4j
@Component
public class PostgresLargeObjectUtil {private final JdbcTemplate jdbcTemplate;public PostgresLargeObjectUtil(JdbcTemplate jdbcTemplate) {this.jdbcTemplate = jdbcTemplate;}/*** 從輸入流創建大對象并返回OID*/@Transactionalpublic long createLargeObject(InputStream inputStream) throws SQLException {// 獲取數據庫連接并關閉自動提交Connection connection = jdbcTemplate.getDataSource().getConnection();connection.setAutoCommit(false);try {// 獲取PostgreSQL大對象管理器LargeObjectManager lobjManager = connection.unwrap(org.postgresql.PGConnection.class).getLargeObjectAPI();// 創建大對象,返回OIDlong oid = lobjManager.createLO(LargeObjectManager.READ | LargeObjectManager.WRITE);// 打開大對象并寫入數據try (LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.WRITE)) {OutputStream outputStream = largeObject.getOutputStream();byte[] buffer = new byte[8192];int bytesRead;while ((bytesRead = inputStream.read(buffer)) != -1) {outputStream.write(buffer, 0, bytesRead);}}connection.commit();return oid;} catch (Exception e) {connection.rollback();throw new SQLException("創建大對象失敗", e);} finally {connection.close();}}/*** 根據OID讀取大對象內容到輸出流*/public void readLargeObject(long oid, OutputStream outputStream) throws Exception {Connection connection = jdbcTemplate.getDataSource().getConnection();connection.setAutoCommit(false);try {LargeObjectManager lobjManager = connection.unwrap(org.postgresql.PGConnection.class).getLargeObjectAPI();try (LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ)) {InputStream inputStream = largeObject.getInputStream();byte[] buffer = new byte[8192];int bytesRead;while ((bytesRead = inputStream.read(buffer)) != -1) {outputStream.write(buffer, 0, bytesRead);}}connection.commit();} catch (Exception e) {log.error("讀取大對象失敗", e);} finally {connection.close();}}/*** 刪除大對象(釋放磁盤空間)*/@Transactionalpublic void deleteLargeObject(long oid) throws SQLException {Connection connection = jdbcTemplate.getDataSource().getConnection();connection.setAutoCommit(false);try {LargeObjectManager lobjManager = connection.unwrap(org.postgresql.PGConnection.class).getLargeObjectAPI();lobjManager.delete(oid);connection.commit();} catch (Exception e) {connection.rollback();throw new SQLException("刪除大對象失敗", e);} finally {connection.close();}}
}
線程池工具類
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;/*** @Author:zrf* @Date:2023/8/14 10:05* @description:線程池工具類*/
public class ThreadPoolUtils {/*** 系統可用計算資源*/private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();/*** 核心線程數*/private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));/*** 最大線程數*/private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;/*** 空閑線程存活時間*/private static final int KEEP_ALIVE_SECONDS = 30;/*** 工作隊列*/private static final BlockingQueue<Runnable> POOL_WORK_QUEUE = new LinkedBlockingQueue<>(128);/*** 工廠模式*/private static final MyThreadFactory MY_THREAD_FACTORY = new MyThreadFactory();/*** 飽和策略*/private static final ThreadRejectedExecutionHandler THREAD_REJECTED_EXECUTION_HANDLER = new ThreadRejectedExecutionHandler.CallerRunsPolicy();/*** 線程池對象*/private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR;/*** 聲明式定義線程池工具類對象靜態變量,在所有線程中同步*/private static volatile ThreadPoolUtils threadPoolUtils = null;/*** 初始化線程池靜態代碼塊*/static {THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(//核心線程數CORE_POOL_SIZE,//最大線程數MAXIMUM_POOL_SIZE,//空閑線程執行時間KEEP_ALIVE_SECONDS,//空閑線程執行時間單位TimeUnit.SECONDS,//工作隊列(或阻塞隊列)POOL_WORK_QUEUE,//工廠模式MY_THREAD_FACTORY,//飽和策略THREAD_REJECTED_EXECUTION_HANDLER);}/*** 線程池工具類空參構造方法*/private ThreadPoolUtils() {}/*** 獲取線程池工具類實例* @return*/public static ThreadPoolUtils getNewInstance(){if (threadPoolUtils == null) {synchronized (ThreadPoolUtils.class) {if (threadPoolUtils == null) {threadPoolUtils = new ThreadPoolUtils();}}}return threadPoolUtils;}/*** 執行線程任務* @param runnable 任務線程*/public void executor(Runnable runnable) {THREAD_POOL_EXECUTOR.execute(runnable);}/*** 執行線程任務-有返回值* @param callable 任務線程*/public <T> Future<T> submit(Callable<T> callable) {return THREAD_POOL_EXECUTOR.submit(callable);}/*** 提交批量任務并等待所有任務完成* @param totalTasks 總任務數量* @param taskConsumer 任務消費者(接收任務索引,處理具體任務邏輯)* @throws InterruptedException 等待被中斷時拋出*/public void submitBatchTasks(int totalTasks, Consumer<Integer> taskConsumer) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(totalTasks);for (int i = 0; i < totalTasks; i++) {final int taskIndex = i;// 使用現有線程池提交任務THREAD_POOL_EXECUTOR.submit(() -> {try {taskConsumer.accept(taskIndex); // 執行具體任務邏輯} finally {countDownLatch.countDown(); // 任務完成,計數器減1}});}countDownLatch.await(); // 等待所有任務完成}/*** 獲取線程池狀態* @return 返回線程池狀態*/public boolean isShutDown(){return THREAD_POOL_EXECUTOR.isShutdown();}/*** 停止正在執行的線程任務* @return 返回等待執行的任務列表*/public List<Runnable> shutDownNow(){return THREAD_POOL_EXECUTOR.shutdownNow();}/*** 關閉線程池*/public void showDown(){THREAD_POOL_EXECUTOR.shutdown();}/*** 關閉線程池后判斷所有任務是否都已完成* @return*/public boolean isTerminated(){return THREAD_POOL_EXECUTOR.isTerminated();}
}