Kafka入門及實戰應用指南

1、Kafka概述

Apache Kafka是由LinkedIn公司于2010年開發的一款分布式消息系統,旨在解決當時傳統消息隊列(如ActiveMQ、RabbitMQ)在高吞吐量實時性場景下的性能瓶頸。隨著LinkedIn內部對實時日志處理、用戶行為追蹤等需求的激增,Kafka逐漸演化為一個支持水平擴展持久化存儲的流數據平臺。2011年,Kafka成為Apache基金會頂級開源項目,并在全球范圍內被廣泛應用于大數據、實時計算和微服務架構領域。

Kafka的設計哲學源于發布-訂閱模型,但其創新性地引入了分布式存儲分區化處理機制,使得系統能夠高效處理每秒百萬級的消息吞吐。這一特性使其迅速成為現代數據管道(Data Pipeline)和流式處理(Stream Processing)的核心組件。

Kafka是一個開源的高吞吐量的分布式消息中間件,對比于其他

1、緩沖和削峰:上游數據時有突發流量,下游可能扛不住,或者下游沒有足夠多的機器來保證冗余,kafka在中間可以起到一個緩沖的作用,把消息暫存在kafka中,下游服務就可以按照自己的節奏進行慢慢處理。

2、解耦和擴展性:項目開始的時候,并不能確定具體需求。消息隊列可以作為一個接口層,解耦重要的業務流程。只需要遵守約定,針對數據編程即可獲取擴展能力。

3、冗余:可以采用一對多的方式,一個生產者發布消息,可以被多個訂閱topic的服務消費到,供多個毫無關聯的業務使用。

4、健壯性:消息隊列可以堆積請求,所以消費端業務即使短時間死掉,也不會影響主要業務的正常進行。

5、異步通信:很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。

2、核心組件

組件

說明

Producer

消息生產者,向Kafka發送消息

Consumer

消息消費者,從Kafka讀取消息

Broker

Kafka服務器節點,組成Kafka集群

Topic

消息類別/主題,生產者發送到特定Topic,消費者訂閱特定Topic

Partition

Topic的分區,實現并行處理和水平擴展

Offset

消息在分區中的唯一標識(位移)

ZooKeeper

管理Kafka集群元數據(新版本已逐步移除ZooKeeper依賴)

3、Kafka的特點與優勢

1. 高吞吐量與低延遲

Kafka通過批處理順序磁盤I/O零拷貝技術(Zero-Copy)優化數據傳輸效率。生產者(Producer)將消息批量發送至Broker,消費者(Consumer)按順序拉取數據,避免了傳統消息系統的頻繁網絡交互。實測中,單臺Broker可輕松支持每秒數十萬條消息的讀寫。

2. 水平擴展與容錯性

Kafka集群由多個Broker(服務器節點)組成,支持動態擴容。每個主題(Topic)被劃分為多個分區(Partition),分區可分布在不同Broker上,通過多副本(Replica)機制實現數據冗余。若某Broker宕機,其他副本會自動接管服務,確保系統的高可用性。

3. 持久化存儲與回溯消費

消息在Kafka中默認保留7天(可配置為永久存儲),消費者可隨時重置偏移量(Offset)以重新消費歷史數據。這一特性在數據重放、故障恢復等場景中至關重要。

4. 生態兼容性

Kafka與主流大數據工具(如Spark、Flink、Hadoop)深度集成,并提供了Connect API和Streams API,支持構建端到端的流處理管道。

4、Kafka 使用場景

  1. 實時流處理:用戶行為追蹤、實時推薦
  2. 日志收集:集中式日志系統
  3. 事件源:微服務間的事件驅動架構
  4. 消息隊列:系統解耦、削峰填谷
  5. Metrics收集:監控數據聚合

5、什么是Zookeeper?

