HDFS 塊重構和RedundancyMonitor詳解

文章目錄

  • 1. 前言
  • 2 故障塊的重構(Reconstruct)
    • 2.1 故障塊的狀態定義和各個狀態的統計信息
    • 2.2 故障文件塊的查找收集
      • 2.5.2.1 misReplica的檢測
      • 2.5.2.2 延遲隊列(postponedMisreplicatedBlocks)的構造和實現
        • postponedMisreplicatedBlocks中Block的添加
        • postponedMisreplicatedBlocks中Block的移除
      • 2.5.2.3 待定隊列(PendingReconstructionBlock)的構造和實現
        • 對pendingReconstruction的添加和刪除操作
        • 對pendingReconstruction的超時處理
      • 2.5.2.4 重構隊列(neededReconstruction)的構造和實現
    • 2.5.2.5 RedundancyMonitor的運行
  • 2. RedundancyMonitor的工作原理
    • 2.1 從優先級隊列中選擇待重構塊
    • 2.2 構建待重構塊的統計信息和元數據信息,為選擇source節點做準備
    • 2.3 選擇target節點
    • 2.4 將任務調度出去
    • 2.5 DataNode對重構任務的處理
      • 2.5.1 StripedReader的構造
      • 2.5.2 StripedReader的初始化
      • 2.5.3 StripedWriter的構造
      • 2.5.4 StripedWriter的初始化
      • 2.5.5 開始重構
        • 讀取
        • Decode
        • 傳輸到目標target
        • Target節點接收數據
  • 3. 結束

1. 前言

本節將會講解故障塊的重構(Reconstruction)的基本流程,
故障塊的恢復(Recovery)指的是寫數據過程中,由于某些非正常原因,造成最終的塊文件沒有達到最后的COMPLETE狀態,比如,寫過程客戶端的突然離開,寫過程中DataNode的突然當機。
而故障塊的重構(Reconstruction)發生在寫完成以后,由于系統情況的改變,比如,DataNode宕機,DataNode匯報上來的存儲故障,用戶手動修改了副本數導致副本數量過多,用戶手動進行decommission或者maintenance導致副本的數量變化,都需要進行相關副本的重構。
本文主要講解整個重構過程,包括:

  • NameNode端待重構塊的生成過程(包含了各種待重構的情況),
  • NameeNode端基于生成的待重構塊進行重構工作的調度,
  • DataNode端對于重構任務的處理。由于基于副本復制的冗余策略基本上就是數據的拷貝,比較簡單,因此,本文偏向于講解基于糾刪碼的冗余策略的DataNode端的處理過程。

2 故障塊的重構(Reconstruct)

故障塊的重構都是基于已經處于BlockUCState.COMPLETE狀態的,通過COMPLETE的定義我們知道,意味著重構(Reconstruct)針對的是已經寫完成(匯報上來的塊副本達到了要求比如最小允許副本數),但是后來塊的狀態發生問題的那些塊。

2.1 故障塊的狀態定義和各個狀態的統計信息

本章節講解塊重構過程中的塊狀態,這些狀態是StoredReplicaState的狀態。StoredReplicaState是塊已經完成寫操作以后的狀態階段中的不同狀態,而不是塊在寫入過程中的狀態BlockUCState。
需要區別這個Replica的狀態StoredReplicaState和節點的狀態(AdminState),這里不再詳述。
BlockManager通過方法checkReplicaOnStorage()來統一收集并計算副本狀態,并返回一個收集了副本狀態信息的對象NumberReplicas。 從NumberReplicas的名字聽起來似乎這個類是統計副本的總數量,但是其實這個類是統計在各個狀態下的副本數量的分類統計信息。那么,副本會有哪些狀態呢?
所有的副本狀態被StoredReplicaState表述。顧名思義,StoredReplicaState所表述的副本狀態是已經存儲下來的副本的整個狀態,這些層面的狀態一方面是為了給用戶的諸如fsck的命令返回副本的統計信息,更重要的,這些狀態的統計信息,將用來決定下一步對副本是否需要進行重構的策略,比如副本數是否太低,太低的副本需要進行重構,副本數是否太高,副本數太高的副本需要進行部分的刪除。

