Kafka
1. 基本知識
1.1 前置知識
- topic表示一個類型/業務的數據的組
- 為方便擴展,提高吞吐率,一個topic分為多個partition。
- 配合分區的設計,提出消費者組的概念,每個消費者并行消費,同時,一個分區的數據,只能由一個消費者組的一個消費者來消費,除此之外,消費者和消費者相互獨立,一個消費者消費之后,另一個消費者也可以消費這部分數據,在同一個消費者組里面,每個成員會被分配給不同的分區進行消費,在分區或者消費者變化的時候,也會對成員進行動態分配這些分區。
- 為提高可用性,每個partition都會有若干個副本,分為leader(當前正在執行的partition)和follower(備用的partition),當leader掛掉時,被選中的follower就會成為新的leader,跟redis的集群中的主從比較類似。
- kafka的部分數據存儲在zookeeper中,記錄了正在運行的節點以及每個分區的leader選舉等信息,值得一提的是,在kafka2.8.0之后,kafka就可以不依賴于zookeeper,獨立進行運行了。
如果通過客戶端自動創建的話,partition默認只有一個,而我們可以在命令行輸入kafka-topics.sh --topic create-test --bootstrap-server kafka-1:9092 --partitions 11 --create
來創建一個有11個分區的topic,而如果是想要更新已有的topic的partition大小,應該將--create
修改為--alter
,如果是在docker環境中,也可以在進入容器之后輸入kafka-topics.sh
來查看命令的參數。其他的比如--describe
用于查看topic的詳細信息,--list
查看所有主題。
而我們也可以在命令行進行消費者和生產者的操作,生產者輸入kafka-console-producer.sh --bootstrap-server ip:port --topic [your topic]
,消費者則是將producer換成consumer即可,如下圖所示:
除此之外,加上--from-beginning
字段之后,consumer會加載所有的消息。
1.2 生產者
當生產者生產消息的時候,會經過producer->序列化器->攔截器(可選)->分區器->RecordAccumulator
分區器會將消息的數據進行分區,而對應的消息會被發到RecordAccumulator
,此時還沒有將數據發送,當數據積累到batch.size(默認16k)之后,Sender才會發送數據,當然,如果數據量比較少,滯留的時間超過linger.ms
設定的時間,就會發送消息,但是默認linger.ms
是0ms,也就是拿到消息就會立即發送數據,但實際可能因線程調度略有延遲。
當通過Sender發送數據時,會為每個Broker維護獨立的請求隊列。Kafka通過max.in.flight.requests.per.connection
參數(默認5)控制每個Broker連接允許的最大未確認請求數。當某個Broker的in-flight
請求數達到該限制時,針對該Broker的發送將暫停,直到收到對應的請求確認,之后才能繼續發送新的請求。這個機制可以防止單個Broker堆積過多未確認請求,同時保證全局吞吐量,當然,如果等待的時間超過了request.timeout.ms
(默認30s),生產者則會認為請求失敗,隨后進行重試,當然應答有三個級別,0代表無需等待數據落盤就可以應答,1代表leader收到數據就可以應答,-1代表leader和follower全都同步完畢之后,才可以應答,另外,在底層鏈路中,我們發送請求會通過調用selector
將消息發送給kafka集群。
Go中的Kafka
相較于kafka-go還是感覺sarama好用一點,雖然不支持context。
func main() {brokers := []string{"localhost:29092", "localhost:29093", "localhost:29094"}topic := "cluster_test_topic"config := sarama.NewConfig()config.Producer.Return.Successes = trueconfig.Producer.Partitioner = sarama.NewRoundRobinPartitionerproducer, err := sarama.NewAsyncProducer(brokers, config)if err != nil {log.Fatalf("Failed to start Kafka async producer: %v", err)}defer producer.Close()// 監聽成功和失敗的消息go func() {for msg := range producer.Successes() {fmt.Printf("Message sent successfully: topic:%s partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)}}()go func() {for err := range producer.Errors() {fmt.Printf("Failed to send message: %v\n", err)}}()wg := sync.WaitGroup{}wg.Add(1000000)// 發送消息for i := 0; i < 1000; i++ {go func() {for j := 0; j < 1000; j++ {msg := &sarama.ProducerMessage{Topic: topic,Value: sarama.StringEncoder(fmt.Sprintf("Hello Kafka!! My id is %d", i)),}producer.Input() <- msgdefer wg.Done()}}()}wg.Wait()
}
這里是一個簡單的go的生產者客戶端,可以向kafka發送異步的發送消息(取決于sarama.NewAsyncProducer
這一方法,如果需要同步調用,則需要做一些修改),同時我們在啟動生產者客戶端的程序時,在終端上會顯示我們的partition,而且可以明顯的看見不同的消息,被分到了不同的partition上面,下面來細說一下分區的好處
分區(Partition)
Partition是Topic的子集,如果一個topic只設置在一個broker(機器)上面,則在傳輸巨大的數據量的時候,多臺機器的負載不均勻,可能會導致broker壓力過大,造成性能瓶頸,而且該Broker一旦故障,所有數據都會不可用,可靠性低。
所以引入了partition,一個topic可以具有多個partition,而每個partition可以存放在不同的broker上面,實現 數據分布式存儲,這樣,只要將消息均勻的發送到不同的partition上面,就能夠實現broker的負載均衡,與此同時,默認情況下,如果消息帶有key字段,那么kafka會根據這個key計算哈希值,將其放到合適的分區上面。
值得一提的是,和java客戶端不同,go的Sarama客戶端在不指定分區。并且不設定Key的時候,會采取輪詢的策略來選擇分區,而java客戶端則是使用黏性分區來選擇分區。
但是,我們在Sarama客戶端,也可以自定義分區器,事實上,只需要自定義一個合乎規范的函數簽名然后實現一個分區器的接口即可:
// 自定義分區器
type MyPartitioner struct {topic string
}var _ sarama.Partitioner = (*MyPartitioner)(nil)// Partition implements sarama.Partitioner.
func (m *MyPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) {return 0, nil
}// RequiresConsistency implements sarama.Partitioner.
func (m *MyPartitioner) RequiresConsistency() bool {return true
}
// 自定義構造函數
func NewMyPartitioner(topic string) sarama.Partitioner {return &MyPartitioner{topic: topic,}
}func main() {...config.Producer.Partitioner = NewMyPartitioner...
}
另外,我們之前提到了攔截器,攔截器事實上就是在發送消息之前要處理的事情,我們可以在Sarama客戶端中通過實現func (m *MyInterceptor) OnSend(*sarama.ProducerMessage)
這個方法來實現自定義的攔截器!而使用這個攔截器,
具體操作如下:
type MyInterceptor struct{}func (m *MyInterceptor) OnSend(*sarama.ProducerMessage) {fmt.Println("OnSend")
}var _ sarama.ProducerInterceptor = (*MyInterceptor)(nil)func main() {...config.Producer.Interceptors = []sarama.ProducerInterceptor{&MyInterceptor{},}...
}
至于序列化器,就是把我們的消息轉換成能夠在kafka中進行傳輸的字節流,具體的邏輯在這里:
msg := &sarama.ProducerMessage{Topic: topic,Value: sarama.StringEncoder(fmt.Sprintf("Hello Kafka!! My id is %d", i)),Key: sarama.StringEncoder(fmt.Sprintf("key-%d", i)),}
這里將我們的消息轉換成kafka能夠識別的數據,然后將其發送。
生產者如何提高吞吐量?
我們之前提過,kafka默認的linger.ms
設置的是0,也就是收到數據立刻發送,但是,這樣雖然能實時發送信息,但是這種模式就像一個大貨車一次拉一小點東西,吞吐量肯定是不夠的,所以我們可以對batch.size
和linger.ms
這兩個參數進行調整來提高吞吐量,同時也可以進行壓縮,來節省內存,從而提高吞吐量,而在go-Sarama客戶端,對應producer的config可以這樣調整:
config.Producer.Flush.Bytes = 16 * 1024config.Producer.Flush.Frequency = time.Millisecond * 50config.Producer.CompressionLevel = int(sarama.CompressionSnappy)
除此之外,我們還可以通過config.ChannelBufferSize
來調整生產者緩沖區的大小,緩沖區的設置會影響內存占用和吞吐量,所以需要權衡利弊。
數據
我們的數據應答類型在Sarama中是這樣設置的(此處以-1為例):
config.Producer.RequiredAcks = sarama.WaitForAll
而重試類型默認為int最大值,我們可以通過下面的方式來設置:
config.Producer.Retry.Max = 10
-
可靠性:
之前提到過,我們的應答ACK有三種模式:
0
,不需要等數據落盤,直接應答,1
,當leader的數據落盤之后,不需要等待follower同步即可應答,-1
則是需要等待leader和follower都已經同步完畢,才進行應答。-
0:當消息發送出去,不等待kafka的相應,就認為信息已經完成,此時如果leader掛掉或者在數據落盤過程中掛掉了,那么相對應的數據也沒有了,此時就一定會導致數據丟失,是最不可靠的。
-
1:此時leader已經將消息寫入到本地,在此之前,如果leader掛掉了,發送方也會認為超時,然后重新發送,所以此時比上一種更加可靠,但是如果在同步的時候,leader掛掉了,也會造成數據丟失,允許丟失個別數據,如傳輸普通日志。
-
-1:此時會等待leader和所有的follower同步,才會返回ack信息,但是,缺點是如果一個follower掛掉了,就會導致整個partition重試,適用于對可靠性要求高地場景。
怎么解決這個問題呢?
至少一次(At-Least-Once),事實上,Leader維護了一個動態的
in-sync replica set
表示和leader維持同步的follower集合,如果follower長時間不向leader申請通信或者同步請求,就會被leader踢出ISR,超時時間由replica.lag.time.max.ms
設定,默認30s,這樣就能一定程度地解決這個問題,可以類比為心跳機制。但是如果所有的follower都掛掉了,事實上,也就和1模式沒有區別了,所以我們數據完全可靠的條件是:ack級別-1,分區副本大于等于2,ISR最小應答副本大于等于2。
但與此同時還有一個問題就是數據重復問題,就是當所有節點都已經同步完成,但是恰好在應答的那一刻掛掉了,然后沒有受到消息,生產者又發送一次數據,此時會發送到新的leader上,造成數據重復。
-
-
數據重復問題
精確一次(Exactly-Once)上面提到了,我們的-1模式可以保證數據不丟失,但是不保證不重復,而1模式可以保證數據不重復,而不能保證數據不丟失。而Kafka通過引入了冪等性和事務這兩個特性。
-
冪等性:通過PID,Partition,SeqNumber來判斷當前的消息是否重復,PID就是生產者的ID,而Partition代表分區號,SeqNumber單調遞增,所以冪等性只能保證單分區單會話內不重復,如果遇見一個這些部分重復的,就會自動忽視這些消息,冪等性默認是開啟的,但是僅僅只能在一個會話中保證,如果傳輸過程中掛掉,又重新啟動了,怎么辦?
config.Producer.Idempotent = true
-
事務:開啟事務,必須要開啟冪等性,由于需要保持不同會話,能夠保持狀態,所以我們還需要一個事務id,在發送信息時,需要先標注好事務的id,以保證不同會話的同一個消息的一致性。為了保持事務的狀態,Kafka中還存在一個特殊的Topic,這個Topic中默認50個分區,將所有的事務保存到磁盤中,通過計算事務id的哈希值,我們可以找到對應的事務,并且由對應的事務協調器負責這個事務(一一對應),這樣即便客戶端掛掉,重啟之后也能繼續處理未完成的事務,或者回滾事務,保證數據一致性。
事務底層依賴于冪等性,即便如此,當producer重啟后,即便PID不同,Kafka也能根據事務ID來識別消息是否相同。除此之外,Sarama客戶端使用事務的流程如下:
func main() {defer func() {if err := recover(); err != nil {color.Red("Error: %v", err)}}()brokers := []string{"localhost:29092", "localhost:29093", "localhost:29094"}topic := "cluster_test_topic"config := sarama.NewConfig()config.Producer.Return.Successes = trueconfig.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Retry.Max = 10config.Producer.Partitioner = sarama.NewRoundRobinPartitionerconfig.Producer.Idempotent = trueconfig.Net.MaxOpenRequests = 1config.Producer.Transaction.ID = "my-transaction-id"producer, err := sarama.NewAsyncProducer(brokers, config)if err != nil {log.Fatalln("Failed to start Sarama producer:", err)}defer producer.Close()go func() {for range producer.Successes() {color.Green("Message delivered successfully")}}()go func() {for err := range producer.Errors() {panic(err)}}()producer.BeginTxn()for i := 0; i < 100000; i++ {msg := &sarama.ProducerMessage{Topic: topic,Value: sarama.StringEncoder("Hello kafka World!"),}producer.Input() <- msg}producer.CommitTxn() }
-
-
數據有序:在單個分區里面,數據是有序的,但是如果消費多個分區的數據,則無法保證有序。
-
數據亂序:之前提過,broker最多能夠緩存五個請求,比如,當第三個請求失敗,但是第四個請求成功了,此時就會造成亂序,有一種解決方案是將
max.in,flight.request.per.connection
設置為1,表示最多只能緩存一個請求,但是效率低下,但是如果啟動冪等性的話,這個值就可以設置小于等于5,這就可以保證最近五個請求不亂序了,因為我們知道冪等性有一個參數是序號,所以能夠解決亂序的問題。
Broker
zookeeper存儲的kafka相關信息:
- 記錄有哪些服務器。
- 記錄每一個主題的leader以及ISR。
- 輔助leader選舉的controller。
在每個kafka實例啟動后,都會向zookeeper注冊broker,隨后開始選擇controller,按照先來后到的原則,誰先進行注冊,哪個broker就會被選舉為controller。
**Controller是什么?**controller是一個特殊的broker,一個集群中只有一個controller,由zookeeper輔助選舉,如果當前controller宕機,kafka通過zookeeper監控controller的狀態,此時,zookeeper會重新輔助選舉新的controller。
同時controller負責監聽brokers的節點變化,負責每個分區partition的leader的選舉,每次某個broker宕機或者加入時,都會進行重新選舉,在選舉一個新的leader之后,Controller就會將這些信息上傳到zookeeper,此時,還會將這些信息同步給其他節點,以便于controller掛掉之后,其他節點可以隨時進行選舉新的controller。
他還負責維護分區副本的管理,確保同步機制正常運行,維護kafka元數據,包括主題分區副本等信息,也負責處理topic和partition的創建刪除和修改,同時,由于Controller僅僅負責管理,所以他的變更并不會影響到集群的正常運行,但是頻繁的變更會影響kafka的性能。
在我們的生產者向其中發送信息的時候,follower會同步leader的信息,底層采用的是log的方式存儲這些信息,log的底層說segment(以1個G為單位),為了實現快速查找,里面還有index索引的概念,利用索引來進行檢索。
節點的服役與退役,在我們的節點服役和退役的時候,partition并不會自動的進行調整,而是需要我們手動進行負載均衡,具體步驟如下
-
首先需要在容器內創建一個
topics.json
文件,輸入以下內容,但是通常容器內置沒有文本編輯器,這個時候通過echo命令寫入就可以了。{"topics": [{"topic": "cluster_test_topic"} ],"version": 1 }
-
隨后執行
kafka-reassign-partitions.sh --bootstrap-server kafka-1:9092 --topics-to-move-json-file topics.json --broker-list "1,2,3" --generate
,相關參數可能需要根據實際情況修改,然后終端就會輸出,當然,這里的kafka僅僅是提供了一種分配方法,實際上是可以自定義的。{"version": 1,"partitions": [{"topic": "cluster_test_topic", "partition": 0, "replicas": [1], "log_dirs": ["any"]},{"topic": "cluster_test_topic", "partition": 1, "replicas": [2], "log_dirs": ["any"]},{"topic": "cluster_test_topic", "partition": 2, "replicas": [3], "log_dirs": ["any"]},{"topic": "cluster_test_topic", "partition": 3, "replicas": [1], "log_dirs": ["any"]},{"topic": "cluster_test_topic", "partition": 4, "replicas": [2], "log_dirs": ["any"]},{"topic": "cluster_test_topic", "partition": 5, "replicas": [3], "log_dirs": ["any"]},{"topic": "cluster_test_topic", "partition": 6, "replicas": [1], "log_dirs": ["any"]},{"topic": "cluster_test_topic", "partition": 7, "replicas": [2], "log_dirs": ["any"]},{"topic": "cluster_test_topic", "partition": 8, "replicas": [3], "log_dirs": ["any"]},{"topic": "cluster_test_topic", "partition": 9, "replicas": [1], "log_dirs": ["any"]},{"topic": "cluster_test_topic", "partition": 10, "replicas": [2], "log_dirs": ["any"]},{"topic": "cluster_test_topic", "partition": 11, "replicas": [3], "log_dirs": ["any"]}] }
形如這樣的信息,將其echo到
reassign.json
中即可。 -
隨后執行
kafka-reassign-partitions.sh --bootstrap-server kafka-1:9092 --reassignment-json-file reassign.json --execute
命令執行分配。 -
最后
kafka-reassign-partitions.sh --bootstrap-server kafka-1:9092 --reassignment-json-file reassign.json --verify
查看,如果所有分區都已經被正確分配,那么就算完成了!這里如果是新增節點的話,肯定需要在一開始就新增節點,但是如果是退役節點的話,一般是在分配完成之后進行退役。
除此之外,還需要提到副本的問題,副本是什么?副本就是一個分區partition的備份,一個partition會有leader和follower,當leader掛掉的時候,我們就會選舉一個副本成為leader,這樣就保證了高可用性,但是副本不宜過多,當我們副本過多,主從同步就需要更多的時間和磁盤資源來繼續你同步,并且占用的空間大小也會增大,增加了系統資源的損耗和延遲,而當我們使用waitforall
級別的可靠性,延遲就會更加明顯。
我們第一次手動分配分區的時候,如果沒有執行副本數量,那么就不會分配副本!
所以我們需要在創建之初就指定副本數量kafka-topics.sh --bootstrap-server kafka-1:9092 --create --topic cluster_test_topic --partitions 12 --replication-factor 3
,當然,如果在創建的時候忘記了,也沒關系,步驟和重新分配分區的流程是一樣的,區別就是在第二步的時候,我們可以將kafka給定的json自定義;
{"version": 1,"partitions": [{"topic": "cluster_test_topic", "partition": 0, "replicas": [1, 2, 3], "log_dirs": ["any", "any", "any"]},{"topic": "cluster_test_topic", "partition": 1, "replicas": [2, 3, 1], "log_dirs": ["any", "any", "any"]},{"topic": "cluster_test_topic", "partition": 2, "replicas": [3, 1, 2], "log_dirs": ["any", "any", "any"]},{"topic": "cluster_test_topic", "partition": 3, "replicas": [1, 2, 3], "log_dirs": ["any", "any", "any"]},{"topic": "cluster_test_topic", "partition": 4, "replicas": [2, 3, 1], "log_dirs": ["any", "any", "any"]},{"topic": "cluster_test_topic", "partition": 5, "replicas": [3, 1, 2], "log_dirs": ["any", "any", "any"]},{"topic": "cluster_test_topic", "partition": 6, "replicas": [1, 2, 3], "log_dirs": ["any", "any", "any"]},{"topic": "cluster_test_topic", "partition": 7, "replicas": [2, 3, 1], "log_dirs": ["any", "any", "any"]},{"topic": "cluster_test_topic", "partition": 8, "replicas": [3, 1, 2], "log_dirs": ["any", "any", "any"]},{"topic": "cluster_test_topic", "partition": 9, "replicas": [1, 2, 3], "log_dirs": ["any", "any", "any"]},{"topic": "cluster_test_topic", "partition": 10, "replicas": [2, 3, 1], "log_dirs": ["any", "any", "any"]},{"topic": "cluster_test_topic", "partition": 11, "replicas": [3, 1, 2], "log_dirs": ["any", "any", "any"]}]
}
將replica的參數進行修改,這樣就可以正確的分配副本了!然后execute,就完成了。
既然提到了副本,我們還需要引入一下leader選舉的一些知識,一般來說,我們手動通過命令行分配副本或者partition,會默認采取負載均衡的策略,但是如果是在一些節點宕掉的時候,然后進行選舉,就會采取搶占式的選舉leader,這就可能會導致某一個broker負載過大,broker集群的負載不均衡,而我們可以通過重新執行replica的再分配或者定期執行kafka-leader-election.sh --bootstrap-server kafka-1:9092 --election-type PREFERRED --all-topic-partitions
這個命令來實現負載均衡,同時這個命令也可以用于重啟后恢復原本的分區狀態;另外,還有幾個參數可以解決這個問題:auto.leader.rebalance.enable
默認為true,自動平衡,leader.imbalance.per.broker.percentage
表示每個broker允許的不平衡的leader的比率,超出就會觸發平衡機制,leader.imbalance.check.interval.seconds
檢查leader是否負載平衡的間隔時間。
另外,我們的生產者只會將數據發送給Leader,然后follower會與leader發送同步請求,如果長時間follower沒有向leader通信或者發送同步請求,就會被踢出ISR,這個時間閾值參數是replica.lag.time.max.ms
,而OSR表示同步過程中延遲過多的副本,replicas表示所有存儲副本的節點,而ISR表示所有保持同步的節點,也就是說,如果機器掛掉,ISR會將這個節點移除,但是replicas不會!
follower故障:在leader和follower同步的時候,LEO是每個副本的最后一個offset + 1,而HW則是所有副本中最小的LEO,也就是說,所有的Follower的LEO雖然不一定一樣,但是HW是一樣的,HW也就是(High WaterMark)高水位線,當其中一個follower掛掉之后,會被踢出ISR,之后,其他的follower和leader會繼續同步,維護一個HW,當之后,follower恢復了,此時會舍去掛掉的時候記錄的HW之后的數據,然后重新開始同步,直到達到HW,就可以再次加入ISR。
leader故障:leader掛掉之后,會重新選拔一個新的leader,同時,leader和follower的數據,超過HW的部分都會被舍去,保證數據一致性,但是無法保證數據不丟失或者不重復。
文件存儲
- Topic是邏輯上的概念,而Partition是物理上的概念,每個partition都對應一個log文件,其中存儲的是生產者生產的數據,但是為了防止log文件過大,導致搜索效率低,每個partition的log又被分成了多個segment,單位為1個G,每個segment包括.index(索引),.log(存儲數據),.timeindex文件(時間戳索引,輔助定期刪除),值得注意的是,index并不是為每一條數據都設置了索引,而是使用了稀疏索引,默認每寫入4kb數據,會往index文件寫入一條索引,可以通過
log.index.interval.bytes
修改,同時index中保存的offset為相對的offset,這樣既可以執行查找的功能,也可以節省內存,防止offset過大。 - kafka中的日志默認保存時間是七天,七天一到,就可以通過
delete
或者compact
策略進行日志清理,默認是基于時間的刪除delete策略,以segment所有記錄中最大時間戳作為該文件的時間戳,以此為基準執行刪除。另一種是基于大小的刪除策略,超過設置的所有日志的總大小,刪除最早的segment,類似LRU機制;而壓縮日志compact策略則是將所有的Key相同的數據,只保留最新的Key,這樣來壓縮,類似redis的AOF重寫。 - Kafka能做到高效的讀寫數據,原因如下:
- 本身為分布式集群,可以采取分區技術,并行度高。
- 讀數據采取稀疏索引,可以快速定位要消費的數據。
- 順序寫磁盤,生產者生產數據以追加寫的形式寫入到log文件,相較于隨機寫,順序寫之所以快。是因為省去了大量磁頭尋址的時間。
- Kafka采取了頁緩存和零拷貝技術,頁緩存就是生產者將數據發送時,先將數據寫道內存頁中,然后由操作系統內核決定何時刷新到磁盤,這樣就不會在寫入的時候觸發磁盤I/O,同時,如果consumer讀取數據,會先從頁緩存中找,找不到再去磁盤中尋找,就減少了頻繁的磁盤I/O,提高了讀寫效率;零拷貝,Kafka直接調用
sendfile()
讓數據從頁緩存直接發送到TCP socket,而不需要走用戶態將數據交給應用層,再通過向下傳輸將數據發送給TCP socket,簡單來說,零拷貝允許數據直接在內核空間傳遞,而減少了用戶態和內核態數據的來回拷貝和切換,提高了讀寫效率。
1.3 消費者
Kafka的消費方式是什么?
一般來說,消息隊列有兩種消費方式,pull拉模式和push推模式,而kafka采取的是拉模式
拉取就是consumer主動從broker中拉取數據,這樣,每個consumer可以根據自己的處理能力去拉去相應數據量大小的數據,保證了每個consumer的消費能力被充分利用
而推模式,為什么kafka不采取這種形式?因為推模式中,消息的發送速率由broker決定,很難適應所有消費者的消費速率,比如如果推送速度過低,由于消費者消費能力參差不齊,就會導致部分consumer的能力沒法充分利用,但是如果推送速度稍微大一點,一些consumer由來不及處理消息。
但是即便如此,拉模式依舊有自己的缺點,當kafka沒有數據的時候,消費者可能會陷入循環,一直返回空數據。
工作流程
每個獨立的消費者都可以去消費數據,并且可以重復消費其他消費者消費的數據,但是,如果這兩個消費者位列同一個消費者組中,則這個消費者組會被視為一個"消費者",通俗來將,就是消費者組只能對同一份數據消費一次,也就是說,同一份數據不能被消費者組中的消費者消費兩次,并且,每個消費者都會被分配一個partition,讓他們去特定的partition去消費數據,同一個消費者組中的兩個消費者不能同時消費一個partition。
另外,為防止kafka節點或者消費者掛掉后,消費者不知道上一次消費某個partition消費哪里了,最新版的kafka中還維護了一個__consumer_offsets
來保存消費者消費到的數據的偏移量,而老版本的kafka將這個信息維護在zookeeper中,而維護在kafka主題中,方便管理維護,也減少了通信的時間消耗。
形成一個消費者組的條件是:消費者的GroupID相同,當然如果partition的數量超過了消費者組中的消費者的數量,被空出來的消費者不會參與消費。
消費者是如何實現分區的分配的?首先有一個coordinator,用來輔助消費者的初始化和分區的分配。最開始會從__consumer_offsets
中,通過消費者組的id進行哈希計算,選擇一個partition來存儲消費者的offset數據,而負責管理這個partition的broker,也就成為了這個消費者組的coordinator,而選舉完成之后,所有的consumer都會向這個coordinator發送JoinGroup請求,而coordinator又會從這些consumer中選舉一個作為消費者組的leader,此后,coordinator會把要消費的topic情況發送給leader消費者,隨后leader會制定一個消費方案,這里就涉及到一個分區分配的策略,隨后就將分配的方案發送給每個consumer,然后進行消費,而且每個消費者都會定期和coordinator保持心跳機制,一旦超時,就會被移除。并且會觸發再平衡(重新分配消費任務)。(當某個消費者消費過慢,也會觸發)
消費者組是怎么消費的?
首先我們需要創建一個用于訪問kafka集群的消費者客戶端,這個客戶端當然也有config配置,和生產者客戶端類似,有每批次的拉取大小,拉取數據的超時時間,每批次最大拉取大小的信息,當拉取消息時,在kafka端會返回數據并放入一個拉取隊列(緩沖區)中,隨后經過攔截器和反序列化器,最終將消息返回到客戶端,當然,處理完消息后,還需要提交offset偏移量(分為手動和自動),告訴kafka集群當前消息已經被該消費者組消費。
下面是go-Sarama消費者客戶端代碼:
// ConsumerGroupHandler 實現了 ConsumerGroupHandler 接口
type ConsumerGroupHandler struct{}// Cleanup implements sarama.ConsumerGroupHandler.
func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {color.Green("消費者關閉!\n")return nil
}// Setup implements sarama.ConsumerGroupHandler.
func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {color.Green("消費者啟動!\n")return nil
}// 消費消息并打印
func (h *ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for message := range claim.Messages() {fmt.Printf("Consumed message: %s\n", string(message.Value))// 手動提交偏移量sess.MarkMessage(message, "")sess.Commit()}return nil
}var _ sarama.ConsumerGroupHandler = (*ConsumerGroupHandler)(nil)func main() {defer func() {if err := recover(); err != nil {color.Red("Error: %v", err)}}()brokers := []string{"localhost:29092", "localhost:29093", "localhost:29094"}topic := "cluster_test_topic"config := sarama.NewConfig()config.Consumer.Return.Errors = true//保持偏移量是最新的位置config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始化消費者組ConsumerGroup, err := sarama.NewConsumerGroup(brokers, "my-group", config)if err != nil {panic(err)}//此處采取for循環是因為在kafka的rebalance之后,consume會返回錯誤導致無法繼續消費//所以此處需要采取循環。go func() {for {//指定topicerr = ConsumerGroup.Consume(context.Background(), []string{topic}, &ConsumerGroupHandler{})if err != nil {color.Red("Error from consumer: %v", err)}}}()defer ConsumerGroup.Close()select {}
}
而如果想要消費特定分區,則不能采取consumerGroup的形式,而是單獨使用Consumer,然后調用ConsumePartition,同樣的,生產者也可以配置字段單獨向一個partition發送信息。
分區策略
kafka中自帶的分區策略有Range,Roundrobin,Sticky,CooperativeSticky,而Kafka可以同時使用多個分區分配策略,在Sarama客戶但可以通過調整config.Consumer.Group.Rebalance.Strategy
來修改采取的分區策略,默認是Range+CooperativeSticky
-
Range:范圍分配策略,針對于每個主題對每個分區和每個消費者進行編號排序,然后用消費者去對應每一個partition,總體來說就是(四個分區,三個消費者):
Partition1 <-> Consumer1
Partition2 <-> Consumer2
Partition3 <-> Consumer3
Partition4 <-> Consumer1
雖然只針對一個topic而言,編號較低的Consumer可能消耗不大,但是如果對于上百個topic而言,低位的Consumer就多承擔上百個partition,容易造成數據傾斜!
-
RoundRobin:輪詢分配策略,roundrobin是針對于所有的消費者訂閱的topic而言,將所有的partition和consumer排序,然后按照range的輪詢方法將partition分配給消費者。、
以上兩種在Rebalance時,都會重新分配所有的分區。
-
Sticky:粘性分配策略,隨機且均勻,初始分配時盡量負載均衡,但是在重分配時,會盡可能保留原有的分區分配,而僅僅調整部分的分區分配,這樣可以減少分區遷移的開銷,但是實現比較復雜。
offset
每個消費者組為了記錄每個partition消費到了什么位置,都需要記錄offset的位置,而這個offset在舊版本的kafka是存儲在zookeeper里面的,在新版本的Kafka中,是存在__Conustmer_offsets
的主題中的,里面有50個partition,而在這個topic中,是采取key-value的格式存儲offset的值,key是groupid + topic +分區號,value對應的則是offset,同時,每隔一段時間,kafka內部還對這個topic進行compact壓縮,這樣能夠保存最新的數據。
-
自動保存:kafka提供了自動提交offset的功能,以便于我們能夠專注于自己的業務邏輯,配置參數為:
enable.auto.commit
以及auto.commit.interval.ms
表示是否開啟自動提交功能,自動提交offset的時間間隔,默認開啟和5s,在Sarama客戶端中,對應的字段為:config.Consumer.Offsets.AutoCommit.Enable config.Consumer.Offsets.AutoCommit.Interval
雖然很方便,但是缺點很明顯,如果在還沒有提交的時候,但是此時消費者掛了,就會導致重復消費!因此,我們的kafka也提供了手動提交的功能。
-
手動提交:手動提交又分為同步和異步,通過手動提交,我們能夠更好地控制offset的提交,通常我們是采取異步提交的方式來手動提交offset,但是Sarama庫似乎并沒有直接封裝異步提交的API,需要我們去手動實現,而kafka-go這個包貌似是支持的。
-
指定Offset:在Sarama中,可以通過設置
config.Consumer.Offsets.Initial
這個字段值,來設置我們此次消費的起始位置,默認是從最新的offset進行消費的。當然,此處只能指定分區和指定offset才能夠使用,既然是指定offset,當然也可以通過執行時間戳來進行查找,在Sarama中,我們需要通過sarama.NewClient(brokers, config)
創建一個client,然后調用client.GetOffset(topic, partition, targetTime)
來獲取當前時間戳的offset,隨后執行執行消費操作,當然你也可以通過遍歷topic的所有分區來實現在某一時間段之后的所有消息的消費。 -
重復消費和漏消費:一般來說,自動提交offset會引起重復消費,而在自動提交的間隔期間,consumer掛掉了,重啟就會出現重復消費的情況,同時,手動提交也可能引起重復消費,比如說,提交offset的時候,網絡故障或者kafka宕機了,kafka就無法接受提交的offset,就會導致重復消費,而漏消費則是在消費消息之前提交了offset,如果在處理業務的時候崩潰,那么此時offset已經提交,就無法重新進行笑飛了,造成漏消費的情況,而不管自動提交還是手動提交都會有這種情況,所以一般來說是采取先處理完業務,再手動提交的方式。
而一般來說,我們的解決方案就是采取事務的方式去處理這個問題,當然,這也要求下游的消費者,如MySQL,支持事務,否則是做不到事物的回滾的
-
消息積壓:消息積壓就是說,消費的速度小于生產的速度,而我們在kafka中的數據滯留過久,就會被刪除,所以我們需要考慮去提高消費者的消費能力:
- 消費能力不足,可以增加分區,同時增加消費者的數量
- 如果是數據處理不及時,可以提高每批次拉取消息的數量,批次拉取的數量過少,也會導致數據積壓,同時我們在提高每批次拉取消息的數量的時候,也需要提高每批次拉取的數據大小。
1.4 調優
生產者調優
-
linger.ms可以調整每批次最長發送消息的間隔
-
batch.size可以調整每批次發送的消息的最大值
-
config.ChannelBufferSize緩沖區的總大小,較大的緩沖區可以提高吞吐率,但是會增加內存占用,如果緩沖區較小,可能會導致生產者阻塞,從而吞吐量降低。
-
冪等性:開啟冪等性可以使得在broker中緩存的五個請求不會亂序,或者說將broker緩存的最大數據量設置成1(效率低下)。
-
retry:重試的次數,默認是int的最大值,如果不希望一直重試,可以自己手動調小。
-
retry間隔時間:默認100ms。
-
回應方式:之前提過有0,1,-1三者等待方式,0就是直接發送出去就不管了,1則是消息在leader上面落盤之后返回消息,-1是最可靠的,也就是waitforall,等待leader和follower全部同步完成才會返回ack,當然這里也有關于exactly once的筆記,上面有詳細的介紹。
-
壓縮方式:默認為none,一般會配置成snappy這種比較輕量的壓縮方式,用于提高吞吐量。
Broker調優
- replica.lag.time.max.ms:表示ISR中Follower未向Leader同步或通信被踢出的時間。
- auto.leader.rebalance.enable:Leader Partition的自動平衡,默認關閉,除非節點經常掛,否則不建議打開,相關的還有超過一定百分比觸發自動平衡以及定期檢查是否平衡的參數。
- segment大小:默認1G.
- log.index.interval.bytes:默認4kb,每寫入4kb就會添加一個索引。
- 數據保存時間:默認七天,相關的還有檢查是否超時的時間間隔
- 刪除策略:默認delete,如果為compact,則會采取壓縮策略。
…還有一堆讀寫拉傳輸的線程數,以及強制頁緩存刷寫到磁盤的條數以及刷寫數據到磁盤的時間間隔。
另外一些,就是擴展分區數,調整分區副本的存儲,手動負載均衡。還有自動創建主題,雖然在測試環境中可以隨便開啟,但是在生產環境一般是關閉的,防止出現未知的亂七八糟的topic,并且自動創建的主題,partition一般都是默認值,不如手動創建。
消費者調優
在一個消費者組中,首先會通過哈希計算,計算出自己的哈希值,然后對應__consumer_offsets
這個特殊分區上面的特殊的partition,然后將管理這個partition的broker設置為這個消費者組的coordinator,輔助這個消費者組的初始化和分區的分配,然后每個消費者向這個coordinator注冊自己的信息,表示要加入這個group,然后coordinator會選擇一個消費者作為leader,然后將要消費的topic信息發給leader,由leader去執行分區分配,將分區分配的方案發給coordinator,隨后coordinator下發給這個消費者組的所有consumer,在這個過程中,每個消費者都會保持心跳,超過這個時間,就會踢出消費者組,并觸發再分配,或者消費時間過長,也會觸發再分配。
- Fetch.min.bytes:每批次的最小抓取數據大小,默認一字節。
- fetch.max.wait.ms:一批數據的最長的等待時間,超出這個之間就會拉取數據。
- fetch.max.bytes:每批次的最大抓取大小,
- max.poll.recodes:每次拉取數據的最大跳數。
- auto.commit:自動提交,如果追求exactly once,默認開啟
- session.time.out:消費者和coordinator的超時時間。
數據精準一次如何實現?
生產者講ack生成-1,同時開啟冪等性和事務,broker角度分區副本數量大于等于二,ISR應答的最小副本數量大于等于2,消費者角度,開啟事務,并且手動提交offset,并且輸出的目的地必須支持事務,如MySQL。
擇一個消費者作為leader,然后將要消費的topic信息發給leader,由leader去執行分區分配,將分區分配的方案發給coordinator,隨后coordinator下發給這個消費者組的所有consumer,在這個過程中,每個消費者都會保持心跳,超過這個時間,就會踢出消費者組,并觸發再分配,或者消費時間過長,也會觸發再分配。
- Fetch.min.bytes:每批次的最小抓取數據大小,默認一字節。
- fetch.max.wait.ms:一批數據的最長的等待時間,超出這個之間就會拉取數據。
- fetch.max.bytes:每批次的最大抓取大小,
- max.poll.recodes:每次拉取數據的最大跳數。
- auto.commit:自動提交,如果追求exactly once,默認開啟
- session.time.out:消費者和coordinator的超時時間。
數據精準一次如何實現?
生產者講ack生成-1,同時開啟冪等性和事務,broker角度分區副本數量大于等于二,ISR應答的最小副本數量大于等于2,消費者角度,開啟事務,并且手動提交offset,并且輸出的目的地必須支持事務,如MySQL。