使用 Go 語言實現完整且輕量級高性能的 MQTT Broker

MQTT(Message Queuing Telemetry Transport)是一種輕量級的發布/訂閱消息傳輸協議。但是目前雖然mqtt的客戶端很多,但是服務端著實不多,常見的服務端如mosquitto或emqx。但是golang語言的實現幾乎找不到。golang的輕量級部署和高并發高性能,很合適做mqtt Broker。本文將詳細介紹如何使用 Go 語言實現一個簡單輕量級且高性能的 MQTT Broker,并涵蓋MQTT3.1.1協議的核心特性和完整功能。

1. 需求分析

本文選擇golang語言實現一個完整的 MQTT 3.1.1 Broker,不涉及集群支持和協議版本檢測。簡單且輕量級,不但可以替代mosquitto,后續還可以靈活的做擴展,如增加webUI的管理界面。且部署也很簡單,一個exe可執行文件。

完整項目開源地址:https://github.com/yangyongzhen/goang-mqtt-broker

gitee: https://gitee.com/yyz116/goang-mqtt-broker

可執行文件在release目錄下。

1.1 實現效果截圖

服務啟動:
在這里插入圖片描述
客戶端發布:
在這里插入圖片描述
客戶端訂閱:
在這里插入圖片描述
使用mosquitto客戶端測試效果:

在這里插入圖片描述
在這里插入圖片描述
優化增加基于redis的持久化存儲:
可在etc/config.yaml文件中配置是否啟用redis的持久化。默認基于內存。
在這里插入圖片描述
windows下的可執行文件僅有7M左右大小,簡單小巧。且代碼開源方便定制。可以作為替代mosquitto的另外一種選擇。
在這里插入圖片描述

