51.Go操作kafka示例(kafka-go庫)

文章目錄

  • 一、簡介
  • 二、生產者
  • 三、消費者

代碼地址:https://gitee.com/lymgoforIT/golang-trick/tree/master/31-kafka-go

一、簡介

之前已經介紹過一個操作kafka的go庫了,28.windows安裝kafka,Go操作kafka示例(sarama庫) ,但是這個庫比較老了,當前比較流行的庫是github.com/segmentio/kafka-go,所以本次我們就使用一下它。

我們在GitHub直接輸入kafka并帶上language標簽為Go時,可以可以看到當前get github.com/segmentio/kafka-go庫是最流行的。
在這里插入圖片描述

首先啟動kafka的服務器,然后在項目中go get github.com/segmentio/kafka-go

接著我們就可以創建生產者和消費者了,注意:在實際工作中,一般是一個服務為生產者,另一個服務作為消費者,但是本案例中不涉及微服務,就是演示一下生成和消費的示例代碼,因此寫到了一個服務當中。代碼文件組織如下:
在這里插入圖片描述
user.go :用于測試發送和消費結構體字符串消息

package modeltype User struct {Id       int64  `json:"id"`UserName string `json:"user_name"`Age      int64  `json:"age"`
}

二、生產者

啟動zookeeperkafka,并創建名為testtopic,步驟可以參考:28.windows安裝kafka,Go操作kafka示例(sarama庫)

producer.go

