三、Kafka 基礎入門
3.1 Kafka 是什么
Kafka 最初由 LinkedIn 公司開發,是一個開源的分布式事件流平臺,后成為 Apache 基金會的頂級項目 。它不僅僅是一個簡單的消息隊列,更是一個分布式流處理平臺,具備強大的消息隊列、存儲系統和流處理能力。Kafka 采用發布 - 訂閱的消息模型,能夠以極高的吞吐量處理海量的消息,并且支持將消息持久化到磁盤,確保數據的可靠性和可回溯性。
Kafka 的設計目標主要包括以下幾個方面:
- 高吞吐量:Kafka 在設計上充分考慮了性能因素,通過一系列優化手段,如順序讀寫、零拷貝技術等,使其能夠在普通的商用硬件上實現單機每秒處理 100K 條以上消息的傳輸,甚至在集群環境下,吞吐量可以達到每秒百萬級別的消息處理能力,這使得它非常適合處理大規模的實時數據。
- 持久化存儲:Kafka 將所有消息都持久化到磁盤上,通過高效的日志存儲機制,保證消息的不丟失。它采用追加式的日志寫入方式,減少了磁盤尋道時間,提高了寫入性能。同時,Kafka 還支持數據的多副本復制,進一步增強了數據的可靠性,即使部分節點出現故障,也不會導致數據丟失。
- 分布式與擴展性:Kafka 基于分布式架構,一個 Kafka 集群可以由多個 Kafka 服務器(Broker)組成,這些服務器可以分布在不同的物理節點上,實現水平擴展。通過分區(Partition)機制,Kafka 可以將一個大的主題(Topic)分布到多個 Broker 上,從而提高系統的處理能力和負載均衡能力。當業務量增長時,可以方便地添加新的 Broker 節點來擴展集群的容量和性能。
- 順序保證:對于同一個分區內的消息,Kafka 能夠嚴格保證它們的順序性,這在一些對消息順序有嚴格要求的場景中非常重要,如金融交易記錄、日志記錄等。雖然跨分區的消息順序無法保證,但在很多實際應用中,通過合理的分區設計,可以滿足大部分業務對消息順序的需求。
- 支持多種客戶端和語言:Kafka 提供了豐富的客戶端庫,支持 Java、C/C++、Scala、Python、Go、Erlang、Node.js 等多種編程語言,方便不同技術棧的開發者在各自的項目中集成和使用 Kafka。
3.2 Kafka 的核心概念
3.2.1 生產者(Producer)
生產者是 Kafka 中負責向 Kafka 集群發送消息的客戶端應用程序。它可以將各種類型的數據,如日志信息、業務事件、實時監控數據等,封裝成 Kafka 消息,并發送到指定的主題(Topic)中。
在實際應用中,生產者通常會根據業務需求對消息進行一些配置和處理。例如,在一個電商系統中,訂單服務作為生產者,當有新訂單生成時,它會將訂單相關的信息(如訂單號、用戶 ID、商品信息、訂單金額等)封裝成 Kafka 消息,并發送到名為 “order - topic” 的主題中。生產者在發送消息時,可以設置消息的鍵(Key),鍵的作用主要有兩個方面:一是可以用于決定消息被發送到主題的哪個分區,通過對鍵進行哈希計算,可以將具有相同特征的消息發送到同一個分區,從而保證這些消息在分區內的順序性;二是在一些場景下,消費者可以根據鍵來進行消息的篩選和處理,提高處理效率。此外,生產者還可以設置消息的一些屬性,如消息的時間戳、壓縮方式等,以滿足不同的業務需求。生產者發送消息的過程可以是同步的,也可以是異步的。同步發送時,生產者會等待 Kafka 集群的確認響應,確保消息成功發送后才繼續執行后續操作;異步發送時,生產者將消息發送到緩沖區后就立即返回,由后臺線程負責將緩沖區中的消息批量發送到 Kafka 集群,這種方式可以提高發送效率,適用于對實時性要求不高但對吞吐量要求較高的場景。
3.2.2 消費者(Consumer)
消費者是從 Kafka 集群接收并處理消息的客戶端應用程序。它通過訂閱一個或多個主題,獲取主題中的消息,并按照業務邏輯進行相應的處理。
消費者在 Kafka 中是以消費者組(Consumer Group)的形式存在的。一個消費者組可以包含多個消費者實例,這些消費者實例共同消費一個或多個主題中的消息。在同一個消費者組內,每個消費者負責消費不同分區的數據,這樣可以實現消息的并行處理,提高消費效率。例如,假設有一個名為 “user - behavior - topic” 的主題,用于記錄用戶在網站上的行為數據,它有 4 個分區。現在有一個消費者組,包含 3 個消費者實例。Kafka 會將這 4 個分區分配給這 3 個消費者實例,可能的分配方式是消費者實例 1 消費分區 1 和分區 2,消費者實例 2 消費分區 3,消費者實例 3 消費分區 4。這樣,3 個消費者實例可以同時從各自負責的分區中讀取消息并進行處理,大大提高了消息的消費速度。
消費者組的這種設計模式使得 Kafka 既支持消息的廣播(Broadcast)模式,也支持消息的單播(Unicast)模式。當所有消費者都屬于同一個消費者組時,消息會被均衡地投遞給每一個消費者,即每條消息只會被一個消費者處理,這就實現了單播模式,類似于點對點的消息傳遞;當所有消費者都屬于不同的消費者組時,消息會被廣播給所有的消費者,即每條消息會被所有的消費者處理,這就實現了廣播模式,類似于發布 - 訂閱模式。
消費者在消費消息時,采用的是拉(Pull)模式,即消費者主動從 Kafka 集群中拉取消息。這種模式與推(Push)模式相比,具有更好的靈活性和可控性,消費者可以根據自身的處理能力來調整拉取消息的頻率和數量,避免因為生產者推送消息的速度過快而導致消費者處理不過來的情況。同時,消費者還需要維護一個消費位移(Offset),它表示消費者在分區中已經消費到的位置。每次消費者成功消費消息后,會將消費位移提交給 Kafka 集群,以便在下次消費時能夠從上次消費的位置繼續讀取消息,保證消息的順序消費和不重復消費。
3.2.3 主題(Topic)
主題是 Kafka 中消息的邏輯分類,它類似于傳統消息隊列中的隊列概念,但又有所不同。每個主題可以看作是一個獨立的消息通道,生產者將消息發送到特定的主題,而消費者通過訂閱感興趣的主題來獲取消息。
在實際應用中,主題通常根據業務功能或數據類型來劃分。例如,在一個社交媒體平臺中,可能會有 “user - registration - topic” 主題用于記錄用戶注冊信息,“post - creation - topic” 主題用于記錄用戶發布的帖子內容,“comment - topic” 主題用于記錄用戶對帖子的評論等。通過將不同類型的消息劃分到不同的主題中,可以方便地對消息進行管理和處理,提高系統的可維護性和可擴展性。
一個 Kafka 集群可以包含多個主題,每個主題可以有多個分區,并且可以為每個主題設置不同的配置參數,如消息的保留時間、副本數量等。生產者在發送消息時,需要指定消息所屬的主題;消費者在訂閱消息時,也需要指定要訂閱的主題。這樣,通過主題的概念,Kafka 實現了消息的分類管理和靈活訂閱,使得不同的業務模塊可以獨立地進行消息的生產和消費,而互不干擾。
3.2.4 分區(Partition)
分區是主題的物理細分,每個主題可以包含一個或多個分區。分區的主要作用是提高 Kafka 的并行處理能力和數據可靠性。
從并行處理的角度來看,當一個主題有多個分區時,生產者發送的消息會被分布到不同的分區中,消費者組中的不同消費者實例可以同時從不同的分區中讀取消息并進行處理,從而實現了消息的并行消費,大大提高了系統的吞吐量。例如,一個電商系統的 “order - topic” 主題有 10 個分區,當有大量訂單生成時,生產者可以將訂單消息均勻地分布到這 10 個分區中。同時,一個包含 5 個消費者實例的消費者組可以分別從不同的分區中讀取訂單消息進行處理,這樣可以在短時間內處理大量的訂單消息,滿足高并發場景下的業務需求。
從數據可靠性的角度來看,每個分區都可以有多個副本(Replica),這些副本分布在不同的 Kafka 服務器(Broker)上。其中,一個副本被選舉為領導者(Leader)副本,其他副本為跟隨者(Follower)副本。生產者發送的消息會首先被寫入到 Leader 副本中,然后 Follower 副本會從 Leader 副本中同步數據,以保持數據的一致性。當 Leader 副本所在的 Broker 出現故障時,Kafka 會從 Follower 副本中選舉出一個新的 Leader 副本,繼續提供服務,從而保證了數據的可靠性和系統的高可用性。
此外,分區還可以根據鍵(Key)來進行數據的分配。如果生產者在發送消息時指定了鍵,Kafka 會根據鍵的哈希值將消息分配到相應的分區中,這樣可以保證具有相同鍵的消息會被發送到同一個分區,從而滿足一些對消息順序有要求的業務場景。例如,在一個訂單處理系統中,將訂單 ID 作為消息的鍵,那么同一個訂單的所有相關消息(如訂單創建、訂單支付、訂單發貨等)都會被發送到同一個分區,消費者在消費這些消息時,可以按照順序處理,確保訂單處理的正確性和一致性。
四、Kafka 實現異步通信
4.1 異步通信原理
Kafka 的異步通信基于生產者 - 消費者模型和發布 - 訂閱模型。在這個模型中,生產者負責將消息發送到 Kafka 集群的指定主題(Topic),而消費者則從感興趣的主題中訂閱并獲取消息進行處理。
當生產者有消息要發送時,它并不會直接將消息發送給消費者,而是將消息發送到 Kafka 集群的主題中。Kafka 集群會將這些消息持久化存儲在磁盤上,并根據分區(Partition)策略將消息分配到不同的分區中。生產者發送消息的過程是異步的,它在將消息發送到 Kafka 集群后,不需要等待 Kafka 集群的確認響應,就可以繼續執行其他操作。這大大提高了生產者的發送效率,減少了因為等待響應而造成的時間浪費,使得生產者能夠在短時間內發送大量的消息,提高了系統的吞吐量。
消費者則通過訂閱主題來獲取消息。消費者可以以消費者組(Consumer Group)的形式存在,同一個消費者組內的不同消費者實例可以同時從同一個主題的不同分區中讀取消息并進行處理,實現了消息的并行消費。消費者在消費消息時,采用的是拉(Pull)模式,即消費者主動從 Kafka 集群中拉取消息。消費者會定期向 Kafka 集群發送拉取請求,Kafka 集群根據消費者的請求,將相應分區中的消息返回給消費者。消費者接收到消息后,根據自身的業務邏輯進行處理。處理完成后,消費者會將消費位移(Offset)提交給 Kafka 集群,以便在下次消費時能夠從上次消費的位置繼續讀取消息,保證消息的順序消費和不重復消費。
這種異步通信機制實現了生產者和消費者之間的解耦。生產者不需要關心消費者是否能夠及時接收和處理消息,它只需要將消息發送到 Kafka 集群即可;消費者也不需要關心消息是由哪個生產者發送的,它只需要從 Kafka 集群中獲取自己感興趣的消息進行處理。通過 Kafka 這個中間層,生產者和消費者之間的依賴關系被大大降低,它們可以獨立地進行擴展、升級和維護,而不會相互影響。
4.2 Kafka 異步通信的優勢
4.2.1 解耦服務
在微服務架構中,各個微服務之間往往存在復雜的依賴關系。如果采用同步通信方式,一個微服務的接口發生變化,可能會導致依賴它的其他微服務也需要進行相應的修改,這增加了系統的維護成本和風險。而 Kafka 的異步通信機制通過消息隊列將微服務之間的直接調用轉換為間接的消息傳遞,實現了服務之間的解耦。每個微服務只需要關注自己的業務邏輯和與 Kafka 的交互,不需要關心其他微服務的實現細節和接口變化。例如,在一個電商系統中,訂單服務在創建訂單后,將訂單消息發送到 Kafka 的 “order - topic” 主題中,庫存服務和物流服務通過訂閱該主題來獲取訂單消息并進行相應的處理。當訂單服務的業務邏輯發生變化,或者需要對訂單消息的格式進行調整時,只需要在訂單服務內部進行修改,而不會影響到庫存服務和物流服務的正常運行,因為它們是通過 Kafka 的消息隊列進行通信的,彼此之間沒有直接的依賴關系。
4.2.2 削峰填谷
在一些業務場景中,系統可能會面臨突發的高并發請求,例如電商平臺的促銷活動、社交媒體平臺的熱點事件等。如果采用同步通信方式,系統可能會因為無法及時處理大量的請求而導致性能下降甚至崩潰。Kafka 的異步通信機制可以很好地解決這個問題,它通過消息隊列的緩沖作用,將大量的請求消息暫時存儲起來,然后在系統負載較低時,再將消息逐步發送給消費者進行處理,實現了削峰填谷的功能。例如,在電商促銷活動期間,短時間內會產生大量的訂單請求,訂單服務將這些訂單消息發送到 Kafka 的 “order - topic” 主題中,Kafka 集群可以存儲這些大量的訂單消息。庫存服務和物流服務可以根據自身的處理能力,從 Kafka 集群中逐步拉取訂單消息進行處理,避免了因為瞬間高并發請求而導致系統負載過高的問題,保證了系統的穩定性和可靠性。
4.2.3 提升系統響應速度
由于 Kafka 的異步通信機制使得生產者在發送消息后不需要等待消費者的響應,就可以繼續執行其他操作,這大大提高了系統的響應速度。在一些對響應時間要求較高的業務場景中,如用戶注冊、下單等操作,用戶希望能夠盡快得到系統的反饋。通過使用 Kafka 進行異步通信,生產者可以快速地將消息發送到 Kafka 集群,然后立即返回響應給用戶,而不需要等待相關業務邏輯的全部處理完成。例如,在一個在線教育平臺中,用戶進行課程購買操作時,訂單服務將訂單消息發送到 Kafka 集群后,立即返回 “訂單提交成功” 的響應給用戶,后續的課程分配、支付處理等操作由其他微服務通過訂閱 Kafka 消息來異步完成。這樣,用戶能夠在最短的時間內得到系統的響應,提升了用戶體驗,同時也提高了系統的整體處理能力,因為各個微服務可以在后臺并行地處理消息,而不會相互阻塞。
4.3 代碼示例
以下是使用 Java 語言實現 Kafka 生產者和消費者的代碼示例,并對關鍵配置和消息收發邏輯進行解釋。
首先,需要在項目中引入 Kafka 的客戶端依賴。如果使用 Maven 項目,可以在pom.xml文件中添加以下依賴:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.0</version>
</dependency>
</dependencies>
4.3.1 Kafka 生產者代碼示例
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducerExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Kafka集群地址
String bootstrapServers = "localhost:9092";
// 要發送到的主題
String topic = "test-topic";
// 配置生產者屬性
Properties props = new Properties();
// Kafka集群地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 消息的鍵序列化器
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 消息的值序列化器
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 確認機制,"all"表示等待所有副本確認
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 創建生產者實例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 要發送的消息
String key = "key1";
String value = "Hello, Kafka!";
// 創建消息記錄
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
try {
// 發送消息,并同步獲取發送結果
RecordMetadata metadata = producer.send(record).get();
System.out.println("Message sent successfully: " +
"Topic: " + metadata.topic() +
", Partition: " + metadata.partition() +
", Offset: " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
System.out.println("Failed to send message: " + e.getMessage());
} finally {
// 關閉生產者
producer.close();
}
}
}
關鍵配置解釋:
- BOOTSTRAP_SERVERS_CONFIG:指定 Kafka 集群的地址,生產者通過該地址與 Kafka 集群建立連接。
- KEY_SERIALIZER_CLASS_CONFIG和VALUE_SERIALIZER_CLASS_CONFIG:分別指定消息的鍵和值的序列化器,用于將消息轉換為字節數組發送到 Kafka 集群。這里使用StringSerializer將字符串類型的鍵和值進行序列化。
- ACKS_CONFIG:指定生產者的確認機制。"all" 表示生產者在發送消息后,需要等待所有副本都確認收到消息后才認為發送成功,這可以保證消息的可靠性,但會降低發送的性能;"1" 表示生產者只需要等待領導者副本確認收到消息即可;"0" 表示生產者發送消息后不需要等待任何確認。
消息發送邏輯解釋:
- 創建Properties對象,設置生產者的配置屬性。
- 使用配置屬性創建KafkaProducer實例。
- 創建ProducerRecord對象,指定要發送的主題、消息的鍵和值。
- 使用producer.send(record).get()方法發送消息,并通過get()方法同步獲取發送結果。如果發送成功,get()方法會返回一個RecordMetadata對象,包含消息發送到的主題、分區和偏移量等信息;如果發送失敗,會拋出InterruptedException或ExecutionException異常。
- 最后,在finally塊中關閉生產者,釋放資源。
4.3.2 Kafka 消費者代碼示例
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// Kafka集群地址
String bootstrapServers = "localhost:9092";
// 消費者組ID
String groupId = "test-group";
// 要訂閱的主題
String topic = "test-topic";
// 配置消費者屬性
Properties props = new Properties();
// Kafka集群地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 消費者組ID
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// 消息的鍵反序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 消息的值反序列化器
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 自動提交位移的時間間隔
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 自動提交位移
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 創建消費者實例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 訂閱主題
consumer.subscribe(Collections.singletonList(topic));
try {
while (true) {
// 拉取消息,每次拉取等待100毫秒
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Message received: " +
"Topic: " + record.topic() +
", Partition: " + record.partition() +
", Offset: " + record.offset() +
", Key: " + record.key() +
", Value: " + record.value());
}
}
} finally {
// 關閉消費者
consumer.close();
}
}
}
關鍵配置解釋:
- BOOTSTRAP_SERVERS_CONFIG:指定 Kafka 集群的地址,消費者通過該地址與 Kafka 集群建立連接。
- GROUP_ID_CONFIG:指定消費者組 ID,同一個消費者組內的消費者共同消費訂閱主題的消息。
- KEY_DESERIALIZER_CLASS_CONFIG和VALUE_DESERIALIZER_CLASS_CONFIG:分別指定消息的鍵和值的反序列化器,用于將從 Kafka 集群接收到的字節數組轉換為具體的對象類型。這里使用StringDeserializer將字節數組反序列化為字符串類型的鍵和值。
- AUTO_COMMIT_INTERVAL_MS_CONFIG:指定自動提交消費位移的時間間隔,單位為毫秒。消費者在拉取并處理消息后,會按照這個時間間隔自動將消費位移提交給 Kafka 集群。
- ENABLE_AUTO_COMMIT_CONFIG:設置是否開啟自動提交消費位移功能,"true" 表示開啟,"false" 表示關閉。如果關閉自動提交,需要手動調用consumer.commitSync()或consumer.commitAsync()方法來提交消費位移。
消息接收邏輯解釋:
- 創建Properties對象,設置消費者的配置屬性。
- 使用配置屬性創建KafkaConsumer實例。
- 使用consumer.subscribe(Collections.singletonList(topic))方法訂閱指定的主題,這里Collections.singletonList(topic)表示將主題封裝成一個單元素的列表。
- 在一個無限循環中,使用consumer.poll(Duration.ofMillis(100))方法拉取消息,每次拉取等待 100 毫秒。poll()方法會返回一個ConsumerRecords對象,包含拉取到的所有消息。
- 遍歷ConsumerRecords對象,獲取每個ConsumerRecord,并打印消息的相關信息,包括主題、分區、偏移量、鍵和值。
- 最后,在finally塊中關閉消費者,釋放資源。