Java SpringBoot 扣子CozeAI SseEmitter流式對話完整實戰 打字機效果

書接上回:springBoot 整合 扣子cozeAI 智能體 對話https://blog.csdn.net/weixin_44548582/article/details/147457236

上文實現的是一次性等待得到完整的AI回復內容,但隨著問題和AI的邏輯日趨復雜,會明顯增加這個等待時間,這對前端用戶并不友好,所以需要實現與coze對話的流式、打字機效果。

核心工具:SseEmitter

基本概念

SseEmitter 是 Spring Framework 提供的一個類,用于實現服務器發送事件(Server-Sent Events, SSE)。SSE 是一種允許服務器向客戶端推送實時更新的技術,通常用于實現實時通知、數據流傳輸等功能。SseEmitter 通過 HTTP 長連接保持與客戶端的通信,服務器可以持續向客戶端發送數據,而客戶端則通過 EventSource API 接收這些數據。

實現流式傳輸的原理

SseEmitter 實現流式傳輸的核心在于它使用了 HTTP 長連接和分塊傳輸編碼(Chunked Transfer Encoding)。當客戶端發起 SSE 請求時,服務器會保持連接打開,并通過分塊傳輸的方式逐步發送數據。每個數據塊都是一個獨立的事件,客戶端可以實時接收并處理這些事件。

實現打字機效果的原理

打字機效果是指文本逐字或逐行顯示的效果。通過 SseEmitter,可以實現這種效果。服務器可以逐步發送文本的每個字符或每行,客戶端接收到數據后立即追加顯示,從而模擬出打字機的效果。

實戰代碼

