【kafka】kafka概念,使用技巧go示例

1. Kafka基礎概念

1.1 什么是Kafka?

Kafka是一個分布式流處理平臺,用于構建實時數據管道和流式應用。核心特點:

  • 高吞吐量:每秒可處理百萬級消息
  • 持久化存儲:消息按Topic分區存儲在磁盤
  • 分布式架構:支持水平擴展
  • 高可用性:通過副本機制保證數據不丟失
1.2 核心組件
  • Topic(主題):消息的邏輯分類,如user_loginorder_create
  • Partition(分區):Topic的物理分片,每個分區是有序的日志文件
  • Broker(代理):Kafka集群中的服務器節點
  • Producer(生產者):向Topic發送消息的應用
  • Consumer(消費者):從Topic接收消息的應用
  • Consumer Group(消費者組):多個消費者組成的組,共同消費Topic數據

2. Go語言操作Kafka

2.1 選擇客戶端庫

Go語言中推薦使用confluent-kafka-go庫,它基于librdkafka實現,性能優秀且功能完整:

go get -u github.com/confluentinc/confluent-kafka-go/kafka
2.2 生產者示例
package mainimport ("fmt""os""os/signal""syscall""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 配置生產者p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092",  // Kafka集群地址"acks":              "all",             // 所有副本確認"retries":           5,                 // 重試次數})if err != nil {panic(err)}defer p.Close()// 異步處理發送結果go func() {for e := range p.Events() {switch ev := e.(type) {case *kafka.Message:if ev.TopicPartition.Error != nil {fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)} else {fmt.Printf("Delivered message to %v\n", ev.TopicPartition)}}}}()// 發送消息topic := "user_login"for i := 0; i < 10; i++ {value := fmt.Sprintf("Hello Kafka %d", i)p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value:          []byte(value),}, nil)}// 等待所有消息發送完成p.Flush(15 * 1000)  // 超時15秒// 優雅退出sigchan := make(chan os.Signal, 1)signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)<-sigchan
}
2.3 消費者示例
package mainimport ("fmt""os""os/signal""syscall""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 配置消費者c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092","group.id":          "my-group","auto.offset.reset": "earliest",  // 從最早的消息開始消費})if err != nil {panic(err)}defer c.Close()// 訂閱主題topic := "user_login"c.SubscribeTopics([]string{topic}, nil)// 處理信號,優雅退出sigchan := make(chan os.Signal, 1)signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)run := truefor run {select {case sig := <-sigchan:fmt.Printf("Caught signal %v: terminating\n", sig)run = falsedefault:ev := c.Poll(100)  // 輪詢100msif ev == nil {continue}switch e := ev.(type) {case *kafka.Message:fmt.Printf("Message on %s: %s\n",e.TopicPartition, string(e.Value))// 手動提交偏移量c.CommitMessage(e)case kafka.Error:fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)if e.Code() == kafka.ErrAllBrokersDown {run = false}default:// 忽略其他事件}}}fmt.Println("Closing consumer")
}

3. 高級特性與最佳實踐

3.1 消息分區策略

Kafka通過分區實現并行處理,生產者可指定分區策略:

// 1. 輪詢(默認):均勻分布消息到各分區
p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value:          []byte(value),
}, nil)// 2. 基于Key哈希:相同Key的消息發到同一分區
p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Key:            []byte(userID),  // 根據用戶ID哈希到固定分區Value:          []byte(value),
}, nil)
3.2 消費者組與分區分配
  • 同一消費者組內的消費者共同消費Topic的所有分區
  • 每個分區只能被組內一個消費者消費
  • 消費者數量超過分區數時,多余的消費者空閑
3.3 手動提交偏移量
// 配置手動提交
config := &kafka.ConfigMap{"bootstrap.servers": "localhost:9092","group.id":          "my-group","enable.auto.commit": false,  // 禁用自動提交
}// 消費消息后手動提交
for {msg, err := c.ReadMessage(-1)  // 阻塞讀取if err == nil {fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))// 處理消息...// 手動提交當前消息的偏移量_, err := c.CommitMessage(msg)if err != nil {fmt.Printf("Failed to commit offset: %v\n", err)}}
}
3.4 事務處理
// 配置事務生產者
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092","transactional.id":  "my-transactional-id",
})
if err != nil {panic(err)
}// 初始化事務
p.InitTransactions(10 * time.Second)// 開始事務
p.BeginTransaction()// 發送多條消息
p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic1}, Value: []byte("msg1")}, nil)
p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic2}, Value: []byte("msg2")}, nil)// 提交事務
err = p.CommitTransaction(10 * time.Second)
if err != nil {p.AbortTransaction(10 * time.Second)  // 回滾
}

