Go 如何通過 Kafka 客戶端庫 生產與消費消息

文章目錄

  • 0.前置說明
    • 1. confluent-kafka-go
    • 2. sarama
    • 3. segmentio/kafka-go
    • 4. franz-go
      • 選擇建議
  • 1.啟動 kafka 集群
  • 2.安裝 confluent-kafka-go 庫
  • 3.創建生產者
    • 特殊文件說明
    • 如何查看.log文件內容
  • 4.創建消費者

0.前置說明

Go 語言中有一些流行的 Kafka 客戶端庫。以下是幾個常用的庫及其優劣與區別:

1. confluent-kafka-go

  • 優點

    • 高性能:基于 librdkafka,性能非常高。
    • 功能全面:支持 Kafka 的所有高級功能,如事務、壓縮、認證等。
    • 社區支持:由 Confluent 維護,社區活躍,文檔豐富。
    • 穩定性:廣泛使用于生產環境,經過大量測試和驗證。
  • 缺點

    • 依賴性:依賴于 librdkafka,需要額外安裝該庫。
    • 復雜性:配置和使用相對復雜,特別是對于新手。

2. sarama

  • 優點

    • 純 Go 實現:不依賴于任何 C 庫,安裝和使用非常方便。
    • 社區活躍:由 Shopify 維護,社區支持良好,文檔齊全。
    • 靈活性:提供了豐富的配置選項,適用于各種使用場景。
  • 缺點

    • 性能:相對于 confluent-kafka-go,性能稍遜一籌。
    • 功能:不支持 Kafka 的一些高級功能,如事務。

3. segmentio/kafka-go

  • 優點

    • 純 Go 實現:不依賴于任何 C 庫,安裝和使用非常方便。
    • 簡潔易用:API 設計簡潔,易于上手。
    • 靈活性:支持多種配置選項,適用于各種使用場景。
  • 缺點

    • 性能:相對于 confluent-kafka-go,性能稍遜一籌。
    • 功能:不支持 Kafka 的一些高級功能,如事務。

4. franz-go

  • 優點

    • 純 Go 實現:不依賴于任何 C 庫,安裝和使用非常方便。
    • 高性能:在純 Go 實現中性能較為優越。
    • 功能全面:支持 Kafka 的大部分功能,包括事務。
  • 缺點

    • 社區支持:相對于 saramaconfluent-kafka-go,社區支持稍弱。
    • 文檔:文檔相對較少,需要更多的社區貢獻。

選擇建議

  • 高性能和高級功能需求:如果你需要高性能和 Kafka 的高級功能(如事務、壓縮、認證等),confluent-kafka-go 是一個不錯的選擇。
  • 純 Go 實現和易用性:如果你更傾向于使用純 Go 實現的庫,并且希望安裝和使用更加簡便,可以選擇 saramasegmentio/kafka-go
  • 平衡性能和功能:如果你希望在純 Go 實現中獲得較好的性能和功能支持,可以考慮 franz-go

本文我們就以confluent-kafka-go庫為例來編寫代碼。

1.啟動 kafka 集群

不知道如何搭建集群請點擊這里 ----》Kafka 集群部署(CentOS 單機模擬版)

如果你懶得啟動集群,那么直接跳過

  1. cluster目錄下運行集群啟動腳本 cluster.sh;
cd cluster
./cluster.sh
  1. 檢查是否啟動成功;
ll zookeeper-data/
total 4
drwxr-xr-x 3 root root 4096 May 27 10:20 zookeeperll broker-data/
total 12
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-1
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-2
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-3

2.安裝 confluent-kafka-go 庫

  1. 查看你的go工作目錄
echo $GOPATH
  1. GOPATH目錄下的src目錄下新建 produce 項目
mkdir src/produce
cd src/produce
  1. 在你的項目目錄中運行 go mod init 命令來初始化一個新的 Go 模塊
go mod init produce
  1. 安裝 confluent-kafka-go
