1. Symptoms
In the morning we found the ES logging cluster in an abnormal state: it was repeatedly initiating master elections, it could not serve data queries, and ingestion of the related log data was significantly delayed.
2. Root Cause
相關日志
The ES cluster logs showed the following:
Starting at 00:00:51, the cluster's nodes began timing out on communication with the then-current master node:
Time | level | data |
---|---|---|
00:00:51.140 | WARN | Received response for a request that has timed out, sent [12806ms] ago, timed out [2802ms] ago, action [internal:coordination/fault_detection/leader_check], node [{hot}{tUvNI22CRAanSsJdircGlA}{crDi96kOQl6J944HZqNB0w}{131}{131:9300}{dim}{xpack.installed=true, box_type=hot}], id [864657514] |
00:01:24.912 | WARN | Received response for a request that has timed out, sent [12205ms] ago, timed out [2201ms] ago, action [internal:coordination/fault_detection/leader_check], node [{hot}{tUvNI22CRAanSsJdircGlA}{crDi96kOQl6J944HZqNB0w}{131}{131:9300}{dim}{xpack.installed=true, box_type=hot}], id [143113108] |
00:01:24.912 | WARN | Received response for a request that has timed out, sent [12206ms] ago, timed out [2201ms] ago, action [internal:coordination/fault_detection/leader_check], node [{hot}{tUvNI22CRAanSsJdircGlA}{crDi96kOQl6J944HZqNB0w}{131}{131:9300}{dim}{xpack.installed=true, box_type=hot}], id [835936906] |
00:01:27.731 | WARN | Received response for a request that has timed out, sent [20608ms] ago, timed out [10604ms] ago, action [internal:coordination/fault_detection/leader_check], node [{hot}{tUvNI22CRAanSsJdircGlA}{crDi96kOQl6J944HZqNB0w}{131}{131:9300}{dim}{xpack.installed=true, box_type=hot}], id [137999525] |
00:01:44.686 | WARN | Received response for a request that has timed out, sent [18809ms] ago, timed out [8804ms] ago, action [internal:coordination/fault_detection/leader_check], node [{hot}{tUvNI22CRAanSsJdircGlA}{crDi96kOQl6J944HZqNB0w}{131}{131:9300}{dim}{xpack.installed=true, box_type=hot}], id [143114372] |
00:01:44.686 | WARN | Received response for a request that has timed out, sent [18643ms] ago, timed out [8639ms] ago, action [internal:coordination/fault_detection/leader_check], node [{hot}{tUvNI22CRAanSsJdircGlA}{crDi96kOQl6J944HZqNB0w}{131}{131:9300}{dim}{xpack.installed=true, box_type=hot}], id [835938242] |
00:01:56.523 | WARN | Received response for a request that has timed out, sent [20426ms] ago, timed out [10423ms] ago, action [internal:coordination/fault_detection/leader_check], node [{hot}{tUvNI22CRAanSsJdircGlA}{crDi96kOQl6J944HZqNB0w}{131}{131:9300}{dim}{xpack.installed=true, box_type=hot}], id [137250155] |
00:01:56.523 | WARN | Received response for a request that has timed out, sent [31430ms] ago, timed out [21426ms] ago, action [internal:coordination/fault_detection/leader_check], node [{hot}{tUvNI22CRAanSsJdircGlA}{crDi96kOQl6J944HZqNB0w}{131}{131:9300}{dim}{xpack.installed=true, box_type=hot}], id [137249119] |
These timeouts triggered the nodes to initiate a new master election.
A new master was elected, but the role kept switching among the three master-eligible nodes and the cluster never stabilized:
Time | level | data |
---|---|---|
00:52:37.264 | DEBUG | executing cluster state update for [elected-as-master ([2] nodes joined)[{hot}{g7zfvt_3QI6cW6ugxIkSRw}{bELGusphTpy6RBeArNo8MA}{129}{129:9300}{dim}{xpack.installed=true, box_type=hot} elect leader, {hot}{GDyoKXPmQyC42JBjNP0tzA}{llkC7-LgQbi4BdcPiX_oOA}{130}{130:9300}{dim}{xpack.installed=true, box_type=hot} elect leader, _BECOME_MASTER_TASK_, _FINISH_ELECTION_]] |
00:52:37.264 | TRACE | will process [elected-as-master ([2] nodes joined)[_FINISH_ELECTION_]] |
00:52:37.264 | TRACE | will process [elected-as-master ([2] nodes joined)[_BECOME_MASTER_TASK_]] |
00:52:37.264 | TRACE | will process [elected-as-master ([2] nodes joined)[{hot}{g7zfvt_3QI6cW6ugxIkSRw}{bELGusphTpy6RBeArNo8MA}{129}{129:9300}{dim}{xpack.installed=true, box_type=hot} elect leader]] |
00:52:37.264 | TRACE | will process [elected-as-master ([2] nodes joined)[{hot}{GDyoKXPmQyC42JBjNP0tzA}{llkC7-LgQbi4BdcPiX_oOA}{130}{130:9300}{dim}{xpack.installed=true, box_type=hot} elect leader]] |
00:52:37.584 | DEBUG | took [200ms] to compute cluster state update for [elected-as-master ([2] nodes joined)[{hot}{g7zfvt_3QI6cW6ugxIkSRw}{bELGusphTpy6RBeArNo8MA}{129}{129:9300}{dim}{xpack.installed=true, box_type=hot} elect leader, {hot}{GDyoKXPmQyC42JBjNP0tzA}{llkC7-LgQbi4BdcPiX_oOA}{130}{130:9300}{dim}{xpack.installed=true, box_type=hot} elect leader, _BECOME_MASTER_TASK_, _FINISH_ELECTION_]] |
00:52:37.828 | TRACE | cluster state updated, source [elected-as-master ([2] nodes joined)[{hot}{g7zfvt_3QI6cW6ugxIkSRw}{bELGusphTpy6RBeArNo8MA}{129}{129:9300}{dim}{xpack.installed=true, box_type=hot} elect leader, {hot}{GDyoKXPmQyC42JBjNP0tzA}{llkC7-LgQbi4BdcPiX_oOA}{130}{130:9300}{dim}{xpack.installed=true, box_type=hot} elect leader, _BECOME_MASTER_TASK_, _FINISH_ELECTION_]] |
Problem analysis
綜合上述日志、集群狀態及近期所做的操作后,發現這是由于為解決前期ES集群SSD磁盤IO不均,部分磁盤達到IO上限的問題,為平衡各節點、各SSD磁盤的IO,將index的shard均勻分配至每個節點的每塊SSD上,增加了在每個節點上的shard分配數量。這雖然避免了熱點盤的問題,有效地均衡了磁盤IO,但導致了shard數目的快速增加 (之前集群shard總數一般控制在2萬左右,出現問題時集群shard數目接近6萬)進而觸發如下ES bug(該bug在ES 7.6及以上版本被修復),導致平時可以在短時間內正常完成的處理(freeze index,delete index,create index)長時間不能完成,同時造成master節點負載過高,最終出現大量處理超時等錯誤:
- https://github.com/elastic/elasticsearch/pull/47817
- https://github.com/elastic/elasticsearch/issues/46941
- https://github.com/elastic/elasticsearch/pull/48579
These three links describe the same underlying problem: to decide whether a shard needs to be moved off a node, ES must find the shards that are in the RELOCATING or INITIALIZING state so their sizes can be taken into account. In the unfixed versions this lookup is a linear scan that is repeated for every shard the allocator evaluates, and all of that work is computed on the fly by the master node. As the cluster's shard count rises, the master's computation rises steeply and the master slows down, which in turn led to the following:
(1) The master node was so overloaded that it could not respond to other nodes' requests in time; the requests timed out and triggered a new master election, but the newly elected master could not handle the cluster workload either, timed out again, and triggered yet another election. This repeated over and over, leaving the cluster in an abnormal state.
(2) Because the master processed tasks so slowly, a large backlog of pending tasks built up (freeze index, create index, delete index, shard relocation, and other jobs).
The problem was first identified and reported to the community by Huawei engineers; the relevant stack trace is:
"elasticsearch[iZ2ze1ymtwjqspsn3jco0tZ][masterService#updateTask][T#1]"?#39?daemon?prio=5?os_prio=0?cpu=150732651.74ms?elapsed=258053.43s?tid=0x00007f7c98012000?nid=0x3006?runnable??[0x00007f7ca28f8000]java.lang.Thread.State:?RUNNABLEat?java.util.Collections$UnmodifiableCollection$1.hasNext(java.base@13/Collections.java:1046)at?org.elasticsearch.cluster.routing.RoutingNode.shardsWithState(RoutingNode.java:148)at?org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.sizeOfRelocatingShards(DiskThresholdDecider.java:111)at?org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.getDiskUsage(DiskThresholdDecider.java:345)at?org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.canRemain(DiskThresholdDecider.java:290)at?org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders.canRemain(AllocationDeciders.java:108)at?org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator$Balancer.decideMove(BalancedShardsAllocator.java:668)at?org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator$Balancer.moveShards(BalancedShardsAllocator.java:628)at?org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.allocate(BalancedShardsAllocator.java:123)at?org.elasticsearch.cluster.routing.allocation.AllocationService.reroute(AllocationService.java:405)at?org.elasticsearch.cluster.routing.allocation.AllocationService.reroute(AllocationService.java:370)at?org.elasticsearch.cluster.metadata.MetaDataIndexStateService$1$1.execute(MetaDataIndexStateService.java:168)at?org.elasticsearch.cluster.ClusterStateUpdateTask.execute(ClusterStateUpdateTask.java:47)at?org.elasticsearch.cluster.service.MasterService.executeTasks(MasterService.java:702)at?org.elasticsearch.cluster.service.MasterService.calculateTaskOutputs(MasterService.java:324)at?org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:219)at?org.elasticsearch.cluster.service.MasterService.access$000(MasterService.java:73)at?org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:151)at?org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150)at?org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188)at?org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:703)at?org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252)at?org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215)at?java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@13/ThreadPoolExecutor.java:1128)at?java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@13/ThreadPoolExecutor.java:628)at?java.lang.Thread.run(java.base@13/Thread.java:830)
```java
/**
 * Determine the shards with a specific state
 * @param states set of states which should be listed
 * @return List of shards
 */
public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
    List<ShardRouting> shards = new ArrayList<>();
    for (ShardRouting shardEntry : this) {
        for (ShardRoutingState state : states) {
            if (shardEntry.state() == state) {
                shards.add(shardEntry);
            }
        }
    }
    return shards;
}
```
shardsWithState iterates over all the shards, collects those whose state matches, and returns them. From ES 7.2 onwards, because of the feature introduced in PR #39499, shards of closed indices are also counted, so the amount of iteration grows sharply as the cluster's shard count increases and processing slows down.
Below are the benchmark figures published by the Elasticsearch developers:
Shards | Nodes | Shards per node | Reroute time without relocations | Reroute time with relocations |
---|---|---|---|---|
60000 | 10 | 6000 | ~250ms | ~15000ms |
60000 | 60 | 1000 | ~250ms | ~4000ms |
10000 | 10 | 1000 | ~60ms | ~250ms |

As the numbers show, even under normal conditions the reroute time rises quickly as the cluster's shard count grows, which is why the code needed to be optimized.
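To make the scaling intuition concrete, here is a rough back-of-the-envelope model (not Elasticsearch source code; the class and method names below are made up for illustration). In the unfixed versions, every shard the allocator considers moving triggers one full shardsWithState scan of the shards on that node, so the per-node work grows roughly with the square of the shards-per-node count.

```java
/**
 * Illustrative model only: in pre-7.6 versions, BalancedShardsAllocator evaluates
 * every shard on a node, and for each one DiskThresholdDecider ends up calling
 * RoutingNode.shardsWithState(), which linearly scans all shards on that node.
 */
public class RerouteCostSketch {

    // Number of shard-state checks performed on one node during a single reroute.
    static long checksPerNode(long shardsPerNode) {
        long checks = 0;
        for (long shard = 0; shard < shardsPerNode; shard++) { // one decideMove per shard
            checks += shardsPerNode;                           // one full shardsWithState scan
        }
        return checks;                                         // ~ shardsPerNode^2
    }

    public static void main(String[] args) {
        // The 1,000 and 6,000 shards-per-node cases from the table above:
        System.out.println(checksPerNode(1_000)); // 1,000,000
        System.out.println(checksPerNode(6_000)); // 36,000,000 -> roughly 36x more scanning work
    }
}
```

This simple quadratic model is consistent with the direction of the measured numbers above: going from 1,000 to 6,000 shards per node turns a ~250ms reroute into one that takes many seconds.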
Code changes
To fix the problem, newer ES versions change the structure of RoutingNode: two LinkedHashSet fields, initializingShards and relocatingShards, are added to hold the shards in INITIALIZING and RELOCATING state respectively. The constructor classifies the node's shards and stores them into these two sets, as shown in the diff below:
```diff
+    private final LinkedHashSet<ShardRouting> initializingShards;
+    private final LinkedHashSet<ShardRouting> relocatingShards;

     RoutingNode(String nodeId, DiscoveryNode node, LinkedHashMap<ShardId, ShardRouting> shards) {
         this.nodeId = nodeId;
         this.node = node;
         this.shards = shards;
+        this.relocatingShards = new LinkedHashSet<>();
+        this.initializingShards = new LinkedHashSet<>();
+        for (ShardRouting shardRouting : shards.values()) {
+            if (shardRouting.initializing()) {
+                initializingShards.add(shardRouting);
+            } else if (shardRouting.relocating()) {
+                relocatingShards.add(shardRouting);
+            }
+        }
+        assert invariant();
     }
```
Because RoutingNode now carries initializingShards and relocatingShards, its add, update, remove, numberOfShardsWithState and shardsWithState methods must be updated to keep the two sets in sync:
```diff
     void add(ShardRouting shard) {
+        assert invariant();
         if (shards.containsKey(shard.shardId())) {
             throw new IllegalStateException("Trying to add a shard " + shard.shardId() + " to a node [" + nodeId
                 + "] where it already exists. current [" + shards.get(shard.shardId()) + "]. new [" + shard + "]");
         }
         shards.put(shard.shardId(), shard);
+        if (shard.initializing()) {
+            initializingShards.add(shard);
+        } else if (shard.relocating()) {
+            relocatingShards.add(shard);
+        }
+        assert invariant();
     }
```
```diff
     void update(ShardRouting oldShard, ShardRouting newShard) {
+        assert invariant();
         if (shards.containsKey(oldShard.shardId()) == false) {
             // Shard was already removed by routing nodes iterator
             // TODO: change caller logic in RoutingNodes so that this check can go away
             return;
         }
         ShardRouting previousValue = shards.put(newShard.shardId(), newShard);
         assert previousValue == oldShard : "expected shard " + previousValue + " but was " + oldShard;
+        if (oldShard.initializing()) {
+            boolean exist = initializingShards.remove(oldShard);
+            assert exist : "expected shard " + oldShard + " to exist in initializingShards";
+        } else if (oldShard.relocating()) {
+            boolean exist = relocatingShards.remove(oldShard);
+            assert exist : "expected shard " + oldShard + " to exist in relocatingShards";
+        }
+        if (newShard.initializing()) {
+            initializingShards.add(newShard);
+        } else if (newShard.relocating()) {
+            relocatingShards.add(newShard);
+        }
+        assert invariant();
     }
```
```diff
     void remove(ShardRouting shard) {
+        assert invariant();
         ShardRouting previousValue = shards.remove(shard.shardId());
         assert previousValue == shard : "expected shard " + previousValue + " but was " + shard;
+        if (shard.initializing()) {
+            boolean exist = initializingShards.remove(shard);
+            assert exist : "expected shard " + shard + " to exist in initializingShards";
+        } else if (shard.relocating()) {
+            boolean exist = relocatingShards.remove(shard);
+            assert exist : "expected shard " + shard + " to exist in relocatingShards";
+        }
+        assert invariant();
     }
```
```diff
     public int numberOfShardsWithState(ShardRoutingState... states) {
+        if (states.length == 1) {
+            if (states[0] == ShardRoutingState.INITIALIZING) {
+                return initializingShards.size();
+            } else if (states[0] == ShardRoutingState.RELOCATING) {
+                return relocatingShards.size();
+            }
+        }
         int count = 0;
         for (ShardRouting shardEntry : this) {
             for (ShardRoutingState state : states) {
                 if (shardEntry.state() == state) {
                     count++;
                 }
             }
         }
         return count;
     }
```
```diff
     public List<ShardRouting> shardsWithState(String index, ShardRoutingState... states) {
         List<ShardRouting> shards = new ArrayList<>();
+        if (states.length == 1) {
+            if (states[0] == ShardRoutingState.INITIALIZING) {
+                for (ShardRouting shardEntry : initializingShards) {
+                    if (shardEntry.getIndexName().equals(index) == false) {
+                        continue;
+                    }
+                    shards.add(shardEntry);
+                }
+                return shards;
+            } else if (states[0] == ShardRoutingState.RELOCATING) {
+                for (ShardRouting shardEntry : relocatingShards) {
+                    if (shardEntry.getIndexName().equals(index) == false) {
+                        continue;
+                    }
+                    shards.add(shardEntry);
+                }
+                return shards;
+            }
+        }
         for (ShardRouting shardEntry : this) {
             if (!shardEntry.getIndexName().equals(index)) {
                 continue;
             }
             for (ShardRoutingState state : states) {
                 if (shardEntry.state() == state) {
                     shards.add(shardEntry);
                 }
             }
         }
         return shards;
     }
```
```diff
     public int numberOfOwningShards() {
-        int count = 0;
-        for (ShardRouting shardEntry : this) {
-            if (shardEntry.state() != ShardRoutingState.RELOCATING) {
-                count++;
-            }
-        }
-
-        return count;
+        return shards.size() - relocatingShards.size();
     }

+    private boolean invariant() {
+        // initializingShards must be consistent with that in shards
+        Collection<ShardRouting> shardRoutingsInitializing =
+            shards.values().stream().filter(ShardRouting::initializing).collect(Collectors.toList());
+        assert initializingShards.size() == shardRoutingsInitializing.size();
+        assert initializingShards.containsAll(shardRoutingsInitializing);
+
+        // relocatingShards must be consistent with that in shards
+        Collection<ShardRouting> shardRoutingsRelocating =
+            shards.values().stream().filter(ShardRouting::relocating).collect(Collectors.toList());
+        assert relocatingShards.size() == shardRoutingsRelocating.size();
+        assert relocatingShards.containsAll(shardRoutingsRelocating);
+
+        return true;
+    }
```
assert invariant() is added at the start and end of add, update and remove, which guarantees that initializingShards and relocatingShards always contain exactly the shards that are currently INITIALIZING or RELOCATING. The trade-off is that invariant() itself re-scans all shards on the node, so as the shard count grows, each add, update or remove call becomes more expensive whenever the check runs (note that Java assert statements only execute when assertions are enabled with the -ea JVM flag, so a JVM running with default options does not pay this cost).
In short, the fix keeps the INITIALIZING and RELOCATING shards in two LinkedHashSets and updates them whenever a shard changes, which removes the need to re-scan all shards on every call and greatly reduces the overhead. The problem affects ES versions 7.2 through 7.5 and is very likely to be triggered once the cluster exceeds roughly 50,000 shards; it was fixed in ES 7.6.
3. Remediation
To restore service quickly at the time, we restarted the cluster, but cluster-level tasks were still processed very slowly and the recovery took a long time overall. The follow-up measures we took were:

- Temporarily set the cluster setting "cluster.routing.allocation.disk.include_relocations": "false" (not recommended: the setting was deprecated in ES 7.5, and disabling it can make the disk-usage estimate wrong when disks are close to the high watermark, causing repeated shard relocations). A sketch of applying it follows this list.
- Reduce the cluster's shard count: the online query window was shortened to the most recent 20 days, and the cluster's total shard count is now kept at around 50,000.
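For reference, here is a minimal sketch of applying that transient setting through the Java high-level REST client; the endpoint localhost:9200 and the class name are placeholders, and the same change can equally be made with a plain PUT _cluster/settings request.

```java
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;

public class DisableIncludeRelocations {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
            // Transient, so the workaround disappears after a full cluster restart;
            // the setting itself is deprecated in ES 7.5, as noted above.
            request.transientSettings(Settings.builder()
                .put("cluster.routing.allocation.disk.include_relocations", false)
                .build());
            ClusterUpdateSettingsResponse response =
                client.cluster().putSettings(request, RequestOptions.DEFAULT);
            System.out.println("acknowledged=" + response.isAcknowledged());
        }
    }
}
```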
The measures above only mitigate the problem rather than fix it at the root. To actually resolve it:

- Upgrade ES to a version in which the bug is fixed (7.6 or later).
- Keep the cluster's total shard count within a reasonable range (see the sketch after this list for one way to keep an eye on it).
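For the last point, a small hedged sketch that reads the cluster's active shard count via the high-level REST client so it can be tracked over time; the endpoint and the 50,000 ceiling are our own operational placeholders, not an ES limit.

```java
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class ShardCountCheck {
    public static void main(String[] args) throws Exception {
        // Ceiling chosen from our own experience (the incident cluster got into
        // trouble near 60,000 shards on the affected versions).
        final int shardCeiling = 50_000;

        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            ClusterHealthResponse health =
                client.cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT);
            int activeShards = health.getActiveShards(); // primaries + replicas
            System.out.println("active shards: " + activeShards);
            if (activeShards > shardCeiling) {
                System.out.println("WARNING: shard count is above the agreed ceiling of " + shardCeiling);
            }
        }
    }
}
```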