Kafka Go客戶端--Sarama

Kafka Go客戶端

在Go中里面有三個比較有名氣的Go客戶端。

  • Sarama:用戶數量最多,早期這個項目是在Shopify下面,現在挪到了IBM下。
  • segmentio/kafka-go:沒啥大的缺點。
  • confluent-kafka-go:需要啟用cgo,跨平臺問題比較多,交叉編譯也不支持。

Sarama 使用入門:tools

IBM/sarama: Sarama is a Go library for Apache Kafka.

在 Sarama 里面提供了一些簡單的命令行工具,可以看做是 Shell腳本提供的功能一個子集。

Consumer和 producer中的用得比較多

在這里插入圖片描述

1.設置 Go 代理(如果內網無法直連 proxy.golang.org)

export GOPROXY=https://goproxy.cn,direct
export GOSUMDB=sum.golang.google.cn

2.在虛擬機上執行安裝命令:

  • ? go install github.com/IBM/sar ama/tools/kafka-console-consumer@latest
  • ? go install github.com/lBM/sarama/tools/kafka-console-producer@latest

3.把可執行文件所在目錄加到 PATH(如果還沒加)

export PATH=$PATH:$(go env GOBIN)

4.確認可執行文件在哪里

# 查看 GOBIN,如果你沒顯式設置,就會是空
go env GOBIN# 查看 GOPATH,默認是 $HOME/go(對于 root 用戶就是 /root/go)
go env GOPATH#我的是/home/cxz/go/lib:/home/cxz/go/work

5.查看安裝結果

ls /home/cxz/go/lib/bin
#應該能夠看到kafka-console-consumer  kafka-console-producer

6.臨時生效

export PATH=$PATH:/home/cxz/go/lib/bin# 然后驗證
which kafka-console-consumer
# 應該輸出 /home/cxz/go/lib/bin/kafka-console-consumer

7.永久生效

echo 'export PATH=$PATH:/home/cxz/go/lib/bin' >> ~/.bashrc
# 或者,如果你用的是 zsh:
# echo 'export PATH=$PATH:/home/cxz/go/lib/bin' >> ~/.zshrc# 然后重新加載配置
source ~/.bashrc

Sarama 使用入門:發送消息

虛擬機上執行

kafka-console-consumer -topic=test_topic -brokers=192.168.24.101:9094

Goland上執行

package mainimport ("github.com/IBM/sarama""github.com/stretchr/testify/assert""testing"
)var addrs = []string{"192.168.24.101:9094"}func TestSyncProducer(t *testing.T) {//創建一個 Sarama 的配置對象。cfg := sarama.NewConfig()//表示生產者要等待 Kafka 確認消息成功寫入后再返回(同步模式)。如果不設置這個,SyncProducer.SendMessage 會一直失敗。cfg.Producer.Return.Successes = true //同步的Producer一定要設置//創建一個同步的生產者實例producer, err := sarama.NewSyncProducer(addrs, cfg)assert.NoError(t, err)//構建消息并發送_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: "test_topic",//消息數據本體Value: sarama.StringEncoder("hello world ,這是一條使用kafka的消息"),//會在生產者和消費者之間傳遞,消息頭,可傳遞自定義鍵值對,比如 trace_id 用于鏈路追蹤。Headers: []sarama.RecordHeader{{Key:   []byte("trace_id"),Value: []byte("123456"),},},//只作用于發送過程。元信息,在發送過程中使用,可以用來傳遞額外信息,發送完成后會原樣返回(不會傳給消費者)。Metadata: "這是metadata",})assert.NoError(t, err)
}

10.執行結果

Partition:	0
Offset:	0
Key:	
Value:	hello world ,這是一條使用kafka的消息

使用控制臺工具連接Kafka

Sarama 使用入門:指定分區

可以注意到,前面所有的消息都被發送到了 Partition 0 上面。

正常來說,在 Sarama 里面,可以通過指定 config 中的Partitioner來指定最終的目標分區。