4. 企業級實戰案例

4.1 異步日志處理
// 生產者:收集應用日志發送到Kafka
func LogToKafka(level, message string) {p, _ := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "kafka:9092"})defer p.Close()topic := "app_logs"msg := &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Key:            []byte(level),Value:          []byte(message),}p.Produce(msg, nil)p.Flush(2 * 1000)  // 等待2秒
}// 消費者:從Kafka讀取日志并存儲到Elasticsearch
func ConsumeAndIndex() {c, _ := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": "kafka:9092","group.id":          "log-consumer-group",})c.SubscribeTopics([]string{"app_logs"}, nil)for {msg, err := c.ReadMessage(-1)if err == nil {// 發送到ElasticsearchsendToES(string(msg.Key), string(msg.Value))}}
}
4.2 微服務間事件驅動通信
// 訂單服務:創建訂單后發送事件
func CreateOrder(userID, productID string, amount float64) {// 1. 創建訂單orderID := generateOrderID()saveOrderToDB(orderID, userID, productID, amount)// 2. 發送訂單創建事件到Kafkap, _ := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "kafka:9092"})defer p.Close()topic := "order_created"event := fmt.Sprintf(`{"order_id": "%s", "user_id": "%s", "amount": %.2f}`, orderID, userID, amount)p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value:          []byte(event),}, nil)
}// 庫存服務:監聽訂單創建事件并扣減庫存
func StartInventoryService() {c, _ := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": "kafka:9092","group.id":          "inventory-service-group",})c.SubscribeTopics([]string{"order_created"}, nil)for {msg, err := c.ReadMessage(-1)if err == nil {// 解析訂單事件var orderEvent struct {OrderID string  `json:"order_id"`UserID  string  `json:"user_id"`Amount  float64 `json:"amount"`}json.Unmarshal(msg.Value, &orderEvent)// 扣減庫存deductInventory(orderEvent.ProductID, 1)}}
}

5. 性能優化與常見問題