1.2 功能特性
1.2.1核心功能
  • ? 完整的 MQTT 3.1.1 協議支持
  • ? QoS 0, 1, 2 消息傳遞保證
  • ? 會話管理(持久會話和清理會話)
  • ? 保留消息(Retained Messages)
  • ? 遺囑消息(Last Will and Testament)
  • ? 主題通配符(+ 和 # 通配符支持)
  • ? 客戶端認證(用戶名/密碼)
  • ? 保活機制(Keep Alive)
  • ? 并發安全
1.2.2 架構特性
  • 🏗? 模塊化設計,易于擴展
  • 🔌 可插拔存儲接口
  • 🔒 線程安全的并發處理
  • 📊 內置監控指標
  • 🐳 Docker 支持

2. 項目架構設計

架構設計

數據流

客戶端連接 → TCP Server 接受連接
協議解析 → Client 解析 MQTT 數據包
認證驗證 → Auth 模塊驗證用戶憑據
會話管理 → Storage 加載/保存會話信息
消息路由 → Broker 根據訂閱關系路由消息
主題匹配 → Topic Manager 處理通配符匹配

2.1 目錄結構
mqtt-broker/
├── README.md
├── Makefile
├── Dockerfile
├── go.mod
├── go.sum
├── cmd/
│   ├── broker/
│   │   └── main.go
│   └── test-client/
│       └── main.go
├── internal/
│   ├── auth/
│   │   └── auth.go
│   ├── broker/
│   │   ├── broker.go
│   │   ├── client.go
│   │   └── topic.go
│   ├── protocol/
│   │   ├── common/
│   │   │   └── types.go
│   │   └── mqtt311/
│   │       └── packet.go
│   └── storage/
│       ├── interface.go
│       └── memory/
│           └── store.go
└── pkg/└── mqtt/└── packet.go
2.2 主要模塊
  • cmd/broker/main.go:程序入口。
  • internal/broker/:Broker 核心邏輯,包括連接管理、消息路由等。
  • internal/storage/:存儲接口和內存實現。
  • pkg/mqtt/packet.go:MQTT 數據包編碼和解碼。

3. 核心實現

3.1 存儲接口

internal/storage/interface.go 文件中定義存儲接口:

package storageimport ("github.com/yangyongzhen/mqtt-broker/internal/protocol/common"
)type Store interface {SaveSession(clientID string, session *Session) errorLoadSession(clientID string) (*Session, error)DeleteSession(clientID string) errorSaveMessage(clientID string, message *common.Message) errorLoadMessages(clientID string) ([]*common.Message, error)DeleteMessage(clientID string, packetID uint16) errorSaveRetainedMessage(topic string, message *common.Message) errorLoadRetainedMessage(topic string) (*common.Message, error)DeleteRetainedMessage(topic string) errorSaveSubscription(clientID string, subscription *common.Subscription) errorLoadSubscriptions(clientID string) ([]*common.Subscription, error)DeleteSubscription(clientID string, topic string) error
}type Session struct {ClientID      stringCleanSession  boolSubscriptions map[string]*common.SubscriptionPendingAcks   map[uint16]*common.MessageLastSeen      time.Time
}
3.2 內存存儲實現

internal/storage/memory/store.go 文件中實現內存存儲:

package memoryimport ("sync""github.com/yangyongzhen/mqtt-broker/internal/storage""github.com/yangyongzhen/mqtt-broker/internal/protocol/common"
)type MemoryStore struct {sessions        map[string]*storage.SessionretainedMsgs    map[string]*common.MessageclientMessages  map[string][]*common.Messagemu              sync.RWMutex
}func NewMemoryStore() *MemoryStore {return &MemoryStore{sessions:       make(map[string]*storage.Session),retainedMsgs:   make(map[string]*common.Message),clientMessages: make(map[string][]*common.Message),}
}func (m *MemoryStore) SaveSession(clientID string, session *storage.Session) error {m.mu.Lock()defer m.mu.Unlock()m.sessions[clientID] = sessionreturn nil
}func (m *MemoryStore) LoadSession(clientID string) (*storage.Session, error) {m.mu.RLock()defer m.mu.RUnlock()session, exists := m.sessions[clientID]if !exists {return nil, nil}return session, nil
}// 其他方法省略...
3.3 客戶端連接管理

internal/broker/client.go 文件中實現客戶端連接管理:

package brokerimport ("bufio""fmt""net""sync""time""github.com/yangyongzhen/mqtt-broker/internal/protocol/common""github.com/yangyongzhen/mqtt-broker/internal/protocol/mqtt311""github.com/yangyongzhen/mqtt-broker/internal/storage""github.com/yangyongzhen/mqtt-broker/pkg/mqtt"
)type Client struct {conn           net.ConnclientID       stringinfo           *common.ClientInfosession        *storage.Sessionbroker         *BrokerpacketReader   *mqtt.PacketReaderwriteChan      chan []bytecloseChan      chan struct{}keepAliveTimer *time.Timermu             sync.RWMutexconnected      boolnextPacketID   uint16pendingAcks    map[uint16]*PendingMessage
}type PendingMessage struct {Message   *common.MessageTimestamp time.TimeRetries   int
}func NewClient(conn net.Conn, broker *Broker) *Client {return &Client{conn:         conn,broker:       broker,packetReader: mqtt.NewPacketReader(conn),writeChan:    make(chan []byte, 1000),closeChan:    make(chan struct{}),pendingAcks:  make(map[uint16]*PendingMessage),nextPacketID: 1,}
}func (c *Client) Start() {go c.readLoop()go c.writeLoop()go c.retryLoop()
}func (c *Client) readLoop() {defer c.Close()for {select {case <-c.closeChan:returndefault:packet, err := c.packetReader.ReadPacket()if err != nil {fmt.Printf("Read packet error: %v\n", err)return}c.handlePacket(packet)}}
}func (c *Client) writeLoop() {defer c.Close()for {select {case data := <-c.writeChan:if _, err := c.conn.Write(data); err != nil {fmt.Printf("Write error: %v\n", err)return}case <-c.closeChan:return}}
}func (c *Client) retryLoop() {ticker := time.NewTicker(5 * time.Second)defer ticker.Stop()for {select {case <-ticker.C:c.retryPendingMessages()case <-c.closeChan:return}}
}func (c *Client) handlePacket(packet common.Packet) {switch p := packet.(type) {case *mqtt311.ConnectPacket:c.handleConnect(p)case *mqtt311.PublishPacket:c.handlePublish(p)case *mqtt311.SubscribePacket:c.handleSubscribe(p)case *mqtt311.UnsubscribePacket:c.handleUnsubscribe(p)case *mqtt311.PingreqPacket:c.handlePingReq()case *mqtt311.DisconnectPacket:c.handleDisconnect()}
}// handleConnect, handlePublish 等其他方法省略...
3.4 主 Broker 實現

internal/broker/broker.go 文件中實現主 Broker 的邏輯:

package brokerimport ("fmt""net""sync""time""github.com/yangyongzhen/mqtt-broker/internal/auth""github.com/yangyongzhen/mqtt-broker/internal/protocol/common""github.com/yangyongzhen/mqtt-broker/internal/storage"
)type Broker struct {listener      net.Listenerclients       map[string]*ClienttopicManager  *TopicManagerstore         storage.Storeauth          auth.Authenticatormu            sync.RWMutexrunning       boolconfig        *Config
}type Config struct {MaxConnections   intMaxMessageSize   intRetainedMsgLimit intSessionExpiry    time.DurationMessageExpiry    time.Duration
}func NewBroker(store storage.Store, authenticator auth.Authenticator) *Broker {return &Broker{clients:      make(map[string]*Client),topicManager: NewTopicManager(),store:        store,auth:         authenticator,config: &Config{MaxConnections:   10000,MaxMessageSize:   1024 * 1024,RetainedMsgLimit: 10000,SessionExpiry:    24 * time.Hour,MessageExpiry:    24 * time.Hour,},}
}func (b *Broker) Start(address string) error {listener, err := net.Listen("tcp", address)if err != nil {return err}b.listener = listenerb.running = truefmt.Printf("MQTT Broker started on %s\n", address)for b.running {conn, err := listener.Accept()if err != nil {if b.running {fmt.Printf("Accept error: %v\n", err)}continue}client := NewClient(conn, b)go client.Start()}return nil
}func (b *Broker) Stop() {b.running = falseif b.listener != nil {b.listener.Close()}b.mu.Lock()defer b.mu.Unlock()for _, client := range b.clients {client.Close()}
}// AddClient, RemoveClient, PublishMessage 等其他方法省略...
3.5 主程序入口

cmd/broker/main.go 文件中定義主程序入口:

package mainimport ("flag""fmt""log""os""os/signal""syscall""github.com/yangyongzhen/mqtt-broker/internal/auth""github.com/yangyongzhen/mqtt-broker/internal/broker""github.com/yangyongzhen/mqtt-broker/internal/storage/memory"
)func main() {addr := flag.String("addr", ":1883", "MQTT broker address")flag.Parse()authenticator := auth.NewSimpleAuthenticator() // 示例認證器,需要自行實現store := memory.NewMemoryStore()b := broker.NewBroker(store, authenticator)go func() {if err := b.Start(*addr); err != nil {log.Fatalf("Failed to start MQTT broker: %v", err)}}()sigChan := make(chan os.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)<-sigChanb.Stop()fmt.Println("MQTT broker stopped")
}

以上代碼是實現一個簡單的 MQTT Broker 的基礎框架,更多詳細功能和性能優化可以根據實際需求進行擴展和改進。

安裝和運行

  1. 克隆項目
git clone <your-repo-url>
cd mqtt-brokergo mod tidy

安裝依賴

go mod tidy

構建項目

make build
或者
go build -o bin/mqtt-broker cmd/broker/main.go
運行 Broker
make run
或者
./bin/mqtt-broker -addr=:1883 -debug

使用 Docker

構建鏡像
docker build -t mqtt-broker .
#### 運行容器
docker run -p 1883:1883 mqtt-broker
#### 使用示例**啟動 Broker**
#### 默認端口 1883
go run cmd/broker/main.go
##### 自定義端口和調試模式
go run cmd/broker/main.go -addr=:1883 -debug

測試客戶端

項目包含一個簡單的測試客戶端,可以用來測試 broker 功能:

訂閱消息:

go run cmd/test-client/main.go -mode=sub -topic=test/hello -client=subscriber1

發布消息:

go run cmd/test-client/main.go -mode=pub -topic=test/hello -msg="Hello MQTT!" -client=publisher1

使用第三方客戶端

你也可以使用任何標準的 MQTT 客戶端連接到 broker:

使用 mosquitto 客戶端:

訂閱
mosquitto_sub -h localhost -p 1883 -t "test/topic"
發布
mosquitto_pub -h localhost -p 1883 -t "test/topic" -m "Hello World"

使用認證:

默認用戶:

admin/password, test/test123
mosquitto_pub -h localhost -p 1883 -u admin -P password -t “test/topic” -m “Authenticated message”

配置說明

命令行參數
參數 默認值 說明
-addr :1883 Broker 監聽地址
-debug false 啟用調試日志

內置用戶

Broker 默認創建了以下測試用戶:

用戶名 密碼
admin password
test test123

項目開源地址:

https://github.com/yangyongzhen/goang-mqtt-broker

gitee: https://gitee.com/yyz116/goang-mqtt-broker

作者

作者csdn貓哥,轉載請注明出處: https://blog.csdn.net/yyz_1987

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/906758.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/906758.shtml
英文地址,請注明出處:http://en.pswp.cn/news/906758.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

uv sync --frozen卡住不動

今天受邀幫同事調試uv卡住不動的問題&#xff0c;同樣的代碼已經在別的服務器跑起來了&#xff0c;換了一臺服務器之后&#xff0c;執行uv sync --frozen沒有按預期創建虛擬環境和安裝依賴。 1. 鏡像源是已經配置好的&#xff0c;pip install也能很快安裝包。 2. 查看了uv.lo…

Spring Boot中如何對密碼等敏感信息進行脫敏處理

以下是常見的脫敏方法及實現步驟&#xff0c;涵蓋配置、日志和API響應等多個層面&#xff1a; ?1. 配置文件敏感信息脫敏? (1) 使用加密庫&#xff08;如Jasypt&#xff09; ?步驟?&#xff1a; 添加依賴&#xff1a; <dependency><groupId>com.github.ulise…

springboot中redis的事務的研究

redis的事務類似于隊列操作&#xff0c;執行過程分為三步&#xff1a; 開啟事務入隊操作執行事務 使用到的幾個命令如下&#xff1a; 命令說明multi開啟一個事務exec事務提交discard事務回滾watch監聽key(s)&#xff1a;當監聽一個key(s)時&#xff0c;如果在本次事務提交之…

python打卡day35@浙大疏錦行

知識點回顧&#xff1a; 三種不同的模型可視化方法&#xff1a;推薦torchinfo打印summary權重分布可視化進度條功能&#xff1a;手動和自動寫法&#xff0c;讓打印結果更加美觀推理的寫法&#xff1a;評估模式 作業&#xff1a;調整模型定義時的超參數&#xff0c;對比下效果。…

Python爬蟲實戰:研究Crawley 框架相關技術

1. Crawley 框架相關定義 1.1 網絡爬蟲定義 網絡爬蟲是一種按照一定的規則,自動地抓取萬維網信息的程序或者腳本。它通過 HTTP 協議與 Web 服務器進行交互,獲取網頁內容并進行解析處理,是數據采集和信息檢索的重要工具。 1.2 Crawley 框架定義 Crawley 是一個基于 Pytho…

tvalid寄存器的理解

if(!out_axis_tvalid_reg || m_axis_tready ) beginend m_axis_tready 是上拍下一級給的ready信號 out_axis_tvalid_reg是上一拍&#xff0c;本級給下級的valid信號 一共有四種組合&#xff0c;然后可以通過這個if語句&#xff0c;在接下來的begin ... end中&#xff0c;用來…

【AI實戰】從“苦AI”到“爽AI”:Magentic-UI 把“人類-多智能體協作”玩明白了!

Hello&#xff0c;親愛的小伙伴們&#xff01;你是否曾經在深夜里&#xff0c;為了自動化點外賣、篩機票、抓網頁數據焦頭爛額&#xff1f;有沒有幻想過哪天能出個“貼心AI管家”&#xff0c;一鍵點菜、搞定事務、自動操作網頁&#xff0c;比你還懂你&#xff1f;更關鍵——還讓…

【東楓科技】usrp rfnoc 開發環境搭建

作者 太原市東楓電子科技有限公司 &#xff0c;代理銷售 USRP&#xff0c;Nvidia&#xff0c;等產品與技術支持&#xff0c;培訓服務。 環境 Ubuntu 20.04 依賴包 sudo apt-get updatesudo apt-get install autoconf automake build-essential ccache cmake cpufrequtils …

Ntfs!ReadIndexBuffer函數分析之根目錄讀取索引緩沖區的一個例子

Ntfs!ReadIndexBuffer函數分析之根目錄讀取索引緩沖區的一個例子 第一部分&#xff1a; 0: kd> p Ntfs!ReadIndexBuffer0xdc: f7173962 e829f60300 call Ntfs!NtfsCheckIndexBuffer (f71b2f90) 0: kd> t Ntfs!NtfsCheckIndexBuffer: f71b2f90 55 p…

LumaDot (亮度可調的屏幕圓點)

應用名稱 LumaDot &#xff08;源自 “Luminance”&#xff08;亮度&#xff09; “Dot”&#xff08;圓點&#xff09;&#xff0c;強調其核心功能&#xff1a;亮度可調的屏幕圓點&#xff09; 應用說明 LumaDot 是一款輕量級 Windows 桌面工具&#xff0c;專為需要屏幕標記…

HarmonyOS 鴻蒙應用開發基礎:EventHub,優雅解決跨組件通信難題

EventHub是鴻蒙開發中用于線程內通信的事件中心模塊&#xff0c;基于發布訂閱模式實現組件間的高效通信。它完美解決了傳統回調方式在多層嵌套場景下的痛點&#xff0c;使得組件間的通信更加靈活和易于管理。 核心特性 事件中心機制&#xff1a;通過事件名進行通信&#xff0c…

前端框架token相關bug,前后端本地聯調

今天我搭建框架的時候&#xff0c;我想請求我自己的本地&#xff01;然后我自己想鏈接我自己的本地后端&#xff0c;我之前用的前端項目&#xff0c;都是鏈別人的后端&#xff0c;基本上很少情況會鏈接自己的后端&#xff01;所以我當時想的是&#xff0c;我前后端接口一樣&…

【數據結構初階】順序表專題

文章目錄 順序表1.數據結構相關概念1、什么是數據結構2、為什么需要數據結構&#xff1f; 2.順序表1、順序表的概念及結構2、順序表分類3、動態順序表的實現1.定義一個動態順序表2.順序表的初始化3.順序表的銷毀4.順序表達的尾插5.順序表的頭插6.空間大小檢查函數7.順序表的尾刪…

從神經生物學到社會心理學:游戲沉迷機制的深度解構

你是否曾在深夜放下手機時驚覺&#xff1a;"明明只想玩10分鐘&#xff0c;怎么天都亮了&#xff1f;"這不是意志力薄弱的表現&#xff0c;而是價值數十億美元的游戲產業用神經科學精心設計的認知陷阱。 當《王者榮耀》的Victory音效讓你心跳加速&#xff0c;當《原神…

15.集合框架的學習

一、簡介 集合框架&#xff08;Collection Framework&#xff09; 是 Java 提供的一套用于存儲、操作和處理數據集合的標準化架構。它主要位于 java.util 包中&#xff0c;提供了一組 接口 和 實現類&#xff0c;用于操作不同類型的數據集合&#xff0c;如列表&#xff08;List…

【方案分享】展廳智能講解:基于BLE藍牙Beacon的自動講解觸發技術實現

【方案分享】展廳智能講解&#xff1a;基于BLE藍牙Beacon的自動講解觸發技術實現 讓觀眾靠近展品即可自動彈出講解頁面&#xff0c;是智能展廳的核心功能之一。本文將從軟硬件技術、BLE Beacon原理、微信小程序實現、優劣對比與拓展方案五個維度&#xff0c;系統講解“靠近展臺…

微前端架構:從單體到模塊化的前端新革命

在信息技術&#xff08;IT&#xff09;的迅猛發展中&#xff0c;前端開發領域正迎來一場顛覆性的變革 —— 微前端架構&#xff08;Micro - Frontends&#xff09;。2025 年&#xff0c;隨著 Web 應用的復雜性激增、團隊協作需求的增長以及用戶對無縫體驗的期待&#xff0c;微前…

React中常用的鉤子函數:

一. 基礎鉤子 (1)useState 用于在函數組件中添加局部狀態。useState可以傳遞一個參數&#xff0c;做為狀態的初始值&#xff0c;返回一個數組&#xff0c;數組的第一個元素是返回的狀態變量&#xff0c;第二個是修改狀態變量的函數。 const [state, setState] useState(ini…

如何在 Windows 11 或 10 上通過 PowerShell 安裝 Docker Desktop

了解如何使用 PowerShell 或命令提示符在 Windows 11 或 10 上安裝 Docker CLI 和 Docker Desktop GUI,以創建容器運行虛擬機。無需手動訪問網站下載安裝程序,所有操作都將在命令終端完成。 Docker 是一個強大的容器化平臺,允許開發人員將應用程序及其依賴項打包為輕量級容…

Python實例題:人機對戰初體驗Python基于Pygame實現四子棋游戲

目錄 Python實例題 題目 代碼實現 實現原理 游戲邏輯&#xff1a; AI 算法&#xff1a; 界面渲染&#xff1a; 關鍵代碼解析 游戲棋盤渲染 AI 決策算法 勝利條件檢查 使用說明 安裝依賴&#xff1a; 運行游戲&#xff1a; 游戲操作&#xff1a; 擴展建議 增強…