go get github.com/confluentinc/confluent-kafka-go/kafka

3.創建生產者

  1. 新建文件 producer.go
touch producer.go
  1. 編寫代碼
package mainimport ("fmt""log""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 創建生產者實例broker := "localhost:9091" // 集群地址topic := "test"            // 主題名稱producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) // 創建生產者實例// 檢查錯誤if err != nil {log.Fatalf("Failed to create producer: %s", err)}defer producer.Close()fmt.Printf("Created Producer %v\n", producer)// 生產消息message := "hello kafka"for i := 0; i < 10; i++ {producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, // 任題名稱Value:          []byte(message + fmt.Sprintf("%d", i)),                             // 消息內容}, nil)}if err != nil {log.Fatalf("Failed to produce message: %v", err)}// 等待消息發送完成e := <-producer.Events() // 阻塞直到消息發送完成switch ev := e.(type) {case *kafka.Message:if ev.TopicPartition.Error != nil {log.Printf("Failed to deliver message: %v", ev.TopicPartition)} else {fmt.Printf("Delivered message: %s to %v\n", string(ev.Value), ev.TopicPartition)}}// 沖刷緩沖區消息producer.Flush(15 * 1000)
}

代碼說明

  1. 創建生產者時需要指定集群地址以及主題信息,如果沒有該主題則自動創建
  2. 生產者會異步地將消息發送到 Kafka,因此你需要處理交付報告以確保消息成功發送。

我們需要了解一下Go語言和Kafka之間的關系:Go是一種靜態類型、編譯型的編程語言,由Google開發并開源。它適用于構建高性能服務器端應用程序和網絡服務。而Apache Kafka是一個分布式流處理平臺,主要面向大規模數據傳輸和存儲。

在這個例子中,我們有一個生產者程序,它使用Kafka的客戶端庫來連接到Kafka集群,然后通過創建一個生產者實例來開始發送消息。當生產者準備好要發送的消息時,它就會調用Send()方法將其添加到緩沖區中。一旦緩沖區滿了或者用戶主動觸發了Flush()方法,生產者就會把緩沖區里的所有消息一起發送給Kafka集群。

  1. 編譯運行,生產者發送消息
go build producer.go 
./producer 
Created Producer rdkafka#producer-1
Delivered message: hello kafka0 to test[0]@0
  1. 查看消息
ll cluster/broker-data/broker-1
total 20
-rw-r--r-- 1 root root    0 May 27 10:20 cleaner-offset-checkpoint
-rw-r--r-- 1 root root    4 May 27 11:36 log-start-offset-checkpoint
-rw-r--r-- 1 root root   88 May 27 10:20 meta.properties
-rw-r--r-- 1 root root   13 May 27 11:36 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root   14 May 27 11:36 replication-offset-checkpoint
drwxr-xr-x 2 root root 4096 May 27 11:21 test-0 # 我們創建的主題 數字代表分區號ll cluster/broker-data/broker-1/test-0/
total 12
-rw-r--r-- 1 root root 10485760 May 27 11:21 00000000000000000000.index
-rw-r--r-- 1 root root      251 May 27 11:21 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 May 27 11:21 00000000000000000000.timeindex
-rw-r--r-- 1 root root        8 May 27 11:21 leader-epoch-checkpoint
-rw-r--r-- 1 root root       43 May 27 11:21 partition.metadata

特殊文件說明

