文章目錄
- 一、簡介
- 二、生產者
- 三、消費者
代碼地址:https://gitee.com/lymgoforIT/golang-trick/tree/master/31-kafka-go
一、簡介
之前已經介紹過一個操作kafka的go庫了,28.windows安裝kafka,Go操作kafka示例(sarama庫) ,但是這個庫比較老了,當前比較流行的庫是github.com/segmentio/kafka-go
,所以本次我們就使用一下它。
我們在GitHub
直接輸入kafka
并帶上language
標簽為Go
時,可以可以看到當前get github.com/segmentio/kafka-go
庫是最流行的。
首先啟動kafka的服務器,然后在項目中go get github.com/segmentio/kafka-go
接著我們就可以創建生產者和消費者了,注意:在實際工作中,一般是一個服務為生產者,另一個服務作為消費者,但是本案例中不涉及微服務,就是演示一下生成和消費的示例代碼,因此寫到了一個服務當中。
代碼文件組織如下:
user.go :用于測試發送和消費結構體字符串消息
package modeltype User struct {Id int64 `json:"id"`UserName string `json:"user_name"`Age int64 `json:"age"`
}
二、生產者
啟動zookeeper
和kafka
,并創建名為test
的topic
,步驟可以參考:28.windows安裝kafka,Go操作kafka示例(sarama庫)
producer.go
package producerimport ("context""encoding/json""fmt""golang-trick/31-kafka-go/model""time""github.com/segmentio/kafka-go"
)var (topic = "user"Producer *kafka.Writer
)func init() {Producer = &kafka.Writer{Addr: kafka.TCP("localhost:9092"), //TCP函數參數為不定長參數,可以傳多個地址組成集群Topic: topic,Balancer: &kafka.Hash{}, // 用于對key進行hash,決定消息發送到哪個分區MaxAttempts: 0,WriteBackoffMin: 0,WriteBackoffMax: 0,BatchSize: 0,BatchBytes: 0,BatchTimeout: 0,ReadTimeout: 0,WriteTimeout: time.Second, // kafka有時候可能負載很高,寫不進去,那么超時后可以放棄寫入,用于可以丟消息的場景RequiredAcks: kafka.RequireNone, // 不需要任何節點確認就返回Async: false,Completion: nil,Compression: 0,Logger: nil,ErrorLogger: nil,Transport: nil,AllowAutoTopicCreation: false, // 第一次發消息的時候,如果topic不存在,就自動創建topic,工作中禁止使用}
}// 生產消息,發送user信息
func SendMessage(ctx context.Context, user *model.User) {msgContent, err := json.Marshal(user)if err != nil {fmt.Println(fmt.Sprintf("json marshal user err,user:%v,err:%v", user, err))}msg := kafka.Message{Topic: "",Partition: 0,Offset: 0,HighWaterMark: 0,Key: []byte(fmt.Sprintf("%d", user.Id)),Value: msgContent,Headers: nil,WriterData: nil,Time: time.Time{},}err = Producer.WriteMessages(ctx, msg)if err != nil {fmt.Println(fmt.Sprintf("寫入kafka失敗,user:%v,err:%v", user, err))}
}
main.go
: 測試消息發送
package mainimport ("context""fmt""golang-trick/31-kafka-go/model""golang-trick/31-kafka-go/producer"
)func main() {ctx := context.Background()for i := 0; i < 5; i++ {user := &model.User{Id: int64(i + 1),UserName: fmt.Sprintf("lym:%d", i),Age: 18,}producer.SendMessage(ctx, user)}producer.Producer.Close() // 消息發送完畢后,關閉生產者
}
可以看到五條消息都發送成功
三、消費者
consumer.go
package consumerimport ("context""encoding/json""fmt""golang-trick/24-gin-learning/class08/model""time""github.com/segmentio/kafka-go"
)var (topic = "user"Consumer *kafka.Reader
)func init() {Consumer = kafka.NewReader(kafka.ReaderConfig{Brokers: []string{"localhost:9092"}, // broker地址 數組GroupID: "test", // 消費者組id,每個消費者組可以消費kafka的完整數據,但是同一個消費者組中的消費者根據設置的分區消費策略共同消費kafka中的數據GroupTopics: nil,Topic: topic, // 消費哪個topicPartition: 0,Dialer: nil,QueueCapacity: 0,MinBytes: 0,MaxBytes: 0,MaxWait: 0,ReadBatchTimeout: 0,ReadLagInterval: 0,GroupBalancers: nil,HeartbeatInterval: 0,CommitInterval: time.Second, // offset 上報間隔PartitionWatchInterval: 0,WatchPartitionChanges: false,SessionTimeout: 0,RebalanceTimeout: 0,JoinGroupBackoff: 0,RetentionTime: 0,StartOffset: kafka.FirstOffset, // 僅對新創建的消費者組生效,從頭開始消費,工作中可能更常用從最新的開始消費kafka.LastOffsetReadBackoffMin: 0,ReadBackoffMax: 0,Logger: nil,ErrorLogger: nil,IsolationLevel: 0,MaxAttempts: 0,OffsetOutOfRangeError: false,})
}// 消費消息
func ReadMessage(ctx context.Context) {// 消費者應該通過協程一直開著,一直消費for {if msg, err := Consumer.ReadMessage(ctx); err != nil {fmt.Println(fmt.Sprintf("讀kafka失敗,err:%v", err))break // 當前消息讀取失敗時,并不退出for終止所有后續消費,而是跳過該消息即可} else {user := &model.User{}err := json.Unmarshal(msg.Value, user)if err != nil {fmt.Println(fmt.Sprintf("json unmarshal msg value err,msg:%v,err:%v", user, err))break // 當前消息處理失敗時,并不退出for終止所有后續消費,而是跳過該消息即可}fmt.Println(fmt.Sprintf("topic=%s,partition=%d,offset=%d,key=%s,user=%v", msg.Topic, msg.Partition, msg.Offset, msg.Key, user))}}
}
main.go: 測試接收消息
package mainimport ("context""fmt""golang-trick/31-kafka-go/consumer""os""os/signal""syscall"
)// 需要監聽信息2和15,在程序退出時,關閉Consumer
func listenSignal() {c := make(chan os.Signal, 1)signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)sig := <-cfmt.Printf("收到信號 %s ", sig.String())if consumer.Consumer != nil {consumer.Consumer.Close()}os.Exit(0)
}func main() {ctx := context.Background()//for i := 0; i < 5; i++ {// user := &model.User{// Id: int64(i + 1),// UserName: fmt.Sprintf("lym:%d", i),// Age: 18,// }// producer.SendMessage(ctx, user)//}//producer.Producer.Close()go consumer.ReadMessage(ctx)listenSignal()
}
啟動后,因為我們設置的從頭開始消費,所以原有的五條消息消費成功,然后在等待著隊列中有消息時繼續消費
我們可以通過kafka
客戶端發兩條消息,看看我們的消費者程序是否能消費到
最后關閉服務停止消費