Zookeeper是一個高性能、高可靠的分布式協調服務,最初由雅虎開發,是Google Chubby的開源實現。它被廣泛應用于分布式系統中,用于解決分布式應用中的協調問題,如配置管理、服務注冊與發現、分布式鎖等。Zookeeper的設計目標是封裝復雜且容易出錯的關鍵服務,為分布式應用提供簡單易用的接口。

6、Zookeeper的應用場景

Zookeeper在分布式系統中扮演著重要的角色,其典型應用場景包括:

  • 配置管理:Zookeeper可以作為分布式系統的配置中心,集中管理配置信息,確保所有節點能夠獲取到最新的配置。
  • 服務注冊與發現:分布式系統中的服務可以通過Zookeeper進行注冊,客戶端可以通過查詢Zookeeper來發現所需服務。
  • 分布式鎖:Zookeeper提供了一種實現分布式鎖的機制,確保多個節點對共享資源的訪問是互斥的。
  • 集群管理:Zookeeper能夠監控集群中節點的狀態,及時發現并處理節點故障。
  • 消息隊列:Zookeeper可以用于實現分布式消息隊列中的協調功能。

7、Zookeeper核心特性

Zookeeper具有以下關鍵特性:

  • 順序一致性:客戶端的更新操作按照其發送的順序被應用到Zookeeper上,確保了操作的順序性。
  • 原子性:所有對Zookeeper的操作都是原子的,要么全部成功,要么全部失敗。
  • 單一系統映像:無論Zookeeper集群中有多少節點,客戶端看到的都是一個單一的、一致的視圖。
  • 可靠性:Zookeeper通過副本機制和選舉算法確保系統的高可用性。
  • 實時性:Zookeeper能夠實時監控節點的狀態變化,并及時通知客戶端。

8、ZooKeeper 的作用

ZooKeeper 是一個分布式協調服務,為 Kafka 提供以下關鍵功能:

功能

具體說明

集群管理

記錄 Kafka Broker 的節點狀態(存活/下線),維護 Broker 列表。

Controller 選舉

Kafka 集群需要一個主控制器(Controller)處理分區和副本管理,ZooKeeper 負責選舉。

Topic 元數據存儲

存儲 Topic 的分區信息、副本分配、Leader 選舉結果等元數據。

消費者組管理

記錄消費者組的 offset(Kafka 2.8+ 已支持脫離 ZooKeeper,默認仍依賴)。

分布式鎖

確保多個 Broker 或客戶端操作時的數據一致性(如分區遷移)。

9、Kafka的安裝

1、安裝JAVA環境

yum -y install java-1.8.0-openjdk

2、安裝ZooKeeper

mkdir -p /opt/kafka
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz
tar -zxvf apache-zookeeper-3.8.1-bin.tar.gz
cd apache-zookeeper-3.8.1-bin/conf
cp zoo_sample.cfg zoo.cfg
../bin/zkServer.sh  start

3、安裝Kafka

# 解壓
tar xvf kafka_2.13-2.8.2.tgz
cd kafka_2.13-2.8.2
ls
# 啟動
bin/kafka-server-start.sh config/server.properties#后臺啟動
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &查看日志
tail -f kafka.log

單機啟動,并且ZooKeeper也剛好在本機,所以我們默認不需要修改任何配置就可以直接啟動。如果不在一起則修改配置文件:config/server.properties文件。

4、IDEA連接Kafka

cd /opt/kafka/kafka_2.13-2.8.2/configvim server.properties# 修改內容
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.142.131:9092

安裝Kafka插件進行連接

10、SpringBoot整合Kafka

1、導入jar包

<!--kafka-->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.7.1</version>
</dependency>

