用 Go 編寫一個簡單的 WebSocket 推送服務
本文中代碼可以在 github.com/alfred-zhon… 獲取。
背景
最近拿到需求要在網頁上展示報警信息。以往報警信息都是通過短信,微信和 App 推送給用戶的,現在要讓登錄用戶在網頁端也能實時接收到報警推送。
依稀記得以前工作的時候遇到過類似的需求。因為以前的瀏覽器標準比較陳舊,并且那時用 Java 較多,所以那時候解決這個問題就用了 Comet4J。具體的原理就是長輪詢,長鏈接。但現在畢竟 html5 流行開來了,IE 都被 Edge 接替了,再用以前這種技術就顯得過時。
很早以前就聽過 WebSocket 的大名,但因為那時很多用戶的瀏覽器還不支持,所以對這個技術也就是淺嘗輒止,沒有太深入研究過。現在趁著項目需要,就來稍微深入了解一下。
websocket 簡介
以往瀏覽器要獲取服務端數據,都是通過發送 HTTP 請求,然后等待服務端回應的。也就是說瀏覽器端一直是整個請求的發起者,只有它主動,才能獲取到數據。而要讓瀏覽器一側能夠獲取到服務端的實時數據,就需要不停地向服務端發起請求。雖然大多數情況下并沒有獲取到實際數據,但這大大增加了網絡壓力,對于服務端來說壓力也直線上升。
后來我們學會了使用長連接 + 長輪詢的方式。換句話說,也就是延長 HTTP 請求的存在時間,盡量保持 HTTP 連接。雖然這在一定程度上降低了不少壓力,但仍然需要不停地進行輪詢,也做不到真正的實時性。(借用一張圖)
隨著 HTML5 的到來,WebSocket 在 2011 年被定為標準(詳情請參見 RFC 6455)。
借用 《Go Web 編程》的話。WebSocket 采用了一些特殊的報頭,使得瀏覽器和服務器只需要做一個握手的動作,就可以在瀏覽器和服務器之間建立一條連接通道。且此連接會保持在活動狀態,你可以使用 JavaScript 來向連接寫入或從中接收數據,就像在使用一個常規的 TCP Socket 一樣。它解決了 Web 實時化的問題。
由于 WebSocket 是全雙工通信,所以當建立了 WebSocket 連接之后,接下來的通信就類似于傳統的 TCP 通信了。客戶端和服務端可以相互發送數據,不再有實時性的問題。
開發包的選擇
在 Go 官方的 SDK 中,并不包含對 WebSocket 的支持,所以必須使用第三方庫。
要使用 Golang 開發 WebSocket,選擇基本就在 x/net/websocket 和 gorilla/websocket 之間。《Go Web 編程》一書中的例子使用了 x/net/websocket
作為開發包,而且貌似它也更加官方且正式。而實際根據我在網上查詢得到的反饋看來,并非如此。x/net/websocket
貌似 Bug 較多,且較為不穩定,問題解決也并不及時。相比之下,gorilla/websocket
則更加優秀。
還有對于 Gorilla web toolkit 組織的貢獻,必須予以感謝。?。其下不僅有 WebSocket 的實現,也有一些其他工具。歡迎大家使用并且能夠給予反饋或貢獻。
推送服務實現
基本原理
項目初步設計如下:
server 啟動以后會注冊兩個 Handler。
- websocketHandler 用于提供瀏覽器端發送 Upgrade 請求并升級為 WebSocket 連接。
- pushHandler 用于提供外部推送端發送推送數據的請求。
瀏覽器首先連接 websocketHandler (默認地址為 ws://ip:port/ws
)升級請求為 WebSocket 連接,當連接建立之后需要發送注冊信息進行注冊。這里注冊信息中包含一個 token 信息。server 會對提供的 token 進行驗證并獲取到相應的 userId(通常來說,一個 userId 可能同時關聯許多 token),并保存維護好 token, userId 和 conn(連接)之間的關系。
推送端發送推送數據的請求到 pushHandler(默認地址為 ws://ip:port/push
),請求中包含了 userId 字段和 message 字段。server 會根據 userId 獲取到所有此時連接到該 server 的 conn,然后將 message 一一進行推送。
由于推送服務的實時性,推送的數據并沒有也不需要進行緩存。
代碼詳解
我在此處會稍微講述一下代碼的基本構成,也順便說說 Go 語言中一些常用的寫法和模式(本人也是從其他語言轉向 Go 語言,畢竟 Go 語言也相當年輕。所以有建議的話,敬請提出。)。由于 Go 語言的發明人和一些主要維護者大都來自于 C/C++ 語言,所以 Go 語言的代碼也更偏向于 C/C++ 系。
首先先看一下 Server
的結構:
// Server defines parameters for running websocket server.
type Server struct {// Address for server to listen onAddr string// Path for websocket request, default "/ws".WSPath string// Path for push message, default "/push".PushPath string// Upgrader is for upgrade connection to websocket connection using// "github.com/gorilla/websocket".//// If Upgrader is nil, default upgrader will be used. Default upgrader is// set ReadBufferSize and WriteBufferSize to 1024, and CheckOrigin always// returns true.Upgrader *websocket.Upgrader// Check token if it's valid and return userID. If token is valid, userID// must be returned and ok should be true. Otherwise ok should be false.AuthToken func(token string) (userID string, ok bool)// Authorize push request. Message will be sent if it returns true,// otherwise the request will be discarded. Default nil and push request// will always be accepted.PushAuth func(r *http.Request) boolwh *websocketHandlerph *pushHandler
}
復制代碼
PS: 由于我整個項目的注釋都是用英文寫的,所以見諒了,希望不妨礙閱讀。
這里說一下 Upgrader *websocket.Upgrader
,這是 gorilla/websocket
包的對象,它用來升級 HTTP 請求。
如果一個結構體參數過多,通常不建議直接初始化,而是使用它提供的 New 方法。這里是:
// NewServer creates a new Server.
func NewServer(addr string) *Server {return &Server{Addr: addr,WSPath: serverDefaultWSPath,PushPath: serverDefaultPushPath,}
}
復制代碼
這也是 Go 語言對外提供初始化方法的一種常見用法。
然后 Server
使用 ListenAndServe
方法啟動并監聽端口,與 http
包的使用類似:
// ListenAndServe listens on the TCP network address and handle websocket
// request.
func (s *Server) ListenAndServe() error {b := &binder{userID2EventConnMap: make(map[string]*[]eventConn),connID2UserIDMap: make(map[string]string),}// websocket request handlerwh := websocketHandler{upgrader: defaultUpgrader,binder: b,}if s.Upgrader != nil {wh.upgrader = s.Upgrader}if s.AuthToken != nil {wh.calcUserIDFunc = s.AuthToken}s.wh = &whhttp.Handle(s.WSPath, s.wh)// push request handlerph := pushHandler{binder: b,}if s.PushAuth != nil {ph.authFunc = s.PushAuth}s.ph = &phhttp.Handle(s.PushPath, s.ph)return http.ListenAndServe(s.Addr, nil)
}
復制代碼
這里我們生成了兩個 Handler
,分別為 websocketHandler
和 pushHandler
。websocketHandler
負責與瀏覽器建立連接并傳輸數據,而 pushHandler
則處理推送端的請求。可以看到,這里兩個 Handler
都封裝了一個 binder
對象。這個 binder
用于維護 token <-> userID <-> Conn 的關系:
// binder is defined to store the relation of userID and eventConn
type binder struct {mu sync.RWMutex// map stores key: userID and value of related slice of eventConnuserID2EventConnMap map[string]*[]eventConn// map stores key: connID and value: userIDconnID2UserIDMap map[string]string
}
復制代碼
websocketHandler
具體看一下 websocketHandler
的實現。
// websocketHandler defines to handle websocket upgrade request.
type websocketHandler struct {// upgrader is used to upgrade request.upgrader *websocket.Upgrader// binder stores relations about websocket connection and userID.binder *binder// calcUserIDFunc defines to calculate userID by token. The userID will// be equal to token if this function is nil.calcUserIDFunc func(token string) (userID string, ok bool)
}
復制代碼
很簡單的結構。websocketHandler
實現了 http.Handler
接口:
// First try to upgrade connection to websocket. If success, connection will
// be kept until client send close message or server drop them.
func (wh *websocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {wsConn, err := wh.upgrader.Upgrade(w, r, nil)if err != nil {return}defer wsConn.Close()// handle Websocket requestconn := NewConn(wsConn)conn.AfterReadFunc = func(messageType int, r io.Reader) {var rm RegisterMessagedecoder := json.NewDecoder(r)if err := decoder.Decode(&rm); err != nil {return}// calculate userID by tokenuserID := rm.Tokenif wh.calcUserIDFunc != nil {uID, ok := wh.calcUserIDFunc(rm.Token)if !ok {return}userID = uID}// bindwh.binder.Bind(userID, rm.Event, conn)}conn.BeforeCloseFunc = func() {// unbindwh.binder.Unbind(conn)}conn.Listen()
}
復制代碼
首先將傳入的 http.Request
轉換為 websocket.Conn
,再將其分裝為我們自定義的一個 wserver.Conn
(封裝,或者說是組合,是 Go 語言的典型用法。記住,Go 語言沒有繼承,只有組合)。然后設置了 Conn
的 AfterReadFunc
和 BeforeCloseFunc
方法,接著啟動了 conn.Listen()
。AfterReadFunc
意思是當 Conn
讀取到數據后,嘗試驗證并根據 token
計算 userID
,然乎 bind
注冊綁定。BeforeCloseFunc
則為 Conn
關閉前進行解綁操作。
pushHandler
pushHandler
則容易理解。它解析請求然后推送數據:
// Authorize if needed. Then decode the request and push message to each
// realted websocket connection.
func (s *pushHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {if r.Method != http.MethodPost {w.WriteHeader(http.StatusMethodNotAllowed)return}// authorizeif s.authFunc != nil {if ok := s.authFunc(r); !ok {w.WriteHeader(http.StatusUnauthorized)return}}// read requestvar pm PushMessagedecoder := json.NewDecoder(r.Body)if err := decoder.Decode(&pm); err != nil {w.WriteHeader(http.StatusBadRequest)w.Write([]byte(ErrRequestIllegal.Error()))return}// validate the dataif pm.UserID == "" || pm.Event == "" || pm.Message == "" {w.WriteHeader(http.StatusBadRequest)w.Write([]byte(ErrRequestIllegal.Error()))return}cnt, err := s.push(pm.UserID, pm.Event, pm.Message)if err != nil {w.WriteHeader(http.StatusInternalServerError)w.Write([]byte(err.Error()))return}result := strings.NewReader(fmt.Sprintf("message sent to %d clients", cnt))io.Copy(w, result)
}
復制代碼
Conn
Conn
(此處指 wserver.Conn
) 為 websocket.Conn
的包裝。
// Conn wraps websocket.Conn with Conn. It defines to listen and read
// data from Conn.
type Conn struct {Conn *websocket.ConnAfterReadFunc func(messageType int, r io.Reader)BeforeCloseFunc func()once sync.Onceid stringstopCh chan struct{}
}
復制代碼
最主要的方法為 Listen()
:
// Listen listens for receive data from websocket connection. It blocks
// until websocket connection is closed.
func (c *Conn) Listen() {c.Conn.SetCloseHandler(func(code int, text string) error {if c.BeforeCloseFunc != nil {c.BeforeCloseFunc()}if err := c.Close(); err != nil {log.Println(err)}message := websocket.FormatCloseMessage(code, "")c.Conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))return nil})// Keeps reading from Conn util get error.
ReadLoop:for {select {case <-c.stopCh:break ReadLoopdefault:messageType, r, err := c.Conn.NextReader()if err != nil {// TODO: handle read error maybebreak ReadLoop}if c.AfterReadFunc != nil {c.AfterReadFunc(messageType, r)}}}
}
復制代碼
主要設置了當 websocket 連接關閉時的處理和不停地讀取數據。
文中很難全面地描述整個代碼的運作流程,像具體閱讀代碼,請前往 github.com/alfred-zhon… 獲取。
后記
代碼我已經進行了一定的測試,也已經在正式環境中運行了一段時間。但是代碼可能仍然不夠穩定,所以在使用過程中出現問題,也實屬正常。隨意隨時歡迎大家給我提 issues 或者 PRs。
參考
- 《Go Web 編程》 --- astaxie
- Web 通信 之 長連接、長輪詢(long polling) --- hoojo
- Gorilla web toolkit