Go語言流式輸出技術實現-服務器推送事件(Server-Sent Events, SSE)

目錄

  • 引言
  • 背景與技術概述
  • 實現技術細節
    • 1. HTTP 頭部配置
    • 2. 事件格式與發送
    • 3. 保持連接與刷新
    • 4. 處理連接關閉
      • 4.1 使用上下文管理連接生命周期
      • 4.2 使用通道管理客戶端連接
    • 5. 客戶端交互
    • 6.demo
    • 7.Go轉發大模型流式輸出demo

引言

服務器推送事件(Server-Sent Events, SSE)是一種基于 HTTP 的單向數據流技術,允許服務器通過標準 HTTP 連接向客戶端推送實時更新。SSE 使用 Content-Type: text/event-stream 頭部標識響應內容為事件流,例如大模型流式輸出。

背景與技術概述

SSE 是 HTML5 規范的一部分,通過 EventSource API 提供客戶端支持。它的主要特點包括:

  • 單向通信: 數據僅從服務器流向客戶端,無法通過同一連接反向發送。
  • 自動重連: 客戶端在連接斷開后會自動嘗試重連。
  • 基于 HTTP: 利用現有 HTTP 基礎設施,無需額外協議支持。
  • 事件格式: 事件以文本形式發送,每條事件以 data: 開頭,結束于兩個換行符 \n\n。

在 Go 中,SSE 的實現通常依賴標準庫 net/http,也可以結合框架(如 Gin)或第三方庫(如 github.com/r3labs/sse)來簡化開發。

實現技術細節

1. HTTP 頭部配置

服務端必須在響應中設置以下頭部:

  • Content-Type: text/event-stream: 標識響應為事件流。
  • Cache-Control: no-cache: 防止瀏覽器緩存響應,確保實時性。
  • Connection: keep-alive: 保持連接開放,支持持續流式傳輸。

2. 事件格式與發送

SSE 事件必須遵循特定格式,每條事件包括以下字段:

  • data:: 事件數據,多個 data: 行會被拼接為一條消息。
  • 事件以兩個換行符 \n\n 結束,表示一條事件的結束。
    例如,發送一條消息 “Hello, World!” 的格式為:
data: Hello, World!

在 Go 中,事件發送通常通過 http.ResponseWriter 實現。例如,Pascal Allen 的 Medium 文章中使用了 Gin 框架c.SSEvent(“message”, msg) 方法,而 Kelche.co 的示例直接使用 fmt.Fprintf(w, “data: %d \n\n”, rand.Intn(100)) 發送隨機數。

3. 保持連接與刷新

為了實現流式輸出,服務端需要保持 HTTP 連接開放,通常通過無限循環實現。在每個循環中:

  • 生成或獲取事件數據。
  • 寫入響應,使用 w.(http.Flusher).Flush() 立即刷新,確保數據實時發送。

例如,Kelche.co 的 randomHandler 函數每 2 秒發送一次隨機數:

for{rand.Seed(time.Now().UnixNano())fmt.Fprintf(w,"data: %d \n\n", rand.Intn(100))w.(http.Flusher).Flush()time.Sleep(2* time.Second)
}

4. 處理連接關閉

客戶端可能隨時斷開連接,服務端需檢測并安全退出。
例如,可以通過檢查 http.ResponseWriter 的狀態或使用 Hijack 方法檢測連接狀態。在實際應用中,推薦使用通道(channel)或上下文(context)管理連接生命周期。

4.1 使用上下文管理連接生命周期

  • 上下文的作用: 上下文可以用來傳遞取消信號和截止時間。例如,當客戶端斷開連接時,HTTP 請求的上下文會被取消,服務器可以通過 <-ctx.Done() 檢測到。

  • 關鍵方法:

    • context.Background():創建一個空的根上下文,通常作為父上下文。
    • context.WithCancel(parentCtx):創建一個可手動取消的上下文,cancel() 函數用于取消。
    • context.WithTimeout(parentCtx, duration):創建一個在指定時間后自動取消的上下文,適合設置 SSE 連接的超時。
    • context.WithDeadline(parentCtx, deadline):創建一個在指定截止時間后自動取消的上下文。
  • 在 SSE 中的應用:

    • 在 SSE 處理函數中,使用 ctx := r.Context() 獲取 HTTP 請求的上下文。
    • 使用 select 語句監聽 <-ctx.Done(),當上下文被取消時(例如客戶端斷開),執行清理邏輯。
    • 示例代碼:
func sseHandler(w http.ResponseWriter, r *http.Request) {ctx := r.Context()for {select {case <-ctx.Done():return // 客戶端斷開,退出default:// 發送數據fmt.Fprintf(w, "data: message\n\n")w.(http.Flusher).Flush()time.Sleep(2 * time.Second)}}
} 

這種方式確保當客戶端斷開時,goroutine 可以及時退出,避免資源泄漏。

4.2 使用通道管理客戶端連接

  • 通道的作用: 通道可以用來管理多個客戶端的連接生命周期,例如添加新客戶端、移除斷開的客戶端和廣播消息。

  • 關鍵結構:

    • addClient:一個通道(如 chan *SSEClient),用于添加新客戶端。
    • removeClient:一個通道(如 chan *SSEClient),用于移除斷開的客戶端。
    • 定義一個 SSEServer 結構體,包含:- clients:一個映射(如 map[*SSEClient]struct{}),存儲所有活躍客戶端。
    • 每個 SSEClient 包含一個消息通道(如 chan []byte),用于發送數據。
  • 在 SSE 中的應用:

    • 當新客戶端連接時,創建一個 SSEClient,初始化其消息通道,并通過 addClient 通道通知服務器。
    • 當客戶端斷開時,通過 removeClient 通道通知服務器,服務器從 clients 中移除該客戶端并關閉其通道。
    • 使用 sync.Mutex 保護 clients 映射的并發訪問,確保線程安全。
  • 示例代碼:

type SSEClient struct {ID     stringStream chan []byte
}type SSEServer struct {clients      map[*SSEClient]struct{}addClient    chan *SSEClientremoveClient chan *SSEClientmutex        sync.Mutex
}func (s *SSEServer) Run() {for {select {case client := <-s.addClient:s.mutex.Lock()s.clients[client] = struct{}{}s.mutex.Unlock()case client := <-s.removeClient:s.mutex.Lock()delete(s.clients, client)s.mutex.Unlock()close(client.Stream)}}
}

5. 客戶端交互

客戶端通過 EventSource API 連接到 SSE 端點。例如:

const eventSource = newEventSource("/random");
eventSource.onmessage = function(event){console.log(event.data);// 處理接收到的隨機數
};

EventSource 會自動處理重連,適合需要持續更新的場景。

6.demo

package mainimport ("encoding/json""fmt""io""log""net/http""runtime/debug""time""github.com/spf13/cast"
)func main() {defer recovery()http.HandleFunc("/chat/send", Send)fmt.Println("服務器啟動在 http://localhost:8080")log.Fatal(http.ListenAndServe(":8080", nil))
}func Send(w http.ResponseWriter, r *http.Request) {// 處理預檢請求if r.Method == "OPTIONS" {w.WriteHeader(http.StatusOK)return}body, err := io.ReadAll(r.Body)if err != nil {http.Error(w, err.Error(), http.StatusInternalServerError)return}var params SendRequesterr = json.Unmarshal(body, &params)if err != nil {http.Error(w, err.Error(), http.StatusInternalServerError)return}demo := []string{"你好","你是誰","你是做什么的","你是怎么工作的","你是在哪座城市","你是什么星座","你是哪個國家的","你是哪個省的","你是哪個市的","你是哪個區的","你是哪個街道的","你是哪個社區的","你是哪個村的",}flusher, ok := w.(http.Flusher) // 獲取流式輸出器if !ok {http.Error(w, "Streaming unsupported", http.StatusInternalServerError)return}//設置headerw.Header().Set("Content-Type", "text/event-stream")w.Header().Set("Cache-Control", "no-cache")w.Header().Set("Connection", "keep-alive")// 流式輸出for _, v := range demo {time.Sleep(1 * time.Second)lineData := fmt.Sprintf("data: %s\n\n", v)io.WriteString(w, lineData)flusher.Flush()}
}type SendRequest struct {Msg string `json:"msg"`
}func recovery() {if rec := recover(); rec != nil {log.Printf("Panic Panic occur")if err, ok := rec.(error); ok {log.Printf("PanicRecover Unhandled error: %v\n stack:%v", err.Error(), cast.ToString(debug.Stack()))} else {log.Printf("PanicRecover Panic: %v\n stack:%v", rec, cast.ToString(debug.Stack()))}}
}

在這里插入圖片描述
執行一下命令運行:

go mod initgo mod tidygo run main.go

用postman請求localhost:8080/chat/send
在這里插入圖片描述

7.Go轉發大模型流式輸出demo

	sendRequest.Model ="qwen-max"streamResp:=&proto.StreamResp{}qwenClient:= service.NewQwen(sendRequest)qwenClient.QwenStream(streamResp)defer streamResp.HttpResp.Body.Close()// 1. 復制下游服務的響應頭for key,values:= range streamResp.HttpResp.Header {for _,value:= range values {w.Header().Add(key, value)}}// 2. 復制下游服務的狀態碼w.WriteHeader(streamResp.HttpResp.StatusCode)//流式輸出// 確保 ResponseWriter 支持 Flusherflusher,ok:= w.(http.Flusher)if!ok {http.Error(w,"Streaming unsupported", http.StatusInternalServerError)return}// 處理流式響應scanner:= bufio.NewScanner(streamResp.HttpResp.Body)for scanner.Scan(){lineData:= scanner.Text()// 將響應數據逐步發送給客戶端io.WriteString(w, lineData+"\n\n")flusher.Flush()// 刷新緩沖區}

在這里插入圖片描述

在 Go 中實現 Content-Type: text/event-stream 流式輸出需設置正確頭部、格式化事件數據并保持連接開放。標準庫和框架各有優勢,開發者可根據需求選擇。

  • 推薦參考以下資源深入學習:
    • 使用Go實現實時通信:基于Server-Sent Events (SSE)
    • Go 中的Server-Sent Events:一種高效的實時通信替代方案
    • Server-Sent Events (SSE) in Golang
    • Using server-sent events

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/917607.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/917607.shtml
英文地址,請注明出處:http://en.pswp.cn/news/917607.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

高端房產管理小程序

系統介紹1、用戶端地圖找房&#xff1a;對接地圖API&#xff0c;地圖形式顯示周邊房源,支持新盤和租房兩種模式查詢房價走勢&#xff1a;城市房價走勢&#xff0c;由后臺每月錄入房源搜索&#xff1a;搜索房源&#xff0c;支持多維度篩選房源類型&#xff1a;新盤銷售、房屋租賃…

文本轉語音(TTS)腳本

文本轉語音(TTS)腳本 概述 generate_voice.py 是一個用于生成語音的Python腳本。該腳本提供了文本轉語音(TTS)功能&#xff0c;可以將文本內容轉換為語音文件。 功能特性 文本轉語音: 將輸入的文本轉換為語音文件多種語音選項: 支持不同的語音類型和參數批量處理: 可以處理多個…

磁盤管理與分區

磁盤管理 一、磁盤類型 SATA,SCSI,SAS類型的磁盤&#xff0c;在Linux中用sd來表示。 其中第一塊硬盤為sda&#xff0c;第二塊二sdb&#xff0c;以此類推。 第一塊硬盤的第一個分區為sda1。 nvme類型的磁盤&#xff0c;在Linux中使用nvmeXnYpZ進行表示。 X&#xff1a;數字&…

Linux 邏輯卷管理

練習創建物理卷(pv->vg->lv)物理卷&#xff08;PV&#xff09;就像把一塊塊獨立的硬盤&#xff0c;標記成 "可用于搭建 LVM 的積木"&#xff0c;讓系統知道這些硬盤可以被 LVM 管理。#把sdb這塊硬盤標記為物理卷&#xff08;相當于給這塊積木蓋章&#xff0c;說…

向日葵參考基因組

向日葵參考基因組升級多個版本 向日葵基因組為油脂代謝、開花調控及菊類植物進化提供新見解-文獻精讀151-CSDN博客 官網 https://www.sunflowergenome.org/annotations-data/

什么是爬蟲協議?

什么是爬蟲協議&#xff1f; 爬蟲協議&#xff08;Crawl Protocol&#xff09;是指為了有效地收集網頁內容而建立的一些規定和標準&#xff0c;用以指導網絡爬蟲如何在互聯網上抓取信息。 爬蟲協議主要指的是Robots協議&#xff08;Robots Exclusion Protocol&#xff09;&am…

空間平面旋轉與xoy平行

空間平面旋轉與xoy平行 法向量 空間平面axbyczd0的其中一個法向量(a,b,c),法向量垂直于空間平面。目標平面平行于xoy的平面為0x0yczd0;其中一個法向量為(0,0,c),c可以為不為0的任意值&#xff0c;取(0,0,1)&#xff0c;目標平面的的法向量垂直于xoy平面 向量叉乘點乘 兩個向量的…

odoo reportbro 拖拽式報表設計

報表設計以及下載 在實際業務中應用非常的廣泛且頻繁。odoo 本身也具有報表設計功能&#xff0c;但都是代碼模式。且需要開發人員定制化開發&#xff0c;耗費成本高 所以引入reportbro報表設計就非常的簡單快捷。低代碼模式 以下以銷售報表為例進行演示 報表字段配置報表界面設…

數字信號處理_編程實例1

stem([1,2,3]) 一、初始設置 %% 初始設置 % 清空工作空間&#xff0c;關閉無關頁面 clc,clear,close all; % 繪圖變量 font_size 12; %全局基礎字體大小 axis_size 10; %坐標軸刻度標簽字體大小 line_width 2; %繪圖線條寬度 legend_size 10.5; %圖例字體大小 marker_siz…

Docker 安裝部署 OceanBase

1.拉取鏡像 docker pull oceanbase/oceanbase-ce:latest2.啟動oceanbase容器 docker run -p 2881:2881 --name oceanbase-ce -e MINI_MODE0 -d quay.io/oceanbase/oceanbase-ce3.查看oceanbase初始化的日志信息 docker logs oceanbase-ce4.進入oceanbase容器 docker exec -it o…

【華為機試】685. 冗余連接 II

文章目錄685. 冗余連接 II題目描述示例 1&#xff1a;示例 2&#xff1a;提示&#xff1a;解題思路算法分析核心思想算法策略算法對比問題分類流程圖并查集環檢測流程入度統計與候選邊選擇情況分析決策樹完整算法流程復雜度分析時間復雜度空間復雜度關鍵實現技巧1. 并查集優化2…

Redis之Hash和List類型常用命令

Redis之Hash和List類型常用命令一、Hash類型詳解1. Hash類型的特點2. 常用命令及示例&#xff08;1&#xff09;設置字段值&#xff08;2&#xff09;獲取字段值&#xff08;3&#xff09;刪除字段&#xff08;4&#xff09;其他常用命令3. 應用場景二、List類型詳解1. List類型…

【測試】?動化測試概念篇

本節?標&#xff1a;?動化測試Web?動化測試selenium1. ?動化1.1 ?動化概念?動化在?活中處處可?&#xff0c;?動的代替?的?為完成操作。?動灑?機&#xff0c;主要通上?就可以?動化灑?并且可以?動的旋轉。?動洗?液&#xff0c;免去了?動擠壓可以?動感應出洗…

Java中給List<T> 對象集合去重

Java中給List 對象集合去重List<Student> getStudentList studentMapper.getStudentList();List<Student> distinctInsurance distinctByField(getStudentList, Student::getCertNo);public static <T> List<T> distinctByField(List<T> list…

最小二乘法MSE

最小二乘法MSEx1x2x3x4x5x6x7x8x0y014805-29-31339-41064-14-2-1481-114-1-65-123-32-21305-23105114-81126-15-15-8-157-4-1221-39511-10-243-9-671-87-1404-35101371422-3-7-2-80-6-5-91-3091前景知識: 矩陣相關公式y(339?11430126?395?87422?309)y\begin{pmatrix} 339&a…

Pixel 4D 3.4.4.0 | 支持豐富的壁紙資源,高清畫質,高度的個性化設置能力,智能推薦功能

Pixel 4D是一款功能強大且用戶體驗良好的動態壁紙應用。它提供了豐富的壁紙資源和高清畫質&#xff0c;讓用戶可以輕松找到自己喜歡的壁紙。此外&#xff0c;該應用還具備高度的個性化設置能力&#xff0c;允許用戶根據自己的喜好調整壁紙效果。智能推薦功能則能幫助用戶發現更…

<PhotoShop><JavaScript><腳本>基于JavaScript,利用腳本實現PS軟件批量替換圖片,并轉換為智能對象?

前言 PhotoShop軟件支持JavaScript腳本,來擴展軟件的功能,官方本身也提供了一些常用腳本,如圖像處理等,同時也支持自定義的JavaScript腳本。 環境配置 系統:windows 平臺:visual studio code 語言:JavaScript 軟件:PhotoShop 2022 版本:23.2.1 概述 本文利用Java…

【Linux】System V - 基于建造者模式的信號量

目錄 信號量和P、V原語 信號量集結構體 信號量操作接口 semget semctl semop 封裝Sem 關于建造者模式 信號量和P、V原語 信號量和 P、V 原語由 Dijkstra &#xff08;迪杰斯特拉&#xff09;提出 信號量值含義 S>0: S 表?可?資源的個數 S0: 表??可?資源&a…

機器學習(11):嶺回歸Ridge

嶺回歸是失損函數通過添加所有權重的平方和的乘積(L2)來懲罰模型的復雜度。均方差除以2是因為方便求導&#xff0c;w_j指所有的權重系數, λ指懲罰型系數&#xff0c;又叫正則項力度特點:嶺回歸不會將權重壓縮到零&#xff0c;這意味著所有特征都會保留在模型中&#xff0c;但它…

調整Idea緩存目錄,釋放C盤空間

本文使用 Idea2024 Idea 會將一些配置默認緩存在C盤&#xff0c;使用久了會占用大量空間&#xff08;本人的Idea占用了將近5個G&#xff0c;以至于不得不進行遷移&#xff09; 緩存目錄主要涉及以下四個目錄&#xff0c;四個目錄可以分為兩組&#xff0c;每組目錄必須一起調整 …