消息隊列基礎
適合消息隊列解決的問題
- 異步處理:處理完關鍵步驟后直接返回結果,后續放入隊列慢慢處理
- 流量控制:
- 使用消息隊列隔離網關和后端服務,以達到流量控制和保護后端服務的目的。能根據下游的處理能力自動調節流量,達到“削峰填谷”的作用
- 網關在收到請求后,將請求放入請求消息隊列;
- 后端服務從請求消息隊列中獲取 APP 請求,完成后續秒殺處理過程,然后返回結果。
- 用消息隊列實現一個令牌桶,更簡單地進行流量控制。
- 原理:單位時間內只發放固定數量的令牌到令牌桶中,規定服務在處理請求之前必須先從令牌桶中拿出一個令牌,如果令牌桶中沒有令牌,則拒絕請求。這樣就保證單位時間內,能處理的請求不超過發放令牌的數量,起到了流量控制的作用。
- 實現:只要網關在處理 APP 請求時增加一個獲取令牌的邏輯。
- 使用消息隊列隔離網關和后端服務,以達到流量控制和保護后端服務的目的。能根據下游的處理能力自動調節流量,達到“削峰填谷”的作用
- 服務解耦
網關如何接收服務端的秒殺結果
處理 APP 請求的線程
- 網關在收到 APP 的秒殺請求后,直接給消息隊列發消息。消息的內容只要包含足夠的字段就行了,比如用戶 ID、設備 ID、請求時間等等。另外,還需要包含這個請求的 ID 和網關的 ID。
- 如果發送消息失敗,可以直接給 APP 返回秒殺失敗結果,成功發送消息之后,線程就阻塞等待秒殺結果,設定一個等待的超時時間。
- 等待結束之后,去存放秒殺結果的 Map 中查詢是否有返回的秒殺結果,如果有就構建 Response,給 APP 返回秒殺結果,如果沒有,按秒殺失敗處理。
網關如何來接收從后端秒殺服務返回的秒殺結果。
- 選擇用 RPC 的方式來返回秒殺結果,這里網關節點是 RPC 服務端,后端服務為客戶端。之前網關發出去的消息中包含了網關的 ID,后端服務可以通過這個網關 ID 來找到對應的網關實例,秒殺結果中需要包含請求 ID,這個請求 ID 也是從消息中獲取的。、
- 網關收到后端服務的秒殺結果之后,用請求 ID 為 Key,結果保存到秒殺結果的 Map 中,然后通知對應的處理 APP 請求的線程,結束等待。處理 APP 請求的線程,在結束等待之后,會去秒殺的結果 Map 中查詢這個結果,然后再給 APP 返回響應。
如何選擇消息隊列
- 選擇標準
- 開源產品、社區活躍
- 消息的可靠傳遞:確保不丟消息;
- Cluster:支持集群,確保不會因為某個節點宕機導致服務不可用,當然也不能丟消息;
- 性能:具備足夠好的性能,能滿足絕大多數場景的性能要求。
- 消息隊列產品
- RabbitMQ
- RocketMQ
- Kafka
消息模型:主題和隊列有什么區別?
- 隊列:生產者,消費者,存放消息的容器稱為隊列
- 主題:發布者,訂閱者,服務端存放消息的容器稱為主題。訂閱者先訂閱主題
rabbitMq
- 使用隊列解決多個消費者問題
- Exchange 位于生產者和隊列之間,生產者將消息發送給 Exchange,由 Exchange 上配置的策略來決定將消息投遞到哪些隊列中。
- 同一份消息如果需要被多個消費者來消費,需要配置 Exchange 將消息發送到多個隊列,每個隊列中都存放一份完整的消息數據,可以為一個消費者提供消費服務。這也可以變相地實現新發布 - 訂閱模型中,“一份消息數據可以被多個訂閱者來多次消費”這樣的功能。
RocketMQ和kafka
- 假設結構:主題MyTopic,為主題創建5個隊列(Q1Q2Q3Q4Q5),分布在2個broker中(broker0,broker1),3 個生產者實例:Produer0,Produer1 和 Producer2。
- 3 個生產者不用對應,隨便發。
- 每個消費組就是一份訂閱,要消費主題 MyTopic 下,所有隊列的全部消息。
- 注意:隊列里的消息并不是消費掉就沒有了,這里的“消費”,只是去隊列里面讀了消息,并沒有刪除,消費完這條消息還是在隊列里面。
- 多個消費組在消費同一個主題時,消費組之間是互不影響的。
- 比如 2 個消費組:G0 和 G1。G0 消費了哪些消息,G1 是不知道的,G0 消費過的消息,G1 還可以消費。即使 G0 積壓了很多消息,對 G1 來說也沒有任何影響。
- 消費組的內部:在同一個消費組里面,每個隊列只能被一個消費者實例占用。至于如何分配,這里面有很多策略。保證每個隊列分配一個消費者就行了。
- 比如,可以讓消費者 C0 消費 Q0,Q1 和 Q2,C1 消費 Q3 和 Q4,如果 C0 宕機了,會觸發重新分配,這時候 C1 同時消費全部 5 個隊列。
- 隊列占用只是針對消費組內部來說的,對于其他的消費組來說是沒有影響的。
- 比如隊列 Q2 被消費組 G1 的消費者 C1 占用了,對于消費組 G2 來說,是完全沒有影響的,G2 也可以分配它的消費者來占用和消費隊列 Q2。
- 消費位置:每個消費組內部維護自己的一組消費位置,每個隊列對應一個消費位置。消費位置在服務端保存,并且,消費位置和消費者是沒有關系的。每個消費位置一般就是一個整數,記錄這個消費組中,這個隊列消費到哪個位置了,這個位置之前的消息都成功消費了,之后的消息都沒有消費或者正在消費。
實現單個隊列的并行消費?
比如說,隊列中當前有 10 條消息,對應的編號是 0-9,當前的消費位置是 5。同時來了三個消費者來拉消息,把編號為 5、6、7 的消息分別給三個消費者,每人一條。過了一段時間,三個消費成功的響應都回來了,這時候就可以把消費位置更新為 8 了,這樣就實現并行消費。
編號為 6、7 的消息響應回來了,編號 5 的消息響應一直回不來,怎么辦?
這個位置 5 就是一個消息空洞。為了避免位置 5 把這個隊列卡住,可以先把消費位置 5 這條消息,復制到一個特殊重試隊列中,然后依然把消費位置更新為 8,繼續消費。再有消費者來拉消息的時候,優先把重試隊列中的那條消息給消費者就可以了。
這是并行消費的一種實現方式。需要注意的是,并行消費開銷還是很大的,不應該作為一個常規的,提升消費并發的手段,如果消費慢需要增加消費者的并發數,還是需要擴容隊列數。
保證消息的嚴格順序
主題層面是無法保證嚴格順序的,只有在隊列上才能保證消息的嚴格順序。
-
如果業務必須要求全局嚴格順序,就只能把消息隊列數配置成 1,生產者和消費者也只能是一個實例,這樣才能保證全局嚴格順序。
-
大部分情況下只要保證局部有序就可以滿足要求了。比如,在傳遞賬戶流水記錄的時候,只要保證每個賬戶的流水有序就可以了,不同賬戶之間的流水記錄是不需要保證順序的。
- 保證局部嚴格順序實現。在發送端,使用賬戶 ID 作為 Key,采用一致性哈希算法計算出隊列編號,指定隊列來發送消息。一致性哈希算法可以保證,相同 Key 的消息總是發送到同一個隊列上,這樣可以保證相同 Key 的消息是嚴格有序的。如果不考慮隊列擴容,也可以用隊列數量取模的簡單方法來計算隊列編號。
利用事務消息實現分布式事務
引
消息隊列中的“事務”,主要解決的是消息生產者和消息消費者的數據一致性問題。
例:訂單系統,購物車系統訂閱主題,接收訂單創建的消息,清理購物車,刪除購物車的商品。
可能的異常:
- 創建了訂單,沒有清理購物車
- 訂單沒有創建成功,購物車里面的 商品被清理掉了
要保證訂單庫和購物車庫這兩個庫的數據一致性。
問題的關鍵點集中在訂單系統,創建訂單和發送消息這兩個步驟要么都操作成功,要么都操作失敗,不允許一個成功而另一個失敗的情況出現。
分布式事務
常見的分布式事務實現有 2PC(Two-phase Commit,也叫二階段提交)、TCC(Try-Confirm-Cancel) 和事務消息。每一種實現都有其特定的使用場景,也有各自的問題,都不是完美的解決方案。
消息隊列是如何實現分布式事務的?
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-JAtvDcUH-1692190252202)(C:\Users\308158\AppData\Roaming\Typora\typora-user-images\image-20230808102403197.png)]
-
訂單系統在消息隊列上開啟一個事務。
-
訂單系統給消息服務器發送一個“半消息”(包含的內容就是完整的消息內容,但在事務提交之前,對于消費者來說,這個消息是不可見的。)
-
半消息發送成功后,訂單系統就可以執行本地事務了,在訂單庫中創建一條訂單記錄,并提交訂單庫的數據庫事務。
- 如果訂單創建成功,那就提交事務消息,購物車系統就可以消費到這條消息繼續后續的流程。
- 如果訂單創建失敗,那就回滾事務消息,購物車系統就不會收到這條消息。
-
有一個問題是沒有解決的。如果在第四步提交事務消息時失敗了怎么辦?
- Kafka 的解決方案比較簡單粗暴,直接拋出異常,讓用戶自行處理。可以在業務代碼中反復重試提交,直到提交成功,或者刪除之前創建的訂單進行補償。
- RocketMQ :增加了事務反查的機制來解決事務消息提交失敗的問題。如果 Producer 也就是訂單系統,在提交或者回滾事務消息時發生網絡異常,RocketMQ 的 Broker 沒有收到提交或者回滾的請求,Broker 會定期去 Producer 上反查這個事務對應的本地事務的狀態,然后根據反查結果決定提交或者回滾這個事務。
如何確保消息不會丟失
- 利用消息隊列的有序性來驗證是否有消息丟失
- 在 Producer 端,給每個發出的消息附加一個連續遞增的序號,然后在 Consumer 端來檢查這個序號的連續性。
- 大多數消息隊列的客戶端都支持攔截器機制,利用攔截器機制,在 Producer 發送消息之前的攔截器中將序號注入到消息中,在 Consumer 收到消息的攔截器中檢測序號的連續性。消息檢測的代碼不會侵入業務代碼中,待系統穩定后,也方便將這部分檢測的邏輯關閉或者刪除。
- 在分布式系統中實現這個檢測方法
- Kafka 和 RocketMQ 不保證在 Topic 上的嚴格順序的,只能保證分區上的消息是有序的,在發消息時必須要指定分區,并在每個分區單獨檢測消息序號的連續性。
- Producer 如果是多實例的,由于并不好協調多個 Producer 之間的發送順序,所以也需要每個 Producer 分別生成各自的消息序號,并且需要附加上 Producer 的標識,在 Consumer 端按照每個 Producer 分別來檢測序號的連續性。
- Consumer 實例的數量最好和分區數量一致,做到 Consumer 和分區一一對應,方便在 Consumer 內檢測消息序號的連續性。
- 確保消息可靠傳遞
- 丟失
- 生產階段: 從消息在 Producer 創建出來,經過網絡傳輸發送到 Broker 端。
- 正確處理返回值或者捕獲異常,就可以保證這個階段的消息不會丟失
- 同步發送,捕獲異常
- 異步發送,在回調方法里進行檢查
- 正確處理返回值或者捕獲異常,就可以保證這個階段的消息不會丟失
- 存儲階段: 消息在 Broker 端存儲,如果是集群,消息會在這個階段被復制到其他的副本上。
- 如果 Broker 出現了故障,比如進程死掉了或者服務器宕機了,還是可能會丟失消息的。
- 對于單個節點的 Broker,需要配置 Broker 參數,在收到消息后,將消息寫入磁盤后再給 Producer 返回確認響應,這樣即使發生宕機,由于消息已經被寫入磁盤,就不會丟失消息,恢復后還可以繼續消費。
- 如果是 Broker 是由多個節點組成的集群,需要將 Broker 集群配置成:至少將消息發送到 2 個以上的節點,再給客戶端回復發送確認響應。這樣當某個 Broker 宕機時,其他的 Broker 可以替代宕機的 Broker,也不會發生消息丟失
- 如果 Broker 出現了故障,比如進程死掉了或者服務器宕機了,還是可能會丟失消息的。
- 消費階段: Consumer 從 Broker 上拉取消息,經過網絡傳輸發送到 Consumer 上。
- 不要在收到消息后就立即發送消費確認,而是在執行完所有消費業務邏輯之后,再發送消費確認。
- 你可以看到,在消費的回調方法 callback 中,正確的順序是,先是把消息保存到數據庫中,然后再發送消費確認響應。這樣如果保存消息到數據庫失敗了,就不會執行消費確認的代碼,下次拉到的還是這條消息,直到消費成功。
- 不要在收到消息后就立即發送消費確認,而是在執行完所有消費業務邏輯之后,再發送消費確認。
- 生產階段: 從消息在 Producer 創建出來,經過網絡傳輸發送到 Broker 端。
- 丟失
處理消費過程中的重復消息?
- 消息重復情況
- At most once: 至多一次。消息在傳遞時,最多會被送達一次。換一個說法就是,沒什么消息可靠性保證,允許丟消息。一般都是一些對消息可靠性要求不太高的監控場景使用,比如每分鐘上報一次機房溫度數據,可以接受數據少量丟失。
- At least once: 至少一次。消息在傳遞時,至少會被送達一次。也就是說,不允許丟消息,但是允許有少量重復消息出現。
- Exactly once:恰好一次。消息在傳遞時,只會被送達一次,不允許丟失也不允許重復,這個是最高的等級。
- 冪等性解決重復消息問題
- 在消費端,讓消費消息的操作具備冪等性。
- 從業務邏輯設計上入手,將消費的業務邏輯設計成具備冪等性的操作。
- 利用數據庫的唯一約束實現冪等
- 例如:對于同一個轉賬單同一個賬戶只能插入一條記錄,后續重復的插入操作都會失敗,這樣就實現了一個冪等的操作。
- 為更新的數據設置前置條件
- 例如:給數據增加一個版本號屬性,每次更數據前,比較當前數據的版本號是否和消息中的版本號一致,如果不一致就拒絕更新數據,更新數據的同時將版本號 +1,一樣可以實現冪等更新。
- 記錄并檢查操作
- 在執行數據更新操作之前,先檢查一下是否執行過這個更新操作。
- 在發送消息時,給每條消息指定一個全局唯一的 ID,消費時,先根據這個 ID 檢查這條消息是否有被消費過,如果沒有消費過,才更新數據,然后將消費狀態置為已消費。
- 利用數據庫的唯一約束實現冪等
處理消息積壓
消息積壓的直接原因:系統中的某個部分出現了性能問題,來不及處理上游發送的消息,才會導致消息積壓。
- 優化性能避免消息積壓
- 發送端性能優化
- 如果代碼發送消息的性能上不去,需要優先檢查是不是發消息之前的業務邏輯耗時太多導致的。
- 只需要注意設置合適的并發和批量大小,就可以達到很好的發送性能。
- 消費端性能優化
- 如果消費的速度跟不上發送端生產消息的速度,就會造成消息積壓。
- 設計系統時,一定要保證消費端的消費性能要高于生產端的發送性能
- 消費端的性能優化除了優化消費業務邏輯以外,也可以通過水平擴容,增加消費端的并發數來提升總體的消費性能。注意:在擴容 Consumer 的實例數量的同時,必須同步擴容主題中的分區(也叫隊列)數量,確保 Consumer 的實例數和分區數量是相等的。如果 Consumer 的實例數量超過分區數量,這樣的擴容實際上是沒有效果的
- 發送端性能優化
- 處理消息積壓
- 某一個時刻,突然就開始積壓消息并且積壓持續上漲。這種情況下需要你在短時間內找到消息積壓的原因,迅速解決問題才不至于影響業務。
- 能導致積壓突然增加,最粗粒度的原因:要么是發送變快了,要么是消費變慢了。
- 如果是單位時間發送的消息增多,比如趕上大促或者搶購,短時間內不太可能優化消費端的代碼來提升消費性能,唯一的方法是通過擴容消費端的實例數來提升總體的消費能力。
- 如果短時間內沒有足夠的服務器資源進行擴容,將系統降級,通過關閉一些不重要的業務,減少發送方發送的數據量,最低限度讓系統還能正常運轉,服務一些重要業務。
- 或者,無論是發送消息的速度還是消費消息的速度和原來都沒什么變化,需要檢查一下消費端,是不是消費失敗導致的一條消息反復消費這種情況比較多,這種情況也會拖慢整個系統的消費速度。
- 監控到消費變慢了,需要檢查消費實例,分析一下是什么原因導致消費變慢。優先檢查日志是否有大量的消費錯誤,如果沒有錯誤,可以通過打印堆棧信息,看一下消費線程是不是卡在什么地方不動了,比如觸發了死鎖或者卡在等待某些資源上了。
- 某一個時刻,突然就開始積壓消息并且積壓持續上漲。這種情況下需要你在短時間內找到消息積壓的原因,迅速解決問題才不至于影響業務。
使用異步設計提升系統性能
異步模式設計的程序可以顯著減少線程等待,從而在高吞吐量的場景中,極大提升系統的整體性能,顯著降低時延。
異步設計提升系統性能
-
同步實現的性能瓶頸
- 每處理一個請求需要耗時 100ms,并在這 100ms 過程中是需要獨占一個線程,
- 結論:每個線程每秒鐘最多可以處理 10 個請求。每臺計算機上的線程資源并不是無限的,假設使用的服務器同時打開的線程數量上限是 10,000,可以計算出這臺服務器每秒鐘可以處理的請求上限是: 10,000 (個線程)* 10(次請求每秒) = 100,000 次每秒。
- 如果請求速度超過這個值,那么請求就不能被馬上處理,只能阻塞或者排隊,服務的響應時延由 100ms 延長到了:排隊的等待時延 + 處理時延 (100ms)。即在大量請求的情況下,微服務的平均響應時延變長了。
- 采用同步實現的方式,整個服務器的所有線程大部分時間都沒有在工作,而是都在等待。
- 每處理一個請求需要耗時 100ms,并在這 100ms 過程中是需要獨占一個線程,
-
采用異步實現解決等待問題
- 方法增加參數,一個回調方法。區別只是在線程模型上由同步順序調用改為了異步調用和回調的機制。
- 性能分析:
- 低請求數量的場景下,平均響應時延一樣是 100ms。
- 超高請求數量場景下,異步的實現不再需要線程等待執行結果,只需要個位數量的線程,即可實現同步場景大量線程一樣的吞吐量。
- 由于沒有了線程的數量的限制,總體吞吐量上限會大大超過同步實現,并且在服務器 CPU、網絡帶寬資源達到極限之前,響應時延不會隨著請求數量增加而顯著升高,幾乎可以一直保持約 100ms 的平均響應時延。
簡單實用的異步框架: CompletableFuture
CompletableFuture add(int account, int amount);
接口中定義的方法的返回類型都是一個帶泛型的 CompletableFeture,尖括號中的泛型類型就是真正方法需要返回數據的類型,我們這兩個服務不需要返回數據,所以直接用 Void 類型就可以。
return accountService.add(fromAccount, -1 * amount).thenCompose(v -> accountService.add(toAccount, amount)); 實現異步依次調用兩次賬戶服務完整轉賬。
調用異步方法獲得返回值 CompletableFuture 對象后,既可以調用 CompletableFuture 的 get 方法,像調用同步方法那樣等待調用的方法執行結束并獲得返回值,也可以像異步回調的方式一樣,調用 CompletableFuture 那些以 then 開頭的一系列方法,為 CompletableFuture 定義異步方法結束之后的后續操作。
實現高性能的異步網絡傳輸
同步模型和異步模型
- 同步模型會阻塞線程等待資源
- 異步模型不會阻塞線程,它是等資源準備好后,再通知業務代碼來完成后續的資源處理邏輯。可以很好的解決IO 等待的問題.
IO 密集型和計算密集型
- IO 密集型系統大部分時間都在執行 IO 操作(包括網絡 IO 和磁盤 IO,以及與計算機連接的一些外圍設備的訪問)。適合使用異步的設計來提升系統性能
- 計算密集型系統,大部分時間都是在使用 CPU 執行計算操作。業務系統,很少有非常耗時的計算,更多的是網絡收發數據,讀寫磁盤和數據庫這些 IO 操作。
理想的異步網絡框架
- 要實現通過網絡來傳輸數據,需要用到開發語言提供的網絡通信類庫。一個 TCP 連接建立后,用戶代碼會獲得一個用于收發數據的通道。每個通道會在內存中開辟兩片區域用于收發數據的緩存。
- 發送數據:直接往這個通道里面來寫入數據就可以了。用戶代碼在發送時寫入的數據會暫存在緩存中,然后操作系統會通過網卡,把發送緩存中的數據傳輸到對端的服務器上。發送數據的時候同步發送就可以了,沒有必要異步。
- 接收數據:當有數據到來的時候,操作系統會先把數據寫入接收緩存,然后給接收數據的線程發一個通知,線程收到通知后結束等待,開始讀取數據。處理完這一批數據后,繼續阻塞等待下一批數據到來,這樣周而復始地處理收到的數據。
同步網絡 IO 的模型。同步網絡 IO 模型在處理少量連接的時候,是沒有問題的。但是如果要同時處理非常多的連接,同步的網絡 IO 模型就有點兒力不從心了。每個連接都需要阻塞一個線程來等待數據,大量的連接數就會需要相同數量的數據接收線程。當這些 TCP 連接都在進行數據收發的時候,會有大量的線程來搶占 CPU 時間,造成頻繁的 CPU 上下文切換,導致 CPU 的負載升高,整個系統的性能就會比較慢。
理想的異步框架:只用少量的線程就能處理大量的連接,有數據到來的時候能第一時間處理就可以了。事先定義好收到數據后的處理邏輯,把這個處理邏輯作為一個回調方法,在連接建立前就通過框架提供的 API 設置好。當收到數據的時候,由框架自動來執行這個回調方法就好了。
使用Netty實現異步網絡通信
邏輯功能
- 創建了一個 EventLoopGroup 對象,命名為 group(可以理解為一組線程。作用是來執行收發數據的業務邏輯)
- 使用 Netty 提供的 ServerBootstrap 來初始化一個 Socket Server,綁定到本地 9999 端口上。
- 在真正啟動服務之前,給 serverBootstrap 傳入了一個 MyHandler 對象(自己實現的一個類,需要繼承 Netty 提供的一個抽象類:ChannelInboundHandlerAdapter, MyHandler 里面定義收到數據后的處理邏輯)這個設置 Handler 的過程就是預先來定義回調方法的過程。
- 最后就可以真正綁定本地端口,啟動 Socket 服務了。
服務啟動后,如果有客戶端來請求連接,Netty 會自動接受并創建一個 Socket 連接。
收到來自客戶端的數據后,Netty 就會在 EventLoopGroup 對象中,獲取一個 IO 線程,并調用接收數據的回調方法,來執行接收數據的業務邏輯(MyHandler方法)。
真正需要業務代碼來實現的就兩個部分:一個是把服務初始化并啟動起來,另一個是實現收發消息的業務邏輯 MyHandler。
Netty 維護一組線程來執行數據收發的業務邏輯。如果業務需要更靈活的實現,自己來維護收發數據的線程,可以選擇更加底層的 Java NIO
使用NIO實現異步網絡通信
提供了一個 Selector 對象,來解決一個線程在多個網絡連接上的多路復用問題。
在 NIO 中,每個已經建立好的連接用一個 Channel 對象來表示。希望能實現,在一個線程里,接收來自多個 Channel 的數據。
一個線程對應多個 Channel,有可能會出現這兩種情況:
- 線程在忙著處理收到的數據,這時候 Channel 中又收到了新數據;
- 線程閑著沒事兒干,所有的 Channel 中都沒收到數據,也不能確定哪個 Channel 會在什么時候收到數據。
實現:Selecor 通過一種類似于事件的機制來解決這個問題。
- 首先把連接也就是 Channel 綁定到 Selector
- 在接收數據的線程來調用 Selector.select() 方法來等待數據到來。(一個阻塞方法,這個線程會一直卡在這兒,直到這些 Channel 中的任意一個有數據到來,就會結束等待返回數據。)它的返回值是一個迭代器,可以從這個迭代器里面獲取所有 Channel 收到的數據,然后來執行數據接收的業務邏輯。
- 可以選擇直接在這個線程里面來執行接收數據的業務邏輯,也可以將任務分發給其他的線程來執行
序列化與反序列化:通過網絡傳輸結構化的數據
在 TCP 的連接上,它傳輸數據的基本形式就是二進制流
要想使用網絡框架的 API 來傳輸結構化的數據,必須得先實現結構化的數據與字節流之間的雙向轉換。這種將結構化數據轉換成字節流的過程,稱為序列化,反過來轉換,就是反序列化。
序列化實現權衡因素:
- 序列化后的數據最好是易于人類閱讀的;
- 實現的復雜度是否足夠低;
- 序列化和反序列化的速度越快越好;
- 序列化后的信息密度越大越好,即同樣的一個結構化數據,序列化之后占用的存儲空間越小越好;
實現高性能的序列化和反序列化
很多的消息隊列都選擇自己實現高性能的專用序列化和反序列化。
可以固定字段的順序,這樣在序列化后的字節里面就不必包含字段名,只要字段值就可以了,不同類型的數據也可以做針對性的優化
專用的序列化方法顯然更高效,序列化出來的字節更少,在網絡傳輸過程中的速度也更快。但缺點是,需要為每種對象類型定義專門的序列化和反序列化方法,實現起來太復雜了,大部分情況下是不劃算的。
傳輸協議:應用程序之間對話的語言
- 斷句:給每句話前面加一個表示這句話長度的數字,收到數據的時候,按照長度來讀取就可以了。
- 單工通信:任何一個時刻,數據只能單向傳輸,一問一答。(HTTP1協議)
- 雙工通信:可以同時進行數據的雙向收發,互相是不會受到任何影響的。發送請求的時候,給每個請求加一個序號,這個序號在本次會話內保證唯一,然后在響應中帶上請求的序號,只要需要確保請求和響應能夠正確對應上就可以了
內存管理:避免內存溢出和頻繁的垃圾回收
問題:一個業務邏輯非常簡單的微服務,日常情況下都能穩定運行,一到大促就卡死甚至進程掛掉?一個做數據匯總的應用,按照小時、天這樣的粒度進行數據匯總都沒問題,到年底需要匯總全年數據的時候,沒等數據匯總出來,程序就死掉了。
原因是,程序在設計的時候,沒有針對高并發高吞吐量的情況做好內存管理
自動內存管理機制的實現原理
- 申請內存
- 計算要創建對象所需要占用的內存大小;
- 在內存中找一塊兒連續并且是空閑的內存空間,標記為已占用;
- 把申請的內存地址綁定到對象的引用上,這時候對象就可以使用了。
- 內存回收
- 要找出所有可以回收的對象,將對應的內存標記為空閑
- GC算法:
- 標記清除
- 標記階段:從 GC Root 開始(程序入口的那個對象),標記所有可達的對象,因為程序中所有在用的對象一定都會被這個 GC Root 對象直接或者間接引用。
- 清除階段:遍歷所有對象,找出所有沒有標記的對象。這些沒有標記的對象都是可以被回收的,清除這些對象,釋放對應的內存即可。
- 問題:在執行標記和清除過程中,必須把進程暫停,否則計算的結果就是不準確的,即為什么垃圾回收時,我們的程序會被卡死。
- 標記清除
- GC算法:
- 整理內存碎片。
- 將不連續的空閑內存移動到一起,以便空出足夠的連續內存空間供后續使用。
- 要找出所有可以回收的對象,將對應的內存標記為空閑
為什么在高并發下程序會卡死
微服務在收到一個請求后,執行一段業務邏輯,然后返回響應。這個過程中,會創建一些對象,比如說請求對象、響應對象和處理中間業務邏輯中需要使用的一些對象等等。隨著這個請求響應的處理流程結束,創建的這些對象也就都沒有用了,它們將會在下一次垃圾回收過程中被釋放。直到下一次垃圾回收之前,這些已經沒有用的對象會一直占用內存。
高并發的情況下,程序會非常繁忙,短時間內就會創建大量的對象,這些對象將會迅速占滿內存,這時候,由于沒有內存可以使用了,垃圾回收被迫開始啟動,并且,這次被迫執行的垃圾回收面臨的是占滿整個內存的海量對象,它執行的時間也會比較長,相應的,這個回收過程會導致進程長時間暫停。
進程長時間暫停,又會導致大量的請求積壓等待處理,垃圾回收剛剛結束,更多的請求立刻涌進來,迅速占滿內存,再次被迫執行垃圾回收,進入了一個惡性循環。如果垃圾回收的速度跟不上創建對象的速度,還可能會產生內存溢出的現象。
高并發下的內存管理技巧
只有使用過被丟棄的對象才是垃圾回收的目標,所以,在處理大量請求的同時,盡量少的產生這種一次性對象。
最有效的方法就是,優化代碼中處理請求的業務邏輯,盡量少的創建一次性對象,特別是占用內存較大的對象。比如說,可以把收到請求的 Request 對象在業務流程中一直傳遞下去,而不是每執行一個步驟,就創建一個內容和 Request 對象差不多的新對象。
對于需要頻繁使用,占用內存較大的一次性對象,可以考慮自行回收并重用這些對象。實現:可以為這些對象建立一個對象池。收到請求后,在對象池內申請一個對象,使用完后再放回到對象池中,這樣就可以反復地重用這些對象,非常有效地避免頻繁觸發垃圾回收。
使用更大內存的服務器,也可以非常有效地緩解這個問題。
Kafka如何實現高性能IO?
使用批量消息提升服務端處理能力
Kafka 內部,消息都是以“批”為單位處理的。一批消息從發送端到接收端
- 發送端:Kafka 的客戶端 SDK 在實現消息發送邏輯的時候,采用了異步批量發送的機制。調用 send() 方法發送一條消息之后,無論是同步發送還是異步發送,Kafka 都不會立即就把這條消息發送出去。它會先把這條消息,存放在內存中緩存起來,然后選擇合適的時機把緩存中的所有消息組成一批,一次性發給 Broker。簡單地說,就是攢一波一起發。
- Kafka 的服務端(Broker ):每批消息都會被當做一個“批消息”來處理。也就是說,在 Broker 整個處理流程中,無論是寫入磁盤、從磁盤讀出來、還是復制到其他副本這些流程中,批消息都不會被解開,一直是作為一條“批消息”來進行處理的。
- 消費時,消息同樣是以批為單位進行傳遞的,Consumer 從 Broker 拉到一批消息后,在客戶端把批消息解開,再一條一條交給用戶代碼處理。
使用順序讀寫提升磁盤 IO 性能
操作系統每次從磁盤讀寫數據的時候,需要先尋址,先要找到數據在磁盤上的物理位置,然后再進行數據讀寫。如果是機械硬盤,這個尋址需要比較長的時間,因為它要移動磁頭,這是個機械運動。
順序讀寫相比隨機讀寫省去了大部分的尋址時間,它只要尋址一次,就可以連續地讀寫下去,所以說,性能要比隨機讀寫要好很多。
Kafka 充分利用了磁盤的這個特性。它的存儲設計非常簡單,對于每個分區,它把從 Producer 收到的消息,順序地寫入對應的 log 文件中,一個文件寫滿了,就開啟一個新的文件這樣順序寫下去。消費的時候,也是從某個全局的位置開始,也就是某一個 log 文件中的某個位置開始,順序地把消息讀出來。
利用 PageCache 加速消息讀寫
PageCache 就是操作系統在內存中給磁盤上的文件建立的緩存。調用系統的 API 讀寫文件的時候,并不會直接去讀寫磁盤上的文件,應用程序實際操作的都是 PageCache。
應用程序在
- 寫入文件:操作系統會先把數據寫入到內存中的 PageCache,然后再一批一批地寫到磁盤上。
- 讀取文件的時候,也是從 PageCache 中來讀取數據
這時候會出現兩種可能情況:
- PageCache 中有數據,那就直接讀取,這樣就節省了從磁盤上讀取數據的時間;
- PageCache 中沒有數據,操作系統會引發一個缺頁中斷,應用程序的讀取線程會被阻塞,操作系統把數據從文件中復制到 PageCache 中,然后應用程序再從 PageCache 中繼續把數據讀出來,這時會真正讀一次磁盤上的文件,這個讀的過程就會比較慢。
用戶的應用程序在使用完某塊 PageCache 后,操作系統并不會立刻就清除這個 PageCache,而是盡可能地利用空閑的物理內存保存這些 PageCache,除非系統內存不夠用,操作系統才會清理掉一部分 PageCache。清理的策略一般是 LRU 或它的變種算法,保留 PageCache 的邏輯是:優先保留最近一段時間最常使用的那些 PageCache。
Kafka 在讀寫消息文件的時候,充分利用了 PageCache 的特性。一般來說,消息剛剛寫入到服務端就會被消費,按照 LRU 的“優先清除最近最少使用的頁”這種策略,讀取的時候,對于這種剛剛寫入的 PageCache,命中的幾率會非常高。
大部分情況下,消費讀消息都會命中 PageCache,帶來的好處有兩個:一個是讀取的速度會非常快,另外一個是,給寫入消息讓出磁盤的 IO 資源,間接也提升了寫入的性能。
ZeroCopy:零拷貝技術
服務端,處理消費的大致邏輯是這樣的:
- 首先,從文件中找到消息數據,讀到內存中;
- 然后,把消息通過網絡發給客戶端。
這個過程中,數據實際上做了 2 次或者 3 次復制:
- 從文件復制數據到 PageCache 中,如果命中 PageCache,這一步可以省掉;
- 從 PageCache 復制到應用程序的內存空間中,也就是我們可以操作的對象所在的內存;
- 從應用程序的內存空間復制到 Socket 的緩沖區,這個過程就是我們調用網絡應用框架的 API 發送數據的過程。
Kafka 使用零拷貝技術可以把這個復制次數減少一次,上面的 2、3 步驟兩次復制合并成一次復制。直接從 PageCache 中把數據復制到 Socket 緩沖區中,這樣不僅減少一次數據復制,更重要的是,由于不用把數據復制到用戶內存空間,DMA 控制器可以直接完成數據復制,不需要 CPU 參與,速度更快。
#include <sys/socket.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
前兩個參數分別是目的端和源端的文件描述符,后面兩個參數是源端的偏移量和復制數據的長度,返回值是實際復制數據的長度。
緩存策略:使用緩存來減少磁盤IO
磁盤是個持久化存儲,即使服務器掉電也不會丟失數據,
磁盤致命問題:讀寫速度很慢
使用內存作為緩存來加速應用程序的訪問速度,是幾乎所有高性能系統都會采用的方法。
選擇只讀緩存還是讀寫緩存
讀緩存還是讀寫緩存唯一的區別就是,在更新數據的時候,是否經過緩存。
在數據寫到 PageCache 中后,它并不是同時就寫到磁盤上了,這中間是有一個延遲的。操作系統可以保證,即使是應用程序意外退出了,操作系統也會把這部分數據同步到磁盤上。但是,如果服務器突然掉電了,這部分數據就丟失了。
讀寫緩存的這種設計,它天然就是不可靠的,是一種犧牲數據一致性換取性能的設計
為什么 Kafka 可以使用 PageCache 來提升它的性能呢?這是由消息隊列的一些特點決定的。
保持緩存數據新鮮
盡量讓緩存中的數據與磁盤上的數據保持同步。
- 可以使用分布式事務來解決,只是付出的性能、實現復雜度等代價比較大。
- 定時將磁盤上的數據同步到緩存中。(適用于對數據不敏感的場景)
- 每次同步時直接全量更新就可以了,因為是在異步的線程中更新數據,同步的速度即使慢一些也不是什么大問題。
- 如果緩存的數據太大,更新速度慢到無法接受,也可以選擇增量更新,每次只更新從上次緩存同步至今這段時間內變化的數據,代價是實現起來會稍微有些復雜。
- 缺點是緩存更新不那么及時,優點是實現起來非常簡單,魯棒性非常好。
- 不去更新緩存中的數據,而是給緩存中的每條數據設置一個比較短的過期時間,數據過期以后即使它還存在緩存中,我們也認為它不再有效,需要從磁盤上再次加載這條數據,這樣就變相地實現了數據更新。(適用于對數據不敏感的場景)
緩存置換策略
在內存有限的情況下,要優先緩存哪些數據,讓緩存的命中率最高。
如果系統是可以預測未來訪問哪些數據的系統,比如說,有的系統它會定期做數據同步,每次同步的數據范圍都是一樣的,像這樣的系統,緩存策略很簡單,就是你要訪問什么數據,就緩存什么數據,甚至可以做到百分之百的命中。
緩存置換:一般會在數據首次被訪問的時候,順便把這條數據放到緩存中。隨著訪問的數據越來越多,總有把緩存占滿的時刻,需要把緩存中的一些數據刪除掉,以便存放新的數據。
刪掉哪些數據?
- 命中率最高的置換策略,一定是根據你的業務邏輯,定制化的策略。
- 使用通用的置換算法。LRU 算法,最近最少使用算法。思想是,最近剛剛被訪問的數據,它在將來被訪問的可能性也很大,而很久都沒被訪問過的數據,未來再被訪問的幾率也不大。
綜合考慮下的淘汰算法,不僅命中率更高,還能有效地避免“挖墳”問題:例如某個客戶端正在從很舊的位置開始向后讀取一批歷史數據,內存中的緩存很快都會被替換成這些歷史數據,相當于大部分緩存資源都被消耗掉了,這樣會導致其他客戶端的訪問命中率下降。加入位置權重后,比較舊的頁面會很快被淘汰掉,減少“挖墳”對系統的影響。
package com.evo;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;public class LRUCache<V> {/*** 容量*/private int capacity = 1024;/*** Node記錄表*/private Map<String, ListNode<String, V>> table = new ConcurrentHashMap<>();/*** 雙向鏈表頭部*/private ListNode<String, V> head;/*** 雙向鏈表尾部*/private ListNode<String, V> tail;public LRUCache(int capacity) {this();this.capacity = capacity;}public LRUCache() {head = new ListNode<>();tail = new ListNode<>();head.next = tail;head.prev = null;tail.prev = head;tail.next = null;}public V get(String key) {ListNode<String, V> node = table.get(key);//如果Node不在表中,代表緩存中并沒有if (node == null) {return null;}//如果存在,則需要移動Node節點到表頭//截斷鏈表,node.prev -> node -> node.next ====> node.prev -> node.next// node.prev <- node <- node.next ====> node.prev <- node.nextnode.prev.next = node.next;node.next.prev = node.prev;//移動節點到表頭node.next = head.next;head.next.prev = node;node.prev = head;head.next = node;//存在緩存表table.put(key, node);return node.value;}public void put(String key, V value) {ListNode<String, V> node = table.get(key);//如果Node不在表中,代表緩存中并沒有if (node == null) {if (table.size() == capacity) {//超過容量了 ,首先移除尾部的節點table.remove(tail.prev.key);tail.prev = tail.next;tail.next = null;tail = tail.prev;}node = new ListNode<>();node.key = key;node.value = value;table.put(key, node);}//如果存在,則需要移動Node節點到表頭node.next = head.next;head.next.prev = node;node.prev = head;head.next = node;}/*** 雙向鏈表內部類*/public static class ListNode<K, V> {private K key;private V value;ListNode<K, V> prev;ListNode<K, V> next;public ListNode(K key, V value) {this.key = key;this.value = value;}public ListNode() {}}public static void main(String[] args) {LRUCache<ListNode> cache = new LRUCache<>(4);ListNode<String, Integer> node1 = new ListNode<>("key1", 1);ListNode<String, Integer> node2 = new ListNode<>("key2", 2);ListNode<String, Integer> node3 = new ListNode<>("key3", 3);ListNode<String, Integer> node4 = new ListNode<>("key4", 4);ListNode<String, Integer> node5 = new ListNode<>("key5", 5);cache.put("key1", node1);cache.put("key2", node2);cache.put("key3", node3);cache.put("key4", node4);cache.get("key2");cache.put("key5", node5);cache.get("key2");}
}
正確使用鎖保護共享數據,協調異步線程?**
由于并發讀寫導致的數據錯誤。使用鎖可以非常有效地解決這個問題。
鎖的原理:任何時間都只能有一個線程持有鎖,只有持有鎖的線程才能訪問被鎖保護的資源。
避免濫用鎖
如果能不用鎖,就不用鎖;如果你不確定是不是應該用鎖,那也不要用鎖
- 加鎖和解鎖過程都是需要 CPU 時間的,這是一個性能的損失。另外,使用鎖就有可能導致線程等待鎖,等待鎖過程中線程是阻塞的狀態,過多的鎖等待會顯著降低程序的性能。
- 如果對鎖使用不當,很容易造成死鎖,導致整個程序“卡死”,這是非常嚴重的問題。本來多線程的程序就非常難于調試,如果再加上鎖,出現并發問題或者死鎖問題,你的程序將更加難調試。
只有在并發環境中,共享資源不支持并發訪問,或者說并發訪問共享資源會導致系統錯誤的情況下,才需要使用鎖。
鎖的用法
- 在訪問共享資源之前,先獲取鎖。
- 如果獲取鎖成功,就可以訪問共享資源了。
- 最后,需要釋放鎖,以便其他線程繼續訪問共享資源。
避免死鎖
死鎖的原因
- 獲取了鎖之后沒有釋放
- 鎖重入問題
- 在持有這把鎖的情況下,再次去嘗試獲取這把鎖
- 不一定。**會不會死鎖取決于,你獲取的這把鎖它是不是可重入鎖。**如果是可重入鎖,那就沒有問題,否則就會死鎖。
- 在持有這把鎖的情況下,再次去嘗試獲取這把鎖
- 如果程序中存在多把鎖,就有可能出現這些鎖互相鎖住的情況
使用
- 避免濫用鎖
- 對于同一把鎖,加鎖和解鎖必須要放在同一個方法中,這樣一次加鎖對應一次解鎖,代碼清晰簡單,便于分析問題。
- 盡量避免在持有一把鎖的情況下,去獲取另外一把鎖,就是要盡量避免同時持有多把鎖。
- 如果需要持有多把鎖,一定要注意加解鎖的順序,解鎖的順序要和加鎖順序相反。比如,獲取三把鎖的順序是 A、B、C,釋放鎖的順序必須是 C、B、A。
- 給程序中所有的鎖排一個順序,在所有需要加鎖的地方,按照同樣的順序加解鎖。比如我剛剛舉的那個例子,如果兩個線程都按照先獲取 lockA 再獲取 lockB 的順序加鎖,就不會產生死鎖。
使用讀寫鎖要兼顧性能和安全性
無論是只讀訪問,還是讀寫訪問,都是需要加鎖的。
- 讀訪問可以并發執行。
- 寫的同時不能并發讀,也不能并發寫。
read() 方法是可以多個線程并行執行的,讀數據的性能依然很好。
寫數據的時候,獲取寫鎖,當一個線程持有寫鎖的時候,其他線程既無法獲取讀鎖,也不能獲取寫鎖,達到保護共享數據的目的。
用硬件同步原語(CAS)替代鎖
硬件同步原語
由計算機硬件提供的一組原子操作,比較常用的原語主要是 CAS 和 FAA 這兩種。
cas
<< atomic >> function cas(p : pointer to int, old : int, new : int) returns bool { if *p ≠ old { return false } *p ← new return true }
輸入參數一共有三個,分別是:
- p: 要修改的變量的指針。
- old: 舊值。
- new: 新值。
返回的是一個布爾值,標識是否賦值成功。
邏輯:先比較一下變量 p 當前的值是不是等于 old,如果等于就把變量 p 賦值為 new,并返回 true,否則就不改變變量 p,并返回 false。
FAA
<< atomic >> function faa(p : pointer to int, inc : int) returns int { int value <- *location *p <- value + inc return value }
語義是:先獲取變量 p 當前的值 value,然后給變量 p 增加 inc,最后返回變量 p 之前的值 value。
某些情況下,原語可以用來替代鎖,實現一些即安全又高效的并發操作。
數據壓縮:時間換空間的游戲
什么情況適合使用數據壓縮?
比如,進程之間通過網絡傳輸數據
- 不壓縮直接傳輸需要的時間是:傳輸未壓縮數據的耗時
- 使用數據壓縮需要的時間是:壓縮耗時+傳輸壓縮數據耗時+解壓耗時
影響因素非常多:數據的壓縮率、網絡帶寬、收發兩端服務器的繁忙程度等等。
壓縮和解壓的操作都是計算密集型的操作,非常耗費 CPU 資源。
- 如果處理業務邏輯就需要耗費大量的 CPU 資源,就不太適合再進行壓縮和解壓。
- 如果系統的瓶頸是磁盤的 IO 性能,CPU 資源又很閑,非常適合在把數據寫入磁盤前先進行壓縮。
- 如果系統讀寫比嚴重不均衡,還要考慮,每讀一次數據就要解壓一次是不是劃算。
壓縮它的本質是資源的置換,是一個時間換空間,或者說是 CPU 資源換存儲資源的游戲。
應該選擇什么壓縮算法?
有損壓縮和無損壓縮。
- 有損壓縮主要是用來壓縮音視頻,它壓縮之后是會丟失信息的。
- 無損壓縮,數據經過壓縮和解壓過程之后,與壓縮之前相比,是 100% 相同的。
目前常用的壓縮算法包括:ZIP,GZIP,SNAPPY,LZ4 等等。考慮數據的壓縮率和壓縮耗時。一般來說,壓縮率越高的算法,壓縮耗時也越高。
- 如果是對性能要求高的系統,可以選擇壓縮速度快的算法,比如 LZ4;
- 如果需要更高的壓縮比,可以考慮 GZIP 或者壓縮率更高的 XZ 等算法。
壓縮樣本對壓縮速度和壓縮比的影響也是比較大的,同樣大小的一段數字和一段新聞的文本,即使是使用相同的壓縮算法,壓縮率和壓縮時間的差異也是比較大的。所以,有的時候在選擇壓縮算法的之前,用系統的樣例業務數據做一個測試,可以幫助你找到最合適的壓縮算法。
如何選擇合適的壓縮分段?
大部分的壓縮算法,區別主要是,對數據進行編碼的算法,壓縮的流程和壓縮包的結構大致一樣的。而在壓縮過程中,最需要了解的就是如何選擇合適的壓縮分段大小。
壓縮時,給定的被壓縮數據它必須有確定的長度,是有頭有尾的,不能是一個無限的數據流,如果要對流數據進行壓縮,那必須把流數據劃分成多個幀,一幀一幀的分段壓縮。
原因:壓縮算法在開始壓縮之前,一般都需要對被壓縮數據從頭到尾進行一次掃描(目的是確定如何對數據進行劃分和編碼,一般的原則是重復次數多、占用空間大的內容,使用盡量短的編碼,這樣壓縮率會更高。)
被壓縮的數據長度越大,重碼率會更高,壓縮比也就越高。
分段也不是越大越好
- 實際上分段大小超過一定長度之后,再增加長度對壓縮率的貢獻就不太大了。
- 過大的分段長度,在解壓縮的時候,會有更多的解壓浪費。比如,一個 1MB 大小的壓縮文件,即使你只是需要讀其中很短的幾個字節,也不得不把整個文件全部解壓縮,造成很大的解壓浪費。
根據業務,選擇合適的壓縮分段,在壓縮率、壓縮速度和解壓浪費之間找到一個合適的平衡。
- 壓縮:確定如何對數據進行劃分和壓縮算法后進行壓縮,壓縮的過程就是用編碼來替換原始數據的過程。壓縮之后的壓縮包就是由這個編碼字典和用編碼替換之后的數據組成的。
- 解壓:先讀取編碼字典,然后按照字典把壓縮編碼還原成原始的數據就可以了。
Kafka 是如何處理消息壓縮的?
- Kafka 可以配置是否開啟壓縮,也支持配置使用哪一種壓縮算法。原因:不同的業務場景是否需要開啟壓縮,選擇哪種壓縮算法是不能一概而論的。
- 在開啟壓縮時,Kafka 選擇一批消息一起壓縮,每一個批消息就是一個壓縮分段。使用者也可以通過參數來控制每批消息的大小。可以整批直接存儲,然后整批發送給消費者。最后,批消息由消費者進行解壓。在服務端不用解壓,就不會耗費服務端寶貴的 CPU 資源,同時還能獲得壓縮后,占用傳輸帶寬小,占用存儲空間小的這些好處,這是一個非常聰明的設計。
- 在使用 Kafka 時,如果生產者和消費者的 CPU 資源不是特別吃緊,開啟壓縮后,可以節省網絡帶寬和服務端的存儲空間,提升總體的吞吐量。
Kafka Consumer源碼分析:消息消費的實現過程**
收發消息兩個過程
kafka消費模型
- Kafka 的每個 Consumer(消費者)實例屬于一個 ConsumerGroup(消費組);
- 在消費時,ConsumerGroup 中的每個 Consumer 獨占一個或多個 Partition(分區);
- 對于每個 ConsumerGroup,在任意時刻,每個 Partition 至多有 1 個 Consumer 在消費;
- 每個 ConsumerGroup 都有一個 Coordinator(協調者)負責分配 Consumer 和 Partition 的對應關系,當 Partition 或是 Consumer 發生變更是,會觸發 reblance(重新分配)過程,重新分配 Consumer 與 Partition 的對應關系;
- Consumer 維護與 Coordinator 之間的心跳,這樣 Coordinator 就能感知到 Consumer 的狀態,在 Consumer 故障的時候及時觸發 rebalance。
Kafka 的 Consumer 入口類
// 設置必要的配置信息Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 創建 Consumer 實例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 訂閱 Topicconsumer.subscribe(Arrays.asList("foo", "bar"));// 循環拉消息while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}
流程:訂閱 + 拉取消息
- 設置必要的配置信息,包括:起始連接的 Broker 地址,Consumer Group 的 ID,自動提交消費位置的配置和序列化配置;
- 創建 Consumer 實例;
- 訂閱了 2 個 Topic:foo 和 bar;
- 循環拉取消息并打印在控制臺上。
Kafka 在消費過程中,每個 Consumer 實例是綁定到一個分區上的,那 Consumer 是如何確定,綁定到哪一個分區上的呢?這個問題也是可以通過分析消費流程來找到答案的。
訂閱過程實現
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {acquireAndEnsureOpen();try {// 省略部分代碼// 重置訂閱狀態this.subscriptions.subscribe(new HashSet<>(topics), listener);// 更新元數據metadata.setTopics(subscriptions.groupSubscription());} finally {release();}}
訂閱的主流程主要更新了兩個屬性:
- 訂閱狀態 subscriptions,
- 更新元數據中的 topic 信息。訂閱狀態 subscriptions 主要維護了訂閱的 topic 和 patition 的消費位置等狀態信息。屬性 metadata 中維護了 Kafka 集群元數據的一個子集,包括集群的 Broker 節點、Topic 和 Partition 在節點上分布,以及我們聚焦的第二個問題:Coordinator 給 Consumer 分配的 Partition 信息。
在訂閱的實現過程中,Kafka 更新了訂閱狀態 subscriptions 和元數據 metadata 中的相關 topic 的一些屬性,將元數據狀態置為“需要立即更新”,但是并沒有真正發送更新元數據的請求,整個過程沒有和集群有任何網絡數據交換
拉取消息過程實現
- 消費者拉消息:poll
- updateAssignmentMetadataIfNeeded(): 更新元數據:,實現了與 Cluster 通信,在 Coordinator 上注冊 Consumer 并拉取和更新元數據。
- pollForFetches():拉取消息。
- 如果緩存里面有未讀取的消息,直接返回這些消息;
- 構造拉取消息請求,并發送;
- 發送網絡請求并拉取消息,等待直到有消息返回或者超時;
- 返回拉到的消息。
消息復制
面臨問題
- 性能:
- 需要寫入的節點數量越多,可用性和數據可靠性就越好,但是寫入性能就越低
- 不管采用哪種復制方式,消費消息的時候,都只是選擇多副本中一個節點去讀數據而已,這和單節點消費并沒有差別。
- 一致性:
- 不丟消息,嚴格順序
- 主 - 從”的復制方式
- 高可用:
- 當某個主節點宕機的時候,盡快再選出一個主節點來接替宕機的主節點。
- 比較快速的實現:使用一個第三方的管理服務來管理這些節點,發現某個主節點宕機的時候,由管理服務來指定一個新的主節點
- 消息隊列選擇自選舉:由還存活的這些節點通過投票,來選出一個新的主節點,這種投票的實現方式,優點是沒有外部依賴,可以實現自我管理。缺點就是投票的實現都比較復雜,
kafka實現復制
- Kafka 中,復制的基本單位是分區。每個分區的幾個副本之間,構成一個小的復制集群,Broker 是這些分區副本的容器,所以 Kafka 的 Broker 是不分主從的。
- 分區的多個副本中也是采用一主多從的方式。Kafka 在寫入消息的時候,采用的也是異步復制的方式。消息在寫入到主節點之后,并不會馬上返回寫入成功,而是等待足夠多的節點都復制成功后再返回。
- ISR(In Sync Replicas),:“保持數據同步的副本”。ISR 的數量是可配的,注意: ISR 中是包含主節點的。
- Kafka 使用 ZooKeeper 來監控每個分區的多個節點,如果發現某個分區的主節點宕機了,Kafka 會利用 ZooKeeper 來選出一個新的主節點,這樣解決了可用性的問題。ZooKeeper 是一個分布式協調服務,選舉的時候,會從所有 ISR 節點中來選新的主節點,這樣可以保證數據一致性。
- 默認情況下,如果所有的 ISR 節點都宕機了,分區就無法提供服務了。也可以選擇配置成讓分區繼續提供服務,這樣只要有一個節點還活著,就可以提供服務,代價是無法保證數據一致性,會丟消息。
Kafka的協調服務ZooKeeper:實現分布式系統的“瑞士軍刀”
ZooKeeper 的作用
- 集群選舉:ZooKeeper 分布式的協調服務框架,主要用來解決分布式集群中,應用系統需要面對的各種通用的一致性問題。ZooKeeper 本身可以部署為一個集群,集群的各個節點之間可以通過選舉來產生一個 Leader,選舉遵循半數以上的原則,所以一般集群需要部署奇數個節點。
- ZooKeeper 最核心的功能:提供了一個分布式的存儲系統,數據的組織方式類似于 UNIX 文件系統的樹形結構。分布式系統中一些需要整個集群所有節點都訪問的元數據,比如集群節點信息、公共配置信息等,特別適合保存在 ZooKeeper 中。
- znode:在這個樹形的存儲結構中,每個節點被稱為一個“ZNode”。ZooKeeper 提供了一種特殊的 ZNode 類型:臨時節點。這種臨時節點有一個特性:如果創建臨時節點的客戶端與 ZooKeeper 集群失去連接,這個臨時節點就會自動消失。在 ZooKeeper 內部,它維護了 ZooKeeper 集群與所有客戶端的心跳,通過判斷心跳的狀態,來確定是否需要刪除客戶端創建的臨時節點。
- watcher:ZooKeeper 還提供了一種訂閱 ZNode 狀態變化的通知機制:Watcher,一旦 ZNode 或者它的子節點狀態發生了變化,訂閱的客戶端會立即收到通知。:利用 ZooKeeper 臨時節點和 Watcher 機制,我們很容易隨時來獲取業務集群中每個節點的存活狀態,并且可以監控業務集群的節點變化情況,當有節點上下線時,都可以收到來自 ZooKeeper 的通知。
Kafka 在 ZooKeeper 中保存了哪些信息?
- Broker 信息:/brokers/ids/[0…N],每個臨時節點對應著一個在線的 Broker,Broker 啟動后會創建一個臨時節點,代表 Broker 已經加入集群可以提供服務了,節點名稱就是 BrokerID,節點內保存了包括 Broker 的地址、版本號、啟動時間等等一些 Broker 的基本信息。如果 Broker 宕機或者與 ZooKeeper 集群失聯了,這個臨時節點也會隨之消失。
- 主題和分區的信息。/brokers/topics/ 節點下面的每個子節點都是一個主題,節點的名稱就是主題名稱。每個主題節點下面都包含一個固定的 partitions 節點,pattitions 節點的子節點就是主題下的所有分區,節點名稱就是分區編號
- 每個分區節點下面是一個名為 state 的臨時節點,節點中保存著分區當前的 leader 和所有的 ISR 的 BrokerID。這個 state 臨時節點是由這個分區當前的 Leader Broker 創建的。如果這個分區的 Leader Broker 宕機了,對應的這個 state 臨時節點也會消失,直到新的 Leader 被選舉出來,再次創建 state 臨時節點。
Kafka 客戶端如何找到對應的 Broker?
Kafka 客戶端如何找到主題、隊列對應的 Broker
先根據主題和隊列,找到分區對應的 state 臨時節點,state 節點中保存了這個分區 Leader 的 BrokerID。拿到這個 Leader 的 BrokerID 后,再去找到 BrokerID 對應的臨時節點,就可以獲取到 Broker 真正的訪問地址了。
Kafka 的客戶端并不會去直接連接 ZooKeeper,它只會和 Broker 進行遠程通信,那ZooKeeper 上的元數據應該是通過 Broker 中轉給每個客戶端的。
Broker 處理所有 RPC 請求的入口類在 kafka.server.KafkaApis#handle 這個方法里面,找到對應處理更新元數據的方法 handleTopicMetadataRequest(RequestChannel.Request),
先根據請求中的主題列表,去本地的元數據緩存 MetadataCache 中過濾出相應主題的元數據,右半部分的那棵樹的子集,然后再去本地元數據緩存中獲取所有 Broker 的集合,最后把這兩部分合在一起,作為響應返回給客戶端。
Kafka 在每個 Broker 中都維護了一份和 ZooKeeper 中一樣的元數據緩存,并不是每次客戶端請求元數據就去讀一次 ZooKeeper。由于 ZooKeeper 提供了 Watcher 這種監控機制,Kafka 可以感知到 ZooKeeper 中的元數據變化,從而及時更新 Broker 中的元數據緩存。
kafka的事務
Kafka 的事務和 Exactly Once 可以解決什么問題?
kafka事務:解決的問題是,確保在一個事務中發送的多條消息,要么都成功,要么都失敗。注意,這里面的多條消息可以是發往多個主題和分區的消息。更多的情況下被用來配合 Kafka 的冪等機制來實現 Kafka 的 Exactly Once 語義。
exactly once:在流計算中,用 Kafka 作為數據源,并且將計算結果保存到 Kafka, 這種場景下,數據從 Kafka 的某個主題中消費,在計算集群中計算,再把計算結果保存在 Kafka 的其他主題中。這樣的過程中,保證每條息都被恰好計算一次,確保計算結果正確。
**事務實現:**基于兩階段提交來實現
- 事務協調者:負責在服務端協調整個事務。是 Broker 進程的一部分,協調者和分區一樣通過選舉來保證自身的可用性。存在多個協調者,每個協調者負責管理和使用事務日志中的幾個分區
- 用于記錄事務日志的主題:記錄的數據就是類似于“開啟事務”“提交事務”這樣的事務日志。
實現流程:
-
開啟事務的時候,生產者會給協調者發一個請求來開啟事務,協調者在事務日志中記錄下事務 ID。
-
生產者在發送消息之前,還要給協調者發送請求,告知發送的消息屬于哪個主題和分區,這個信息也會被協調者記錄在事務日志中。接下來,生產者就可以像發送普通消息一樣來發送事務消息, Kafka 在處理未提交的事務消息時,和普通消息是一樣的,直接發給 Broker,保存在這些消息對應的分區中,Kafka 會在客戶端的消費者中,暫時過濾未提交的事務消息。
-
消息發送完成后,生產者給協調者發送提交或回滾事務的請求,由協調者來開始兩階段提交,完成事務。
- 第一階段:協調者把事務的狀態設置為“預提交”,并寫入事務日志。到這里,實際上事務已經成功了,無論接下來發生什么情況,事務最終都會被提交。
- 開始第二階段:協調者在事務相關的所有分區中,都會寫一條“事務結束”的特殊消息,當 Kafka 的消費者,也就是客戶端,讀到這個事務結束的特殊消息之后,它就可以把之前暫時過濾的那些未提交的事務消息,放行給業務代碼進行消費了。最后,協調者記錄最后一條事務日志,標識這個事務已經結束了。
Kafka 這個兩階段的流程,準備階段,生產者發消息給協調者開啟事務,然后消息發送到每個分區上。提交階段,生產者發消息給協調者提交事務,協調者給每個分區發一條“事務結束”的消息,完成分布式事務提交。
MQTT協議:如何支持海量的在線IoT設備
IoT特點:便宜;無線連接,經常移動(網絡連接不穩定)–加入心跳和會話機制;服務端需要支撐海量的 IoT 設備同時在線,需要支撐的客戶端數量遠不止幾萬幾十萬
MQTT集群支持海量在線的IoT設備
負載均衡:首先接入的地址最好是一個域名(不是必須的),這樣域名的后面可以配置多個 IP 地址做負載均衡。也可以直接連接負載均衡器。負載均衡可以選擇像 F5 這種專用的負載均衡硬件,也可以使用 Nginx 這樣的軟件,只要是四層或者支持 MQTT 協議的七層負載均衡設備,都可以選擇。
proxy:負載均衡器的后面,需要部署一個 Proxy 集群,作用:
- 來承接海量 IoT 設備的連接
- 來維護與客戶端的會話
- 作為代理,在客戶端和 Broker 之間進行消息轉發。
在 Proxy 集群的后面是 Broker 集群,負責保存和收發消息。
有的 MQTT Server 它的集群架構是這樣的:
Proxy 和 Broker 的功能集成到了一個進程中。前置 Proxy 的方式很容易解決海量連接的問題,Proxy 是可以水平擴展的,只要用足夠多數量的 Proxy 節點,就可以抗住海量客戶端同時連接。每個 Proxy 和每個 Broker 只用一個連接通信就可以了,這樣對于每個 Broker 來說,它的連接數量最多不會超過 Proxy 節點的數量。
Proxy 對于會話的處理方式,可以借鑒 Tomcat 處理會話的方式。
- 將會話保存在 Proxy 本地,每個 Proxy 節點都只維護連接到自己的這些客戶端的會話。這種方式需要配合負載均衡來使用,負載均衡設備需要支持 sticky session,保證將相同會話的連接總是轉發到同一個 Proxy 節點上。
- 將會話保存在一個外置的存儲集群中,比如一個 Redis 集群或者 MySQL 集群。這樣 Proxy 就可以設計成完全無狀態的,對于負載均衡設備也沒有特殊的要求。但這種方式要求外置存儲集群具備存儲千萬級數據的能力,同時具有很好的性能。
對于如何支持海量的主題,比較可行的解決方案是,在 Proxy 集群的后端,部署多組 Broker 小集群,比如說,可以是多組 Kafka 小集群,每個小集群只負責存儲一部分主題。這樣對于每個 Broker 小集群,主題的數量就可以控制在可接受的范圍內。由于消息是通過 Proxy 來進行轉發的,我們可以在 Proxy 中采用一些像一致性哈希等分片算法,根據主題名稱找到對應的 Broker 小集群。這樣就解決了支持海量主題的問題。
并發下的冪等性
如果可以保證以下這些操作的原子性,哪些操作在并發調用的情況下具備冪等性?答案:D
- A. f(n, a):給賬戶 n 轉入 a 元
- B. f(n, a):將賬戶 n 的余額更新為 a 元
- C. f(n, b, a):如果賬戶 n 當前的余額為 b 元,那就將賬戶的余額更新為 n 元
- D. f(n, v, a):如果賬戶 n 當前的流水號等于 v,那么給賬戶的余額加 a 元,并將流水號加一
一個操作是否冪等,還跟調用順序有關系,在線性調用情況下,具備冪等性的操作,在并發調用時,就不一定具備冪等性了。
第二十九節。第三十節
實現簡單的RPC框架
這里所說的 RPC 框架,是指類似于 Dubbo、gRPC 這種框架,應用程序可以“在客戶端直接調用服務端方法,就像調用本地方法一樣。
而一些基于 REST 的遠程調用框架,雖然同樣可以實現遠程調用,但它對使用者并不透明,無論是服務端還是客戶端,都需要和 HTTP 協議打交道,解析和封裝 HTTP 請求和響應。這類框架并不能算是“RPC 框架”。
原理-RPC 框架是怎么調用遠程服務的
例:spring和Dubbo配合的微服務體系,RPC框架是如何實現調用遠程服務的。
一般來說,客戶端和服務端分別是這樣的:Dubbo 看起來就像把服務端進程中的實現類“映射”到了客戶端進程中一樣
@Component
public class HelloClient {@Reference // dubbo 注解 @Reference 注解,獲得一個實現了 HelloServicer 這個接口的對象private HelloService helloService;public String hello() {return helloService.hello("World");}
}@Service // dubbo 注解
@Component
public class HelloServiceImpl implements HelloService {@Overridepublic String hello(String name) {return "Hello " + name;}
}
在客戶端,業務代碼得到的 HelloService 這個接口的實例實際上是由 RPC 框架提供的一個代理類的實例。這個代理類有一個專屬的名稱,叫“樁(Stub)”。不同的 RPC 框架中,這個樁的生成方式并不一樣,有些是在編譯階段生成的,有些是在運行時動態生成的,
HelloService 的樁,同樣要實現 HelloServer 接口,客戶端在調用 HelloService 的 hello 方法時,實際上調用的是樁的 hello 方法, hello 方法里會構造一個請求,這個請求就是一段數據結構,請求中包含兩個重要的信息:
- 請求的服務名,客戶端調用的是 HelloService 的 hello 方法;
- 請求的所有參數,參數 name, 它的值是“World”。
然后,它會把這個請求發送給服務端,等待服務的響應。
服務端處理請求:把請求中的服務名解析出來->根據服務名找服務端進程中,有沒有這個服務名對應的服務提供者。
例:在收到請求后,可以通過請求中的服務名找到 HelloService 真正的實現類 HelloServiceImpl。找到實現類之后,RPC 框架會調用這個實現類的 hello 方法,使用的參數值就是客戶端發送過來的參數值。服務端的 RPC 框架在獲得返回結果之后,再將結果封裝成響應,返回給客戶端。客戶端 RPC 框架的樁收到服務端的響應之后,從響應中解析出返回值,返回給客戶端的調用方。這樣就完成了一次遠程調用。
客戶端是如何找到服務端地址的呢?在 RPC 框架中,實現原理和消息隊列的實現是完全一樣的,通過一個 NamingService 來解決的。
在 RPC 框架中, NamingService 稱為注冊中心。
- 服務端的業務代碼在向 RPC 框架中注冊服務之后,RPC 框架就會把這個服務的名稱和地址發布到注冊中心上。
- 客戶端的樁在調用服務端之前,會向注冊中心請求服務端的地址,請求的參數就是服務名稱 HelloService#hello,注冊中心會返回提供這個服務的地址。
- 客戶端再去請求服務端。
只要 RPC 框架保證在不同的編程語言中,使用相同的序列化協議,就可以實現跨語言的通信。
實現一個簡單的 RPC 框架并不是很難,絕大部分技術,包括:高性能網絡傳輸、序列化和反序列化、服務路由的發現方法等。
RPC總體結構
定義
RPC 框架對外提供的所有服務定義在一個接口 RpcAccessPoint 中
/*** RPC 框架對外提供的服務接口*/
public interface RpcAccessPoint extends Closeable{/*** 客戶端使用:客戶端獲取遠程服務的引用* @param uri 遠程服務地址* @param serviceClass 服務的接口類的 Class* @param <T> 服務接口的類型* @return 遠程服務引用*/<T> T getRemoteService(URI uri, Class<T> serviceClass);/*** 服務端使用:服務端注冊服務的實現實例* @param service 實現實例* @param serviceClass 服務的接口類的 Class* @param <T> 服務接口的類型* @return 服務地址*/<T> URI addServiceProvider(T service, Class<T> serviceClass);/*** 服務端啟動 RPC 框架,監聽接口,開始提供遠程服務。* @return 服務實例,用于程序停止的時候安全關閉服務。*/Closeable startServer() throws Exception;
}
注冊中心的接口 NameService:
/*** 注冊中心*/
public interface NameService {/*** 注冊服務* @param serviceName 服務名稱* @param uri 服務地址*/void registerService(String serviceName, URI uri) throws IOException;/*** 查詢服務地址* @param serviceName 服務名稱* @return 服務地址*/URI lookupService(String serviceName) throws IOException;
}
使用
定義一個服務接口:
public interface HelloService {String hello(String name);
}
客戶端:
//調用注冊中心方法查詢服務地址
URI uri = nameService.lookupService(serviceName);
//獲取遠程服務本地實例--樁
HelloService helloService = rpcAccessPoint.getRemoteService(uri, HelloService.class);
//調用方法
String response = helloService.hello(name);
logger.info(" 收到響應: {}.", response);
服務端:
public class HelloServiceImpl implements HelloService {@Overridepublic String hello(String name) {String ret = "Hello, " + name;return ret;}
}
將這個實現注冊到 RPC 框架上,并啟動 RPC 服務:
//啟動rpc框架的服務
rpcAccessPoint.startServer();
//調用rpc框架方法,注冊 helloService 服務
URI uri = rpcAccessPoint.addServiceProvider(helloService, HelloService.class);
//調用注冊中心方法注冊服務地址
nameService.registerService(serviceName, uri);
通信與序列化-RPC 框架是怎么調用遠程服務的
設計一個通用的高性能序列化實現?
可擴展的,通用方法
public class SerializeSupport {//用于反序列化public static <E> E parse(byte [] buffer) {// ...}//用于序列化public static <E> byte [] serialize(E entry) {// ...}
}
使用
// 序列化
MyClass myClassObject = new MyClass();
byte [] bytes = SerializeSupport.serialize(myClassObject);
// 反序列化
MyClass myClassObject1 = SerializeSupport.parse(bytes);
一般的 RPC 框架采用的都是通用的序列化實現,比如:
- gRPC 采用的是 Protobuf 序列化實現
- Dubbo 支持 hession2 等好幾種序列化實現。
RPC 框架不像消息隊列一樣,采用性能更好的專用的序列化實現。
原因:
- 消息隊列需要序列化數據的類型是固定的,只是它自己的內部通信的一些命令。
- RPC 框架需要序列化的數據是,用戶調用遠程方法的參數,這些參數可能是各種數據類型,所以必須使用通用的序列化實現,確保各種類型的數據都能被正確的序列化和反序列化。
給所有序列化的實現類定義一個 Serializer 接口,所有的序列化實現類都實現這個接口就可以了:
public interface Serializer<T> {/*** 計算對象序列化后的長度,主要用于申請存放序列化數據的字節數組* @param entry 待序列化的對象* @return 對象序列化后的長度*/int size(T entry);/*** 序列化對象。將給定的對象序列化成字節數組* @param entry 待序列化的對象* @param bytes 存放序列化數據的字節數組* @param offset 數組的偏移量,從這個位置開始寫入序列化數據* @param length 對象序列化后的長度,也就是{@link Serializer#size(java.lang.Object)}方法的返回值。*/void serialize(T entry, byte[] bytes, int offset, int length);/*** 反序列化對象* @param bytes 存放序列化數據的字節數組* @param offset 數組的偏移量,從這個位置開始寫入序列化數據* @param length 對象序列化后的長度* @return 反序列化之后生成的對象*/T parse(byte[] bytes, int offset, int length);/*** 用一個字節標識對象類型,每種類型的數據應該具有不同的類型值*/byte type();/*** 返回序列化對象類型的 Class 對象:* 目的:在執行序列化的時候,通過被序列化的對象類型找到對應序列化實現類。*/Class<T> getSerializeClass();
}
利用 Serializer 接口,實現 SerializeSupport 這個支持任何對象類型序列化的通用靜態類了。首先我們定義兩個 Map,這兩個 Map 中存放著所有實現 Serializer 接口的序列化實現類。
利用 Serializer 接口,實現 SerializeSupport 這個支持任何對象類型序列化的通用靜態類了。首先我們定義兩個 Map,這兩個 Map 中存放著所有實現 Serializer 接口的序列化實現類。
//key:序列化實現類對應的序列化對象的類型,用途是在序列化時,通過被序列化的對象類型,找到對應的序列化實現類
private static Map<Class<?>/* 序列化對象類型 */, Serializer<?>/* 序列化實現 */> serializerMap = new HashMap<>();//key 是序列化實現類的類型,用于在反序列化的時候,從序列化的數據中讀出對象類型,然后找到對應的序列化實現類。
private static Map<Byte/* 序列化實現類型 */, Class<?>/* 序列化對象類型 */> typeMap = new HashMap<>();
實現序列化和反序列化實現思路:通過一個類型在這兩個 Map 中進行查找,查找的結果就是對應的序列化實現類的實例,也就是 Serializer 接口的實現,然后調用對應的序列化或者反序列化方法就可以了。
所有的 Serializer 的實現類是怎么加載到 SerializeSupport 的那兩個 Map 中利用了 Java 的一個 SPI 類加載機制。
使用序列化的模塊只要依賴 SerializeSupport 這個靜態類,調用它的序列化和反序列化方法就可以了,不需要依賴任何序列化實現類。對于序列化實現的提供者來說,也只需要依賴并實現 Serializer 這個接口就可以了。
eg:
//統一使用 UTF8 編碼。否則,如果遇到執行序列化和反序列化的兩臺服務器默認編碼不一樣,就會出現亂碼。我們在開發過程用遇到的很多中文亂碼問題,絕大部分都是這個原因。
public class StringSerializer implements Serializer<String> {@Overridepublic int size(String entry) {return entry.getBytes(StandardCharsets.UTF_8).length;}@Overridepublic void serialize(String entry, byte[] bytes, int offset, int length) {byte [] strBytes = entry.getBytes(StandardCharsets.UTF_8);System.arraycopy(strBytes, 0, bytes, offset, strBytes.length);}@Overridepublic String parse(byte[] bytes, int offset, int length) {return new String(bytes, offset, length, StandardCharsets.UTF_8);}@Overridepublic byte type() {return Types.TYPE_STRING;}@Overridepublic Class<String> getSerializeClass() {return String.class;}
}
這個序列化的實現,對外提供服務的就只有一個 SerializeSupport 靜態類,并且可以通過擴展支持序列化任何類型的數據。
使用 Netty 來實現異步網絡通信
把通信的部分也封裝成接口。在這個 RPC 框架中,對于通信模塊的需求是這樣的:只需要客戶端給服務端發送請求,然后服務返回響應就可以了。所以,通信接口只需要提供一個發送請求方法就可以了:
public interface Transport {/*** 發送請求命令* @param request 請求命令* @return 返回值是一個 Future,Future通過這個 CompletableFuture 對象可以獲得響應結果* 直接調用它的 get 方法來獲取響應數據,相當于同步調用;* 也可以使用以 then 開頭的一系列異步方法,指定當響應返回的時候,需要執行的操作,就等同于異步調用。* 等于,這樣一個方法既可以同步調用,也可以異步調用。*/CompletableFuture<Command> send(Command request);
}
Command 類:包含header和payload字節數組,
public class Command {protected Header header;//命令中要傳輸的數據,要求這個數據已經是被序列化之后生成的字節數組private byte [] payload;//...
}public class Header {//用于唯一標識一個請求命令,在使用雙工方式異步收發數據的時候,requestId用于請求和響應的配對。private int requestId;//用于標識這條命令的版本號private int version;//用于標識這條命令的類型,主要目的是為了能讓接收命令一方來識別收到的是什么命令,以便路由到對應的處理類中去。private int type;// ...
}
public class ResponseHeader extends Header {private int code;private String error;// ...
}
在設計通信協議時,讓協議具備持續的升級能力,并且保持向下兼容是非常重要的。為了確保使用這個傳輸協議的這些程序還能正常工作,或者是向下兼容,協議中必須提供一個版本號,標識收到的這條數據使用的是哪個版本的協議。
發送方在發送命令的時候需要帶上這個命令的版本號,接收方在收到命令之后必須先檢查命令的版本號,如果接收方可以支持這個版本的命令就正常處理,否則就拒絕接收這個命令,返回響應告知對方:我不認識這個命令。這樣才是一個完備的,可持續的升級的通信協議。
注意:這個版本號是命令的版本號,或者說是傳輸協議的版本號,它不等同于程序的版本號。
send 方法的實現,本質上就是一個異步方法,在把請求數據發出去之后就返回了,并不會阻塞當前這個線程去等待響應返回來。來看一下它的實現:
@Override
public CompletableFuture<Command> send(Command request) {// 構建返回值CompletableFuture<Command> completableFuture = new CompletableFuture<>();try {// 將在途請求放到 inFlightRequests 中inFlightRequests.put(new ResponseFuture(request.getHeader().getRequestId(), completableFuture));// 發送命令channel.writeAndFlush(request).addListener((ChannelFutureListener) channelFuture -> {// 處理發送失敗的情況if (!channelFuture.isSuccess()) {completableFuture.completeExceptionally(channelFuture.cause());channel.close();}});} catch (Throwable t) {// 處理發送異常inFlightRequests.remove(request.getHeader().getRequestId());completableFuture.completeExceptionally(t);}return completableFuture;
}
- 把請求中的 requestId 和返回的 completableFuture 一起,構建了一個 ResponseFuture 對象,然后把這個對象放到了 inFlightRequests 這個變量中。inFlightRequests 中存放了所有在途的請求,也就是已經發出了請求但還沒有收到響應的這些 responseFuture 對象。
- 調用 netty 發送數據的方法,把 request 命令發給對方。注意:已經發出去的請求,有可能會因為網絡連接斷開或者對方進程崩潰等各種異常情況,永遠都收不到響應。那為了確保這些 ResponseFuture 不會在內存中越積越多,必須要捕獲所有的異常情況,結束對應的 ResponseFuture。所以,兩個地方都做了異常處理,分別應對發送失敗和發送異常兩種情況。
- 不能保證所有 ResponseFuture 都能正常或者異常結束,比如說,編寫對端程序的程序員寫的代碼有問題,收到了請求沒給返回響應,為了應對這種情況,超時的機制來保證所有情況下 ResponseFuture 都能結束,無論什么情況,只要超過了超時時間還沒有收到響應,就認為這個 ResponseFuture 失敗了,結束并刪除它。這部分代碼在 InFlightRequests 這個類中。
最佳實踐-背壓機制:如果是同步發送請求,客戶端需要等待服務端返回響應,服務端處理這個請求需要花多長時間,客戶端就要等多長時間。這實際上是一個天然的背壓機制(Back pressure),服務端處理速度會天然地限制客戶端請求的速度。
但是在異步請求中,客戶端異步發送請求并不會等待服務端,缺少了這個天然的背壓機制,如果服務端的處理速度跟不上客戶端的請求速度,客戶端的發送速度也不會因此慢下來,就會出現在請求越來越多,這些請求堆積在服務端的內存中,內存放不下就會一直請求失敗。服務端處理不過來的時候,客戶端還一直不停地發請求顯然是沒有意義的。
為了避免這種情況,需要增加一個背壓機制,在服務端處理不過來的時候限制一下客戶端的請求速度。
定義了一個信號量:
private final Semaphore semaphore = new Semaphore(10);
這個信號量有 10 個許可,每次往 inFlightRequest 中加入一個 ResponseFuture 的時候,需要先從信號量中獲得一個許可,如果這時候沒有許可了,就會阻塞當前這個線程,也就是發送請求的這個線程,直到有人歸還了許可,才能繼續發送請求。每結束一個在途請求,就歸還一個許可,這樣就可以保證在途請求的數量最多不超過 10 個請求,積壓在服務端正在處理或者待處理的請求也不會超過 10 個。
客戶端-RPC 框架
如何來動態地生成樁
RPC 框架中的樁采用了代理模式:給某一個對象提供一個代理對象,并由代理對象控制對原對象的引用,被代理的那個對象稱為委托對象。
在 RPC 框架中
- 代理對象是由 RPC 框架的客戶端來提供的“樁”
- 委托對象就是在服務端,真正實現業務邏輯的服務類的實例。
利用代理模式,在調用流程中動態地注入一些非侵入式業務邏輯(在現有的調用鏈中,增加一些業務邏輯,而不用去修改調用鏈上下游的代碼)。
在 RPC 框架的客戶端中來實現代理類-“樁”。
public interface StubFactory {//創建一個樁的實例//Transport 對象:是用來給服務端發請求的時候使用的//Class 對象:它用來告訴樁工廠:我需要你給我創建的這個樁,應該是什么類型的//createStub 的返回值就是由工廠創建出來的樁。<T> T createStub(Transport transport, Class<T> serviceClass);
}
這個樁是一個由 RPC 框架生成的類,這個類它要實現給定的接口,里面的邏輯就是把方法名和參數封裝成請求,發送給服務端,然后再把服務端返回的調用結果返回給調用方。
RPC 框架怎么才能根據要實現的接口來生成一個類呢?在這一塊兒,不同的 RPC 框架的實現是不一樣的,比如,
- gRPC 在編譯 IDL 的時候就把樁生成好了,這個時候編譯出來樁,它是目標語言的源代碼文件。比如說,目標語言是 Java,編譯完成后它們會生成一些 Java 的源代碼文件,其中以 Grpc.java 結尾的文件就是生成的樁的源代碼。這些生成的源代碼文件再經過 Java 編譯器編譯以后,就成了樁。
- Dubbo 是在運行時動態生成的樁,它利用了很多 Java 語言底層的特性,Java 源代碼編譯完成之后,生成的是一些 class 文件,JVM 在運行的時候,讀取這些 Class 文件來創建對應類的實例。這個 Class 文件雖然非常復雜,但本質上,它里面記錄的內容,就是編寫的源代碼中的內容,包括類的定義,方法定義和業務邏輯等等,并且它也是有固定的格式的。如果說,我們按照這個格式,來生成一個 class 文件,只要這個文件的格式是符合 Java 規范的,JVM 就可以識別并加載它。這樣就不需要經過源代碼、編譯這些過程,直接動態來創建一個樁。
在這個 RPC 的例子中,采用一種更通用的方式來動態生成樁:先生成樁的源代碼,然后動態地編譯這個生成的源代碼,然后再加載到 JVM 中。
限定:服務接口只能有一個方法,并且這個方法只能有一個參數,參數和返回值的類型都是 String 類型。
需要動態生成的這個樁,它每個方法的邏輯都是一樣的,都是把類名、方法名和方法的參數封裝成請求,然后發給服務端,收到服務端響應之后再把結果作為返回值,返回給調用方。定義一個 AbstractStub 的抽象類,在這個類中實現大部分通用的邏輯,讓所有動態生成的樁都繼承這個抽象類,這樣動態生成樁的代碼會更少一些。
實現這個 StubFactory 接口動態生成樁。靜態變量STUB_SOURCE_TEMPLAT:樁的源代碼模板,需要做的就是,填充模板中變量,生成樁的源碼,然后動態的編譯、加載這個樁就可以了。
public class DynamicStubFactory implements StubFactory{private final static String STUB_SOURCE_TEMPLATE =//把接口的類名、方法名和序列化后的參數封裝成一個 RpcRequest 對象//調用父類 AbstractStub 中的 invokeRemote 方法,發送給服務端。//invokeRemote 方法的返回值就是序列化的調用結果"package com.github.liyue2008.rpc.client.stubs;\n" +"import com.github.liyue2008.rpc.serialize.SerializeSupport;\n" +"\n" +"public class %s extends AbstractStub implements %s {\n" +" @Override\n" +" public String %s(String arg) {\n" +" return SerializeSupport.parse(\n" +" invokeRemote(\n" +" new RpcRequest(\n" +" \"%s\",\n" +" \"%s\",\n" +" SerializeSupport.serialize(arg)\n" +" )\n" +" )\n" +" );\n" +" }\n" +"}";@Override@SuppressWarnings("unchecked")//從 serviceClass 參數中,可以取到服務接口定義的所有信息//包括接口名、它有哪些方法、每個方法的參數和返回值類型等等//通過這些信息,可以來填充模板,生成樁的源代碼。public <T> T createStub(Transport transport, Class<T> serviceClass) {try {// 填充模板String stubSimpleName = serviceClass.getSimpleName() + "Stub";String classFullName = serviceClass.getName();String stubFullName = "com.github.liyue2008.rpc.client.stubs." + stubSimpleName;String methodName = serviceClass.getMethods()[0].getName();String source = String.format(STUB_SOURCE_TEMPLATE, stubSimpleName, classFullName, methodName, classFullName, methodName);// 編譯源代碼JavaStringCompiler compiler = new JavaStringCompiler();Map<String, byte[]> results = compiler.compile(stubSimpleName + ".java", source);// 加載編譯好的類Class<?> clazz = compiler.loadClass(stubFullName, results);// 把 Transport 賦值給樁ServiceStub stubInstance = (ServiceStub) clazz.newInstance();stubInstance.setTransport(transport);// 返回這個樁return (T) stubInstance;} catch (Throwable t) {throw new RuntimeException(t);}}
}
樁的類名就定義為:“接口名 + Stub”。填充好模板生成的源代碼存放在 source 變量中,然后經過動態編譯、動態加載之后,我們就可以拿到這個樁的類 clazz,利用反射創建一個樁的實例 stubInstance。把用于網絡傳輸的對象 transport 賦值給樁,這樣樁才能與服務端進行通信。到這里,我們就實現了動態創建一個樁。
使用依賴倒置原則解耦調用者和實現
很多地方都采用了同樣一種解耦的方法:通過定義一個接口來解耦調用方和實現稱為“依賴倒置原則(Dependence Inversion Principle)”,
核心思想:調用方不應依賴于具體實現,而是為實現定義一個接口,讓調用方和實現都依賴于這個接口。這種方法也稱為“面向接口編程”。
要解耦調用方和實現類,還需要解決一個問題:誰來創建實現類的實例?SPI(Service Provider Interface)。在 SPI 中,每個接口在目錄 META-INF/services/ 下都有一個配置文件,文件名就是以這個接口的類名,文件的內容就是它的實現類的類名。以 StubFactory 接口為例,我們看一下它的配置文件:
$cat rpc-netty/src/main/resources/META-INF/services/com.github.liyue2008.rpc.client.StubFactory
com.github.liyue2008.rpc.client.DynamicStubFactory
只要把這個配置文件、接口和實現類都放到 CLASSPATH 中,就可以通過 SPI 的方式來進行加載了。加載的參數就是這個接口的 class 對象,返回值就是這個接口的所有實現類的實例,這樣就在“不依賴實現類”的前提下,獲得了一個實現類的實例。具體的實現代碼在 ServiceSupport 這個類中。
服務端-RPC 框架
對于這個 RPC 框架來說,服務端可以分為兩個部分:注冊中心和 RPC 服務。
- 注冊中心的作用是幫助客戶端來尋址,找到對應 RPC 服務的物理地址
- RPC 服務用于接收客戶端樁的請求,調用業務服務的方法,并返回結果
注冊中心如何實現
一個完整的注冊中心也是分為客戶端和服務端兩部分的
- 客戶端給調用方提供 API,并實現與服務端的通信
- 服務端提供真正的業務功能,記錄每個 RPC 服務發來的注冊信息,并保存到它的元數據中。當有客戶端來查詢服務地址的時候,它會從元數據中獲取服務地址,返回給客戶端。
這里只實現了一個單機版的注冊中心,它只有客戶端沒有服務端,所有的客戶端依靠讀寫同一個元數據文件來實現元數據共享。
首先,在 RPC 服務的接入點,接口 RpcAccessPoint 中增加一個獲取注冊中心實例的方法:
public interface RpcAccessPoint extends Closeable{/*** 獲取注冊中心的引用* @param nameServiceUri 注冊中心 URI* @return 注冊中心引用*/NameService getNameService(URI nameServiceUri);// ...
}
給 NameService 接口增加兩個方法:
public interface NameService {/*** 返回所有支持的協議*/Collection<String> supportedSchemes();/*** 給定注冊中心服務端的 URI,去建立與注冊中心服務端的連接。* @param nameServiceUri 注冊中心地址*/void connect(URI nameServiceUri);// ...
}
getNameService 的實現:通過 SPI 機制加載所有的 NameService 的實現類,然后根據給定的 URI 中的協議,去匹配支持這個協議的實現類,然后返回這個實現的引用就可以了。實現了一個可擴展的注冊中心接口,系統可以根據 URI 中的協議,動態地來選擇不同的注冊中心實現。增加一種注冊中心的實現,也不需要修改任何代碼,只要按照 SPI 的規范,把協議的實現加入到運行時 CLASSPATH 中就可以了。
注意:本地文件它是一個共享資源,它會被 RPC 框架所有的客戶端和服務端并發讀寫。所以必須要加鎖!
由于這個文件可能被多個進程讀寫,這里面必須使用由操作系統提供的文件鎖。這個鎖的使用和其他的鎖并沒有什么區別,同樣是在訪問共享文件之前先獲取鎖,訪問共享資源結束后必須釋放鎖。
RPC服務怎么實現
rpc服務端功能
- 服務端的業務代碼把服務的實現類注冊到 RPC 框架中 ;
- 接收客戶端樁發出的請求,調用服務的實現類并返回結果;
把服務的實現類注冊到 RPC 框架中,只要使用一個合適的數據結構,記錄下所有注冊的實例就可以了,后面在處理客戶端請求的時候,會用到這個數據結構來查找服務實例。
RPC 框架的服務端如何來處理客戶端發送的 RPC 請求。首先來看服務端中,使用 Netty 接收所有請求數據的處理類 RequestInvocation 的 channelRead0 方法。
//處理邏輯:根據請求命令的 Handler 中的請求類型 type,去 requestHandlerRegistry 中查找對應的請求處理器 RequestHandler,然后調用請求處理器去處理請求,最后把結果發送給客戶端。
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Command request) throws Exception {RequestHandler handler = requestHandlerRegistry.get(request.getHeader().getType());if(null != handler) {Command response = handler.handle(request);if(null != response) {channelHandlerContext.writeAndFlush(response).addListener((ChannelFutureListener) channelFuture -> {if (!channelFuture.isSuccess()) {logger.warn("Write response failed!", channelFuture.cause());channelHandlerContext.channel().close();}});} else {logger.warn("Response is null!");}} else {throw new Exception(String.format("No handler for request with type: %d!", request.getHeader().getType()));}
}
這種通過“請求中的類型”,把請求分發到對應的處理類或者處理方法的設計,使用了一個命令注冊機制,讓這個路由分發的過程省略了大量的 if-else 或者是 switch 代碼。
好處:可以很方便地擴展命令處理器,而不用修改路由分發的方法,并且代碼看起來更加優雅。
這個 RPC 框架中只需要處理一種類型的請求:RPC 請求,只實現了一個命令處理器(核心):RpcRequestHandler。
處理客戶端請求, handle 方法的實現。
@Override
public Command handle(Command requestCommand) {Header header = requestCommand.getHeader();// 從 payload 中反序列化 RpcRequestRpcRequest rpcRequest = SerializeSupport.parse(requestCommand.getPayload());// 查找所有已注冊的服務提供方,尋找 rpcRequest 中需要的服務Object serviceProvider = serviceProviders.get(rpcRequest.getInterfaceName());// 找到服務提供者,利用 Java 反射機制調用服務的對應方法String arg = SerializeSupport.parse(rpcRequest.getSerializedArguments());Method method = serviceProvider.getClass().getMethod(rpcRequest.getMethodName(), String.class);String result = (String ) method.invoke(serviceProvider, arg);// 把結果封裝成響應命令并返回return new Command(new ResponseHeader(type(), header.getVersion(), header.getRequestId()), SerializeSupport.serialize(result));// ...
}
- 把 requestCommand 的 payload 屬性反序列化成為 RpcRequest;
- 根據 rpcRequest 中的服務名,去成員變量 serviceProviders 中查找已注冊服務實現類的實例;
- 找到服務提供者之后,利用 Java 反射機制調用服務的對應方法;
- 把結果封裝成響應命令并返回,在 RequestInvocation 中,它會把這個響應命令發送給客戶端。
再來看成員變量 serviceProviders,它的定義是:Map serviceProviders。它實際上就是一個 Map,Key 就是服務名,Value 就是服務提供方,也就是服務實現類的實例。
@Singleton
public class RpcRequestHandler implements RequestHandler, ServiceProviderRegistry {@Overridepublic synchronized <T> void addServiceProvider(Class<? extends T> serviceClass, T serviceProvider) {serviceProviders.put(serviceClass.getCanonicalName(), serviceProvider);logger.info("Add service: {}, provider: {}.",serviceClass.getCanonicalName(),serviceProvider.getClass().getCanonicalName());}// ...
}
這個類不僅實現了處理客戶端請求的 RequestHandler 接口,同時還實現了注冊 RPC 服務 ServiceProviderRegistry 接口,也就是說,RPC 框架服務端需要實現的兩個功能——注冊 RPC 服務和處理客戶端 RPC 請求
注意:RpcRequestHandler 上增加了一個注解 @Singleton,限定這個類它是一個單例模式,這樣確保在進程中任何一個地方,無論通過 ServiceSupport 獲取 RequestHandler 或者 ServiceProviderRegistry 這兩個接口的實現類,拿到的都是 RpcRequestHandler 這個類的唯一的一個實例。這個 @Singleton 的注解和獲取單例的實現在 ServiceSupport 中。