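Resources & version info
Flink version: 1.14.6
Runtime platform: K8s
HA backend: ZooKeeper (Kubernetes HA, which is backed by etcd, should work on the same principle)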
Flink HA internals explained
When Flink starts, it creates a HighAvailabilityServices instance that provides HA and related basic services, including LeaderRetrievalService and LeaderElectionService:
- LeaderRetrievalService lets the callers of an HA component find that component's current leader. For example, the JobManager uses the ResourceManagerLeaderRetriever service to obtain the ResourceManager leader;
- LeaderElectionService handles leader election. Once a component is elected leader, it can start serving requests, and LeaderRetrievalService can then obtain this registered, valid leader.
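The sketch below is a toy, in-memory model of the division of labor between the two services. Its class and method names are hypothetical and it does not use Flink's API. The election side installs a leader with a fresh session ID. The retrieval side pushes the current leader address and ID to every registered listener, or null when there is no leader. Addresses such as "jm-1" are placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.BiConsumer;

// Toy model of election vs. retrieval (hypothetical names, NOT Flink's API).
final class ToyHaServices {
    private final List<BiConsumer<String, UUID>> retrievers = new ArrayList<>();
    private String leaderAddress;  // null while there is no leader
    private UUID leaderSessionId;  // fresh random ID for every successful election

    // Retrieval side: a caller registers to learn who the leader is.
    synchronized void startRetrieval(BiConsumer<String, UUID> listener) {
        retrievers.add(listener);
        listener.accept(leaderAddress, leaderSessionId); // replay current state (may be null)
    }

    // Election side: a contender wins and receives a new session ID.
    synchronized UUID elect(String contenderAddress) {
        leaderAddress = contenderAddress;
        leaderSessionId = UUID.randomUUID();
        publish();
        return leaderSessionId;
    }

    // Leadership lost (e.g. crash or session expiry): retrievers see null.
    synchronized void revoke() {
        leaderAddress = null;
        leaderSessionId = null;
        publish();
    }

    private void publish() {
        for (BiConsumer<String, UUID> retriever : retrievers) {
            retriever.accept(leaderAddress, leaderSessionId);
        }
    }
}
```

Each election generates a new session ID, even when the same address wins again.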
LeaderRetrievalService explained
The class structure of DefaultLeaderRetrievalService is as follows:
- The default implementation of LeaderRetrievalService is DefaultLeaderRetrievalService. It also implements the LeaderRetrievalEventHandler interface, whose single method notifyLeaderAddress is called back when the leader information changes;
- DefaultLeaderRetrievalService.notifyLeaderAddress checks whether the service is still running. If it is, it calls leaderListener.notifyLeaderAddress to notify the listener of the leader change;
- DefaultLeaderRetrievalService.leaderListener is a LeaderRetrievalListener. One implementation is JobManagerLeaderListener, which the TaskManager uses to watch for JobManager leader changes and update its connection to the JobManager promptly.
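The list above describes a forwarding behavior, sketched below in simplified form. The names RetrievalListener, RetrievalEventHandler and ForwardingRetrievalService are hypothetical. The HA backend calls notifyLeaderAddress on the service, and the service forwards the event to its listener only while it is running. The callback signature is simplified here to an (address, session ID) pair.

```java
import java.util.UUID;

// Hypothetical stand-in for LeaderRetrievalListener (e.g. JobManagerLeaderListener).
interface RetrievalListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

// Hypothetical stand-in for LeaderRetrievalEventHandler: invoked by the HA backend.
interface RetrievalEventHandler {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

// Mirrors the described structure of DefaultLeaderRetrievalService.
final class ForwardingRetrievalService implements RetrievalEventHandler {
    private final Object lock = new Object();
    private boolean running;
    private RetrievalListener leaderListener;

    void start(RetrievalListener listener) {
        synchronized (lock) {
            leaderListener = listener;
            running = true;
        }
    }

    void stop() {
        synchronized (lock) {
            running = false;
        }
    }

    // Called whenever the backend (e.g. a ZK watcher) sees new leader info.
    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
        synchronized (lock) {
            if (!running) {
                return; // not started or already stopped: drop the event
            }
            leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionId);
        }
    }
}
```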
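How can we send an alert when an HA switch happens?
An HA switch is bound to happen whenever the JobManager crashes and restarts, or ZK becomes unavailable and later recovers. Reading the code also shows that every HA switch changes the leaderId (per connection). With the background above, let's analyze each case in turn.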
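JobManager crash:
Judging from the logs, leaderListener.notifyLeaderAddress is called twice. The first call sets the leader address and leaderId to null. After the JobManager restarts and is elected leader, notifyLeaderAddress is called again with the new leaderId.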
ZK unavailable:
Judging from the logs, leaderListener.notifyLeaderAddress is called three times. If the leaderId was originally A, it is first set to null, then back to A, and then to B.
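Conclusion:
Based on the above, we can record the leaderId on every call to leaderListener.notifyLeaderAddress and send an HA-change alert whenever the value changes (to null, or from A to B). To be more precise, we alert only when leaderId changes from A to B, or from null to a new value B. Going from null back to the same A, as in the ZK case, does not trigger an alert.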
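The alerting rule can be sketched as a small stateful check. LeaderChangeAlertDecider is a hypothetical helper, not part of Flink. It remembers the last non-null leaderId and reports an alert only when a non-null leaderId differs from it. As a result, A → null → B (JobManager crash) and A → null → A → B (ZK outage) each alert exactly once. This sketch assumes the first leader seen at startup should not alert.

```java
import java.util.UUID;

// Hypothetical helper: decides whether a leaderId notification is an HA switch.
final class LeaderChangeAlertDecider {
    // Last non-null leaderId observed; null until the first leader is seen.
    private UUID lastKnownLeaderId;

    /**
     * Returns true if an alert should be sent: a non-null leaderId arrived that
     * differs from the last non-null one (A -> B, or A -> null -> B).
     * null notifications and A -> null -> A return false.
     */
    synchronized boolean onLeaderId(UUID leaderId) {
        if (leaderId == null) {
            return false; // leader lost, no new leader yet: wait for the next value
        }
        boolean changed = lastKnownLeaderId != null && !leaderId.equals(lastKnownLeaderId);
        lastKnownLeaderId = leaderId;
        return changed;
    }
}
```

The TaskManager keeps one JobManagerLeaderListener per job, so in practice you would likely keep one decider per job ID, for example in a map keyed by the job ID.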
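How do we send the alert from inside leaderListener.notifyLeaderAddress?
Use a Java agent and attach it when Flink starts (-javaagent:agent.jar=123, where 123 is the argument passed to the agent). The code can be injected either at method entry or inside the synchronized block of the method below (JobManagerLeaderListener.notifyLeaderAddress in DefaultJobLeaderService):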
@Override
public void notifyLeaderAddress(@Nullable final String leaderAddress, @Nullable final UUID leaderId) {
    // Possible injection point 1 (method entry): leaderAddress and leaderId carry the new leader info; both may be null.
    Optional<JobMasterId> jobManagerLostLeadership = Optional.empty();

    synchronized (lock) {
        // Possible injection point 2 (inside the synchronized block).
        if (stopped) {
            LOG.debug(
                    "{}'s leader retrieval listener reported a new leader for job {}. "
                            + "However, the service is no longer running.",
                    DefaultJobLeaderService.class.getSimpleName(),
                    jobId);
        } else {
            final JobMasterId jobMasterId = JobMasterId.fromUuidOrNull(leaderId);

            LOG.debug(
                    "New leader information for job {}. Address: {}, leader id: {}.",
                    jobId,
                    leaderAddress,
                    jobMasterId);

            if (leaderAddress == null || leaderAddress.isEmpty()) {
                // the leader lost leadership but there is no other leader yet.
                jobManagerLostLeadership = Optional.ofNullable(currentJobMasterId);
                closeRpcConnection();
            } else {
                // check whether we are already connecting to this leader
                if (Objects.equals(jobMasterId, currentJobMasterId)) {
                    LOG.debug(
                            "Ongoing attempt to connect to leader of job {}. Ignoring duplicate leader information.",
                            jobId);
                } else {
                    closeRpcConnection();
                    openRpcConnectionTo(leaderAddress, jobMasterId);
                }
            }
        }
    }

    // send callbacks outside of the lock scope
    jobManagerLostLeadership.ifPresent(
            oldJobMasterId -> jobLeaderListener.jobManagerLostLeadership(jobId, oldJobMasterId));
}
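Below is a minimal agent skeleton built on the standard java.lang.instrument mechanism. The JVM calls premain before main and passes it the text after "=" in -javaagent:agent.jar=123 as the agent argument. Every class is then offered to the registered transformer as it loads.

Several parts are assumptions. The class name AlertAgent is hypothetical. The target internal class name is inferred from the DefaultJobLeaderService code above, with the package assumed to be org.apache.flink.runtime.taskexecutor. The actual rewrite of notifyLeaderAddress would need a bytecode library such as Byte Buddy, ASM or Javassist, and is deliberately left out. The agent JAR's manifest must name the class via a Premain-Class entry.

```java
import java.lang.instrument.ClassFileTransformer;
import java.lang.instrument.Instrumentation;
import java.security.ProtectionDomain;

// Hypothetical agent skeleton; manifest needs "Premain-Class: AlertAgent".
final class AlertAgent {
    // Internal (slash-separated) name of the class to instrument.
    // ASSUMPTION: package and inner-class name inferred from the Flink 1.14 source.
    static final String TARGET =
            "org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService$JobManagerLeaderListener";

    // Text after '=' in -javaagent:agent.jar=123 (here "123"); null if none was given.
    static volatile String agentArgs;

    static boolean isTarget(String internalClassName) {
        return TARGET.equals(internalClassName); // className may be null; equals handles it
    }

    // Called by the JVM before main() when started with -javaagent.
    public static void premain(String args, Instrumentation inst) {
        agentArgs = args;
        inst.addTransformer(new ClassFileTransformer() {
            @Override
            public byte[] transform(ClassLoader loader, String className,
                                    Class<?> classBeingRedefined,
                                    ProtectionDomain protectionDomain,
                                    byte[] classfileBuffer) {
                if (!isTarget(className)) {
                    return null; // null means "leave this class unchanged"
                }
                // Here notifyLeaderAddress would be rewritten (at method entry or inside
                // the synchronized block) using a bytecode library; omitted in this
                // sketch, so the target class is also left unchanged.
                return null;
            }
        });
    }
}
```

JobManagerLeaderListener runs inside the TaskManager, so the -javaagent flag would go into the TaskManager JVM options, for example env.java.opts.taskmanager in flink-conf.yaml. AlertAgent is package-private here only so the snippet compiles next to other code. Declaring it public in the real agent JAR is the safer choice.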
Final implementation: to be updated.