Spring AI阿里百煉平臺實現流式對話:基于 SSE 的實踐指南
在大模型應用開發中,流式對話是提升用戶體驗的關鍵特性。本文將詳細介紹如何利用 Spring AI 結合 Spring Boot,基于 SSE(Server-Sent Events)協議實現高效的流式對話功能,包括動態中斷機制和前端交互優化。
技術選型與協議解析
SSE 協議與 WebSocket 的區別
SSE(Server-Sent Events)是一種基于 HTTP 的輕量級服務器向客戶端推送信息的協議,與 WebSocket 相比有顯著差異:
特性 | SSE | WebSocket |
---|---|---|
通信方向 | 單向(服務端 → 客戶端) | 全雙工(雙向通信) |
連接方式 | 基于 HTTP 長連接 | 獨立的 WebSocket 協議 |
復雜度 | 簡單(無需復雜握手) | 復雜(需要專門握手過程) |
適用場景 | 消息推送、實時通知、流式輸出 | 即時通訊、游戲等雙向交互場景 |
數據格式 | 文本/event-stream 格式 | 二進制/文本,需自定義格式 |
對于僅需服務端向客戶端推送流式響應的對話場景,SSE 是更簡潔高效的選擇。
項目環境搭建
核心依賴配置
首先在 pom.xml
中添加必要依賴(注意修正版本號格式錯誤):
<!-- Spring AI 依賴管理 -->
<dependencyManagement><dependencies><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-bom</artifactId><version>1.0.0-M6</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement><!-- 核心依賴 -->
<dependencies><!-- Spring AI OpenAI 適配 --><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-openai-spring-boot-starter</artifactId></dependency><!-- 阿里百煉 SDK(兼容 OpenAI 協議) --><dependency><groupId>com.alibaba</groupId><artifactId>dashscope-sdk-java</artifactId><version>2.16.9</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId></exclusion></exclusions></dependency><!-- Spring WebFlux(響應式編程支持) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>
</dependencies>
配置文件設置
在 application.yml
中配置模型服務信息:
spring:ai:openai:base-url: https://dashscope.aliyuncs.com/compatible-mode # 阿里百煉兼容接口api-key: sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxx # 替換為實際API密鑰chat:options:model: qwen-plus # 可替換為實際使用的模型,如qwen-max、qwen-turbo等
核心功能實現
1. ChatClient 配置
創建 ChatClient
實例,配置對話模型、記憶機制和系統提示:
import org.springframework.ai.chat.ChatClient;
import org.springframework.ai.chat.ChatMemory;
import org.springframework.ai.chat.advisor.MessageChatMemoryAdvisor;
import org.springframework.ai.chat.advisor.SimpleLoggerAdvisor;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class AiConfig {@Beanpublic ChatClient chatClient(ChatModel chatModel, ChatMemory chatMemory) {return ChatClient.builder(chatModel).defaultOptions(options -> options.withModel("qwen-plus") // 模型名稱.withTemperature(0.7f)) // 新增溫度參數,控制輸出隨機性.defaultSystem("你是一個友好的智能助手,負責解答用戶問題") // 修正引號格式.defaultAdvisors(new SimpleLoggerAdvisor(), // 日志記錄new MessageChatMemoryAdvisor(chatMemory) // 對話記憶).build();}
}
2. 流式對話控制器
實現支持 SSE 的控制器,包含流式響應和中斷功能:
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import java.util.concurrent.atomic.AtomicBoolean;import static org.springframework.ai.chat.client.advisor.AbstractChatMemoryAdvisor.CHAT_MEMORY_CONVERSATION_ID_KEY;@RestController
@RequestMapping("/api/chat")
@RequiredArgsConstructor
@Slf4j
public class ChatController {private final ChatClient chatClient;private final AtomicBoolean isStreaming = new AtomicBoolean(true); // 線程安全的狀態標識/*** 流式對話接口* @param prompt 用戶輸入* @param chatId 對話ID(用于記憶上下文)*/@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> streamChat(@RequestParam String prompt,@RequestParam(required = false, defaultValue = "default") String chatId) {log.info("收到對話請求 [{}]: {}", chatId, prompt);isStreaming.set(true); // 重置流式狀態return chatClient.prompt().user(prompt).advisors(advisorSpec -> advisorSpec.param(CHAT_MEMORY_CONVERSATION_ID_KEY, chatId)).stream().content().takeWhile(data -> isStreaming.get()) // 動態中斷控制.doOnCancel(() -> log.info("對話流已取消 [{}]", chatId)) // 取消時日志.concatWithValues("\u0003"); // 結束標記(ETX字符)}/*** 中斷流式輸出接口*/@PostMapping("/cancel")public void cancelStream() {isStreaming.set(false);log.info("已觸發流式輸出中斷");}
}
3. 前端交互實現
完善前端頁面,處理流式數據接收、中斷控制和用戶體驗優化:
<!DOCTYPE html>
<html lang="zh-CN">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0, user-scalable=no"><title>Spring AI 流式對話演示</title><style>.container { max-width: 800px; margin: 20px auto; padding: 0 15px; }#output { border: 1px solid #e0e0e0; padding: 15px; min-height: 300px; border-radius: 8px; }.controls { margin: 15px 0; display: flex; gap: 10px; }input[type="text"] { flex: 1; padding: 8px 12px; border: 1px solid #ccc; border-radius: 4px; }button { padding: 8px 16px; cursor: pointer; border: none; border-radius: 4px; }button.send { background: #4285f4; color: white; }button.cancel { background: #ea4335; color: white; }</style>
</head>
<body><div class="container"><div class="controls"><input type="text" id="prompt" placeholder="請輸入問題..."><button class="send" onclick="sendMessage()">發送</button><button class="cancel" onclick="cancelStream()">中斷</button></div><h3>AI 回復:</h3><div id="output"></div></div><script>const outputDiv = document.getElementById('output');const promptInput = document.getElementById('prompt');let eventSource = null;// 發送消息function sendMessage() {const prompt = promptInput.value.trim();if (!prompt) return;// 清空輸入和輸出promptInput.value = '';outputDiv.innerHTML = '';// 創建對話ID(可基于時間戳生成)const chatId = 'chat_' + Date.now();// 建立SSE連接eventSource = new EventSource(`/api/chat/stream?prompt=${encodeURIComponent(prompt)}&chatId=${chatId}`);// 處理收到的消息eventSource.onmessage = (event) => {const data = event.data;// 檢測結束標記if (data === '\u0003') {eventSource.close();return;}// 追加內容到輸出區域outputDiv.textContent += data;};// 處理錯誤eventSource.onerror = () => {console.error('連接發生錯誤');eventSource.close();};}// 中斷流式輸出function cancelStream() {if (eventSource) {eventSource.close();eventSource = null;}// 通知后端停止生成fetch('/api/chat/cancel', { method: 'POST' }).catch(err => console.error('取消請求失敗', err));}</script>
</body>
</html>
關鍵技術點解析
1. 響應式編程與 Flux
Spring AI 的流式響應基于 Reactor 框架的 Flux
實現:
Flux
代表一個異步的序列數據流,適合處理流式輸出takeWhile
操作符用于根據條件(isStreaming
狀態)控制流的生命周期concatWithValues
用于在流結束時添加終止標記,方便前端處理
2. 動態中斷機制
- 使用
AtomicBoolean
保證多線程環境下的狀態安全性 - 前端通過
/cancel
接口觸發中斷,后端通過takeWhile
終止流 - 結合
eventSource.close()
確保前端資源正確釋放
3. 對話記憶管理
- 通過
ChatMemory
和MessageChatMemoryAdvisor
實現上下文記憶 chatId
參數用于區分不同對話會話,實現多用戶/多會話隔離
常見問題與優化建議
- 依賴版本問題:確保 Spring AI 版本與 Spring Boot 版本兼容(建議使用 Spring Boot 3.2+)
通過以上實現,我們基于 Spring AI 和 SSE 協議構建了一個完整的流式對話系統,支持實時響應、動態中斷和上下文記憶,為用戶提供流暢的對話體驗。在實際應用中,可根據業務需求進一步擴展功能,如添加消息加密、內容過濾或多模型切換等特性。