package producerimport ("context""encoding/json""fmt""golang-trick/31-kafka-go/model""time""github.com/segmentio/kafka-go"
)var (topic    = "user"Producer *kafka.Writer
)func init() {Producer = &kafka.Writer{Addr:                   kafka.TCP("localhost:9092"), //TCP函數參數為不定長參數,可以傳多個地址組成集群Topic:                  topic,Balancer:               &kafka.Hash{}, // 用于對key進行hash,決定消息發送到哪個分區MaxAttempts:            0,WriteBackoffMin:        0,WriteBackoffMax:        0,BatchSize:              0,BatchBytes:             0,BatchTimeout:           0,ReadTimeout:            0,WriteTimeout:           time.Second,       // kafka有時候可能負載很高,寫不進去,那么超時后可以放棄寫入,用于可以丟消息的場景RequiredAcks:           kafka.RequireNone, // 不需要任何節點確認就返回Async:                  false,Completion:             nil,Compression:            0,Logger:                 nil,ErrorLogger:            nil,Transport:              nil,AllowAutoTopicCreation: false, // 第一次發消息的時候,如果topic不存在,就自動創建topic,工作中禁止使用}
}// 生產消息,發送user信息
func SendMessage(ctx context.Context, user *model.User) {msgContent, err := json.Marshal(user)if err != nil {fmt.Println(fmt.Sprintf("json marshal user err,user:%v,err:%v", user, err))}msg := kafka.Message{Topic:         "",Partition:     0,Offset:        0,HighWaterMark: 0,Key:           []byte(fmt.Sprintf("%d", user.Id)),Value:         msgContent,Headers:       nil,WriterData:    nil,Time:          time.Time{},}err = Producer.WriteMessages(ctx, msg)if err != nil {fmt.Println(fmt.Sprintf("寫入kafka失敗,user:%v,err:%v", user, err))}
}

main.go: 測試消息發送

package mainimport ("context""fmt""golang-trick/31-kafka-go/model""golang-trick/31-kafka-go/producer"
)func main() {ctx := context.Background()for i := 0; i < 5; i++ {user := &model.User{Id:       int64(i + 1),UserName: fmt.Sprintf("lym:%d", i),Age:      18,}producer.SendMessage(ctx, user)}producer.Producer.Close() // 消息發送完畢后,關閉生產者
}

可以看到五條消息都發送成功
在這里插入圖片描述

三、消費者

consumer.go

package consumerimport ("context""encoding/json""fmt""golang-trick/24-gin-learning/class08/model""time""github.com/segmentio/kafka-go"
)var (topic    = "user"Consumer *kafka.Reader
)func init() {Consumer = kafka.NewReader(kafka.ReaderConfig{Brokers:                []string{"localhost:9092"}, // broker地址 數組GroupID:                "test",                     // 消費者組id,每個消費者組可以消費kafka的完整數據,但是同一個消費者組中的消費者根據設置的分區消費策略共同消費kafka中的數據GroupTopics:            nil,Topic:                  topic, // 消費哪個topicPartition:              0,Dialer:                 nil,QueueCapacity:          0,MinBytes:               0,MaxBytes:               0,MaxWait:                0,ReadBatchTimeout:       0,ReadLagInterval:        0,GroupBalancers:         nil,HeartbeatInterval:      0,CommitInterval:         time.Second, // offset 上報間隔PartitionWatchInterval: 0,WatchPartitionChanges:  false,SessionTimeout:         0,RebalanceTimeout:       0,JoinGroupBackoff:       0,RetentionTime:          0,StartOffset:            kafka.FirstOffset, // 僅對新創建的消費者組生效,從頭開始消費,工作中可能更常用從最新的開始消費kafka.LastOffsetReadBackoffMin:         0,ReadBackoffMax:         0,Logger:                 nil,ErrorLogger:            nil,IsolationLevel:         0,MaxAttempts:            0,OffsetOutOfRangeError:  false,})
}// 消費消息
func ReadMessage(ctx context.Context) {// 消費者應該通過協程一直開著,一直消費for {if msg, err := Consumer.ReadMessage(ctx); err != nil {fmt.Println(fmt.Sprintf("讀kafka失敗,err:%v", err))break // 當前消息讀取失敗時,并不退出for終止所有后續消費,而是跳過該消息即可} else {user := &model.User{}err := json.Unmarshal(msg.Value, user)if err != nil {fmt.Println(fmt.Sprintf("json unmarshal msg value err,msg:%v,err:%v", user, err))break // 當前消息處理失敗時,并不退出for終止所有后續消費,而是跳過該消息即可}fmt.Println(fmt.Sprintf("topic=%s,partition=%d,offset=%d,key=%s,user=%v", msg.Topic, msg.Partition, msg.Offset, msg.Key, user))}}
}

main.go: 測試接收消息

package mainimport ("context""fmt""golang-trick/31-kafka-go/consumer""os""os/signal""syscall"
)// 需要監聽信息2和15,在程序退出時,關閉Consumer
func listenSignal() {c := make(chan os.Signal, 1)signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)sig := <-cfmt.Printf("收到信號 %s ", sig.String())if consumer.Consumer != nil {consumer.Consumer.Close()}os.Exit(0)
}func main() {ctx := context.Background()//for i := 0; i < 5; i++ {//	user := &model.User{//		Id:       int64(i + 1),//		UserName: fmt.Sprintf("lym:%d", i),//		Age:      18,//	}//	producer.SendMessage(ctx, user)//}//producer.Producer.Close()go consumer.ReadMessage(ctx)listenSignal()
}

啟動后,因為我們設置的從頭開始消費,所以原有的五條消息消費成功,然后在等待著隊列中有消息時繼續消費
在這里插入圖片描述
我們可以通過kafka客戶端發兩條消息,看看我們的消費者程序是否能消費到

在這里插入圖片描述
最后關閉服務停止消費
在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/214695.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/214695.shtml
英文地址,請注明出處:http://en.pswp.cn/news/214695.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

二叉搜索樹的最近公共祖先【數據結構】

二叉搜索樹的最近公共祖先 題目描述 給定一棵二叉搜索樹的先序遍歷序列&#xff0c;要求你找出任意兩結點的最近公共祖先結點&#xff08;簡稱 LCA&#xff09;。 輸入 輸入的第一行給出兩個正整數&#xff1a;待查詢的結點對數 M&#xff08;≤ 1 000&#xff09;和二叉搜索…

基于JavaWeb+SpringBoot+Vue在線拍賣系統的設計和實現

基于JavaWebSpringBootVue在線拍賣系統系統的設計和實現 源碼獲取入口Lun文目錄前言主要技術系統設計功能截圖訂閱經典源碼專欄Java項目精品實戰案例《500套》 源碼獲取 源碼獲取入口 Lun文目錄 摘 要 1 Abstract 1 1 系統概述 4 1.1 概述 4 1.2課題意義 4 1.3 主要內容 4 2 …

Git命令---綁定遠程倉庫

介紹 使用git命令綁定遠程倉庫 命令 git remote add origin https://gitee.com/x.xx.com/test.git

什么是多態

/*** Description 什么是多態*/ package com.oop;import com.oop.demo06.Person; import com.oop.demo06.Student;public class Application {public static void main(String[] args) {//一個對象的實際類型是確定的//new Student();//new Person();//可以指向的引用類型就不確…

C++新經典模板與泛型編程:策略技術中的算法策略

策略技術中的算法策略 在之前博客中funcsum()函數模板中&#xff0c;實現了對數組元素的求和運算。求和在這里可以看作一種算法&#xff0c;擴展一下思路&#xff0c;對數組元素求差、求乘積、求最大值和最小值等&#xff0c;都可以看作算法。而當前的funcsum()函數模板中&…

MySQL使用教程

數據構成了我們日益數字化的社會基礎。想象一下&#xff0c;從移動應用和銀行系統到搜索引擎&#xff0c;再到如 ChatGPT 這樣的先進人工智能聊天機器人&#xff0c;這些工具若沒有數據支撐&#xff0c;將寸步難行。你有沒有好奇過這些海量數據都存放在哪里呢&#xff1f;答案正…

2023年團體程序設計天梯賽——總決賽題

F-L1-1 最好的文檔 有一位軟件工程師說過一句很有道理的話&#xff1a;“Good code is its own best documentation.”&#xff08;好代碼本身就是最好的文檔&#xff09;。本題就請你直接在屏幕上輸出這句話。 輸入格式&#xff1a; 本題沒有輸入。 輸出格式&#xff1a; 在一…

讀excel文件,借助openpyxl工具

讀excel文件&#xff0c;借助openpyxl工具 import osimport requestsos.environ["http_proxy"] "http://127.0.0.1:7890" os.environ["https_proxy"] "http://127.0.0.1:7890"base_url "https://testnet.starscan.io/explore…

ALNS4VRPTWTF

文章概述 文章研究了城市物流背景下帶有第三方轉運設施的車輛路徑問題。與經典的車輛路徑問題不同&#xff0c;這些問題提供了將客戶需求交付給第三方轉運設施&#xff08;如城市集散中心&#xff09;的選擇&#xff0c;并收取一定的費用。為了解決這些挑戰&#xff0c;該研究…

LeetCode 279完全平方數 139單詞拆分 卡碼網 56攜帶礦石資源(多重背包) | 代碼隨想錄25期訓練營day45

動態規劃算法6 LeetCode 279 完全平方數 2023.12.11 題目鏈接代碼隨想錄講解[鏈接] int numSquares(int n) {//1確定dp數組&#xff0c;其下標表示j的完全平方數的最少數量//3初始化&#xff0c;將dp[0]初始化為0&#xff0c;用于計算&#xff0c;其他值設為INT_MAX用于遞推…

物料分類帳概覽

原文地址&#xff1a;Overview: What is SAP Material Ledger? | SAP Blogs 物料分類賬是收集物料主數據存儲在物料主數據中的物料交易數據的工具。 物料分類帳使用此數據來計算價格以評估這些物料。 物料臺賬是實際成本核算的基礎。它允許以多種貨幣對材料庫存進行評估&am…

對象的生離死別

對象的生離死別 實驗介紹 在構建一個類時&#xff0c;一般情況下需要編寫構造函數、拷貝構造函數以及析構函數&#xff0c;這將直接影響程序的運行。而初始化列表是在調用構造函數時初始化參數的方式。 一個對象從實例化到銷毀的歷程&#xff1a; 知識點 內存分區構造函數exp…

java中什么是Spring Bean?

在Spring框架中&#xff0c;一個"Bean"是指由Spring IoC容器所管理的對象。這個對象可以是Java類的實例&#xff0c;也可以是引用其他對象的引用、集合或者是簡單類型。Spring Bean是應用中由IoC容器負責創建、裝配和管理的對象。 Spring中的Bean具有以下特征&#…

地牢手冊-3d

Description 你進入了一個3D的寶藏地宮中探尋到了寶藏&#xff0c;你可以找到走出地宮的路帶出寶藏&#xff0c;或者使用爐石空手回家。 地宮由立方體單位構成&#xff0c;立方體中不定會充滿巖石。向上、下、前、后、左、右移動一個單位需要一分鐘。你不能對角線移動并且地宮…

LabVIEW開發礦井排水監控系統

LabVIEW開發礦井排水監控系統 針對礦井水害對煤礦安全生產構成的威脅&#xff0c;設計了一種基于嵌入式PLC和LabVIEW的礦井排水監控系統。該系統結合了PLC的可靠控制與單片機的應用靈活性&#xff0c;有效克服了傳統排水方法中的不足&#xff0c;如測量不準確、效率低下等問題…

react相關hooks(二)

不寫性能優化的時候 const Child (props) > {console.log(child function is recalled)// count1改變時多次執行return (<div><h1>{ props.count2}</h1></div>) } function app () {const [count1.setCount1] useState(0)const [count2.setCount…

ESP8266模塊(CH340)零基礎實戰

USB數據線連接ESP8266模塊到電腦 先按住FLASH鍵,再按一下RST鍵,然后松開 此時電腦可識別出CH340 COM接口 CH340芯片廠商網址: wch.cn 傳輸比特率9600 win11自帶驅動 下載Arduino IDE

一文了解什么是Selenium自動化測試?

一、Selenium是什么&#xff1f; 用官網的一句話來講&#xff1a;Selenium automates browsers. Thats it&#xff01;簡單來講&#xff0c;Selenium是一個用于Web應用程序自動化測試工具。Selenium測試直接運行在瀏覽器中&#xff0c;就像真正的用戶在操作瀏覽器一樣。支持的瀏…

【美賽指南】新手小白必備參賽指南

美賽指南 一、2024美賽安排二、題目類型三、選題建議四、美賽前期準備五、常用算法 一、2024美賽安排 報名截至時間&#xff1a;2024年 2月2日 00&#xff1a;00 比賽時間&#xff1a;2024年 2月2日 6&#xff1a;00- 2月6日 9&#xff1a;00 提交截至日期&#xff1a;2024年2…

嵌入式系統復習--概述

文章目錄 基本概念嵌入式系統的組成結構嵌入式操作系統嵌入式軟件開發環境硬件基礎簡介下一篇 基本概念 嵌入式計算機&#xff1a;把嵌入到對象體系中、實現對象體系智能化控制的帶有微控制器的計算機&#xff0c;稱作嵌入式計算機 嵌入式系統&#xff1a;以應用為中心&#…