文章目錄
- 0.前置說明
- 1. confluent-kafka-go
- 2. sarama
- 3. segmentio/kafka-go
- 4. franz-go
- 選擇建議
- 1.啟動 kafka 集群
- 2.安裝 confluent-kafka-go 庫
- 3.創建生產者
- 特殊文件說明
- 如何查看.log文件內容
- 4.創建消費者
0.前置說明
Go 語言中有一些流行的 Kafka 客戶端庫。以下是幾個常用的庫及其優劣與區別:
1. confluent-kafka-go
-
優點:
- 高性能:基于
librdkafka
,性能非常高。 - 功能全面:支持 Kafka 的所有高級功能,如事務、壓縮、認證等。
- 社區支持:由 Confluent 維護,社區活躍,文檔豐富。
- 穩定性:廣泛使用于生產環境,經過大量測試和驗證。
- 高性能:基于
-
缺點:
- 依賴性:依賴于
librdkafka
,需要額外安裝該庫。 - 復雜性:配置和使用相對復雜,特別是對于新手。
- 依賴性:依賴于
2. sarama
-
優點:
- 純 Go 實現:不依賴于任何 C 庫,安裝和使用非常方便。
- 社區活躍:由 Shopify 維護,社區支持良好,文檔齊全。
- 靈活性:提供了豐富的配置選項,適用于各種使用場景。
-
缺點:
- 性能:相對于
confluent-kafka-go
,性能稍遜一籌。 - 功能:不支持 Kafka 的一些高級功能,如事務。
- 性能:相對于
3. segmentio/kafka-go
-
優點:
- 純 Go 實現:不依賴于任何 C 庫,安裝和使用非常方便。
- 簡潔易用:API 設計簡潔,易于上手。
- 靈活性:支持多種配置選項,適用于各種使用場景。
-
缺點:
- 性能:相對于
confluent-kafka-go
,性能稍遜一籌。 - 功能:不支持 Kafka 的一些高級功能,如事務。
- 性能:相對于
4. franz-go
-
優點:
- 純 Go 實現:不依賴于任何 C 庫,安裝和使用非常方便。
- 高性能:在純 Go 實現中性能較為優越。
- 功能全面:支持 Kafka 的大部分功能,包括事務。
-
缺點:
- 社區支持:相對于
sarama
和confluent-kafka-go
,社區支持稍弱。 - 文檔:文檔相對較少,需要更多的社區貢獻。
- 社區支持:相對于
選擇建議
- 高性能和高級功能需求:如果你需要高性能和 Kafka 的高級功能(如事務、壓縮、認證等),
confluent-kafka-go
是一個不錯的選擇。 - 純 Go 實現和易用性:如果你更傾向于使用純 Go 實現的庫,并且希望安裝和使用更加簡便,可以選擇
sarama
或segmentio/kafka-go
。 - 平衡性能和功能:如果你希望在純 Go 實現中獲得較好的性能和功能支持,可以考慮
franz-go
。
本文我們就以confluent-kafka-go
庫為例來編寫代碼。
1.啟動 kafka 集群
不知道如何搭建集群請點擊這里 ----》Kafka 集群部署(CentOS 單機模擬版)
如果你懶得啟動集群,那么直接跳過。
- 在
cluster
目錄下運行集群啟動腳本cluster.sh
;
cd cluster
./cluster.sh
- 檢查是否啟動成功;
ll zookeeper-data/
total 4
drwxr-xr-x 3 root root 4096 May 27 10:20 zookeeperll broker-data/
total 12
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-1
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-2
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-3
2.安裝 confluent-kafka-go 庫
- 查看你的
go工作目錄
echo $GOPATH
- 在
GOPATH
目錄下的src
目錄下新建produce
項目
mkdir src/produce
cd src/produce
- 在你的項目目錄中運行
go mod init
命令來初始化一個新的Go 模塊
go mod init produce
- 安裝
confluent-kafka-go
庫
go get github.com/confluentinc/confluent-kafka-go/kafka
3.創建生產者
- 新建文件
producer.go
touch producer.go
- 編寫代碼
package mainimport ("fmt""log""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 創建生產者實例broker := "localhost:9091" // 集群地址topic := "test" // 主題名稱producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) // 創建生產者實例// 檢查錯誤if err != nil {log.Fatalf("Failed to create producer: %s", err)}defer producer.Close()fmt.Printf("Created Producer %v\n", producer)// 生產消息message := "hello kafka"for i := 0; i < 10; i++ {producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, // 任題名稱Value: []byte(message + fmt.Sprintf("%d", i)), // 消息內容}, nil)}if err != nil {log.Fatalf("Failed to produce message: %v", err)}// 等待消息發送完成e := <-producer.Events() // 阻塞直到消息發送完成switch ev := e.(type) {case *kafka.Message:if ev.TopicPartition.Error != nil {log.Printf("Failed to deliver message: %v", ev.TopicPartition)} else {fmt.Printf("Delivered message: %s to %v\n", string(ev.Value), ev.TopicPartition)}}// 沖刷緩沖區消息producer.Flush(15 * 1000)
}
代碼說明
- 創建生產者時需要指定
集群地址
以及主題信息
,如果沒有該主題則自動創建
。 - 生產者會
異步
地將消息發送到 Kafka,因此你需要處理交付報告以確保消息成功發送。
我們需要了解一下Go語言和Kafka之間的關系:Go是一種靜態類型、編譯型的編程語言,由Google開發并開源。它適用于構建高性能服務器端應用程序和網絡服務。而Apache Kafka是一個分布式流處理平臺,主要面向大規模數據傳輸和存儲。
在這個例子中,我們有一個生產者程序,它使用Kafka的客戶端庫來連接到Kafka集群,然后通過創建一個生產者實例來開始發送消息。當生產者準備好要發送的消息時,它就會調用Send()
方法將其添加到緩沖區中。一旦緩沖區滿了或者用戶主動觸發了Flush()
方法,生產者就會把緩沖區里的所有消息一起發送給Kafka集群。
- 編譯運行,生產者發送消息
go build producer.go
./producer
Created Producer rdkafka#producer-1
Delivered message: hello kafka0 to test[0]@0
- 查看消息
ll cluster/broker-data/broker-1
total 20
-rw-r--r-- 1 root root 0 May 27 10:20 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 4 May 27 11:36 log-start-offset-checkpoint
-rw-r--r-- 1 root root 88 May 27 10:20 meta.properties
-rw-r--r-- 1 root root 13 May 27 11:36 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 14 May 27 11:36 replication-offset-checkpoint
drwxr-xr-x 2 root root 4096 May 27 11:21 test-0 # 我們創建的主題 數字代表分區號ll cluster/broker-data/broker-1/test-0/
total 12
-rw-r--r-- 1 root root 10485760 May 27 11:21 00000000000000000000.index
-rw-r--r-- 1 root root 251 May 27 11:21 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 May 27 11:21 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 May 27 11:21 leader-epoch-checkpoint
-rw-r--r-- 1 root root 43 May 27 11:21 partition.metadata
特殊文件說明
Kafka 的數據文件存儲在每個分區的目錄中,這些文件包括 .index
、.log
、.timeindex
、leader-epoch-checkpoint
和 partition.metadata
文件。每個文件都有其特定的用途,下面是對這些文件的詳細解釋:
-
.log
文件:- 用途:存儲實際的消息數據。
- 描述:這是 Kafka 中最重要的文件,包含了生產者發送到 Kafka 的消息。每個
.log
文件代表一個日志段(log segment),文件名通常是該段的起始偏移量(offset)。
-
.index
文件:- 用途:存儲消息偏移量到物理文件位置的映射。
- 描述:這個文件是一個稀疏索引,允許 Kafka 快速查找特定偏移量的消息。通過這個索引,Kafka 可以避免從頭開始掃描整個日志文件,從而提高查找效率。
-
.timeindex
文件:- 用途:存儲消息時間戳到物理文件位置的映射。
- 描述:這個文件允許 Kafka 根據時間戳快速查找消息。它是一個稀疏索引,類似于
.index
文件,但索引的是時間戳而不是偏移量。
-
leader-epoch-checkpoint
文件:- 用途:記錄分區的領導者紀元(leader epoch)信息。
- 描述:這個文件包含了每個紀元的起始偏移量。領導者紀元是 Kafka 用來跟蹤分區領導者變化的機制。每次分區領導者發生變化時,紀元號會增加。這個文件幫助 Kafka 在領導者變更時進行數據恢復和一致性檢查。
-
partition.metadata
文件:- 用途:存儲分區的元數據信息。
- 描述:這個文件包含了分區的一些基本信息,如分區的版本號等。它幫助 Kafka 管理和維護分區的元數據。
這些文件共同作用,確保 Kafka 能夠高效、可靠地存儲和檢索消息數據。
如何查看.log文件內容
- 執行指令
~/cluster/broker-1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log --print-data-log
~/cluster/broker-1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log --print-data-log
Dumping ./00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 9 count: 10 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1716780091840 size: 251 magic: 2 compresscodec: none crc: 997822510 isvalid: true
| offset: 0 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka0
| offset: 1 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka1
| offset: 2 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka2
| offset: 3 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka3
| offset: 4 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka4
| offset: 5 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka5
| offset: 6 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka6
| offset: 7 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka7
| offset: 8 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka8
| offset: 9 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka9
如上我們可以看到消息已經成功的發送。
4.創建消費者
- 創建消費者項目
mkdir src/consume
cd src/consume
- 在你的項目目錄中運行
go mod init
命令來初始化一個新的Go 模塊
go mod init consume
- 安裝
confluent-kafka-go
庫
go get github.com/confluentinc/confluent-kafka-go/kafka
- 新建文件
touch consumer.go
- 編寫代碼
package mainimport ("fmt""log""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 創建消費者實例broker := "localhost:9091" // 集群地址topic := "test" // 主題名稱c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": broker, // 集群地址"group.id": "my-group", // 消費者組"auto.offset.reset": "earliest", // 設置偏移量 從頭開始消費})// 檢查錯誤if err != nil {log.Printf("Failed to create consumer: %s\n", err)}defer c.Close()// 描述訂閱主題c.SubscribeTopics([]string{topic}, nil)fmt.Printf("Consuming topic %s\n", topic)// 消費消息for {msg, err := c.ReadMessage(-1) // 阻塞直到消息到達if err == nil {fmt.Printf("Consumed message: %s\n", msg.Value)} else {// 消費者錯誤fmt.Printf("Consumer error: %v (%v)\n", err, msg)}}
}
- 編譯并運行
go build consumer.go
./consumer
Consuming topic test
Consumed message: hello kafka0
Consumed message: hello kafka1
Consumed message: hello kafka2
Consumed message: hello kafka3
Consumed message: hello kafka4
Consumed message: hello kafka5
Consumed message: hello kafka6
Consumed message: hello kafka7
Consumed message: hello kafka8
Consumed message: hello kafka9
可以看到已經成功的消費剛才生產的消息。