hadoop的api操作對象存儲

一、獲取文件或目錄

1. 獲取某個目錄下的文件


// 必須的依賴
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}// 獲取某個目錄下的文件路徑
def list_file(conf: Configuration, dir_path: String, is_recursive: Boolean = false): Array[String] = {// 獲取文件系統val fs = FileSystem.get(new java.net.URI(dir_path), conf) // 注意這里用 URI 讓 Hadoop 根據 scheme 找對應 FS// 遞歸獲取該目錄下所有文件val it: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(dir_path), is_recursive)// 獲取文件路徑val buffer = scala.collection.mutable.ArrayBuffer[String]()while (it.hasNext) {val fileStatus = it.next()buffer += fileStatus.getPath.toString}// 關閉文件系統fs.close()// 返回結果buffer.toArray
}// 設定配置文件
val conf = new Configuration()
conf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem") // 讀取oss的路徑// 需要指定的路徑
val path = "oss://aa/bb/"
val file_paths = list_file(conf, path).filter(x => x.contains("parquet"))
file_paths.foreach(println)

2.? 獲取某個目錄下的子目錄

import org.apache.hadoop.fs.{FileStatus, FileSystem, FileUtil, Path}/**
* 獲取某個目錄下所有子目錄的路徑, 以字符串數組的形式返回
*/
def getOnlineFirstDir: Array[String] = {// 獲取路徑val path = s"s3://aa/bb/"val filePath = new org.apache.hadoop.fs.Path( path )// 獲取文件系統val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )// 獲取所有子目錄的路徑val allFiles = FileUtil.stat2Paths( fileSystem.listStatus( filePath ) )val res = allFiles.filter( fileSystem.getFileStatus( _ ).isDirectory() ).map( _.toString)// 返回結果res
}

二、刪除文件或目錄

/*** 刪除目錄*/
def deletePath(spark: SparkSession, path: String): Unit = {// 1 獲取文件系統val file_path = new org.apache.hadoop.fs.Path( path )val file_system = file_path.getFileSystem( spark.sparkContext.hadoopConfiguration )// 2 判斷路徑存在時, 則刪除if (file_system.exists( file_path )) {file_system.delete( file_path, true )}
}

三、獲取文件或目錄大小

/*** 獲取某個目錄的大小(單位b字節),注意:只能在driver端使用,可以多線程來提速。*/
def get_path_size(spark: SparkSession, path: String): Long = {//取文件系統val filePath = new org.apache.hadoop.fs.Path( path )val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )// 獲取該目錄的大小,單位是字節if (fileSystem.exists( filePath )) {fileSystem.getContentSummary( filePath ).getLength} else {0}
}

四、判讀文件或目錄是否存在

方式一
/*** 判斷目錄是否存在,注意:只能在driver端使用,可以多線程來提速。問題: 對刪除過的目錄可能會誤判*/
def pathIsExist(spark: SparkSession, path: String): Boolean = {//取文件系統val filePath = new org.apache.hadoop.fs.Path( path )val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )// 判斷路徑是否存在fileSystem.exists( filePath )
}方式二
/*** 通過目錄是否大于0來判斷目錄是否存在(消除對刪除過的目錄的誤判),注意:只能在driver端使用,可以多線程來提速。*/
def def pathIsExist(spark: SparkSession, path: String): Boolean = //取文件系統val filePath = new org.apache.hadoop.fs.Path( path )val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )// 獲取該目錄的大小,單位是字節val size = if (fileSystem.exists( filePath )) {fileSystem.getContentSummary( filePath ).getLength} else {0}// 返回結果size > 0}

五、parquet文的行組信息