5.1 生產者性能優化
  • 批量發送:設置batch.sizelinger.ms
  • 壓縮消息:啟用compression.type(如snappylz4
  • 異步發送:使用回調函數處理發送結果
5.2 消費者性能優化
  • 增加分區數:提高并行消費能力
  • 多消費者實例:通過消費者組水平擴展
  • 合理批量處理:批量拉取消息,批量提交偏移量
5.3 常見問題排查
問題原因解決方案
消息丟失acks配置不當、副本數不足設置acks=all,確保至少2個副本
消費滯后消費速度慢、分區數不足增加消費者、提高處理效率、增加分區數
重復消費偏移量提交時機不當處理完消息后再提交偏移量,或使用事務
生產者吞吐量低批處理參數不合理、網絡延遲增大batch.sizelinger.ms,優化網絡連接

6. 生產環境部署建議

  1. 多Broker集群:至少3個Broker,提高可用性
  2. 合理分區數:根據業務量預估,建議單個Topic分區數≥3
  3. 數據備份:定期備份Kafka日志
  4. 監控系統:集成Prometheus、Grafana監控Kafka性能
  5. 安全配置:啟用SSL/TLS加密、SASL認證

總結:Go語言使用Kafka的最佳實踐

  1. 生產者

    • 使用異步發送提高吞吐量
    • 合理配置acks和重試次數保證消息不丟失
    • 根據業務需求選擇分區策略
  2. 消費者

    • 使用消費者組實現水平擴展
    • 手動提交偏移量確保消息處理可靠性
    • 處理消息失敗時考慮重試或死信隊列
  3. 性能與可靠性

    • 批量處理提高效率
    • 監控關鍵指標(如Lag、吞吐量)
    • 設計冪等消費邏輯應對重復消息

https://github.com/0voice

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/83797.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/83797.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/83797.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

掌握Git:版本控制與高效協作指南

一、初始Git 提出問題&#xff1a;無論是在工作還是學習&#xff0c;我們在編寫各種文檔的時候&#xff0c;更改失誤&#xff0c;失誤后恢復到原來版本&#xff0c;不得不復制出一個副本。 每個版本由各自的內容&#xff0c;但最終只有一個報告需要被我們使用。 但在此之前的…

【生活相關-日語-日本-東京-搬家后-引越(ひっこし)(3)-踩坑點:國民健康保險】

【生活相關-日語-日本-東京-搬家后-引越&#xff08;ひっこし&#xff09;&#xff08;3&#xff09;-注意點&#xff1a;國民健康保險】 1、前言2、情況說明&#xff08;1&#xff09;問題說明&#xff08;2&#xff09;情況說明&#xff08;1&#xff09;收到情況&#xff08…

linux——mysql故障排查與生產環境優化

目錄 一&#xff0c;mysql數據庫常見的故障 1&#xff0c;故障現象1 2&#xff0c;故障現象2 3&#xff0c;故障現象3 &#xff14;&#xff0c;故障現象&#xff14; &#xff15;&#xff0c;故障現象&#xff15; &#xff16;&#xff0c;故障現象&#xff16; 二&…

【C#】用 DevExpress 創建帶“下拉子表”的參數表格視圖

展示如何用 DevExpress 創建帶“下拉子表”的參數表格視圖。主表為 參數行 ParamRow&#xff0c;子表為 子項 ChildParam。 一、創建模型類 public class ParamRow {public string Pn { get; set; }public string DisplayName { get; set; }public string Value { get; set; }…

【JavaScript】用 Proxy 攔截對象屬性

目錄 一、Proxy 的基本結構&#xff08;打地基&#xff09; 二、最常用的兩個攔截方法&#xff1a;get 和 set 1. get(target, key) 2. set(target, key, value) 三、說到這&#xff0c;那就可以回到題目來 四、什么是 Reflect&#xff1f; 總結不易&#xff0c;本章節對…

[IMX] 02.GPIO 寄存器

目錄 手冊對應章節 1.GPIO 復用&#xff08;引腳功能選擇&#xff09;- IOMUXC_SW_MUX_CTL_PAD_xxx 2.GPIO 電氣特性 - IOMUXC_SW_PAD_CTL_PAD_xxx 3.GPIO 數據與控制寄存器 3.1.數據 - DR 3.2.輸入/輸出選擇 - GDIR 3.3.狀態 - PSR 3.4.中斷觸發控制 - ICR 3.5.中斷使…

Tomcat 配置 HTTPS 訪問全攻略(CentOS 環境)

Tomcat 配置 HTTPS 訪問全攻略&#xff08;CentOS 環境&#xff09; 一、環境說明 操作系統&#xff1a;CentOS Tomcat 版本&#xff1a;Apache Tomcat/9.0.105 服務器 IP&#xff1a;192.168.1.35 目標&#xff1a;將 Tomcat 默認的 HTTP 訪問升級為 HTTPS&#xff0c;提…

Flink 運維監控與指標采集實戰(Prometheus + Grafana 全流程)

一、引言:為什么 Flink 運維監控如此重要? 在實時計算場景中,Flink 作業 724 小時運行,對性能、資源、故障感知、狀態變化的實時監控非常關鍵。沒有有效的運維可觀測體系: 不知道任務是否在穩定運行 發生問題難以快速定位 無法感知背壓、延遲、反壓等狀態 因此,構建完善…

【prometheus+Grafana篇】基于Prometheus+Grafana實現Oracle數據庫的監控與可視化

&#x1f4ab;《博主主頁》&#xff1a; &#x1f50e; CSDN主頁 &#x1f50e; IF Club社區主頁 &#x1f525;《擅長領域》&#xff1a;擅長阿里云AnalyticDB for MySQL(分布式數據倉庫)、Oracle、MySQL、Linux、prometheus監控&#xff1b;并對SQLserver、NoSQL(MongoDB)有了…

【數據倉庫面試題合集③】實時數倉建模思路與實踐詳解

實時數據倉庫已經成為各大企業構建核心指標監控與業務實時洞察的基礎能力。面試中,關于實時建模的題目頻繁出現,尤其聚焦于建模思路、寬表設計、狀態管理、亂序處理等方面。本文整理典型題目及答題思路,幫助你應對相關考察。 一、建模原則與數倉分層認知 1. 實時數倉與離線…

鴻蒙PC操作系統:從Linux到自研微內核的蛻變

鴻蒙PC操作系統是否基于Linux內核,需要結合其技術架構、發展階段和官方聲明綜合分析。以下從多個角度展開論述: 一、鴻蒙操作系統的多內核架構設計 多內核混合架構 根據資料,鴻蒙操作系統(HarmonyOS)采用分層多內核架構,內核層包含Linux內核、LiteOS-m內核、LiteOS-a內核…

LabVIEW數據庫使用說明

介紹LabVIEW如何在數據庫中插入記錄以及執行 SQL 查詢&#xff0c;適用于對數據庫進行數據管理和操作的場景。借助 Database Connectivity Toolkit&#xff0c;可便捷地與指定數據庫交互。 各 VI 功能詳述 左側 VI 功能概述&#xff1a;實現向數據庫表中插入數據的操作。當輸入…

【docker】--docker file編寫教程

文章目錄 構建docker file 鏡像常用命令速查表一、基礎指令&#xff08;指定鏡像和執行命令&#xff09;二、構建上下文管理三、設置鏡像內部環境四、容器運行配置五、多階段構建&#xff08;可選進階&#xff09; 構建docker file 鏡像 # -f 指定dockerfile # -t 鏡像名和tag…

WeakAuras Lua Script <BiaoGe>

WeakAuras Lua Script <BiaoGe> 表格拍賣插件WA字符串 表格字符串代碼&#xff1a; !WA:2!S3xA3XXXrcoE2VH9l7ZFy)C969PvDpSrRgaeuhljFlUiiSWbxaqXDx(4RDd0vtulB0fMUQMhwMZJsAO5HenLnf1LPSUT4iBrjRzSepL(pS)e2bDdWp5)cBEvzLhrMvvnAkj7zWJeO7mJ8kYiJmYiImYF0b(XR)JR9JRD…

虛幻引擎5-Unreal Engine筆記之什么時候新建GameMode,什么時候新建關卡?

虛幻引擎5-Unreal Engine筆記之什么時候新建GameMode,什么時候新建關卡&#xff1f; code review! 參考筆記&#xff1a; 1.虛幻引擎5-Unreal Engine筆記之GameMode、關卡&#xff08;Level&#xff09; 和 關卡藍圖&#xff08;Level Blueprint&#xff09;的關系 2.虛幻引擎…

開源模型應用落地-模型上下文協議(MCP)-Resource Template-資源模板的使用邏輯(六)

一、前言 在數字化進程加速的今天,如何高效管理動態資源已成為開發者們的核心課題。Resource Template(資源模板)作為Model Context Protocol(MCP)中的關鍵機制,正通過參數化設計重新定義資源調用的邊界——它不僅是靜態數據的容器,更是動態上下文生成的引擎。與傳統的R…

uniapp小程序獲取手機設備安全距離

utils.js let systemInfo null;export const getSystemInfo () > {if (!systemInfo) {systemInfo uni.getSystemInfoSync();// 補充安全區域默認值systemInfo.safeAreaInsets systemInfo.safeAreaInsets || {top: 0,bottom: 0,left: 0,right: 0};// 確保statusBarHei…

【線下沙龍】NineData x Apache Doris x 阿里云聯合舉辦數據庫技術Meetup,5月24日深圳見!

5月24日下午&#xff0c;NineData 將聯合 Apache Doris、阿里云一起&#xff0c;在深圳舉辦數據庫技術Meetup。本次技術沙龍聚焦「數據實時分析」與「數據同步遷移」 兩大核心領域&#xff0c;針對企業數據戰略中的痛點&#xff0c;特邀行業資深技術大咖&#xff0c;結合多年技…

企業網站架構部署與優化 --web技術與nginx網站環境部署

一、Web 基礎 本節將介紹Web 基礎知識,包括域名的概念、DNS 原理、靜態網頁和動態網頁的 相關知識。 1、域名和DNS 1.1、域名的概念 網絡是基于TCP/IP 協議進行通信和連接的&#xff0c;每一臺主機都有一個唯一的標識(固定的IP 地址),用以區別在網絡上成千上萬個用戶和計算機。…

java實現poi-ooxml導出Excel的功能

文章目錄 1. 添加poi-ooxml依賴2. Excel導出工具類3.核心邏輯說明4.擴展建議5.HSSF、XSSF、SXSSF 的核心原則和場景建議&#xff0c;幫助你在不同需求下快速決策&#xff1a; 以下是一個基于 Apache POI 實現的簡單、通用的Java導出Excel工具類&#xff0c;代碼邏輯清晰且注釋詳…