HDFS Full Block Report超限導致性能下降的原因分析

文章目錄

  • 前言
  • 發現問題
  • 失敗的為什么是FBR
  • 塊匯報頻率的變化
  • 為什么FBR會反復失敗
  • HDFS性能下降導致Yarn負載變高的形式化分析
    • 理解線程
    • 理解IO Wait
    • 理解HDFS性能下降導致Yarn負載和使用率增高
  • 引用

前言

我們的Yarn Cluster主要用來運行一批由Airflow定時調度的Spark Job,這些Spark Job。。這些Job根據其處理的時間粒度的不同(Batch Duration)和業務邏輯(Business Type)的不同而被打上不同的Prometheus標簽,然后我們構建了對應的Grafana Dashboard來監控各個Job處理的數據量以及運行時間的變化。
從某個時刻開始,我們的Yarn集群開始出現擁塞,但是業務層面其實沒有發生太大變化。在troubleshooting的過程中,我們逐漸找到了問題的元兇HDFS, 并在root cause層面講問題解決。

本文詳細介紹了我們從發現問題、找到懷疑對象、并最終解決問題以及事后對問題進行分析的全過程。

發現問題

大概從28/Apr下午開始,我們發現所有的批處理任務的運行時間逐漸變長,有些批處理任務的運行時間變成了之前的5到10倍,因此,我們開始分析原因。由于與此同時,我們觀察到對應的批處理任務的數據輸入量整體也有所增加(這是業務變化引起的,因此是預期之內的),如下圖所示,大概增加了50%左右:
在這里插入圖片描述

我們查看整個Yarn集群的狀態,我們發現集群的負載在對應的時間范圍內也逐漸升高,正在運行的Container數量逐漸增加直到Yarn集群持續滿負荷運行:
在這里插入圖片描述

同時,整個集群的可用Memory和CPU的可用資源和Pending Container如下所示:
在這里插入圖片描述
在這里插入圖片描述

從上圖可以看到,在數據量增加以前,我們Yarn集群的內存和CPU資源使用率以CPU為主,平均使用率也是35%左右,所以,如果系統運行正常,即使所有Spark Job的輸入數據量增加50%,Yarn集群的內存和CPU資源也是完全夠用的。
由于找不到特別明顯的原因,我們一度懷疑是由于Yarn集群的不合理設置導致在集群的資源沒有用完的情況下Yarn的資源分配已經出現了瓶頸。
但是,兩個現象讓我們的這種懷疑變得不特別Solid:

  1. 數據量增加的比例和我們的Spark Job運行時間的比例不成正比
  2. Spark Job運行時間的Peak時間點和輸入數據量的Peak時間點不吻合,即,當數據量開始逐漸下降的時候,Spark Job的運行時間依然在接下來的1個小時逐漸攀升。
  3. 當我們最初認為真的只是Yarn的資源不足的時候,作為一種讓線上系統快速恢復的手段,我們首先嘗試停止提交一些優先級和重要性不高的Job。但是,我們發現,問題完全無法解決。

經過更加深入的問題分析,我們最后發現,整個Yarn的資源使用率的降低是由于HDFS的性能降低引起的,因為無論是Yarn本身,還是Yarn上運行的Spark Job(從HDFS上讀取數據然后寫入到ClickHouse)都需要依賴HDFS,并且隨后發現,HDFS的性能急劇下降是由于DataNode的塊匯報失敗造成的鎖爭用。最終,我們在root cause層面解決問題。

在DataNode端,我們打印了對應堆棧,發現堆棧信息如下所示。
首先,我們看到有很多線程如下所示:

"DataXceiver for client DFSClient_NONMAPREDUCE_-1403695307_28 at /0.21.1.113:36094 [Receiving block BP-862625392-0.21.7.109-1687238140050:blk_3132608831_2058910583]" #460048
344 daemon prio=5 os_prio=0 tid=0x00007fbe86b2a800 nid=0x569 waiting for monitor entry [0x00007fbeddba6000]java.lang.Thread.State: BLOCKED (on object monitor)at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.createRbw(FsDatasetImpl.java:1377)- waiting to lock <0x0000000438637dd0> (a org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl)at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.<init>(BlockReceiver.java:199)at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:675)at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:169)at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:106)at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:246)at java.lang.Thread.run(Thread.java:748)

可以看到,這是多個DataXceiver的writeBlock()方法嘗試寫一個塊的時候,需要構造一個BlockReceiver對象,但是構造BlockReceiver對象的時候,需要調用一個Synchronized 的 實例方法FsDatasetImpl.createRbw(), 該方法需要獲取FsDatasetImpl對象的監視器鎖,因此持續在一個FsDatasetImpl對象對應的監視器鎖0x0000000438637dd0上等待:

