服務器推送_初探 Watermill 構建 Golang 事件驅動程序,SSE 進行 HTTP 服務器推送

使用 SSE(Server-Sent Events) 進行 HTTP 服務器推送

這個示例是一個類似 twitter 的 web 應用程序,使用 Server-Sent Events 來支持實時刷新。

c969a34fd1422caef18ea7e29f89ec8b.gif

運行

docker-compose up

然后, 瀏覽 http://localhost:8080

您可以添加自己的帖子或點擊按鈕獲得隨機生成的帖子。

無論哪種方式,feeds 列表和 feed 中的帖子都應該是最新的。嘗試使用第二個瀏覽器窗口查看更新。

它是如何工作的

  • 可以創建和更新帖子。

  • 帖子可以包含標簽。

  • 每個標簽都有自己的 feed,其中包含來自該標簽的所有帖子。

  • 所有的帖子都存儲在 MySQL 中。這就是寫模型。

  • 所有 feed 都異步更新并存儲在 MongoDB 中。這是讀模型。

為什么要使用單獨的寫和讀模型?

對于這個示例應用程序,使用多語言持久性(兩個數據庫引擎)當然有些過頭了。我們這樣做是為了展示這個技術,以及如何很容易地將它應用到 Watermill。

專用的讀模型對于具有高讀/寫比率的應用程序是一種有用的模式。所有寫操作都被原子地應用到寫模型(在我們的例子中是 MySQL)。事件處理程序異步更新讀模型(我們使用 Mongo)。

讀取模型中的數據可以按原樣使用。也可以獨立于寫模型進行擴展。

請記住,要使用此模式,應用程序中必須接受最終的一致性。而且,在大多數用例中,您可能不需要使用它。務實!

1ee8f8c15bece1c8451d1b0cd17491d8.png

SSE Router

SSERouter?來自 watermill-http。當創建一個新的路由器時,你需要傳遞一個上游訂閱者。來自該訂閱服務器的消息將觸發通過 HTTP 推送更新。

在本例中,我們使用 NATS 作為 Pub/Sub,但這可以是 Watermill 支持的任何 Pub/Sub。

sseRouter, err := watermillHTTP.NewSSERouter(
watermillHTTP.SSERouterConfig{
UpstreamSubscriber: router.Subscriber,
ErrorHandler: watermillHTTP.DefaultErrorHandler,
},
router.Logger,
)

Stream Adapters(流適配器)

要使用?SSERouter,你需要準備一個帶有兩個方法的?StreamAdapter

GetResponse?類似于標準的 HTTP 處理程序。修改現有的處理程序來匹配這個簽名應該非常容易。

Validate?是一個額外的方法,它告訴我們是否應該為特定的?Message?推送更新。

type StreamAdapter interface {
// GetResponse returns the response to be sent back to client.
// Any errors that occur should be handled and written to `w`, returning false as `ok`.
GetResponse(w http.ResponseWriter, r *http.Request) (response interface{}, ok bool)
// Validate validates if the incoming message should be handled by this handler.
// Typically this involves checking some kind of model ID.
Validate(r *http.Request, msg *message.Message) (ok bool)
}

Validate?示例如下所示。它檢查消息是否來自與用戶通過 HTTP 請求發送的相同的 post ID。

func (p postStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {
postUpdated := PostUpdated{}

err := json.Unmarshal(msg.Payload, &postUpdated)
if err != nil {
return false
}

postID := chi.URLParam(r, "id")

return postUpdated.OriginalPost.ID == postID
}

如果你想為每條消息觸發一個更新,你可以簡單地返回?true

func (f allFeedsStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {
return true
}

在開始?SSERouter?之前,您需要添加帶有特定主題的處理程序。?AddHandler?返回一個可以在任何路由庫中使用的標準 HTTP 處理程序。

postHandler := sseRouter.AddHandler(PostUpdatedTopic, postStream)

// ...

r.Get("/posts/{id}", postHandler)

Event handlers(事件處理程序)

該示例使用 Watermill 進行所有異步通信,包括 SSE。

發布了以下事件:

  • PostCreated

    • 將 post 添加到貼子中包含標簽的所有 feeds 中。

  • FeedUpdated

    • 將更新推送到當前訪問 feed 頁面的所有客戶端。

  • PostUpdated

    • a) 對于現有標簽,帖子內容將在標簽中更新。

    • b) 如果添加了新的標簽,文章將被添加到標簽的 feed 中。

    • c) 如果標簽已刪除,則該帖子將從標簽的 feed 中刪除。

    • 將更新推送給所有當前訪問 post 頁面的客戶端。

    • 使用帖子中存在的標簽更新所有 feeds 中的帖子

