背景
筆者有一個需求是把將近一億條數據上傳到FTP服務器中,這些數據目前是存儲在mysql中,是通過關聯幾張表查詢出來的,查詢出來的數據結果集一共是6個字段。要求傳輸的時候拆分成一個個小文件,每個文件大小不能超過500M。我的測試思路是對測試數據進行分頁查詢,比如每次分頁查詢10萬條數據,寫入到一個txt格式的文件中,攢到50萬條數據時,把這個txt文件上傳到Ftp中(粗略估算了一下,每個字段長度假設不超過255,),這就是一個小文件的上傳。
一、windows下FTP的安裝
筆者的開發環境是windows11,所以必須要搭建一個FTP環境以供測試使用
配置IIS web服務器
打開運行窗口【win+R】快捷鍵,輸入 optionalfeatures
后點擊確定:
在出來的彈框中找到Internet信息服務,并打開勾選以下配置 ,點擊確定,等待windows系統自行添加相關應用配置
配置IIS web站點
現在本地磁盤創建一個FtpServer
空文件夾
然后查看本機IP地址
打開運行【win+R】窗口輸入cmd回車
然后輸入ipconfig 查看IP
筆者本機連接的是無線網絡,如果是連接的有線網絡,則需要找對應的以太網適配器連接配置
接著 在開始欄中搜索 IIS 并點擊進入IIS管理器
打開后在左側 “網站” 右鍵菜單 打開 “添加FTP站點”
主要是填寫FTP站點名稱和服務的物理路徑
點擊下一頁,填寫本機當前網絡的ip地址
再點下一頁完成身份驗證和授權信息
點擊完成后,ftp服務器的windows搭建就結束了
打開防火墻,把以下服務勾選上
建立 FTP 服務之后,默認登陸 FTP 服務器的賬號和密碼就是本機 Administrator 的賬戶和密碼,但是筆者不記得密碼了,所以創建一個用戶來管理FTP登錄
此電腦->右擊->顯示更多選項->單擊管理->本地用戶和用戶組->用戶->右擊創建新用戶
ftp用戶名和密碼記好了
再在開始菜單找到IIS服務,點擊FTP授權規則
右擊編輯權限
點擊添加
輸入剛才創建的ftp用戶名稱,點擊檢查名稱
把下面的權限都勾選上,點擊確定
回到 Internet Information Services (IIS) 管理器,雙擊剛才選中的 “FTP授權規則”,點擊右側的"添加允許規則"
然后別忘了啟動ftp,右擊管理ftp站點,啟動
登錄ftp
地址是ftp://192.168.1.105,進入此電腦,輸入地址回車
輸入用戶名和密碼可以登錄
至于瀏覽器訪問,這在很早之前是可以的,但是后來各大瀏覽器廠商都禁止使用瀏覽器訪問ftp資源,這里也就作罷了
更換ftp的ip
當本機網絡環境發生改變時,比如無線網環境變了,導致ip地址變了,那么之前設置好的ip地址就失效了,ftp無法連接。
點開IIS管理器,點擊綁定
點擊編輯,修改IP地址即可
二、java連接ftp服務器
筆者使用java語言,所以給出springboot框架下訪問ftp的方法
首先引入pom依賴 Apache Commons net
<dependency><groupId>commons-net</groupId><artifactId>commons-net</artifactId><version>3.10.0</version> <!-- 或者使用最新的版本 -->
</dependency>
我這里使用的是最新版,jdk21,可以根據自己的jdk版本適當降低版本,不報錯就可以
FTP連接工具類
package com.execute.batch.executebatch.utils;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;import java.io.*;
import java.time.Duration;/*** FTP工具類* @author hulei*/
@Slf4j
public class FtpUtil {/*** 上傳文件到FTP服務器的根目錄** @param host FTP服務器地址* @param port FTP服務器端口號,默認為21* @param username 用戶名* @param password 密碼* @param localFile 本地要上傳的文件* @return 成功返回true,否則返回false*/public static boolean uploadFileToRoot(String host, int port, String username, String password, File localFile) {FTPClient ftpClient = null;FileInputStream fis = null;try {ftpClient = connectAndLogin(host, port, username, password);setBinaryFileType(ftpClient);ftpClient.setConnectTimeout(1000000000);Duration timeout = Duration.ofSeconds(1000000000);ftpClient.setDataTimeout(timeout);String remoteFileName = localFile.getName();fis = new FileInputStream(localFile);return ftpClient.storeFile(remoteFileName, fis);} catch (IOException e) {log.error("上傳文件失敗", e);return false;} finally {assert ftpClient != null;disconnect(ftpClient);if(fis != null){try {fis.close();} catch (IOException e) {log.error("關閉文件流失敗", e);}}}}/*** 上傳文件到FTP服務器的指定路徑** @param host FTP服務器地址* @param port FTP服務器端口號,默認為21* @param username 用戶名* @param password 密碼* @param remotePath FTP服務器上的目標路徑* @param localFile 本地要上傳的文件* @return 成功返回true,否則返回false*/public static boolean uploadFileToPath(String host, int port, String username, String password, String remotePath, File localFile) {FTPClient ftpClient = null;FileInputStream fis = null;try {ftpClient = connectAndLogin(host, port, username, password);setBinaryFileType(ftpClient);ftpClient.setConnectTimeout(1000000000);Duration timeout = Duration.ofSeconds(1000000000);ftpClient.setDataTimeout(timeout);createRemoteDirectories(ftpClient, remotePath);String remoteFileName = localFile.getName();String fullRemotePath = remotePath + "/" + remoteFileName;fis = new FileInputStream(localFile);return ftpClient.storeFile(fullRemotePath, fis);} catch (IOException e) {log.error("上傳文件失敗", e);return false;} finally {assert ftpClient != null;disconnect(ftpClient);if(fis != null){try {fis.close();} catch (IOException e) {log.error("關閉文件流失敗", e);}}}}/*** 在FTP服務器上創建指定路徑所需的所有目錄** @param ftpClient FTP客戶端* @param remotePath 需要創建的遠程路徑* @throws IOException 如果在創建目錄時發生錯誤*/private static void createRemoteDirectories(FTPClient ftpClient, String remotePath) throws IOException {String[] directories = remotePath.split("/");String currentPath = "";for (String dir : directories) {if (!dir.isEmpty()) {currentPath += "/" + dir;if (!ftpClient.changeWorkingDirectory(currentPath)) {if (!ftpClient.makeDirectory(dir)) {throw new IOException("無法創建遠程目錄: " + currentPath);}ftpClient.changeWorkingDirectory(dir);}}}}/*** 連接到FTP服務器并登錄。** @param host FTP服務器的主機名或IP地址。* @param port FTP服務器的端口號。* @param username 登錄FTP服務器的用戶名。* @param password 登錄FTP服務器的密碼。* @return 成功連接并登錄后返回一個FTPClient實例,可用于后續操作。* @throws IOException 如果連接或登錄過程中遇到任何網絡問題,則拋出IOException。*/private static FTPClient connectAndLogin(String host, int port, String username, String password) throws IOException {FTPClient ftpClient = new FTPClient();ftpClient.connect(host, port);ftpClient.login(username, password);int replyCode = ftpClient.getReplyCode();if (!FTPReply.isPositiveCompletion(replyCode)) {throw new IOException("連接FTP服務器失敗");}return ftpClient;}/*** 斷開與FTP服務器的連接。* 該方法首先檢查FTP客戶端是否已連接到服務器。如果已連接,則嘗試登出,* 如果登出失敗,記錄錯誤信息。接著嘗試斷開與服務器的連接,如果斷開失敗,同樣記錄錯誤信息。** @param ftpClient 與FTP服務器交互的客戶端對象。*/private static void disconnect(FTPClient ftpClient) {if (ftpClient.isConnected()) {try {ftpClient.logout();} catch (IOException ioe) {log.error("登出FTP服務器失敗", ioe);}try {ftpClient.disconnect();} catch (IOException ioe) {log.error("斷開FTP服務器連接失敗", ioe);}}}/*** 設置FTP客戶端的文件傳輸類型為二進制。* 這個方法嘗試將FTP文件傳輸類型設置為BINARY,這是進行二進制文件傳輸的標準方式。* 如果設置失敗,會拋出一個運行時異常。** @param ftpClient 用于文件傳輸的FTP客戶端實例。* @throws RuntimeException 如果設置文件傳輸類型為二進制時發生IOException異常。*/private static void setBinaryFileType(FTPClient ftpClient) {try {ftpClient.setFileType(FTP.BINARY_FILE_TYPE);} catch (IOException e) {throw new RuntimeException("設置傳輸二進制文件失敗", e);}}}
主要提供了兩個方法uploadFileToRoot
和uploadFileToPath
,前者是上傳到ftp服務器根目錄下,后者上傳到指定目錄下,其中的連接時間設置的有點夸張,主要是傳輸時間長、數據量大,害怕斷開。
注意:所有涉及到操作文件的流,包括輸入流和輸出流,使用完了,要及時關閉,否則占用資源不說,還會導致臨時生成的文件無法刪除。
筆者在ftp服務器下新建了一個文件,測試上傳一個txt格式的文本文件,一個上傳到根目錄下,一個上傳到newFile
文件夾里
測試用例代碼
package com.execute.batch.executebatch.utils;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;/*** FTP工具類* @author hulei*/
@Slf4j
public class FtpUtil {private static FTPClient connectAndLogin(String host, int port, String username, String password) throws IOException {FTPClient ftpClient = new FTPClient();ftpClient.connect(host, port);ftpClient.login(username, password);int replyCode = ftpClient.getReplyCode();if (!FTPReply.isPositiveCompletion(replyCode)) {throw new IOException("連接FTP服務器失敗");}return ftpClient;}private static void disconnect(FTPClient ftpClient) {if (ftpClient.isConnected()) {try {ftpClient.logout();} catch (IOException ioe) {log.error("登出FTP服務器失敗", ioe);}try {ftpClient.disconnect();} catch (IOException ioe) {log.error("斷開FTP服務器連接失敗", ioe);}}}private static void setBinaryFileType(FTPClient ftpClient) {try {ftpClient.setFileType(FTP.BINARY_FILE_TYPE);} catch (IOException e) {throw new RuntimeException("設置傳輸二進制文件失敗", e);}}/*** 上傳文件到FTP服務器的根目錄* @param host FTP服務器地址* @param port FTP服務器端口號,默認為21* @param username 用戶名* @param password 密碼* @param localFile 本地要上傳的文件* @return 成功返回true,否則返回false*/public static boolean uploadFileToRoot(String host, int port, String username, String password, File localFile) {FTPClient ftpClient = null;try {ftpClient = connectAndLogin(host, port, username, password);setBinaryFileType(ftpClient);String remoteFileName = localFile.getName();return ftpClient.storeFile(remoteFileName, new FileInputStream(localFile));} catch (IOException e) {log.error("上傳文件失敗", e);return false;} finally {assert ftpClient != null;disconnect(ftpClient);}}/*** 上傳文件到FTP服務器的指定路徑* @param host FTP服務器地址* @param port FTP服務器端口號,默認為21* @param username 用戶名* @param password 密碼* @param remotePath FTP服務器上的目標路徑* @param localFile 本地要上傳的文件* @return 成功返回true,否則返回false*/public static boolean uploadFileToPath(String host, int port, String username, String password, String remotePath, File localFile) {FTPClient ftpClient = null;try {ftpClient = connectAndLogin(host, port, username, password);setBinaryFileType(ftpClient);createRemoteDirectories(ftpClient, remotePath);String remoteFileName = localFile.getName();String fullRemotePath = remotePath + "/" + remoteFileName;return ftpClient.storeFile(fullRemotePath, new FileInputStream(localFile));} catch (IOException e) {log.error("上傳文件失敗", e);return false;} finally {assert ftpClient != null;disconnect(ftpClient);}}/*** 在FTP服務器上創建指定路徑所需的所有目錄* @param ftpClient FTP客戶端* @param remotePath 需要創建的遠程路徑* @throws IOException 如果在創建目錄時發生錯誤*/private static void createRemoteDirectories(FTPClient ftpClient, String remotePath) throws IOException {String[] directories = remotePath.split("/");String currentPath = "";for (String dir : directories) {if (!dir.isEmpty()) {currentPath += "/" + dir;if (!ftpClient.changeWorkingDirectory(currentPath)) {if (!ftpClient.makeDirectory(dir)) {throw new IOException("無法創建遠程目錄: " + currentPath);}ftpClient.changeWorkingDirectory(dir);}}}}
}
執行后查看ftp服務器
發現根目錄下和文件夾下都有上傳的文件了
批量數據生成
筆者這里只模擬生成500萬條數據,供測試使用
批處理工具類
package com.execute.batch.executebatch.utils;import jakarta.annotation.Resource;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;import java.util.List;
import java.util.function.BiFunction;@Component
public class BatchInsertUtil {@Resourceprivate final SqlSessionFactory sqlSessionFactory;public BatchInsertUtil(SqlSessionFactory sqlSessionFactory) {this.sqlSessionFactory = sqlSessionFactory;}/*** 批量插入數據* @param entityList 待插入的數據列表* @param mapperClass 映射器接口的Class對象*/@SuppressWarnings("all")public <T,U,R> int batchInsert(List<T> entityList, Class<U> mapperClass, BiFunction<T,U,R> function) {int i = 1;SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);try {U mapper = sqlSession.getMapper(mapperClass);for (T entity : entityList) {function.apply(entity,mapper);i++;}sqlSession.flushStatements();sqlSession.commit();} catch (Exception e) {throw new RuntimeException("批量插入數據失敗", e);}finally {sqlSession.close();}return i-1;}
}
跑批數據
package com.execute.batch.executebatch;import com.execute.batch.executebatch.entity.User;
import com.execute.batch.executebatch.mapper.UserMapper;
import com.execute.batch.executebatch.utils.BatchInsertUtil;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** DataSeeder 批量生成數據* @author hulei*/
@Component
public class DataSeeder implements CommandLineRunner {@Resourceprivate ApplicationContext applicationContext;private ExecutorService executorService;private static final int TOTAL_RECORDS = 5000000;private static final int BATCH_SIZE = 10000;private static final int THREAD_POOL_SIZE = 10;@PostConstructpublic void init() {executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);}@Overridepublic void run(String... args) {long startTime = System.currentTimeMillis();List<Runnable> tasks = new ArrayList<>();for (int i = 0; i < TOTAL_RECORDS; i += BATCH_SIZE) {int finalI = i;tasks.add(() -> insertBatch(finalI, BATCH_SIZE));}tasks.forEach(executorService::execute);executorService.shutdown();long endTime = System.currentTimeMillis();System.out.println("Total time taken: " + (endTime - startTime) / 1000 + " seconds.");}public void insertBatch(int startId, int batchSize) {List<User> batch = new ArrayList<>(batchSize);Random random = new Random();for (int i = 0; i < batchSize; i++) {User user = createUser(startId + i, random);batch.add(user);System.out.println(user);}BatchInsertUtil util = new BatchInsertUtil(applicationContext.getBean(SqlSessionFactory.class));util.batchInsert(batch, UserMapper.class, (item,mapper)-> mapper.insertBatch(item));}private User createUser(int id, Random random) {User user = new User();user.setId(id);user.setName("User" + id);user.setEmail("user" + id + "@example.com");user.setAge(random.nextInt(80) + 20); // 年齡在20到99之間user.setAddress("Address" + id);user.setPhoneNumber("1234567890"); // 簡化處理,實際應生成隨機電話號碼return user;}
}
整個生成過程是十分漫長的,40分鐘左右,數據查詢結果生成了500萬條數據
測試上傳ftp
下面展示的兩個是mybatis手動分頁的寫法,如果有其他查詢參數,則可以建一個實體類,把rowbounds參數囊括進去作為一個屬性即可
mapper接口層
mybatis的xml
測試用例
package com.execute.batch.executebatch.controller;import com.execute.batch.executebatch.entity.User;
import com.execute.batch.executebatch.mapper.UserMapper;
import com.execute.batch.executebatch.utils.FtpUtil;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.RowBounds;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.BiConsumer;/*** @author hulei* @date 2024/5/22 10:42*/@RestController
@RequestMapping("/FTP")
@Slf4j
public class FTPController {@Resourceprivate UserMapper userMapper;private final Object lock = new Object();@GetMapping(value = "/upload")public void upload() throws InterruptedException {String host = "192.168.1.103";int port = 21; // 默認FTP端口String username = "hulei";String password = "hulei";int pageSize = 450000;int offset = 0;int uploadCycle = 0;int totalUploaded = 0;boolean noData = false;while (true) {uploadCycle++;// 將查詢結果處理并寫入本地文件File tempFile = new File("D:/FTPFile", "user_data_" + uploadCycle + ".txt");while (true) {RowBounds rowBounds = new RowBounds(offset, pageSize);List<User> list = userMapper.queryBatch(rowBounds);if (!list.isEmpty()) {MultiThreadWriteToFile(list, tempFile, getConsumer());offset += pageSize;totalUploaded += list.size();}if (list.isEmpty()) {noData = true;break;}// 檢查總數據量是否達到500000,如果達到則上傳文件if (totalUploaded >= 600000) {break;}}// 上傳本地文件到FTP服務器if(!tempFile.exists()){break;}boolean uploadSuccess = FtpUtil.uploadFileToRoot(host, port, username, password, tempFile);if (uploadSuccess) {System.out.println("文件上傳成功");} else {System.out.println("文件上傳失敗");}System.out.println("上傳完成,已上傳" + uploadCycle + "個批次");totalUploaded = 0;if (noData) {break;}}}private <T> void MultiThreadWriteToFile(List<T> list, File tempFile, BiConsumer<BufferedWriter, T> writeItemConsumer) throws InterruptedException {Path filePath = tempFile.toPath(); // 將文件對象轉換為路徑對象,用于后續的文件寫入操作。try (BufferedWriter writer = Files.newBufferedWriter(filePath, StandardCharsets.UTF_8,StandardOpenOption.CREATE, // 如果文件不存在則創建StandardOpenOption.WRITE, // 打開文件進行寫入StandardOpenOption.APPEND)) { // 追加模式寫入,而不是覆蓋 // 使用 UTF-8 編碼打開文件緩沖寫入器。ExecutorService executor = Executors.newFixedThreadPool(10); // 創建一個固定大小的線程池,包含10個線程。BlockingQueue<Integer> taskQueue = new ArrayBlockingQueue<>(list.size()); // 創建一個阻塞隊列,用于存儲要處理的任務索引。for (int i = 0; i < list.size(); i++) { // 預填充任務隊列,為每個列表元素創建一個任務。taskQueue.add(i);}for (int i = 0; i < list.size(); i++) { // 提取隊列中的索引,并提交相應的任務給線程池執行。int index = taskQueue.take();executor.submit(() -> writeItemConsumer.accept(writer, list.get(index)));}executor.shutdown(); // 關閉線程池,等待所有任務完成。boolean terminated = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // 等待線程池中的所有任務完成。if (!terminated) { // 如果線程池在指定時間內未能關閉,則記錄警告信息。log.warn("線程池關閉超時");}} catch (IOException e) { // 捕獲并記錄文件操作相關的異常。log.error("創建或寫入文件發生錯誤: {},異常為: {}", tempFile.getAbsolutePath(), e.getMessage());}}private BiConsumer<BufferedWriter, User> getConsumer() {return (writer, item) -> {String str = String.join("|",String.valueOf(item.getId()),item.getName(),item.getEmail(),String.valueOf(item.getAge()),item.getAddress(),item.getPhoneNumber());log.info("告警入湖數據拼接字符串:{}", str);try {synchronized (lock) {writer.write(str);writer.newLine();}} catch (IOException e) {log.error("寫入告警入湖數據發生異常: {}", e.getMessage());}};}
}
簡單分析下:分頁查詢數據,每次查詢pageSize條數據,寫入一個txt文件,當寫入的總條數超過totalUpload時,就跳出內部while循環,上傳當前txt文件。然后進入第二次外層while循環,創建第二個txt文件,內部循環分頁查詢數據寫入第二個txt文件。。。以此類推,直至最后查不出數據為止。
注意:pageSize和totalUpload最好是倍數關系,比如pageSize = 50000,那么totalUpload最好是pageSize 的整數倍,如100000,150000,200000,這樣可以保證當文件數較多時,大部分的文件中數據條數一樣。
以下是我分批上傳到ftp服務器的文件,一共500萬條數據,字段做了處理,使用 | 拼接
寫入的數據是亂序的,要求順序寫入的話,就不要使用多線程了。