BlockReceiver(final ExtendedBlock block, final StorageType storageType,final boolean pinning) throws IOException {try{this.block = block;.....// Open local disk out//if (isDatanode) { //replication or movereplicaHandler = datanode.data.createTemporary(storageType, block);} else {switch (stage) {case PIPELINE_SETUP_CREATE:replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist); // 這是一個Synchronized方法
  @Override // FsDatasetSpipublic synchronized ReplicaHandler createRbw( // 這是一個實例的同步方法StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)throws IOException {ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),b.getBlockId());if (replicaInfo != null) {

同時,我們也看到有很多線程如下所示:

"DataXceiver for client DFSClient_NONMAPREDUCE_-940941604_93 at /1.12.1.240:40478 [Sending block BP-862625392-0.21.7.109-1687238140050:blk_3132534170_2058835614]" #460048343daemon prio=5 os_prio=0 tid=0x00007fbda42c1800 nid=0x567 waiting for monitor entry [0x00007fbeddca7000]java.lang.Thread.State: BLOCKED (on object monitor)at org.apache.hadoop.hdfs.server.datanode.BlockSender.<init>(BlockSender.java:240)- waiting to lock <0x0000000438637dd0> (a org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl)at org.apache.hadoop.hdfs.server.datanode.DataXceiver.readBlock(DataXceiver.java:537)at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opReadBlock(Receiver.java:148)at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:103)at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:246)at java.lang.Thread.run(Thread.java:748)
BlockSender(ExtendedBlock block, long startOffset, long length,boolean corruptChecksumOk, boolean verifyChecksum,boolean sendChecksum, DataNode datanode, String clientTraceFmt,CachingStrategy cachingStrategy)throws IOException {try {this.block = block;....synchronized(datanode.data) {  // 嘗試獲取FsDatasetImpl的對象監視器鎖replica = getReplica(block, datanode);replicaVisibleLength = replica.getVisibleLength();if (replica instanceof FinalizedReplica) {// Load last checksum in case the replica is being written// concurrentlyfinal FinalizedReplica frep = (FinalizedReplica) replica;chunkChecksum = frep.getLastChecksumAndDataLen();}}

可以看到,讀請求也在同一把鎖上Block住了:DataXceiver在處理對一個Block的讀取請求的時候,需要構造一個BlockSender對象,但是構造BlockSender對象的時候,有一段synchronized代碼塊,該同步代碼塊需要的鎖也是DatasetImpl對象的監視器鎖,因此持續在一個FsDatasetImpl對象對應的監視器鎖0x0000000438637dd0上等待,如上述堆棧所示。

因此,我們需要看一下哪個線程已經獲取了FSDatasetImpl對象的監視器鎖呢?我們搜索 0x0000000438637dd0這個鎖id,看到以下線程已經獲取了對應的監視器鎖:

"DataNode: [[[DISK]file:/corp/data/nvme1/dfs/dn/, [DISK]file:/corp/data/nvme2/dfs/dn/, [DISK]file:/corp/data/nvme3/dfs/dn/, [DISK]file:/corp/data/nvme4/dfs/dn/]]
heartbeating to rccp407-24a.iad6.prod.corp.com/0.21.7.109:8022" #176 daemon prio=5 os_prio=0 tid=0x00007fbf262eb000 nid=0x1ffa runnable [0x00007fbeee5a2000]java.lang.Thread.State: RUNNABLEat com.google.protobuf.CodedOutputStream.writeRawVarint64(CodedOutputStream.java:1039)at org.apache.hadoop.hdfs.protocol.BlockListAsLongs$Builder.add(BlockListAsLongs.java:242)at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.getBlockReports(FsDatasetImpl.java:1768)- locked <0x0000000438637dd0> (a org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl)at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.blockReport(BPServiceActor.java:287)at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.offerService(BPServiceActor.java:561)at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:695)at java.lang.Thread.run(Thread.java:748)

很明顯,是方法BPServiceActor.blockReport()進行塊匯報,已經獲得了FsDatasetImpl對象對應的監視器鎖0x0000000438637dd0。后來自己看代碼才知道,blockReport()是用來完成Full Block Report的方法。
但是,全量塊匯報每6個小時執行一次,并且應該在很短的時間內完成才對,那么是不是有可能我們打印堆棧的時機比對,即,我們打印堆棧的時候剛好就遇到了這個全量塊匯報呢?于是,過了15分鐘我們再次打印堆棧,發現堆棧狀態依然如上所示。這說明:全量塊匯報狀態不正常

隨后,我們開始查看HDFS的相關日志。

DataNode端BlockReport失敗的有關日志如下所示:

2025-03-28 22:38:09,622 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Unsuccessfully sent block report 0x4bc7fea3d6c94cf6,  containing 4 storage report(s), of which we sent 0. The reports had 20093201 total blocks and used 0 RPC(s). This took 1212 msec to generate and 43 msecs for RPC and NN processing. Got back no commands.
2025-03-28 22:38:09,622 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: IOException in offerService
java.io.EOFException: End of File Exception between local host is: "rccp401-25a.iad6.prod.corp.com/0.21.1.113"; destination host is: "rccp408-24a.iad6.prod.corp.com":8022; : java.io.EOFException; For more details see:  http://wiki.apache.org/hadoop/EOFExceptionat sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source)at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)at java.lang.reflect.Constructor.newInstance(Constructor.java:423)at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:791)at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764)at org.apache.hadoop.ipc.Client.call(Client.java:1508)at org.apache.hadoop.ipc.Client.call(Client.java:1441)at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)at com.sun.proxy.$Proxy20.blockReport(Unknown Source)at org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB.blockReport(DatanodeProtocolClientSideTranslatorPB.java:204)at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.blockReport(BPServiceActor.java:323)at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.offerService(BPServiceActor.java:561)at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:695)at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFExceptionat java.io.DataInputStream.readInt(DataInputStream.java:392)at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1113)at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1006)

從上面的堆棧可以看到,DataNode發送了FBR的RPC請求給NameNode,但是在試圖獲取響應的時候發現連接斷開了。

然后,我們開始檢查NameNode的異常日志,并找出了如下對應異常:

2025-03-28 21:36:31,432 WARN org.apache.hadoop.ipc.Server: Requested data length 71428864 is longer than maximum configured RPC length 67108864.  RPC came from 0.21.3.113
2025-03-28 21:36:31,432 INFO org.apache.hadoop.ipc.Server: Socket Reader #1 for port 8022: readAndProcess from client 0.21.3.113 threw exception [java.io.IOException: Requested data length 71428864 is longer than maximum configured RPC length 67108864.  RPC came from 0.21.3.113]
java.io.IOException: Requested data length 71428864 is longer than maximum configured RPC length 67108864.  RPC came from 0.21.3.113at org.apache.hadoop.ipc.Server$Connection.checkDataLength(Server.java:1601)at org.apache.hadoop.ipc.Server$Connection.readAndProcess(Server.java:1663)at org.apache.hadoop.ipc.Server$Listener.doRead(Server.java:887)at org.apache.hadoop.ipc.Server$Listener$Reader.doRunLoop(Server.java:751)at org.apache.hadoop.ipc.Server$Listener$Reader.run(Server.java:722)

從這個異常可以看到,這個異常發生在IPC連接的處理層,即,這個異常根本還未到達上層的業務層,即業務層方法NameNodeRpcServer.blockReport()方法根本就沒有調用。

失敗的為什么是FBR

事后我們看到(下文會詳細介紹),DataNode和NameNode之間的間歇性通信包括了三種類型:

  1. 心跳(Heartbeat):這是通過方法BPServiceActor.heartbeat()來完成的,我們從下文的sendHeartbeat()方法的實現可以很容易地看到,心跳信息主要用來告知NameNode自己的Storage元信息以及各種狀態信息(xceiver count, cache等)。心跳不攜帶任何與塊和塊的數量等信息:

      HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)throws IOException {scheduler.scheduleNextHeartbeat();StorageReport[] reports =dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());VolumeFailureSummary volumeFailureSummary = dn.getFSDataset().getVolumeFailureSummary();int numFailedVolumes = volumeFailureSummary != null ?volumeFailureSummary.getFailedStorageLocations().length : 0;return bpNamenode.sendHeartbeat(bpRegistration,reports, // 這個DataNode的每一個 Storage上的Storage元信息dn.getFSDataset().getCacheCapacity(),dn.getFSDataset().getCacheUsed(),dn.getXmitsInProgress(),dn.getXceiverCount(), // xceiver信息,NameNode會依據該信息判斷DataNode的繁忙程度,進而影響NameNode的塊調度策略numFailedVolumes,volumeFailureSummary,requestBlockReportLease);}

    每一個Storage的原信息封裝在StorageReport對象中,如下所示,這里不贅述。

    public class StorageReport {private final DatanodeStorage storage;private final boolean failed;private final long capacity;private final long dfsUsed;private final long nonDfsUsed;private final long remaining;private final long blockPoolUsed;
    

    NameNode收到heartbeat以后,會更新自己所維護的DataNode的各種信息,這是通過DataNodeDescriptor的成員方法updateHeartbeatState()完成的:

        /*** process datanode heartbeat or stats initialization.*/public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,long cacheUsed, int xceiverCount, int volFailures,VolumeFailureSummary volumeFailureSummary) {.....setCacheCapacity(cacheCapacity);setCacheUsed(cacheUsed);setXceiverCount(xceiverCount);setLastUpdate(Time.now());    this.volumeFailures = volFailures;this.volumeFailureSummary = volumeFailureSummary;for (StorageReport report : reports) {DatanodeStorageInfo storage = updateStorage(report.getStorage());if (checkFailedStorages) {failedStorageInfos.remove(storage);}storage.receivedHeartbeat(report);totalCapacity += report.getCapacity();totalRemaining += report.getRemaining();totalBlockPoolUsed += report.getBlockPoolUsed();totalDfsUsed += report.getDfsUsed();totalNonDfsUsed += report.getNonDfsUsed();}rollBlocksScheduled(getLastUpdate());// Update total metrics for the node.setCapacity(totalCapacity);setRemaining(totalRemaining);setBlockPoolUsed(totalBlockPoolUsed);setDfsUsed(totalDfsUsed);setNonDfsUsed(totalNonDfsUsed);.....}```同時,DataNodeNameNode領取的各種Command也不是NameNode直接發送給DataNode的,而是放在DataNode的heartbeat的response中的。所以,DataNode是需要處理heatbeat的響應的。
  2. 增量塊匯報(Incremental Block Report):這是通過方法IncrementalBlockReportManager.sendIBRs()方法完成的,主要用來匯報在過去的短暫時間內最新增加的和刪除的Block的信息。增量的塊信息是通過方法generateIBRs()收集的:

      private synchronized StorageReceivedDeletedBlocks[] generateIBRs() {final List<StorageReceivedDeletedBlocks> reports= new ArrayList<>(pendingIBRs.size());for (Map.Entry<DatanodeStorage, PerStorageIBR> entry: pendingIBRs.entrySet()) {final PerStorageIBR perStorage = entry.getValue();// Send newly-received and deleted blockids to namenodefinal ReceivedDeletedBlockInfo[] rdbi = perStorage.removeAll();if (rdbi != null) {reports.add(new StorageReceivedDeletedBlocks(entry.getKey(), rdbi));}}readyToSend = false;return reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]);}

    與heartbeat不同,增量塊匯報并無返回值,即這是單向的RPC調用

  3. 全量塊匯報(Full Block Report):這是通過BPServiceActor.blockReport()來進行的,用來對DataNode端的全部塊信息進行匯報,其中,在進行全量塊匯報的時候,也會在內部先觸發一次增量塊匯報。這里不贅述,感興趣的讀者可以自行閱讀代碼。

我們從上面DataNode端的異常堆棧可以看到,異常的發生是在方法BPServiceActor.blockReport()中,即發生在Full Block Report的過程中。這個是合理的:

  1. RPC Size隨著塊的數量的增加而增加,只有可能是Full Block Report。因為Incremental Block Report只會涉及到過去很短時間內的變化的塊信息,根本不會帶來超過64MB的RPC Size。
  2. 而且,根據HDFS的設計,無論是Heartbeat還是增量塊匯報,一旦發生問題,系統將會發生寫錯誤。只有Full Block Report發生問題的時候不會帶來直接的系統功能異常,而僅僅會造成我們當前遇到的寫性能的降低。比如:
    • 如果心跳長期丟失,主要會導致NameNode將這個DataNode判別為Dead DataNode并開始對應塊的復制操作以保證副本數量。對heartbeat的檢查,發生在一個專門的異步線程HeartbeatManager.Monitor類中,這個HeartbeatManager.Monitor類是一個Runnable
    • 由于heartbeat不夠及時,NameNode無法獲取實時的、準確的DataNode的Storage信息和狀態(xceiver, cache)信息
    • 同時,心跳本身攜帶了DataNode發送給NameNode的一些Command,這些Command定義在DatanodeCommand類的實現類中,比如,BlockRecoveryCommand, RegisterCommand等等;
    • DataNode無法收到NameNode通過heartbeat的response發送給DataNode的各種Command;
    • 如果IBR長期丟失,那么寫一個Block的操作是無法完成的,因為寫到DataNode的塊是需要由DataNode向NameNode進行匯報才會最終被finalize的。這里不再贅述。

塊匯報頻率的變化

由于我們的dfs.datanode.block.report.intervalMsec設置為21600000,即每6個小時進行一次FBR(每天4次FBR)調度,所有,在事故沒有發生的時候,我們觀察到DataNode的FBR頻率正常,每臺DataNode每天會進行8次針對某臺NameNode的FBR(想象為啥是8次不是4次?因為FBR會同時發送給Active和StandBy NameNode),同時,對應的IBR的頻率大概為200K/10min,而對應的heartbeat的頻率大概為400次/10min,如下圖所示:

在這里插入圖片描述
但是,在事故發生的過程中,FBR的頻率發生了陡增,幾乎到了平均20K/天的頻率,而IBR和heartbeat的頻率則對應下降。如下圖所示:
在這里插入圖片描述

我們還需要準確理解這個Dashboard的含義,即這里的塊匯報次數,是否包含失敗的塊匯報次數? 即,這個急劇增加的FBR次數,是否都是成功的FBR呢?
我們查看blockReport()代碼可以看到,無論FBR成功和失敗,都是增加該Counter值:

List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {final ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();......try {if (totalBlockCount < dnConf.blockReportSplitThreshold) {.......success = true;} finally {.......dn.getMetrics().addBlockReport(brSendCost);// 增加fbr統計值  }

為什么FBR會反復失敗

綜上,我們從Metrics上看到FBR的頻率陡增,應該是失敗以后的重試。我們需要在代碼層面看一下FBR的重試邏輯。

在DataNode端,進行FBR、IBR和heartbeat發生在方法BPServiceActor::offerService()中,每一個BPServiceActor對象會綁定到一個NameNode節點上,因此,在我們Active/Standby架構下,會有兩個BPServiceActor對象。每一個BPServiceActor對象都是一個Runnable,即一個獨立線程,用來和遠程的NameNode進行通信:


/*** A thread per active or standby namenode to perform:* <ul>* <li> Pre-registration handshake with namenode</li>* <li> Registration with namenode</li>* <li> Send periodic heartbeats to the namenode</li>* <li> Handle commands received from the namenode</li>* </ul>*/
@InterfaceAudience.Private
class BPServiceActor implements Runnable {

通信的內容包括向NameNode的注冊(DataNode啟動的時候),重新注冊(NameNode在發現DataNode失去聯系的時候會向DataNode發送DatanodeProtocol.DNA_REGISTER命令以要求重新注冊),發送心跳,IBR和FBR,以及處理來自NameNode的命令。

/*** No matter what kind of exception we get, keep retrying to offerService().* That's the loop that connects to the NameNode and provides basic DataNode* functionality.** Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can* happen either at shutdown or due to refreshNamenodes.*/@Overridepublic void run() {LOG.info(this + " starting to offer service");try {while (true) {// init stufftry {// setup storageconnectToNNAndHandshake(); // 和NN通信break;} catch (IOException ioe) {......}}runningState = RunningState.RUNNING;while (shouldRun()) { // 只要系統沒有停止服務,就不斷循環try {offerService();} catch (Exception ex) {.... // 即使offerService()拋出異常,sleep一會兒也會繼續運行sleepAndLogInterrupts(5000, "offering service");}}runningState = RunningState.EXITED;} catch (Throwable ex) {.....} }

可以看到,BPServiceActor的run()方法就是在首次啟動并連接到NameNode以后,開始保證offerService()發生異常以后繼續運行。而offerService()方法內部會不斷循環以向NameNode發送心跳,或者進行IBR,或者進行FBR:

  /*** Main loop for each BP thread. Run until shutdown,* forever calling remote NameNode functions.*/private void offerService() throws Exception {.....while (shouldRun()) {try {final long startTime = scheduler.monotonicNow();//// Every so often, send heartbeat or block-report//final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);HeartbeatResponse resp = null;if (sendHeartbeat) {boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&scheduler.isBlockReportDue(startTime);if (!dn.areHeartbeatsDisabledForTests()) {resp = sendHeartBeat(requestBlockReportLease);assert resp != null;if (resp.getFullBlockReportLeaseId() != 0) {.....fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();}.....}}if (ibrManager.sendImmediately() || sendHeartbeat) {ibrManager.sendIBRs(bpNamenode, bpRegistration,bpos.getBlockPoolId(), dn.getMetrics());}List<DatanodeCommand> cmds = null;boolean forceFullBr =scheduler.forceFullBlockReport.getAndSet(false);......if ((fullBlockReportLeaseId != 0) || forceFullBr) {cmds = blockReport(fullBlockReportLeaseId); // 在這里拋出異常fullBlockReportLeaseId = 0;}.....} catch(RemoteException re) {....LOG.warn("RemoteException in offerService", re);sleepAfterException();} catch (IOException e) {LOG.warn("IOException in offerService", e);sleepAfterException();}processQueueMessages();} // while (shouldRun())} // offerService
  • 循環持續運行,只要 DataNode 應該繼續服務(即還沒有關閉)

    while (shouldRun()) {try {final long startTime = scheduler.monotonicNow();
  • 如果當前時間已到心跳時間,則向 NameNode 發送心跳信息:

        final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);HeartbeatResponse resp = null;if (sendHeartbeat) {boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&scheduler.isBlockReportDue(startTime);if (!dn.areHeartbeatsDisabledForTests()) {resp = sendHeartBeat(requestBlockReportLease);...}}
    

    每個心跳會攜帶節點狀態(名稱、端口、容量等)。如果塊報告也到期,心跳中還會請求塊報告的租約(lease)。
    返回的 HeartbeatResponse 包含后續指令。

  • 處理 NameNode 返回的狀態,包括:更新租約 ID,檢查是否為 Active NameNode,執行來自 NameNode 的命令,更新高可用狀態(HA state):

        if (resp.getFullBlockReportLeaseId() != 0) {...fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();}dn.getMetrics().addHeartbeat(...);bpos.updateActorStatesFromHeartbeat(this, resp.getNameNodeHaState());state = resp.getNameNodeHaState().getState();if (state == HAServiceState.ACTIVE) {handleRollingUpgradeStatus(resp);}if (!processCommand(resp.getCommands()))continue;
  • 按需要進行增量塊匯報,即如果到達了增量塊匯報的時間,或者盡管沒到,但是剛剛進行了heartbeat的發送,那么就進行一次增量塊匯報:

    if (ibrManager.sendImmediately() || sendHeartbeat) {ibrManager.sendIBRs(bpNamenode, bpRegistration,bpos.getBlockPoolId(), dn.getMetrics());
    }
  • 如果收到了NameNode端批準的全量塊匯報租約,或者,收到了強制進行塊匯報的請求,那么就進行全量塊匯報:

        boolean forceFullBr = scheduler.forceFullBlockReport.getAndSet(false);if ((fullBlockReportLeaseId != 0) || forceFullBr) {cmds = blockReport(fullBlockReportLeaseId);fullBlockReportLeaseId = 0; // 將fullBlockReportLeaseId取消置位,等待下一次的FBR時機}processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
    
  • 其它的處理不再贅述。

從上述代碼我們看到,在正常情況下:

  1. 什么時候會嘗試去請求一個blockReport的租約:
  • heartbeat只會在blockReport的時間到了(6h),并且目前還沒有獲取租約,才會嘗試向NameNode去請求一個blockReport的租約:

        // 只有當blockReport的時機到了(距離上一次blockReport過去6h,并且當前還沒有獲得租約)boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&scheduler.isBlockReportDue(startTime);boolean isBlockReportDue(long curTime) {return nextBlockReportTime - curTime <= 0;}
    

    由于是的關系,因此,正常情況下,最短6小時才會嘗試進行一次FBR租約的請求。

  • 如果response中獲取到了租約,則將租約存下來:

    if (resp.getFullBlockReportLeaseId() != 0) {fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();}
    
  • 一般情況下(不考慮用戶強制FBR的情況),如果有了租約,則立刻進行FBR,FBR完成以后重新將租約清空避免反復FBR:

            if ((fullBlockReportLeaseId != 0) || forceFullBr) {cmds = blockReport(fullBlockReportLeaseId);fullBlockReportLeaseId = 0;}
    

我們乍一看上述代碼,看到的FBR重試邏輯,如果FBR失敗了,后面會反復高頻重試嗎?
我們看一下blockReport()方法中關于下次調度時機的設置:

/*** Report the list blocks to the Namenode* @return DatanodeCommands returned by the NN. May be null.* @throws IOException*/List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {.....try {... // 發送blockReport請求success = true;} finally {....scheduler.scheduleNextBlockReport(); // 設置下一次進行fbr的時間return cmds.size() == 0 ? null : cmds;}void scheduleNextBlockReport() {// If we have sent the first set of block reports, then wait a random// time before we start the periodic block reports.if (resetBlockReportTime) {nextBlockReportTime = monotonicNow() +DFSUtil.getRandom().nextInt((int)(blockReportIntervalMs));resetBlockReportTime = false;} else {nextBlockReportTime +=(((monotonicNow() - nextBlockReportTime + blockReportIntervalMs) /blockReportIntervalMs)) * blockReportIntervalMs;}}

可以看到,即使blockRepor太失敗了,scheduler.scheduleNextBlockReport()也會正常被調用,并且,我們進一步查看scheduleNextBlockReport()方法細節,下一次fbr的時間也與這次的fbr成功與否無關。所以,再結合offerService()方法中的下面代碼:

          boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&scheduler.isBlockReportDue(startTime);  

的條件判斷,不應該發生FBR反復失敗并反復調用的情況,預期的邏輯是,如果這次FBR失敗了,下次重試也是在6個小時以后了

我想了很久,才看到了其中的奧秘:

        if ((fullBlockReportLeaseId != 0) || forceFullBr) {cmds = blockReport(fullBlockReportLeaseId);fullBlockReportLeaseId = 0; // 如果blockReport拋出異常,這行代碼不會執行}

可以看到,如果blockReport(fullBlockReportLeaseId)拋出異常,那么fullBlockReportLeaseId不會被清空,而如果fullBlockReportLeaseId沒有被清空,blockReport(fullBlockReportLeaseId)就會被反復執行,根本不再依賴調度時機,也不再依賴向NameNode發送新的租約請求了。

HDFS性能下降導致Yarn負載變高的形式化分析

由此我們可以看到,Yarn的資源使用率變高(負載變高),居然有可能是存儲系統的性能下降導致的。

我們可以類比操作系統層面我們經常遇到的問題,即我們在操作系統性能診斷的時候經常遇到的,IO Wait的大量增加,居然會導致CPU Load Average增加的例子,與此同時CPU Usage卻很低。

理解線程

我們先看一下常見的操作系統層面的線程狀態:

狀態名稱含義常見場景說明
RRunning正在運行或在就緒隊列中等待 CPUCPU 正在執行線程,或線程在搶占 CPU
SSleeping可中斷的休眠狀態等待信號、網絡事件、定時器,正常休眠狀態
DUninterruptible Sleep不可中斷的休眠狀態(主要是 I/O 等待等磁盤、NFS、網絡存儲、驅動響應;kill -9 無效
ZZombie僵尸進程子進程退出但父進程未回收,常見于編程錯誤
TStopped被暫停手動 kill -STOP,或 Ctrl+Z 暫停終端程序
IIdle (內核專用)內核線程空閑狀態Linux kernel 的空閑線程,普通用戶進程不會出現該狀態
XDead死亡狀態(極罕見)已徹底結束的線程,瞬間即逝

對于操作系統層面的線程,我們需要關鍵區分的問題是:

  1. R狀態的線程:不一定正在運行,也有可能處于就緒隊列中等待運行;
  2. S狀態的線程:當我們調用sleep方法的時候,當前線程進入休眠,這時候會主動讓出CPU資源,直到被喚醒并重新執行
  3. D狀態的線程: 當我們執行IO操作的時候,如果是等待磁盤讀寫完成,則進入到Uninterruptible Sleep狀態,這時候是無法通過kill -9進行中斷的。這時候線程也會讓出CPU資源。
    • CPU 是靠操作系統的調度器來管理的。當某個線程進入 I/O wait(D 狀態),OS 就會把它從“可運行隊列”中移除。調度器會立刻選擇另一個 R 狀態(就緒狀態)的線程(如果有的話),分配給 CPU 執行。

對于Java線程,其線程狀態(Thread.State)和操作系統層面的狀態(比如 Linux 的 R, S, D, 等)不是一一對應。Java線程狀態如下:

Java 狀態含義
NEW線程創建了但還沒啟動
RUNNABLE正在運行或等待操作系統分配 CPU(對應 Linux 的 R)
BLOCKED等待獲取某個鎖(synchronized
WAITING進入無限等待(wait()join()LockSupport.park()
TIMED_WAITING等待有限時間(如 sleep()wait(timeout)
TERMINATED執行完畢,線程結束

最關鍵的,Java 中的 BLOCKED 是線程在嘗試進入一個 synchronized 塊或方法,但別的線程已經拿了鎖,所以它阻塞在鎖入口處,還沒進去。 在這個階段,Java 線程不是主動休眠,而是卡在等待互斥鎖的獲取。

通常會被操作系統標記為 S(Sleeping)狀態 或者 R(Running)狀態下阻塞住的),取決于實現和調度時機**。

Java 狀態含義
BLOCKED等待進入 synchronized 代碼塊,別的線程持有鎖,線程的該狀態中的操作系統線程狀態依賴于操作系統實現,有可能是S,有可能是R
WAITING調用了 wait()join()LockSupport.park(),主動休眠,對應操作系統層面應該是S
TIMED_WAITINGWAITING 類似,但帶有超時時間(如 sleep()wait(timeout)),對應操作系統層面也應該是S

理解IO Wait

操作系統層面,一個線程一旦 I/O 完成,中斷觸發,操作系統會把原線程從D狀態恢復為R狀態,等待下一輪調度。

  1. CPU Load Average 的準確定義

    • Load Average 表示單位時間內系統處于 可運行狀態(Runnable, R狀態) 或 不可中斷睡眠狀態(Uninterruptible Sleep,即 D 狀態) 的進程數的平均值。

    • 例如,1分鐘負載 2.0 表示過去1分鐘內平均有2個進程占用或等待CPU資源。

    • 關鍵點:Load Average 不僅包括正在使用CPU的進程,還包括就緒隊列中的進程(等待CPU調度),或者因為IO阻塞處于 D 狀態的進程(如等待磁盤響應的進程)。

  2. 為什么高 IO Wait 會導致CPU Load Average 升高?

    • 當磁盤性能下降時,進程發起IO請求后會被阻塞,進入 D 狀態(Uninterruptible Sleep)。 D 狀態的進程無法被中斷,直到IO完成。 這些D狀態的進程會被計入Load Average(因為它們是“等待系統資源的活動進程”)。
    • 如果大量進程因IO阻塞,Load Average 會顯著上升,但是其實它們并未真正消耗CPU。
  3. 為什么IO Wait高的時候,CPU 使用率(Usage)不一定高?

    • 因為CPU 使用率統計的是CPU 實際執行代碼的時間比例。當進程因IO阻塞時,它們處于D狀態,不占用CPU(CPU可能空閑或執行其他少數任務)。甚至,如果所有進程都在等IO,CPU可能因無任務可做而使用率為0。
    • 因此,高IO Wait時:
      • Load Average 高:大量進程阻塞在D狀態(被計入負載)
      • CPU Usage 低:CPU本身沒有太多任務需要處理
  4. 類比解釋
    想象一個銀行柜臺(CPU)和排隊的人(進程):

    • 正常情況:人們快速辦理業務(IO快),隊列很短(Load低);
    • 磁盤慢時:柜臺處理速度不變,但每個人辦理業務時被卡在簽字環節(IO阻塞),隊伍越來越長(Load高),但柜臺可能空閑(CPU Usage低);

所以

  • CPU Load Average 反映的是系統資源的整體壓力(CPU+IO等),而不僅是CPU使用率。D 狀態的進程是負載升高的“元兇”:它們被計入負載但影響CPU使用率。
  • 高 Load + 低 CPU Usage 是典型的IO瓶頸特征(如磁盤慢、網絡存儲延遲等)。

理解這一點后,我們就知道為什么優化磁盤(如換SSD、調整文件系統)或減少IO密集型任務可以降低負載,即使CPU本身并不忙。

理解HDFS性能下降導致Yarn負載和使用率增高

這時候我們往往有一個問題,就是在操作系統層面如果IO Wait增加,那么伴隨著CPU Load Average的增加和CPU Usage的降低,但是,在HDFS性能下降的時候,Yarn的Waiting Application的確增加了,但是為什么Yarn的Memory和CPU的Usage也增加了呢?
這個也很好理解,因為Yarn上Memory和CPU的使用率是假的使用率。因為Yarn對Application的Memory Usage和CPU Usage的統計是在申請時確定的,而并不真正關心Application實際上占用多少內存或者占用了多少CPU。比如,一個Spark Batch Job申請了100GB 內存和 40個vCore,這個Spark job運行期間即使在進行sleep(Integer.MAX_VALUE), Yarn這邊也會認為這個Spark Batch Application占用了100GB 內存和40 vCore的CPU.

引用

HDFS的塊匯報和塊放置策略–從一次HDFS寫文件故障開始

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/76917.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/76917.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/76917.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Kubernetes基礎--Pod深入理解】--查閱筆記2

深入理解Pod 為什么要有個Pod1. 容器協作與資源共享2. 簡化調度和資源管理3. 設計模式支持 Pod 基本用法Pod 容器共享 VolumePod 的配置管理ConfigMap 概述創建 ConfigMap 資源對象在 Pod 中使用 ConfigMap使用 ConfigMap 的限制條件 為什么要有個Pod Pod 的引入并非技術冗余&…

Margin和Padding在WPF和CSS中的不同

CSS和WPF中 margin 與 padding 在方向上的規定基本一致&#xff0c;但在使用場景和一些細節上有所不同。 CSS - 方向規定&#xff1a; margin 和 padding 屬性可以分別指定上、右、下、左四個方向的值。例如 margin:10px 20px 30px 40px; 表示上外邊距為10px、右外邊距為20…

gravity`(控制 View 內部內容的對齊方式)

文章目錄 **1. 常用取值****示例** **2. layout_gravity&#xff08;控制 View 在父容器中的對齊方式&#xff09;****常用取值****示例** **3. gravity vs layout_gravity 對比****4. 注意事項****5. 總結** 作用對象&#xff1a;當前 View 的內部內容&#xff08;如 TextView…

Go:使用共享變量實現并發

競態 在串行程序中&#xff0c;步驟執行順序由程序邏輯決定&#xff1b;而在有多個 goroutine 的并發程序中&#xff0c;不同 goroutine 的事件先后順序不確定&#xff0c;若無法確定兩個事件先后&#xff0c;它們就是并發的。若一個函數在并發調用時能正確工作&#xff0c;稱…

Vue3 SSR Serverless架構革命:彈性計算與量子加速

一、全維度Serverless SSR架構 1.1 蜂巢式彈性調度系統 1.2 冷啟動時間優化表 優化策略Node.js冷啟(ms)Deno冷啟(ms)Bun冷啟(ms)裸啟動1800960420預編譯二進制650380210內存快照預熱22016090WASM實例池15011075量子狀態預載453832 二、邊緣渲染協議升級 2.1 流式SSR響應協議…

FPAG IP核調用小練習

一、調用步驟 1、打開Quartus 右上角搜索ROM&#xff0c;如圖所示 2、點擊后會彈出如圖所示 其中文件路徑需要選擇你自己的 3、點擊OK彈出如圖所示 圖中紅色改為12與1024 4、然后一直點NEXT&#xff0c;直到下圖 這里要選擇后綴為 .mif的文件 5、用C語言生成 .mif文件 //…

Spring Cloud 服務間調用深度解析

前言 在構建微服務架構時&#xff0c;服務間的高效通信是至關重要的。Spring Cloud 提供了一套完整的解決方案來實現服務間的調用、負載均衡、服務發現等功能。本文將深入探討 Spring Cloud 中服務之間的調用機制&#xff0c;并通過源碼片段和 Mermaid 圖表幫助讀者更好地理解…

AF3 generate_chain_data_cache腳本解讀

AlphaFold3 generate_chain_data_cache 腳本在源代碼的scripts文件夾下。該腳本從指定目錄中批量解析 mmCIF/PDB 文件的工具,并將每個鏈的基本信息(序列、分辨率、是否屬于聚類等)提取并寫入 JSON 文件,主要用于后續蛋白質建模、過濾或訓練數據準備。 源代碼: import ar…

vue項目打包部署到maven倉庫

需要的資源文件&#xff0c;都放在根目錄下&#xff1a; 1. versionInfo.js const fs require(fs) const path require(path) const mkdirp require(mkdirp) const spawn require(child_process).spawnconst packageObj require(./package.json) const versionNo packa…

MegaTTS3: 下一代高效語音合成技術,重塑AI語音的自然與個性化

在近期的發布中&#xff0c;浙江大學趙洲教授團隊與字節跳動聯合推出了革命性的第三代語音合成模型——MegaTTS3&#xff0c;該模型不僅在多個專業評測中展現了卓越的性能&#xff0c;還為AI語音的自然性和個性化開辟了新的篇章。 MegaTTS3技術亮點 零樣本語音合成 MegaTTS3采用…

【教程】PyTorch多機多卡分布式訓練的參數說明 | 附通用啟動腳本

轉載請注明出處&#xff1a;小鋒學長生活大爆炸[xfxuezhagn.cn] 如果本文幫助到了你&#xff0c;歡迎[點贊、收藏、關注]哦~ 目錄 torchrun 一、什么是 torchrun 二、torchrun 的核心參數講解 三、torchrun 會自動設置的環境變量 四、torchrun 啟動過程舉例 機器 A&#…

計算機視覺——基于 Yolov8 目標檢測與 OpenCV 光流實現目標追蹤

1. 概述 目標檢測&#xff08;Object Detection&#xff09;和目標追蹤&#xff08;Object Tracking&#xff09;是計算機視覺中的兩個關鍵技術&#xff0c;它們在多種實際應用場景中發揮著重要作用。 目標檢測指的是在靜態圖像或視頻幀中識別出特定類別的目標對象&#xff0…

MySQL——流程控制

一、IF條件語句 語法 IF condition THENstatements; ELSEIF condition THENstatements; ELSEstatements; END IF; 判斷成績等級 # 判斷成績等級 # 輸入學生的編號,取出學生的第一門課&#xff0c;然后判斷當前的課程的等級 drop procedure if exists p2; delimiter $$ crea…

C# + Python混合開發實戰:優勢互補構建高效應用

文章目錄 前言&#x1f94f;一、典型應用場景1. 桌面應用智能化2. 服務端性能優化3. 自動化運維工具 二、四大技術實現方案方案1&#xff1a;進程調用&#xff08;推薦指數&#xff1a;★★★★☆&#xff09;方案2&#xff1a;嵌入Python解釋器&#xff08;推薦指數&#xff1…

MLflow 入門

官方主頁 MLflow | MLflow官方文檔 MLflow: A Tool for Managing the Machine Learning Lifecycle | MLflow 0. 簡介 MLflow 是一個開源平臺&#xff0c;專門為了幫助機器學習的從業者和團隊處理機器學習過程中的復雜性而設計。MLflow 關注機器學習項目的完整生命周期&#x…

【藍橋杯選拔賽真題101】Scratch吐絲的蜘蛛 第十五屆藍橋杯scratch圖形化編程 少兒編程創意編程選拔賽真題解析

目錄 scratch吐絲的蜘蛛 一、題目要求 1、準備工作 2、功能實現 二、案例分析 1、角色分析 2、背景分析 3、前期準備 三、解題思路 四、程序編寫 五、考點分析 六、推薦資料 1、scratch資料 2、python資料 3、C++資料 scratch吐絲的蜘蛛 第十五屆青少年藍橋杯s…

智譜最新模型GLM4是如何練成的

寫在前面 這篇博客將基于《ChatGLM: A Family of Large Language Models from GLM-130B to GLM-4 All Tools》,深入剖析 GLM-4 系列在**模型架構設計、預訓練、后訓練(對齊)、以及關鍵技術創新(如長上下文處理、Agent 能力構建)**等環節的實現邏輯與設計考量,帶你全面了…

第二屆電氣技術與自動化工程國際學術會議 (ETAE 2025)

重要信息 2025年4月25-27日 中國廣州 官網: http://www.icetae.com/ 部分 征稿主題 Track 1&#xff1a;電氣工程 輸配電、電磁兼容、高電壓和絕緣技術、電氣工程、電氣測量、電力電子及其應用、機電一體化、電路與系統、電能質量和電磁兼容性、電力系統及其自…

設備調試--反思與總結

最近回顧項目&#xff0c; 發現&#xff1a;在調試過程中最耽誤時間的可能不是技術難度&#xff0c;而是慣性思維&#xff1b; 例如&#xff1a; 我寫can通信濾波器的時候&#xff0c;可能是不過濾的&#xff1b;是接收所有的id報文&#xff0c;然后用業務邏輯過濾&#xff08…

C++項目:高并發內存池_下

目錄 8. thread cache回收內存 9. central cache回收內存 10. page cache回收內存 11. 大于256KB的內存申請和釋放 11.1 申請 11.2 釋放 12. 使用定長內存池脫離使用new 13. 釋放對象時優化成不傳對象大小 14. 多線程環境下對比malloc測試 15. 調試和復雜問題的調試技…