import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
import org.apache.parquet.column.statistics.Statistics
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.{BlockMetaData, ParquetMetadata}
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.api.Binaryimport java.{lang, util}// 獲取某個目錄下的文件路徑
def list_file(conf: Configuration, dir_path: String, is_recursive: Boolean = false): Array[String] = {// 獲取文件系統val fs = FileSystem.get(new java.net.URI(dir_path), conf) // 注意這里用 URI 讓 Hadoop 根據 scheme 找對應 FS// 遞歸獲取該目錄下所有文件val it: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(dir_path), is_recursive)// 獲取文件路徑val buffer = scala.collection.mutable.ArrayBuffer[String]()while (it.hasNext) {val fileStatus = it.next()buffer += fileStatus.getPath.toString}// 關閉文件系統fs.close()// 返回結果buffer.toArray
}// 某個文件某列的行組信息
def print_row_groupp(conf: Configuration, file_name: String, col_name: String): Unit = {// 讀取元數據val parquetFilePath = new Path(file_name)val inputFile: HadoopInputFile = HadoopInputFile.fromPath(parquetFilePath, conf)val footer: ParquetMetadata = ParquetFileReader.open(inputFile).getFooter// 遍歷每個行組,并手動添加索引val blocks: util.List[BlockMetaData] = footer.getBlocksfor (i <- 0 until blocks.size()) {val block = blocks.get(i)println(s"Row Group #${i}:")println(s"  - Total Rows: ${block.getRowCount}")println(s"  - Total Size: ${block.getTotalByteSize} bytes")// 遍歷每個列塊block.getColumns.forEach { columnChunkMetaData =>val columnPath = columnChunkMetaData.getPath.toDotString// 過濾目標列if (columnPath == col_name) {val statistics: Statistics[_] = columnChunkMetaData.getStatisticsprintln(s"  Column: $columnPath")if (statistics != null) {// 獲取最小值和最大值并解碼val minValue = statistics.genericGetMin match {case b: Binary => b.toStringUsingUTF8case l: lang.Long => l.toStringcase i: Integer => i.toStringcase other => other.toString}val maxValue = statistics.genericGetMax match {case b: Binary => b.toStringUsingUTF8case l: lang.Long => l.toStringcase i: Integer => i.toStringcase other => other.toString}println(s"    - Min Value: $minValue")println(s"    - Max Value: $maxValue")println(s"    - Null Count: ${statistics.getNumNulls}")} else {println("    - No statistics available for this column.")}println("    ------")}}println("======================")}}// 某個文件的行組數
def get_row_group_size(conf: Configuration, file_name: String): Int = {// 讀取元數據val parquetFilePath = new Path(file_name)val inputFile: HadoopInputFile = HadoopInputFile.fromPath(parquetFilePath, conf)val footer: ParquetMetadata = ParquetFileReader.open(inputFile).getFooter// 行組數footer.getBlocks.size()
}// 設定配置文件
val conf = new Configuration()
conf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")// 需要指定的路徑
val path = "oss://aa/bb/"
val file_paths = list_file(conf, path).filter(x => x.contains("parquet"))
file_paths.foreach(println)// 獲取第一個文件的行組信息
val first_file = file_paths(0)
print_row_groupp(conf, first_file, "odid")// 統計行組數
for (file_path <- file_paths) {val file_index = file_path.split("part-")(1).split("-")(0)println(file_index + " = " + get_row_group_size(conf, file_path))
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/96398.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/96398.shtml
英文地址,請注明出處:http://en.pswp.cn/web/96398.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

《UE5_C++多人TPS完整教程》學習筆記52 ——《P53 FABRIK 算法(FABRIK IK)》

本文為B站系列教學視頻 《UE5_C多人TPS完整教程》 —— 《P53 FABRIK 算法&#xff08;FABRIK IK&#xff09; 的學習筆記&#xff0c;該系列教學視頻為計算機工程師、程序員、游戲開發者、作家&#xff08;Engineer, Programmer, Game Developer, Author&#xff09; Stephen …

HttpServletRequest vs ServletContext 全面解析

HttpServletRequest vs ServletContext 全面解析 一、 核心區別概覽特性HttpServletRequest (請求對象)ServletContext (Servlet上下文/應用對象)作用域請求范圍應用范圍生命周期從客戶端發出請求開始&#xff0c;到服務器返回響應結束。從Web應用啟動&#xff08;部署&#xf…

Java后端工程師如何學AI

Java后端工程師如何學AI 目錄 前言為什么Java后端工程師要學習AIAI學習路徑規劃基礎知識體系實踐項目建議學習資源推薦學習時間規劃常見問題與解決方案職業發展建議總結 前言 隨著人工智能技術的快速發展&#xff0c;AI已經不再是計算機科學專業的專屬領域。作為Java后端工…

Django REST Framework 中 @action 裝飾器詳解

概述 action 裝飾器是 Django REST Framework (DRF) 中 ViewSet 的一個核心功能&#xff0c;用于定義自定義路由方法。它允許開發者在標準的 CRUD 操作&#xff08;list、create、retrieve、update、destroy&#xff09;之外&#xff0c;創建符合特定業務需求的接口&#xff0c…

【重磅更新】RetroBoard 全面升級,讓敏捷回顧更高效、更安全、更貼心!

??????? ??????? ??????? ??????? ??????? ??????? ??????? ??????? ??????? ??????? ??????? ???????…

中州養老:華為云設備管理接口開發全流程

需求分析點擊同步數據時,要把華為云的數據拉取到我們的系統中對于新增設備操作,實際上這些參數與華為云產品我們添加設備時的參數是一樣的表結構設計E-R圖數據庫字段接口分析對于設備中的數據,我們既要再IOT平臺存儲,又要在數據庫中存儲.之所以保存兩份數據的原因:IOT平臺中只是…

Llama-Factory微調Qwen2.5-VL從數據集制作到部署記錄

Llama-Factory微調Qwen2.5-VL從數據集制作到部署記錄 電腦環境配置&#xff1a; 1.ubuntu24 2.3090(24G) 3.Cuda12.9 一、數據集制作 我的數據集主要是對圖像內容進行描述 1.Label-studio制作數據集 這是最原始的從零開始制作數據集的方法&#xff0c;不建議這樣做&#xff01;…

【藍橋杯真題67】C++數位和為偶數的數 第十五屆藍橋杯青少年創意編程大賽 算法思維 C++編程選拔賽真題解

C++數位和為偶數的數 第十五屆藍橋杯青少年創意編程大賽C++選拔賽真題 博主推薦 所有考級比賽學習相關資料合集【推薦收藏】 1、C++專欄 電子學會C++一級歷年真題解析 電子學會C++二級歷年真題解析

【計算機網絡 | 第11篇】寬帶接入技術及其發展歷程

文章目錄寬帶接入技術詳解數字傳輸系統技術演進早期電話網的傳輸技術演變數字傳輸系統技術演進&#xff1a;從碎片到統一寬帶接入技術 ADSLADSL的基本原理與非對稱特性DMT調制技術&#xff1a;多子信道并行傳輸ADSL接入網組成電話分離器的設計原理與優勢ADSL的升級&#xff1a;…

(論文速讀)SCSegamba:用于結構裂紋分割的輕量級結構感知視覺曼巴

論文題目&#xff1a;SCSegamba: Lightweight Structure-Aware Vision Mamba for Crack Segmentation in Structures&#xff08;用于結構裂紋分割的輕量級結構感知視覺曼巴&#xff09;會議&#xff1a;CVPR2025摘要&#xff1a;不同場景下的結構裂縫像素級分割仍然是一個相當…

《蘇超風云》亮相時尚大賞,成短劇行業發展新風向

當男頻短劇憑借《一品布衣》五天橫掃10億播放的數據宣告逆襲&#xff0c;短劇市場格局正經歷深刻洗牌。風口之下&#xff0c;頭條視聽、中皋文旅、國內時尚視覺與短視頻創作領域的頭部廠牌“大灣視頻”攜手下場&#xff0c;打造精品男頻短劇《蘇超風云》&#xff0c;劍指2025年…

HTML5新年元旦網站源碼

新年主題網站開發概述 本項目基于HTML5、CSS3與JavaScript技術棧&#xff0c;打造了一個功能豐富、交互體驗流暢的新年主題網站&#xff0c;涵蓋文化展示、互動娛樂與社交分享三大核心模塊&#xff0c;通過現代化前端技術實現沉浸式節日氛圍營造。 1.1、核心功能架構 網站采…

CentOS 7 下iscsi存儲服務配置驗證

一、環境說明 centos7服務器*2服務器ip&#xff1a;服務端10.10.10.186 客戶端10.10.10.184服務端存儲卷sda1提前關閉防火墻&#xff0c;或開放默認 iSCSI 使用 3260 端口 二、服務端&#xff08;Target&#xff09;配置 安裝 iSCSI target 服務 yum install -y targetcli syst…

立即數、棧、匯編與C函數的調用

一、立即數在 ARM 架構中&#xff0c;立即數是指在指令中直接編碼的常量值&#xff0c;而不是通過寄存器或內存引用的值立即數的特點編碼限制&#xff1a;ARM指令是固定長度的&#xff08;32位&#xff09;&#xff0c;因此立即數不能占用太多位數。典型的算術和邏輯指令通常只…

貪心算法與動態規劃:數學原理、實現與優化

貪心算法與動態規劃&#xff1a;數學原理、實現與優化 引言&#xff1a;算法選擇的本質 在計算機科學領域&#xff0c;算法選擇的本質是對問題特征的數學建模與求解策略的匹配。貪心算法與動態規劃作為兩種經典的優化算法&#xff0c;分別在不同問題域展現出獨特優勢。本文將從…

Leetcode 刷題記錄 21 —— 技巧

Leetcode 刷題記錄 21 —— 技巧 本系列為筆者的 Leetcode 刷題記錄&#xff0c;順序為 Hot 100 題官方順序&#xff0c;根據標簽命名&#xff0c;記錄筆者總結的做題思路&#xff0c;附部分代碼解釋和疑問解答&#xff0c;01~07為C語言&#xff0c;08及以后為Java語言&#xf…

Android Studio Meerkat | 2024.3.1 Gradle Tasks不展示

把這兩個開關打開&#xff0c;然后刷新gradle文件

Java中方法重寫與重載的區別

目錄 1. 方法重載 (Overload) 什么是方法重載&#xff1f; 重載的特點&#xff1a; 重載的示例&#xff1a; 重載的調用&#xff1a; 2. 方法重寫 (Override) 什么是方法重寫&#xff1f; 重寫的特點&#xff1a; 重寫的示例&#xff1a; 重寫的調用&#xff1a; 3.…

微信小程序發送訂閱消息-一次訂閱,一直發送消息。

實現思路長期訂閱要求太高&#xff0c;需要政府、公共交通等單位才有資格&#xff0c;所以只能使用一次性訂閱。 就像是買奶茶&#xff0c;下單以后&#xff0c;會彈出讓用戶訂閱消息那種。以買奶茶為例:用戶第一次下單成功&#xff0c;點擊了訂閱消息。&#xff08;一般都有三…

408 Request Timeout:請求超時,服務器等待客戶端發送請求的時間過長。

408 Request Timeout 是 HTTP 狀態碼之一&#xff0c;表示客戶端在發送請求時&#xff0c;服務器等待的時間過長&#xff0c;最終放棄了處理該請求。此問題通常與網絡延遲、客戶端配置、服務器設置或者應用程序的性能有關。1. 常見原因1.1 客戶端問題網絡連接延遲或不穩定&…