2、編寫配置

  # Kafka 相關配置kafka:# Kafka 服務器地址bootstrap-servers: 192.168.142.131:9092# 生產者配置producer:# 序列化器,用于將鍵轉換為字節key-serializer: org.apache.kafka.common.serialization.StringSerializer# 序列化器,用于將值轉換為字節value-serializer: org.apache.kafka.common.serialization.StringSerializer# 消費者配置consumer:# 消費者組ID,用于區分不同的消費者組group-id: my-application-group# 自動偏移量重置策略,當沒有初始偏移量時從最早的偏移量開始消費auto-offset-reset: earliest# 禁用自動提交功能enable-auto-commit: false# 反序列化器,用于將字節轉換為鍵key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 反序列化器,用于將字節轉換為值value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 配置監聽器的確認模式為手動立即確認listener:ack-mode: manual_immediate# 主題配置topic:default: default-topicmanual: manual-commit-topic

3、Kafka消息生產者

package com.lw.mqdemo.mq.kafka;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;/*** Kafka消息生產者服務類*/
@Service
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}/*** 發送消息到指定主題* @param topic 主題名稱* @param message 消息內容*/public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}/*** 發送消息到指定主題和分區* @param topic 主題名稱* @param partition 分區號* @param key 消息鍵* @param message 消息內容*/public void sendMessage(String topic, Integer partition, String key, String message) {kafkaTemplate.send(topic, partition, key, message);}
}

4、Kafka消息消費者

