文章目錄
- 1. Kafka 消費者消費消息
- 01. 創建消費者
- 02. 訂閱主題
- 03. 輪詢拉取數據
- 2. Kafka 消費者參數配置
- 01. fetch.min.bytes
- 02. fetch.max.wait.ms
- 03. fetch.max.bytes
- 04. max.poll.records
- 05. max.partition.fetch.bytes
- 06. session.timeout.ms 和 heartbeat.interval.ms
- 07. max.poll.interval.ms
- 08. default.api.timeout.ms
- 09. request.timeout.ms
- 10. auto.offset.reset
- 11. partition.assignment.strategy
- 12. client.id
- 13. group.instance.id
- 14. receive.buffer.bytes和send.buffer.bytes
- 15. offsets.retention.minutes
1. Kafka 消費者消費消息
public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-group-hh");// 創建消費者KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);// 訂閱主題 testconsumer.subscribe(Collections.singletonList("test"));// 消費數據while (true){ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : consumerRecords) {System.out.printf("主題 = %s, 分區 = %d, 位移 = %d, " + "消息鍵 = %s, 消息值 = %s\n",record.topic(), record.partition(), record.offset(), record.key(), record.value());}}}
}
01. 創建消費者
在讀取消息之前,需要先創建一個KafkaConsumer對象。創建KafkaConsumer對象與創建KafkaProducer對象非常相似——把想要傳給消費者的屬性放在Properties對象里。
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-group-hh");// 創建消費者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
為簡單起見,這里只提供4個必要的屬性:bootstrap.servers、key.deserializer 和 value.deserializer。
① bootstrap.servers 指定了連接Kafka集群的字符串。
② key.deserializer 和 value.deserialize 是為了把字節數組轉成Java對象。
③ group.id 指定了一個消費者屬于哪一個消費者群組 ,默認值為“”。如果設置為空,則會報出異常:Exception in thread "main"org.apache.kafka.common.errors.InvalidGroupIdException:The configured groupId is invalid。一般而言,這個參數需要設置成具有一定的業務意義的名稱。
02. 訂閱主題
① 在創建好消費者之后,下一步就可以開始訂閱主題了。subscribe()方法會接收一個主題列表作為參數。
// 訂閱單個主題 test
consumer.subscribe(Collections.singletonList("test"));
// 訂閱多個主題
consumer.subscribe(Arrays.asList("test","test1"));
② 也可以在調用subscribe()方法時傳入一個正則表達式。正則表達式可以匹配多個主題,如果有人創建了新主題,并且主題的名字與正則表達式匹配,那么就會立即觸發一次再均衡,然后消費者就可以讀取新主題里的消息。如果應用程序需要讀取多個主題,并且可以處理不同類型的數據,那么這種訂閱方式就很有用。
consumer.subscribe(Pattern.compile("test.*"));
subscribe 的重載方法中有一個參數類型是ConsumerRebalance-Listener,這個是用來設置相應的再均衡監聽器的。
③ 消費者不僅可以通過KafkaConsumer.subscribe()方法訂閱主題,還可以直接訂閱某些主題的特定分區,在KafkaConsumer中還提供了一個assign()方法來實現這些功能,這個方法只接受一個參數partitions,用來指定需要訂閱的分區集合。
public class KafkaConsumer<K, V> implements Consumer<K, V> {@Overridepublic void assign(Collection<TopicPartition> partitions) {// ...}public final class TopicPartition implements Serializable {private static final long serialVersionUID = -613627415771699627L;private int hash = 0;private final int partition;private final String topic;public TopicPartition(String topic, int partition) {this.partition = partition;this.topic = topic;}public int partition() {return partition;}public String topic() {return topic;}// ...}@Overridepublic List<PartitionInfo> partitionsFor(String topic) {return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));}
}
TopicPartition類只有2個屬性:topic和partition,分別代表分區所屬的主題和自身的分區編號,這個類可以和我們通常所說的主題—分區的概念映射起來。
// 訂閱主題 test 和分區2
consumer.assign(Collections.singletonList(new TopicPartition("test",2)));
如果我們事先并不知道主題中有多少個分區怎么辦?KafkaConsumer 中的partitionsFor()方法可以用來查詢指定主題的元數據信息。PartitionInfo類中的屬性topic表示主題名稱,partition代表分區編號,leader代表分區的leader副本所在的位置,replicas代表分區的AR集合,inSyncReplicas代表分區的ISR集合,offlineReplicas代表分區的OSR集合。
public class PartitionInfo {private final String topic;private final int partition;private final Node leader;private final Node[] replicas;private final Node[] inSyncReplicas;private final Node[] offlineReplicas;
}
通過 subscribe()方法訂閱主題具有消費者自動再均衡的功能,在多個消費者的情況下可以根據分區分配策略來自動分配各個消費者與分區的關系。當消費組內的消費者增加或減少時,分區分配關系會自動調整,以實現消費負載均衡及故障自動轉移。而通過assign()方法訂閱分區時,是不具備消費者自動均衡的功能的,其實這一點從assign()方法的參數中就可以看出端倪,兩種類型的subscribe()都有ConsumerRebalanceListener類型參數的方法,而assign()方法卻沒有。
03. 輪詢拉取數據
消費者API最核心的東西是通過一個簡單的輪詢向服務器請求數據。
// 消費數據
while (true){ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : consumerRecords) {System.out.printf("主題 = %s, 分區 = %d, 位移 = %d, " + "消息鍵 = %s, 消息值 = %s\n",record.topic(), record.partition(), record.offset(), record.key(), record.value());}
}
這是一個無限循環。消費者實際上是一個長時間運行的應用程序,它通過持續輪詢來向Kafka請求數據。消費者必須持續對Kafka進行輪詢,否則會被認為已經“死亡”,它所消費的分區將被移交給群組里其他的消費者。傳給poll()的參數是一個超時時間間隔,用于控制poll()的阻塞時間(當消費者緩沖區里沒有可用數據時會發生阻塞)。如果這個參數被設置為0或者有可用的數據,那么poll()就會立即返回,否則它會等待指定的毫秒數。poll()方法會返回一個記錄列表。列表中的每一條記錄都包含了主題和分區的信息、記錄在分區里的偏移量,以及記錄的鍵–值對。我們一般會遍歷這個列表,逐條處理記錄。
輪詢不只是獲取數據那么簡單。在第一次調用消費者的poll()方法時,它需要找到GroupCoordinator,加入群組,并接收分配給它的分區。如果觸發了再均衡,則整個再均衡過程也會在輪詢里進行,包括執行相關的回調。所以,消費者或回調里可能出現的錯誤最后都會轉化成poll()方法拋出的異常。
需要注意的是,如果超過max.poll.interval.ms沒有調用poll(),則消費者將被認為已經“死亡”,并被逐出消費者群組。因此,要避免在輪詢循環中做任何可能導致不可預知的阻塞的操作。
消費者消費到的每條消息的類型為ConsumerRecord,這個和生產者發送的消息類型ProducerRecord相對應:
public class ConsumerRecord<K, V> {private final String topic;private final int partition;private final long offset;private final long timestamp;private final TimestampType timestampType;private final int serializedKeySize;private final int serializedValueSize;private final Headers headers;private final K key;private final V value;private final Optional<Integer> leaderEpoch;
}
其中,topic 和 partition 這兩個字段分別代表消息所屬主題的名稱和所在分區的編號。offset 表示消息在所屬分區的偏移量。timestamp 表示時間戳,與此對應的timestampType 表示時間戳的類型。timestampType 有兩種類型:CreateTime 和LogAppendTime,分別代表消息創建的時間戳和消息追加到日志的時間戳。headers表示消息的頭部內容。key 和 value 分別表示消息的鍵和消息的值,一般業務應用要讀取的就是value。
2. Kafka 消費者參數配置
01. fetch.min.bytes
這個屬性指定了消費者從服務器獲取記錄的最小字節數,默認是1字節。broker在收到消費者的獲取數據請求時,如果可用數據量小于fetch.min.bytes指定的大小,那么它就會等到有足夠可用數據時才將數據返回。這樣可以降低消費者和broker的負載,因為它們在主題流量不是很大的時候(或者一天里的低流量時段)不需要來來回回地傳輸消息。如果消費者在沒有太多可用數據時CPU使用率很高,或者在有很多消費者時為了降低broker的負載,那么可以把這個屬性的值設置得比默認值大。但需要注意的是,在低吞吐量的情況下,加大這個值會增加延遲。
02. fetch.max.wait.ms
通過設置fetch.min.bytes,可以讓Kafka等到有足夠多的數據時才將它們返回給消費者,feth.max.wait.ms則用于指定broker等待的時間,默認是500毫秒。如果沒有足夠多的數據流入Kafka,那么消費者獲取數據的請求就得不到滿足,最多會導致500毫秒的延遲。如果要降低潛在的延遲,那么可以把這個屬性的值設置得小一些。如果fetch.max.wait.ms被設置為100毫秒,fetch.min.bytes被設置為1 MB,那么Kafka在收到消費者的請求后,如果有1MB數據,就將其返回,如果沒有,就在100毫秒后返回,就看哪個條件先得到滿足。
03. fetch.max.bytes
這個屬性指定了Kafka返回的數據的最大字節數(默認為50 MB)。消費者會將服務器返回的數據放在內存中,所以這個屬性被用于限制消費者用來存放數據的內存大小。需要注意的是,記錄是分批發送給客戶端的,如果broker要發送的批次超過了這個屬性指定的大小,那么這個限制將被忽略。這樣可以保證消費者能夠繼續處理消息。值得注意的是,broker端也有一個與之對應的配置屬性,Kafka管理員可以用它來限制最大獲取數量。broker端的這個配置屬性可能很有用,因為請求的數據量越大,需要從磁盤讀取的數據量就越大,通過網絡發送數據的時間就越長,這可能會導致資源爭用并增加broker的負載。
04. max.poll.records
這個屬性用于控制單次調用poll()方法返回的記錄條數。可以用它來控制應用程序在進行每一次輪詢循環時需要處理的記錄條數(不是記錄的大小)。
05. max.partition.fetch.bytes
這個屬性指定了服務器從每個分區里返回給消費者的最大字節數(默認值是1MB)。當KafkaConsumer.poll()方法返回ConsumerRecords時,從每個分區里返回的記錄最多不超過max.partition.fetch.bytes指定的字節。需要注意的是,使用這個屬性來控制消費者的內存使用量會讓事情變得復雜,因為你無法控制broker返回的響應里包含多少個分區的數據。因此,對于這種情況,建議用fetch.max.bytes替代,除非有特殊的需求,比如要求從每個分區讀取差不多的數據量。
06. session.timeout.ms 和 heartbeat.interval.ms
session.timeout.ms指定了消費者可以在多長時間內不與服務器發生交互而仍然被認為還“活著”,默認是10秒。如果消費者沒有在session.timeout.ms指定的時間內發送心跳給群組協調器,則會被認為已“死亡”,協調器就會觸發再均衡,把分區分配給群組里的其他消費者。session.timeout.ms與heartbeat.interval.ms緊密相關。
heartbeat.interval.ms指定了消費者向協調器發送心跳的頻率,session.timeout.ms指定了消費者可以多久不發送心跳。因此,我們一般會同時設置這兩個屬性,heartbeat.interval.ms必須比session.timeout.ms小,通常前者是后者的1/3。如果session.timeout.ms是3秒,那么heartbeat.interval.ms就應該是1秒。把session.timeout.ms設置得比默認值小,可以更快地檢測到崩潰,并從崩潰中恢復,但也會導致不必要的再均衡。把session.timeout.ms設置得比默認值大,可以減少意外的再均衡,但需要更長的時間才能檢測到崩潰。
07. max.poll.interval.ms
這個屬性指定了消費者在被認為已經“死亡”之前可以在多長時間內不發起輪詢。前面提到過,心跳和會話超時是Kafka檢測已“死亡”的消費者并撤銷其分區的主要機制。我們也提到了心跳是通過后臺線程發送的,而后臺線程有可能在消費者主線程發生死鎖的情況下繼續發送心跳,但這個消費者并沒有在讀取分區里的數據。要想知道消費者是否還在處理消息,最簡單的方法是檢查它是否還在請求數據。但是,請求之間的時間間隔是很難預測的,它不僅取決于可用的數據量、消費者處理數據的方式,有時還取決于其他服務的延遲。在需要耗費時間來處理每個記錄的應用程序中,可以通過max.poll.records來限制返回的數據量,從而限制應用程序在再次調用poll()之前的等待時長。但是,即使設置了max.poll.records,調用poll()的時間間隔仍然很難預測。于是,設置max.poll.interval.ms就成了一種保險措施。它必須被設置得足夠大,讓正常的消費者盡量不觸及這個閾值,但又要足夠小,避免有問題的消費者給應用程序造成嚴重影響。這個屬性的默認值為5分鐘。
08. default.api.timeout.ms
如果在調用消費者API時沒有顯式地指定超時時間,那么消費者就會在調用其他API時使用這個屬性指定的值。默認值是1分鐘,因為它比請求超時時間的默認值大,所以可以將重試時間包含在內。poll()方法是一個例外,因為它需要顯式地指定超時時間。
09. request.timeout.ms
這個屬性指定了消費者在收到broker響應之前可以等待的最長時間。如果broker在指定時間內沒有做出響應,那么客戶端就會關閉連接并嘗試重連。它的默認值是30秒。不建議把它設置得比默認值小。在放棄請求之前要給broker留有足夠長的時間來處理其他請求,因為向已經過載的broker發送請求幾乎沒有什么好處,況且斷開并重連只會造成更大的開銷。
10. auto.offset.reset
這個屬性指定了消費者在讀取一個沒有偏移量或偏移量無效(因消費者長時間不在線,偏移量對應的記錄已經過期并被刪除)的分區時該做何處理。它的默認值是latest,意思是說,如果沒有有效的偏移量,那么消費者將從最新的記錄(在消費者啟動之后寫入Kafka的記錄)開始讀取。另一個值是earliest,意思是說,如果沒有有效的偏移量,那么消費者將從起始位置開始讀取記錄。如果將auto.offset.reset設置為none,并試圖用一個無效的偏移量來讀取記錄,則消費者將拋出異常。
11. partition.assignment.strategy
我們知道,分區會被分配給群組里的消費者。PartitionAssignor根據給定的消費者和它們訂閱的主題來決定哪些分區應該被分配給哪個消費者。Kafka提供了幾種默認的分配策略。
① 區間(range)
這個策略會把每一個主題的若干個連續分區分配給消費者。假設消費者C1和消費者C2同時訂閱了主題T1和主題T2,并且每個主題有3個分區。那么消費者C1有可能會被分配到這兩個主題的分區0和分區1,消費者C2則會被分配到這兩個主題的分區2。因為每個主題擁有奇數個分區,并且都遵循一樣的分配策略,所以第一個消費者會分配到比第二個消費者更多的分區。只要使用了這個策略,并且分區數量無法被消費者數量整除,就會出現這種情況。
② 輪詢 (roundRobin)
這個策略會把所有被訂閱的主題的所有分區按順序逐個分配給消費者。如果使用輪詢策略為消費者C1和消費者C2分配分區,那么消費者C1將分配到主題T1的分區0和分區2以及主題T2的分區1,消費者C2將分配到主題T1的分區1以及主題T2的分區0和分區2。一般來說,如果所有消費者都訂閱了相同的主題(這種情況很常見),那么輪詢策略會給所有消費者都分配相同數量(或最多就差一個)的分區。
③ 黏性(sticky)
設計黏性分區分配器的目的有兩個:一是盡可能均衡地分配分區,二是在進行再均衡時盡可能多地保留原先的分區所有權關系,減少將分區從一個消費者轉移給另一個消費者所帶來的開銷。如果所有消費者都訂閱了相同的主題,那么黏性分配器初始的分配比例將與輪詢分配器一樣均衡。后續的重新分配將同樣保持均衡,但減少了需要移動的分區的數量。如果同一個群組里的消費者訂閱了不同的主題,那么黏性分配器的分配比例將比輪詢分配器更加均衡。
④ 協作黏性(cooperative sticky)
這個分配策略與黏性分配器一樣,只是它支持協作(增量式)再均衡,在進行再均衡時消費者可以繼續從沒有被重新分配的分區讀取消息。
可以通過partition.assignment.strategy來配置分區策略,默認值是org.apache.kafka.clients.consumer.RangeAssignor,它實現了區間策略。你也可以把它改成org.apache.kafka.clients.consumer.RoundRobinAssignor、org.apache.kafka.clients.consumer.StickyAssignor或org.apache.kafka.clients.consumer.CooperativeStickyAssignor。還可以使用自定義分配策略,如果是這樣,則需要把partition.assignment.strategy設置成自定義類的名字。
12. client.id
這個屬性可以是任意字符串,broker用它來標識從客戶端發送過來的請求,比如獲取請求。它通常被用在日志、指標和配額中。
13. group.instance.id
這個屬性可以是任意具有唯一性的字符串,被用于消費者群組的固定名稱。
14. receive.buffer.bytes和send.buffer.bytes
這兩個屬性分別指定了socket在讀寫數據時用到的TCP緩沖區大小。如果它們被設置為–1,就使用操作系統的默認值。如果生產者或消費者與broker位于不同的數據中心,則可以適當加大它們的值,因為跨數據中心網絡的延遲一般都比較高,而帶寬又比較低。
15. offsets.retention.minutes
這是broker端的一個配置屬性,需要注意的是,它也會影響消費者的行為。只要消費者群組里有活躍的成員(也就是說,有成員通過發送心跳來保持其身份),群組提交的每一個分區的最后一個偏移量就會被Kafka保留下來,在進行重分配或重啟之后就可以獲取到這些偏移量。但是,如果一個消費者群組失去了所有成員,則Kafka只會按照這個屬性指定的時間(默認為7天)保留偏移量。一旦偏移量被刪除,即使消費者群組又“活”了過來,它也會像一個全新的群組一樣,沒有了過去的消費記憶。