前端 app

前端應用程序是使用 Vue.js 和 Bootstrap 構建的。

最有趣的部分是?EventSource?的使用。

this.es = new EventSource('/api/feeds/' + this.feed)

this.es.addEventListener('data', event => {
let data = JSON.parse(event.data);
this.posts_stream = data.posts;
}, false);

Refs

  • watermill.io

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/530909.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/530909.shtml
英文地址,請注明出處:http://en.pswp.cn/news/530909.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

mysql 5.6 生產my.cnf_一個生產可用的mysql參數文件my.cnf

[client]#客戶端選項設置#設置客戶端和連接字符集default_character_set utf8port 3306socket /opt/mysql-5.6.24/tmp/mysql.socket[mysqld]#服務器端選項設置# innodb設置default_storage_engine InnoDBinnodb_strict_mode 1innodb_buffer_pool_size 256M #mysql數據庫服…

extends 抽象方法_關于abstract抽象類的理解

abstract:抽象類不能被實例化(new),包含屬性、方法、構造器(此構造器不用來初始化實例,只用來被子類調用,其構造函數是提供給子類創建對象的時候初始化父類的屬性的),故只…

streaming接mysql數據庫_[Spark streaming舉例]-- 實時統計并且存儲到mysql數據庫中

舉例package com.scala.myimport org.apache.spark.SparkConfimport org.apache.spark.streaming.Durationsimport org.apache.spark.streaming.StreamingContext/**** author root* 測試步驟:* 1\打開h15\h16\h17\h18,啟動zookeeper,再啟動hadoop集群…

mysql更新id最大_我們可以在單個MySQL查詢中更新具有最高ID的行嗎?

是的,我們可以做到。讓我們首先創建一個表-mysql> create table DemoTable(ID int,GameScore int);使用插入命令在表中插入一些記錄-mysql> insert into DemoTable values(15,848747);mysql> insert into DemoTable values(13,909049);mysql> insert in…

三張表有重復字段_什么?搞不定Kafka重復消費?

點戳藍字“架構之美”關注我們哦!前言 今天我們聊一個話題,這個話題大家可能在面試過程中,或者是工作當中經常遇到 ?如何保證 Kafka 消息不重復消費?我們在做開發的時候為了程序的健壯性,在使用 Kafka 的時候一般都會…

如何利用擴展歐幾里得算法求解不定方程_歐幾里德算法、拓展歐幾里德、中國剩余定理...

01.歐幾里德算法(Euclidean algorithm)(輾轉相除法)歐幾里德算法又稱輾轉相除法,主要是用于計算兩個整數a,b的最大公約數。簡單點說一下算法原理:兩個整數的最大公約數等于其中小的那個數跟大除以小余數的最…

mysql 先刪后增 更新_MySQL 高級操作——新增數據、更新數據、刪除數據、查詢數據...

新增數據多數據插入只要寫一次insert指令,但是可以插入多條記錄語法:insert into 表名 [(字段列表)] values (值列表1),(值列表2),(值列表3);主鍵沖突主鍵沖突,在有的表中,使用的是業務主鍵(字段有業務含義),但是往往在…

nltk和python的關系_NLTK學習筆記(一):語言處理和Python

目錄nltk資料下載import nltknltk.download()其中,download() 參數默認是all,可以在腳本里面加上nltk.download(需要的資料庫) 來進行下載文本和詞匯首先,通過from nltk.book import * 引入需要的內置9本書搜索文本上下文:Text.concordance(monstrous) &…

python七段數碼管倒計時_python實現七段數碼管和倒計時效果

8是典型的七段數碼管的例子,因為剛好七段都有經過,這里我寫的代碼是從1開始右轉。這是看Mooc視頻寫的一個關于用七段數碼管顯示當前時間# -*-coding:utf-8 -*-import turtle as timport timedef drawGap():t.penup()t.fd(5)def drawLine(draw):drawGap()…

rda分析怎么做_數量生態學筆記||冗余分析(RDA)

上一節數量生態學筆記||冗余分析(RDA)概述中,我們回顧了RDA的計算過程,不管這個過程我們有沒有理解透徹,我希望你能知道的是:RDA是響應變量矩陣與解釋變量之間多元多重線性回歸的擬合值矩陣的PCA分析。本節我們就是具體來看一個RD…

mysql 服務器管理員_mysql 查看數據庫管理員

mysql 查看數據庫管理員云服務器(Elastic Compute Service,簡稱ECS)是阿里云提供的性能卓越、穩定可靠、彈性擴展的IaaS(Infrastructure as a Service)級別云計算服務。云服務器ECS免去了您采購IT硬件的前期準備,讓您像使用水、電、天然氣等公共資源一樣…

python中有哪些重要的書寫規則_一文讀懂Python代碼的書寫規范

Python代碼的書寫規范1. 一致性的建議打破一條既定規則的兩個好理由當應用這個規則將導致代碼可讀性下降,即使對于某人來說他已經習慣于按照這條規則來閱讀代碼了為了和周圍的代碼保持一致而打破規則(也許是歷史原因)2. 代碼的布局縮進4個空格代碼行行最大長度 : 79字符推薦長度…

二進制文件mysql創表_MySQL_MYSQL中如何存取二進制文件,首先創建測試表testtable CREATE TA - phpStudy...

MYSQL中如何存取二進制文件首先創建測試表testtableCREATE TABLE testtable ( id INT(5) NOT NULL AUTO_INCREMENT PRIMARY KEY,filename CHAR(255),data LONGBLOB );將文件存入表中mysql_connect( "localhost", "root", "password"); //連接數據…

樹莓派 php mysql 中文_使用樹莓派(raspberry pi)搭建網站(nginx+php+mysql+ddclient)

標簽: 樹莓派 raspberrypi php 網站 mysql分類: Linux技術最近在研究學習PHP,有時候想隨時就學習,所以就決定搭建一個網站,隨時可以進行學習,因為要24小時在線,要低功耗和安靜,所以選…

mysql從庫應用負載_線上MySQL數據庫高負載的解決思路--再次論程序應用索引的重要性...

前言:過去的筆記整理而得,未免丟失,發布個人博客。[2012年的資料筆記]場景:數據庫的負載飆升,CPU高達99%。查看進程。通過猜測推理,定位了一些select語句363478427 | apps_read | 192.168.1.113:48945 …

python獲取方法的裝飾方法_python中的方法和裝飾器

[TOC]裝飾器python中的裝飾器(decorator)是在pep 318中被首次引入,它的本質是一個函數這個函數是接受其它參數為參數,并且用一個新的,修改后的函數作為替換,最常見的裝飾器就classmethod和staticmethoddef happy(f):return lambda…

一幫一python_[python]L1-030?一幫一?(15分)

L1-030 一幫一 (15分)“一幫一學習小組”是中小學中常見的學習組織方式,老師把學習成績靠前的學生跟學習成績靠后的學生排在一組。本題就請你編寫程序幫助老師自動完成這個分配工作,即在得到全班學生的排名后,在當前尚未分組的學生中&#xf…

java書面_Java程序猿的書面采訪String3

public class SameString {//思想二:每個字符都相應著自己的ASC碼,第一個思想的算法復雜度為O(nlogn)。一般能夠利用空間來減少時間復雜度//能夠開辟一個大小為256的數組空間,而且將256個數組元素都置為0,然后遍歷第一個字符串把字…

java fangfa_daicanfangfa java中的方法 剛入門的分不清帶參方法的作用和用處 這個可以詳細的講解如何使用帶參方法 - 下載 - 搜珍網...

第14章 帶參數的方法/01 教學演示示例/示例1:帶一個參數的方法/StudentsBiz.java第14章 帶參數的方法/01 教學演示示例/示例1:帶一個參數的方法/TestAdd.java第14章 帶參數的方法/01 教學演示示例/示例2:帶多個參數的方法/StudentsBiz.java第…

java sqlite 工具類_Java 工具類 - JDBC通用操作基類 BaseDao

封裝了增刪改查功能適用于MySQL、Oracle、SQLServer、DB2、Sybase、JTDS、PostgreSql、SQLite、Derby、H2、HSQLDB、ODBC 等等數據庫,有需要的還可以自己增加。package com.tgb.hz.jdbc;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import javax.namin…