文章目錄
- 前言
- 發現問題
- 失敗的為什么是FBR
- 塊匯報頻率的變化
- 為什么FBR會反復失敗
- HDFS性能下降導致Yarn負載變高的形式化分析
- 理解線程
- 理解IO Wait
- 理解HDFS性能下降導致Yarn負載和使用率增高
- 引用
前言
我們的Yarn Cluster主要用來運行一批由Airflow定時調度的Spark Job,這些Spark Job。。這些Job根據其處理的時間粒度的不同(Batch Duration)和業務邏輯(Business Type)的不同而被打上不同的Prometheus標簽,然后我們構建了對應的Grafana Dashboard來監控各個Job處理的數據量以及運行時間的變化。
從某個時刻開始,我們的Yarn集群開始出現擁塞,但是業務層面其實沒有發生太大變化。在troubleshooting的過程中,我們逐漸找到了問題的元兇HDFS, 并在root cause層面講問題解決。
本文詳細介紹了我們從發現問題、找到懷疑對象、并最終解決問題以及事后對問題進行分析的全過程。
發現問題
大概從28/Apr
下午開始,我們發現所有的批處理任務的運行時間逐漸變長,有些批處理任務的運行時間變成了之前的5到10倍,因此,我們開始分析原因。由于與此同時,我們觀察到對應的批處理任務的數據輸入量整體也有所增加(這是業務變化引起的,因此是預期之內的),如下圖所示,大概增加了50%左右:
我們查看整個Yarn集群的狀態,我們發現集群的負載在對應的時間范圍內也逐漸升高,正在運行的Container數量逐漸增加直到Yarn集群持續滿負荷運行:
同時,整個集群的可用Memory和CPU的可用資源和Pending Container如下所示:
從上圖可以看到,在數據量增加以前,我們Yarn集群的內存和CPU資源使用率以CPU為主,平均使用率也是35%左右,所以,如果系統運行正常,即使所有Spark Job的輸入數據量增加50%,Yarn集群的內存和CPU資源也是完全夠用的。
由于找不到特別明顯的原因,我們一度懷疑是由于Yarn集群的不合理設置導致在集群的資源沒有用完的情況下Yarn的資源分配已經出現了瓶頸。
但是,兩個現象讓我們的這種懷疑變得不特別Solid:
- 數據量增加的比例和我們的Spark Job運行時間的比例不成正比
- Spark Job運行時間的Peak時間點和輸入數據量的Peak時間點不吻合,即,當數據量開始逐漸下降的時候,Spark Job的運行時間依然在接下來的1個小時逐漸攀升。
- 當我們最初認為真的只是Yarn的資源不足的時候,作為一種讓線上系統快速恢復的手段,我們首先嘗試停止提交一些優先級和重要性不高的Job。但是,我們發現,問題完全無法解決。
經過更加深入的問題分析,我們最后發現,整個Yarn的資源使用率的降低是由于HDFS的性能降低引起的,因為無論是Yarn本身,還是Yarn上運行的Spark Job(從HDFS上讀取數據然后寫入到ClickHouse)都需要依賴HDFS,并且隨后發現,HDFS的性能急劇下降是由于DataNode的塊匯報失敗造成的鎖爭用。最終,我們在root cause層面解決問題。
在DataNode端,我們打印了對應堆棧,發現堆棧信息如下所示。
首先,我們看到有很多線程如下所示:
"DataXceiver for client DFSClient_NONMAPREDUCE_-1403695307_28 at /0.21.1.113:36094 [Receiving block BP-862625392-0.21.7.109-1687238140050:blk_3132608831_2058910583]" #460048
344 daemon prio=5 os_prio=0 tid=0x00007fbe86b2a800 nid=0x569 waiting for monitor entry [0x00007fbeddba6000]java.lang.Thread.State: BLOCKED (on object monitor)at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.createRbw(FsDatasetImpl.java:1377)- waiting to lock <0x0000000438637dd0> (a org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl)at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.<init>(BlockReceiver.java:199)at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:675)at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:169)at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:106)at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:246)at java.lang.Thread.run(Thread.java:748)
可以看到,這是多個DataXceiver的writeBlock()方法嘗試寫一個塊的時候,需要構造一個BlockReceiver對象,但是構造BlockReceiver對象的時候,需要調用一個Synchronized 的 實例方法FsDatasetImpl.createRbw(), 該方法需要獲取FsDatasetImpl對象的監視器鎖,因此持續在一個FsDatasetImpl對象對應的監視器鎖0x0000000438637dd0
上等待:
BlockReceiver(final ExtendedBlock block, final StorageType storageType,final boolean pinning) throws IOException {try{this.block = block;.....// Open local disk out//if (isDatanode) { //replication or movereplicaHandler = datanode.data.createTemporary(storageType, block);} else {switch (stage) {case PIPELINE_SETUP_CREATE:replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist); // 這是一個Synchronized方法
@Override // FsDatasetSpipublic synchronized ReplicaHandler createRbw( // 這是一個實例的同步方法StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)throws IOException {ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),b.getBlockId());if (replicaInfo != null) {
同時,我們也看到有很多線程如下所示:
"DataXceiver for client DFSClient_NONMAPREDUCE_-940941604_93 at /1.12.1.240:40478 [Sending block BP-862625392-0.21.7.109-1687238140050:blk_3132534170_2058835614]" #460048343daemon prio=5 os_prio=0 tid=0x00007fbda42c1800 nid=0x567 waiting for monitor entry [0x00007fbeddca7000]java.lang.Thread.State: BLOCKED (on object monitor)at org.apache.hadoop.hdfs.server.datanode.BlockSender.<init>(BlockSender.java:240)- waiting to lock <0x0000000438637dd0> (a org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl)at org.apache.hadoop.hdfs.server.datanode.DataXceiver.readBlock(DataXceiver.java:537)at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opReadBlock(Receiver.java:148)at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:103)at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:246)at java.lang.Thread.run(Thread.java:748)
BlockSender(ExtendedBlock block, long startOffset, long length,boolean corruptChecksumOk, boolean verifyChecksum,boolean sendChecksum, DataNode datanode, String clientTraceFmt,CachingStrategy cachingStrategy)throws IOException {try {this.block = block;....synchronized(datanode.data) { // 嘗試獲取FsDatasetImpl的對象監視器鎖replica = getReplica(block, datanode);replicaVisibleLength = replica.getVisibleLength();if (replica instanceof FinalizedReplica) {// Load last checksum in case the replica is being written// concurrentlyfinal FinalizedReplica frep = (FinalizedReplica) replica;chunkChecksum = frep.getLastChecksumAndDataLen();}}
可以看到,讀請求也在同一把鎖上Block住了:DataXceiver在處理對一個Block的讀取請求的時候,需要構造一個BlockSender對象,但是構造BlockSender對象的時候,有一段synchronized代碼塊,該同步代碼塊需要的鎖也是DatasetImpl對象的監視器鎖,因此持續在一個FsDatasetImpl對象對應的監視器鎖0x0000000438637dd0
上等待,如上述堆棧所示。
因此,我們需要看一下哪個線程已經獲取了FSDatasetImpl對象的監視器鎖呢?我們搜索 0x0000000438637dd0
這個鎖id,看到以下線程已經獲取了對應的監視器鎖:
"DataNode: [[[DISK]file:/corp/data/nvme1/dfs/dn/, [DISK]file:/corp/data/nvme2/dfs/dn/, [DISK]file:/corp/data/nvme3/dfs/dn/, [DISK]file:/corp/data/nvme4/dfs/dn/]]
heartbeating to rccp407-24a.iad6.prod.corp.com/0.21.7.109:8022" #176 daemon prio=5 os_prio=0 tid=0x00007fbf262eb000 nid=0x1ffa runnable [0x00007fbeee5a2000]java.lang.Thread.State: RUNNABLEat com.google.protobuf.CodedOutputStream.writeRawVarint64(CodedOutputStream.java:1039)at org.apache.hadoop.hdfs.protocol.BlockListAsLongs$Builder.add(BlockListAsLongs.java:242)at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.getBlockReports(FsDatasetImpl.java:1768)- locked <0x0000000438637dd0> (a org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl)at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.blockReport(BPServiceActor.java:287)at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.offerService(BPServiceActor.java:561)at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:695)at java.lang.Thread.run(Thread.java:748)
很明顯,是方法BPServiceActor.blockReport()
進行塊匯報,已經獲得了FsDatasetImpl對象對應的監視器鎖0x0000000438637dd0
。后來自己看代碼才知道,blockReport()是用來完成Full Block Report的方法。
但是,全量塊匯報每6個小時執行一次,并且應該在很短的時間內完成才對,那么是不是有可能我們打印堆棧的時機比對,即,我們打印堆棧的時候剛好就遇到了這個全量塊匯報呢?于是,過了15分鐘我們再次打印堆棧,發現堆棧狀態依然如上所示。這說明:全量塊匯報狀態不正常。
隨后,我們開始查看HDFS的相關日志。
DataNode端BlockReport失敗的有關日志如下所示:
2025-03-28 22:38:09,622 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Unsuccessfully sent block report 0x4bc7fea3d6c94cf6, containing 4 storage report(s), of which we sent 0. The reports had 20093201 total blocks and used 0 RPC(s). This took 1212 msec to generate and 43 msecs for RPC and NN processing. Got back no commands.
2025-03-28 22:38:09,622 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: IOException in offerService
java.io.EOFException: End of File Exception between local host is: "rccp401-25a.iad6.prod.corp.com/0.21.1.113"; destination host is: "rccp408-24a.iad6.prod.corp.com":8022; : java.io.EOFException; For more details see: http://wiki.apache.org/hadoop/EOFExceptionat sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source)at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)at java.lang.reflect.Constructor.newInstance(Constructor.java:423)at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:791)at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764)at org.apache.hadoop.ipc.Client.call(Client.java:1508)at org.apache.hadoop.ipc.Client.call(Client.java:1441)at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)at com.sun.proxy.$Proxy20.blockReport(Unknown Source)at org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB.blockReport(DatanodeProtocolClientSideTranslatorPB.java:204)at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.blockReport(BPServiceActor.java:323)at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.offerService(BPServiceActor.java:561)at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:695)at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFExceptionat java.io.DataInputStream.readInt(DataInputStream.java:392)at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1113)at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1006)
從上面的堆棧可以看到,DataNode發送了FBR的RPC請求給NameNode,但是在試圖獲取響應的時候發現連接斷開了。
然后,我們開始檢查NameNode的異常日志,并找出了如下對應異常:
2025-03-28 21:36:31,432 WARN org.apache.hadoop.ipc.Server: Requested data length 71428864 is longer than maximum configured RPC length 67108864. RPC came from 0.21.3.113
2025-03-28 21:36:31,432 INFO org.apache.hadoop.ipc.Server: Socket Reader #1 for port 8022: readAndProcess from client 0.21.3.113 threw exception [java.io.IOException: Requested data length 71428864 is longer than maximum configured RPC length 67108864. RPC came from 0.21.3.113]
java.io.IOException: Requested data length 71428864 is longer than maximum configured RPC length 67108864. RPC came from 0.21.3.113at org.apache.hadoop.ipc.Server$Connection.checkDataLength(Server.java:1601)at org.apache.hadoop.ipc.Server$Connection.readAndProcess(Server.java:1663)at org.apache.hadoop.ipc.Server$Listener.doRead(Server.java:887)at org.apache.hadoop.ipc.Server$Listener$Reader.doRunLoop(Server.java:751)at org.apache.hadoop.ipc.Server$Listener$Reader.run(Server.java:722)
從這個異常可以看到,這個異常發生在IPC連接的處理層,即,這個異常根本還未到達上層的業務層,即業務層方法NameNodeRpcServer.blockReport()方法根本就沒有調用。
失敗的為什么是FBR
事后我們看到(下文會詳細介紹),DataNode和NameNode之間的間歇性通信包括了三種類型:
-
心跳(Heartbeat):這是通過方法
BPServiceActor.heartbeat()
來完成的,我們從下文的sendHeartbeat()
方法的實現可以很容易地看到,心跳信息主要用來告知NameNode自己的Storage元信息以及各種狀態信息(xceiver count, cache等)。心跳不攜帶任何與塊和塊的數量等信息:HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)throws IOException {scheduler.scheduleNextHeartbeat();StorageReport[] reports =dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());VolumeFailureSummary volumeFailureSummary = dn.getFSDataset().getVolumeFailureSummary();int numFailedVolumes = volumeFailureSummary != null ?volumeFailureSummary.getFailedStorageLocations().length : 0;return bpNamenode.sendHeartbeat(bpRegistration,reports, // 這個DataNode的每一個 Storage上的Storage元信息dn.getFSDataset().getCacheCapacity(),dn.getFSDataset().getCacheUsed(),dn.getXmitsInProgress(),dn.getXceiverCount(), // xceiver信息,NameNode會依據該信息判斷DataNode的繁忙程度,進而影響NameNode的塊調度策略numFailedVolumes,volumeFailureSummary,requestBlockReportLease);}
每一個Storage的原信息封裝在StorageReport對象中,如下所示,這里不贅述。
public class StorageReport {private final DatanodeStorage storage;private final boolean failed;private final long capacity;private final long dfsUsed;private final long nonDfsUsed;private final long remaining;private final long blockPoolUsed;
NameNode收到heartbeat以后,會更新自己所維護的DataNode的各種信息,這是通過DataNodeDescriptor的成員方法updateHeartbeatState()完成的:
/*** process datanode heartbeat or stats initialization.*/public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,long cacheUsed, int xceiverCount, int volFailures,VolumeFailureSummary volumeFailureSummary) {.....setCacheCapacity(cacheCapacity);setCacheUsed(cacheUsed);setXceiverCount(xceiverCount);setLastUpdate(Time.now()); this.volumeFailures = volFailures;this.volumeFailureSummary = volumeFailureSummary;for (StorageReport report : reports) {DatanodeStorageInfo storage = updateStorage(report.getStorage());if (checkFailedStorages) {failedStorageInfos.remove(storage);}storage.receivedHeartbeat(report);totalCapacity += report.getCapacity();totalRemaining += report.getRemaining();totalBlockPoolUsed += report.getBlockPoolUsed();totalDfsUsed += report.getDfsUsed();totalNonDfsUsed += report.getNonDfsUsed();}rollBlocksScheduled(getLastUpdate());// Update total metrics for the node.setCapacity(totalCapacity);setRemaining(totalRemaining);setBlockPoolUsed(totalBlockPoolUsed);setDfsUsed(totalDfsUsed);setNonDfsUsed(totalNonDfsUsed);.....}```同時,DataNode從NameNode領取的各種Command也不是NameNode直接發送給DataNode的,而是放在DataNode的heartbeat的response中的。所以,DataNode是需要處理heatbeat的響應的。
-
增量塊匯報(Incremental Block Report):這是通過方法
IncrementalBlockReportManager.sendIBRs()
方法完成的,主要用來匯報在過去的短暫時間內最新增加的和刪除的Block的信息。增量的塊信息是通過方法generateIBRs()
收集的:private synchronized StorageReceivedDeletedBlocks[] generateIBRs() {final List<StorageReceivedDeletedBlocks> reports= new ArrayList<>(pendingIBRs.size());for (Map.Entry<DatanodeStorage, PerStorageIBR> entry: pendingIBRs.entrySet()) {final PerStorageIBR perStorage = entry.getValue();// Send newly-received and deleted blockids to namenodefinal ReceivedDeletedBlockInfo[] rdbi = perStorage.removeAll();if (rdbi != null) {reports.add(new StorageReceivedDeletedBlocks(entry.getKey(), rdbi));}}readyToSend = false;return reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]);}
與heartbeat不同,增量塊匯報并無返回值,即這是單向的RPC調用。
-
全量塊匯報(Full Block Report):這是通過
BPServiceActor.blockReport()
來進行的,用來對DataNode端的全部塊信息進行匯報,其中,在進行全量塊匯報的時候,也會在內部先觸發一次增量塊匯報。這里不贅述,感興趣的讀者可以自行閱讀代碼。
我們從上面DataNode端的異常堆棧可以看到,異常的發生是在方法BPServiceActor.blockReport()
中,即發生在Full Block Report的過程中。這個是合理的:
- RPC Size隨著塊的數量的增加而增加,只有可能是Full Block Report。因為Incremental Block Report只會涉及到過去很短時間內的變化的塊信息,根本不會帶來超過64MB的RPC Size。
- 而且,根據HDFS的設計,無論是Heartbeat還是增量塊匯報,一旦發生問題,系統將會發生寫錯誤。只有Full Block Report發生問題的時候不會帶來直接的系統功能異常,而僅僅會造成我們當前遇到的寫性能的降低。比如:
- 如果心跳長期丟失,主要會導致NameNode將這個DataNode判別為Dead DataNode并開始對應塊的復制操作以保證副本數量。對heartbeat的檢查,發生在一個專門的異步線程HeartbeatManager.Monitor類中,這個HeartbeatManager.Monitor類是一個Runnable
- 由于heartbeat不夠及時,NameNode無法獲取實時的、準確的DataNode的Storage信息和狀態(xceiver, cache)信息
- 同時,心跳本身攜帶了DataNode發送給NameNode的一些Command,這些Command定義在DatanodeCommand類的實現類中,比如,BlockRecoveryCommand, RegisterCommand等等;
- DataNode無法收到NameNode通過heartbeat的response發送給DataNode的各種Command;
- 如果IBR長期丟失,那么寫一個Block的操作是無法完成的,因為寫到DataNode的塊是需要由DataNode向NameNode進行匯報才會最終被finalize的。這里不再贅述。
塊匯報頻率的變化
由于我們的dfs.datanode.block.report.intervalMsec
設置為21600000
,即每6個小時進行一次FBR(每天4次FBR)調度,所有,在事故沒有發生的時候,我們觀察到DataNode的FBR頻率正常,每臺DataNode每天會進行8次針對某臺NameNode的FBR(想象為啥是8
次不是4
次?因為FBR會同時發送給Active和StandBy NameNode),同時,對應的IBR的頻率大概為200K/10min
,而對應的heartbeat的頻率大概為400次/10min
,如下圖所示:
但是,在事故發生的過程中,FBR的頻率發生了陡增,幾乎到了平均20K/天
的頻率,而IBR和heartbeat的頻率則對應下降。如下圖所示:
我們還需要準確理解這個Dashboard的含義,即這里的塊匯報次數,是否包含失敗的塊匯報次數? 即,這個急劇增加的FBR次數,是否都是成功的FBR呢?
我們查看blockReport()
代碼可以看到,無論FBR成功和失敗,都是增加該Counter值:
List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {final ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();......try {if (totalBlockCount < dnConf.blockReportSplitThreshold) {.......success = true;} finally {.......dn.getMetrics().addBlockReport(brSendCost);// 增加fbr統計值 }
為什么FBR會反復失敗
綜上,我們從Metrics上看到FBR的頻率陡增,應該是失敗以后的重試。我們需要在代碼層面看一下FBR的重試邏輯。
在DataNode端,進行FBR、IBR和heartbeat發生在方法BPServiceActor::offerService()
中,每一個BPServiceActor
對象會綁定到一個NameNode節點上,因此,在我們Active/Standby架構下,會有兩個BPServiceActor
對象。每一個BPServiceActor
對象都是一個Runnable
,即一個獨立線程,用來和遠程的NameNode進行通信:
/*** A thread per active or standby namenode to perform:* <ul>* <li> Pre-registration handshake with namenode</li>* <li> Registration with namenode</li>* <li> Send periodic heartbeats to the namenode</li>* <li> Handle commands received from the namenode</li>* </ul>*/
@InterfaceAudience.Private
class BPServiceActor implements Runnable {
通信的內容包括向NameNode的注冊(DataNode啟動的時候),重新注冊(NameNode在發現DataNode失去聯系的時候會向DataNode發送DatanodeProtocol.DNA_REGISTER
命令以要求重新注冊),發送心跳,IBR和FBR,以及處理來自NameNode的命令。
/*** No matter what kind of exception we get, keep retrying to offerService().* That's the loop that connects to the NameNode and provides basic DataNode* functionality.** Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can* happen either at shutdown or due to refreshNamenodes.*/@Overridepublic void run() {LOG.info(this + " starting to offer service");try {while (true) {// init stufftry {// setup storageconnectToNNAndHandshake(); // 和NN通信break;} catch (IOException ioe) {......}}runningState = RunningState.RUNNING;while (shouldRun()) { // 只要系統沒有停止服務,就不斷循環try {offerService();} catch (Exception ex) {.... // 即使offerService()拋出異常,sleep一會兒也會繼續運行sleepAndLogInterrupts(5000, "offering service");}}runningState = RunningState.EXITED;} catch (Throwable ex) {.....} }
可以看到,BPServiceActor的run()方法就是在首次啟動并連接到NameNode以后,開始保證offerService()
發生異常以后繼續運行。而offerService()
方法內部會不斷循環以向NameNode發送心跳,或者進行IBR,或者進行FBR:
/*** Main loop for each BP thread. Run until shutdown,* forever calling remote NameNode functions.*/private void offerService() throws Exception {.....while (shouldRun()) {try {final long startTime = scheduler.monotonicNow();//// Every so often, send heartbeat or block-report//final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);HeartbeatResponse resp = null;if (sendHeartbeat) {boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&scheduler.isBlockReportDue(startTime);if (!dn.areHeartbeatsDisabledForTests()) {resp = sendHeartBeat(requestBlockReportLease);assert resp != null;if (resp.getFullBlockReportLeaseId() != 0) {.....fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();}.....}}if (ibrManager.sendImmediately() || sendHeartbeat) {ibrManager.sendIBRs(bpNamenode, bpRegistration,bpos.getBlockPoolId(), dn.getMetrics());}List<DatanodeCommand> cmds = null;boolean forceFullBr =scheduler.forceFullBlockReport.getAndSet(false);......if ((fullBlockReportLeaseId != 0) || forceFullBr) {cmds = blockReport(fullBlockReportLeaseId); // 在這里拋出異常fullBlockReportLeaseId = 0;}.....} catch(RemoteException re) {....LOG.warn("RemoteException in offerService", re);sleepAfterException();} catch (IOException e) {LOG.warn("IOException in offerService", e);sleepAfterException();}processQueueMessages();} // while (shouldRun())} // offerService
-
循環持續運行,只要 DataNode 應該繼續服務(即還沒有關閉)
while (shouldRun()) {try {final long startTime = scheduler.monotonicNow();
-
如果當前時間已到心跳時間,則向 NameNode 發送心跳信息:
final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);HeartbeatResponse resp = null;if (sendHeartbeat) {boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&scheduler.isBlockReportDue(startTime);if (!dn.areHeartbeatsDisabledForTests()) {resp = sendHeartBeat(requestBlockReportLease);...}}
每個心跳會攜帶節點狀態(名稱、端口、容量等)。如果塊報告也到期,心跳中還會請求塊報告的租約(lease)。
返回的 HeartbeatResponse 包含后續指令。 -
處理 NameNode 返回的狀態,包括:更新租約 ID,檢查是否為 Active NameNode,執行來自 NameNode 的命令,更新高可用狀態(HA state):
if (resp.getFullBlockReportLeaseId() != 0) {...fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();}dn.getMetrics().addHeartbeat(...);bpos.updateActorStatesFromHeartbeat(this, resp.getNameNodeHaState());state = resp.getNameNodeHaState().getState();if (state == HAServiceState.ACTIVE) {handleRollingUpgradeStatus(resp);}if (!processCommand(resp.getCommands()))continue;
-
按需要進行增量塊匯報,即如果到達了增量塊匯報的時間,或者盡管沒到,但是剛剛進行了heartbeat的發送,那么就進行一次增量塊匯報:
if (ibrManager.sendImmediately() || sendHeartbeat) {ibrManager.sendIBRs(bpNamenode, bpRegistration,bpos.getBlockPoolId(), dn.getMetrics()); }
-
如果收到了NameNode端批準的全量塊匯報租約,或者,收到了強制進行塊匯報的請求,那么就進行全量塊匯報:
boolean forceFullBr = scheduler.forceFullBlockReport.getAndSet(false);if ((fullBlockReportLeaseId != 0) || forceFullBr) {cmds = blockReport(fullBlockReportLeaseId);fullBlockReportLeaseId = 0; // 將fullBlockReportLeaseId取消置位,等待下一次的FBR時機}processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
-
其它的處理不再贅述。
從上述代碼我們看到,在正常情況下:
- 什么時候會嘗試去請求一個blockReport的租約:
-
heartbeat只會在blockReport的時間到了(6h),并且目前還沒有獲取租約,才會嘗試向NameNode去請求一個blockReport的租約:
// 只有當blockReport的時機到了(距離上一次blockReport過去6h,并且當前還沒有獲得租約)boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&scheduler.isBlockReportDue(startTime);boolean isBlockReportDue(long curTime) {return nextBlockReportTime - curTime <= 0;}
由于是且的關系,因此,正常情況下,最短6小時才會嘗試進行一次FBR租約的請求。
-
如果response中獲取到了租約,則將租約存下來:
if (resp.getFullBlockReportLeaseId() != 0) {fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();}
-
一般情況下(不考慮用戶強制FBR的情況),如果有了租約,則立刻進行FBR,FBR完成以后重新將租約清空避免反復FBR:
if ((fullBlockReportLeaseId != 0) || forceFullBr) {cmds = blockReport(fullBlockReportLeaseId);fullBlockReportLeaseId = 0;}
我們乍一看上述代碼,看到的FBR重試邏輯,如果FBR失敗了,后面會反復高頻重試嗎?
我們看一下blockReport()方法中關于下次調度時機的設置:
/*** Report the list blocks to the Namenode* @return DatanodeCommands returned by the NN. May be null.* @throws IOException*/List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {.....try {... // 發送blockReport請求success = true;} finally {....scheduler.scheduleNextBlockReport(); // 設置下一次進行fbr的時間return cmds.size() == 0 ? null : cmds;}void scheduleNextBlockReport() {// If we have sent the first set of block reports, then wait a random// time before we start the periodic block reports.if (resetBlockReportTime) {nextBlockReportTime = monotonicNow() +DFSUtil.getRandom().nextInt((int)(blockReportIntervalMs));resetBlockReportTime = false;} else {nextBlockReportTime +=(((monotonicNow() - nextBlockReportTime + blockReportIntervalMs) /blockReportIntervalMs)) * blockReportIntervalMs;}}
可以看到,即使blockRepor太失敗了,scheduler.scheduleNextBlockReport()也會正常被調用,并且,我們進一步查看scheduleNextBlockReport()方法細節,下一次fbr的時間也與這次的fbr成功與否無關。所以,再結合offerService()
方法中的下面代碼:
boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&scheduler.isBlockReportDue(startTime);
的條件判斷,不應該發生FBR反復失敗并反復調用的情況,預期的邏輯是,如果這次FBR失敗了,下次重試也是在6個小時以后了。
我想了很久,才看到了其中的奧秘:
if ((fullBlockReportLeaseId != 0) || forceFullBr) {cmds = blockReport(fullBlockReportLeaseId);fullBlockReportLeaseId = 0; // 如果blockReport拋出異常,這行代碼不會執行}
可以看到,如果blockReport(fullBlockReportLeaseId)
拋出異常,那么fullBlockReportLeaseId
不會被清空,而如果fullBlockReportLeaseId
沒有被清空,blockReport(fullBlockReportLeaseId)就會被反復執行,根本不再依賴調度時機,也不再依賴向NameNode發送新的租約請求了。
HDFS性能下降導致Yarn負載變高的形式化分析
由此我們可以看到,Yarn的資源使用率變高(負載變高),居然有可能是存儲系統的性能下降導致的。
我們可以類比操作系統層面我們經常遇到的問題,即我們在操作系統性能診斷的時候經常遇到的,IO Wait的大量增加,居然會導致CPU Load Average增加的例子,與此同時CPU Usage卻很低。
理解線程
我們先看一下常見的操作系統層面的線程狀態:
狀態 | 名稱 | 含義 | 常見場景說明 |
---|---|---|---|
R | Running | 正在運行或在就緒隊列中等待 CPU | CPU 正在執行線程,或線程在搶占 CPU |
S | Sleeping | 可中斷的休眠狀態 | 等待信號、網絡事件、定時器,正常休眠狀態 |
D | Uninterruptible Sleep | 不可中斷的休眠狀態(主要是 I/O 等待) | 等磁盤、NFS、網絡存儲、驅動響應;kill -9 無效 |
Z | Zombie | 僵尸進程 | 子進程退出但父進程未回收,常見于編程錯誤 |
T | Stopped | 被暫停 | 手動 kill -STOP ,或 Ctrl+Z 暫停終端程序 |
I | Idle (內核專用) | 內核線程空閑狀態 | Linux kernel 的空閑線程,普通用戶進程不會出現該狀態 |
X | Dead | 死亡狀態(極罕見) | 已徹底結束的線程,瞬間即逝 |
對于操作系統層面的線程,我們需要關鍵區分的問題是:
- R狀態的線程:不一定正在運行,也有可能處于就緒隊列中等待運行;
- S狀態的線程:當我們調用sleep方法的時候,當前線程進入休眠,這時候會主動讓出CPU資源,直到被喚醒并重新執行
- D狀態的線程: 當我們執行IO操作的時候,如果是等待磁盤讀寫完成,則進入到Uninterruptible Sleep狀態,這時候是無法通過kill -9進行中斷的。這時候線程也會讓出CPU資源。
- CPU 是靠操作系統的調度器來管理的。當某個線程進入 I/O wait(D 狀態),OS 就會把它從“可運行隊列”中移除。調度器會立刻選擇另一個 R 狀態(就緒狀態)的線程(如果有的話),分配給 CPU 執行。
對于Java線程,其線程狀態(Thread.State)和操作系統層面的狀態(比如 Linux 的 R, S, D, 等)不是一一對應。Java線程狀態如下:
Java 狀態 | 含義 |
---|---|
NEW | 線程創建了但還沒啟動 |
RUNNABLE | 正在運行或等待操作系統分配 CPU(對應 Linux 的 R) |
BLOCKED | 等待獲取某個鎖(synchronized ) |
WAITING | 進入無限等待(wait() 、join() 、LockSupport.park() ) |
TIMED_WAITING | 等待有限時間(如 sleep() 、wait(timeout) ) |
TERMINATED | 執行完畢,線程結束 |
最關鍵的,Java 中的 BLOCKED 是線程在嘗試進入一個 synchronized 塊或方法,但別的線程已經拿了鎖,所以它阻塞在鎖入口處,還沒進去。 在這個階段,Java 線程不是主動休眠,而是卡在等待互斥鎖的獲取。
通常會被操作系統標記為 S(Sleeping)狀態 或者 R(Running)狀態下阻塞住的),取決于實現和調度時機**。
Java 狀態 | 含義 |
---|---|
BLOCKED | 等待進入 synchronized 代碼塊,別的線程持有鎖,線程的該狀態中的操作系統線程狀態依賴于操作系統實現,有可能是S,有可能是R |
WAITING | 調用了 wait() 、join() 、LockSupport.park() ,主動休眠,對應操作系統層面應該是S |
TIMED_WAITING | 和 WAITING 類似,但帶有超時時間(如 sleep() 、wait(timeout) ),對應操作系統層面也應該是S |
理解IO Wait
操作系統層面,一個線程一旦 I/O 完成,中斷觸發,操作系統會把原線程從D狀態恢復為R狀態,等待下一輪調度。
-
CPU Load Average 的準確定義
-
Load Average 表示單位時間內系統處于 可運行狀態(Runnable, R狀態) 或 不可中斷睡眠狀態(Uninterruptible Sleep,即 D 狀態) 的進程數的平均值。
-
例如,1分鐘負載 2.0 表示過去1分鐘內平均有2個進程占用或等待CPU資源。
-
關鍵點:Load Average 不僅包括正在使用CPU的進程,還包括就緒隊列中的進程(等待CPU調度),或者因為IO阻塞處于 D 狀態的進程(如等待磁盤響應的進程)。
-
-
為什么高 IO Wait 會導致CPU Load Average 升高?
- 當磁盤性能下降時,進程發起IO請求后會被阻塞,進入 D 狀態(Uninterruptible Sleep)。 D 狀態的進程無法被中斷,直到IO完成。 這些D狀態的進程會被計入Load Average(因為它們是“等待系統資源的活動進程”)。
- 如果大量進程因IO阻塞,Load Average 會顯著上升,但是其實它們并未真正消耗CPU。
-
為什么IO Wait高的時候,CPU 使用率(Usage)不一定高?
- 因為CPU 使用率統計的是CPU 實際執行代碼的時間比例。當進程因IO阻塞時,它們處于D狀態,不占用CPU(CPU可能空閑或執行其他少數任務)。甚至,如果所有進程都在等IO,CPU可能因無任務可做而使用率為0。
- 因此,高IO Wait時:
- Load Average 高:大量進程阻塞在D狀態(被計入負載)
- CPU Usage 低:CPU本身沒有太多任務需要處理
-
類比解釋
想象一個銀行柜臺(CPU)和排隊的人(進程):- 正常情況:人們快速辦理業務(IO快),隊列很短(Load低);
- 磁盤慢時:柜臺處理速度不變,但每個人辦理業務時被卡在簽字環節(IO阻塞),隊伍越來越長(Load高),但柜臺可能空閑(CPU Usage低);
所以
- CPU Load Average 反映的是系統資源的整體壓力(CPU+IO等),而不僅是CPU使用率。D 狀態的進程是負載升高的“元兇”:它們被計入負載但影響CPU使用率。
- 高 Load + 低 CPU Usage 是典型的IO瓶頸特征(如磁盤慢、網絡存儲延遲等)。
理解這一點后,我們就知道為什么優化磁盤(如換SSD、調整文件系統)或減少IO密集型任務可以降低負載,即使CPU本身并不忙。
理解HDFS性能下降導致Yarn負載和使用率增高
這時候我們往往有一個問題,就是在操作系統層面如果IO Wait增加,那么伴隨著CPU Load Average的增加和CPU Usage的降低,但是,在HDFS性能下降的時候,Yarn的Waiting Application的確增加了,但是為什么Yarn的Memory和CPU的Usage也增加了呢?
這個也很好理解,因為Yarn上Memory和CPU的使用率是假的使用率。因為Yarn對Application的Memory Usage和CPU Usage的統計是在申請時確定的,而并不真正關心Application實際上占用多少內存或者占用了多少CPU。比如,一個Spark Batch Job申請了100GB 內存和 40個vCore,這個Spark job運行期間即使在進行sleep(Integer.MAX_VALUE), Yarn這邊也會認為這個Spark Batch Application占用了100GB 內存和40 vCore的CPU.
引用
HDFS的塊匯報和塊放置策略–從一次HDFS寫文件故障開始