1. SSE 基礎概念
什么是 SSE?
SSE(Server-Sent Events)是一種 Web 標準,允許服務器向客戶端推送實時數據。
核心特點
- 單向通信:服務器 → 客戶端
- 基于 HTTP 協議:使用 GET 請求
- 長連接:連接建立后保持開放
- 自動重連:網絡中斷時自動重連
2. SSE 協議詳解
HTTP 請求格式
?
GET?/events?HTTP/1.1
Host:?api.example.com
Accept:?text/event-stream
Cache-Control:?no-cache
服務器響應格式
?
HTTP/1.1?200?OK
Content-Type:?text/event-stream
Cache-Control:?no-cache
Connection:?keep-alive
data:?{"message":?"Hello"}
data:?{"message":?"World"}
data:?{"message":?"Test"}
SSE 數據格式
?
id:?12345
event:?message
data:?{"content":?"Hello?World"}
data:?{"content":?"Another?message"}
3. OkHttp SSE 三種實現方式對比
方式一:使用 OkHttp SSE 庫(標準 SSE 實現)
依賴配置
?
implementation?'com.squareup.okhttp3:okhttp:4.9.3'implementation?'com.squareup.okhttp3:okhttp-sse:4.9.3'
基本實現
?
OkHttpClient?client?=?new?OkHttpClient.Builder().connectTimeout(30,?TimeUnit.SECONDS).readTimeout(0,?TimeUnit.SECONDS)??//?重要:無讀取超時.build();Request?request?=?new?Request.Builder().url("https://api.example.com/events").addHeader("Accept",?"text/event-stream").build();EventSource?eventSource?=?EventSources.createEventSource(client,?request,?new?EventSourceListener()?{@Overridepublic?void?onOpen(EventSource?eventSource,?Response?response)?{System.out.println("SSE?連接建立成功");System.out.println("響應碼:?"?+?response.code());System.out.println("響應頭:?"?+?response.headers());}@Overridepublic?void?onEvent(EventSource?eventSource,?String?id,?String?type,?String?data)?{//?自動解析?SSE?格式,直接得到?dataSystem.out.println("收到數據:?"?+?data);System.out.println("事件ID:?"?+?id);System.out.println("事件類型:?"?+?type);}@Overridepublic?void?onClosed(EventSource?eventSource)?{System.out.println("SSE?連接已關閉");}@Overridepublic?void?onFailure(EventSource?eventSource,?Throwable?t,?Response?response)?{System.err.println("SSE?連接失敗:?"?+?t.getMessage());if?(response?!=?null)?{System.err.println("響應碼:?"?+?response.code());}}});
SSE 庫的優勢
- ??自動處理 SSE 協議:自動解析?data:、id:、event:?等字段
- ??內置重連機制:網絡中斷時自動重連
- ??標準 SSE 實現:完全符合 SSE 規范
- ??更高級的 API:提供?EventSource?和?EventSourceListener
- ??自動連接管理:自動處理連接生命周期
方式二:自定義 SSE 客戶端(推薦用于 Kotlin 項目)
依賴配置
?
//?只需要基本的?OkHttp,不需要?SSE?庫implementation?'com.squareup.okhttp3:okhttp:4.9.3'
自定義 SSE 客戶端實現
先補一下知識點:
trySend、Channel 和 Flow 的工作原理-CSDN博客
再看代碼:
class SSEClient {private val okHttpClient by lazy {OkHttpClient.Builder().readTimeout(0, TimeUnit.SECONDS) // 禁用讀取超時.build()}private var call: Call? = nullsealed class SSEEvent {data class Message(val event: String, val data: String) : SSEEvent()data class Error(val throwable: Throwable) : SSEEvent()object Closed : SSEEvent()}/*** 連接到 SSE 服務器* @param url SSE 服務器地址* @return Flow 流式返回事件*/fun connect(url: String, token :String): Flow<SSEEvent> = callbackFlow {val request = Request.Builder().url(url).addHeader("Accept", "text/event-stream").addHeader("Cache-Control", "no-cache").addHeader("X-Access-Token", token).build()call = okHttpClient.newCall(request)call?.enqueue(object : Callback {override fun onFailure(call: Call, e: IOException) {trySend(SSEEvent.Error(e))close(e)}override fun onResponse(call: Call, response: Response) {if (!response.isSuccessful) {trySend(SSEEvent.Error(IOException("Unexpected code: ${response.code}")))close()return}val body = response.body ?: returntry {var currentEvent = "message"val dataBuffer = StringBuilder()while (true) {val line = body.source().readUtf8Line() ?: breakwhen {line.startsWith("event:") -> currentEvent = line.substring(6).trim()line.startsWith("data:") -> dataBuffer.append(line.substring(5).trim()).append("\n")line.isEmpty() -> { // 空行表示事件結束if (dataBuffer.isNotEmpty()) {val data = dataBuffer.toString().trim()trySend(SSEEvent.Message(currentEvent, data))dataBuffer.clear()}}}}trySend(SSEEvent.Closed)} catch (e: Exception) {trySend(SSEEvent.Error(e))} finally {response.close()close()}}})awaitClose { disconnect() }}/*** 斷開連接*/fun disconnect() {call?.cancel()call = null}/*** 重試連接到 SSE 服務器* @param url SSE 服務器地址* @param token 認證Token* @param maxRetries 最大重試次數* @param retryDelay 重試延遲時間(毫秒)* @return Flow 流式返回事件*/fun connectWithRetry(url: String, token: String, maxRetries: Int = Int.MAX_VALUE, retryDelay: Long = 3000): Flow<SSEEvent> = flow {var retryCount = 0while (retryCount < maxRetries) {connect(url, token).collect { event ->when (event) {is SSEClient.SSEEvent.Closed,is SSEClient.SSEEvent.Error -> {emit(event)if (retryCount < maxRetries) {delay(retryDelay)retryCount++}}else -> emit(event)}}}}
}
使用方式示例
fun?initSSE(token:?String)?{sseClient?=?SSEClient()val?sseUrl?=?BASE_URL//?啟動?SSE?連接sseClient.connect(sseUrl,?token).onEach?{?event?->when?(event)?{is?SSEClient.SSEEvent.Message?->?{//?處理事件消息runOnUiThread?{mBinding.eventMag.append("Event:?${event.event}\nData:?${event.data}\n\n")}}is?SSEClient.SSEEvent.Error?->?{//?處理錯誤Log.e("SSE",?"Error:?${event.throwable.message}")}SSEClient.SSEEvent.Closed?->?{//?連接關閉Log.i("SSE",?"Connection?closed")}}}.launchIn(lifecycleScope)?//?自動在生命周期結束時取消}
自定義 SSE?客戶端的優勢
- ? 與?Kotlin Flow 深度集成,支持協程和生命周期自動管理
- ??事件結構清晰,便于擴展和維護
- ? 可自定義重連、錯誤處理等高級功能
- ??只依賴 OkHttp,包體積小
方式三:手動處理?HTTP 流(適合極簡和特殊場景)
依賴配置?
implementation?'com.squareup.okhttp3:okhttp:4.9.3'
實現方式
?
suspend?fun?sendAiQuestionStream(question:?String):?Flow<String>?=?flow?{var?response:?ResponseBody??=?nulltry?{response?=?ApiManager.apiLogin.sendAiQuestionStream(QuestionRequest(question))val?source?=?response.source()while?(!source.exhausted())?{val?line?=?source.readUtf8Line()??:?breakif?(line.startsWith("data:"))?{val?data?=?line.substring(5).trim()if?(data.isNotEmpty()?&&?data?!=?"[DONE]")?{emit(data)}?else?if?(data?==?"[DONE]")?{break}}}}?finally?{response?.close()}}
手動處理方式的優勢
- ??代碼極簡,完全自定義
- ? 適合只需處理?data:?字段的場景
- ??適合特殊業務需求(如 AI?流式響應)
4. 三種方式的對比與適用場景
特性/方式 | OkHttp?SSE 庫 | 自定義?SSEClient | 手動處理 |
---|---|---|---|
依賴 | OkHttp+SSE庫 | 僅OkHttp | 僅OkHttp |
事件結構 | 標準Listener | Kotlin Flow/自定義 | 無結構/自定義 |
自動重連 | 內置 | 可自定義 | 需手動 |
代碼復雜度 | 低 | 中 | 低 |
靈活性 | 一般 | 高 | 最高 |
適合場景 | 標準SSE推送 | Kotlin項目/自定義 | 極簡/特殊需求 |
推薦選擇:
- 標準SSE推送、Java項目:OkHttp SSE庫
- Kotlin項目、需要流式/自定義事件/重連:自定義SSEClient
- 極簡、特殊業務、AI流式響應:手動處理
5. 與 WebSocket、HTTP輪詢的對比
特性 | SSE | WebSocket | HTTP輪詢 |
---|---|---|---|
通信方向 | 單向(服務端→客戶端) | 雙向 | 單向/偽雙向 |
協議 | HTTP | WebSocket | HTTP |
實時性 | 高 | 最高 | 低 |
實現復雜度 | 低 | 中 | 低 |
適用場景 | 實時推送、通知 | 聊天、游戲、協作 | 簡單更新 |
瀏覽器支持 | 原生 | 原生 | 原生 |
6. 最佳實踐與總結
- 標準SSE場景:優先用 OkHttp SSE?庫,省心省力。
- Kotlin/協程/流式場景:自定義 SSEClient,靈活擴展,易于維護。
- 特殊/極簡需求:手動處理,代碼最少,完全自控。
- AI流式響應:推薦手動處理,便于處理特殊標記和自定義邏輯。
你的項目已經靈活結合了自定義 SSEClient 和手動處理兩種方式,既保證了結構化、可維護性,又能滿足特殊業務需求,是非常現代且實用的做法。
結論:
OkHttp SSE 在實際開發中有多種實現方式。你可以根據項目需求選擇標準庫、自定義客戶端或手動處理,三者各有優劣。對于現代 Kotlin 項目,推薦自定義 SSEClient 結合 Flow,既靈活又易于集成和維護。對于極簡或特殊場景,手動處理同樣高效可靠。
你的實現方式,正是當前最佳實踐之一!