常見的方法:

  • ? Random:隨機挑一個。
  • ? RoundRobin:輪詢。
  • ? Hash(默認):根據 key 的哈希值來篩選一個。
  • ? Manual: 根據 Message 中的 partition 字段來選擇。
  • ? ConsistentCRC:一致性哈希,用的是 CRC32 算法。
  • ? Custom:實際上不 Custom,而是自定義一部分Hash 的參數,本質上是一個 Hash 的實現。
//默認HashPartitioner  適合: 按用戶 ID、訂單 ID 等字段分區場景
cfg.Producer.Partitioner = sarama.NewHashPartitioner
//使用 CRC32 算法 計算 Key 的哈希。 適合: 需要高一致性分布的業務,例如日志收集系統
cfg.Producer.Partitioner = sarama.NewConsistentCRCHashPartitioner
//忽略 Key,每條消息隨機分配 partition。  適合: 普通消息隊列、廣播類場景。
cfg.Producer.Partitioner = sarama.NewRandomPartitioner
//需要手動指定 partition(ProducerMessage.Partition 字段)。適合: 明確知道要寫哪個 partition,例如做數據分流
cfg.Producer.Partitioner = sarama.NewManualPartitioner
//用于實現你自己的 Partitioner  一般不推薦使用這個空參函數(它會 panic),應實現完整接口。
cfg.Producer.Partitioner = sarama.NewCustomPartitioner()
//允許你使用自定義哈希函數來做 key 分區。  適合: 有特定哈希策略需求時,例如分布要盡可能均勻。
cfg.Producer.Partitioner = sarama.NewCustomHashPartitioner(func() hash.Hash32 {})Topic: "test_topic",
//分區依據
Key:   sarama.StringEncoder("user_123"), // 🔑 這里是分區依據
//消息數據本體
Value: sarama.StringEncoder("hello world ,這是一條使用kafka的消息"),

最典型的場景,就是利用Partitioner來保證同一個業務的消息一定發送到同一個分區上,從而保證業 有序。

Sarama 使用入門:異步發送

Sarama有一個異步發送的producer,它的用法稍微復雜一點。

  • ? 把Return.Success和 Errors都設置為true,這是為了后面能夠拿到發送結果。
  • ? 初始化異步producer。
  • ? 從producer里面拿到Input的channel,并且發送 一條消息。
  • ? 利用select case,同時**監聽Success和Error兩個channel,**來獲得發送成功與否的信息。
func TestAsyncProducer(t *testing.T) {cfg := sarama.NewConfig()//怎么知道發送是否成功cfg.Producer.Return.Errors = truecfg.Producer.Return.Successes = trueproducer, err := sarama.NewAsyncProducer(addrs, cfg)require.NoError(t, err)messages := producer.Input()go func() {for {messages <- &sarama.ProducerMessage{Topic: "test_topic",//分區依據Key: sarama.StringEncoder("user_123"), // 🔑 這里是分區依據//消息數據本體Value: sarama.StringEncoder("hello world ,這是一條使用kafka的消息"),//會在生產者和消費者之間傳遞Headers: []sarama.RecordHeader{{Key:   []byte("trace_id"),Value: []byte("123456"),},},//只作用于發送過程Metadata: "這是metadata",}}}()errCh := producer.Errors()succCh := producer.Successes()for {//兩個都不滿足就會阻塞select {case err := <-errCh:t.Log("發送出了問題", err.Err)case <-succCh:t.Log("發送成功")}}
}

Sarama 使用入門:acks

在Kafka里面,生產者在發送數據的時候,有一個很關鍵的參數,就是 acks。
有三個取值:

  • ? 0:客戶端發一次,不需要服務端的確認。
  • ? 1:客戶端發送,并且需要服務端寫入到主分區。
  • ? -1:客戶端發送,并且需要服務端同步到所有的ISR 上。

