HDFS (Hadoop Distributed File System) 作為大數據生態的核心存儲系統,提供了分布式、高容錯、高吞吐量的數據存儲能力。通過 Java API 操作 HDFS 是開發大數據應用的基礎技能。本文將基于你的筆記,詳細解析 HDFS Java API 的使用方法,并提供完整的代碼示例和最佳實踐。
一、前置準備與環境配置
1. 添加 Maven 依賴
在 pom.xml
中添加 Hadoop 客戶端依賴:
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version>
</dependency>
推薦使用與生產環境一致的 Hadoop 版本(如 3.2.4)。
2. 初始化 FileSystem 對象
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.After;
import java.net.URI;public class HdfsTest {private FileSystem fileSystem;private Configuration configuration;public static final String HDFS_ENPOINT = "hdfs://ip:8020";public static final String basePath = "/test/java_api";@Beforepublic void preEnv() throws Exception {// 設置 Hadoop 用戶和路徑System.setProperty("HADOOP_USER_NAME", "hadoop");System.setProperty("HADOOP_HOME", "D:\\Program Files\\hadoop-3.0.0");System.setProperty("hadoop.home.dir", "D:\\document");// 配置 HDFS 連接configuration = new Configuration();fileSystem = FileSystem.get(new URI(HDFS_ENPOINT), configuration);}@Afterpublic void afterEnv() throws Exception {if (fileSystem != null) {fileSystem.close();}}
}
二、核心文件操作
1. 創建目錄
@Test
public void mkdir() throws Exception {boolean result = fileSystem.mkdirs(new Path(basePath));System.out.println("創建目錄: " + (result ? "成功" : "失敗"));
}
2. 創建文件并寫入內容
@Test
public void create() throws Exception {Path filePath = new Path(basePath + "/test.txt");try (FSDataOutputStream outputStream = fileSystem.create(filePath)) {outputStream.write("Java operation hdfs!".getBytes());System.out.println("文件創建成功: " + filePath);}
}
3. 查看文件內容
@Test
public void cat() throws Exception {Path filePath = new Path(basePath + "/test.txt");try (FSDataInputStream inputStream = fileSystem.open(filePath)) {IOUtils.copyBytes(inputStream, System.out, 1024, false);}
}
4. 文件重命名
@Test
public void rename() throws Exception {Path srcPath = new Path(basePath + "/test.txt");Path dstPath = new Path(basePath + "/renameTest.txt");boolean result = fileSystem.rename(srcPath, dstPath);System.out.println("重命名文件: " + (result ? "成功" : "失敗"));
}
三、文件上傳與下載
1. 本地上傳至 HDFS
@Test
public void copyFromLocalFile() throws Exception {Path localPath = new Path("D:\\document\\slow_query.log");Path hdfsPath = new Path(basePath);fileSystem.copyFromLocalFile(localPath, hdfsPath);System.out.println("文件上傳成功: " + localPath);
}
2. 帶進度條的大文件上傳
@Test
public void copyFromLocalFileWithProgress() throws Exception {Path hdfsPath = new Path(basePath + "/slow_query.log");File localFile = new File("D:\\document\\slow_query.log");try (InputStream is = new BufferedInputStream(Files.newInputStream(localFile.toPath()));FSDataOutputStream outputStream = fileSystem.create(hdfsPath, new Progressable() {long totalBytes = localFile.length();long bytesWritten = 0;int lastProgress = 0;@Overridepublic void progress() {bytesWritten += 4096;int progress = (int) ((bytesWritten * 100) / totalBytes);if (progress > lastProgress) {System.out.print("\r上傳進度: " + progress + "%");lastProgress = progress;}}})) {IOUtils.copyBytes(is, outputStream, 4096, false);System.out.println("\n文件上傳完成: " + hdfsPath);}
}
3. 從 HDFS 下載到本地
@Test
public void copyToLocalFile() throws Exception {Path srcPath = new Path(basePath + "/renameTest.txt");Path dstPath = new Path("E://test.txt");// 使用原始本地文件系統,避免 Windows 兼容性問題fileSystem.copyToLocalFile(false, srcPath, dstPath, true);System.out.println("文件下載成功: " + dstPath);
}
四、文件管理與元數據操作
1. 查看文件列表
@Test
public void listFiles() throws Exception {Path dirPath = new Path(basePath);RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(dirPath, true);System.out.println("文件列表:");while (iterator.hasNext()) {LocatedFileStatus status = iterator.next();System.out.printf("%s\t%s\t%d\t%d\t%s%n",status.isDirectory() ? "文件夾" : "文件",status.getReplication(),status.getBlockSize(),status.getLen(),status.getPath());}
}
2. 刪除文件/目錄
@Test
public void delete() throws Exception {Path path = new Path(basePath + "/slow_query.log");boolean result = fileSystem.delete(path, false); // 非遞歸刪除System.out.println("刪除文件: " + (result ? "成功" : "失敗"));
}
五、高級特性與最佳實踐
1. 設置文件副本系數
@Test
public void setReplication() throws Exception {Path filePath = new Path(basePath + "/test.txt");short replication = 2; // 設置副本數為2boolean result = fileSystem.setReplication(filePath, replication);System.out.println("設置副本系數: " + (result ? "成功" : "失敗"));
}
2. 檢查文件是否存在
@Test
public void checkFileExists() throws Exception {Path filePath = new Path(basePath + "/test.txt");boolean exists = fileSystem.exists(filePath);System.out.println("文件是否存在: " + exists);
}
3. 獲取文件塊位置信息
@Test
public void getFileBlockLocations() throws Exception {Path filePath = new Path(basePath + "/test.txt");FileStatus fileStatus = fileSystem.getFileStatus(filePath);BlockLocation[] locations = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());System.out.println("文件塊位置信息:");for (BlockLocation location : locations) {System.out.printf("塊偏移量: %d, 長度: %d, 所在節點: %s%n",location.getOffset(),location.getLength(),Arrays.toString(location.getHosts()));}
}
六、常見問題與解決方案
1. Windows 環境兼容性問題
- 錯誤信息:
java.io.IOException: Could not locate executable
- 解決方案:
- 設置
HADOOP_HOME
和hadoop.home.dir
系統變量 - 下載 Windows 版 Hadoop 二進制文件(如 winutils.exe)
- 設置
2. 權限不足問題
- 錯誤信息:
Permission denied: user=xxx, access=WRITE
- 解決方案:
// 在創建 FileSystem 時指定用戶 fileSystem = FileSystem.get(new URI(HDFS_ENPOINT), configuration, "hdfs");
3. 大文件上傳性能優化
- 使用帶進度反饋的上傳方法
- 調整緩沖區大小:
FSDataOutputStream outputStream = fileSystem.create(filePath, true, 8192); // 8KB 緩沖區
七、完整示例代碼
下面是一個整合所有操作的完整示例:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
import java.io.*;
import java.net.URI;
import java.nio.file.Files;public class HdfsOperationExample {private FileSystem fileSystem;private Configuration configuration;public static final String HDFS_ENPOINT = "hdfs://ip:8020";public static final String basePath = "/test/java_api";@Beforepublic void preEnv() throws Exception {System.setProperty("HADOOP_USER_NAME", "hadoop");configuration = new Configuration();fileSystem = FileSystem.get(new URI(HDFS_ENPOINT), configuration);}@Afterpublic void afterEnv() throws Exception {if (fileSystem != null) {fileSystem.close();}}@Testpublic void testAllOperations() throws Exception {// 創建目錄mkdir();// 創建文件并寫入內容create();// 查看文件內容System.out.println("\n文件內容:");cat();// 重命名文件rename();// 上傳本地文件copyFromLocalFile();// 帶進度條的上傳copyFromLocalFileWithProgress();// 查看文件列表System.out.println("\n文件列表:");listFiles();// 獲取文件塊位置System.out.println("\n文件塊位置:");getFileBlockLocations();// 刪除文件delete();}// 其他方法實現...
}
八、性能調優建議
- 批量操作:避免頻繁創建和關閉 FileSystem 對象
- 緩沖區設置:
// 增大緩沖區提高寫入性能 FSDataOutputStream outputStream = fileSystem.create(filePath, true, 65536); // 64KB
- 異步操作:對于大文件上傳,考慮使用異步回調機制
- 連接池:在生產環境中使用連接池管理 FileSystem 實例
通過本文的示例,你可以全面掌握 HDFS Java API 的使用方法。在實際開發中,建議根據業務需求選擇合適的 API,并注意處理異常和資源釋放,以確保程序的健壯性和性能。