application.yml配置
# Tomcat
server:port: 9210#扣子參數
coze:clientId: xxxxxxxxxxxxxpublicKeyId: yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyprivateKeyFilePath: 本地或服務器絕對路徑/private_key.pemwwwBase: https://www.coze.cnapiBase: https://api.coze.cn# 智能體IDbotId: zzzzzzzzzzzzzzzzzzzzzzzzzzzz
扣子參數配置類?
/*** 扣子參數配置類* @Author: Tenk*/
@Component
@ConfigurationProperties(prefix = "coze") // 通過yml讀取
public class CozeConfig {//OAuth應用IDprivate String clientId;//公鑰private String publicKeyId;//私鑰證書private String privateKeyFilePath;//coze官網private String wwwBase;//cozeApi請求地址private String apiBase;//智能體botIdprivate String botId;
}
Coze授權工具類
/*** 初始化CozeJWTOAuth授權工具** @url https://github.com/coze-dev/coze-java/blob/main/example/src/main/java/example/auth/JWTOAuthExample.java* @Author: Tenk*/
@Component
public class CozeJWTOAuthUtil {private static final Logger log = LoggerFactory.getLogger(CozeJWTOAuthUtil.class);@Autowiredprivate CozeConfig cozeConfig;@Autowiredprivate RedisService redisService;//CozeAPIprivate JWTOAuthClient oauth;public OAuthToken getAccessToken(Long userId) {OAuthToken accessToken;if (redisService.hasKey(CozeConstants.JWT_TOKEN + userId)) {accessToken = JSONObject.parseObject(redisService.getCacheObject(CozeConstants.JWT_TOKEN + userId).toString(), OAuthToken.class);} else {accessToken = oauth.getAccessToken(userId.toString());redisService.setCacheObject(CozeConstants.JWT_TOKEN + userId, accessToken, 14L, TimeUnit.MINUTES);}return accessToken;}public CozeConfig getCozeConfig() {return cozeConfig;}@PostConstructpublic void init() {this.oauth = createJWTOAuthClient();}public JWTOAuthClient getJWTOAuthClient() {return oauth;}/*** 初始化CozeJWTOAuth** @return*/public CozeAPI createCozeAPIByUser(String accessToken) {return new CozeAPI.Builder().auth(new TokenAuth(accessToken)).baseURL(cozeConfig.getApiBase()).readTimeout(60000).connectTimeout(60000).build();}public JWTOAuthClient createJWTOAuthClient() {try {//讀取coze_private_key_pemString jwtOAuthPrivateKey = new String(Files.readAllBytes(Paths.get(cozeConfig.getPrivateKeyFilePath())), StandardCharsets.UTF_8);oauth = new JWTOAuthClient.JWTOAuthBuilder().clientID(cozeConfig.getClientId()).privateKey(jwtOAuthPrivateKey).publicKey(cozeConfig.getPublicKeyId()).baseURL(cozeConfig.getApiBase()).readTimeout(60000).connectTimeout(60000).build();} catch (Exception e) {log.error("初始化CozeJWTOAuth失敗", e);return null;}return oauth;}
}
SSE服務類?
/*** SSE服務類** @Author: Tenk*/
@Service
public class SseServiceImpl implements SseService {private static final Logger log = LoggerFactory.getLogger(SseServiceImpl.class);/*** k:扣子會話id  v:SseEmitter* 這里一定是使用ConcurrentHashMap,因為他是多線程安全的。*/private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();/*** k:會話id  v:userId* 這里一定是使用ConcurrentHashMap,因為他是多線程安全的。*/private static Map<String, Long> sseUserMap = new ConcurrentHashMap<>();@Overridepublic SseEmitter connect(String conversationId, Long userId) {SseEmitter sseEmitter;// 判斷是否已經存在if (sseEmitterMap.containsKey(conversationId)) {sseEmitter = sseEmitterMap.get(conversationId);} else {// 最多6小時斷開連接sseEmitter = new SseEmitter(6 * 60 * 60 * 1000L);}// 連接斷開sseEmitter.onCompletion(() -> {disconnect("連接斷開", conversationId);});// 連接超時sseEmitter.onTimeout(() -> {disconnect("連接超時", conversationId);});// 連接報錯sseEmitter.onError((throwable) -> {disconnect("連接報錯", conversationId);});sseEmitterMap.put(conversationId, sseEmitter);sseUserMap.put(conversationId, userId);return sseEmitter;}private static void disconnect(String action, String conversationId) {Long value = sseUserMap.get(conversationId);log.info("sse{},用戶userId:{}", action, value);sseEmitterMap.remove(conversationId);sseUserMap.remove(conversationId);}
}
AI 接口 Controller?
/*** AI 接口 Controller* @Author: Tenk*/
@RestController
@RequestMapping("/ai/chats")
public class CozeController {@Autowiredprivate CozeService cozeService;@Autowiredprivate CozeJWTOAuthUtil cozeJWTOAuthUtil;@Autowiredprivate SseService sseService;/*** 向AI發起流式對話請求** @param conversationId 會話ID* @param content        對話內容* @return 對話流*/@GetMapping(value = "/send", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter sendFlowMessage(@RequestParam String conversationId,@RequestParam String content) throws IOException {if (StringUtils.isEmpty(conversationId)) {throw new ServiceException("會話信息缺失", 500);}ChatBo bo = new ChatBo();bo.setUserId(userId);bo.setConversationId(conversationId);bo.setContent(content);SseEmitter emitter = sseService.connect(conversationId, userId);try{cozeService.sendFlowMessage(bo, emitter);}catch (Exception e){e.printStackTrace();// 捕獲并發送 SSE 格式的錯誤emitter.send("{\"status\":\"fail\",\"data\":\""+e.getMessage()+"\"}");emitter.completeWithError(e);}return emitter;}
}
方法實現 Service?
/*** 方法具體實現 Service* @Author: Tenk*/
@Service
public class CozeServiceImpl implements CozeService {// @Autowired Mapper Service ……@Autowiredprivate CozeJWTOAuthUtil cozeJWTOAuthUtil;// 流式消息狀態private static final String inProgress = "in-progress"; // 進行中private static final String done = "done";              // 完成/*** 構建 SSE 返回格式** @param status 響應狀態(in-progress / done)* @param data   數據內容,可以是 textBuffer 或最終 data 對象* @return 構造好的 JSON 對象*/private JSONObject buildSseResult(String status, Object data) {JSONObject result = new JSONObject();result.put("status", status);result.put("data", data);return result;}@Override@Transactionalpublic void sendFlowMessage(ChatBo bo, SseEmitter emitter) {// 1. 初始化 Coze API 客戶端CozeAPI cozeAPI = cozeJWTOAuthUtil.createCozeAPIByUser(cozeJWTOAuthUtil.getAccessToken(bo.getUserId()).getAccessToken());// 2. 構造用戶發送的消息CreateMessageReq msgReq = CreateMessageReq.builder().conversationID(bo.getConversationId()).role(MessageRole.USER).content(bo.getContent()).contentType(MessageContentType.TEXT).build();// 整理用戶消息,插入消息歷史數據表Message userMsg = cozeAPI.conversations().messages().create(msgReq).getMessage();CozeMsgLog userMsgLog = new CozeMsgLog(bo.getUserId(),bo.getConversationId(),userMsg.getBotId(),userMsg.getChatId(),userMsg.getId(),null,bo.getContent(),userMsg.getContentType().getValue(),userMsg.getMetaData().toString(),userMsg.getReasoningContent(),userMsg.getRole().getValue(),userMsg.getSectionId(),MessageType.QUESTION.getValue(),new Date(userMsg.getCreatedAt() * 1000),new Date(userMsg.getUpdatedAt() * 1000));cozeMsgLogService.insertCozeMsgLog(userMsgLog);// 4. 打開 Coze 流式對話Flowable<ChatEvent> chatStream = cozeAPI.chat().stream(CreateChatReq.builder().botID(cozeJWTOAuthUtil.getCozeConfig().getTripBotId()).stream(true).autoSaveHistory(true).conversationID(bo.getConversationId()).userID(bo.getUserId().toString()).messages(Collections.singletonList(userMsg)).build());// 5. 發送初始提示信息try {JSONObject delayJson = new JSONObject();delayJson.put("type", "delay");delayJson.put("delayReason", "開始思考……");emitter.send(buildSseResult(inProgress, delayJson));} catch (IOException e) {log.error("規劃行程開始錯誤", e);throw new ServiceException("規劃行程開始錯誤");}StringBuffer fullContent = new StringBuffer();// 完整 AI 回復文本,包含一些不想給前端的特殊符號List<JSONObject> textBuffer = new ArrayList<>();     // 緩沖 SSE 數據int bufferThreshold = 3;                             // 緩沖閾值,當緩沖列表長度超過此值時,發送給前端// 7. 訂閱流式對話事件chatStream.timeout(10, TimeUnit.MINUTES).observeOn(Schedulers.io()).subscribe(event -> {// 增量消息(例如:['H','e','l','l','o',' ','W','o','r','l',……])if (ChatEventType.CONVERSATION_MESSAGE_DELTA.equals(event.getEvent())) {// 提取增量消息String delta = event.getMessage().getContent();// 逐步拼接成完整消息fullContent.append(delta);// 清洗輸出給前端的文本:去除 <、>、[、] 特殊符號,發送給前端,視情況而定,非必須String cleanText= delta.replaceAll("[<>\\[\\]]", "");// TODO 自定義業務邏輯// 實際發送if (!cleanText.isEmpty()) {JSONObject textJson = new JSONObject();textJson.put("type", "text");textJson.put("text", cleanText);synchronized (textBuffer) {// 添加到緩沖列表textBuffer.add(textJson);// 發送緩沖列表if (textBuffer.size() >= bufferThreshold) {/** 示例* {*     "data": [*         {*             "text": "hello world\n",*             "type": "text"*         },*         {*             "text": "### title three\n",*             "type": "text"*         },*         {*             "text": "#### title four\n",*             "type": "text"*         }*     ],*     "status": "in-progress"* }*/emitter.send(buildSseResult(inProgress, new ArrayList<>(textBuffer)));textBuffer.clear();}}}// TODO 自定義業務邏輯}// AI處理、回復完成// event:conversation.message.completed會有兩次,&&后的條件是取其中一次,詳見 https://www.coze.cn/open/docs/developer_guides/chat_v3#70a1d1bdif (ChatEventType.CONVERSATION_MESSAGE_COMPLETED.equals(event.getEvent()) && MessageType.ANSWER.getValue().equals(event.getMessage().getType().getValue())) {// === 最后一批textBuffer沒發的統一發出 ===synchronized (textBuffer) {if (!textBuffer.isEmpty()) {emitter.send(buildSseResult(inProgress, new ArrayList<>(textBuffer)));textBuffer.clear();}}// === 構造最終完成的數據包 ===JSONObject finalData = new JSONObject();finalData.put("xxx", "自定義數據");finalData.put("yyy", "自定義內容");finalData.put("botMessage", fullContent.toString());// 發送狀態為 done 的 SSEemitter.send(buildSseResult(done, finalData));// AI回復的內容,插入消息歷史數據表Message message = event.getMessage();CozeMsgLog aiMsgLog = new CozeMsgLog(bo.getUserId(),message.getConversationId(),message.getBotId(),message.getChatId(),message.getId(),message.getContent(),finalData.toString(),message.getContentType().getValue(),message.getMetaData() == null ? null : message.getMetaData().toString(),message.getReasoningContent(),message.getRole().getValue(),message.getSectionId(),message.getType().getValue(),new Date(message.getCreatedAt() * 1000),new Date());cozeMsgLogService.insertCozeMsgLog(aiMsgLog);}},error -> {log.error("AI對話異常:{}", error.getMessage(), error);emitter.send(buildSseResult(done, "AI思考失敗"));emitter.completeWithError(error);cozeAPI.shutdownExecutor();},() -> {// 釋放資源emitter.complete();cozeAPI.shutdownExecutor();});}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/82172.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/82172.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/82172.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

《AVL樹完全解析:平衡之道與C++實現》

目錄 AVL樹的核心概念數據結構與節點定義插入操作與平衡因子更新旋轉操作&#xff1a;從理論到代碼雙旋場景深度剖析平衡檢測與測試策略性能分析與工程實踐總結 0.前置知識&#xff1a;BS樹 代碼實現部分對和BS樹相似的部分會省略。 1. AVL樹的核心概念 1.1 平衡二叉搜索樹…

跨平臺游戲引擎 Axmol-2.6.0 發布

Axmol 2.6.0 版本是一個以錯誤修復和功能改進為主的次要LTS長期支持版本 &#x1f64f;感謝所有貢獻者及財務贊助者&#xff1a;scorewarrior、peterkharitonov、duong、thienphuoc、bingsoo、asnagni、paulocoutinhox、DelinWorks 相對于2.5.0版本的重要變更&#xff1a; 通…

【Django Serializer】一篇文章詳解 Django 序列化器

第一章 Django 序列化器概述 1.1 序列化器的定義 1.1.1 序列化與反序列化的概念 1. 序列化 想象你有一個裝滿各種物品&#xff08;數據對象&#xff09;的大箱子&#xff08;數據庫&#xff09;&#xff0c;但是你要把這些物品通過一個狹窄的管道&#xff08;網絡&#xff…

關于spring @Bean里調用其他產生bean的方法

背景 常常見到如下代碼 Bean public TestBean testBean() {TestBean t new TestBean();System.out.println("testBean:" t);return t; }Bean public FooBean fooBean() {TestBean t testBean();System.out.println("這里看似是自己new的&#xff0c;但因為…

Level1.7列表

1.7_1列表&#xff08;索引切片&#xff09; #1.列表 students[Bob,Alice,Jim,Mike,Judy] print(students)#2.在列表&#xff08;添加不同數據類型&#xff0c;查看列表是否可以運行&#xff1f;是否為列表類型&#xff1f;&#xff09; students[Bob,Alice,Jim,Mike,Judy,123…

Python爬蟲實戰:研究Cola框架相關技術

一、Cola 框架概述 Cola 是一款基于 Python 的異步爬蟲框架,專為高效抓取和處理大規模數據設計。它結合了 Scrapy 的強大功能和 asyncio 的異步性能優勢,特別適合需要高并發處理的爬蟲任務。 1.1 核心特性 異步 IO 支持:基于 asyncio 實現非阻塞 IO,大幅提高并發性能模塊…

vue2中el-table 實現前端分頁

一些接口不分頁的數據列表&#xff0c;一次性返回大量數據會導致前端渲染卡頓&#xff0c;接口不做分頁的情況下前端可以截取數據來做分頁 以下是一個例子&#xff0c;被截取的列表和全量數據在同一個棧內存空間&#xff0c;所以如果有表格內的表單編輯&#xff0c;新的值也會事…

Python + moviepy:根據圖片或數據高效生成視頻全流程詳解

前言 在數據可視化、自媒體內容生產、學術匯報等領域,我們常常需要將一組圖片或一段變動的數據,自動合成為視頻文件。這樣不僅能提升內容表現力,也極大節省了人工操作時間。Python作為數據處理和自動化領域的王者,其`moviepy`庫為我們提供了靈活高效的視頻生成方案。本文將…

科技賦能,開啟現代健康養生新潮流

在科技與生活深度融合的當下&#xff0c;健康養生也迎來了全新的打開方式。無需傳統醫學的介入&#xff0c;借助現代科學與智能設備&#xff0c;我們能以更高效、精準的方式守護健康。? 飲食管理步入精準化時代。利用手機上的營養計算 APP&#xff0c;錄入每日飲食&#xff0…

Ubuntu24.04 LTS安裝java8、mysql8.0

在 Ubuntu 24.04 上安裝 OpenJDK OpenJDK 包在 Ubuntu 24.04 的默認存儲庫中隨時可用。 打開終端并運行以下 apt 命令: sudo apt update查看是否已經安裝java java --version如果未安裝會有提示&#xff0c;直接復制命令安裝即可&#xff0c;默認版本&#xff1a; sudo apt in…

深度學習框架顯存泄漏診斷手冊(基于PyTorch的Memory Snapshot對比分析方法)

點擊 “AladdinEdu&#xff0c;同學們用得起的【H卡】算力平臺”&#xff0c;H卡級別算力&#xff0c;按量計費&#xff0c;靈活彈性&#xff0c;頂級配置&#xff0c;學生專屬優惠。 一、顯存泄漏&#xff1a;深度學習開發者的"隱形殺手" 在深度學習模型的訓練與推…

Pytorch分布式訓練,數據并行,單機多卡,多機多卡

分布式訓練 所有代碼可以見我github 倉庫&#xff1a;https://github.com/xiejialong/ddp_learning.git 數據并行&#xff08;Data Parallelism&#xff0c;DP&#xff09; 跨多個gpu訓練模型的最簡單方法是使用 torch.nn.DataParallel. 在這種方法中&#xff0c;模型被復制…

【論文閱讀】——D^3-Human: Dynamic Disentangled Digital Human from Monocular Vi

文章目錄 摘要1 引言2 相關工作3 方法3.1 HmSDF 表示3.2 區域聚合3.3. 變形場3.4. 遮擋感知可微分渲染3.5 訓練3.5.1 訓練策略3.5.2 重建損失3.5.3 正則化限制 4. 實驗4.1 定量評估4.2 定性評價4.3 消融研究4.4 應用程序 5 結論 摘要 我們介紹 D 3 D^{3} D3人&#xff0c;一種…

docker commit除了提交容器成鏡像,還能搞什么之修改cmd命令

要讓新鏡像默認啟動時執行 /usr/sbin/sshd -D&#xff0c;需在提交鏡像時 ??顯式指定新的啟動命令??。 方法一&#xff1a;提交時通過 --change 覆蓋 CMD docker commit --changeCMD ["/usr/sbin/sshd", "-D"] v2 project:v2 方法二&#xff1a;重…

為什么我輸入對了密碼,還是不能用 su 切換到 root?

“為什么我輸入對了密碼&#xff0c;還是不能用 su 切換到 root&#xff1f;” 其實這背后可能不是“密碼錯了”&#xff0c;而是系統不允許你用 su 切 root&#xff0c;即使密碼對了。 &#x1f447; 以下是最常見的幾個真正原因&#xff1a; ? 1. Root 用戶沒有設置密碼&…

轉移dp簡單數學數論

1.轉移dp問題 昨天的練習賽上有一個很好玩的起終點問題&#xff0c;第一時間給出bfs的寫法。 但是寫到后面發現不行&#xff0c;還得是的dp轉移的寫法才能完美的解決這道題目。 每個格子可以經過可以不經過&#xff0c;因此它的狀態空間是2^&#xff08;n*m&#xff09;&…

IP查詢基礎介紹

IP 查詢原理 IP 地址是網絡設備唯一標識&#xff0c;IP 查詢通過解析 IP 地址獲取地理位置、運營商等信息。目前主流的 IPv4&#xff08;32 位&#xff09;與 IPv6&#xff08;128 位&#xff09;協議&#xff0c;前者理論提供約 43 億地址&#xff0c;后者地址空間近乎無限。…

Linux命令簡介

1 Linux系統的命令概述 在 Linux 操作系統中&#xff0c;凡是在字符操作界面中輸入能夠完成特定操作和任務的字符串都可以稱為命令。嚴格來說&#xff0c;命令通常只代表實現某一類功能的指令或程序的名稱。 1.1 Shell Linux 命令的執行必須依賴于 Shell 命令解釋器。Shell …

WebRTC與RTSP|RTMP的技術對比:低延遲與穩定性如何決定音視頻直播的未來

引言 音視頻直播技術已經深刻影響了我們的生活方式&#xff0c;尤其是在教育、醫療、安防、娛樂等行業中&#xff0c;音視頻技術成為了行業發展的重要推動力。近年來&#xff0c;WebRTC作為一種開源的實時通信技術&#xff0c;成為了音視頻領域的重要選擇&#xff0c;它使得瀏覽…

多通道振弦式數據采集儀MCU安裝指南

設備介紹 數據采集儀 MCU集傳統數據采集器與5G/4G,LoRa/RS485兩種通信功能與一體的智能數據采集儀。該產品提供振弦、RS-485等的物理接口&#xff0c;能自動采集并存儲多種自然資源、建筑、橋梁、城市管廊、大壩、隧道、水利、氣象傳感器的實時數據&#xff0c;利用現場采集的數…