目錄
- 引言
- 背景與技術概述
- 實現技術細節
- 1. HTTP 頭部配置
- 2. 事件格式與發送
- 3. 保持連接與刷新
- 4. 處理連接關閉
- 4.1 使用上下文管理連接生命周期
- 4.2 使用通道管理客戶端連接
- 5. 客戶端交互
- 6.demo
- 7.Go轉發大模型流式輸出demo
引言
服務器推送事件(Server-Sent Events, SSE)是一種基于 HTTP 的單向數據流技術,允許服務器通過標準 HTTP 連接向客戶端推送實時更新。SSE 使用 Content-Type: text/event-stream 頭部標識響應內容為事件流,例如大模型流式輸出。
背景與技術概述
SSE 是 HTML5 規范的一部分,通過 EventSource API 提供客戶端支持。它的主要特點包括:
- 單向通信: 數據僅從服務器流向客戶端,無法通過同一連接反向發送。
- 自動重連: 客戶端在連接斷開后會自動嘗試重連。
- 基于 HTTP: 利用現有 HTTP 基礎設施,無需額外協議支持。
- 事件格式: 事件以文本形式發送,每條事件以 data: 開頭,結束于兩個換行符 \n\n。
在 Go 中,SSE 的實現通常依賴標準庫 net/http,也可以結合框架(如 Gin)或第三方庫(如 github.com/r3labs/sse)來簡化開發。
實現技術細節
1. HTTP 頭部配置
服務端必須在響應中設置以下頭部:
- Content-Type: text/event-stream: 標識響應為事件流。
- Cache-Control: no-cache: 防止瀏覽器緩存響應,確保實時性。
- Connection: keep-alive: 保持連接開放,支持持續流式傳輸。
2. 事件格式與發送
SSE 事件必須遵循特定格式,每條事件包括以下字段:
- data:: 事件數據,多個 data: 行會被拼接為一條消息。
- 事件以兩個換行符 \n\n 結束,表示一條事件的結束。
例如,發送一條消息 “Hello, World!” 的格式為:
data: Hello, World!
在 Go 中,事件發送通常通過 http.ResponseWriter 實現。例如,Pascal Allen 的 Medium 文章中使用了 Gin 框架c.SSEvent(“message”, msg) 方法,而 Kelche.co 的示例直接使用 fmt.Fprintf(w, “data: %d \n\n”, rand.Intn(100)) 發送隨機數。
3. 保持連接與刷新
為了實現流式輸出,服務端需要保持 HTTP 連接開放,通常通過無限循環實現。在每個循環中:
- 生成或獲取事件數據。
- 寫入響應,使用 w.(http.Flusher).Flush() 立即刷新,確保數據實時發送。
例如,Kelche.co 的 randomHandler 函數每 2 秒發送一次隨機數:
for{rand.Seed(time.Now().UnixNano())fmt.Fprintf(w,"data: %d \n\n", rand.Intn(100))w.(http.Flusher).Flush()time.Sleep(2* time.Second)
}
4. 處理連接關閉
客戶端可能隨時斷開連接,服務端需檢測并安全退出。
例如,可以通過檢查 http.ResponseWriter 的狀態或使用 Hijack 方法檢測連接狀態。在實際應用中,推薦使用通道(channel)或上下文(context)管理連接生命周期。
4.1 使用上下文管理連接生命周期
-
上下文的作用: 上下文可以用來傳遞取消信號和截止時間。例如,當客戶端斷開連接時,HTTP 請求的上下文會被取消,服務器可以通過 <-ctx.Done() 檢測到。
-
關鍵方法:
- context.Background():創建一個空的根上下文,通常作為父上下文。
- context.WithCancel(parentCtx):創建一個可手動取消的上下文,cancel() 函數用于取消。
- context.WithTimeout(parentCtx, duration):創建一個在指定時間后自動取消的上下文,適合設置 SSE 連接的超時。
- context.WithDeadline(parentCtx, deadline):創建一個在指定截止時間后自動取消的上下文。
-
在 SSE 中的應用:
- 在 SSE 處理函數中,使用 ctx := r.Context() 獲取 HTTP 請求的上下文。
- 使用 select 語句監聽 <-ctx.Done(),當上下文被取消時(例如客戶端斷開),執行清理邏輯。
- 示例代碼:
func sseHandler(w http.ResponseWriter, r *http.Request) {ctx := r.Context()for {select {case <-ctx.Done():return // 客戶端斷開,退出default:// 發送數據fmt.Fprintf(w, "data: message\n\n")w.(http.Flusher).Flush()time.Sleep(2 * time.Second)}}
}
這種方式確保當客戶端斷開時,goroutine 可以及時退出,避免資源泄漏。
4.2 使用通道管理客戶端連接
-
通道的作用: 通道可以用來管理多個客戶端的連接生命周期,例如添加新客戶端、移除斷開的客戶端和廣播消息。
-
關鍵結構:
- addClient:一個通道(如 chan *SSEClient),用于添加新客戶端。
- removeClient:一個通道(如 chan *SSEClient),用于移除斷開的客戶端。
- 定義一個 SSEServer 結構體,包含:- clients:一個映射(如 map[*SSEClient]struct{}),存儲所有活躍客戶端。
- 每個 SSEClient 包含一個消息通道(如 chan []byte),用于發送數據。
-
在 SSE 中的應用:
- 當新客戶端連接時,創建一個 SSEClient,初始化其消息通道,并通過 addClient 通道通知服務器。
- 當客戶端斷開時,通過 removeClient 通道通知服務器,服務器從 clients 中移除該客戶端并關閉其通道。
- 使用 sync.Mutex 保護 clients 映射的并發訪問,確保線程安全。
-
示例代碼:
type SSEClient struct {ID stringStream chan []byte
}type SSEServer struct {clients map[*SSEClient]struct{}addClient chan *SSEClientremoveClient chan *SSEClientmutex sync.Mutex
}func (s *SSEServer) Run() {for {select {case client := <-s.addClient:s.mutex.Lock()s.clients[client] = struct{}{}s.mutex.Unlock()case client := <-s.removeClient:s.mutex.Lock()delete(s.clients, client)s.mutex.Unlock()close(client.Stream)}}
}
5. 客戶端交互
客戶端通過 EventSource API 連接到 SSE 端點。例如:
const eventSource = newEventSource("/random");
eventSource.onmessage = function(event){console.log(event.data);// 處理接收到的隨機數
};
EventSource 會自動處理重連,適合需要持續更新的場景。
6.demo
package mainimport ("encoding/json""fmt""io""log""net/http""runtime/debug""time""github.com/spf13/cast"
)func main() {defer recovery()http.HandleFunc("/chat/send", Send)fmt.Println("服務器啟動在 http://localhost:8080")log.Fatal(http.ListenAndServe(":8080", nil))
}func Send(w http.ResponseWriter, r *http.Request) {// 處理預檢請求if r.Method == "OPTIONS" {w.WriteHeader(http.StatusOK)return}body, err := io.ReadAll(r.Body)if err != nil {http.Error(w, err.Error(), http.StatusInternalServerError)return}var params SendRequesterr = json.Unmarshal(body, ¶ms)if err != nil {http.Error(w, err.Error(), http.StatusInternalServerError)return}demo := []string{"你好","你是誰","你是做什么的","你是怎么工作的","你是在哪座城市","你是什么星座","你是哪個國家的","你是哪個省的","你是哪個市的","你是哪個區的","你是哪個街道的","你是哪個社區的","你是哪個村的",}flusher, ok := w.(http.Flusher) // 獲取流式輸出器if !ok {http.Error(w, "Streaming unsupported", http.StatusInternalServerError)return}//設置headerw.Header().Set("Content-Type", "text/event-stream")w.Header().Set("Cache-Control", "no-cache")w.Header().Set("Connection", "keep-alive")// 流式輸出for _, v := range demo {time.Sleep(1 * time.Second)lineData := fmt.Sprintf("data: %s\n\n", v)io.WriteString(w, lineData)flusher.Flush()}
}type SendRequest struct {Msg string `json:"msg"`
}func recovery() {if rec := recover(); rec != nil {log.Printf("Panic Panic occur")if err, ok := rec.(error); ok {log.Printf("PanicRecover Unhandled error: %v\n stack:%v", err.Error(), cast.ToString(debug.Stack()))} else {log.Printf("PanicRecover Panic: %v\n stack:%v", rec, cast.ToString(debug.Stack()))}}
}
執行一下命令運行:
go mod initgo mod tidygo run main.go
用postman請求localhost:8080/chat/send
7.Go轉發大模型流式輸出demo
sendRequest.Model ="qwen-max"streamResp:=&proto.StreamResp{}qwenClient:= service.NewQwen(sendRequest)qwenClient.QwenStream(streamResp)defer streamResp.HttpResp.Body.Close()// 1. 復制下游服務的響應頭for key,values:= range streamResp.HttpResp.Header {for _,value:= range values {w.Header().Add(key, value)}}// 2. 復制下游服務的狀態碼w.WriteHeader(streamResp.HttpResp.StatusCode)//流式輸出// 確保 ResponseWriter 支持 Flusherflusher,ok:= w.(http.Flusher)if!ok {http.Error(w,"Streaming unsupported", http.StatusInternalServerError)return}// 處理流式響應scanner:= bufio.NewScanner(streamResp.HttpResp.Body)for scanner.Scan(){lineData:= scanner.Text()// 將響應數據逐步發送給客戶端io.WriteString(w, lineData+"\n\n")flusher.Flush()// 刷新緩沖區}
在 Go 中實現 Content-Type: text/event-stream
流式輸出需設置正確頭部、格式化事件數據并保持連接開放。標準庫和框架各有優勢,開發者可根據需求選擇。
- 推薦參考以下資源深入學習:
- 使用Go實現實時通信:基于Server-Sent Events (SSE)
- Go 中的Server-Sent Events:一種高效的實時通信替代方案
- Server-Sent Events (SSE) in Golang
- Using server-sent events