public enum StoredReplicaState {// live replicas. for a striped block, this value excludes redundant// replicas for the same internal blockLIVE,READONLY,// decommissioning replicas. for a striped block ,this value excludes// redundant and live replicas for the same internal block.DECOMMISSIONING,DECOMMISSIONED,// We need live ENTERING_MAINTENANCE nodes to continue// to serve read request while it is being transitioned to live// IN_MAINTENANCE if these are the only replicas left.// MAINTENANCE_NOT_FOR_READ == maintenanceReplicas -// Live ENTERING_MAINTENANCE.// 當 一個節點處于ENTERING_MAINTENANCE中(還沒到達最終的IN_MAINTENANCE), 這個節點上的internal block如果沒有其它副本,// 那么這個node還是會接著serve 這個replica 的讀請求。顯然,當節點進入到IN_MAINTENANCE中的時候,讀請求就不會過來了,// 因此進入MAINTENANCE_NOT_FOR_READMAINTENANCE_NOT_FOR_READ,// Live ENTERING_MAINTENANCE nodes to serve read requests.MAINTENANCE_FOR_READ,CORRUPT,// excess replicas already tracked by blockmanager's excess mapEXCESS, // 副本數量超過了要求,這些超過要求的replica 最終會被刪除STALESTORAGE,// for striped blocks only. number of redundant internal block replicas// that have not been tracked by blockmanager yet (i.e., not in excess)REDUNDANT}

副本的狀態StoredReplicaState是對連續布局和條帶布局統一而言的,并不是專指某種布局方式。但是,某些特定狀態在兩種布局模式下的含義稍有不同。副本每一個狀態的具體含義,我們可以從BlockManager構造NumberReplicas對象的方法checkReplicaOnStorage()清楚地看到:

private StoredReplicaState checkReplicaOnStorage(NumberReplicas counters,BlockInfo b, DatanodeStorageInfo storage,Collection<DatanodeDescriptor> nodesCorrupt, boolean inStartupSafeMode) {final StoredReplicaState s;if (storage.getState() == State.NORMAL) {final DatanodeDescriptor node = storage.getDatanodeDescriptor();if (nodesCorrupt != null && nodesCorrupt.contains(node)) {s = StoredReplicaState.CORRUPT;} else if (inStartupSafeMode) {s = StoredReplicaState.LIVE;counters.add(s, 1);return s;} else if (node.isDecommissionInProgress()) {s = StoredReplicaState.DECOMMISSIONING;} else if (node.isDecommissioned()) {s = StoredReplicaState.DECOMMISSIONED;} else if (node.isMaintenance()) {if (node.isInMaintenance() || !node.isAlive()) {s = StoredReplicaState.MAINTENANCE_NOT_FOR_READ;} else {s = StoredReplicaState.MAINTENANCE_FOR_READ;}} else if (isExcess(node, b)) {s = StoredReplicaState.EXCESS;} else {s = StoredReplicaState.LIVE;}counters.add(s, 1);// //如果這個Storage是stale storage,那么,認為這個replica是stale狀態,直到收到對應DN的heartbeatif (storage.areBlockContentsStale()) {counters.add(StoredReplicaState.STALESTORAGE, 1);}} else if (!inStartupSafeMode &&storage.getState() == State.READ_ONLY_SHARED) {s = StoredReplicaState.READONLY;counters.add(s, 1);

副本的大部分狀態是由這個副本所在的存儲狀態決定的:

  • LIVE: 就是我們最常見的正常的存活狀態。
    • 對于條帶布局模式,雖然預期是每一個internal block只有一個副本,但是有可能存在某個internal block同時存在LIVE狀態和其它狀態,在這種情況下,LIVE狀態是排他的,即,這個Internal Block只要有一個副本是LIVE,那么我們就認為這個Internal Block的狀態是LIVE。關于節點狀態的一些去重操作,參考BlockManager.countLiveAndDecommissioningReplicas()
    • 從代碼可以看到,對于與DECOMMISSION或者MAINTENANCE相關的狀態,即使此時機器還在線,副本可讀,副本狀態并不會是LIVE,而是對應的DECOMMISSION或者MAINTENANCE的相關狀態。
  • READONLY: 如果replica所在的Storage的類型是StorageType.State.READ_ONLY_SHARED, 那么這個副本的狀態就是READONLY。這個READ_ONLY_SHARED是一個特殊的HDFS特性,我看了一下對應的HDFS issue HDFS-5318(issue里面有對應的design doc, 感興趣的讀者可以看看),其大致意思就是將我們傳統的通過物理磁盤存儲block的方式轉移到通過共享存儲(NAS, S3等)存儲塊信息,由于不再存放在某臺DataNode上,因此客戶端可以通過任意DataNode讀取到這個Block。這個READONLY狀態和本文的關系不大,不做詳細解釋。
  • DECOMMISSIONING: 一個機器decommision的過程就意味著上面的replica需要全部轉移(copy)到其它機器上,在全部轉移完成以前,這個機器上的block都是DECOMMISSIONING的狀態。顯然,對于一個條帶布局的 internal block,如果這個塊已經成功轉移到其它的Live的機器上,那么這同一個internal block就會在NameNode端同時存在LIVE和DECOMMISSIONING的狀態,這時候,NameNode的判定狀態是Live狀態。關于節點狀態的一些去重操作,參考BlockManager.countLiveAndDecommissioningReplicas()
  • DECOMMISSIONED: 副本所在的機器已經decommission結束。
  • MAINTENANCE_FOR_READ: 關于機器的maintenance狀態,感興趣的讀者可以自行學習,它發生在我們需要暫時將某個節點進行下線或者升級同時又不希望這個節點的短暫下線引起集群大量的副本拷貝的場景。與maintenance相關的狀態與讀寫的關系是:
    • 凡是與Maintenance相關的狀態(ENTERING_MAINTENANCE或者IN_MAINTENANCE狀態),機器都不可能再服務寫請求
    • 處于ENTERING_MAINTENANCE狀態并且依然存活的機器,是可以服務讀請求的;
    • 一個機器可能在ENTERING_MAINTENANCE的過程中死亡,這時候顯然上面的所有副本也是不可讀的;
    • 一個機器一旦真正進入了IN_MAINTENANCE狀態,無論是否存活,都不會再server任何請求,包括讀請求。因為我們將一個機器進入MAINTENANCE的目的大部分都是希望機器短暫停機維修等。
    • 所以,只有當一個機器處于ENTERING_MAINTENANCE并且存活,它上面的副本狀態才會是MAINTENANCE_FOR_READ
  • MAINTENANCE_NOT_FOR_READ:從上面的分析可以看到,如果一個機器已經進入到IN_MAINTENANCE狀態,或者這個機器在ENTERING_MAINTENANCE的狀態中死亡,這時候這個機器將拒絕任何請求,包括讀請求。
  • CORRUPT: 這個replica所在的機器存儲已經CORRUPT。請注意區分Block corrupt和Replica corrupt的區別,當且僅當一個Block的所有的replica都已經corrupt了,那么這個Block會被認為是corrupt。NameNode端corrupt的replica都被一個叫做CorruptReplicasMap的對象管理,存放了所有的corrupted的replica以及對應的Corrupt的原因:
    public class CorruptReplicasMap{/** The corruption reason code */public enum Reason {NONE,                // not specified.ANY,                 // wildcard reasonGENSTAMP_MISMATCH,   // mismatch in generation stampsSIZE_MISMATCH,       // mismatch in sizesINVALID_STATE,       // invalid stateCORRUPTION_REPORTED  // client or datanode reported the corruption}// 存放了所有corrupt的replicaprivate final Map<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =new HashMap<Block, Map<DatanodeDescriptor, Reason>>();
    
    在BlockManager中維護了CorruptReplicasMap的引用,所有corrupt replica都是通過BlockManager.markReplicaAsCorrupt()來添加到corruptReplicas中的,主要有以下情況:
    1. 來自DataNode的增量塊匯報(DatanodeProtocol.blockReceivedAndDelete接口)或者全量塊匯報(DatanodeProtocol.blockReport)接口。在收到這些塊匯報以后,NameNode會對所有匯報上來的塊進行時間戳和size的檢測,如果發現匯報上來的塊和自己在blockMap中存儲的塊的時間戳或者size不一致,則認定該塊是corrupt塊
    2. 來自DataNode直接匯報的badBlock(通過DatanodeProtocol.reportBadBlocks()),NameNode收到這些會報上來的badBlock會直接通過調用markBlockAsCorrupt()將其標記為corrupt block。DataNode在什么情況下會直接上報badBlocks呢?這主要發生在比如DataNode被分配了reconstruct的任務時,會從遠程讀取一些replica進行重構,這時候如果讀取發生問題,就會認為是corrupted replica,然后通過DatanodeProtocol.reportBadBlocks()接口上報NameNode。
    3. 客戶端在讀取塊的過程中發現塊的校驗失敗,會通過(ClientProtocol.reportBadBlocks()接口)告知NameNode
    4. DataNode在完成了某個Recovery以后,通過DataNodeProtocol.commitBlockSynchronization()接口告知NameNode,NameNode會在適當情況下將replica標記為corrupt
  • STALESTORAGE:這種狀態其實是一種Corner Case。當NameNode發生重啟或者Failover(從standby到達active狀態)發生,NameNode會將所有的DataNode 的所有的Storage標記為Stale(陳舊)狀態,這時候這些storage上的replica也全部為stale狀態,直到收到了對應的DataNode發送過來的關于這個Storage的心跳信息,才會解除Stale狀態。Stale狀態是為了處理在NameNode發生狀態轉移的時候DataNode和新的NameNode發生的一些不一致狀態。在Stale狀態的副本的大多數處理都會延遲,因為這是一個中間狀態,比如我們發現一個replica的副本數過多,但是其中有一個副本是STALESTORAGE狀態,那么這時候我們不可以貿然去刪除多余副本,因為這時候有可能對應的 DataNode已經將副本刪除,等匯報上來的時候,NameNode也將副本在另外機器上刪除,造成副本丟失。
  • REDUNDANT: 只適用于條帶布局,某一個internal block的副本數量多余一個。
  • EXCESS: 含義其實于REDUNDANT,只不過EXCESS指的是NameNode已經發現了該replica有多余副本(比如通過fsck,或者通過RedundancyMonitor的定時掃描線程),同時將這個replica的信息放到excessRedundancyMap中去,所有放到excessRedundancyMap中的internal block都會采用響應策略刪除一個replica的多余副本。
    從上面的狀態分析可以看到,副本的狀態和一些存儲的狀態有一部分是用戶操作的,比如Decommission和Maintenance,有些是機器自己匯報的,比如CORRUPT, 有些是集群本身的狀態發展而成的,比如LIVE, STALESTORAGE, REDUNDANT, EXCESS,

2.2 故障文件塊的查找收集

在這里插入圖片描述

2.5.2.1 misReplica的檢測

一個Block的replica的數量處于不正常的狀態,比如,有多余的副本,或者低于預期的副本等,都屬于副本不正常的情況。注意,副本不正常的情況都是指已經處于COMPLETE狀態的副本,即不可變的塊。可變的、正在寫的塊不在重構的處理范圍內,而是塊恢復(Recover)的職責。
NameNode會通過各種方式不斷檢測每個副本,確定這個副本是否屬于錯誤副本,如果屬于錯誤副本,則進行進一步處理

  enum MisReplicationResult {/** The block should be invalidated since it belongs to a deleted file. */INVALID, // 這個塊屬于一個被刪除的文件,副本可以刪除了/** The block is currently under-replicated. */UNDER_REPLICATED, // 這個塊的副本數不足/** The block is currently over-replicated. */OVER_REPLICATED,/** A decision can't currently be made about this block. */POSTPONE,/** The block is under construction, so should be ignored. */UNDER_CONSTRUCTION,/** The block is properly replicated. */OK}

BlockManager.run()通過調用processMisReplicatedBlock() 來掃描當前replica的位置有問題(放置不正確,或者缺少replica)的所有block, 如果這個Replica的確需要重構,那么就放入到neededConstruction中。 processMisReplicatedBlock() 的掃描操作會在以下情況下被觸發:

  • 用戶在運行fsck命令的過程中,如果添加了-replicate參數,那么fsck不僅僅會檢查會返回并檢查塊的狀態,并且會將檢查到的副本數量不足的塊(misReplicatedBlocks)進行進一步處理。
  • NameNode啟動時,Active NameNode會通過BlockManager啟動一個異步獨立的Daemon線程,這個線程會周期性掃描當前BlockManager管理的所有的Block,每個block都會檢查其replication狀態。
  • RedundancyMonitor每一輪運行的時候,都會掃描postponedMisreplicatedBlocks并嘗試對著里面的超時的block進行處理,如果依然處理失敗

2.5.2.2 延遲隊列(postponedMisreplicatedBlocks)的構造和實現

BlockManager使用postponedMisreplicatedBlocks保存了一些需要延遲處理的、放置(MisReplicationResult中表述的比如UNDER_REPLICATED,OVER_REPLICATED)狀態不正常的Block。

  /*** After a failover, over-replicated blocks may not be handled* until all of the replicas have done a block report to the* new active. This is to make sure that this NameNode has been* notified of all block deletions that might have been pending* when the failover happened.*/private final Set<Block> postponedMisreplicatedBlocks =new LinkedHashSet<Block>();

延遲處理是因為,系統剛剛才啟動,或者,Failover剛發生(當前的這個Active NameNode是剛剛才從Standby到Active狀態的),因此,還沒有收到一部分DataNode的第一次block report,即目前NameNode關于Block中副本的狀態是過時(Stale)的,因此需要延遲對這個misReplicatedBlock進行進一步處理。

本質上,這是為了解決新的NameNode所看到的集群狀態其實并非集群最新狀態的特殊情況。這種特殊情況在HDFS特指需要刪除副本數過多的Block的情況:
新的NameNode發現這個Block的副本超過了要求的副本數量(over-replicated),因此需要將某一個或者多個replica 加入到invalidateBlocks中,隨后被刪掉。但是,NameNode也發現這個Block有一個副本是在Stale DataNode上,這時候問題出現了:由于NameNode當前掌握的某個replica-1的DataNode(DN-1)的狀態是過期(Stale)的,很有可能之前的Active NameNode已經發現了這個over-replicated的Block并且已經讓這個Stale DataNode刪掉了副本,但是當前的新的Active NameNode還不知道。如果這時候Active NameNode直接指示其它DataNode(DN-2)刪掉副本,當之前的DN-1匯報說我已經把副本replica-1已經刪除,這時候就造成Block的副本數不足。因此最好等待這個Block中位于Stale DataNode的heartbeat上來再對這個Invalidate操作進行處理。

在NameNode剛剛切換到 Active狀態的時候,會將所有的DataNodeStorageInfo的狀態置為Stale,直到第一次收到這個DataNode對應的heartbeat:

// NameNode剛啟動,會將所有DataNode標記為Stalevoid markStaleAfterFailover() {heartbeatedSinceFailover = false;blockContentsStale = true;}// 收到heartbeatvoid receivedHeartbeat(StorageReport report) {updateState(report);heartbeatedSinceFailover = true;}//收到heartvoid receivedBlockReport() {if (heartbeatedSinceFailover) { // Failover以后已經收到了heartbeatblockContentsStale = false; // 將DataNodeStorageInfo置為非Stale狀態}blockReportCount++;}
postponedMisreplicatedBlocks中Block的添加

上面講過,postponedMisreplicatedBlocks中的所有Block都是那種1) replica數量超過預期值并且 2) 有Replica在stale DataNode上。
大概有下面的這些情況,NameNode會往postponedMisreplicatedBlocks中添加Block:

  1. BlockManager.run()是一個不斷執行并掃描所有Block的Daemon線程,對于每一個Block,一旦發現這個Block存在上面的情況,就放到postponedMisreplicatedBlocks中
  2. 每當收到DataNode的Block report以后,都會對Block的狀態進行一系列的檢查,同樣的,如果發現這個Block滿足上面的條件,就放到postponedMisreplicatedBlocks中
  3. 當用戶進行了手動降副本操作,比如,手動通過setReplica命令將副本從3降低到2,這時候這個Block的副本數量可能會超過預期并且有副本在Stale DataNode上,就會將這個replica放到postponedMisreplicatedBlocks中
  4. 在嘗試處理一個Corrupted Block的時候(markBlockAsCorrupt()方法),也可能將這個Block的某些replica進行invalidate操作(invalidateBlock()操作),如果發現這個replica存在上述情況,也會放到postponedMisreplicatedBlocks中
postponedMisreplicatedBlocks中Block的移除

RedundancyMonitor這個Daemon線程負責重新掃描postponedMisreplicatedBlocks中的每一個Block,用來對這里的Block的狀態進行重新的確認。重新掃描的邏輯發生在方法rescanPostponedMisreplicatedBlocks()中:
顯然,postponedMisreplicatedBlocks中的Block會發生以下的各種情況:

  1. 最簡單的,這個Block依然是over-replicated的:
    • 但是有副本存放在Stale DataNode上,這時候什么也不用做,對這個Block的處理繼續延遲
    • 沒有副本在Stale DataNode上,那么直接通過processExtraRedundancyBlock()進行over-replicated塊的處理(比如刪除多余replica等)
  2. 如果塊的狀態是已經刪除,比如,對應文件已經刪掉了,則加入到InvalidateBlocks中,這個塊將被刪除
  3. 這個塊并不是COMPLETE狀態,不做任何處理,不是塊重構的處理范圍。
  4. 檢查發現這個Block需要進行Reconstruction(下文講pendingReconstruction的時候會將isNeededReconstruction(),判斷一個Block是否需要進行重構),那么就將其從postponedMisreplicatedBlocks中移除,轉而加入到neededReconstruction
  5. 如果都不是,那么這個Block的狀態完全正常,不需要處理。

2.5.2.3 待定隊列(PendingReconstructionBlock)的構造和實現

BlockManager中維護了一個PendingReconstructionBlocks pendingReconstruction對象,用來監控那些尚且需要獲取更多已經存儲的副本的Block:

class PendingReconstructionBlocks {private final Map<BlockInfo, PendingBlockInfo> pendingReconstructions;
  static class PendingBlockInfo {private long timeStamp;private final List<DatanodeStorageInfo> targets; // 這個block需要復制到的位置
對pendingReconstruction的添加和刪除操作

從PendingReconstructionBlocks的定義可以看到,pendingReconstructions是一個以block作為key、以PendingBlockInfo作為value的map,代表這個block對應的PendingBlockInfo里面的target尚未向NameNode進行匯報。我們都知道客戶端在寫數據并且addBlock()的時候,NameNode會通過PlacementPolicy為這個Block尋找合適的target location并且返回給客戶端,在這個block commit了以后, NameNode就會把這個block和對應的 target添加到PendingReconstructionBlocks中,意思是:這個block會寫入到這些targets,但是我目前還沒有收到這些targets對這個block的匯報,所以就把這個Block和它對應的targets 記錄下來,當收到了DataNode的匯報以后,就把這個DataNode從這個block的記錄里刪除,如果這個blocks的所有targets都匯報了這個block,就可以把這個block從pendingReconstructions中徹底刪除。
PendingReconstructionBlocks.increment()方法是向PendingReconstructionBlocks對象中添加targets記錄的方法,我們跟蹤這個方法的調用者,可以看到什么時候NameNode會往pendingReconstruction中添加block -> targets 記錄:

  1. 客戶端在addBlock()的時候:往往在創建新的block以前,需要確認當前文件的最后一個block是commit的狀態,因此,如果這個block commit成功了,那么會將這當前的最后一個block -> targets 添加到pendingReconstructions中
  2. 客戶端在close()文件的時候: 客戶端關閉文件的時候,當所有的Block已經寫到DataNode,客戶端會向NameNode發起close文件的請求,NameNode收到請求以后,會通過addCommittedBlocksToPending()方法,將Block -> targets對應關系添加到pendingReconstructions中
  3. DataNode進行塊匯報的時候:NameNode收到來自DataNode的block report,會通過addCommittedBlocksToPending()方法將這個block當前還期待的但是沒有收到report的block -> targets的關系放到pendingReconstruction中

PendingReconstructionBlocks.increment()相反,PendingReconstructionBlocks.decrement()會將一個block -> datanode的對應關系從pendingReconstruction中刪除,這發生在DataNode在通過DatanodeProtocol.blockReceivedAndDeleted 接口進行增量塊匯報的時候,會將這個block -> datanode的對應關系從pendingReconstruction中刪除。同時,如果這個block對應的所有targets都已經完成了匯報,就把block從這個 pendingReconstruction中刪除。

對pendingReconstruction的超時處理

上面講過,PendingReconstructionBlocks就像Block的守護者一樣,NameNode通過它可以知道目前還有哪些block在等待Datanode向上匯報的信息。正常情況下,一個block存放到在PendingReconstructionBlocks,當所有的targets完成塊匯報,很快就會從pendingReconstruction中刪除。但是,分布式系統中,任何異常狀態都有可能發生,如果pendingReconstruction 中的某個Block所預期的DataNode始終沒有全部匯報上來,應該怎么樣呢?
原來,為了識別這種很長時間依然沒有刪除的block, PendingReconstructionBlocks通過一個單獨的線程PendingReconstructionMonitor去監控所有添加進來的block并為每個添加進來的block記時,超過指定時間,就會被標記為超時是block。在RedundancyMonitor線程的每一輪運行中,都會通過方法processPendingReconstructions()嘗試去處理pendingReconstruction中超時的block:

  private class RedundancyMonitor implements Runnable {@Overridepublic void run() {while (namesystem.isRunning()) { // 這是一個無限循環,只要是active namenode,就不斷運行try {// Process recovery work only when active NN is out of safe mode.if (isPopulatingReplQueues()) {computeDatanodeWork(); // 選擇并且將需要re-construct的節點發送給對應的Node 去執行processPendingReconstructions(); // 看看pendingReconstruction 中有哪些block需要加入到neededConstruction 中去rescanPostponedMisreplicatedBlocks();/*** If there were any reconstruction requests that timed out, reap them* and put them back into the neededReconstruction queue*/void processPendingReconstructions() {// pendingReconstruction主要來自與文件最后close的時候的最后一個block,在這里,如果pendingReconstruction// 中的block在指定時間內依然沒有完成construction,那么就需要放到neededConstruction中去BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks();// .......for (int i = 0; i < timedOutItems.length; i++) {BlockInfo bi = blocksMap.getStoredBlock(timedOutItems[i]);NumberReplicas num = countNodes(timedOutItems[i]);if (isNeededReconstruction(bi, num)) { //這個pendingReconstruction 中的block的確需要re-constructionneededReconstruction.add(bi, num.liveReplicas(),num.readOnlyReplicas(), num.outOfServiceReplicas(),getExpectedRedundancyNum(bi));}}} finally {namesystem.writeUnlock();

如上面代碼所示,RedundancyMonitor線程會從pendingReconstruction取出超時的blocks,如果發現這些block需要進行重構,則將這需要重構的block加入到neededReconstruction中。下一輪循環過來,就會通過computeDatanodeWork()對needConstruction中需要重構的block進行處理。而一個block是否需要重構的判斷入下代碼所示:

  boolean isNeededReconstruction(BlockInfo storedBlock,NumberReplicas numberReplicas, int pending) {return storedBlock.isComplete() &&!hasEnoughEffectiveReplicas(storedBlock, numberReplicas, pending);}// 如果 live + pending replicas 的數量不小于所需要的replica數量,并且(還有pending的replica,或者沒有pending的并且已經滿足放置策略)// 這里的含義是,如果有pending的,那么我們先不用考慮是否滿足placement policyboolean hasEnoughEffectiveReplicas(BlockInfo block,NumberReplicas numReplicas, int pendingReplicaNum) {int required = getExpectedLiveRedundancyNum(block, numReplicas); // 先看看需要多少個live的副本int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;return (numEffectiveReplicas >= required) &&(pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));}// 可容忍的最少的live replica的數量public short getExpectedLiveRedundancyNum(BlockInfo block,NumberReplicas numberReplicas) {// 對于striped block,expectedRedundancy 數量指的是data block + parity block的數量,// 對于replication,expectedRedundancy指的是replication factorfinal short expectedRedundancy = getExpectedRedundancyNum(block);// 假如當前我的block配置的副本是5, 有2個副本是處于maintenance,那么我期待的live 副本數量是5-2=3return (short)Math.max(expectedRedundancy -numberReplicas.maintenanceReplicas(), // 處于maintenance中的replica也算是live,這本身就是maintenance的目的,讓replica短時間可以容忍丟失,但是處于decommission的不能算live// 最小的maintenance 副本數量。對于striped block,最小的maintenance副本數量就是data block 的數量,說明對于// stripe block, 不需與data block丟失getMinMaintenanceStorageNum(block));}public short getExpectedRedundancyNum(BlockInfo block) {return block.isStriped() ?((BlockInfoStriped) block).getRealTotalBlockNum() : // 比如RS(6,2), realTotalBlockNum 就是 6 + 2 = 8block.getReplication(); // 連續布局情況下,就是配置的塊副本數}// 能夠允許進入maintenance狀態private short getMinMaintenanceStorageNum(BlockInfo block) {if (block.isStriped()) {return ((BlockInfoStriped) block).getRealDataBlockNum(); // stripe block實際占用數據的塊的數量} else {return (short) Math.min(minReplicationToBeInMaintenance,block.getReplication()); // 或者是replication factor, 或者是最小允許的進入MAINTENANCE狀態live replica的數量}}

可以看到,對于一個Block是否需要進行reconstruction,是基于已經統計好的Replica的各種狀態信息的統計(統一存放在NumberReplicas對象中),就是:

  • 這個節點處于COMPLETE狀態并且有足夠多的有效Replica(參考isNeededReconstruction())
  • 什么是待定副本數(PendingReplica): 就是我們講的PendingReconstructionBlocks對象維護的replica -> targets信息,即一個block寫完(COMMITTED或者COMPLETED)但是NameNode還沒收到足夠的DataNode匯報,那么預期還需要收到多少個DataNode的匯報的數量。比如3副本的Block,NameNode目前只收到1個DataNode的匯報,那么這個Block的pendingReplicaNum是2。
  • 什么叫有效副本數量(EffectiveReplicas):從方法hasEnoughEffectiveReplicas()的代碼 int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;,即存活狀態的副本數量再加上待定狀態的副本數量。存活狀態的副本是有效副本很容易理解,但是為什么待定的副本也算作有效副本呢?因為待定的副本是正常狀態下還缺失的待匯報的數量,這種缺失是很正常的,正常情況下只需要再等等就可以,因此,待定的副本也算作有效。
  • 什么叫允許進入Maintenance狀態的最小存活副本數:即要想將某一個replica所載的DataNode進入maintenance狀態,那么這個DataNode上的每一個Block需要多少個存活的副本?這是為了避免某個節點進入maintenance狀態造成了某些數據不可讀的狀態。從方法getMinMaintenanceStorageNum()可以看到,對于糾刪碼,如果進入ENTERING_MAINTENANCE狀態以后,依然最少有dataBlockNum個副本存活,那么這個replica就可以進入Maintenance狀態。對于3副本,這個默認值是1,即ENTERING_MAINTENANCE的節點中,如果所有的Block都至少有一個存活的副本,那么這個Datanode就可以正式進入MAINTENANCE狀態。這就是為什么StoredReplicaState在進入正式的MAINTENANCE狀態以前,有一個ENTERING_MAINTENANCE狀態,在ENTERING_MAINTENANCE狀態時,DataNode會有對應的Monitor確認所有的Block的live replica數量都大于getMinMaintenanceStorageNum(),才會從ENTERING_MAINTENANCE進入到IN_MAINTENANCE狀態,避免進入到IN_MAINTENANCE狀態(進入IN_MAINTENANCE狀態以后的節點很可能出現短時不可服務,比如短暫的關機維修升級等)以后導致數據副本缺失。
  • 當前期望的存活副本數量(Expected Live Redundancy): 從getExpectedLiveRedundancyNum()方法的return (short)Math.max(expectedRedundancy - numberReplicas.maintenanceReplicas(),getMinMaintenanceStorageNum(block));代碼可以看到,不考慮特殊情況,對于連續布局,期望的副本數量就是配置的副本數,對于糾刪碼,期待的副本數量就是dataBlocksNum + parityBlocksNum, 比如RS(6,2)中等于8。但是,由于有些Block的部分副本有可能處于maintenance狀態,這些副本雖然暫時不可用,但是并沒有丟失,因此,期待的副本數量應該減去這些處于Maintenance狀態的副本。比如RS(6,2)中,有3個副本都處于maintenance狀態,那么我期待的存活副本數量是8-3=5。同時期望的存活副本數量應該不小于允許進入maintenance的最小存活副本數量。
  • 什么叫足夠多的有效Replica: 即當前有效的副本數量不小于期望的副本數量,并且當前還有pending的副本,或者雖然沒有pending的副本,但是整個Block的replica分布處于滿足PlacementPolicy要求的狀態。
    • 這意味著如果有效的副本數量不小于期望的副本數量,并且還有pending的副本,這時候即使整個Block的副本不滿足 PlacementPolicy要求的狀態,也認為有足夠多的有效Replica,這樣判定是因為存在pending的副本,所以認為極有可能當pending的副本的DataNode匯報上來以后,PlacementPolicy就會被滿足。

2.5.2.4 重構隊列(neededReconstruction)的構造和實現

BlockManager中有一個LowRedundancyBlocks neededReconstruction變量,記錄了當前需要進行reconstruction的所有block。
由于不同情況的恢復優先級不同(比如,3副本情況下丟失1個副本和丟失2個副本的緊急程度顯然不同,RS(6,2)的情況下丟失1個replica和丟失2個replica的緊急程度顯然不同),LowRedundancyBlocks對象根據緊急程度定義了優先級,每個優先級都有一個對應的block隊列,代表了這個block需要進行該優先級的re-construct。

class LowRedundancyBlocks implements Iterable<BlockInfo> {static final int QUEUE_HIGHEST_PRIORITY = 0;static final int QUEUE_VERY_LOW_REDUNDANCY = 1;static final int QUEUE_LOW_REDUNDANCY = 2;static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3;static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;

顯然,對于連續布局和條帶布局,確定優先級的基本思想一致,但是計算方式并不相同。我們通過getPriorityContiguous()方法和getPriorityStriped()方法可以清晰地看到對于兩種布局方式確定其優先級的計算邏輯。
先拋開布局方式的差異,一般來講,各種優先級的含義是:

  • QUEUE_HIGHEST_PRIORITY的最高優先級指的是這個block再丟失一個replica就會corrupt(無法再通過re-construct恢復),因此情況非常緊急。
  • QUEUE_VERY_LOW_REDUNDANCY指的是這個Block再丟一個replica依然還有恢復的可能,但是丟兩個就無法恢復了。
  • QUEUE_LOW_REDUNDANCY則是指這個Block還能容忍兩個或者兩個以上的Replica的丟失而依然可以通過reconstruct來恢復數據。
  • QUEUE_WITH_CORRUPT_BLOCKS則代表這個Block已經corrupt,不可能通過reconstruct恢復了,因此不用對其構建reconstruction任務。
    /*** @param curReplicas 當前live的replica* @param readOnlyReplicas 處在READONLY狀態的replica,* @param outOfServiceReplicas 指的是狀態處在MAINTENANCE_NOT_FOR_READ ||  MAINTENANCE_FOR_READ || DECOMMISSIONED || DECOMMISSIONING的replica* @param expectedReplicas 預期的replica數量,比如我們配置的文件副本數量為3*/private int getPriorityContiguous(int curReplicas, int readOnlyReplicas,int outOfServiceReplicas, int expectedReplicas) {if (curReplicas == 0) {// If there are zero non-decommissioned replicas but there are// some out of service replicas, then assign them highest priorityif (outOfServiceReplicas > 0) {return QUEUE_HIGHEST_PRIORITY;}if (readOnlyReplicas > 0) {// only has read-only replicas, highest risk// since the read-only replicas may go down all together.return QUEUE_HIGHEST_PRIORITY;}//all we have are corrupt blocksreturn QUEUE_WITH_CORRUPT_BLOCKS;} else if (curReplicas == 1) {// only one replica, highest risk of loss// highest priorityreturn QUEUE_HIGHEST_PRIORITY;} else if ((curReplicas * 3) < expectedReplicas) {//can only afford one replica loss 還能再承受一個replica loss,即如果還有兩個replica loss,數據就丟失了,block就corrupt了//this is considered very insufficiently redundant blocks.return QUEUE_VERY_LOW_REDUNDANCY;} else {//add to the normal queue for insufficiently redundant blocksreturn QUEUE_LOW_REDUNDANCY;}}/*** @param curReplicas 當前live的replica的數量* @param outOfServiceReplicas 指的是狀態處在MAINTENANCE_NOT_FOR_READ ||  MAINTENANCE_FOR_READ || DECOMMISSIONED || DECOMMISSIONING的replica* @param dataBlkNum 配置的預期的數據塊的數量,比如RS(6,2)中,dataBlkNum=6* @param parityBlkNum 配置的預期的校驗塊的數量,比如RS(6,2)中,parityBlkNum=2*/private int getPriorityStriped(int curReplicas, int outOfServiceReplicas,short dataBlkNum, short parityBlkNum) {if (curReplicas < dataBlkNum) {// There are some replicas on decommissioned nodes so it's not corruptedif (curReplicas + outOfServiceReplicas >= dataBlkNum) { //有一部分replica在decommissiong 節點上,因此它還沒有corrupt,但是必須立刻恢復了return QUEUE_HIGHEST_PRIORITY;}return QUEUE_WITH_CORRUPT_BLOCKS; // 已經corrupt了,無法恢復了} else if (curReplicas == dataBlkNum) { // 再丟失一個replica數據就無法恢復了// highest risk of loss, highest priorityreturn QUEUE_HIGHEST_PRIORITY;} else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) {// can only afford one replica loss// this is considered very insufficiently redundant blocks.return QUEUE_VERY_LOW_REDUNDANCY; // 再丟失一個replica就corrupt了} else {// add to the normal queue for insufficiently redundant blocks.return QUEUE_LOW_REDUNDANCY;// 一般優先級}}

條帶布局的優先級計算方法如下圖所示:
在這里插入圖片描述
連續布局的優先級計算方法如下圖所示:
在這里插入圖片描述
LowRedundancyBlocks會通過getPriorityContiguous()獲取優先級,然后將對應的需要進行reconstruct的block放入到對應的優先級隊列中:

  private boolean add(BlockInfo blockInfo, int priLevel, int expectedReplicas) {if (priorityQueues.get(priLevel).add(blockInfo)) {return true;}return false;}

然后,RedundancyMonitor對應的Daemon 線程會根據LowRedundancyBlocks neededReconstruction中維護的優先級隊列,按照優先級選擇Block, 構造對應的ReconstructionTask。通過blocksToProcess控制每輪循環最多需要進行re-construct的block的數量,每次循環都通過bookmark的方式從上一輪循環處理的位置接著進行:

 synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(int blocksToProcess, boolean resetIterators) {final List<List<BlockInfo>> blocksToReconstruct = new ArrayList<>(LEVEL);int count = 0;int priority = 0;HashSet<BlockInfo> toRemove = new HashSet<>();// 依次遍歷每一個LEVEL的queue,形成一個需要進行re-construct的List<List>, 外層list就是優先級,內層list就是這個優先級下面的需要進行re-construct的blockfor (; count < blocksToProcess && priority < LEVEL; priority++) {final boolean inCorruptLevel = (QUEUE_WITH_CORRUPT_BLOCKS == priority);// 這個PriorityQueue的bookmark,每次循環都從上一次循環的位置開始,而不是重新開始final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();final List<BlockInfo> blocks = new LinkedList<>(); if (!inCorruptLevel) { // 因為corrupt block已經沒有恢復的必要了blocksToReconstruct.add(blocks);}for(; count < blocksToProcess && i.hasNext(); count++) {BlockInfo block = i.next();if (block.isDeleted()) { // 這個block在neededReconstruction中,但是其實已經被NN刪除了,因此必須要構造re-construct了toRemove.add(block);continue;}if (!inCorruptLevel) {blocks.add(block);}}for (BlockInfo bInfo : toRemove) {remove(bInfo, priority); // 這個Block已經被系統刪除了(比如,文件刪除了),因此已經不需要reconstruct了}toRemove.clear();}....return blocksToReconstruct;}

2.5.2.5 RedundancyMonitor的運行

上面介紹了待重構的塊的生成過程。下面,就是針對這些塊進行具體的重構工作了。塊的重構是由一個持續運行的Daemon線程RedundancyMonitor負責調度的,由于待重構的Block信息逗存放在BlockManager.neededReconstruction(重構隊列)中,因此RedundancyMonitor主要功能就是從重構隊列中取出Block,構造重構任務,調度出去。同時,它還負責從待定隊列中取出那些超時的Block,以及從延遲隊列中取出已經不需要繼續延遲(Stale狀態結束)的節點,如果這些節點需要重構,則創建重構任務和調度重構任務。

2. RedundancyMonitor的工作原理

下圖顯示了塊重構的基本流程,從圖中我們可以看到:

  1. 待重構塊的發現是由NameNode負責的
  2. 待重構快的重構策略(從哪里讀取,寫入到哪里去)是由RedundancyMonitor決定的
  3. 任務調度出去以后,對應的DataNode會執行對應的重構任務。如果是基于復制(Replication Factor),重構過程相對簡單,就是數據的拷貝。如果是基于糾刪碼(Erasure Coding),整個過程會更加復雜,因為這涉及到向不同的節點讀取stripe然后通過計算還原數據的過程。
  4. DataNode重構完成以后,會向NameNode進行對應的IBR(Incremental Block Report),DataNode進而更新Block的狀態以根據需求制定新的策略。
    在這里插入圖片描述

下面是RedundancyMonitor的基本結構代碼。可以看到RedundancyMonitor是一個Runnable。這個Runnable是一個獨立運行的線程,每一輪運行結束以后都會sleep一段時間(由dfs.namenode.redundancy.interval.seconds配置,默認是3s)然后繼續進行:

  private class RedundancyMonitor implements Runnable {@Overridepublic void run() {while (namesystem.isRunning()) { // 這是一個無限循環,只要是active namenode,就不斷運行try {// Process recovery work only when active NN is out of safe mode.if (isPopulatingReplQueues()) {computeDatanodeWork(); // 從重構隊列中取出Block, 選擇并且將需要re-construct的節點發送給對應的Node 去執行processPendingReconstructions(); // 從待定隊列中取出Block,,如果需要reconstruct,加入到neededReconstruction中rescanPostponedMisreplicatedBlocks(); // 從延遲隊列中取出Block,如果需要reconstruct,加入到neededReconstruction中}TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs);.....

在這里插入圖片描述

2.1 從優先級隊列中選擇待重構塊

RedundancyMonitor的computeDatanodeWork()負責創建塊重建的任務并調度出去

我們在上文講解過,重構隊列(neededReconstruction)是一個優先級隊列,重構任務的創建和調度肯定是按照優先級從高到底進行。在LowRedundancyBlocks.chooseLowRedundancyBlocks()中可以看到基于優先級從重構隊列中獲取Block的邏輯:

  int computeBlockReconstructionWork(int blocksToProcess) {List<List<BlockInfo>> blocksToReconstruct = null;namesystem.writeLock();........// Choose the blocks to be reconstructedblocksToReconstruct = neededReconstruction // blocksToProcess存儲了每次最多可以處理的blocks的數量.chooseLowRedundancyBlocks(blocksToProcess, reset); // 每次最多處理blocksToProcess個,然后每次都是從上次的bookmark的位置接著進行......// 根據挑選的 需要進行reconstruct的block,對他們進行重新構建return computeReconstructionWorkForBlocks(blocksToReconstruct);}// 選擇那些處于風險中的塊,按照優先級構成一個2層list。注意,這時候只是選出了需要進行重構的Block,至于怎么重構,需要computeReconstructionWorkForBlocks()來確定synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(int blocksToProcess, boolean resetIterators) {final List<List<BlockInfo>> blocksToReconstruct = new ArrayList<>(LEVEL);int count = 0;int priority = 0;HashSet<BlockInfo> toRemove = new HashSet<>();// 依次遍歷每一個LEVEL的queue,形成一個需要進行re-construct的List<List>, 外層list就是優先級,內層list就是這個優先級下面的需要進行re-construct的blockfor (; count < blocksToProcess && priority < LEVEL; priority++) {// Go through all blocks that need reconstructions with current priority.// Set the iterator to the first unprocessed block at this priority level// We do not want to skip QUEUE_WITH_CORRUPT_BLOCKS because we still need// to look for deleted blocks if any.final boolean inCorruptLevel = (QUEUE_WITH_CORRUPT_BLOCKS == priority);// 這個PriorityQueue的bookmark,每次循環都從上一次循環的位置開始,而不是重新開始final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();final List<BlockInfo> blocks = new LinkedList<>();if (!inCorruptLevel) { // 因為corrupt block已經沒有恢復的必要了blocksToReconstruct.add(blocks);}.......return blocksToReconstruct;}}

可以看到,chooseLowRedundancyBlocks每次都按照優先級從高到低的順序,最多選取blocksToProcess 個Block進行重構。請注意,整個分布式系統是一個不斷變化的系統,從重構隊列中取出的Block很可能已經不需要再重構了,比如(文件已經刪除,由于DataNode恢復因此塊狀態已經完全恢復)等,所以從重構隊列中取出的塊都會時刻進行狀態的重新判斷,只有確定這個塊的確需要重構,才會放入到blocksToReconstruct。

2.2 構建待重構塊的統計信息和元數據信息,為選擇source節點做準備

在上一節選定了需要進行重構的Block(基于復制或者基于糾刪碼),下面,就開始對這個塊的信息進行分析,比如,這個Block是基于復制還是基于基于糾刪碼,這個Block的每一個replica在哪里(比如EC編碼中每一個Internal Block的Storage信息,或者基于復制的Block的每一個Replica的信息),每一個replica的狀態是什么樣的(LIVE, DECOMMISSIONGING, MAINTENANCE_FOR_READ…),目前丟失因此需要重構的physical block有幾個,是哪些(比如EC編碼中用internal block index表示,而在復制方式中則只需要關心還缺幾個副本)。

由于重構的讀取過程顯然會帶來對應source節點的負載,假如有多種選擇方案(比如,ReplicationWork中,有兩個副本可用,只需要選擇一個source replica拷貝到第三個節點,選哪個更好?),那么基本原則是選擇負載最輕的、對系統運行影響最小的節點。

// 為chooseLowRedundancyBlocks()返回的blocks構造BlockReconstructionWork,并調度出去int computeReconstructionWorkForBlocks(List<List<BlockInfo>> blocksToReconstruct) { // 按照優先級進行排列的,每一個List<BlockInfo>都是這個對應優先級的block的列表List<BlockReconstructionWork> reconWork = new ArrayList<>();....// 對這些block的重構任務進行分類,構造BlockReconstructionWork的具體實現,ErasureCodingWork或者ReplicationWorkfor (int priority = 0; priority < blocksToReconstruct .size(); priority++) { // 優先級從高到低,0的優先級最高for (BlockInfo block : blocksToReconstruct.get(priority)) {// 創建對應的ReconstructionWork,但是還沒有到挑選節點的階段BlockReconstructionWork rw = scheduleReconstruction(block,priority);if (rw != null) {reconWork.add(rw);}......

選擇Source節點的邏輯在方法chooseSourceDataNodes()中,其基本思路為:

  • 一個非正常不可讀狀態的Replica,肯定不會作為source節點,比如,StoredReplicaState.CORRUPT,StoredReplicaState.EXCESS,StoredReplicaState.MAINTENANCE_NOT_FOR_READ,StoredReplicaState.DECOMMISSIONED
  • 考慮一個節點上目前已經分配的replica任務,超過了hard limit(dfs.namenode.replication.max-streams-hard-limit,默認值為4), 即使是優先級最高的重構任務,也不會將該節點作為source節點
  • 對于一個處于正在decommission(DECOMMISSIONING)或者正在準備進入maintenance(MAINTENANCE_FOR_READ),是比普通的正常replica更應該被選為source,因為這種狀態的節點的workload往往很低,因此重構過程中對系統影響較小,其具體規則為:
  • 如果是最高優先級(LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY),不管該節點目前是LIVE,或者DECOMMISSIONING或者MAINTENANCE_FOR_READ, 只要該節點當前正在運行和準備運行的replica任務數量沒有超過hard limit(dfs.namenode.replication.max-streams-hard-limit),那么依然會作為source nodes。
  • 或者,雖然不是最高優先級,但是這個replica的狀態是DECOMMISSIONING或者MAINTENANCE_FOR_READ,即使超過了soft limit(dfs.namenode.replication.max-streams),那么依然會作為source,因為這兩個狀態的Storage往往負載較低,是優選的對象。
  • 或者,雖然優先級不高,并且replica的狀態是LIVE狀態(不是優選的DECOMMISSIONING或者MAINTENANCE_FOR_READ狀態),只要該節點上的任務沒有超過soft limit(dfs.namenode.replication.max-streams),這個節點就可以作為source

下面是chooseSourceDatanodes()的代碼,請注意,這些參數比如containingNodes, nodesContainingLiveReplicas,numReplicas,liveBlockIndices,liveBusyBlockIndices都是將這個Block對以的狀態信息從方法里面帶出方法外,供chooseSourceDatanodes()的調用者scheduleReconstruction()進行重構的下一步調度。

  • containingNodes表示包含這個Block的某個replica的節點
  • nodesContainingLiveReplicas 表示包含這個Block的某個replica并且狀態為LIVE的節點的信息
  • numReplicas 就是這個Block的各個replica的在各個StoredReplicaState的統計信息
  • liveBlockIndices 只針對條帶布局,可以看到liveBlockIndices不僅僅是包含LIVE狀態的replica,而是經過chooseSourceDataNodes()的判斷,認為可以從上面讀取replica的節點的internal block index(比如節點是LIVE的狀態,節點是MAINTENANCE_FOR_READ或者DECOMMISSIONING狀態等),都放入到liveBlockIndices中。
  • liveBusyBlockIndicies 只針對條帶布局,所有處于LIVE或者DECOMMISSIONING并且這個節點上目前已經存在的復制任務超過一定數量
DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,List<DatanodeDescriptor> containingNodes,List<DatanodeStorageInfo> nodesContainingLiveReplicas,NumberReplicas numReplicas,  // 傳到這里的numReplicas是一個剛剛初始化、空的統計信息List<Byte> liveBlockIndices,List<Byte> liveBusyBlockIndices, int priority) {if (isStriped) {int blockNum = ((BlockInfoStriped) block).getTotalBlockNum(); // data unit + parity unitliveBitSet = new BitSet(blockNum);decommissioningBitSet = new BitSet(blockNum);}for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) { // 對于存儲這個Block的每一個 DatanodeStorageInfofinal DatanodeDescriptor node = getDatanodeDescriptorFromStorage(storage);//final StoredReplicaState state = checkReplicaOnStorage(numReplicas, block, // 獲取這個節點上的 replica的狀態storage, corruptReplicas.getNodes(block), false);if (state == StoredReplicaState.LIVE) { //存活狀態if (storage.getStorageType() == StorageType.PROVIDED) {storage = new DatanodeStorageInfo(node, storage.getStorageID(),storage.getStorageType(), storage.getState());}nodesContainingLiveReplicas.add(storage); // 加入存活列表}containingNodes.add(node);// do not select the replica if it is corrupt or excessif (state == StoredReplicaState.CORRUPT ||state == StoredReplicaState.EXCESS) { // CORRUPT和EXCESS狀態不考慮continue;}// Never use maintenance node not suitable for read// or unknown state replicas.if (state == null|| state == StoredReplicaState.MAINTENANCE_NOT_FOR_READ) {continue;// 不可讀狀態不考慮}if (state == StoredReplicaState.DECOMMISSIONED) { // 已經decommissioned,這個replica不考慮if (decommissionedSrc == null ||ThreadLocalRandom.current().nextBoolean()) {decommissionedSrc = node;}continue;}byte blockIndex = -1;if (isStriped) {blockIndex = ((BlockInfoStriped) block).getStorageBlockIndex(storage); // 這個節點所存儲的internal block的block indexcountLiveAndDecommissioningReplicas(numReplicas, state,liveBitSet, decommissioningBitSet, blockIndex);}if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY&& (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())&& node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) { // maxReplicationStreams默認值是2if (isStriped && (state == StoredReplicaState.LIVE|| state == StoredReplicaState.DECOMMISSIONING)) {liveBusyBlockIndices.add(blockIndex);}continue; // already reached replication limit 已經超過限制,查看下一個replica}if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) { // replicationStreamsHardLimit默認值是4if (isStriped && (state == StoredReplicaState.LIVE|| state == StoredReplicaState.DECOMMISSIONING)) {liveBusyBlockIndices.add(blockIndex);}continue;}if(isStriped || srcNodes.isEmpty()) {srcNodes.add(node);if (isStriped) {liveBlockIndices.add(blockIndex); // 加入到healthy block中}continue;}.....} //循環結束//  對于基于復制的布局方式,如果沒有一個存活的節點包含replica,還沒有找到一個srcNodes,但是發現了一個已經decommissioned節點包含,那么,依然準備使用這個 decommissioned節點// Pick a live decommissioned replica, if nothing else is available.if (!isStriped && nodesContainingLiveReplicas.isEmpty() &&srcNodes.isEmpty() && decommissionedSrc != null) {srcNodes.add(decommissionedSrc);}// 如果是基于復制,那么srcNodes肯定只有一個元素return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]);}

scheduleReconstruction()通過chooseSourceDatanodes()選定Source節點的同時,生成了對應的Block的source節點的一些統計信息作為引用參數返回,比如containingNodes, nodesContainingLiveReplicas,liveBlockIndices,liveBusyBlockIndices。然后,計算還需要多少個replica需要重構,將這些綜合信息封裝成BlockReconstructionWork,然后開始進行重構任務的調度。

總的來說,chooseSourceDatanodes()的基本策略是:

  1. 對于一個Block,會遍歷這個Block的所有的Replica,直到找到一個合適的Source;
  2. 對于DECOMMISSIONING_IN_PROGRESS、ENTERING_MAINTENANCE、MAINTENANCE_LIVE狀態的StoredReplicaState,對于它們的處理方式和LIVE狀態沒有區別, 即就認為它們是處于LIVE狀態;
  3. 對于CORRUPT, EXCESS,MAINTENANCE_NOT_FOR_READ(即處于MAINTENANCE并且不在線了)的StoredReplicaState,這些副本將無法作為source使用;
  4. 對于依然存活的已經DECOMMISSIONED的節點,如果我們除了這個節點以外找不到其他節點作為source,那么這個節點將會被選作source節點。所以,我們在一個集群中突然decommission多個節點,顯然,這個decommission操作會觸發這些節點上的Replica的Reconstruction,如果某一個block全部分布在這些DECOMMISSIONING的節點上,NameNode會選擇其中一個DECOMMISSIONING為這個Block進行replica的復制,并不會出現replica的丟失。
  5. 這個方法中有很多針對Erasure Coding的很多corner case的處理。

塊的重構細節都被封裝在對應的BlockReconstructionWork對象中,根據塊布局方式的不同,分為ErasureCodingWorkReplicationWork兩種實現。顯然,ReplicationWork不需要block index的這類信息的,但是ErasureCodingWork需要。在方法的,兩個子類都重寫chooseTargets()方法和addTaskToDataNode()方法,因為在選擇目標節點上,不同的塊布局有不同的PlacementPolicy,同時,在將任務添加到DataNode的時候,也有不同的邏輯。下文將會詳細講解。

在這里插入圖片描述

class ErasureCodingWork extends BlockReconstructionWork {private final byte[] liveBlockIndicies;private final byte[] liveBusyBlockIndicies;private final String blockPoolId;public ErasureCodingWork(String blockPoolId,BlockInfo block,BlockCollection bc,DatanodeDescriptor[] srcNodes,// srcNodes.size()和liveBlockIndicies.size()一樣,并且liveBlockIndicies相同位置的值就是對應的internal block在group中的位置List<DatanodeDescriptor> containingNodes, // 包含這個Block的所有的節點,包括DECOMMISSION和MAINTENANCE節點List<DatanodeStorageInfo> liveReplicaStorages, // 包含這個Block的Live節點,因此不包含DECOMMISSION和MAINTENANCEint additionalReplRequired,  // 還缺少個replicaint priority,byte[] liveBlockIndicies,// 只針對條帶布局,所有live狀態的replica的internal block indexbyte[] liveBusyBlockIndicies) { // 只針對條帶布局,所有處于LIVE或者DECOMMISSIONING并且這個節點上目前已經存在的復制任務超過一定數量super(block, bc, srcNodes, containingNodes,liveReplicaStorages, additionalReplRequired, priority);this.blockPoolId = blockPoolId;this.liveBlockIndicies = liveBlockIndicies;this.liveBusyBlockIndicies = liveBusyBlockIndicies;}
class ReplicationWork extends BlockReconstructionWork {public ReplicationWork(BlockInfo block, BlockCollection bc,DatanodeDescriptor[] srcNodes, // 源節點,對于ReplicationWork, srcNodes的大小一定是1List<DatanodeDescriptor> containingNodes,// 包含這個Block的所有的節點,包括DECOMMISSION和MAINTENANCE節點List<DatanodeStorageInfo> liveReplicaStorages,  // 包含這個Block的Live節點,因此不包含DECOMMISSION和MAINTENANCEint additionalReplRequired, // 還需要多少個副本int priority) {super(block, bc, srcNodes, containingNodes,liveReplicaStorages, additionalReplRequired, priority);// 構造ReplicationWork的時候,還沒有確定target節點,但是先進行計數getSrcNodes()[0].incrementPendingReplicationWithoutTargets();

scheduleReconstruction()方法的具體代碼如下,其基本邏輯為:

  • 先通過調用getExpectedLiveRedundancyNum()看看我當前期望這個Block有多少個副本存活。上文在講getExpectedLiveRedundancyNum()方法的時候已經詳細講解了期望的存活副本數量(Expected Live Redundancy)的具體含義。

  • 基于當前的期望副本數,和這個副本當前的實際情況(有多少replica是正常的LIVE狀態,有多少個replica是pending狀態(即pendingReconstructions中的副本,pendingReconstructions的含義上文已經講過)),我還額外需要多少個副本呢?這是通過下面的代碼計算的:

     if (numReplicas.liveReplicas() < requiredRedundancy) { // 當前的live replica還不夠requiredRedundancy的數量additionalReplRequired = requiredRedundancy - numReplicas.liveReplicas()- pendingNum;}
    
  • 即使在數量上我看起來不再需要副本(當前Block的實際存活的副本數已經不小于期望的存活副本數量(Expected Live Redundancy)),那有可能副本的位置并不滿足PlacementPolicy的需求,這也是需要進行重構的,代碼如下:

    else { // 盡管副本總數量夠了,但是有可能按照Placement policy,副本的位置不符合要求// Violates placement policy. Needed on a new rack or domain etc.BlockPlacementStatus placementStatus = getBlockPlacementStatus(block);additionalReplRequired = placementStatus.getAdditionalReplicasRequired();}
    
  • 對于條帶布局,在確定targets的數量的時候,還去掉了正處在DECOMMISSIONING和處在MAINTENANCE_FOR_READ的Replica,因為這些replica目前算作LIVE的replica(查看chooseSourceDatanodes()在確定liveBusyBlockIndicies的基本邏輯),而這些Replica即將不可讀(因為即將進入DECOMMISSIONED和MAINTENANCE_NOT_FOR_READ狀態),因此必須盡快利用這些節點上的Replica進行其它Replica的重構。

  • 在按照上述邏輯確定了targets的數量以后,構造ErasureCodingWork或者ReplicationWork對象,開始選擇目標節點。

    // 雖然創建了ErasureCodingWork但是有可能是進行replica work,具體調度出去的時候會進行具體分析return new ErasureCodingWork(getBlockPoolId(), block, bc, newSrcNodes,containingNodes, liveReplicaNodes, additionalReplRequired,priority, newIndices, busyIndices);} else {return new ReplicationWork(block, bc, srcNodes,containingNodes, liveReplicaNodes, additionalReplRequired,priority);}
    

2.3 選擇target節點

目標節點是重構的時候需要寫入重構數據的節點。顯然,這剛好是PlacementPolicy做的事情。代碼如下:

 //  為每一個reconstruction work挑選target節點for (BlockReconstructionWork rw : reconWork) {// Exclude all of the containing nodes from being targets.// This list includes decommissioning or corrupt nodes.final Set<Node> excludedNodes = new HashSet<>(rw.getContainingNodes());// Exclude all nodes which already exists as targets for the blockList<DatanodeStorageInfo> targets = // 排除掉這個Block當前所在的targetspendingReconstruction.getTargets(rw.getBlock());if (targets != null) {for (DatanodeStorageInfo dn : targets) {excludedNodes.add(dn.getDatanodeDescriptor());}}//根據布局方式(連續布局還是條帶布局),選擇合適的塊放置策略final BlockPlacementPolicy placementPolicy =placementPolicies.getPolicy(rw.getBlock().getBlockType());rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);// 最終會調BlockPlacementPolicy.chooseTarget()}

從上面的代碼可以看到,選擇目標節點是通過調用BlockReconstructionWork.chooseTargets()方法實現的,這個方法在父類BlockReconstructionWork上是抽象方法,需要子類去實現。兩個子類對這個方法的實現方式大致一致,都是選擇PlacementPolicy.chooseTargets()去調度,這里不貼二者實現的代碼。

在上文講條帶布局的寫過程時我們說過,連續布局的默認塊放置策略BlockPlacementPolicyDefault會將第一個replica放在一個rack的某臺機器上,另外兩個replica會放在另外一個rack的兩臺不同機器上。而條帶布局的默認塊放置策略是BlockPlacementPolicyRackFaultTolerant,傾向于講所有的internal block放到不同的基架上去。
我們看一下BlockPlacementPolicy接口,就知道它需要哪些信息:

  public abstract DatanodeStorageInfo[] chooseTarget(String srcPath, // 這個塊對應的文件的pathint numOfReplicas, // 額外需要的節點數量Node writer, // 哪個節點負責寫數據,這種情況下,如果這個節點的確是一個DataNode,并且這個DataNode不在excludedNodes里面,那么第一個replica會寫到這個節點上去List<DatanodeStorageInfo> chosen, // 已經被選擇作為targets的節點boolean returnChosenNodes, // 是否返回被選擇的節點Set<Node> excludedNodes, // 需要排除的節點long blocksize, // 一個group中的data block的大小的總和BlockStoragePolicy storagePolicy, // 這個節點應該放在哪種類型(StorageType比如SSD,DISK等)的介質上EnumSet<AddBlockFlag> flags //分配block的時候的一些hint(暗示)的flag信息以修改這個PlacementPolicy的默認行為,比如默認會將第一個replica放到writer所在的DataNode上,但是如果指定了NO_LOCAL_WRITER,就不會把第一個replica放到跟writer在一個host上);

在調用BlockPlacementPolicy.chooseTarget()進行目標節點選擇的時候,基本思路是,我告訴你 1) 已經選擇作為target的節點(chosen) 2) 需要排除的節點,即不能作為目標的節點,那么,請根據你的BlockPlacementPolicy的實現,為我選擇numOfReplicas個節點作為target節點。

  • chosen: 在塊重構的場景下,已經選擇作為target的節點(chosen)是這個Block的處于LIVE狀態的replica。為什么需要這個參數,可以參考具體BlockPlacementPolicy的代碼,本文不做論述。
  • excludedNodes: 而根據上面的代碼,我們知道excludedNodes的選擇規則,即只要是這個Block的replica所在的節點(存放在containingNodes中,無論節點的狀態),以及這個Block還處于pending狀態的節點(上文講過pendingReconstruction,這些節點是寫Block完成但是還沒有進行Block report的節點,我們assume它過一段時間就會匯報,因此重構塊的其它節點的時候,不應該選擇它)。

2.4 將任務調度出去

到這一階段,我們已經構建了BlockReconstructionWork的具體實現,ErasureCodingWork或者ReplicationWork,包含了srcNodes(這個Block的每一個replica所在的并且滿足一定條件比如節點狀態,當前上面已經有的replication的task的數量),containingNodes(這個Block的每一個replica所在的節點,無論什么狀態,無論上面已經有多少replication的task),targets(重構的目標節點),liveBlockIndicies(處于LIVE狀態或者其他狀態但是滿足要求的replica的internal block index,只針對條帶布局),liveReplicaStorages(處于LIVE狀態的DataNodeStorage),現在可以將任務調度出去了。調度的主要邏輯在方法validateReconstructionWork()中(方法名字似乎跟它做的事情并不一致。。。。)

    // 將task調度給對應的DN,進行重構namesystem.writeLock();try { // 遍歷每一個reconstruction work, 然后調度出去for (BlockReconstructionWork rw : reconWork) {.......synchronized (neededReconstruction) {if (validateReconstructionWork(rw)) { //validateReconstructionWork會將任務調度出去,即attach到對應的DataNode,等待收到心跳,就把attach到DataNode的任務作為response發送給DataNode....}.....return scheduledWork;

validateReconstructionWork()做了以下事情

  1. 再次判斷是否有足夠副本,是否真的需要重構
    首先再次確認BlockReconstructionWork對應的Block的確沒有足夠的redundancy,即是否有足夠的有效副本數。如果有效副本數已經足夠,則取消任務。關于有效副本數(hasEnoughEffectiveReplicas()方法)的定義,上文已經講解。
        NumberReplicas numReplicas = countNodes(block);// 再次統計這個block當前的狀態統計信息final short requiredRedundancy =getExpectedLiveRedundancyNum(block, numReplicas);final int pendingNum = pendingReconstruction.getNumReplicas(block);if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) { // 算上pending的,已經有足夠的replica,不需要進行re-construct了neededReconstruction.remove(block, priority);.......return false;}
    
  2. 是副本不夠,還是副本夠但是機架不夠
    這里的考慮是,一個重構任務,包括ErasureCodingWork,有可能并不是因為某一個replica的缺失,而是,盡管replica并不缺失,但是位置不對,比如,糾刪碼要求每個replica都盡量在不同的機架上,但是現在卻發現有兩個replica在同一機架上,并且有多余的機架可以滿足要求,此時依然需要進行重構,只不過,即使是ErasureCodingWork,也是通過復制的方式去實現(將一個replica從一個rack拷貝到另一個rack以)。
    if ((numReplicas.liveReplicas() >= requiredRedundancy) &&(!placementStatus.isPlacementPolicySatisfied())) { // 如果replica數量足夠,僅僅是分布方式不滿足放置策略BlockPlacementStatus newPlacementStatus =getBlockPlacementStatus(block, targets); // 當添加了targets以后,獲取對應的status// 節點添加進來以后,依然沒有滿足當前的放置策略,并且,即使把target加進來,還需要額外的節點數量大于不加入target的時候還需要的節點數量,那么,加入這些target沒有任何價值// 也就是說,如果這些targets加進來以后,放置策略被滿足,或者,雖然依然不滿足,但是至少讓所需要的節點數量減少了,那么,把這些target加進來就有意義if (!newPlacementStatus.isPlacementPolicySatisfied() &&(newPlacementStatus.getAdditionalReplicasRequired() >=placementStatus.getAdditionalReplicasRequired())) { // 分布不滿足分布策略,// If the new targets do not meet the placement policy, or at least// reduce the number of replicas needed, then no use continuing.return false;}// mark that the reconstruction work is to replicate internal block to a// new rack.rw.setNotEnoughRack(); // 僅僅需要把replica放到一個新的rack上}
    
    每一個BlockPlacementPolicy都必須實現 verifyBlockPlacement()方法,它返回一個BlockPlacementStatus對象,封裝了當前的副本放置是否滿足要求、還需要幾個副本等等信息,供調用者進行決策。
     public abstract BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs, int numOfReplicas);
    
    	public interface BlockPlacementStatus {public boolean isPlacementPolicySatisfied(); // 當前的放置狀態是否滿足了放置策略public String getErrorDescription();// 當前的放置狀態沒有滿足放置策略的一些錯誤或者描述信息int getAdditionalReplicasRequired(); // 還需要多少個額外副本才能保證放置策略得到滿足}
    

可以看到,如果副本數量足夠但是verifyBlockPlacement()返回的BlockPlacementStatus顯示Block當前的Replica放置不滿足要求,那么就會調用BlockReconstructionWork.setNotEnoughRack(),將boolean notEnoughRack置為True,這個變量將會影響到向DataNode添加任務的邏輯,因為如果notEnoughRack=True,那么即使是ErasureCodingWork,這個任務也不再是進行encode/decode類型的計算,而是通過將某個internal block進行拷貝來買滿足PlacementPolicy的要求。下文會詳細介紹向DataNode添加任務的流程。
3. 將任務Attach到DataNode
在BlockReconstructionWorkd的類圖中可以看到,addTaskToDataNode()方法是一個抽象方法,因此ErasureCodingWork和ReplicationWork對這個方法都有自己的實現。

  • ErasureCodingWork的任務分配
    請注意,ErasureCodingWork實際分配出去的任務不一定就是糾刪碼的encode/decode任務,也可能也是復制任務。其基本邏輯是:
    1. 是否是不缺副本但是缺少機架。在糾刪碼場景下,缺少rack的意思是,有rack上的存放了這個Block的多個副本。在這種情況下,通過chooseSource4SimpleReplication(),選擇一個Replica的DataNode,將任務調度給這個DataNode,任務是:將你上面的這個replica 挪到target上面去。選擇這個DataNode的原則也很直觀:看看這個Block在哪個Rack上存放了多個replica,那么就選擇這個Rack上的某個DataNode,讓這個DataNode自己負責把自己的Replica挪走。
    2. 是否整個副本數量足夠(無需通過encode/decode進行internal block的重算),但是當前有的Replica的狀態是DECOMMISSIONING或者MAINTENANCE_FOR_READ的狀態。這種情況下,會給每一個處在DECOMMISSIONING或者MAINTENANCE_FOR_READ狀態的DataNode分發任務,任務的內容是:將你上面這個Block的replica立刻拷貝到遠程的targets機器上去。
    3. 既不是缺少機架,同時也不是有節點即將進行decommioning或者maintenance,而是的確缺少internal block。這種情況下,的確需要調度糾刪碼的encode/decode任務。調度的基本邏輯是,將任務調度給選定的targets節點中的第一個節點,顯然,這個節點將會負責從source節點上讀取internal block,通過encode或者decode,讓生成的一個或者多個internal block,發送到targets上去。由于它自己本身就是一個target,顯然有一個internal block是給分配給自己的。
  • ReplicationWork的任務分配
    ReplicationWork的任務就很簡單了,都是基于拷貝的多副本情況下副本數量不夠,因此將任務調度給source節點的第一個節點,這個節點負責將自己存放的這個replica拷貝到target節點上去。

將任務attach到DataNode是通過DatanodeDescriptor.addBlockToBeReplicated()和DatanodeDescriptor.addBlockToBeErasureCoded()方法進行的。我們從下面的DatanodeDescriptor的類圖可以看到,DatanodeDescriptor上保存了NameNode即將發送給該DataNode的各種類型的副本操作信息,這些信息將會在下一次DataNode到來的時候作為repsonse發送給DataNode.

在這里插入圖片描述
從下面的代碼可以看到,addBlockToBeReplicated()和addBlockToBeErasureCoded()就是將對應的任務添加到該DatanodeDescriptor的對應的任務隊列replicaBlocks和erasurecodeBlocks中去。

---------------------------------------------DatanodeDescriptor----------------------------------------------------public void addBlockToBeReplicated(Block block, // 這個block是internal blockDatanodeStorageInfo[] targets) {assert(block != null && targets != null && targets.length > 0);replicateBlocks.offer(new BlockTargetPair(block, targets));}/*** Store block erasure coding work.*/void addBlockToBeErasureCoded(ExtendedBlock block,DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets,byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) {assert (block != null && sources != null && sources.length > 0);// 構造這個BlockECReconstructionInfo的block是一個block group,即我還不知道需要對哪個internal block進行重算BlockECReconstructionInfo task = new BlockECReconstructionInfo(block,sources, targets, liveBlockIndices, ecPolicy);erasurecodeBlocks.offer(task);}
  1. 任務的派發
    NameNode從來不會主動向DataNode發送信息(你清高行了吧?),只會在收到DataNode發送過來的心跳信息的時候,將任務通過心跳的響應返回給DataNode。

當NameNode 收到了DataNode發過來的信息,會在handleHeartbeat()方法中查看當前需要回復給這個DataNode的命令,其中就包括需要該DataNode復制replica到別的節點的命令,或者進行糾刪碼的encode/decode以恢復糾刪碼內部塊的命令。
所有的命令封裝在DatanodeCommand中的對應具體實現類中,比如,對Block進行基于復制的重構new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId, pendingList), 對Block進行基于糾刪碼編解碼的重構new BlockECReconstructionCommand( DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList), 刪除一個Block的Replica的命令new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId, blks)等等。幾個重要的DatanodeProtocol:

  final static int DNA_TRANSFER = 1;   // 將副本從一個節點傳輸到另一個節點final static int DNA_INVALIDATE = 2; // 刪除一個replicafinal static int DNA_SHUTDOWN = 3;   // 關閉節點final static int DNA_REGISTER = 4;   // 重新注冊final static int DNA_RECOVERBLOCK = 6;  // 恢復一個eblockfinal static int DNA_ACCESSKEYUPDATE = 7;  // update access keyfinal static int DNA_BALANCERBANDWIDTHUPDATE = 8; // 更新balancer的帶寬final static int DNA_CACHE = 9;      // 緩存一個blockfinal static int DNA_UNCACHE = 10;   // 將一個block從緩存中拿掉final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // 基于糾刪碼的重構命令

2.5 DataNode對重構任務的處理

在DataNode端,DataNode會為每一個NameNode(每一個NameService的每一個NameNode,Active或者Standby)構建一個BPOfferService,用來處理這個NameNode發過來的命令。最終,會在BPOfferService.processCommandFromActive()處理來自Active NameNode的命令

----------------------------------------------BPOfferService------------------------------------------------
boolean processCommandFromActor(DatanodeCommand cmd,BPServiceActor actor) throws IOException {.......try {if (actor == bpServiceToActive) {return processCommandFromActive(cmd, actor);} else {return processCommandFromStandby(cmd, actor);}private boolean processCommandFromActive(DatanodeCommand cmd,BPServiceActor actor) throws IOException {.......switch(cmd.getAction()) {case DatanodeProtocol.DNA_TRANSFER: // 處理replica復制任務非常簡單,就是根據副本信息,將副本復制到指定的遠程節點// Send a copy of a block to another datanodedn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(),bcmd.getTargets(), bcmd.getTargetStorageTypes(),bcmd.getTargetStorageIDs());break;case DatanodeProtocol.DNA_INVALIDATE:......case DatanodeProtocol.DNA_CACHE:.....case DatanodeProtocol.DNA_UNCACHE:.......case DatanodeProtocol.DNA_SHUTDOWN:......case DatanodeProtocol.DNA_FINALIZE:......case DatanodeProtocol.DNA_RECOVERBLOCK:.......case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:.......// 對于continuous block的恢復,參考DatanodeProtocol.DNA_TRANSFERcase DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:LOG.info("DatanodeCommand action: DNA_ERASURE_CODING_RECOVERY");Collection<BlockECReconstructionInfo> ecTasks =((BlockECReconstructionCommand) cmd).getECTasks();dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);.......}return true;}

NameNode對應,DatanodeProtocol.DNA_TRANSFER用來進行副本的復制,而DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION用來進行糾刪碼的塊重構。基于副本復制的重構方式很簡單,我們不做贅述。主要講解糾刪碼的塊重構。
DataNode會構造一個全局的叫做ErasureCodingWorker的對象負責進行糾刪碼的塊重構任務(注意和NameNode端的ErasureCodingWork類區分開)。DataNode啟動的時候會初始化ErasureCodingWorker,在ErasureCodingWorker構造時,會用一個線程池專門執行塊的重構任務

 public ErasureCodingWorker(Configuration conf, DataNode datanode) {......initializeStripedBlkReconstructionThreadPool(conf.getInt(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_THREADS_KEY,DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_THREADS_DEFAULT));}private void initializeStripedBlkReconstructionThreadPool(int numThreads) {LOG.debug("Using striped block reconstruction; pool threads={}",numThreads);stripedReconstructionPool = DFSUtilClient.getThreadPoolExecutor(numThreads,numThreads, 60, new LinkedBlockingQueue<>(),"StripedBlockReconstruction-", false);stripedReconstructionPool.allowCoreThreadTimeOut(true);}

其實就是創建了一個corePoolSize=8, maxPoolSize=8, keepAliveTime=60s, 任務隊列是一個阻塞無界隊列LinkedBlockingQueue的線程池。我們都知道,這個在線程池的劃分上叫做叫做固定線程數量的線程池(可以參考文章《Java的5中線程池》)。線程池中的每一個線程負責一個Block(注意不是replica)的重構。

------------------------------------------------ErasureCodingWorker---------------------------------------------------public void processErasureCodingTasks(Collection<BlockECReconstructionInfo> ecTasks) {// 對于每一個重構的taskfor (BlockECReconstructionInfo reconInfo : ecTasks) {try {StripedReconstructionInfo stripedReconInfo =new StripedReconstructionInfo(reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(),reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(),reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes(),reconInfo.getTargetStorageIDs());final StripedBlockReconstructor task =new StripedBlockReconstructor(this, stripedReconInfo);if (task.hasValidTargets()) {stripedReconstructionPool.submit(task);

可以看到,ErasureCodingWorker將塊的重構信息封裝到StripedReconstructionInfo中,然后封裝為一個StripedBlockReconstructor,提交給ErasureCodingWorker在初始化的時候構造的固定線程數的線程池stripedReconstructionPool。由此可見,StripedBlockReconstructor肯定是一個Runnable,它的run()方法結構非常清晰,做的事情一目了然:

--------------------------------------------StripedBlockReconstructor----------------------------------------------
class StripedBlockReconstructor extends StripedReconstructorimplements Runnable {private StripedWriter stripedWriter;......@Overridepublic void run() {try {initDecoderIfNecessary(); // 根據ECPolicy創建對應的decoderinitDecodingValidatorIfNecessary();  // 根據ECPolicy創建對應的decoder的validatorgetStripedReader().init();//初始化條帶塊的讀取stripedWriter.init(); //初始化條帶塊的寫入reconstruct(); // 重構stripedWriter.endTargetBlocks(); // 結束目標塊的寫入,重構完成.....

從上面的代碼可以看到,StripedBlockReconstructor會首先初始化Decoder和DecodingValidator,然后初始化了Reader(getStripedReader().init())和Writer(stripedWriter.init()),然后進行重構,最后進行一些關閉操作。
具體架構如下圖所示:
在這里插入圖片描述

關于Decoder和DecodingValidator:

  1. 關于Decoder和DecodingValidator
    Decoder就是通過已有的replica計算出缺失的block,比如RS(6,2),index=(3,6)的塊丟失,因此需要通過Decode的方式重算丟失塊。DecodingValidator的過程就是將已經Decode的結果重新作為Input進行Decode,看看生成的塊與當前的塊是否匹配。
    比如,通過decode(d0,d1,d2,d4,p0,p1) 恢復出 (d0, d5), 然后校驗過程就可以是decode(d1, d2, d3,d4,d5, p0) 計算出d0’,然后對比d0’和d0,如果一致則校驗成功。
    在這里插入圖片描述

  2. 關于Reader

2.5.1 StripedReader的構造

顯然,糾刪碼的重構需要從多個Source讀取internal block,才能進行重構。比如RS(6,2),應該至少需要從6臺DataNode上讀取6個internal block(待會兒會講到文件最后的一個Group可能不需要讀這么多的block),才能重構丟失的internal block,和每一個DataNode的通信都需要建立獨立的連接,需要多個reader。StripedBlockReconstructor將這些reader的管理交付給StripedReader負責,它管理了StripedBlockReader的List, 每一個StripedBlockReader負責一個DataNode上Replica的讀取。
StripeReader的構造方法如下:

	 StripedReader(StripedReconstructor reconstructor, DataNode datanode,Configuration conf, StripedReconstructionInfo stripedReconInfo) {......dataBlkNum = stripedReconInfo.getEcPolicy().getNumDataUnits();parityBlkNum = stripedReconInfo.getEcPolicy().getNumParityUnits();int cellsNum = (int) ((stripedReconInfo.getBlockGroup().getNumBytes() - 1)/ stripedReconInfo.getEcPolicy().getCellSize() + 1);minRequiredSources = Math.min(cellsNum, dataBlkNum); // minRequiredSources可能等于data block的數量,有可能等于當前這個stripe的cell數量if (minRequiredSources < dataBlkNum) { // cellNum小于dataBlkNum,說明這個block group的總的數據量甚至都沒有一個stripe那么多,這往往是一個文件的最后一個Block Groupint zeroStripNum = dataBlkNum - minRequiredSources;zeroStripeBuffers = new ByteBuffer[zeroStripNum];zeroStripeIndices = new short[zeroStripNum];}......readers = new ArrayList<>(sources.length);readService = reconstructor.createReadService(); //用來進行并發讀取的StripedBlockReader的線程池}``````javaCompletionService<BlockReadStats> createReadService() {return new ExecutorCompletionService<>(stripedReadPool);}// StripedReader的初始化void init() throws IOException {initReaders();initBufferSize();initZeroStrip();}

從剛上面的代碼可以看到,在構造的時候主要做了以下3件事情:

  • 確定最少需要從多少個Source讀取數據
    • 以RS(6,2),cell大小為1MB為例,為了重構內部物理塊,難道不都是需要至少6個內部塊嗎?答案是不一定。特殊情況發生在文件的最后一個Block Group,假如這個Block Group的大小小于等于(6 - 1) * 1M = 6M,那么就至少有一個internla block是全0的,HDFS對于這種小文件,盡管NameNode會照常分配6+2=8個物理塊,但是客戶端也只會寫有有效數據的塊,根本不會寫沒有任何有效數據的塊。這個結果我們在本文的實驗驗證部分已經驗證了。下面的StripedReader的構造方法就是確定最少需要多少個source,即需要構造多少個實際讀取數據的reader。剩下的空的 internalblock 也會構建對應的Buffer,但是數據全是0。

      根據上面的計算方式,我們以RS-6-2-1024k為例:
      如果這個BlockGroup的大小是3.5 * 1024k = 1536kB,由于一個cell大小為1024KB, 那么這個Block Group只會有 (3584 - 1)/1024 + 1 = 4個internal block,剩下兩個internal Block不會生成。

      如果這個BlockGroup的大小是8 * 1024 = 8192KB,由于一個cell大小為1024KB,那么(8192 - 1)/1024 + 1 = 8,與最多的6取最小值,因此是需要6個internal block。
      兩種情況的糾刪碼的條帶布局如下圖所示:
      在這里插入圖片描述

  • 按照source的數量,構造Reader的數組
  • 構造Reader的線程池readService

可以看到,這里是構造了一個線程池stripedReadPool,corePoolSize=0, maxPoolSize=Integet.MAX_VALUE, keepAliveTime=60s, 任務隊列是一個阻塞無界隊列SynchronousQueue的線程池。我們都知道,這個在線程池的劃分上叫做叫做緩存線程池(Cached Thread Pool,可以參考文章[《Java的5中線程池》]。SynchronousQueue一般用在無界的緩存線程池中,是一個不存儲元素的阻塞隊列,會直接將任務交給消費者,必須等隊列中的添加元素被消費后才能繼續添加新的元素。
請把這個線程池與上文提到的ErasureCodingWorker中構造的線程池stripedReconstructionPool區分開,stripedReconstructionPool中的每一個task是一個StripedBlockReconstructor,每一個task負責一個Block的reconstruction,這里的線程池stripedReadPool的每一個task則負責一個Block下的Replica的讀取。

 // 構造線程池private void initializeStripedReadThreadPool() {// Essentially, this is a cachedThreadPool.stripedReadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,60, TimeUnit.SECONDS,new SynchronousQueue<>(),new Daemon.DaemonFactory() {.....},new ThreadPoolExecutor.CallerRunsPolicy() {.......});stripedReadPool.allowCoreThreadTimeOut(true);}// 將線程池封裝為ExecutorCompletionServiceCompletionService<BlockReadStats> createReadService() {return new ExecutorCompletionService<>(stripedReadPool);}

2.5.2 StripedReader的初始化

剛剛說過,在StripedBlockReconstructor.run()中,會通過調用getStripedReader().init()來初始化Reader。
我們可以看到,StripedReader初始化主要是初始化每一個Replica的StripedBlockReader,以及初始化buffer和zeroStripe(上面講到的空的internal block,這些block沒有有效數據,decode運算的時候需要假設上面的數據是0)。
從下面的代碼可以看到,一共初始化了minRequiredSources個reader,每一個reader會負責與一個DataNode通信拉去對應的replica,所有的reader放在一個readers數組中。

  void init() throws IOException {initReaders();initBufferSize();initZeroStrip();}private void initReaders() throws IOException {// 初始化一個success list, 代表可以進行internal block讀取的DN,由于每次讀取的過程中可能發現有些DN不好用,// 因此會更新這個listsuccessList = new int[minRequiredSources];StripedBlockReader reader;int nSuccess = 0; // 我們有sources.length個reader,但是實際上我們只需要minRequiredSources個readerfor (int i = 0; i < sources.length && nSuccess < minRequiredSources; i++) {reader = createReader(i, 0); // offset都是0,因為每一個reader都是從自己負責的replica的0的位置開始讀取readers.add(reader);if (reader.getBlockReader() != null) { // 連接已經建立好了initOrVerifyChecksum(reader);successList[nSuccess++] = i; //successList[nSuccess++]存放了readers數組的索引}}}/*** 注意,idxInSources只是在liveIndices數組中的索引,而liveIndices[idxInSources]才是internal block在block group中的索引*/StripedBlockReader createReader(int idxInSources, long offsetInBlock) {return new StripedBlockReader(this, datanode,conf, liveIndices[idxInSources], // liveIndices[idxInSources] 存的是internal block在group中的indexreconstructor.getBlock(liveIndices[idxInSources]),sources[idxInSources], offsetInBlock);}

initBufferSize()的代碼如下,buffer的大小通過dfs.datanode.ec.reconstruction.stripedread.buffer.size配置,默認64KB。每次構造的時候,minRequiredSources中的每一個StripedBlockReader都會讀取bufferSize的數據,進行decode操作,然后寫入到target中。

  private void initBufferSize() {int bytesPerChecksum = checksum.getBytesPerChecksum(); // 默認一個chunk是512B,算上4B的checksum,是516B// The bufferSize is flat to divide bytesPerChecksumint readBufferSize = stripedReadBufferSize; // 默認64kb// 假如用戶配置的stripedReadBufferSize是9KB, bytesPerChecksum是4KB,那么真正的bufferSize只需要8KB就行,不需要9KBbufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum :readBufferSize - readBufferSize % bytesPerChecksum;}

initZeroStrip()方法主要是為上文講的空block構造buffer。空的internal block中雖然沒有有效數據,卻需要置0來進行統一decode運算。這里不做詳細講解。

2.5.3 StripedWriter的構造

同StripedReader在邏輯上相似,StripedWriter也是封裝了一個或者多個StripedBlockWriter,每一個StripedBlockWriter負責一個target replica的寫操作。

 StripedWriter(StripedReconstructor reconstructor, DataNode datanode,Configuration conf, StripedReconstructionInfo stripedReconInfo) {......../*** 有多個writer,可以看到一個StripedWriter負責一個BlockGroup 的一個或者多個internal block的恢復* 一個StripedBlockWriter負責一個 replica的生成操作*/writers = new StripedBlockWriter[targets.length];targetIndices = new short[targets.length];//targetIndices數組中的每一個元素的值代表了這個target在Block Index中的索引Preconditions.checkArgument(targetIndices.length <= parityBlkNum,"Too much missed striped blocks.");initTargetIndices();long maxTargetLength = 0L;for (short targetIndex : targetIndices) { // 遍歷每一個internal block,獲取最大的internal block的長度maxTargetLength = Math.max(maxTargetLength,reconstructor.getBlockLen(targetIndex));}reconstructor.setMaxTargetLength(maxTargetLength);
}// 初始化targetIndices數組,targetIndices數組的每一個元素代表了需要恢復的block在Block Group中的indexprivate void initTargetIndices() {BitSet bitset = reconstructor.getLiveBitSet(); // 從StripedReconstructor構造函數可以看到,liveBitSet中存放了liveIndices中的每一個value,即每一個存活的(也包含decommissioning和maintenance_for_read,參考chooseSourceDataNodes()方法)的internal block的indexint m = 0;hasValidTargets = false;for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {if (!bitset.get(i)) { // 缺數據if (reconstructor.getBlockLen(i) > 0) { // 的確需要數據if (m < targets.length) { // 最多構造m個targettargetIndices[m++] = (short)i;hasValidTargets = true;}.......}

StripedWriter的構造方法主要做了以下幾件事情:

  1. 根據targets的數量,初始化StripedBlockWriter[]數組
  2. 設置targetIndices數組。數組中的每一個元素保存了需要進行恢復的internal block在Block Group中的索引。
  3. 設置最大的目標長度
    這里指的是最長的internal block的長度。對于一個小文件,或者一個大文件的最后一個BlockGroup,由于沒有將Block Group寫滿,不一定每一個Internal Block的大小都是滿載的128MB,很有可能這個Block Group的所有的Internal Block都很小,因此我們先計算這個Block Group的所有 internal block的最大size,只要寫滿這個size,就結束了,不用再寫了。比如下圖,一個文件8MB,最大的internal block也就2MB:

2.5.4 StripedWriter的初始化

剛剛說過,在StripedBlockReconstructor.run()中,會通過調用stripedWriter.init()來初始化writer。
StripedWriter的初始化主要是初始化了寫數據過程中的data buffer和checksum buffer。一個data buffer用來存放一個packet,一個packet是有頭部信息和多個chunk組成。這里不做贅述

  void init() throws IOException {DataChecksum checksum = reconstructor.getChecksum();checksumSize = checksum.getChecksumSize();// 4BbytesPerChecksum = checksum.getBytesPerChecksum(); // 512Bint chunkSize = bytesPerChecksum + checksumSize; // 516BmaxChunksPerPacket = Math.max( // WRITE_PACKET_SIZE是64KB,需要包含PKT_MAX_HEADER_LEN的header(WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN) / chunkSize, 1);int maxPacketSize = chunkSize * maxChunksPerPacket+ PacketHeader.PKT_MAX_HEADER_LEN; // 最大的packet的大小packetBuf = new byte[maxPacketSize]; // 數據的bufferint tmpLen = checksumSize *(reconstructor.getBufferSize() / bytesPerChecksum);checksumBuf = new byte[tmpLen]; // checksum的buffer的長度}

2.5.5 開始重構

上面講過,方法reconstruct()是重構的具體過程。拋開細節,整個重構是一輪一輪進行,一輪重構是指讀取一個Stripe中的source的數據(通過上面的線程池stripedReadPool并發進行,但是顯然,在進行decode之前必須等所有task成功結束),進行decode,然后發送給一個或者多個targets (注意,多個targets的發送并沒有使用線程池的方式并發進行,為什么?)

void reconstruct() throws IOException {// 在internal block中的位置依然小于最大的target internal block的長度, 意味著還需要接著讀數據while (getPositionInBlock() < getMaxTargetLength()) { // 當前的position還小于最大的internal block的長度DataNodeFaultInjector.get().stripedBlockReconstruction();long remaining = getMaxTargetLength() - getPositionInBlock();final int toReconstructLen = // 最多只能讀取buffer size大小的數據(從StripedReader的構造函數可以看到,buffer size肯定是chunk的整數倍)(int) Math.min(getStripedReader().getBufferSize(), remaining);// step1: read from minimum source DNs required for reconstruction.// The returned success list is the source DNs we do real read fromgetStripedReader().readMinimumSources(toReconstructLen);// 從遠程最多讀取toReconstructLen byte的數據// step2: decode to reconstruct targetsreconstructTargets(toReconstructLen); // 重算,構造數據// step3: transfer data// 把數據發送給targetif (stripedWriter.transferData2Targets() == 0) { // 將數據發送給 targetString error = "Transfer failed for all targets.";throw new IOException(error);}updatePositionInBlock(toReconstructLen);clearBuffers();}
讀取

讀取的邏輯很簡單,根據需要讀取的reconstructLength,并發提交minRequiredSources個任務進行讀取操作,每一個任務是一個StripedBlockReader,負責和遠程的DataNode通信以讀取一個Internal Block,每一個submit都會返回一個Future句柄,供調用者查詢結果。提交以后,開始輪詢所有的Future。
正常情況下,minRequiredSources個reader都成功,則這一輪讀取結束,所有讀取的結果都存放在各自的StripedBlockReader的緩存中,供待會兒Decode使用。
如果失敗或者超市,則會嘗試更換另外一個DataNode讀取。顯然,只有在minRequiredSources小于liveSources的時候,還有可以更換的DataNode,否則,重構失敗。

  int[] doReadMinimumSources(int reconstructLength,CorruptedBlocks corruptedBlocks)throws IOException {int nSuccess = 0;int[] newSuccess = new int[minRequiredSources];BitSet usedFlag = new BitSet(sources.length);/** Read from minimum source DNs required, the success list contains* source DNs which we think best.*/// 只需要真正的從minRequiredSources個reader中讀取數據,剩下的則直接向buffer中填充0for (int i = 0; i < minRequiredSources; i++) { // 最小的reader數量中的每一個reader都要去讀數據StripedBlockReader reader = readers.get(successList[i]);// successList[i]代表對應的需要從中拉取數據的block,readers.get將獲取這個block對應的StripedBlockReaderint toRead = getReadLength(liveIndices[successList[i]],reconstructLength);.....Callable<BlockReadStats> readCallable =reader.readFromBlock(toRead, corruptedBlocks);Future<BlockReadStats> f = readService.submit(readCallable);futures.put(f, successList[i]);.....}while (!futures.isEmpty()) { // 一直循環等待futures清空try {StripingChunkReadResult result =StripedBlockUtil.getNextCompletedStripedRead(readService, futures, stripedReadTimeoutInMills); // 每次取出一個成功的Future,即一個Replica的讀取結果,這個結果有可能是成功,有可能是超時,有可能是失敗int resultIndex = -1;if (result.state == StripingChunkReadResult.SUCCESSFUL) {resultIndex = result.index;} else if (result.state == StripingChunkReadResult.FAILED) { // 和下面的Timeout的處理策略一樣,換一個DataNode讀取(因為有可能minRequiredSources小于sources的數量,即還有剩余的DataNode可以嘗試 )StripedBlockReader failedReader = readers.get(result.index);failedReader.closeBlockReader();resultIndex = scheduleNewRead(usedFlag,reconstructLength, corruptedBlocks);} else if (result.state == StripingChunkReadResult.TIMEOUT) {resultIndex = scheduleNewRead(usedFlag,reconstructLength, corruptedBlocks); // 和上面的Failed的處理策略一樣,換一個DataNode讀取(因為有可能minRequiredSources小于sources的數量,即還有剩余的DataNode可以嘗試 )}if (resultIndex >= 0) {newSuccess[nSuccess++] = resultIndex; // 記錄這個成功的讀取,如果成功數量足夠了,則這一輪結束if (nSuccess >= minRequiredSources) { // 足夠了,剩下的不用再讀了cancelReads(futures.keySet());clearFuturesAndService();break; // 成功數量足夠了,這一輪讀取結束......}
Decode

在minRequiredSources個StripedBlockReader都完成了一輪讀取,可以進行這一輪的Decode了。這是通過方法reconstructTargets()進行的:

  private void reconstructTargets(int toReconstructLen) throws IOException {// 收集所有的StripedBlockReader剛剛讀取的數據ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen);int[] erasedIndices = stripedWriter.getRealTargetIndices();ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen);markBuffers(inputs); // 記錄當前的positiondecode(inputs, erasedIndices, outputs); // 在這里進行解碼,將解碼以后的數據寫入到outputs中,outputs其實就是StripedBlockWriter中的bufresetBuffers(inputs); // 還原position....getValidator().validate(inputs, erasedIndices, outputs);}......stripedWriter.updateRealTargetBuffers(toReconstructLen);}private void decode(ByteBuffer[] inputs, int[] erasedIndices,ByteBuffer[] outputs) throws IOException {long start = System.nanoTime();getDecoder().decode(inputs, erasedIndices, outputs);long end = System.nanoTime();this.getDatanode().getMetrics().incrECDecodingTime(end - start);}

可以看到,reconstructTargets的任務就是從所有的StripedBlockReader的Buffer中拿到剛剛讀取的數據,調用decode(),將數據寫入到StripedBlockWriter的Buffer中去。寫入完成,并且校驗成功,剩下的任務就交給StripedBlockWriter將自己的buffer數據發送給遠程的targets。

decode的具體算法我們不做討論,從其參數可以看到,inputs[]就是所有的StripedBlockReader讀取的數據,erasedIndices就是Decode以后生成的internal block的index,outputs就是decode生成的數據即StripedBlockWriter的buffer。可以看到,decode()方法可以在一輪生成多個丟失的targets。

傳輸到目標target

如上文所講,此時計算好的internal block的數據已經存放到了對應的index的StripedBlockWriter的buffer 中去,StripedBlockWriter會通過調用調用transferData2Target()將數據發送到對應的target node上去。
上文已經說個,一個StripedBlockWriter和一個待恢復的internal block以及存放這個internal block的target DataNode是一一對應的關系,并且在StripedBlockWriter初始化的時候,已經構建好了和這個遠程的DataNode的socket連接。
這個發送的過程根客戶端寫數據的過程是一模一樣的。從原始數據、到以chunk為單位計算checksum、到組裝成packet、到將數據通過socket發送給DataNode的過程都放在方法transferData2Target()中:

-------------------------------------------------StripedBlockWriter----------------------------------------------------void transferData2Target(byte[] packetBuf) throws IOException {// 現在數據已經存放在targetBuffer中(有可能是direct,有可能不是direct),如果targetBuffer是direct,那么計算// checksum的時候存放checksum的ByteBuffer也得是direct,如果不是direct(在heap中),那么checksum也存放在heap中if (targetBuffer.isDirect()) {ByteBuffer directCheckSumBuf =BUFFER_POOL.getBuffer(true, stripedWriter.getChecksumBuf().length);stripedWriter.getChecksum().calculateChunkedSums(targetBuffer, directCheckSumBuf); // 對targetBuffer的數據做校驗,寫入directCheckSumBufdirectCheckSumBuf.get(stripedWriter.getChecksumBuf()); // directCheckSumBuf寫入到stripedWriter的buf中去BUFFER_POOL.putBuffer(directCheckSumBuf); //  歸還directCheckSumBuf到BUFFER_POOL中} else { // 如果targetBuffer不是direct,那么直接基于數組進行計算stripedWriter.getChecksum().calculateChunkedSums( // 只有在非direct(即存放在heap中)的情況下array()方法才會有返回值targetBuffer.array(), 0, targetBuffer.remaining(),stripedWriter.getChecksumBuf(), 0);}int ckOff = 0;while (targetBuffer.remaining() > 0) { // 一個packet可能裝不下這么多數據,因此會分多個packet進行發送DFSPacket packet = new DFSPacket(packetBuf,stripedWriter.getMaxChunksPerPacket(),blockOffset4Target, seqNo4Target++,stripedWriter.getChecksumSize(), false);int maxBytesToPacket = stripedWriter.getMaxChunksPerPacket() // 單個packet中的chunk數量 * chunk的長度* stripedWriter.getBytesPerChecksum();int toWrite = targetBuffer.remaining() > maxBytesToPacket ?maxBytesToPacket : targetBuffer.remaining(); // 實際發送的數據長度(不包含checksum,checksum此時是單獨存放在checksumBuf中的)int ckLen = ((toWrite - 1) / stripedWriter.getBytesPerChecksum() + 1)* stripedWriter.getChecksumSize();// checksum的總長度,比如,我們需要10個checkum,每個checksum是4B,那么ckLen就是40Bytepacket.writeChecksum(stripedWriter.getChecksumBuf(), ckOff, ckLen); // 把checksum中的數據存入packet中ckOff += ckLen;//  把inBuffer中的數據寫入到Packet中去(注意并不是發送,只是寫入到Packet對應的buf中)packet.writeData(targetBuffer, toWrite); // 把存放在targetBuffer中的長度為toWrite的數據存放到packet中packet.writeTo(targetOutputStream); // 把packet中的數據發送到targetOutputStream綁定的遠程 DN中去......}
Target節點接收數據

Target接收寫入的internal block的過程和普通客戶端寫入block的過程完全相同,接收完成以后會將塊信息匯報給NameNode,然后NameNode的BlockManager會更新對應的塊信息。這個過程本文不做詳細講解。有興趣的讀者可以自行查找資料學習。

3. 結束

本文主要講解整個塊的重構過程,包括:

  • NameNode端待重構塊的生成過程(包含了各種待重構的情況),
  • NameeNode端基于生成的待重構塊進行重構工作的調度,
  • DataNode端對于重構任務的處理。由于基于副本復制的冗余策略基本上就是數據的拷貝,比較簡單,因此,本文偏向于講解基于糾刪碼的冗余策略的DataNode端的處理過程。

如果讀者對Erasure Coding的整個過程感興趣,可以參考我的另一篇文章HDFS的EC(Erasure Coding, 糾刪碼)和塊管理。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/44530.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/44530.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/44530.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

綠盟培訓入侵排查

一、webshell 排查 1、文件特征 2、windows 3、linux 4、內存馬 二、web 日志排查 1、日志排查 2、中間件報錯排查 三、服務器失陷處置

Element-UI Select組件使用value-key屬性,讓綁定值可以為一個對象

當我們使用 Elemet UI 的 Select 組件的綁定值是一個對象 :value="item" 如: <el-form-item label="選擇應用" prop="appInfo"><el-select v-model=

每日一題cf

文章目錄 Swap and Reverse題意&#xff1a;題解&#xff1a;代碼&#xff1a; Swap and Reverse 題意&#xff1a; 給定一個長度為n的正整數數組&#xff0c;給定k。可以進行任意次一下操作 選定 i i i&#xff0c;交換 a i a_{i} ai?和 a i 2 a_{i2} ai2?的值選定 i i …

Windows環境人大金倉數據庫命令常規操作

Windows環境人大金倉數據庫命令常規操作 下文將介紹人大金倉數據庫常見命令操作&#xff0c;包括具體使用命令如創建數據庫、創建用戶、授權等相關操作。 1、打開命令提示符窗口 找到數據庫安裝目錄進入server/bin目錄&#xff0c;輸入cmd,打開命令提示符窗口&#xff0c;如…

Java getSuperclass和getGenericSuperclass

1.官方API對這兩個方法的介紹 getSuperclass : 返回表示此 Class 所表示的實體&#xff08;類、接口、基本類型或 void&#xff09;的超類的 Class。如果此 Class 表示 Object 類、一個接口、一個基本類型或 void&#xff0c;則返回 null。如果此對象表示一個數組類&#xff…

探秘微信廣告設計組:一位產品體驗設計師的日常與成長

目錄 我的工位&#xff1a;靈感與回憶的匯聚地 我們的設計&#xff1a;用心定格每一個瞬間 設計的多樣性&#xff1a;從社交廣告到過年IP形象 咖啡與工作的日常&#xff1a;從抵觸到入坑 廣告設計&#xff1a;我選擇&#xff0c;我熱愛 實習生的培養&#xff1a;實踐與思…

Qt(四)事件

文章目錄 一、概念二、&#xff08;一&#xff09;&#xff08;二&#xff09;QImage類&#xff08;三&#xff09;鼠標事件和鍵盤事件1. 鼠標事件2. 鍵盤事件 &#xff08;四&#xff09;定時器事件1. 采用定時器事件2. QTimer定時器類 三、 一、概念 事件是由窗口系統或者自…

充電樁項目

1. 多對一&#xff08;多個監測設備檢測&#xff0c;數據發送給一個服務器&#xff09; 2. 原理 充電樁溫度變化引起PT100阻值變換&#xff08;測溫電流衰減微弱&#xff0c;幾乎恒定&#xff0c;電壓隨之變化)&#xff0c;經過測溫模塊轉化成電壓的變化&#xff08;內部是電流…

小程序內容管理系統設計

設計一個小程序內容管理系統&#xff08;CMS&#xff09;時&#xff0c;需要考慮以下幾個關鍵方面來確保其功能完善、用戶友好且高效&#xff1a; 1. 需求分析 目標用戶&#xff1a;明確你的目標用戶群體&#xff0c;比如企業、媒體、個人博主等&#xff0c;這將決定系統的功…

zynq啟動和程序固化流程

普通FPGA啟動 FPGA的啟動方式主要包含主動模式、被動模式和JTAG模式。 主動模式&#xff08;AS模式&#xff09; 當FPGA器件上電時&#xff0c;它作為控制器從配置器件EPCS中主動發出讀取數據信號&#xff0c;并將EPCS的數據讀入到自身中&#xff0c;實現對FPGA的編程。這種…

Mac的系統數據怎么刪除 cleanmymac會亂刪東西嗎 cleanmymac有用嗎

作為一款專業級的蘋果電腦清理軟件&#xff0c;CleanMyMac可以精準識別系統垃圾&#xff0c;有效防止Mac系統數據被誤刪。軟件可以深入系統底層&#xff0c;清理無用的系統數據&#xff0c;優化蘋果電腦設置&#xff0c;提升Mac系統性能。有關Mac的系統數據可以刪嗎&#xff0c…

javascript如何定義數組和從數組取值,獲取數組長度

javascript如何定義數組 javascript定義數組的格式是 var 數組名[數組元素]或者 let 數組名[數組元素] javascript數組和python的列表很相似&#xff0c;真要懶的話&#xff0c;不用定義數據類型&#xff0c;不像c語言和java那樣限制數據類型。 定義數組示例代碼 <bod…

Unity Addressable魔改

新增回調 在使用過程中&#xff0c;輸出之后還需要手動拷貝到服務器上會麻煩&#xff0c;一旦未拷貝編輯器還會因為加載&#xff08;同步加載&#xff09;的問題卡死。所以可以到Unity的PacakgeCache中修改本地倉庫中的Addressable對應版本的包。找不到位置可以用everything搜…

計算機視覺之ResNet50圖像分類

前言 圖像分類是計算機視覺應用中最基礎的一種&#xff0c;屬于有監督學習類別。它的任務是給定一張圖像&#xff0c;判斷圖像所屬的類別&#xff0c;比如貓、狗、飛機、汽車等等。本章將介紹使用ResNet50網絡對CIFAR-10數據集進行分類。 ResNet網絡介紹 ResNet50網絡是由微…

Nature Protocols:整合多組學并進行因果推理的系統框架

轉載自&#xff1a;MetaAI 在生物學研究中&#xff0c;隨著實驗和計算技術的進步&#xff0c;生物系統研究產生了大量高通量數據。技術努力主要集中在提高吞吐量、降低成本和提升實驗與計算效率。因此&#xff0c;整合不同類型組學數據&#xff0c;并通過關聯分析識別關鍵因素…

linux 內核 紅黑樹接口說明

紅黑樹(rbtree)在linux內核中使用非常廣泛,cfs調度任務管理&#xff0c;vma管理等。本文不會涉及關于紅黑樹插入和刪除時的各種case的詳細描述,感興趣的讀者可以查閱其他資料。本文主要聚焦于linux內核中經典rbtree和augment-rbtree操作接口的說明。 1、基本概念 二叉樹:每個…

基于主成分分析PCA的一維時間序列信號降噪方法(Python)

主成分分析PCA是面向模式分類的特征提取最典型的工具&#xff0c;是滿足上述映射準則的一種數據壓縮的方法。作為經典的特征提取方法&#xff0c;是在不減少原始數據所包含的內在信息前提下&#xff0c;將原始數據集轉化為由維數較少的“有效”特征成分來表示&#xff0c;使其在…

GD32F303之CAN通信

1、CAN時鐘 GD32F303主時鐘頻率最大是120Mhz,然后APB1時鐘最大是60Mhz,APB2時鐘最大是120Mhz,CAN掛載在APB1總線上面 所以一般CAN的時鐘頻率是60Mhz,這個頻率和后面配置波特率有關 2、GD32F303時鐘配置 首先我們知道芯片有幾個時鐘 HXTAL&#xff1a;高速外部時鐘&#xff1…

用理解與包容對待阿斯伯格綜合征患者

在我們的社會中&#xff0c;存在著這樣一個特殊的群體——阿斯伯格綜合征患者。他們在社交互動、溝通交流和行為模式上有著獨特的表現&#xff0c;需要我們以正確的方式去理解和對待。 我們要認識到阿斯伯格綜合征是一種神經發育障礙&#xff0c;而非個人的選擇或過錯。患者可能…

AI Earth——中國城市綠地對大氣污染干沉降作用估計數據集(DDEP)應用APP

基于數學模型量化植被的干沉降過程,突破傳統站點尺度研究的局限性,結合多源衛星遙感產品形成2000-2020年中國城市綠地對PM2.5和PM10的干沉降量估計柵格數據集,對城市大氣污染防治、綠地區域規劃和城市可持續發展有重要意義. 應用結果 代碼 #導入安裝包 import os import …