Kafka 的數據文件存儲在每個分區的目錄中,這些文件包括 .index.log.timeindexleader-epoch-checkpointpartition.metadata 文件。每個文件都有其特定的用途,下面是對這些文件的詳細解釋:

  1. .log 文件

    • 用途:存儲實際的消息數據。
    • 描述:這是 Kafka 中最重要的文件,包含了生產者發送到 Kafka 的消息。每個 .log 文件代表一個日志段(log segment),文件名通常是該段的起始偏移量(offset)。
  2. .index 文件

    • 用途:存儲消息偏移量到物理文件位置的映射。
    • 描述:這個文件是一個稀疏索引,允許 Kafka 快速查找特定偏移量的消息。通過這個索引,Kafka 可以避免從頭開始掃描整個日志文件,從而提高查找效率。
  3. .timeindex 文件

    • 用途:存儲消息時間戳到物理文件位置的映射。
    • 描述:這個文件允許 Kafka 根據時間戳快速查找消息。它是一個稀疏索引,類似于 .index 文件,但索引的是時間戳而不是偏移量。
  4. leader-epoch-checkpoint 文件

    • 用途:記錄分區的領導者紀元(leader epoch)信息。
    • 描述:這個文件包含了每個紀元的起始偏移量。領導者紀元是 Kafka 用來跟蹤分區領導者變化的機制。每次分區領導者發生變化時,紀元號會增加。這個文件幫助 Kafka 在領導者變更時進行數據恢復和一致性檢查。
  5. partition.metadata 文件

    • 用途:存儲分區的元數據信息。
    • 描述:這個文件包含了分區的一些基本信息,如分區的版本號等。它幫助 Kafka 管理和維護分區的元數據。

這些文件共同作用,確保 Kafka 能夠高效、可靠地存儲和檢索消息數據。

如何查看.log文件內容

  • 執行指令
 ~/cluster/broker-1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log --print-data-log
~/cluster/broker-1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log --print-data-log
Dumping ./00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 9 count: 10 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1716780091840 size: 251 magic: 2 compresscodec: none crc: 997822510 isvalid: true
| offset: 0 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka0
| offset: 1 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka1
| offset: 2 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka2
| offset: 3 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka3
| offset: 4 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka4
| offset: 5 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka5
| offset: 6 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka6
| offset: 7 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka7
| offset: 8 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka8
| offset: 9 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka9

如上我們可以看到消息已經成功的發送。

4.創建消費者

  1. 創建消費者項目
mkdir src/consume
cd src/consume
  1. 在你的項目目錄中運行 go mod init 命令來初始化一個新的 Go 模塊
go mod init consume
  1. 安裝 confluent-kafka-go
go get github.com/confluentinc/confluent-kafka-go/kafka
  1. 新建文件
touch consumer.go
  1. 編寫代碼
package mainimport ("fmt""log""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 創建消費者實例broker := "localhost:9091" // 集群地址topic := "test"            // 主題名稱c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": broker,     // 集群地址"group.id":          "my-group", // 消費者組"auto.offset.reset": "earliest", // 設置偏移量 從頭開始消費})// 檢查錯誤if err != nil {log.Printf("Failed to create consumer: %s\n", err)}defer c.Close()// 描述訂閱主題c.SubscribeTopics([]string{topic}, nil)fmt.Printf("Consuming topic %s\n", topic)// 消費消息for {msg, err := c.ReadMessage(-1) // 阻塞直到消息到達if err == nil {fmt.Printf("Consumed message: %s\n", msg.Value)} else {// 消費者錯誤fmt.Printf("Consumer error: %v (%v)\n", err, msg)}}
}
  1. 編譯并運行
go build consumer.go 
./consumer 
Consuming topic test
Consumed message: hello kafka0
Consumed message: hello kafka1
Consumed message: hello kafka2
Consumed message: hello kafka3
Consumed message: hello kafka4
Consumed message: hello kafka5
Consumed message: hello kafka6
Consumed message: hello kafka7
Consumed message: hello kafka8
Consumed message: hello kafka9

可以看到已經成功的消費剛才生產的消息。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/20208.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/20208.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/20208.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Uniapp小程序】自定義導航欄uni-nav-bar滾動漸變色

