A look at storm nimbus's LeaderElector

This article takes a look at the LeaderElector used by storm nimbus.

Nimbus

org/apache/storm/daemon/nimbus/Nimbus.java

    public static void main(String[] args) throws Exception {
        Utils.setupDefaultUncaughtExceptionHandler();
        launch(new StandaloneINimbus());
    }

    public static Nimbus launch(INimbus inimbus) throws Exception {
        Map<String, Object> conf = Utils.merge(ConfigUtils.readStormConfig(),
                                               ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
        boolean fixupAcl = (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP);
        boolean checkAcl = fixupAcl || (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK);
        if (checkAcl) {
            AclEnforcement.verifyAcls(conf, fixupAcl);
        }
        return launchServer(conf, inimbus);
    }

    private static Nimbus launchServer(Map<String, Object> conf, INimbus inimbus) throws Exception {
        StormCommon.validateDistributedMode(conf);
        validatePortAvailable(conf);
        StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
        final Nimbus nimbus = new Nimbus(conf, inimbus, metricsRegistry);
        nimbus.launchServer();
        final ThriftServer server = new ThriftServer(conf, new Processor<>(nimbus), ThriftConnectionType.NIMBUS);
        metricsRegistry.startMetricsReporters(conf);
        Utils.addShutdownHookWithDelayedForceKill(() -> {
            metricsRegistry.stopMetricsReporters();
            nimbus.shutdown();
            server.stop();
        }, 10);
        if (ClientAuthUtils.areWorkerTokensEnabledServer(server, conf)) {
            nimbus.initWorkerTokenManager();
        }
        LOG.info("Starting nimbus server for storm version '{}'", STORM_VERSION);
        server.serve();
        return nimbus;
    }

    public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
                  BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper,
                  StormMetricsRegistry metricsRegistry)
        throws Exception {
        //......
        if (blobStore == null) {
            blobStore = ServerUtils.getNimbusBlobStore(conf, this.nimbusHostPortInfo, null);
        }
        this.blobStore = blobStore;

        if (topoCache == null) {
            topoCache = new TopoCache(blobStore, conf);
        }
        if (leaderElector == null) {
            leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, blobStore, topoCache, stormClusterState,
                                                      getNimbusAcls(conf), metricsRegistry);
        }
        this.leaderElector = leaderElector;
        this.blobStore.setLeaderElector(this.leaderElector);
        //......
    }

    public void launchServer() throws Exception {
        try {
            BlobStore store = blobStore;
            IStormClusterState state = stormClusterState;
            NimbusInfo hpi = nimbusHostPortInfo;

            LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));
            validator.prepare(conf);

            //add to nimbuses
            state.addNimbusHost(hpi.getHost(),
                                new NimbusSummary(hpi.getHost(), hpi.getPort(), Time.currentTimeSecs(), false, STORM_VERSION));
            leaderElector.addToLeaderLockQueue();
            this.blobStore.startSyncBlobs();

            for (ClusterMetricsConsumerExecutor exec : clusterConsumerExceutors) {
                exec.prepare();
            }

            if (isLeader()) {
                for (String topoId : state.activeStorms()) {
                    transition(topoId, TopologyActions.STARTUP, null);
                }
                clusterMetricSet.setActive(true);
            }
            //......
        } catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
                throw e;
            }

            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
                throw e;
            }
            LOG.error("Error on initialization of nimbus", e);
            Utils.exitProcess(13, "Error on initialization of nimbus");
        }
    }
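  • In its constructor, Nimbus calls Zookeeper.zkLeaderElector to create the leaderElector (unless one was passed in) and hands it to the blobStore via setLeaderElector
  • launchServer calls leaderElector.addToLeaderLockQueue() to join the leader election; afterwards, only if this nimbus is the leader does it transition each active topology with TopologyActions.STARTUP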
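The ordering in launchServer is the point here: a nimbus registers itself, joins the election, and only then, if it happens to be the leader, starts the active topologies. The following is a minimal sketch of that control flow under stated assumptions, not Storm code: the `Elector` interface and the `StartupSketch` class are hypothetical, trimmed-down stand-ins (the real ILeaderElector has more methods and its methods declare `throws Exception`).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down stand-in for Storm's ILeaderElector (the real
// interface has more methods, and they declare throws Exception).
interface Elector {
    void addToLeaderLockQueue();
    boolean isLeader();
}

final class StartupSketch {
    // Same ordering as Nimbus.launchServer(): join the election first, then
    // only the current leader moves the active topologies to STARTUP.
    static List<String> startup(Elector elector, List<String> activeStorms) {
        elector.addToLeaderLockQueue();
        List<String> started = new ArrayList<>();
        if (elector.isLeader()) {
            for (String topoId : activeStorms) {
                // Stands in for transition(topoId, TopologyActions.STARTUP, null).
                started.add(topoId);
            }
        }
        return started;
    }

    // Demo elector with a fixed answer; a real one would consult ZooKeeper.
    static Elector fixed(boolean leader) {
        return new Elector() {
            private boolean queued = false;

            @Override
            public void addToLeaderLockQueue() {
                queued = true;
            }

            @Override
            public boolean isLeader() {
                return queued && leader;
            }
        };
    }

    public static void main(String[] args) {
        List<String> storms = List.of("topo-1", "topo-2");
        System.out.println(startup(fixed(true), storms));  // [topo-1, topo-2]
        System.out.println(startup(fixed(false), storms)); // []
    }
}
```

A follower nimbus therefore still runs launchServer to completion; it simply skips the topology transitions and stays queued for the lock.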

Zookeeper.zkLeaderElector

storm-core-1.1.0-sources.jar!/org/apache/storm/zookeeper/Zookeeper.java

    public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException {
        return _instance.zkLeaderElectorImpl(conf, blobStore);
    }

    protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException {
        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
        CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);
        String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock";
        String id = NimbusInfo.fromConf(conf).toHostPortString();
        AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
        AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
                new AtomicReference<>(leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get()));
        return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
                                    leaderLatchListenerAtomicReference, blobStore);
    }
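  • This creates a LeaderLatch on the path {storm.zookeeper.root}/leader-lock, using the nimbus host:port string as its participant id, and then creates a LeaderLatchListener via leaderLatchListenerImpl
  • Finally it wraps everything in a LeaderElectorImp, which is returned as the ILeaderElector
  • Note that this file is from storm-core 1.1.0: its two-argument zkLeaderElector(conf, blobStore) does not match the seven-argument call in the Nimbus constructor quoted above, which comes from a newer Storm release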
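To make the role of the leader-lock path more concrete, here is a simplified in-memory model of the idea behind Curator's LeaderLatch: each participant gets an ephemeral sequential node under the lock path, the participant holding the lowest sequence number is the leader, and when that node goes away the next one takes over. `LatchModel` and its methods are hypothetical; the model ignores watches, sessions and ZooKeeper itself, and the ids and the `/storm` root in the comments are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simplified in-memory model of the idea behind Curator's LeaderLatch (not its API):
// every participant owns an ephemeral *sequential* node under the lock path
// (e.g. /storm/leader-lock), and the lowest sequence number is the leader.
final class LatchModel {
    private final TreeMap<Long, String> nodes = new TreeMap<>(); // sequence -> participant id
    private long nextSequence = 0;

    // Roughly what start() does: create the next sequential node for this id.
    void join(String participantId) {
        nodes.put(nextSequence++, participantId);
    }

    // Roughly what close() or a lost session does: the ephemeral node disappears.
    void leave(String participantId) {
        nodes.values().remove(participantId);
    }

    // Leader = owner of the lowest sequence number, or null if nobody is queued.
    String leader() {
        return nodes.isEmpty() ? null : nodes.firstEntry().getValue();
    }

    // Participants in queue order, similar in spirit to getParticipants().
    List<String> participants() {
        return new ArrayList<>(nodes.values());
    }

    public static void main(String[] args) {
        LatchModel latch = new LatchModel();
        latch.join("nimbus-a:6627"); // hypothetical host:port ids
        latch.join("nimbus-b:6627");
        System.out.println(latch.leader()); // nimbus-a:6627
        latch.leave("nimbus-a:6627");       // e.g. it failed the blob check and closed its latch
        System.out.println(latch.leader()); // nimbus-b:6627
    }
}
```

Keying a TreeMap by sequence number keeps "lowest sequence wins" a single firstEntry() call, which is why the host:port id only matters for reporting who the leader is, not for deciding it.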

leaderLatchListenerImpl

storm-core-1.1.0-sources.jar!/org/apache/storm/zookeeper/Zookeeper.java

    // Leader latch listener that will be invoked when we either gain or lose leadership
    public static LeaderLatchListener leaderLatchListenerImpl(final Map conf, final CuratorFramework zk, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException {
        final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
        return new LeaderLatchListener() {
            final String STORM_JAR_SUFFIX = "-stormjar.jar";
            final String STORM_CODE_SUFFIX = "-stormcode.ser";
            final String STORM_CONF_SUFFIX = "-stormconf.ser";

            @Override
            public void isLeader() {
                Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));

                Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
                Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
                Set<String> allLocalBlobKeys = Sets.newHashSet(blobStore.listKeys());
                Set<String> allLocalTopologyBlobKeys = filterTopologyBlobKeys(allLocalBlobKeys);

                // this finds all active topologies blob keys from all local topology blob keys
                Sets.SetView<String> diffTopology = Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys);
                LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]",
                        generateJoinedString(activeTopologyIds), generateJoinedString(allLocalTopologyBlobKeys),
                        generateJoinedString(diffTopology));

                if (diffTopology.isEmpty()) {
                    Set<String> activeTopologyDependencies = getTopologyDependencyKeys(activeTopologyCodeKeys);

                    // this finds all dependency blob keys from active topologies from all local blob keys
                    Sets.SetView<String> diffDependencies = Sets.difference(activeTopologyDependencies, allLocalBlobKeys);
                    LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]",
                            generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys),
                            generateJoinedString(diffDependencies));

                    if (diffDependencies.isEmpty()) {
                        LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally.");
                    } else {
                        LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, giving up leadership.");
                        closeLatch();
                    }
                } else {
                    LOG.info("code for all active topologies not available locally, giving up leadership.");
                    closeLatch();
                }
            }

            @Override
            public void notLeader() {
                LOG.info("{} lost leadership.", hostName);
            }

            //......

            private void closeLatch() {
                try {
                    leaderLatch.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }
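  • leaderLatchListenerImpl returns an anonymous implementation of the LeaderLatchListener interface
  • isLeader runs some checks when ZooKeeper picks this node as leader: if the blobs of all active topologies are not present locally, or some of their dependencies are missing locally, it calls leaderLatch.close() to give up leadership
  • notLeader only writes a log message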
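The decision inside isLeader() boils down to two set differences. The sketch below reproduces that decision with plain JDK sets instead of Guava's `Sets.difference`. It assumes, as the three suffix constants suggest, that each active topology id maps to `id + suffix` blob keys, and it takes the dependency keys as an input rather than reading them from the topology code blobs. `LeadershipCheck` is a hypothetical helper, not part of Storm.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper reproducing the decision in the 1.1.0 listener's isLeader().
// Assumption: each active topology id maps to three blob keys formed with the
// suffixes declared in the listener; dependency keys are passed in directly.
final class LeadershipCheck {
    static final String[] SUFFIXES = {"-stormjar.jar", "-stormcode.ser", "-stormconf.ser"};

    static Set<String> topologyBlobKeys(Set<String> topologyIds) {
        Set<String> keys = new TreeSet<>();
        for (String id : topologyIds) {
            for (String suffix : SUFFIXES) {
                keys.add(id + suffix);
            }
        }
        return keys;
    }

    // true = accept leadership; false = give it up (closeLatch() in the listener).
    static boolean acceptLeadership(Set<String> activeTopologyIds,
                                    Set<String> activeDependencyKeys,
                                    Set<String> localBlobKeys) {
        // Step 1: active topology blobs missing locally. The listener first filters the
        // local keys down to topology blob keys; assuming that filter keeps every key with
        // one of these suffixes, the difference comes out the same either way.
        Set<String> missingTopologyBlobs = new TreeSet<>(topologyBlobKeys(activeTopologyIds));
        missingTopologyBlobs.removeAll(localBlobKeys);
        if (!missingTopologyBlobs.isEmpty()) {
            return false;
        }
        // Step 2: dependency blobs of the active topologies missing locally.
        Set<String> missingDependencies = new TreeSet<>(activeDependencyKeys);
        missingDependencies.removeAll(localBlobKeys);
        return missingDependencies.isEmpty();
    }

    public static void main(String[] args) {
        Set<String> active = Set.of("wordcount-1-1");
        Set<String> local = new HashSet<>(topologyBlobKeys(active));
        System.out.println(acceptLeadership(active, Set.of("dep.jar"), local)); // false
        local.add("dep.jar");
        System.out.println(acceptLeadership(active, Set.of("dep.jar"), local)); // true
    }
}
```

Checking the topology blobs before the dependencies mirrors the listener's nesting: dependency keys are only looked up once the code blobs that reference them are known to be local.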

LeaderElectorImp

org/apache/storm/zookeeper/LeaderElectorImp.java

    public class LeaderElectorImp implements ILeaderElector {
        private static Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);
        private final Map<String, Object> conf;
        private final List<String> servers;
        private final CuratorFramework zk;
        private final String leaderlockPath;
        private final String id;
        private final AtomicReference<LeaderLatch> leaderLatch;
        private final AtomicReference<LeaderLatchListener> leaderLatchListener;
        private final BlobStore blobStore;
        private final TopoCache tc;
        private final IStormClusterState clusterState;
        private final List<ACL> acls;
        private final StormMetricsRegistry metricsRegistry;

        public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id,
                                AtomicReference<LeaderLatch> leaderLatch, AtomicReference<LeaderLatchListener> leaderLatchListener,
                                BlobStore blobStore, final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
                                StormMetricsRegistry metricsRegistry) {
            this.conf = conf;
            this.servers = servers;
            this.zk = zk;
            this.leaderlockPath = leaderlockPath;
            this.id = id;
            this.leaderLatch = leaderLatch;
            this.leaderLatchListener = leaderLatchListener;
            this.blobStore = blobStore;
            this.tc = tc;
            this.clusterState = clusterState;
            this.acls = acls;
            this.metricsRegistry = metricsRegistry;
        }

        @Override
        public void prepare(Map<String, Object> conf) {
            // no-op for zookeeper implementation
        }

        @Override
        public void addToLeaderLockQueue() throws Exception {
            // if this latch is already closed, we need to create new instance.
            if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
                leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
                LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, leaderLatch.get(), blobStore, tc, clusterState, acls,
                                                                             metricsRegistry);
                leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(callback));
                LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");
            }
            // Only if the latch is not already started we invoke start
            if (LeaderLatch.State.LATENT.equals(leaderLatch.get().getState())) {
                leaderLatch.get().addListener(leaderLatchListener.get());
                leaderLatch.get().start();
                LOG.info("Queued up for leader lock.");
            } else {
                LOG.info("Node already in queue for leader lock.");
            }
        }

        @Override
        // Only started latches can be closed.
        public void removeFromLeaderLockQueue() throws Exception {
            if (LeaderLatch.State.STARTED.equals(leaderLatch.get().getState())) {
                leaderLatch.get().close();
                LOG.info("Removed from leader lock queue.");
            } else {
                LOG.info("leader latch is not started so no removeFromLeaderLockQueue needed.");
            }
        }

        @Override
        public boolean isLeader() throws Exception {
            return leaderLatch.get().hasLeadership();
        }

        @Override
        public NimbusInfo getLeader() {
            try {
                return Zookeeper.toNimbusInfo(leaderLatch.get().getLeader());
            } catch (Exception e) {
                throw Utils.wrapInRuntime(e);
            }
        }

        @Override
        public List<NimbusInfo> getAllNimbuses() throws Exception {
            List<NimbusInfo> nimbusInfos = new ArrayList<>();
            Collection<Participant> participants = leaderLatch.get().getParticipants();
            for (Participant participant : participants) {
                nimbusInfos.add(Zookeeper.toNimbusInfo(participant));
            }
            return nimbusInfos;
        }

        @Override
        public void close() {
            //Do nothing now.
        }
    }
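  • LeaderElectorImp implements the ILeaderElector interface
  • addToLeaderLockQueue first checks whether the latch has already been closed and, if so, replaces it with a new one (together with a new listener); it then checks the latch state and, if the latch has not been started yet, registers the listener and calls start to join the election
  • There are two reasons for creating a new latch when the current one is closed: first, a Curator LeaderLatch cannot be restarted once closed (start() only succeeds from the LATENT state and throws otherwise); second, a nimbus that ZooKeeper elects as leader but that does not meet Storm's own leadership conditions gives up leadership by closing its latch, so a closed latch is an expected state rather than an error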
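The reason for recreating the latch is easiest to see with the latch lifecycle modeled explicitly. Below is a hypothetical `FakeLatch` that mimics the LATENT, STARTED and CLOSED states of Curator's LeaderLatch (start() only from LATENT, close() only from STARTED), plus a `QueueSketch` that follows the same control flow as addToLeaderLockQueue. Neither class is Storm or Curator code, the exception messages are made up, and rebuilding the listener is omitted.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the LeaderLatch lifecycle: LATENT -> STARTED -> CLOSED.
// start() is only legal from LATENT and close() only from STARTED, so a closed
// latch can never be started again.
final class FakeLatch {
    enum State { LATENT, STARTED, CLOSED }

    private State state = State.LATENT;

    State getState() {
        return state;
    }

    void start() {
        if (state != State.LATENT) {
            throw new IllegalStateException("start() requires LATENT, was " + state);
        }
        state = State.STARTED;
    }

    void close() {
        if (state != State.STARTED) {
            throw new IllegalStateException("close() requires STARTED, was " + state);
        }
        state = State.CLOSED;
    }
}

final class QueueSketch {
    final AtomicReference<FakeLatch> latch = new AtomicReference<>(new FakeLatch());

    // Same control flow as LeaderElectorImp.addToLeaderLockQueue():
    // replace a CLOSED latch, then start it only if it is still LATENT.
    String addToLeaderLockQueue() {
        if (latch.get().getState() == FakeLatch.State.CLOSED) {
            latch.set(new FakeLatch()); // the real code also rebuilds the listener here
        }
        if (latch.get().getState() == FakeLatch.State.LATENT) {
            latch.get().start();
            return "Queued up for leader lock.";
        }
        return "Node already in queue for leader lock.";
    }

    public static void main(String[] args) {
        QueueSketch elector = new QueueSketch();
        System.out.println(elector.addToLeaderLockQueue()); // Queued up for leader lock.
        System.out.println(elector.addToLeaderLockQueue()); // Node already in queue for leader lock.
        elector.latch.get().close();                        // e.g. the listener gave up leadership
        System.out.println(elector.addToLeaderLockQueue()); // Queued up for leader lock.
    }
}
```

Holding the latch in an AtomicReference is what lets the elector swap in a fresh instance while other code keeps calling through the same reference.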

Summary

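  • storm nimbus's LeaderElector is built mainly on the LeaderLatch recipe from Apache Curator's ZooKeeper recipes
  • storm nimbus supplies its own LeaderLatchListener that validates a nimbus once it becomes leader: it must have all active topologies and all of their dependencies locally, otherwise it gives up leadership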

doc

  • Highly Available Nimbus Design

Reposted from: https://my.oschina.net/go4it/blog/2239477
