?
IMPLEMENTATION
1. API Design
Producer APIs
Producer API封裝了底層兩個Producer:
- kafka.producer.SyncProducer
- kafka.producer.async.AsyncProducer
class Producer {/* Sends the data, partitioned by key to the topic using either the *//* synchronous or the asynchronous producer */public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);/* Sends a list of data, partitioned by key to the topic using either *//* the synchronous or the asynchronous producer */public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);/* Closes the producer and cleans up */public void close();}
這么做的目的是通過一個簡答的API暴露把所有Producer的功能暴露給Client。Kafka的Producer可以:
- 排隊/緩存多個發送請求并且異步的批量分發出去:
kafka.producer.Producer
提供批量化多個發送請求(producer.type=async),之后進行序列化并發送的分區的能力。批大小可以通過一些配置參數進行設置。將事件加入到queue中,他們會被緩沖在queue中,直到滿足queue.time
或batch.size
達到配置的值。后臺線程(kafka.producer.async.ProducerSendThread)從queue中獲取數據并使用kafka.producer.EventHandler對數據進行序列化并發送到合適的分區。可以通過event.handler
參數以插件的形式添加自定義的event handler程序。在producer queue pipeline處理的各個階段可以注入回調,用于自定義的日志/跟蹤代碼或者監控邏輯。這可以通過實現kafka.producer.async.CallbackHandler
接口并設置callback.handler
參數來實現。
- 使用用戶指定的
Encoder
來序列化數據:
interface Encoder<T> {public Message toMessage(T data);}
默認使用kafka.serializer.DefaultEncoder
。
- 通過用戶可選的
Partitioner
來實現亂負載均衡:
Partition的路由由kafka.producer.Partitioner
決定。
interface Partitioner<T> {int partition(T key, int numPartitions);}
分區選擇API使用key和分區總數來選擇最終的partition(返回選擇的partition id)。id用于從排序的partition列表中選擇最終的一個分區去發送數據。默認的分區策略是hash(key)%numPartitions
。如果key是null,會隨機選擇一個分區。可以通過partitioner.class
參數來配置特定的分區選擇策略。
Consumer APIs
我們有兩個級別的Consumer API。低級別的“簡單的”API和單個Broker之間保持鏈接并且和發送到服務端的網絡請求有緊密的對應關系。這個API是無狀態的,每個請求都包含offset信息,允許用戶維護這個元數據。
高級別的API在Consumer端隱藏了Broker的細節,并且允許從集群消費數據而不關心底層的拓撲結構。同樣維持了“哪些數據已經被消費過”的狀態。高級別的API還提供了通過表達式訂閱的Topic的功能(例如通過白名單或者黑名單的方式訂閱)。
Low-level API
class SimpleConsumer {/* Send fetch request to a broker and get back a set of messages. */public ByteBufferMessageSet fetch(FetchRequest request);/* Send a list of fetch requests to a broker and get back a response set. */public MultiFetchResponse multifetch(List<FetchRequest> fetches);/*** Get a list of valid offsets (up to maxSize) before the given time.* The result is a list of offsets, in descending order.* @param time: time in millisecs,* if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest offset available.* if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.*/public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);}
低級別的API用于實現高級別的API,也被直接使用在一些在狀態上有特殊需求的“離線”Consumer。
High-level API
/* create a connection to the cluster */ConsumerConnector connector = Consumer.create(consumerConfig);interface ConsumerConnector {/*** This method is used to get a list of KafkaStreams, which are iterators over* MessageAndMetadata objects from which you can obtain messages and their* associated metadata (currently only topic).* Input: a map of <topic, #streams>* Output: a map of <topic, list of message streams>*/public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);/*** You can also obtain a list of KafkaStreams, that iterate over messages* from topics that match a TopicFilter. (A TopicFilter encapsulates a* whitelist or a blacklist which is a standard Java regex.)*/public List<KafkaStream> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);/* Commit the offsets of all messages consumed so far. */public commitOffsets()/* Shut down the connector */public shutdown()}
這個API圍繞迭代器,通過KafkaStream類實現。一個KafkaStream表示了一個或多個分區(可以分布在不同的Broker上)組成的消息流。每個Stream被單個線程處理,客戶端可以在創建流時提供需要的個數。這樣,一個流背后可以是多個分區,但是一個分區只會屬于一個流。
createMessageStreams調用會吧Consumer注冊到Topic,促使Consumer/Broker的重新分配。API鼓勵在單次調用中創建多個Stream以減少充分配的次數。createMessageStreamsByFilter方法的調用(另外的)用于注冊watcher去發現匹配過濾規則的topic。createMessageStreamsByFilter返回的迭代器可以迭代來此多個Topic的消息(如果多個Topic都符合過濾規則)。
2. Network Layer
網絡層是一個直接的NIO Server,不會詳細的描述。sendfile是通過給MessageSet增加writeTo方法實現的。這允許使用文件存儲的消息用更高效的transferTo實現代替讀取數據到進程內的處理。線程模型是簡單的單acceptor多processor,每個線程處理固定數量的連接。這種設計已經被很好的測試并且是易于實現的。協議非常簡單,以便在將來實現其他語言的客戶端。
3. Messages
消息包含一個固定大小的頭,一個變長且“不透明”的byte數組表示的key和一個變長且“不透明”的byte數組表示的內容。header包含以下內容:
- A CRC32 checksum to detect corruption or truncation.
- A format version.
- An attributes identifier
- A timestamp
保持key和value的不透明是正確的決定:現在序列化方面取得了巨大的進展,選擇特定的一個序列化方式都不太適合所有的場景。不用說使用Kafka的特定程序可能需要特定的序列化方式。MessageSet接口是批量讀寫消息到NIO Channel的迭代器。
4. Message Format
/*** 1. 4 byte CRC32 of the message* 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1* 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version* bit 0 ~ 2 : Compression codec.* 0 : no compression* 1 : gzip* 2 : snappy* 3 : lz4* bit 3 : Timestamp type* 0 : create time* 1 : log append time* bit 4 ~ 7 : reserved* 4. (Optional) 8 byte timestamp only if "magic" identifier is greater than 0* 5. 4 byte key length, containing length K* 6. K byte key* 7. 4 byte payload length, containing length V* 8. V byte payload*/
5. Log
包含兩個partition,名稱為“my_topic”的Topic的日志包含兩個目錄(名稱為my_topic_0和my_topic_1),其中包含該Topic的消息的數據文件。日志文件的格式是log entry的序列;每個log entry都是4字節的消息長度N加上后面N個字節的消息數據。每條消息都有一個64位的offset標識這條消息在這個Topic的Partition中的偏移量。消息在磁盤中的存儲格式如下所示。每個日志文件都以它存儲的第一條消息的offset命名。所以第一個文件會命名為00000000000.kafka,隨后每個文件的文件名將是前一個文件的文件名加上S的正數,S是配置中指定的單個文件的大小。
消息的確切的二進制格式都有版本,它保持一個標準的接口,讓消息集可以根據需要在Producer、Broker、Consumer之間傳輸而不需要重新拷貝或者轉換,其格式如下:
On-disk format of a messageoffset : 8 bytes message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V)crc : 4 bytesmagic value : 1 byteattributes : 1 bytetimestamp : 8 bytes (Only exists when magic value is greater than zero)key length : 4 byteskey : K bytesvalue length : 4 bytesvalue : V bytes
使用消息的Offset作為消息ID是不常見的。我們初始的想法是在Producer生成一個GUID作為Message ID,并在Broker上維持ID和Offset之間的映射關系。但是因為Consumer需要為每個Server維持一個ID,那么GUID的全局唯一性就變得沒什么意義了。此外,維持一個隨機的ID和Offset的映射關系將給索引的構建帶來巨大的負擔,本質上需要一個完整的持久化的隨機存取的數據結構。因此,為了簡化查找結構,我們決定使用每個分區的原子計數器,它可以和分區ID加上ServerID來唯一標識一條消息。一旦使用了計數器,直接使用Offset進行跳轉是順其自然的,兩者都是分區內單調遞增的整數。由于偏移量從消費者API中隱藏起來,因此這個決定是最終的實現細節,所以我們采用更有效的方法。
Writes
日志允許連續追加到最后一個文件。當文件達到配置的大小時(如1GB)將滾動到一個新文件。日志采用兩個配置:M,配置達到多少條消息后進行刷盤;S,配置多長時間之后進行刷盤。這個持久化策略保證最多只丟失M條消息或者S秒之內的消息。
Reads
讀取通過提供64位的offset和S-byte的chunk大小來實現。這將返回包含在S-byte的buffer的消息跌代替。S比任意單條消息都大,但是如果在異常的超大消息的情況下,讀取操作可以通過多次重試,每次都將buffer大小翻倍,直到消息被讀取成功。最大消息大小和buffer大小可以配置,用于拒絕超過特定大小的消息,以限制客戶端讀取消息時需要拓展的buffer大小。buffer可能以不完整的消息作為結尾,這可以通過消息大小來輕松的檢測到。
實際的讀取操作首先需要定位offset所在的文件,再將offset轉化為文件內相對的偏移量,然后從文件的這個偏移量開始讀取數據。搜索操作通過內存中映射的文件的簡單的二分查找完成。
日志提供了獲取最近寫入消息的能力以允許客戶端從“當前時間”開始訂閱。這在客戶端無法在指定天數內消費掉消息的場景中非常有用。在這種情況下,如果客戶端嘗試消費一個不存在的offset將拋出OutOfRangeException異常并且可以根據場景重置或者失敗。
這面是發送到Consumer的數據的格式:
MessageSetSend (fetch result)total length : 4 byteserror code : 2 bytesmessage 1 : x bytes...message n : x bytes
MultiMessageSetSend (multiFetch result)total length : 4 byteserror code : 2 bytesmessageSetSend 1...messageSetSend n
Deletes
數據刪除一次刪除一個日志段。日志管理器允許通過插件的形式實現刪除策略來選擇那些文件是合適刪除的。當前的刪除策略是日志文件的修改時間已經過去N天,保留最近N GB數據的策略也是有用的。為了避免刪除時鎖定讀取操作,我們采用copy-on-write的方式來實現,以保證一致性的視圖。
Guarantees
日志提供了配置參數M,用于控制寫入多少條消息之后進行一次刷盤。在日志恢復過程中遍歷最新的日志段的所有消息并驗證每一條消息是有效的。如果消息的大小和偏移量之和小于文件長度并且消息的CRC32和存儲的CRC相同,那么消息是有效的。在異常事件被檢測到時,日志會被截取到最后一條有效的消息的offset。
兩種錯誤需要處理:因為Crash導致的written塊丟失和無意義的block被添加到文件中。這樣做的原因是,一般的操作系統不保證file inode和實際數據塊之間的寫入順序,所以除了丟失丟失written data,文件還會獲得無意義的數據,在inode更新大小但是在block寫入數據之前。CRC檢測這個錯誤并防止損壞日志(unwritten的消息肯定會丟失)。
6. Distribution
Consumer Offset Tracking
high-level的Consume保持它自己消費過的每個分區的最大的offset并且周期性的提交,所以可以在重啟的時候恢復offset信息。Kafka提供在offset manager中保存所有offset的選項。任何Consumer實例都需要發送offset到offset manager。high-level的Consumer自動化的處理offset。如果使用simple consumer,需要自己手動管理offset。在Java simple consumer中現在還不支持,Java simple consumer只能從ZooKeeper提交和獲取offset。如果使用Scala simple consumer,你會找到offset manager并且可以明確指定向offset manager提交和獲取offset。Consumer通過向Broker發送GroupCoordinatorRequest請求并獲取包含offset manager的GroupCoordinatorResponse來獲取offset。之后Consumer可以向offset manager提交和獲取offset。如果offset manager移動,Consumer需要重新發現。如果你期望手動管理offset,可以查看這些解釋如果提交OffsetCommitRequest和OffsetFetchRequest的代碼。
當offset manager收到OffsetCommitRequest,將其添加到一個特殊的、壓縮的,成為_consumeroffsets的Kafka topic中。offset manager返回一個成功的offset commit的響應給Consumer,當所有的備份都收到offset之后。如果在配置的timeout時間內所有副本沒有完成備份,commit offset認為是失敗的,將在之后重試(high-consumer將自動執行)。broker周期性的壓縮offset信息,因為它只需要保存每個Partition最近的offset信息即可。為了更快的響應獲取offset的請求,offset manager也會在內存緩存offset數據。
當offset manager收到fetch offset的請求,它從cache中返回最近commit的offset。如果offset manager是剛啟動或者剛成為一些group的offset manager(通過成為一下offset topic的leader partition),它需要加載offset信息。這種情況下,fetch offset request或返回OffsetsLoadInProgress異常,Consumer需要之后重試(high-level consumer會自動處理)。
Migrating offsets from ZooKeeper to Kafka
Kafka較早的版本將offset信息存儲在ZooKeeper中。可以通過以下步驟將這些數據遷移到Kafka中:
- 在Consumer配置中設置offsets.storage=kafka,dual.commit.enabled=true
- 驗證Consumer正常
- 設置dual.commit.enabled=false
- 驗證Consumer正常
回滾(從Kafka到ZooKeeper)也可以通過以上的步驟執行,只需要將offsets.storage設置為zookeeper。
ZooKeeper Directories
以下說明ZooKeeper用于統籌Consumer和Broker的結構和算法。
Notation
路徑中標志位[xyz]表示xyz是不固定的,如/topics/[topic]表示/topics下每個topic都有一個對應的目錄。[0...5]表示0,1,2,3,4,5的序列。->符號用于指示一個節點的值,如/hello -> world表示/hello存儲的值是“world”。
Broker Node Registry
/brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)
這是一個所有存在的Broker的節點列表,每個都提供一個唯一標識用于Consumer識別(必須作為配置的一部分)。在啟動時,Broker通過在/brokers/ids中創建一個znode來注冊自己。使用邏輯上的Broker ID的目的是可以在不影響Consumer的前提下將Broker移動到另外的物理機上。如果嘗試注冊一個已經存在的ID的Broker會失敗。
一旦Broker通過臨時節點注冊到ZK,注冊信息是動態的并且在Broker宕機或者關閉后會丟失(那么通知消費者Broker不再可用)。
Broker Topic Registry
/brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)
每個Broker將自己注冊到它包含的Topic下面,并保存Topic的partition數量。
Consumers and Consumer Groups
Consumer同樣將自己注冊到ZK中,為了協調其他的Consumer并且做消費數據的負載均衡。Consumer還可以通過offsets.storage=zookeeper將offset信息也存儲在ZK中。但是這個存儲一直在未來的版本中將被廢棄。因此建議將offset信息遷移到Kafka中。
多個Consumer可以組成一個集群共同消費一個Topic。一個Group中的每個Consumer實例共享一個group_id。
一個Group中的Consumer盡可能公平的分配partition,每個partition只能被一個group中的一個Consumer消費。
Consumer Id Registry
除了一個group內的所有Consumer實例共享一個groupid,每個Consumer實例還擁有一個唯一的consumerid(hostname:uuid)用于區分不同的實例。Consumer的id注冊在以下的目錄中。
/consumers/[group_id]/ids/[consumer_id] --> {"version":...,"subscription":{...:...},"pattern":...,"timestamp":...} (ephemeral node)
每個Consumer將自己注冊到Group目錄下并創建一個包含id的znode。節點的值包含<topic, #stream="">的Map。id用于標識group中哪些Consumer是活躍的。節點是臨時節點,所以在Consumer進程關閉之后節點會丟失。
Consumer Offsets
Consumer記錄每個分區消費過的最大的offset信息。如果配置了offsets.storage=zookeeper,這個數據會被記錄到ZK中。
/consumers/[group_id]/offsets/[topic]/[partition_id] --> offset_counter_value (persistent node)
Partition Owner registry
每個Partition都被一個group內的一個Consumer消費。這個Consumer必須在消費這個Partition之前建立對這個Partition的所有權。為了建立所有權,Consumer需要將id寫入到Partition下面。
/consumers/[group_id]/owners/[topic]/[partition_id] --> consumer_node_id (ephemeral node)
Cluster Id
Cluster id是唯一且不可變的,用于表示Cluster。Cluster id最長可以擁有22個字符,由[a-zA-Z0-9_-]+組成。從概念上講,它在Cluster第一次啟動的時候生成。
實現層面上,它在Broker(0.10.1或更新的版本)第一次成功啟動后產生。Broker在啟動時嘗試從/cluster/id節點獲取cluster id。如果不存在,Broker穿件一個新的cluster id寫入到這個節點中。
Broker node registration
Broker節點基本上是相互獨立的,所以他們只是發布他們自己擁有的信息。當一個Broker加入時,它將自己注冊到broker node,并寫入自己的信息(host name和port)。Broker還將自己的Topic和Partition注冊到對應的目錄中。新Topic會被動態的注冊,當他們在Broker上創建的時候。
Consumer registration algorithm
當啟動一個Consumer時,它按照如下步驟操作:
- 在group下注冊自己的consumer id
- 注冊監聽器用于監聽新Consumer的加入和Consumer的關閉,Consumer變更都會觸發Partition的分配(負載均衡)。
- 注冊監聽器用于監聽Broker的加入和關閉,Broker變更都會觸發Partition的分配(負載均衡)。
- 如果Consumer通過filter創建了一個消息流,同樣會注冊一個監聽器用于監聽新topic的加入。
- 強制自己在消費group內重新平衡。
Consumer rebalancing algorithm
Consumer的負載均衡算法允許一個group內的所有Consumer對哪個Consumer消費哪些Partition達成一個共識。Consumer的負載均衡被Broker和同一個Group中的其他Consumer的添加和移除觸發。對于給定的topic和group,partitions被平均的分配個consumers。這個設計是為了簡化實現。如果我們允許一個分區同時被多個Consumer消費,那么在這個分區上會有沖突,需要一些鎖去保證。如果consumer的數量超過了分區數,部分consumer會拿不到任何數據。在充分配算法中,我們分配分區時盡量使consumer需要和最少的Broker通信。
每個Consumer按照如下步驟進行重分配:
1. 對于Ci(Ci表示Consumer Instance)訂閱的Topic T2. PT表示Topic T的所有分區T3. CG表示group內所有的Consumer4. 對PT進行排序 (那么相同Broker上的分區會被集中到一起)5. 對GC排序6. i表示Ci在CG中的位置,N = size(PT)/size(CG)7. 分配 i*N to (i+1)*N - 1 給Consumer Ci8. 從分區所有者注冊表中刪除Ci擁有的這些條目9. 將分區添加到所有者的分區注冊表中(我們可能需要重試直到分區原來的擁有者釋放分區所有權)
當分區重分配在一個Consumer觸發時,重分配應該在同一時間在相同group內的其他Consumer上也觸發。
?
--------------------------------------------------------------------------------
這是我的個人公眾號,計劃寫一個系列的文章《MQ從入門到...》,會從最基礎的MQ介紹到最終設計和實現一個MQ,歡迎長期關注和交流。
?