從上到下,性能變差,但是數據可靠性上升。需要性能,選 0,需要消息不丟失,選-1。

理解acks你就要抓住核心點,誰ack才算數?

  • 0:TCP協議返回了ack就可以。
  • 1:主分區確認寫入了就可以。
  • -1:所有的ISR都確認了就可以。

在這里插入圖片描述

ISR (In Sync Replicas),用通俗易懂的話來說,就是跟上了節奏的從分區。

什么叫做跟上了節奏?就是它和主分區保持了數據同步。

所以,當消息被同步到從分區之后,如果主分區崩潰了那么依舊可以保證在從分區上還有數據。

在這里插入圖片描述

sarama 使用入門:啟動消費者

Sarama的消費者設計不是很直觀,稍微有點復雜。

  • ? 首先要初始化一個ConsumerGroup。
  • ? 調用ConsumerGroup上的Consume方法。
  • ? 為 Consume 方法傳入一個 ConsumerGroupHandler的輔助方法。
package mainimport ("context""github.com/IBM/sarama""github.com/stretchr/testify/assert""log""testing"
)func TestConsumer(t *testing.T) {cfg := sarama.NewConfig()//正常來說,一個消費者都是歸屬一個消費者組的//消費者就是你的業務consumerGroup, err := sarama.NewConsumerGroup(addrs, "test_group", cfg)assert.NoError(t, err)err = consumerGroup.Consume(context.Background(), []string{"test_topic"}, testConsumerGroupHandler{})//你消費結束,就會到這里t.Log(err)
}type testConsumerGroupHandler struct {
}func (t testConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {log.Println("Setup session:", session)return nil
}func (t testConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {log.Println("Cleanup session:", session)return nil
}func (t testConsumerGroupHandler) ConsumeClaim(//代表的是你和Kafka的會話(從建立連接到連接徹底斷掉的那一段時間)session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {msgs := claim.Messages()for msg := range msgs {//var bizMsg MyBizMsg//err := json.Unmarshal(msg.Value, &bizMsg)//if err != nil {//	//這就是消費消息出錯//	//大多數時候就是重試//	//記錄日志//	continue//}log.Println(string(msg.Value))session.MarkMessage(msg, "")}//什么情況下會到這里//msg被人關了,也就是要退出消費邏輯return nil
}type MyBizMsg struct {Name string
}

sarama 使用入門:ConsumerGroupHandler

下面的代碼就是對ConsumerGroupHandler的實現,關鍵就是在消費了msg之后,如果消費成功了,要記得提交。

也就是調用MarkMessage方法。

至于 Setup 和 Cleanup 方法反而用得不多。

type testConsumerGroupHandler struct {
}func (t testConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {log.Println("Setup session:", session)return nil
}func (t testConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {log.Println("Cleanup session:", session)return nil
}func (t testConsumerGroupHandler) ConsumeClaim(//代表的是你和Kafka的會話(從建立連接到連接徹底斷掉的那一段時間)session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {msgs := claim.Messages()for msg := range msgs {//var bizMsg MyBizMsg//err := json.Unmarshal(msg.Value, &bizMsg)//if err != nil {//	//這就是消費消息出錯//	//大多數時候就是重試//	//記錄日志//	continue//}log.Println(string(msg.Value))session.MarkMessage(msg, "")}//什么情況下會到這里//msg被人關了,也就是要退出消費邏輯return nil
}

sarama 使用入門:利用context來控制消費者退出

可以利用初始化ConsumerGroup 時候傳入的ctx來控制消費者組退出消息。

下圖中,我傳入了一個超時的context,那么:

	start := time.Now()//這里是測試,我們就控制消費10sctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)defer cancel()//開始消費,會在這里阻塞住err = consumerGroup.Consume(ctx, []string{"test_topic"}, testConsumerGroupHandler{})//你消費結束,就會到這里t.Log(err, time.Since(start).String())	

下圖中,我主動調用了cancel,那么:

	start := time.Now()//這里是測試,我們就控制消費5sctx, cancel := context.WithCancel(context.Background())time.AfterFunc(time.Second*5, func() {cancel()})//開始消費,會在這里阻塞住err = consumerGroup.Consume(ctx, []string{"test_topic"}, testConsumerGroupHandler{})//你消費結束,就會到這里t.Log(err, time.Since(start).String())
  • 如果超時了
  • 如果我主動調用了cancel

以上兩種情況,任何一種情況出現了,都會讓消費者退出消息。

sarama 使用入門:指定偏移量消費

在部分場景下,我們會希望消費歷史消息,或者從某個消息開始消費,那么可以考慮在Setup里面設置偏移量。

關鍵調用是 ResetOffset。

不過一般建議走離線渠道,操作Kafka集群去重置對應的偏移量。

核心在于,你并不是每次重新部署,重新啟動都是要重置這個偏移量的。

只要你的消費者組在這個分區上有過“已提交的 offset”,Kafka 就會優先使用這個提交的 offset,而忽略你在 Setup() 中設置的 offset

// 在每次 rebalance 或初次連接 Kafka 后調用,用于初始化。
func (t testConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {//執行一些初始化的事情log.Println("Setup")//假設要重置到0var offset int64 = 0//遍歷所有的分區partitions := session.Claims()["test_topic"]for _, p := range partitions {session.ResetOffset("test_topic", p, offset, "")//session.ResetOffset("test_topic", p, sarama.OffsetNewest, "")//session.ResetOffset("test_topic", p, sarama.OffsetOldest, "")}return nil
}

sarama使用入門:異步消費,批量提交

正常來說,為了在異步消費失敗之后還能繼續重試,可以考慮異步消費一批,提交一批。

下圖中,ctx.Done分支用來控制湊夠一批的超時機制,防止生產者的速率很低,一直湊不夠一批。

func (t testConsumerGroupHandler) ConsumeClaim(//代表的是你和Kafka的會話(從建立連接到連接徹底斷掉的那一段時間)//可以通過 session 控制 offset 提交,獲取消費者信息,并感知退出時機。session sarama.ConsumerGroupSession,//claim 是你獲取消息的入口claim sarama.ConsumerGroupClaim) error {msgs := claim.Messages()//設置批量處理的條數const batchSize = 10for {ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)var eg errgroup.Groupvar last *sarama.ConsumerMessagefor i := 0; i < batchSize; i++ {done := falseselect {case <-ctx.Done()://這邊表示超時了done = truecase msg, ok := <-msgs:if !ok {cancel()return nil}last = msgmsg1 := msgeg.Go(func() error {//我就在這里消費time.Sleep(time.Second)//你在這里重試log.Println(string(msg1.Value))return nil})}if done {break}}cancel()err := eg.Wait()if err != nil {//這邊能怎么辦?//記錄日志continue}//就這樣session.MarkMessage(last, "")}return nil
}

另外一個分支就是讀取消息,并且提交到errgroup里面執行。

Sleep是模擬長時間業務執行。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/906515.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/906515.shtml
英文地址,請注明出處:http://en.pswp.cn/news/906515.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Axure全鏈路交互設計:快速提升實現能力(基礎交互+高級交互)

想讓你的設計稿像真實App一樣絲滑&#xff1f;本專欄帶你玩轉Axure交互&#xff0c;從選中高亮到動態面板騷操作&#xff0c;再到中繼器表單花式交互&#xff0c;全程動圖教學&#xff0c;一看就會&#xff01; 本專欄系統講解多個核心交互效果&#xff0c;是你的Axure交互急救…

自動化測試腳本點擊運行后,打開Chrome很久??

親愛的小伙伴們大家好。 小編最近剛換了電腦&#xff0c;這幾天做自動化測試發現打開Chrome瀏覽器需要等待好長時間&#xff0c;起初還以為代碼有問題&#xff0c;或者Chromedriver與Chrome不匹配造成的&#xff0c;但排查后發現并不是&#xff01;&#xff01; 在driver.py中…

現代人工智能系統的實用設計模式

關鍵要點 AI設計模式是為現代AI驅動的軟件中常見問題提供的可復用解決方案&#xff0c;幫助團隊避免重復造輪子。我們將其分為五類&#xff1a;提示與上下文&#xff08;Prompting & Context&#xff09;、負責任的AI&#xff08;Responsible AI&#xff09;、用戶體驗&…

經典面試題:TCP 三次握手、四次揮手詳解

在網絡通信的復雜架構里&#xff0c;“三次握手”與“四次揮手”仿若一座無形的橋梁&#xff0c;它們是連接客戶端與服務器的關鍵紐帶。這座“橋梁”不僅確保了連接的穩固建立&#xff0c;還保障了連接的有序結束&#xff0c;使得網絡世界中的信息能夠順暢、準確地流動。 在面…

食品飲料行業AI轉型趨勢分析與智能化解決方案探索?

一、行業洞察&#xff1a;AI驅動食品飲料行業價值重構? 當前&#xff0c;食品飲料行業正面臨消費分級顯性化、需求多元化與技術范式革新的三重挑戰。根據《2024食品飲料行業全營銷白皮書》&#xff0c;高收入群體傾向于高端化、個性化產品&#xff0c;而下沉市場更關注性價比…

Electron使用WebAssembly實現CRC-8 ITU校驗

Electron使用WebAssembly實現CRC-8 ITU校驗 將C/C語言代碼&#xff0c;經由WebAssembly編譯為庫函數&#xff0c;可以在JS語言環境進行調用。這里介紹在Electron工具環境使用WebAssembly調用CRC-8 ITU格式校驗的方式。 CRC-8 ITU校驗函數WebAssembly源文件 C語言實現CRC-8 I…

python如何遍歷postgresql所有的用戶表

要遍歷PostgreSQL數據庫中的所有用戶表&#xff0c;可以按照以下步驟操作&#xff1a; 安裝必要依賴庫 pip install psycopg2-binary使用標準SQL查詢方案&#xff08;推薦&#xff09; import psycopg2def list_user_tables():try:conn psycopg2.connect(host"your_ho…

面試相關的知識點

1 vllm 1.1常用概念 1 vllm&#xff1a;是一種大模型推理的框架&#xff0c;使用了張量并行原理&#xff0c;把大型矩陣分割成低秩矩陣&#xff0c;分散到不同的GPU上運行。 2 模型推理與訓練&#xff1a;模型訓練是指利用pytorch進行對大模型進行預訓練。 模型推理是指用訓…

node.js如何實現雙 Token + Cookie 存儲 + 無感刷新機制

node.js如何實現雙 Token Cookie 存儲 無感刷新機制 為什么要實施雙token機制&#xff1f; 優點描述安全性Access Token 短期有效&#xff0c;降低泄露風險&#xff1b;Refresh Token 權限受限&#xff0c;僅用于獲取新 Token用戶體驗用戶無需頻繁重新登錄&#xff0c;Toke…

MySQL——6、內置函數

內置函數 1、日期函數2、字符串函數3、數學函數4、其他函數 1、日期函數 1.1、獲取當前日期&#xff1a; 1.2、獲取當前時間&#xff1a; 1.3、獲取當前時間戳&#xff1a; 1.4、獲取當前日期時間&#xff1a; 1.5、提取出日期&#xff1a; 1.6、給日期添加天數或時間…

【Linux】Shell腳本中向文件中寫日志,以及日志文件大小、數量管理

1、寫日志 shell腳本中使用echo命令,將字符串輸入到文件中 覆蓋寫入:echo “Hello, World!” > laoer.log ,如果文件不存在,則會創建文件追加寫入:echo “Hello, World!” >> laoer.log轉移字符:echo -e “Name:\tlaoer\nAge:\t18” > laoer.log,\t制表符 …

深度學習中ONNX格式的模型文件

一、模型部署的核心步驟 模型部署的完整流程通常分為以下階段&#xff0c;用 “跨國旅行” 類比&#xff1a; 步驟類比解釋技術細節1. 訓練模型學會一門語言&#xff08;如中文&#xff09;用 PyTorch/TensorFlow 訓練模型2. 導出為 ONNX翻譯成國際通用語言&#xff08;如英語…

基于兩階段交互控制框架的互聯多能系統協同自治優化

摘要&#xff1a;從多能源集成系統的效益出發&#xff0c;建立了基于交互控制的雙層兩階段框架&#xff0c;以實現互聯多能源系統(MESs)間的最優能量供應。在下層&#xff0c;各MES通過求解成本最小化問題自主確定其可控資產的最優設定值&#xff0c;其中滾動時域優化用于處理負…

matlab編寫的BM3D圖像去噪方法

BM3D&#xff08;Block-Matching and 3D Filtering&#xff09;是一種基于塊匹配和三維濾波的圖像去噪方法&#xff0c;廣泛應用于圖像處理領域。它通過在圖像中尋找相似的塊&#xff0c;并將這些塊堆疊成三維數組進行濾波處理&#xff0c;從而有效地去除噪聲&#xff0c;同時保…

前端(小程序)學習筆記(CLASS 1):組件

1、小程序中組件的分類 小程序中的組件也是由宿主環境提供的&#xff0c;開發者可以基于組件快速搭建出漂亮的頁面結構。官方把小程序的組件分為了9大類&#xff0c;分別是&#xff1a; * 視圖容器&#xff0c;* 基礎內容&#xff0c;* 表單組件&#xff0c;* 導航組件 媒體…

基于亞馬遜云科技構建音視頻直播審核方案

1. 前言 隨著互聯網內容形態的多樣化發展&#xff0c;用戶生成內容&#xff08;UGC&#xff09;呈現爆發式增長。社交平臺、直播、短視頻、語聊房等應用場景中&#xff0c;海量的音視頻內容需要進行實時審核&#xff0c;以維護平臺安全與用戶體驗。 然而&#xff0c;企業在構…

linux基礎操作11------(運行級別)

一.前言 這個是linux最后一章節內容&#xff0c;主要還是介紹一下&#xff0c;這個就和安全有關系了&#xff0c;內容還是很多的&#xff0c;但是呢&#xff0c;大家還是做個了解就好了。 二.權限掩碼 運行級別 0 關機 運行級別 1 單用戶 &#xff0c;這個類似于windows安全…

QT+Visual Studio 配置開發環境教程

一、QT架構 Qt Creator 是一個輕量級、跨平臺的 IDE&#xff0c;專為 Qt 開發量身打造&#xff0c;內置對 qmake/CMake 的深度支持、Kits 配置管理、原生 QML 調試器以及較低的資源占用維基百科。 而在 Windows 環境下&#xff0c;Visual Studio 配合 Qt VS Tools 擴展則可將 Q…

(2)JVM 內存模型更新與 G1 垃圾收集器優化

JVM 內存模型更新與 G1 垃圾收集器優化 &#x1f680; 掌握前沿技術&#xff0c;成為頂尖 Java 工程師 2?? JVM 內存模型更新 &#x1f449; 點擊展開題目 JVM內存模型在Java 17中有哪些重要更新&#xff1f;如何優化G1垃圾收集器在容器化環境的表現&#xff1f; &#x1…

TASK04【Datawhale 組隊學習】構建RAG應用

目錄 將LLM接入LangChain構建檢索問答鏈運行成功圖遇到的問題 langchain可以便捷地調用大模型&#xff0c;并將其結合在以langchain為基礎框架搭建的個人應用中。 將LLM接入LangChain from langchain_openai import ChatOpenAI實例化一個 ChatOpenAI 類,實例化時傳入超參數來…