文章目錄
- 2. 消息隊列
- 2.1 MQ有什么用?
- 2.2 說一說生產者與消費者模式
- 2.3 消息隊列如何保證順序消費?
- 2.4 消息隊列如何保證消息不丟?
- 2.5 消息隊列如何保證不重復消費?
- 2.6 MQ處理消息失敗了怎么辦?
- 2.7 請介紹消息隊列推和拉的使用場景
- 2.8 RabbitMQ和Kafka有什么區別?
- 2.9 Kafka為什么速度快?
- 2.10 RabbitMQ如何保證消息已達?
2. 消息隊列
2.1 MQ有什么用?
參考答案
消息隊列有很多使用場景,比較常見的有3個:解耦、異步、削峰。
- 解耦:傳統的軟件開發模式,各個模塊之間相互調用,數據共享,每個模塊都要時刻關注其他模塊的是否更改或者是否掛掉等等,使用消息隊列,可以避免模塊之間直接調用,將所需共享的數據放在消息隊列中,對于新增業務模塊,只要對該類消息感興趣,即可訂閱該類消息,對原有系統和業務沒有任何影響,降低了系統各個模塊的耦合度,提高了系統的可擴展性。
- 異步:消息隊列提供了異步處理機制,在很多時候應用不想也不需要立即處理消息,允許應用把一些消息放入消息中間件中,并不立即處理它,在之后需要的時候再慢慢處理。
- 削峰:在訪問量驟增的場景下,需要保證應用系統的平穩性,但是這樣突發流量并不常見,如果以這類峰值的標準而投放資源的話,那無疑是巨大的浪費。使用消息隊列能夠使關鍵組件支撐突發訪問壓力,不會因為突發的超負荷請求而完全崩潰。消息隊列的容量可以配置的很大,如果采用磁盤存儲消息,則幾乎等于“無限”容量,這樣一來,高峰期的消息可以被積壓起來,在隨后的時間內進行平滑的處理完成,而不至于讓系統短時間內無法承載而導致崩潰。在電商網站的秒殺搶購這種突發性流量很強的業務場景中,消息隊列的強大緩沖能力可以很好的起到削峰作用。
2.2 說一說生產者與消費者模式
參考答案
所謂生產者-消費者問題,實際上主要是包含了兩類線程。一種是生產者線程用于生產數據,另一種是消費者線程用于消費數據,為了解耦生產者和消費者的關系,通常會采用共享的數據區域,就像是一個倉庫。生產者生產數據之后直接放置在共享數據區中,并不需要關心消費者的行為。而消費者只需要從共享數據區中去獲取數據,就不再需要關心生產者的行為。但是,這個共享數據區域中應該具備這樣的線程間并發協作的功能:
- 如果共享數據區已滿的話,阻塞生產者繼續生產數據放置入內;
- 如果共享數據區為空的話,阻塞消費者繼續消費數據。
在Java語言中,實現生產者消費者問題時,可以采用三種方式:
- 使用 Object 的 wait/notify 的消息通知機制;
- 使用 Lock 的 Condition 的 await/signal 的消息通知機制;
- 使用 BlockingQueue 實現。
2.3 消息隊列如何保證順序消費?
參考答案
在生產中經常會有一些類似報表系統這樣的系統,需要做 MySQL 的 binlog 同步。比如訂單系統要同步訂單表的數據到大數據部門的 MySQL 庫中用于報表統計分析,通常的做法是基于 Canal 這樣的中間件去監聽訂單數據庫的 binlog,然后把這些 binlog 發送到 MQ 中,再由消費者從 MQ 中獲取 binlog 落地到大數據部門的 MySQL 中。
在這個過程中,可能會有對某個訂單的增刪改操作,比如有三條 binlog 執行順序是增加、修改、刪除。消費者愣是換了順序給執行成刪除、修改、增加,這樣能行嗎?肯定是不行的。不同的消息隊列產品,產生消息錯亂的原因,以及解決方案是不同的。下面我們以RabbitMQ、Kafka、RocketMQ為例,來說明保證順序消費的辦法。
RabbitMQ:
對于 RabbitMQ 來說,導致上面順序錯亂的原因通常是消費者是集群部署,不同的消費者消費到了同一訂單的不同的消息。如消費者A執行了增加,消費者B執行了修改,消費者C執行了刪除,但是消費者C執行比消費者B快,消費者B又比消費者A快,就會導致消費 binlog 執行到數據庫的時候順序錯亂,本該順序是增加、修改、刪除,變成了刪除、修改、增加。如下圖:
RabbitMQ 的問題是由于不同的消息都發送到了同一個 queue 中,多個消費者都消費同一個 queue 的消息。解決這個問題,我們可以給 RabbitMQ 創建多個 queue,每個消費者固定消費一個 queue 的消息,生產者發送消息的時候,同一個訂單號的消息發送到同一個 queue 中,由于同一個 queue 的消息是一定會保證有序的,那么同一個訂單號的消息就只會被一個消費者順序消費,從而保證了消息的順序性。如下圖:
Kafka:
對于 Kafka 來說,一個 topic 下同一個 partition 中的消息肯定是有序的,生產者在寫的時候可以指定一個 key,通過我們會用訂單號作為 key,這個 key 對應的消息都會發送到同一個 partition 中,所以消費者消費到的消息也一定是有序的。
那么為什么 Kafka 還會存在消息錯亂的問題呢?問題就出在消費者身上。通常我們消費到同一個 key 的多條消息后,會使用多線程技術去并發處理來提高消息處理速度,否則一條消息的處理需要耗時幾十 毫秒,1 秒也就只能處理幾十條消息,吞吐量就太低了。而多線程并發處理的話,binlog 執行到數據庫的時候就不一定還是原來的順序了。如下圖:
Kafka 從生產者到消費者消費消息這一整個過程其實都是可以保證有序的,導致最終亂序是由于消費者端需要使用多線程并發處理消息來提高吞吐量,比如消費者消費到了消息以后,開啟 32 個線程處理消息,每個線程線程處理消息的快慢是不一致的,所以才會導致最終消息有可能不一致。
所以對于 Kafka 的消息順序性保證,其實我們只需要保證同一個訂單號的消息只被同一個線程處理的就可以了。由此我們可以在線程處理前增加個內存隊列,每個線程只負責處理其中一個內存隊列的消息,同一個訂單號的消息發送到同一個內存隊列中即可。如下圖:
RocketMQ:
對于 RocketMQ 來說,每個 Topic 可以指定多個 MessageQueue,當我們寫入消息的時候,會把消息均勻地分發到不同的 MessageQueue 中,比如同一個訂單號的消息,增加 binlog 寫入到 MessageQueue1 中,修改 binlog 寫入到 MessageQueue2 中,刪除 binlog 寫入到 MessageQueue3 中。
但是當消費者有多臺機器的時候,會組成一個 Consumer Group,Consumer Group 中的每臺機器都會負責消費一部分 MessageQueue 的消息,所以可能消費者A消費了 MessageQueue1 的消息執行增加操作,消費者B消費了 MessageQueue2 的消息執行修改操作,消費者C消費了 MessageQueue3 的消息執行刪除操作,但是此時消費 binlog 執行到數據庫的時候就不一定是消費者A先執行了,有可能消費者C先執行刪除操作,因為幾臺消費者是并行執行,是不能夠保證他們之間的執行順序的。如下圖:
RocketMQ 的消息亂序是由于同一個訂單號的 binlog 進入了不同的 MessageQueue,進而導致一個訂單的 binlog 被不同機器上的 Consumer 處理。
要解決 RocketMQ 的亂序問題,我們只需要想辦法讓同一個訂單的 binlog 進入到同一個 MessageQueue 中就可以了。因為同一個 MessageQueue 內的消息是一定有序的,一個 MessageQueue 中的消息只能交給一個 Consumer 來進行處理,所以 Consumer 消費的時候就一定會是有序的。
2.4 消息隊列如何保證消息不丟?
參考答案
丟數據一般分為兩種,一種是mq把消息丟了,一種就是消費時將消息丟了。下面從rabbitmq和kafka分別說一下,丟失數據的場景。
RabbitMQ:
RabbitMQ丟失消息分為如下幾種情況:
-
生產者丟消息:
生產者將數據發送到RabbitMQ的時候,可能在傳輸過程中因為網絡等問題而將數據弄丟了。
-
RabbitMQ自己丟消息:
如果沒有開啟RabbitMQ的持久化,那么RabbitMQ一旦重啟數據就丟了。所以必須開啟持久化將消息持久化到磁盤,這樣就算RabbitMQ掛了,恢復之后會自動讀取之前存儲的數據,一般數據不會丟失。除非極其罕見的情況,RabbitMQ還沒來得及持久化自己就掛了,這樣可能導致一部分數據丟失。
-
消費端丟消息:
主要是因為消費者消費時,剛消費到還沒有處理,結果消費者就掛了,這樣你重啟之后,RabbitMQ就認為你已經消費過了,然后就丟了數據。
針對上述三種情況,RabbitMQ可以采用如下方式避免消息丟失:
-
生產者丟消息:
- 可以選擇使用RabbitMQ提供是事務功能,就是生產者在發送數據之前開啟事務,然后發送消息,如果消息沒有成功被RabbitMQ接收到,那么生產者會受到異常報錯,這時就可以回滾事務,然后嘗試重新發送。如果收到了消息,那么就可以提交事務。這種方式有明顯的缺點,即RabbitMQ事務開啟后,就會變為同步阻塞操作,生產者會阻塞等待是否發送成功,太耗性能會造成吞吐量的下降。
- 可以開啟confirm模式。在生產者那里設置開啟了confirm模式之后,每次寫的消息都會分配一個唯一的id,然后如何寫入了RabbitMQ之中,RabbitMQ會給你回傳一個ack消息,告訴你這個消息發送OK了。如果RabbitMQ沒能處理這個消息,會回調你一個nack接口,告訴你這個消息失敗了,你可以進行重試。而且你可以結合這個機制知道自己在內存里維護每個消息的id,如果超過一定時間還沒接收到這個消息的回調,那么你可以進行重發。
事務機制是同步的,你提交了一個事物之后會阻塞住,但是confirm機制是異步的,發送消息之后可以接著發送下一個消息,然后RabbitMQ會回調告知成功與否。 一般在生產者這塊避免丟失,都是用confirm機制。
-
RabbitMQ自己丟消息:
設置消息持久化到磁盤,設置持久化有兩個步驟:
- 創建queue的時候將其設置為持久化的,這樣就可以保證RabbitMQ持久化queue的元數據,但是不會持久化queue里面的數據。
- 發送消息的時候講消息的deliveryMode設置為2,這樣消息就會被設為持久化方式,此時RabbitMQ就會將消息持久化到磁盤上。 必須要同時開啟這兩個才可以。
而且持久化可以跟生產的confirm機制配合起來,只有消息持久化到了磁盤之后,才會通知生產者ack,這樣就算是在持久化之前RabbitMQ掛了,數據丟了,生產者收不到ack回調也會進行消息重發。
-
消費端丟消息:
使用RabbitMQ提供的ack機制,首先關閉RabbitMQ的自動ack,然后每次在確保處理完這個消息之后,在代碼里手動調用ack。這樣就可以避免消息還沒有處理完就ack。
Kafka:
Kafka丟失消息分為如下幾種情況:
-
生產者丟消息:
生產者沒有設置相應的策略,發送過程中丟失數據。
-
Kafka自己丟消息:
比較常見的一個場景,就是Kafka的某個broker宕機了,然后重新選舉partition的leader時。如果此時follower還沒來得及同步數據,leader就掛了,然后某個follower成為了leader,它就少了一部分數據。
-
消費端丟消息:
消費者消費到了這個數據,然后消費之自動提交了offset,讓Kafka知道你已經消費了這個消息,當你準備處理這個消息時,自己掛掉了,那么這條消息就丟了。
針對上述三種情況,Kafka可以采用如下方式避免消息丟失:
-
生產者丟消息:
關閉自動提交offset,在自己處理完畢之后手動提交offset,這樣就不會丟失數據。
-
Kafka自己丟消息:
一般要求設置4個參數來保證消息不丟失:
- 給topic設置
replication.factor
參數,這個值必須大于1,表示要求每個partition必須至少有2個副本。 - 在kafka服務端設置
min.isync.replicas
參數,這個值必須大于1,表示 要求一個leader至少感知到有至少一個follower在跟自己保持聯系正常同步數據,這樣才能保證leader掛了之后還有一個follower。 - 在生產者端設置
acks=all
,表示 要求每條每條數據,必須是寫入所有replica副本之后,才能認為是寫入成功了。 - 在生產者端設置
retries=MAX
(很大的一個值),表示這個是要求一旦寫入事變,就無限重試。
- 給topic設置
-
消費端丟消息:
如果按照上面設置了ack=all,則一定不會丟失數據,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才認為本次寫成功了。如果沒滿足這個條件,生產者會自動不斷的重試,重試無限次。
-
數據,大數據量的數據處理上。
-
架構模型方面
RabbitMQ:以broker為中心,有消息的確認機制。
Kafka:以consumer為中心,沒有消息的確認機制。
-
吞吐量方面
RabbitMQ:支持消息的可靠的傳遞,支持事務,不支持批量操作,基于存儲的可靠性的要求存儲可以采用內存或硬盤,吞吐量小。
Kafka:內部采用消息的批量處理,數據的存儲和獲取是本地磁盤順序批量操作,消息處理的效率高,吞吐量高。
-
集群負載均衡方面
RabbitMQ:本身不支持負載均衡,需要loadbalancer的支持。
Kafka:采用zookeeper對集群中的broker,consumer進行管理,可以注冊topic到zookeeper上,通過zookeeper的協調機制,producer保存對應的topic的broker信息,可以隨機或者輪詢發送到broker上,producer可以基于語義指定分片,消息發送到broker的某個分片上。
2.5 消息隊列如何保證不重復消費?
參考答案
先大概說一說可能會有哪些重復消費的問題。首先就是比如rabbitmq、rocketmq、kafka,都有可能會出現消費重復消費的問題,正常。因為這問題通常不是mq自己保證的,是給你保證的。然后我們挑一個kafka來舉個例子,說說怎么重復消費吧。
kafka實際上有個offset的概念,就是每個消息寫進去,都有一個offset,代表他的序號,然后consumer消費了數據之后,每隔一段時間,會把自己消費過的消息的offset提交一下,代表我已經消費過了,下次我要是重啟啥的,你就讓我繼續從上次消費到的offset來繼續消費吧。
但是凡事總有意外,比如我們之前生產經常遇到的,就是你有時候重啟系統,看你怎么重啟了,如果碰到點著急的,直接kill進程了,再重啟。這會導致consumer有些消息處理了,但是沒來得及提交offset,尷尬了。重啟之后,少數消息會再次消費一次。
其實重復消費不可怕,可怕的是你沒考慮到重復消費之后,怎么保證冪等性。舉個例子,假設你有個系統,消費一條往數據庫里插入一條,要是你一個消息重復兩次,你不就插入了兩條,這數據不就錯了?但是你要是消費到第二次的時候,自己判斷一下已經消費過了,直接扔了,不就保留了一條數據?
一條數據重復出現兩次,數據庫里就只有一條數據,這就保證了系統的冪等性冪等性。通俗點說,就一個數據,或者一個請求,給你重復來多次,你得確保對應的數據是不會改變的,不能出錯。
想要保證不重復消費,其實還要結合業務來思考,這里給幾個思路:
- 比如你拿個數據要寫庫,你先根據主鍵查一下,如果這數據都有了,你就別插入了,update一下。
- 比如你是寫redis,那沒問題了,反正每次都是set,天然冪等性。
- 比如你不是上面兩個場景,那做的稍微復雜一點,你需要讓生產者發送每條數據的時候,里面加一個全局唯一的id,類似訂單id之類的東西,然后你這里消費到了之后,先根據這個id去比如redis里查一下,之前消費過嗎?如果沒有消費過,你就處理,然后這個id寫redis。如果消費過了,那你就別處理了,保證別重復處理相同的消息即可。
還有比如基于數據庫的唯一鍵來保證重復數據不會重復插入多條,我們之前線上系統就有這個問題,就是拿到數據的時候,每次重啟可能會有重復,因為kafka消費者還沒來得及提交offset,重復數據拿到了以后我們插入的時候,因為有唯一鍵約束了,所以重復數據只會插入報錯,不會導致數據庫中出現臟數據。
2.6 MQ處理消息失敗了怎么辦?
參考答案
一般生產環境中,都會在使用MQ的時候設計兩個隊列:一個是核心業務隊列,一個是死信隊列。核心業務隊列,就是比如專門用來讓訂單系統發送訂單消息的,然后另外一個死信隊列就是用來處理異常情況的。
比如說要是第三方物流系統故障了,此時無法請求,那么倉儲系統每次消費到一條訂單消息,嘗試通知發貨和配送,都會遇到對方的接口報錯。此時倉儲系統就可以把這條消息拒絕訪問,或者標志位處理失敗!注意,這個步驟很重要。
一旦標志這條消息處理失敗了之后,MQ就會把這條消息轉入提前設置好的一個死信隊列中。然后你會看到的就是,在第三方物流系統故障期間,所有訂單消息全部處理失敗,全部會轉入死信隊列。然后你的倉儲系統得專門有一個后臺線程,監控第三方物流系統是否正常,能否請求的,不停的監視。一旦發現對方恢復正常,這個后臺線程就從死信隊列消費出來處理失敗的訂單,重新執行發貨和配送的通知邏輯。死信隊列的使用,其實就是MQ在生產實踐中非常重要的一環,也就是架構設計必須要考慮的。
整個過程,如下圖所示:
2.7 請介紹消息隊列推和拉的使用場景
參考答案
推模式:
推模式是服務器端根據用戶需要,由目的、按時將用戶感興趣的信息主動發送到用戶的客戶端。
優點:
- 對用戶要求低,方便用戶獲取需要的信息;
- 及時性好,服務器端及時地向客戶端推送更新動態信息,吞吐量大。
缺點:
- 不能確保發送成功,推模式采用廣播方式,只有服務器端和客戶端在同一個頻道上,推模式才有效,用戶才能接收到信息;
- 沒有信息狀態跟蹤,推模式采用開環控制技術,一個信息推送后的狀態,比如客戶端是否接收等,無從得知;
- 針對性較差。推送的信息可能并不能滿足客戶端的個性化需求。
拉模式:
拉模式是客戶端主動從服務器端獲取信息。
優點:
- 針對性強,能滿足客戶端的個性化需求;
- 信息傳輸量較小,網絡中傳輸的只是客戶端的請求和服務器端對該請求的響應;
- 服務器端的任務輕。服務器端只是被動接收查詢,對客戶端的查詢請求做出響應。
缺點:
- 實時性較差,針對于服務器端實時更新的信息,客戶端難以獲取實時信息;
- 對于客戶端用戶的要求較高,需要對服務器端具有一定的了解。
2.8 RabbitMQ和Kafka有什么區別?
參考答案
在實際生產應用中,通常會使用Kafka作為消息傳輸的數據管道,RabbitMQ作為交易數據作為數據傳輸管道,主要的取舍因素則是是否存在丟數據的可能。RabbitMQ在金融場景中經常使用,具有較高的嚴謹性,數據丟失的可能性更小,同事具備更高的實時性。而Kafka優勢主要體現在吞吐量上,雖然可以通過策略實現數據不丟失,但從嚴謹性角度來講,大不如RabbitMQ。而且由于Kafka保證每條消息最少送達一次,有較小的概率會出現數據重復發送的情況。詳細來說,它們之間主要有如下的區別:
-
應用場景方面
RabbitMQ:用于實時的,對可靠性要求較高的消息傳遞上。
Kafka:用于處于活躍的流式數據,大數據量的數據處理上。
-
架構模型方面
RabbitMQ:以broker為中心,有消息的確認機制。
Kafka:以consumer為中心,沒有消息的確認機制。
-
吞吐量方面
RabbitMQ:支持消息的可靠的傳遞,支持事務,不支持批量操作,基于存儲的可靠性的要求存儲可以采用內存或硬盤,吞吐量小。
Kafka:內部采用消息的批量處理,數據的存儲和獲取是本地磁盤順序批量操作,消息處理的效率高,吞吐量高。
-
集群負載均衡方面
RabbitMQ:本身不支持負載均衡,需要loadbalancer的支持。
Kafka:采用zookeeper對集群中的broker,consumer進行管理,可以注冊topic到zookeeper上,通過zookeeper的協調機制,producer保存對應的topic的broker信息,可以隨機或者輪詢發送到broker上,producer可以基于語義指定分片,消息發送到broker的某個分片上。
2.9 Kafka為什么速度快?
參考答案
Kafka的消息是保存或緩存在磁盤上的,一般認為在磁盤上讀寫數據是會降低性能的,因為尋址會比較消耗時間,但是實際上,Kafka的特性之一就是高吞吐率。即使是普通的服務器,Kafka也可以輕松支持每秒百萬級的寫入請求,超過了大部分的消息中間件,這種特性也使得Kafka在日志處理等海量數據場景廣泛應用。
下面從數據寫入和讀取兩方面分析,為什么Kafka速度這么快:
寫入數據:
Kafka會把收到的消息都寫入到硬盤中,它絕對不會丟失數據。為了優化寫入速度Kafka采用了兩個技術,順序寫入和MMFile 。
一、順序寫入
磁盤讀寫的快慢取決于你怎么使用它,也就是順序讀寫或者隨機讀寫。在順序讀寫的情況下,磁盤的順序讀寫速度和內存持平。因為硬盤是機械結構,每次讀寫都會尋址->寫入,其中尋址是一個“機械動作”,它是最耗時的。所以硬盤最討厭隨機I/O,最喜歡順序I/O。為了提高讀寫硬盤的速度,Kafka就是使用順序I/O。
而且Linux對于磁盤的讀寫優化也比較多,包括read-ahead和write-behind,磁盤緩存等。如果在內存做這些操作的時候,一個是JAVA對象的內存開銷很大,另一個是隨著堆內存數據的增多,JAVA的GC時間會變得很長,使用磁盤操作有以下幾個好處:
- 磁盤順序讀寫速度超過內存隨機讀寫;
- JVM的GC效率低,內存占用大。使用磁盤可以避免這一問題;
- 系統冷啟動后,磁盤緩存依然可用。
下圖就展示了Kafka是如何寫入數據的, 每一個Partition其實都是一個文件 ,收到消息后Kafka會把數據插入到文件末尾(虛框部分):
這種方法有一個缺陷——沒有辦法刪除數據 ,所以Kafka是不會刪除數據的,它會把所有的數據都保留下來,每個消費者(Consumer)對每個Topic都有一個offset用來表示讀取到了第幾條數據 。
二、Memory Mapped Files
即便是順序寫入硬盤,硬盤的訪問速度還是不可能追上內存。所以Kafka的數據并不是實時的寫入硬盤 ,它充分利用了現代操作系統分頁存儲來利用內存提高I/O效率。Memory Mapped Files(后面簡稱mmap)也被翻譯成 內存映射文件,在64位操作系統中一般可以表示20G的數據文件,它的工作原理是直接利用操作系統的Page來實現文件到物理內存的直接映射。完成映射之后你對物理內存的操作會被同步到硬盤上(操作系統在適當的時候)。
通過mmap,進程像讀寫硬盤一樣讀寫內存(當然是虛擬機內存),也不必關心內存的大小有虛擬內存為我們兜底。使用這種方式可以獲取很大的I/O提升,省去了用戶空間到內核空間復制的開銷(調用文件的read會把數據先放到內核空間的內存中,然后再復制到用戶空間的內存中。)
但也有一個很明顯的缺陷——不可靠,寫到mmap中的數據并沒有被真正的寫到硬盤,操作系統會在程序主動調用flush的時候才把數據真正的寫到硬盤。Kafka提供了一個參數——producer.type來控制是不是主動flush,如果Kafka寫入到mmap之后就立即flush然后再返回Producer叫 同步 (sync);寫入mmap之后立即返回Producer不調用flush叫異步 (async)。
讀取數據:
一、基于sendfile實現Zero Copy
傳統模式下,當需要對一個文件進行傳輸的時候,其具體流程細節如下:
- 調用read函數,文件數據被copy到內核緩沖區;
- read函數返回,文件數據從內核緩沖區copy到用戶緩沖區;
- write函數調用,將文件數據從用戶緩沖區copy到內核與socket相關的緩沖區;
- 數據從socket緩沖區copy到相關協議引擎。
以上細節是傳統read/write方式進行網絡文件傳輸的方式,我們可以看到,在這個過程當中,文件數據實際上是經過了四次copy操作:硬盤->內核buf->用戶buf->socket相關緩沖區->協議引擎。而sendfile系統調用則提供了一種減少以上多次copy,提升文件傳輸性能的方法。
在內核版本2.1中,引入了sendfile系統調用,以簡化網絡上和兩個本地文件之間的數據傳輸。sendfile的引入不僅減少了數據復制,還減少了上下文切換。運行流程如下:
- sendfile系統調用,文件數據被copy至內核緩沖區;
- 再從內核緩沖區copy至內核中socket相關的緩沖區;
- 最后再socket相關的緩沖區copy到協議引擎。
相較傳統read/write方式,2.1版本內核引進的sendfile已經減少了內核緩沖區到user緩沖區,再由user緩沖區到socket相關緩沖區的文件copy,而在內核版本2.4之后,文件描述符結果被改變,sendfile實現了更簡單的方式,再次減少了一次copy操作。
在Apache、Nginx、lighttpd等web服務器當中,都有一項sendfile相關的配置,使用sendfile可以大幅提升文件傳輸性能。Kafka把所有的消息都存放在一個一個的文件中,當消費者需要數據的時候Kafka直接把文件發送給消費者,配合mmap作為文件讀寫方式,直接把它傳給sendfile。
二、批量壓縮
在很多情況下,系統的瓶頸不是CPU或磁盤,而是網絡IO,對于需要在廣域網上的數據中心之間發送消息的數據流水線尤其如此。進行數據壓縮會消耗少量的CPU資源,不過對于kafka而言,網絡IO更應該需要考慮。
- 如果每個消息都壓縮,但是壓縮率相對很低,所以Kafka使用了批量壓縮,即將多個消息一起壓縮而不是單個消息壓縮;
- Kafka允許使用遞歸的消息集合,批量的消息可以通過壓縮的形式傳輸并且在日志中也可以保持壓縮格式,直到被消費者解壓縮;
- Kafka支持多種壓縮協議,包括Gzip和Snappy壓縮協議。
總結:
Kafka速度的秘訣在于,它把所有的消息都變成一個批量的文件,并且進行合理的批量壓縮,減少網絡IO損耗,通過mmap提高I/O速度,寫入數據的時候由于單個Partion是末尾添加所以速度最優。讀取數據的時候配合sendfile直接暴力輸出。
2.10 RabbitMQ如何保證消息已達?
參考答案
RabbitMQ可能丟失消息分為如下幾種情況:
-
生產者丟消息:
生產者將數據發送到RabbitMQ的時候,可能在傳輸過程中因為網絡等問題而將數據弄丟了。
-
RabbitMQ自己丟消息:
如果沒有開啟RabbitMQ的持久化,那么RabbitMQ一旦重啟數據就丟了。所以必須開啟持久化將消息持久化到磁盤,這樣就算RabbitMQ掛了,恢復之后會自動讀取之前存儲的數據,一般數據不會丟失。除非極其罕見的情況,RabbitMQ還沒來得及持久化自己就掛了,這樣可能導致一部分數據丟失。
-
消費端丟消息:
主要是因為消費者消費時,剛消費到還沒有處理,結果消費者就掛了,這樣你重啟之后,RabbitMQ就認為你已經消費過了,然后就丟了數據。
針對上述三種情況,RabbitMQ可以采用如下方式避免消息丟失:
-
生產者丟消息:
- 可以選擇使用RabbitMQ提供是事務功能,就是生產者在發送數據之前開啟事務,然后發送消息,如果消息沒有成功被RabbitMQ接收到,那么生產者會受到異常報錯,這時就可以回滾事務,然后嘗試重新發送。如果收到了消息,那么就可以提交事務。這種方式有明顯的缺點,即RabbitMQ事務開啟后,就會變為同步阻塞操作,生產者會阻塞等待是否發送成功,太耗性能會造成吞吐量的下降。
- 可以開啟confirm模式。在生產者那里設置開啟了confirm模式之后,每次寫的消息都會分配一個唯一的id,然后如何寫入了RabbitMQ之中,RabbitMQ會給你回傳一個ack消息,告訴你這個消息發送OK了。如果RabbitMQ沒能處理這個消息,會回調你一個nack接口,告訴你這個消息失敗了,你可以進行重試。而且你可以結合這個機制知道自己在內存里維護每個消息的id,如果超過一定時間還沒接收到這個消息的回調,那么你可以進行重發。
事務機制是同步的,你提交了一個事物之后會阻塞住,但是confirm機制是異步的,發送消息之后可以接著發送下一個消息,然后RabbitMQ會回調告知成功與否。 一般在生產者這塊避免丟失,都是用confirm機制。
-
RabbitMQ自己丟消息:
設置消息持久化到磁盤,設置持久化有兩個步驟:
- 創建queue的時候將其設置為持久化的,這樣就可以保證RabbitMQ持久化queue的元數據,但是不會持久化queue里面的數據。
- 發送消息的時候講消息的deliveryMode設置為2,這樣消息就會被設為持久化方式,此時RabbitMQ就會將消息持久化到磁盤上。 必須要同時開啟這兩個才可以。
而且持久化可以跟生產的confirm機制配合起來,只有消息持久化到了磁盤之后,才會通知生產者ack,這樣就算是在持久化之前RabbitMQ掛了,數據丟了,生產者收不到ack回調也會進行消息重發。
-
消費端丟消息:
息,然后RabbitMQ會回調告知成功與否。 一般在生產者這塊避免丟失,都是用confirm機制。
-
RabbitMQ自己丟消息:
設置消息持久化到磁盤,設置持久化有兩個步驟:
- 創建queue的時候將其設置為持久化的,這樣就可以保證RabbitMQ持久化queue的元數據,但是不會持久化queue里面的數據。
- 發送消息的時候講消息的deliveryMode設置為2,這樣消息就會被設為持久化方式,此時RabbitMQ就會將消息持久化到磁盤上。 必須要同時開啟這兩個才可以。
而且持久化可以跟生產的confirm機制配合起來,只有消息持久化到了磁盤之后,才會通知生產者ack,這樣就算是在持久化之前RabbitMQ掛了,數據丟了,生產者收不到ack回調也會進行消息重發。
-
消費端丟消息:
使用RabbitMQ提供的ack機制,首先關閉RabbitMQ的自動ack,然后每次在確保處理完這個消息之后,在代碼里手動調用ack。這樣就可以避免消息還沒有處理完就ack。