效果圖 新建activityScrollTop.js作為mixins export default {data() {return {navBgColor: "rgba(0,0,0,0)", // 初始背景顏色為完全透明navTextColor: "rgba(0,0,0,1)", // 初始文字顏色};},onPageScroll(e) {// 設置背景const newAlpha Math.min((e.s…

踩坑:6年后為何不用GraphQL了?

GraphQL 是一項令人難以置信的技術&#xff0c;自從我在 2018 年首次開始將其投入生產以來&#xff0c;它就吸引了很多人的注意力。 在一大堆無類型的 JSON REST API 上構建了許多 React SPA 之后&#xff0c;我發現 GraphQL 是一股清新的空氣。 然而&#xff0c;隨著時間的推…

mybatis用map接收返回對象,不想讓數據類型為tinyint自動轉換為boolean,如何處理

在 MyBatis 中&#xff0c;當使用 Map 來接收查詢結果時&#xff0c;MyBatis 會根據列的數據類型自動選擇合適的 Java 類型來映射這些值。默認情況下&#xff0c;如果數據庫列是 TINYINT(1)&#xff0c;MyBatis 可能會錯誤地將其映射為 boolean&#xff0c;因為它經常被誤解為只…

PPP認證兩種:PAP和CHAP,兩次握手和三次握手

CHAP&#xff08;Challenge-Handshake Authentication Protocol&#xff0c;質詢握手認證協議&#xff09;的設計理念是增強網絡認證過程的安全性。在CHAP的三次握手過程中&#xff0c;不直接傳送用戶的明文密碼&#xff0c;以此來提高安全性&#xff0c;具體步驟如下&#xff…

開源大模型源代碼

開源大模型的源代碼可以在多個平臺上找到&#xff0c;以下是一些知名的開源大模型及其源代碼的獲取方式&#xff1a; 1. **艾倫人工智能研究所的開放大語言模型&#xff08;Open Language Model&#xff0c;OLMo&#xff09;**&#xff1a; - 提供了完整的模型權重、訓練代…

springboot結合mybatis使用多數據源的方式

背景 最近有一個需求&#xff0c;有兩個庫需要做同步數據&#xff0c;一個Doris庫&#xff0c;一個mysql庫&#xff0c;兩邊的表結構一致&#xff0c;這里不能使用navicat等工具提供的數據傳輸之類的功能&#xff0c;只能使用代碼做同步&#xff0c;springboot配置多數據…

如何設置手機的DNS

DNS 服務器 IP 地址 蘋果 華為 小米 OPPO VIVO DNS 服務器 IP 地址 中國大陸部分地區會被運營商屏蔽網絡導致無法訪問&#xff0c;可修改手機DNS解決。 推薦 阿里的DNS (223.5.5.5&#xff09;或 114 (114.114.114.114和114.114.115.115) 更多公開DNS參考&#xff1a; 蘋果…

ESP32-C3模組上實現藍牙BLE配網功能(1)

本文內容參考&#xff1a; 《ESP32-C3 物聯網工程開發實戰》 樂鑫科技 藍牙的名字由來是怎樣的&#xff1f;為什么不叫它“白牙”&#xff1f; 特此致謝&#xff01; 一、藍牙知識基礎 1. 什么是藍牙&#xff1f; &#xff08;1&#xff09;簡介 藍牙技術是一種無線數據和…

【緩存】OS層面緩存設計機制

操作系統的緩存設計機制是計算機體系結構中的一個重要組成部分&#xff0c;旨在提高系統的性能&#xff0c;特別是通過減少對慢速存儲設備&#xff08;如硬盤&#xff09;的訪問次數來加速數據的讀取和寫入。 以下是一些常見的操作系統緩存設計機制&#xff1a; CPU緩存&…

web學習筆記(六十一)

目錄 如何使用公共組件來編寫頁面 如何使用公共組件來編寫頁面 1.導入公共組件nav.vue import Catenav from "/components/nav.vue"; 2.在頁面插入子組件 如果使用了setup語法糖此時就可以直接在頁面插入 <Catenav ></Catenav>標簽&#xff0c; …

.NET 快速重構概要1

1.封裝集合 在某些場景中,向類的使用者隱藏類中的完整集合是一個很好的做法,比如對集合的 add/remove 操作中包 含其他的相關邏輯時。因此,以可迭代但不直接在集合上進行操作的方式來暴露集合,是個不錯的主意。 public class Order { private int _orderTotal; private Li…

Camunda BPM架構

Camunda BPM既可以單獨作為流程引擎服務存在,也能嵌入到其他java應用中。Camunda BPM的核心流程引擎是一個輕量級的模塊,可以被Spring管理或者加入到自定義的編程模型中,并且支持線程模型。 1,流程引擎架構 流程引擎由多個組件構成,如下所示: API服務 API服務,允許ja…

邏輯回歸分類算法

文章目錄 算法推導 線性回歸解決連續值的回歸預測&#xff1b;而邏輯回歸解決離散值的分類預測&#xff1b; 算法推導 邏輯回歸可以看作是兩部分&#xff0c;以0、1分類問題說明&#xff1b; 線性回歸部分 對于一個樣本 x i x_i xi?&#xff0c;有n個特征 x i ( 1 ) x_i^{(1)…

蒙自源兒童餐新品上市,引領健康美味新潮流

隨著夏日的熱烈與兒童節的歡樂氛圍到來&#xff0c;蒙自源品牌隆重推出兒童餐新品&#xff0c;以“快樂不分大小&#xff0c;誰還不是個寶寶”為主題&#xff0c;為廣大消費者帶來一場健康與美味的盛宴。新品上市活動將于5月25日舉行&#xff0c;蒙自源將以其獨特的產品魅力和創…

install

目錄 1、 install 1.1、 //creates form with validation 1.2、 onStepChanging: function (event, currentIndex, newIndex) { 1.3、 onFinishing: function (event, currentIndex) { 1.4、 //init inst

最新 HUAWEI DevEco Studio 調試技巧

最新 HUAWEI DevEco Studio 調試技巧 前言 在我們使用 HUAWEI DevEco Studio 編輯器開發鴻蒙應用時&#xff0c;免不了要對我們的應用程序進行代碼調試。我們根據實際情況&#xff0c;一般會用到以下三種方式進行代碼調試。 肉眼調試法注釋排錯調試法控制臺輸出法彈出提示法斷…

【算法實戰】每日一題:將某個序列中內的每個元素都設為相同的值的最短次數(差分數組解法,附概念理解以及實戰操作)

題目 將某個序列中內的每個元素都設為相同的值的最短次數 1.差分數組&#xff08;后面的減去前面的值存儲的位置可以理解為中間&#xff09; 差分數組用于處理序列中的區間更新和查詢問題。它存儲序列中相鄰元素之間的差值&#xff0c;而不是直接存儲每個元素的值 怎么對某…

STM32 入門教程(江科大教材)#筆記2

3-4按鍵控制LED /** LED.c**/ #include "stm32f10x.h" // Device headervoid LED_Init(void) {/*開啟時鐘*/RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOA, ENABLE); //開啟GPIOA的時鐘/*GPIO初始化*/GPIO_InitTypeDef GPIO_InitStructure;GPIO_I…

關系數據庫:關系運算

文章目錄 關系運算并&#xff08;Union&#xff09;差&#xff08;Difference&#xff09;交&#xff08;Intersection&#xff09;笛卡爾積&#xff08;Extended Cartesian Product&#xff09;投影&#xff08;projection&#xff09;選擇&#xff08;Selection&#xff09;除…

微信小程序中應用van-calendar時加載時間過長,以及設置min-data無效的問題解決

一、我們微信小程序中應用van-calendar時&#xff0c;如果沒有設置min-data&#xff0c;那么頁面的加載時間會非常長&#xff0c;所以&#xff0c;一定一定要配置min-data&#xff1b; 二、vue中min-data的寫法是:min-data“new Date(2023, 0, 1)”&#xff0c;而在小程序中的寫…