I. Getting files or directories
1. Get the files under a directory
// Required imports
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}

// List the file paths under a directory
def list_file(conf: Configuration, dir_path: String, is_recursive: Boolean = false): Array[String] = {
  // Get the file system; using a URI lets Hadoop pick the FileSystem implementation that matches the scheme
  val fs = FileSystem.get(new java.net.URI(dir_path), conf)
  // List all files under the directory (recursively if requested)
  val it: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(dir_path), is_recursive)
  // Collect the file paths
  val buffer = scala.collection.mutable.ArrayBuffer[String]()
  while (it.hasNext) {
    val fileStatus = it.next()
    buffer += fileStatus.getPath.toString
  }
  // Close the file system
  fs.close()
  // Return the result
  buffer.toArray
}

// Set up the configuration
val conf = new Configuration()
conf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem") // needed to read oss:// paths

// The target directory
val path = "oss://aa/bb/"
val file_paths = list_file(conf, path).filter(x => x.contains("parquet"))
file_paths.foreach(println)
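A follow-up sketch of what the listed paths are typically used for, assuming a SparkSession named spark is in scope; spark.read.parquet accepts multiple paths:

// Load the listed Parquet files into a single DataFrame (paths come from list_file above)
val df = spark.read.parquet(file_paths: _*)
println(df.count())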
2. Get the subdirectories under a directory
import org.apache.hadoop.fs.{FileStatus, FileSystem, FileUtil, Path}

/**
 * Get the paths of all subdirectories under a directory, returned as an array of strings.
 */
def getOnlineFirstDir: Array[String] = {
  // The target directory
  val path = s"s3://aa/bb/"
  val filePath = new org.apache.hadoop.fs.Path(path)
  // Get the file system
  val fileSystem = filePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
  // List all children and keep only the directories
  val allFiles = FileUtil.stat2Paths(fileSystem.listStatus(filePath))
  val res = allFiles.filter(fileSystem.getFileStatus(_).isDirectory()).map(_.toString)
  // Return the result
  res
}
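A usage sketch, assuming a SparkSession named spark is in scope (the method pulls its Hadoop configuration from it) and that the hard-coded s3://aa/bb/ placeholder points at an existing prefix:

// Print every first-level subdirectory under the hard-coded path
getOnlineFirstDir.foreach(println)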
II. Deleting a file or directory
import org.apache.spark.sql.SparkSession

/**
 * Delete a directory.
 */
def deletePath(spark: SparkSession, path: String): Unit = {
  // 1. Get the file system
  val file_path = new org.apache.hadoop.fs.Path(path)
  val file_system = file_path.getFileSystem(spark.sparkContext.hadoopConfiguration)
  // 2. If the path exists, delete it recursively
  if (file_system.exists(file_path)) {
    file_system.delete(file_path, true)
  }
}
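A usage sketch, assuming a SparkSession named spark is in scope; the output path is a placeholder. A typical pattern is to clear a directory before rewriting it:

// Recursively remove the old output (if any), then write fresh data
val out = "oss://aa/bb/output/" // placeholder path
deletePath(spark, out)
spark.range(10).write.parquet(out)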
III. Getting the size of a file or directory
/**
 * Get the size of a directory in bytes.
 * Note: driver-side only; multiple threads can be used to speed it up.
 */
def get_path_size(spark: SparkSession, path: String): Long = {
  // Get the file system
  val filePath = new org.apache.hadoop.fs.Path(path)
  val fileSystem = filePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
  // Size of the directory in bytes (0 if it does not exist)
  if (fileSystem.exists(filePath)) {
    fileSystem.getContentSummary(filePath).getLength
  } else {
    0L
  }
}
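The comment above notes that the call runs on the driver and can be sped up with multiple threads; a minimal sketch using Scala parallel collections (built into Scala 2.12; on 2.13 the scala-parallel-collections module is required), with placeholder paths:

// Hypothetical directories to measure (placeholders)
val dirs = Array("oss://aa/bb/", "oss://aa/cc/")
// .par fans the driver-side getContentSummary calls out over a thread pool
val sizes = dirs.par.map(p => (p, get_path_size(spark, p))).toArray
sizes.foreach { case (p, bytes) => println(f"$p -> ${bytes / 1024.0 / 1024 / 1024}%.2f GB") }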
IV. Checking whether a file or directory exists
Method 1
/**
 * Check whether a directory exists.
 * Note: driver-side only; multiple threads can be used to speed it up.
 * Caveat: it may give a false positive for a directory that has been deleted.
 */
def pathIsExist(spark: SparkSession, path: String): Boolean = {
  // Get the file system
  val filePath = new org.apache.hadoop.fs.Path(path)
  val fileSystem = filePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
  // Check whether the path exists
  fileSystem.exists(filePath)
}

Method 2
/**
 * Check whether a directory exists by testing whether its size is greater than 0
 * (this removes the false positive for deleted directories).
 * Note: driver-side only; multiple threads can be used to speed it up.
 */
def pathIsExist(spark: SparkSession, path: String): Boolean = {
  // Get the file system
  val filePath = new org.apache.hadoop.fs.Path(path)
  val fileSystem = filePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
  // Size of the directory in bytes (0 if it does not exist)
  val size = if (fileSystem.exists(filePath)) {
    fileSystem.getContentSummary(filePath).getLength
  } else {
    0L
  }
  // Return the result
  size > 0
}
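A usage sketch, assuming a SparkSession named spark and a placeholder input path; with the second variant, the read is skipped for empty as well as missing directories:

// Only read the directory when the check passes; the path is a placeholder
val input = "oss://aa/bb/"
if (pathIsExist(spark, input)) {
  val df = spark.read.parquet(input)
  println(df.count())
}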
V. Row group information of Parquet files
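The helpers below read Parquet footers through ParquetFileReader and HadoopInputFile, which come from the parquet-hadoop artifact (usually already on the classpath of a Spark distribution). A minimal sbt sketch for a standalone project; the version is an assumption and should match the Parquet version your Spark ships with:

// build.sbt (assumed version; align with your Spark/Hadoop distribution)
libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.12.3"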
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
import org.apache.parquet.column.statistics.Statistics
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.{BlockMetaData, ParquetMetadata}
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.api.Binary

import java.{lang, util}

// List the file paths under a directory (same helper as in section I)
def list_file(conf: Configuration, dir_path: String, is_recursive: Boolean = false): Array[String] = {
  // Get the file system; using a URI lets Hadoop pick the FileSystem implementation that matches the scheme
  val fs = FileSystem.get(new java.net.URI(dir_path), conf)
  // List all files under the directory (recursively if requested)
  val it: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(dir_path), is_recursive)
  // Collect the file paths
  val buffer = scala.collection.mutable.ArrayBuffer[String]()
  while (it.hasNext) {
    val fileStatus = it.next()
    buffer += fileStatus.getPath.toString
  }
  // Close the file system
  fs.close()
  // Return the result
  buffer.toArray
}

// Row group information for one column of a file
def print_row_group(conf: Configuration, file_name: String, col_name: String): Unit = {
  // Read the footer metadata
  val parquetFilePath = new Path(file_name)
  val inputFile: HadoopInputFile = HadoopInputFile.fromPath(parquetFilePath, conf)
  val reader = ParquetFileReader.open(inputFile)
  val footer: ParquetMetadata = reader.getFooter
  // Walk the row groups, keeping a manual index
  val blocks: util.List[BlockMetaData] = footer.getBlocks
  for (i <- 0 until blocks.size()) {
    val block = blocks.get(i)
    println(s"Row Group #${i}:")
    println(s" - Total Rows: ${block.getRowCount}")
    println(s" - Total Size: ${block.getTotalByteSize} bytes")
    // Walk the column chunks
    block.getColumns.forEach { columnChunkMetaData =>
      val columnPath = columnChunkMetaData.getPath.toDotString
      // Keep only the target column
      if (columnPath == col_name) {
        val statistics: Statistics[_] = columnChunkMetaData.getStatistics
        println(s"  Column: $columnPath")
        if (statistics != null) {
          // Decode the min and max values
          val minValue = statistics.genericGetMin match {
            case b: Binary => b.toStringUsingUTF8
            case l: lang.Long => l.toString
            case i: Integer => i.toString
            case other => other.toString
          }
          val maxValue = statistics.genericGetMax match {
            case b: Binary => b.toStringUsingUTF8
            case l: lang.Long => l.toString
            case i: Integer => i.toString
            case other => other.toString
          }
          println(s" - Min Value: $minValue")
          println(s" - Max Value: $maxValue")
          println(s" - Null Count: ${statistics.getNumNulls}")
        } else {
          println(" - No statistics available for this column.")
        }
        println(" ------")
      }
    }
    println("======================")
  }
  // Close the reader
  reader.close()
}

// Number of row groups in a file
def get_row_group_size(conf: Configuration, file_name: String): Int = {
  // Read the footer metadata
  val parquetFilePath = new Path(file_name)
  val inputFile: HadoopInputFile = HadoopInputFile.fromPath(parquetFilePath, conf)
  val reader = ParquetFileReader.open(inputFile)
  val footer: ParquetMetadata = reader.getFooter
  // Number of row groups
  val n = footer.getBlocks.size()
  reader.close()
  n
}

// Set up the configuration
val conf = new Configuration()
conf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")// 需要指定的路徑
val path = "oss://aa/bb/"
val file_paths = list_file(conf, path).filter(x => x.contains("parquet"))
file_paths.foreach(println)

// Row group information of the first file
val first_file = file_paths(0)
print_row_group(conf, first_file, "odid")

// Count the row groups per file
for (file_path <- file_paths) {
  val file_index = file_path.split("part-")(1).split("-")(0)
  println(file_index + " = " + get_row_group_size(conf, file_path))
}