MQTT(Message Queuing Telemetry Transport)是一種輕量級的發布/訂閱消息傳輸協議。但是目前雖然mqtt的客戶端很多,但是服務端著實不多,常見的服務端如mosquitto或emqx。但是golang語言的實現幾乎找不到。golang的輕量級部署和高并發高性能,很合適做mqtt Broker。本文將詳細介紹如何使用 Go 語言實現一個簡單輕量級且高性能的 MQTT Broker,并涵蓋MQTT3.1.1協議的核心特性和完整功能。
1. 需求分析
本文選擇golang語言實現一個完整的 MQTT 3.1.1 Broker,不涉及集群支持和協議版本檢測。簡單且輕量級,不但可以替代mosquitto,后續還可以靈活的做擴展,如增加webUI的管理界面。且部署也很簡單,一個exe可執行文件。
完整項目開源地址:https://github.com/yangyongzhen/goang-mqtt-broker
gitee: https://gitee.com/yyz116/goang-mqtt-broker
可執行文件在release目錄下。
1.1 實現效果截圖
服務啟動:
客戶端發布:
客戶端訂閱:
使用mosquitto客戶端測試效果:
優化增加基于redis的持久化存儲:
可在etc/config.yaml文件中配置是否啟用redis的持久化。默認基于內存。
windows下的可執行文件僅有7M左右大小,簡單小巧。且代碼開源方便定制。可以作為替代mosquitto的另外一種選擇。
1.2 功能特性
1.2.1核心功能
- ? 完整的 MQTT 3.1.1 協議支持
- ? QoS 0, 1, 2 消息傳遞保證
- ? 會話管理(持久會話和清理會話)
- ? 保留消息(Retained Messages)
- ? 遺囑消息(Last Will and Testament)
- ? 主題通配符(+ 和 # 通配符支持)
- ? 客戶端認證(用戶名/密碼)
- ? 保活機制(Keep Alive)
- ? 并發安全
1.2.2 架構特性
- 🏗? 模塊化設計,易于擴展
- 🔌 可插拔存儲接口
- 🔒 線程安全的并發處理
- 📊 內置監控指標
- 🐳 Docker 支持
2. 項目架構設計
架構設計
數據流
客戶端連接 → TCP Server 接受連接
協議解析 → Client 解析 MQTT 數據包
認證驗證 → Auth 模塊驗證用戶憑據
會話管理 → Storage 加載/保存會話信息
消息路由 → Broker 根據訂閱關系路由消息
主題匹配 → Topic Manager 處理通配符匹配
2.1 目錄結構
mqtt-broker/
├── README.md
├── Makefile
├── Dockerfile
├── go.mod
├── go.sum
├── cmd/
│ ├── broker/
│ │ └── main.go
│ └── test-client/
│ └── main.go
├── internal/
│ ├── auth/
│ │ └── auth.go
│ ├── broker/
│ │ ├── broker.go
│ │ ├── client.go
│ │ └── topic.go
│ ├── protocol/
│ │ ├── common/
│ │ │ └── types.go
│ │ └── mqtt311/
│ │ └── packet.go
│ └── storage/
│ ├── interface.go
│ └── memory/
│ └── store.go
└── pkg/└── mqtt/└── packet.go
2.2 主要模塊
- cmd/broker/main.go:程序入口。
- internal/broker/:Broker 核心邏輯,包括連接管理、消息路由等。
- internal/storage/:存儲接口和內存實現。
- pkg/mqtt/packet.go:MQTT 數據包編碼和解碼。
3. 核心實現
3.1 存儲接口
在 internal/storage/interface.go
文件中定義存儲接口:
package storageimport ("github.com/yangyongzhen/mqtt-broker/internal/protocol/common"
)type Store interface {SaveSession(clientID string, session *Session) errorLoadSession(clientID string) (*Session, error)DeleteSession(clientID string) errorSaveMessage(clientID string, message *common.Message) errorLoadMessages(clientID string) ([]*common.Message, error)DeleteMessage(clientID string, packetID uint16) errorSaveRetainedMessage(topic string, message *common.Message) errorLoadRetainedMessage(topic string) (*common.Message, error)DeleteRetainedMessage(topic string) errorSaveSubscription(clientID string, subscription *common.Subscription) errorLoadSubscriptions(clientID string) ([]*common.Subscription, error)DeleteSubscription(clientID string, topic string) error
}type Session struct {ClientID stringCleanSession boolSubscriptions map[string]*common.SubscriptionPendingAcks map[uint16]*common.MessageLastSeen time.Time
}
3.2 內存存儲實現
在 internal/storage/memory/store.go
文件中實現內存存儲:
package memoryimport ("sync""github.com/yangyongzhen/mqtt-broker/internal/storage""github.com/yangyongzhen/mqtt-broker/internal/protocol/common"
)type MemoryStore struct {sessions map[string]*storage.SessionretainedMsgs map[string]*common.MessageclientMessages map[string][]*common.Messagemu sync.RWMutex
}func NewMemoryStore() *MemoryStore {return &MemoryStore{sessions: make(map[string]*storage.Session),retainedMsgs: make(map[string]*common.Message),clientMessages: make(map[string][]*common.Message),}
}func (m *MemoryStore) SaveSession(clientID string, session *storage.Session) error {m.mu.Lock()defer m.mu.Unlock()m.sessions[clientID] = sessionreturn nil
}func (m *MemoryStore) LoadSession(clientID string) (*storage.Session, error) {m.mu.RLock()defer m.mu.RUnlock()session, exists := m.sessions[clientID]if !exists {return nil, nil}return session, nil
}// 其他方法省略...
3.3 客戶端連接管理
在 internal/broker/client.go
文件中實現客戶端連接管理:
package brokerimport ("bufio""fmt""net""sync""time""github.com/yangyongzhen/mqtt-broker/internal/protocol/common""github.com/yangyongzhen/mqtt-broker/internal/protocol/mqtt311""github.com/yangyongzhen/mqtt-broker/internal/storage""github.com/yangyongzhen/mqtt-broker/pkg/mqtt"
)type Client struct {conn net.ConnclientID stringinfo *common.ClientInfosession *storage.Sessionbroker *BrokerpacketReader *mqtt.PacketReaderwriteChan chan []bytecloseChan chan struct{}keepAliveTimer *time.Timermu sync.RWMutexconnected boolnextPacketID uint16pendingAcks map[uint16]*PendingMessage
}type PendingMessage struct {Message *common.MessageTimestamp time.TimeRetries int
}func NewClient(conn net.Conn, broker *Broker) *Client {return &Client{conn: conn,broker: broker,packetReader: mqtt.NewPacketReader(conn),writeChan: make(chan []byte, 1000),closeChan: make(chan struct{}),pendingAcks: make(map[uint16]*PendingMessage),nextPacketID: 1,}
}func (c *Client) Start() {go c.readLoop()go c.writeLoop()go c.retryLoop()
}func (c *Client) readLoop() {defer c.Close()for {select {case <-c.closeChan:returndefault:packet, err := c.packetReader.ReadPacket()if err != nil {fmt.Printf("Read packet error: %v\n", err)return}c.handlePacket(packet)}}
}func (c *Client) writeLoop() {defer c.Close()for {select {case data := <-c.writeChan:if _, err := c.conn.Write(data); err != nil {fmt.Printf("Write error: %v\n", err)return}case <-c.closeChan:return}}
}func (c *Client) retryLoop() {ticker := time.NewTicker(5 * time.Second)defer ticker.Stop()for {select {case <-ticker.C:c.retryPendingMessages()case <-c.closeChan:return}}
}func (c *Client) handlePacket(packet common.Packet) {switch p := packet.(type) {case *mqtt311.ConnectPacket:c.handleConnect(p)case *mqtt311.PublishPacket:c.handlePublish(p)case *mqtt311.SubscribePacket:c.handleSubscribe(p)case *mqtt311.UnsubscribePacket:c.handleUnsubscribe(p)case *mqtt311.PingreqPacket:c.handlePingReq()case *mqtt311.DisconnectPacket:c.handleDisconnect()}
}// handleConnect, handlePublish 等其他方法省略...
3.4 主 Broker 實現
在 internal/broker/broker.go
文件中實現主 Broker 的邏輯:
package brokerimport ("fmt""net""sync""time""github.com/yangyongzhen/mqtt-broker/internal/auth""github.com/yangyongzhen/mqtt-broker/internal/protocol/common""github.com/yangyongzhen/mqtt-broker/internal/storage"
)type Broker struct {listener net.Listenerclients map[string]*ClienttopicManager *TopicManagerstore storage.Storeauth auth.Authenticatormu sync.RWMutexrunning boolconfig *Config
}type Config struct {MaxConnections intMaxMessageSize intRetainedMsgLimit intSessionExpiry time.DurationMessageExpiry time.Duration
}func NewBroker(store storage.Store, authenticator auth.Authenticator) *Broker {return &Broker{clients: make(map[string]*Client),topicManager: NewTopicManager(),store: store,auth: authenticator,config: &Config{MaxConnections: 10000,MaxMessageSize: 1024 * 1024,RetainedMsgLimit: 10000,SessionExpiry: 24 * time.Hour,MessageExpiry: 24 * time.Hour,},}
}func (b *Broker) Start(address string) error {listener, err := net.Listen("tcp", address)if err != nil {return err}b.listener = listenerb.running = truefmt.Printf("MQTT Broker started on %s\n", address)for b.running {conn, err := listener.Accept()if err != nil {if b.running {fmt.Printf("Accept error: %v\n", err)}continue}client := NewClient(conn, b)go client.Start()}return nil
}func (b *Broker) Stop() {b.running = falseif b.listener != nil {b.listener.Close()}b.mu.Lock()defer b.mu.Unlock()for _, client := range b.clients {client.Close()}
}// AddClient, RemoveClient, PublishMessage 等其他方法省略...
3.5 主程序入口
在 cmd/broker/main.go
文件中定義主程序入口:
package mainimport ("flag""fmt""log""os""os/signal""syscall""github.com/yangyongzhen/mqtt-broker/internal/auth""github.com/yangyongzhen/mqtt-broker/internal/broker""github.com/yangyongzhen/mqtt-broker/internal/storage/memory"
)func main() {addr := flag.String("addr", ":1883", "MQTT broker address")flag.Parse()authenticator := auth.NewSimpleAuthenticator() // 示例認證器,需要自行實現store := memory.NewMemoryStore()b := broker.NewBroker(store, authenticator)go func() {if err := b.Start(*addr); err != nil {log.Fatalf("Failed to start MQTT broker: %v", err)}}()sigChan := make(chan os.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)<-sigChanb.Stop()fmt.Println("MQTT broker stopped")
}
以上代碼是實現一個簡單的 MQTT Broker 的基礎框架,更多詳細功能和性能優化可以根據實際需求進行擴展和改進。
安裝和運行
- 克隆項目
git clone <your-repo-url>
cd mqtt-brokergo mod tidy
安裝依賴
go mod tidy
構建項目
make build
或者
go build -o bin/mqtt-broker cmd/broker/main.go
運行 Broker
make run
或者
./bin/mqtt-broker -addr=:1883 -debug
使用 Docker
構建鏡像
docker build -t mqtt-broker .
#### 運行容器
docker run -p 1883:1883 mqtt-broker
#### 使用示例**啟動 Broker**
#### 默認端口 1883
go run cmd/broker/main.go
##### 自定義端口和調試模式
go run cmd/broker/main.go -addr=:1883 -debug
測試客戶端
項目包含一個簡單的測試客戶端,可以用來測試 broker 功能:
訂閱消息:
go run cmd/test-client/main.go -mode=sub -topic=test/hello -client=subscriber1
發布消息:
go run cmd/test-client/main.go -mode=pub -topic=test/hello -msg="Hello MQTT!" -client=publisher1
使用第三方客戶端
你也可以使用任何標準的 MQTT 客戶端連接到 broker:
使用 mosquitto 客戶端:
訂閱
mosquitto_sub -h localhost -p 1883 -t "test/topic"
發布
mosquitto_pub -h localhost -p 1883 -t "test/topic" -m "Hello World"
使用認證:
默認用戶:
admin/password, test/test123
mosquitto_pub -h localhost -p 1883 -u admin -P password -t “test/topic” -m “Authenticated message”
配置說明
命令行參數
參數 默認值 說明
-addr :1883 Broker 監聽地址
-debug false 啟用調試日志
內置用戶
Broker 默認創建了以下測試用戶:
用戶名 密碼
admin password
test test123
項目開源地址:
https://github.com/yangyongzhen/goang-mqtt-broker
gitee: https://gitee.com/yyz116/goang-mqtt-broker
作者
作者csdn貓哥,轉載請注明出處: https://blog.csdn.net/yyz_1987