書接上回:springBoot 整合 扣子cozeAI 智能體 對話https://blog.csdn.net/weixin_44548582/article/details/147457236
上文實現的是一次性等待并得到完整的AI回復內容,但隨著問題和AI的邏輯日趨復雜,會明顯增加這個等待時間,這對前端用戶并不友好,所以需要實現與coze對話的流式、打字機效果。
核心工具:SseEmitter
基本概念
SseEmitter 是 Spring Framework 提供的一個類,用于實現服務器發送事件(Server-Sent Events, SSE)。SSE 是一種允許服務器向客戶端推送實時更新的技術,通常用于實現實時通知、數據流傳輸等功能。SseEmitter 通過 HTTP 長連接保持與客戶端的通信,服務器可以持續向客戶端發送數據,而客戶端則通過 EventSource API 接收這些數據。
實現流式傳輸的原理
SseEmitter 實現流式傳輸的核心在于它使用了 HTTP 長連接和分塊傳輸編碼(Chunked Transfer Encoding)。當客戶端發起 SSE 請求時,服務器會保持連接打開,并通過分塊傳輸的方式逐步發送數據。每個數據塊都是一個獨立的事件,客戶端可以實時接收并處理這些事件。
實現打字機效果的原理
打字機效果是指文本逐字或逐行顯示的效果。通過 SseEmitter,可以實現這種效果。服務器可以逐步發送文本的每個字符或每行,客戶端接收到數據后立即追加顯示,從而模擬出打字機的效果。
實戰代碼
application.yml配置
# Tomcat
server:port: 9210#扣子參數
coze:clientId: xxxxxxxxxxxxxpublicKeyId: yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyprivateKeyFilePath: 本地或服務器絕對路徑/private_key.pemwwwBase: https://www.coze.cnapiBase: https://api.coze.cn# 智能體IDbotId: zzzzzzzzzzzzzzzzzzzzzzzzzzzz
扣子參數配置類?
/*** 扣子參數配置類* @Author: Tenk*/
@Component
@ConfigurationProperties(prefix = "coze") // 通過yml讀取
public class CozeConfig {//OAuth應用IDprivate String clientId;//公鑰private String publicKeyId;//私鑰證書private String privateKeyFilePath;//coze官網private String wwwBase;//cozeApi請求地址private String apiBase;//智能體botIdprivate String botId;
}
Coze授權工具類
/*** 初始化CozeJWTOAuth授權工具** @url https://github.com/coze-dev/coze-java/blob/main/example/src/main/java/example/auth/JWTOAuthExample.java* @Author: Tenk*/
@Component
public class CozeJWTOAuthUtil {private static final Logger log = LoggerFactory.getLogger(CozeJWTOAuthUtil.class);@Autowiredprivate CozeConfig cozeConfig;@Autowiredprivate RedisService redisService;//CozeAPIprivate JWTOAuthClient oauth;public OAuthToken getAccessToken(Long userId) {OAuthToken accessToken;if (redisService.hasKey(CozeConstants.JWT_TOKEN + userId)) {accessToken = JSONObject.parseObject(redisService.getCacheObject(CozeConstants.JWT_TOKEN + userId).toString(), OAuthToken.class);} else {accessToken = oauth.getAccessToken(userId.toString());redisService.setCacheObject(CozeConstants.JWT_TOKEN + userId, accessToken, 14L, TimeUnit.MINUTES);}return accessToken;}public CozeConfig getCozeConfig() {return cozeConfig;}@PostConstructpublic void init() {this.oauth = createJWTOAuthClient();}public JWTOAuthClient getJWTOAuthClient() {return oauth;}/*** 初始化CozeJWTOAuth** @return*/public CozeAPI createCozeAPIByUser(String accessToken) {return new CozeAPI.Builder().auth(new TokenAuth(accessToken)).baseURL(cozeConfig.getApiBase()).readTimeout(60000).connectTimeout(60000).build();}public JWTOAuthClient createJWTOAuthClient() {try {//讀取coze_private_key_pemString jwtOAuthPrivateKey = new String(Files.readAllBytes(Paths.get(cozeConfig.getPrivateKeyFilePath())), StandardCharsets.UTF_8);oauth = new JWTOAuthClient.JWTOAuthBuilder().clientID(cozeConfig.getClientId()).privateKey(jwtOAuthPrivateKey).publicKey(cozeConfig.getPublicKeyId()).baseURL(cozeConfig.getApiBase()).readTimeout(60000).connectTimeout(60000).build();} catch (Exception e) {log.error("初始化CozeJWTOAuth失敗", e);return null;}return oauth;}
}
SSE服務類?
/*** SSE服務類** @Author: Tenk*/
@Service
public class SseServiceImpl implements SseService {private static final Logger log = LoggerFactory.getLogger(SseServiceImpl.class);/*** k:扣子會話id v:SseEmitter* 這里一定是使用ConcurrentHashMap,因為他是多線程安全的。*/private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();/*** k:會話id v:userId* 這里一定是使用ConcurrentHashMap,因為他是多線程安全的。*/private static Map<String, Long> sseUserMap = new ConcurrentHashMap<>();@Overridepublic SseEmitter connect(String conversationId, Long userId) {SseEmitter sseEmitter;// 判斷是否已經存在if (sseEmitterMap.containsKey(conversationId)) {sseEmitter = sseEmitterMap.get(conversationId);} else {// 最多6小時斷開連接sseEmitter = new SseEmitter(6 * 60 * 60 * 1000L);}// 連接斷開sseEmitter.onCompletion(() -> {disconnect("連接斷開", conversationId);});// 連接超時sseEmitter.onTimeout(() -> {disconnect("連接超時", conversationId);});// 連接報錯sseEmitter.onError((throwable) -> {disconnect("連接報錯", conversationId);});sseEmitterMap.put(conversationId, sseEmitter);sseUserMap.put(conversationId, userId);return sseEmitter;}private static void disconnect(String action, String conversationId) {Long value = sseUserMap.get(conversationId);log.info("sse{},用戶userId:{}", action, value);sseEmitterMap.remove(conversationId);sseUserMap.remove(conversationId);}
}
AI 接口 Controller?
/*** AI 接口 Controller* @Author: Tenk*/
@RestController
@RequestMapping("/ai/chats")
public class CozeController {@Autowiredprivate CozeService cozeService;@Autowiredprivate CozeJWTOAuthUtil cozeJWTOAuthUtil;@Autowiredprivate SseService sseService;/*** 向AI發起流式對話請求** @param conversationId 會話ID* @param content 對話內容* @return 對話流*/@GetMapping(value = "/send", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter sendFlowMessage(@RequestParam String conversationId,@RequestParam String content) throws IOException {if (StringUtils.isEmpty(conversationId)) {throw new ServiceException("會話信息缺失", 500);}ChatBo bo = new ChatBo();bo.setUserId(userId);bo.setConversationId(conversationId);bo.setContent(content);SseEmitter emitter = sseService.connect(conversationId, userId);try{cozeService.sendFlowMessage(bo, emitter);}catch (Exception e){e.printStackTrace();// 捕獲并發送 SSE 格式的錯誤emitter.send("{\"status\":\"fail\",\"data\":\""+e.getMessage()+"\"}");emitter.completeWithError(e);}return emitter;}
}
方法實現 Service?
/*** 方法具體實現 Service* @Author: Tenk*/
@Service
public class CozeServiceImpl implements CozeService {// @Autowired Mapper Service ……@Autowiredprivate CozeJWTOAuthUtil cozeJWTOAuthUtil;// 流式消息狀態private static final String inProgress = "in-progress"; // 進行中private static final String done = "done"; // 完成/*** 構建 SSE 返回格式** @param status 響應狀態(in-progress / done)* @param data 數據內容,可以是 textBuffer 或最終 data 對象* @return 構造好的 JSON 對象*/private JSONObject buildSseResult(String status, Object data) {JSONObject result = new JSONObject();result.put("status", status);result.put("data", data);return result;}@Override@Transactionalpublic void sendFlowMessage(ChatBo bo, SseEmitter emitter) {// 1. 初始化 Coze API 客戶端CozeAPI cozeAPI = cozeJWTOAuthUtil.createCozeAPIByUser(cozeJWTOAuthUtil.getAccessToken(bo.getUserId()).getAccessToken());// 2. 構造用戶發送的消息CreateMessageReq msgReq = CreateMessageReq.builder().conversationID(bo.getConversationId()).role(MessageRole.USER).content(bo.getContent()).contentType(MessageContentType.TEXT).build();// 整理用戶消息,插入消息歷史數據表Message userMsg = cozeAPI.conversations().messages().create(msgReq).getMessage();CozeMsgLog userMsgLog = new CozeMsgLog(bo.getUserId(),bo.getConversationId(),userMsg.getBotId(),userMsg.getChatId(),userMsg.getId(),null,bo.getContent(),userMsg.getContentType().getValue(),userMsg.getMetaData().toString(),userMsg.getReasoningContent(),userMsg.getRole().getValue(),userMsg.getSectionId(),MessageType.QUESTION.getValue(),new Date(userMsg.getCreatedAt() * 1000),new Date(userMsg.getUpdatedAt() * 1000));cozeMsgLogService.insertCozeMsgLog(userMsgLog);// 4. 打開 Coze 流式對話Flowable<ChatEvent> chatStream = cozeAPI.chat().stream(CreateChatReq.builder().botID(cozeJWTOAuthUtil.getCozeConfig().getTripBotId()).stream(true).autoSaveHistory(true).conversationID(bo.getConversationId()).userID(bo.getUserId().toString()).messages(Collections.singletonList(userMsg)).build());// 5. 發送初始提示信息try {JSONObject delayJson = new JSONObject();delayJson.put("type", "delay");delayJson.put("delayReason", "開始思考……");emitter.send(buildSseResult(inProgress, delayJson));} catch (IOException e) {log.error("規劃行程開始錯誤", e);throw new ServiceException("規劃行程開始錯誤");}StringBuffer fullContent = new StringBuffer();// 完整 AI 回復文本,包含一些不想給前端的特殊符號List<JSONObject> textBuffer = new ArrayList<>(); // 緩沖 SSE 數據int bufferThreshold = 3; // 緩沖閾值,當緩沖列表長度超過此值時,發送給前端// 7. 訂閱流式對話事件chatStream.timeout(10, TimeUnit.MINUTES).observeOn(Schedulers.io()).subscribe(event -> {// 增量消息(例如:['H','e','l','l','o',' ','W','o','r','l',……])if (ChatEventType.CONVERSATION_MESSAGE_DELTA.equals(event.getEvent())) {// 提取增量消息String delta = event.getMessage().getContent();// 逐步拼接成完整消息fullContent.append(delta);// 清洗輸出給前端的文本:去除 <、>、[、] 特殊符號,發送給前端,視情況而定,非必須String cleanText= delta.replaceAll("[<>\\[\\]]", "");// TODO 自定義業務邏輯// 實際發送if (!cleanText.isEmpty()) {JSONObject textJson = new JSONObject();textJson.put("type", "text");textJson.put("text", cleanText);synchronized (textBuffer) {// 添加到緩沖列表textBuffer.add(textJson);// 發送緩沖列表if (textBuffer.size() >= bufferThreshold) {/** 示例* {* "data": [* {* "text": "hello world\n",* "type": "text"* },* {* "text": "### title three\n",* "type": "text"* },* {* "text": "#### title four\n",* "type": "text"* }* ],* "status": "in-progress"* }*/emitter.send(buildSseResult(inProgress, new ArrayList<>(textBuffer)));textBuffer.clear();}}}// TODO 自定義業務邏輯}// AI處理、回復完成// event:conversation.message.completed會有兩次,&&后的條件是取其中一次,詳見 https://www.coze.cn/open/docs/developer_guides/chat_v3#70a1d1bdif (ChatEventType.CONVERSATION_MESSAGE_COMPLETED.equals(event.getEvent()) && MessageType.ANSWER.getValue().equals(event.getMessage().getType().getValue())) {// === 最后一批textBuffer沒發的統一發出 ===synchronized (textBuffer) {if (!textBuffer.isEmpty()) {emitter.send(buildSseResult(inProgress, new ArrayList<>(textBuffer)));textBuffer.clear();}}// === 構造最終完成的數據包 ===JSONObject finalData = new JSONObject();finalData.put("xxx", "自定義數據");finalData.put("yyy", "自定義內容");finalData.put("botMessage", fullContent.toString());// 發送狀態為 done 的 SSEemitter.send(buildSseResult(done, finalData));// AI回復的內容,插入消息歷史數據表Message message = event.getMessage();CozeMsgLog aiMsgLog = new CozeMsgLog(bo.getUserId(),message.getConversationId(),message.getBotId(),message.getChatId(),message.getId(),message.getContent(),finalData.toString(),message.getContentType().getValue(),message.getMetaData() == null ? null : message.getMetaData().toString(),message.getReasoningContent(),message.getRole().getValue(),message.getSectionId(),message.getType().getValue(),new Date(message.getCreatedAt() * 1000),new Date());cozeMsgLogService.insertCozeMsgLog(aiMsgLog);}},error -> {log.error("AI對話異常:{}", error.getMessage(), error);emitter.send(buildSseResult(done, "AI思考失敗"));emitter.completeWithError(error);cozeAPI.shutdownExecutor();},() -> {// 釋放資源emitter.complete();cozeAPI.shutdownExecutor();});}
}