ElasticMQ是一臺消息服務器,具有Scala,Java和與Amazon SQS兼容的接口。 它通過跨服務器群集復制消息來支持有保證的消息傳遞,并通過日志記錄實現消息持久性。
消息復制是ElasticMQ的核心功能之一。 但是,如果您看一下代碼,則只有少數幾個類,最長的類有76行(請記住,這是Scala,雖然;))。 這是因為ElasticMQ使用JGroups作為基礎通信庫。 JGroups已經很老了,特別是對于Java庫而言-JGroups的第一個發行版是在1999年(!)。 但是,它遠不是過時和過時的-它具有一個不錯的API,可以正常工作,擁有一個良好的社區。 并且因為任何Java庫都可以與Scala很好地協作。
JGroups具有許多有用的功能:
- 可靠的組播
- 集群管理
- 故障檢測
- 節點發現
- 多年的性能改進
它們廣泛用于在ElasticMQ中實施復制。 以下是其完成方式的摘要。
ElasticMQ集群如何工作?
在單個ElasticMQ集群中,一個節點始終是主節點。 您只能對此節點執行操作。 然后將每個操作的結果復制到其他成員。 有兩種與阻止有關的選項; 復制可以是完全異步的,也可以等待直到至少一個或所有節點確認該操作。 為了確保在群集分區的情況下不會從不同分區收到相同的消息,只有具有至少一半+1節點操作的分區處于活動狀態。
ElasticMQ中的中心概念是消息存儲。 存儲器執行命令(例如,發送消息命令,刪除消息命令等)。 復制層只是任何其他存儲的包裝。 但是請注意,我們只能復制產生的存儲突變(因此在執行命令之后),而不是原始命令本身。 例如,如果命令是“接收消息”,則在每臺計算機上執行該命令的結果可能會有所不同。 因此,如果接收消息成功,我們將僅復制消息可見性的更改(在ElasticMQ中,類似于Amazon SQS ,如果接收到消息,則會在指定的時間段內阻止后續接收該消息)。 您可以在JGroupsReplicatedStorage中看到此基本邏輯。

初始化集群
在開始復制之前,首先要做的是初始化集群。 這是在ReplicatedStorageConfigurator中完成的。 作為參數,我們需要一個JGroups配置文件,該文件是協議棧。 您實際上并不需要知道每種協議的功能以及所有這些配置參數的含義。 最有用的兩個是udp.xml和tcp.xml 。 如果您的網絡中有多播,則應使用第一個;如果所有通信都應通過TCP(例如,在EC2上),則應使用第二個。 在后一種情況下,您還需要提供初始IP列表。 該列表不必詳盡無遺,只需列出種子即可。
擁有協議棧,ElasticMQ創建一個JChannel并將其連接,這僅意味著連接到集群。 實際上,這就是使用JGroups創建集群所需要做的所有工作-非常簡單,對吧? 正如您在ReplicatedStorageConfigurator的末尾看到的那樣,連接之后的第一件事是對channel.getState(null,0)的調用。 這將轉到當前的主節點(稍后會進行有關主選舉的更多信息),獲取狀態(當前的隊列和消息)并將其應用于當前的節點(請參閱非常簡單的JGroupsStateTransferMessageListener-處理發送和接收)。 這里有兩件事要注意。 首先,此傳輸不會阻止整個群集正常運行。 其次,如果在狀態轉移期間執行了一項操作,則該操作也會被復制。 因此,可能會在新節點上執行一次命令兩次。 但這無關緊要,因為每個復制的命令都是冪等的,因此可以多次應用。 在其他情況下,必須實施某些應用程序側機制以防止此類情況。

復制數據
最后,我們進入核心:復制命令。 在發送方,這由JGroupsReplicationMessageSender處理。 同樣,這不是一個非常復雜的類。 它使用來自JGroups的MessageDispatcher “構建塊”,除了在整個集群中對消息進行多重處理外,還使您能夠等待,直到指定數量的節點接收到它為止。 在接收方,我們有JGroupsRequestHandler 。 同樣,非常簡單。 收到消息后,它僅發送到存儲。

集群管理
您可能還注意到SetMaster特殊消息。 用戶需要此權限才能讀取當前主節點的節點地址。 主選舉(決定哪個節點是主節點)完全由JGroups處理。 JGroups中沒有特定的算法來選舉主節點,但是我們可以利用以下事實:每個節點都有相同的集群視圖,由JGroups View類表示。 我們要做的就是簡單地從列表中獲取第一個(或最后一個或第3個,等等-只要在所有節點上都相同),然后將其設置為主節點即可。
群集視圖由最后一個“核心”復制類JGroupsMembershipListener處理 。 那里發生了兩件事。 每當新節點加入或離開集群時,都會調用viewAccepted回調。 每個具有View類的實例(很好,等于:))的節點。 主機在單獨的線程中廣播其地址(這是ElasticMQ服務器地址,而不是內部JGroups集群通信地址)。 在一個JGroups回調方法中執行阻塞操作是一個非常容易的錯誤。 您永遠不應該那樣做,因為整個堆棧都可以鎖定。 我們還需要FLUSH協議(總是在集群設置過程中添加); 該協議可確保在所有節點都安裝新視圖之前,不發送新消息,因此,我們確保新節點始終接收主信息。
成員資格偵聽器還處理集群合并。 同樣,JGroups為我們提供了合并分區的視圖以及新的合并視圖。 在ElasticMQ中,除了主分區(最大分區)以外的所有分區都請求狀態轉移,就像連接到集群之后一樣。 這樣,數據將保持一致狀態。

加起來
還值得注意的是,使用ScalaTest對ElasticMQ的復制進行了全面測試。 每個測試都會創建一個內存存儲集群,創建新節點或模擬節點崩潰。 請參見JGroupsReplicatedStorageTest類。
有了JGroups的機制,就可以輕松實現集群通信。 但是,與往常一樣,您需要記住一些有關并發的陷阱(例如,新節點加入時可能會有集群活動;分區和合并可能隨時發生;正常消息和集群視圖更改之間沒有順序) ;可以在狀態轉移期間發送消息;等等。 但是,JGroups 教程和手冊都非常全面,并且得到了論壇的其他幫助(感謝Bela!),您應該一切順利。
您可以通過下載獨立的ElasticMQ 發行版或以嵌入式方式運行它,來嘗試復制在實踐中的工作方式。
參考:來自Adam Warski博客的Blog的 JCG合作伙伴 Adam Warski使用JGroups在ElasticMQ中實現消息復制 。
翻譯自: https://www.javacodegeeks.com/2012/06/elasticmq-message-replication-with.html