使用 SSE(Server-Sent Events) 進行 HTTP 服務器推送
這個示例是一個類似 twitter 的 web 應用程序,使用 Server-Sent Events 來支持實時刷新。
運行
docker-compose up
然后, 瀏覽 http://localhost:8080
您可以添加自己的帖子或點擊按鈕獲得隨機生成的帖子。
無論哪種方式,feeds 列表和 feed 中的帖子都應該是最新的。嘗試使用第二個瀏覽器窗口查看更新。
它是如何工作的
可以創建和更新帖子。
帖子可以包含標簽。
每個標簽都有自己的 feed,其中包含來自該標簽的所有帖子。
所有的帖子都存儲在 MySQL 中。這就是寫模型。
所有 feed 都異步更新并存儲在 MongoDB 中。這是讀模型。
為什么要使用單獨的寫和讀模型?
對于這個示例應用程序,使用多語言持久性(兩個數據庫引擎)當然有些過頭了。我們這樣做是為了展示這個技術,以及如何很容易地將它應用到 Watermill。
專用的讀模型對于具有高讀/寫比率的應用程序是一種有用的模式。所有寫操作都被原子地應用到寫模型(在我們的例子中是 MySQL)。事件處理程序異步更新讀模型(我們使用 Mongo)。
讀取模型中的數據可以按原樣使用。也可以獨立于寫模型進行擴展。
請記住,要使用此模式,應用程序中必須接受最終的一致性。而且,在大多數用例中,您可能不需要使用它。務實!
SSE Router
SSERouter
?來自 watermill-http。當創建一個新的路由器時,你需要傳遞一個上游訂閱者。來自該訂閱服務器的消息將觸發通過 HTTP 推送更新。
在本例中,我們使用 NATS 作為 Pub/Sub,但這可以是 Watermill 支持的任何 Pub/Sub。
sseRouter, err := watermillHTTP.NewSSERouter(
watermillHTTP.SSERouterConfig{
UpstreamSubscriber: router.Subscriber,
ErrorHandler: watermillHTTP.DefaultErrorHandler,
},
router.Logger,
)
Stream Adapters(流適配器)
要使用?SSERouter
,你需要準備一個帶有兩個方法的?StreamAdapter
。
GetResponse
?類似于標準的 HTTP 處理程序。修改現有的處理程序來匹配這個簽名應該非常容易。
Validate
?是一個額外的方法,它告訴我們是否應該為特定的?Message
?推送更新。
type StreamAdapter interface {
// GetResponse returns the response to be sent back to client.
// Any errors that occur should be handled and written to `w`, returning false as `ok`.
GetResponse(w http.ResponseWriter, r *http.Request) (response interface{}, ok bool)
// Validate validates if the incoming message should be handled by this handler.
// Typically this involves checking some kind of model ID.
Validate(r *http.Request, msg *message.Message) (ok bool)
}
Validate
?示例如下所示。它檢查消息是否來自與用戶通過 HTTP 請求發送的相同的 post ID。
func (p postStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {
postUpdated := PostUpdated{}
err := json.Unmarshal(msg.Payload, &postUpdated)
if err != nil {
return false
}
postID := chi.URLParam(r, "id")
return postUpdated.OriginalPost.ID == postID
}
如果你想為每條消息觸發一個更新,你可以簡單地返回?true
。
func (f allFeedsStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {
return true
}
在開始?SSERouter
?之前,您需要添加帶有特定主題的處理程序。?AddHandler
?返回一個可以在任何路由庫中使用的標準 HTTP 處理程序。
postHandler := sseRouter.AddHandler(PostUpdatedTopic, postStream)
// ...
r.Get("/posts/{id}", postHandler)
Event handlers(事件處理程序)
該示例使用 Watermill 進行所有異步通信,包括 SSE。
發布了以下事件:
PostCreated
將 post 添加到貼子中包含標簽的所有 feeds 中。
FeedUpdated
將更新推送到當前訪問 feed 頁面的所有客戶端。
PostUpdated
a) 對于現有標簽,帖子內容將在標簽中更新。
b) 如果添加了新的標簽,文章將被添加到標簽的 feed 中。
c) 如果標簽已刪除,則該帖子將從標簽的 feed 中刪除。
將更新推送給所有當前訪問 post 頁面的客戶端。
使用帖子中存在的標簽更新所有 feeds 中的帖子
前端 app
前端應用程序是使用 Vue.js 和 Bootstrap 構建的。
最有趣的部分是?EventSource
?的使用。
this.es = new EventSource('/api/feeds/' + this.feed)
this.es.addEventListener('data', event => {
let data = JSON.parse(event.data);
this.posts_stream = data.posts;
}, false);
Refs
watermill.io