Hadoop 是一個開源的分布式計算系統,用于存儲和處理大規模數據集。Hadoop 主要由HDFS(Hadoop Distributed File System)、MapReduce、Yarn(Jobtracker,TaskTracker)三大核心組件組成。其中HDFS是分布式文件系統,用于存儲文件,MapReducer是計算框架,可以分為Map和Reduce兩部分,簡單來說就是先分組,后計算,而Yarn則是對主機資源的協調,輔助計算的順利進行。
1. HDFS(Hadoop Distributed File System)
HDFS基本架構
HDFS負責存儲所有文件。他將大型文件分成若干個數據塊,默認情況下,HDFS 的數據塊大小為 128MB(可以配置),方便計算的分布式進行,提高計算效率。每個數據塊還可以生成多個副本,存儲在不同主機中提高系統容錯率。
HDFS中有三個角色發揮著主要作用,分別是NameNode,DataNode,Secondary NameNode。
- NameNode:主要負責管理集群中的所有數據,包括DataNode節點信息,以及文件保存的位置信息等等。
- DataNode:實際存儲數據的節點,一個集群中存在多個DataNode,互相不知道對方的信息,需要和NameNode保持心跳,匯報存儲狀態。當DataNode沒有定期向NameNode發送心跳時,會觸發NameNode的故障恢復,例如副本重新分配。
- Secondary NameNode:從名字來看,難道是NameNode的備份節點?當NameNode宕機時代替NameNode發揮作用?實際上并不是,他的作用是幫助NameNode優化磁盤空間。和大多數持久化數據的中間件一樣,HDFS對于集群元數據的持久化也是通過快照和日志來持久化進磁盤,不過Hadoop作為大型文件分布式處理系統,其操作日志非常龐大,如果靠操作日志來持久化文件,將要占用極高的磁盤空間,使用快照文件能夠顯著的壓縮信息持久化體積,不過由于操作日志的內容過于巨大,將操作日志變為快照的過程極為耗時,如果這一操作由NameNode完成,可能會導致Hadoop集群的正常服務受到影響,所以Hadoop使用Secondary NameNode這一角色來完成這一過程,Secondary NameNode會定期的向NameNode獲取快照文件(FsImage ),以及操作日志(EditLog),并且講操作日志的內容,補充進快照文件,再將快照文件返回給NameNode,幫助NameNode完成一次信息壓縮。
通常上面的學習,我們可以得到一個簡單的HDFS架構圖如下:
HDFS存儲數據流程
- 當一個客戶端想要向Hadoop中的HDFS中存儲數據時,首先他需要將大型文件按照要求的文件大小進行分塊,
- 將文件進行分塊后,客戶端會向NameNode發送請求(多個數據塊可能會并發請求),NameNode再確認文件不存在后(如果已經存在會拋出錯誤),集群的元數據信息,給這個數據塊,及其副本分配位置。
NameNode在給數據塊分配位置時,會考慮到節點當前的負載程度,存儲空間,節點是否存活等因素,并且還會盡可能將其副本分配到不同主機,甚至機架中(機架需要在配置文件中配置機架感知)。
- 客戶端在接收到NameNode返回的信息后,會按照NameNode的安排,開始將數據塊傳向第一個DataNode。當第一個DataNode接收完成后,會繼續將數據發向下一個DataNode(注意這里是第一個DataNode向第二個DataNode發送數據塊,而不是客戶端向第二個DataNode發送)。在此期間,DataNode會持續向NameNode以及客戶端匯報進度。
- 當所有數據傳輸完成后,客戶端會向NameNode發送一個寫入完成的請求,NameNode會根據客戶端發送的信息來更新自己的集群元數據。
流程圖如下:
2. MapReduce
MapReduce是建立在HDFS基礎之上的Hadoop計算框架之一(還有很多其他的計算框架),用于處理大量數據塊并發計算的計算框架,MapReduce可以分為四個階段。
輸入分割(Input Splitting)
在這一階段,輸入數據被分割成較小的塊,稱為輸入分割(Input Splits)。每個輸入分割的大小通常與 Hadoop 分布式文件系統(HDFS)的塊大小一致,用戶可以通過配置參數(如 mapreduce.input.fileinputformat.split.minsize 和 mapreduce.input.fileinputformat.split.maxsize)調整大小。這些分割被分配給不同的映射任務(Map Tasks)進行處理。客戶端也可以自由選擇分塊大小,甚至大于HDFS分塊大小,因為MapReducer的分塊是邏輯分塊,是指向實際文件的引用,并不是物理分塊,不受HDFS分塊大小的限制。
Map階段(Mapping)
分塊完畢后,每個Map任務通過RecordReader
從輸入分片中解析出一個個鍵值對。這個過程涉及到如何定義記錄邊界,例如在文本文件中,可能每一行被視為一個記錄。RecordReader
的作用是將輸入分片的內容轉換為可以作為Map函數輸入的鍵值對形式。例如,在單詞計數中,RecordReader
可能將文本行分割成單詞,并輸出如 <“Hadoop”, 1> 的鍵值對。
Hadoop會將映射任務盡量分配到其數據所在節點上,以節省網絡帶寬和提升性能。也就是計算資源向數據移動,這被稱為數據本地性(Data Locality)。當無法將計算任務分配到目標主機時,Hadopp考慮機架感知,將任務分配到同一機架的主機中(機架感知需要手動配置)。
洗牌和排序階段(Shuffling and Sorting)
在Map輸出鍵值對后、Reducer輸入前進行一次局部規約操作,稱為Combiner。這一步驟是可選的優化項,可以看作是對數據的局部計算,比如說在單詞計數的例子中,規約就會將當前分塊的單詞進行局部匯總。然后將得到的結果傳入分區,通過規約的方式可以減少傳輸到Reducer的數據量,提高整體效率。
Combiner完成后,會進一步對結果進行分區(Partition)、排序(Sort)等多個子階段。在這個過程中,Map輸出的結果會被寫入內存緩沖區(默認大小 100MB,可通過 mapreduce.task.io.sort.mb 配置)。當緩沖區達到閾值(如 80%,可通過 mapreduce.map.sort.spill.percent 配置)時,后臺線程會將其溢寫到磁盤;隨后,Reduce任務會從各個Map節點拉取屬于自己的那部分數據,并對其進行合并、排序、分組等預處理操作。
分區操作是將鍵值對發向目標reducer,而排序是將分區結果進一步分類,將相同的key,放在一起,比如說如果分區結果一個reducer需要處理的鍵值對如下:
(world, 1) (hadoop, 1) (world, 1) (mapreduce, 1)
那么經過排序后,結果是:
(hadoop, 1) (mapreduce, 1) (world, 1) (world, 1)
如果在Combiner階段出現兩個(world, 1),可能會變成(world, 2),這就叫規約。
reducer在開始拉取數據時,還需要再次進行合并、排序和分組,因為在洗牌和排序階段的排序是單節點的,而reducer需要從多個節點拉取數據,所以需要將結果進行合并,在排序,并且按照既定規則進行分組。分組規則可以自定義,reducer每次會根據分組處理一組數據。
Reducer階段
分組結束后,Hadoop會將這些分組數據依次交給對應的Reducer執行,Reducer的數量由用戶通過配置參數 mapreduce.job.reduces
預先指定,默認值為 1。Reduce 任務數會影響并行度和輸出文件數量。Reducer 的輸入是一個迭代器(Iterator),它指向當前分組的所有值。每次調用Reducer時都會傳入一個分組。Reducer 會依次按照用戶定義的Reducer函數(一個傳入HDFS的jar包)處理每一組數據。并將結果將處理結果輸出到指定的目標位置,通常是 HDFS,且默認以鍵值對形式存儲(可以通過 OutputFormat 自定義輸出格式,例如文本或序列化文件)。
流程圖
3. Yarn
對于計算框架中,計算像向數據移動的理念,需要一個調度器來輔助執行,Yarn就是這樣的一個調度器,他承接這在計算過程中的資源管理和任務調度的工作,讓每個一個任務都能分配到最佳節點,并監控整個任務的執行情況。
在Hadoop1.x版本中,這一功能是由jobtracker和tasktracker完成的。
3.1 JobTracker和TaskTracker
JobTracker
是一個全局服務組件,它在整個Hadoop集群中是唯一的,并且通常運行在一個專門配置的主節點上。他的主要職責包括:
- 作業調度:接收來自客戶端提交的作業,并將其分解為多個任務(
Map
任務和Reduce
任務)。 - 資源管理:監控整個集群的資源使用情況,并決定哪些
TaskTracker
節點可以執行新任務。 - 狀態監控:持續跟蹤所有正在運行的任務的狀態,并處理任務失敗或節點失效的情況,必要時重新調度任務
當用戶通過客戶端提交一個MapReduce作業時,首先會創建一個JobClient
實例。這個JobClient
負責與JobTracker
進行交互。它會將作業所需的所有文件(如輸入分片信息、客戶端配置文件、jar包(jar包就是客戶端編寫的計算任務)等)上傳到HDFS上,并向JobTracker發送請求來注冊該作業。
JobTracker
接收到新的作業請求后,會根據客戶端的配置參數將整個作業拆分為多個Map和Reducer任務,并向NameNode
請求數據塊所在位置,以及TaskTracker
的狀態信息,盡可能將任務分配給數據所在主機的TaskTracker。當數據所在的主機的TaskTracker過于繁忙時,也會根據機架感知,分配給一個機架的TaskTracker
。
TaskTracker
是運行在每個工作節點上的從屬服務。每個節點上只會有一個TaskTracker
實例,它負責以下任務:
- 任務執行:根據
JobTracker
的指令執行分配給它的具體任務。 - 狀態匯報:定期向
JobTracker
發送心跳信號,報告自身健康狀況及所執行任務的進展。
TaskTracker
通過心跳機制,和JobTracker
保持連接,并在發送心跳時發送自己的狀態信息。JobTracker
會根據這些狀態信息以及數據所在位置,合理的分配任務,并在返回心跳時,返回給目標TaskTracker
為其分配的任務。
TaskTracker
接收到來自JobTracker
分配下來的具體任務之后,會為每一個任務生成一個Task實例,并且啟動相應的Java虛擬機(JVM)去實際運行這個任務(因為任務的計算本質上是執行jar包的內容)。根據接收到的不同類型的動作命令(例如啟動任務、提交結果、終止任務等),TaskTracker會采取相應措施來滿足要求。
如果某個TaskTracker失去聯系超過一定時間,則會被標記為不可用,并且其上正在運行的任務可能需要重新分配給其他可用節點。對于任何失敗的任務,JobTracker會嘗試重新啟動它們直到達到最大重試次數為止。
當所有的Map和Reduce任務都順利完成之后,TaskTracker
會通知JobTracker
。隨后,JobTracker
將正式宣布該作業已完成,并清理相關的臨時資源。同時,如果存在輸出數據的話,也會告知客戶端可以從指定位置下載最終結果。
流程圖
這個資源調度架構有一些明顯缺陷:
- 首先所有調度任務都會集中在一個
JobTracker
上,這樣隨著集群的擴展和任務的增加,jobtrakcer的性能會成為集群擴展的瓶頸。 - 一旦jobtracker故障,所有計算任務都無法進行。
- Hadoop1.x中將
TaskTracker
中可用資源抽象為插槽,這些插槽的數量以及分類(map還是reducer)由啟動時的hadoop配置決定,這就導致了如果map任務和reducer任務的比例和插槽分類的比例不一致,就會導致資源浪費的問題。 - jobtracker不能滿足不同計算框架的任務調度需求。
為了解決這些問題,在hadoop2.x版本中重新引入了Yarn
。
3.2 Yarn
Yarn將原本的jobtracker負責的任務一分為二,將資源管理和任務調度,分別用兩套結構分別負責。
資源管理
資源管理由ResourceManager和NodeManager負責管理,每個主機上都有一個NodeManager,NodeManager會向集群中唯一的ResourceManager發送主機資源信息,并維持心跳。
當客戶端需要發起一個工作請求時,首先需要攜帶AppMaster、啟動AppMaster的命令、用戶程序等向ResourceManager發起請求,ResourceManager接受請求后會根據資源情況為其分配一個Contarin,并且尋找對應的NodeManager。ResourceManager會將任務分配到對應的NodeManager,NodeManager在接收到任務后會生成一個Container,負責容納AppMaster。
Container是用來代替hadoop1.x中的插槽概念的,和插槽不同的是Container不僅可以隨意容納map和reducer任務,還可以容納AppMaster。
AppMaster是負責任務調度的組件。在Yarn的架構中,為了兼容更多的計算框架的不同的任務調度需求。任務調度組件由計算框架自己實現,也就是說不同的計算框架會使用不同的調度組件,所以需要有客戶端發送。
資源調度
AppMaster在被啟動后,需要向ResourceManager進行注冊,并匯報任務運行狀態。和jobTracker不同的是,ResourceManager不在負責管理大量的map,reducer任務,而是由AppMaster進行管理。
AppMaster會通過心跳向ResourceManager申請任務資源,申請成功后會通知NodeManager,為任務創建Contrainer,并啟動任務。各個任務需要和AppMaster維持心跳并匯報工作進度。在程序運行時,客戶端可以隨時向AppMaster發起請求查看任務進度。
當應用程序運行完成后,ApplicationMaster通知ResourceManager釋放已分配的資源。
通過這些改變,Hadoop2.x解決了Hadoop1.x中的集群擴展問題,ResourceManager的負載能力不再是集群擴展的瓶頸。Container解決了插槽中對于資源利用的問題。多計算框架的兼容問題也隨著AppMaster的出現而被解決。于此同時Hadoop2.x還可以配置多個ResourceManager來解決集群的高可用問題。
集群高可用
ResourceManager的高可用性是通過Active/Standby架構模式實現的,這種設計確保了在任意時刻只有一個ResourceManager處于Active狀態,其余的則處于Standby狀態。Active狀態的ResourceManager會正常處理客戶端的請求,而Standby狀態則處于待機狀態,隨時等待Active狀態的ResourceManager宕機時接管其任務。
為了保證故障切換時的狀態一致性,Active狀態的ResourceManager會將其狀態信息寫入到一個共享的狀態存儲系統中。這個狀態存儲系統可以是基于ZooKeeper的state-store或基于FileSystem的state-store。
FileSystem是Hadoop的一個抽象類,它定義了文件系統的基本操作接口,如創建文件、刪除文件、打開文件、重命名文件等。通過
FileSystem
抽象類,Hadoop 可以輕松地支持多種文件系統。基于FileSystem的state-store就是利用HDFS自身的能力為ResourceManager提供狀態存儲系統能力,可以通過配置實現。
Yarn依賴于Zookeeper來實現自動故障轉移,當Active節點故障時,Standby會通過搶占Zookeeper節點的方式獲取Active狀態,并讀取共享的狀態存儲系統來恢復功能。
如果未啟用自動故障轉移,則管理員必須手動將其中一個ResourceManager轉換為Active。要從一個ResourceManager到另一個ResourceManager進行故障轉移,他們應該先將Active狀態的ResourceManager轉換為Standby,然后將Standby狀態的ResourceManager轉換為Active。所有這些都可以使用yarn rmadmin
命令完成。