package com.lw.mqdemo.mq.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;/*** Kafka 消費者服務*/
@Service
public class KafkaConsumerService {/*** 監聽指定主題的消息(自動提交)* @param message 消息內容*/@KafkaListener(topics = "${spring.kafka.topic.default}", groupId = "${spring.kafka.consumer.group-id}")public void consumeAutoCommit(String message) {System.out.println("接收到自動提交消息: " + message);// 業務處理邏輯}/*** 監聽指定主題的消息(手動提交)* 偏移量(Offset) 是 Kafka 為分區(Partition)中的每條消息分配的唯一序號(從 0 開始遞增),用于標識消息在分區中的位置。* @param record 消息記錄(包含元數據)* @param ack 確認對象*/@KafkaListener(topics = "${spring.kafka.topic.manual}", groupId = "${spring.kafka.consumer.group-id}")public void consumeManualCommit(ConsumerRecord<String, String> record, Acknowledgment ack) {try {System.out.println("接收到手動提交消息: key=" + record.key() + ", value=" + record.value() + ", topic=" + record.topic() + ", partition=" + record.partition() + ", offset=" + record.offset());// 業務處理邏輯// 手動提交偏移量ack.acknowledge();} catch (Exception e) {// 處理異常,可以選擇不提交偏移量以便重試System.err.println("消息處理失敗: " + e.getMessage());}}
}

5、測試類

package com.lw.mqdemo.controller;import com.lw.mqdemo.mq.kafka.KafkaProducerService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/*** Kafka 控制器*/
@RestController
@RequestMapping("/api/kafka")
public class KafkaController {private final KafkaProducerService producerService;public KafkaController(KafkaProducerService producerService) {this.producerService = producerService;}/*** 發送消息到指定的Kafka主題*/@PostMapping("/send")public String sendMessage(@RequestParam("topic") String topic,@RequestParam("message") String message) {producerService.sendMessage(topic, message);return "消息已發送到主題: " + topic;}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/85366.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/85366.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/85366.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

智能指針 c++

C 智能指針詳解 智能指針是 C11 引入的內存管理工具&#xff0c;位于 <memory> 頭文件中&#xff0c;用于自動管理動態分配的內存&#xff0c;防止內存泄漏。主要類型如下&#xff1a; 1. std::unique_ptr (獨占所有權) 特點&#xff1a;唯一擁有所指對象&#xff0c;不…

Python應用八股文

大家好!在 Python 學習的道路上&#xff0c;掌握一些基礎知識要點至關重要&#xff0c;這些要點常被稱為“Python 八股”。以下是對它們的簡易總結&#xff0c;幫助你快速回顧和鞏固 Python 的核心概念。 一、數據結構 列表&#xff08;List&#xff09;&#xff1a;有序可變序…

【技術深度】領碼SPARK破解微服務數據依賴困局:架構設計與實踐指南

——深度解析分布式數據冗余與異步消息機制&#xff0c;驅動企業數字化轉型加速 ? 核心摘要 本文從技術架構與工程實現的角度&#xff0c;系統講解領碼SPARK融合平臺如何精準解決微服務架構下數據依賴“卡脖子”問題。通過設計高效的數據冗余模型和完善的異步消息更新機制&am…

關于前端的防抖和節流

給我解釋下 前端開發中的防抖和節流 并舉個具體的例子 防抖&#xff08;Debounce&#xff09;與節流&#xff08;Throttle&#xff09;詳解 在前端開發中&#xff0c;防抖&#xff08;Debounce&#xff09; 和 節流&#xff08;Throttle&#xff09; 是兩種優化高頻觸發事件的…

React-router 多類型歷史記錄棧

react-router 為了滿足開發者更多路由歷史存儲場景&#xff0c;提供了以下幾種模式&#xff1a; 瀏覽器原生歷史記錄 瀏覽器 hash 內存型 服務端記錄 以上實現分別對應于一下 API 實現&#xff1a; createBrowserRouter&#xff1a;瀏覽器提供的歷史管理。 createHashRou…

java設計模式[3]之結構型模式

文章目錄 一 代理模式1.1 靜態代理1.1.1 靜態代理的結構1.1.2 靜態代理的特點1.1.3 靜態代理的應用場景1.1.4 靜態代理的案例代碼 1.2 JDK動態代理1.2.1 JDK動態代理概述1.2.2 JDK動態代理案例代碼1.2.3 JDK動態代理的應用場景1.2.4 JDK動態代理的特點1.2.5 與創建型模式的區別…

鴻蒙Harmony測試-wukong穩定性工具(類似Android的Monkey測試)

一、功能介紹 wukong是系統自帶的一種命令行工具&#xff0c;支持Ability的隨機事件注入、控件注入、異常捕獲、報告生成和對Ability數據遍歷截圖等特性。通過模擬用戶行為&#xff0c;對系統或應用進行穩定性壓力測試。wukong分為隨機測試、專項測試和專注測試。 隨機測試是指…

從零學起VIM

前言 筆者早年剛入行的時候就接觸過Vim,當時還是真正的菜鳥&#xff0c;帶我的師父是一個華為骨干員工&#xff0c;猶記得他給我指導如何保存并關閉文本&#xff1a;按Esc&#xff0c;然后輸入:wq。還記得自己打開Vim編輯器&#xff0c;一個字符都敲不進去&#xff0c;然后問旁…

不依賴rerank 模型排序通過使用 PostgreSQL 中的 pgvector 與 tsearch2 函數進行混合搜索提高召回率

前言 在向量搜索中&#xff0c;召回率是一個關鍵指標&#xff0c;它衡量搜索結果的相關性。然而&#xff0c;提高召回率往往會犧牲其他指標&#xff0c;如索引大小或查詢延遲。為了平衡這些權衡&#xff0c;混合搜索技術應運而生。本文將介紹如何在 PostgreSQL 中結合 pgvecto…

Uniapp 跨平臺開發框架全面解析:一次開發,多端運行

在移動互聯網時代&#xff0c;開發者面臨著一個重要挑戰&#xff1a;如何高效地開發出能在多個平臺&#xff08;iOS、Android、Web、小程序等&#xff09;上運行的應用&#xff1f;傳統的原生開發方式需要為每個平臺單獨編寫代碼&#xff0c;導致開發周期長、維護成本高。而 Un…

ios如何把H5網頁變成主屏幕webapp應用

一、將 H5 頁面添加到主屏幕的步驟 打開 Safari 瀏覽器 在 iPhone 上打開 Safari 瀏覽器&#xff0c;訪問目標網頁&#xff08;H5 頁面&#xff09;。 點擊分享按鈕 在 Safari 瀏覽器底部點擊 “分享” 圖標&#xff08;箭頭向上的按鈕&#xff09;。 添加到主屏幕 在分享菜單…

Node.js 項目啟動命令大全 (形象版)

文章目錄 Node.js 項目啟動命令大全 &#x1f31f;?&#xff08;形象版&#xff09;一、&#x1f50d; 如何查看項目啟動命令&#xff08;魔法書目錄&#xff09;package.json scripts 參數詳解開發相關腳本測試相關腳本構建相關腳本代碼質量相關腳本最佳實踐 二、&#x1f68…

愛普特APT32F1104C8T6單片機 高抗干擾+硬件加密雙保障

愛普特APT32F1104C8T6單片機深度解析 1. 產品定位 APT32F1104C8T6 是愛普特半導體&#xff08;APT&#xff09;推出的 32位高性能經濟型單片機&#xff0c;基于 ARM Cortex-M0內核&#xff0c;采用 LQFP48封裝&#xff0c;主打 高性價比、低功耗、強抗干擾&#xff0c;是替代進…

使用uni-app ios 打包流程

配置幾個步驟即可 1、打包ios需要BundleID ID 2、證書私鑰密碼 3、信任文件證書文件 4、私鑰證書 5、打包 6、獲取打包后的ipa文件 7、通過愛思助手安裝到iso手機上 8、完成 1、下載&#xff1a;App Uploader去獲取我們想要的證書私鑰等文件 2、下載完成解壓后的文件如下打…

仿muduo庫實現并發服務器

1.實現目標 仿muduo庫One Thread One Loop式主從Reactor模型實現高并發服務器&#xff1a; 通過實現高并發服務器的組件&#xff0c;可以快速實現一個高并發服務器的搭建&#xff0c;并且&#xff0c;通過組內不同應用層協議的支持&#xff0c;可以快速完成高性能服務器的搭建…

迭代器模式:集合遍歷的統一之道

引言&#xff1a;集合遍歷的演進之路 在軟件開發中&#xff0c;集合遍歷是我們每天都要面對的基礎操作。從最初的數組索引遍歷到現代的流式處理&#xff0c;我們經歷了&#xff1a; #mermaid-svg-KwTr9k8JgbwRTDhU {font-family:"trebuchet ms",verdana,arial,sans-…

Spring Security OAuth2 組件

我們來系統地講解一下 Spring Security OAuth2 這個強大的組件。我會從概念、作用、核心組件&#xff0c;以及實際應用場景來為你剖析。 1. 什么是 Spring Security OAuth2&#xff1f; 簡單來說&#xff0c;Spring Security OAuth2 是 Spring Security 框架的一個模塊&#…

Redis的持久化功能

Redis的持久化功能能夠將內存中的數據保存到磁盤&#xff0c;從而在重啟后恢復數據。下面為你詳細介紹Redis的兩種主要持久化方式及其配置方法。 RDB&#xff08;Redis Database&#xff09;持久化 RDB持久化是通過生成某個時間點的數據集快照來實現的。它具有高性能的特點&a…

Chrome 將成為下一個 IE6

最近在技術圈刷到一個帖子&#xff0c;說&#xff1a;“Chrome 就快變成新的 IE6 了。” 乍一看有點危言聳聽&#xff0c;但你一細品&#xff0c;發現還真挺像回事。 想當年&#xff1a;IE6 是怎么垮的&#xff1f; IE6 當年多風光&#xff1f;全球市場份額一度超過 90%&#…

Redis 配置文件詳解redis.conf 從入門到實戰

一、redis.conf 是什么&#xff1f; Redis 的配置文件&#xff08;默認命名為 redis.conf&#xff0c;Redis 8.0 之后改為 redis-full.conf&#xff09;控制著服務運行的各項參數。該文件采用以下結構&#xff1a; 指令名 參數1 參數2 ... 參數N例如&#xff1a; replicaof …