1. Background
In large language model (LLM) applications, models such as GPT-4 often take several seconds to tens of seconds to generate a response. With a traditional synchronous request, the user stares at a blank page the whole time, which is a poor experience. This article uses Spring WebFlux reactive programming together with SSE server push to stream the answer token by token, typewriter-style, integrating the AI capability through the LangChain4j framework, which noticeably improves the user experience.
2. Technology Stack
- Spring WebFlux: an asynchronous, non-blocking web framework built on Reactor
- SSE (Server-Sent Events): a lightweight server push technology
- LLM framework: LangChain4j
- LLM API: OpenAI-compatible GPT-4 interface (the actual model behind it is DeepSeek)
- Tooling: IntelliJ IDEA + JDK 17
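For orientation, SSE is just a long-lived HTTP response with Content-Type `text/event-stream`, in which each event is one or more `data:` lines followed by a blank line. WebFlux performs this framing for you, but a plain-Java sketch of the wire format (the `SseFrame` class below is illustrative, not part of the project) makes the protocol concrete:

```java
public class SseFrame {

    // Build one SSE event: each payload line is prefixed with "data: ",
    // and the whole event is terminated by a blank line.
    static String frame(String payload) {
        StringBuilder sb = new StringBuilder();
        for (String line : payload.split("\n", -1)) {
            sb.append("data: ").append(line).append("\n");
        }
        return sb.append("\n").toString();
    }

    public static void main(String[] args) {
        System.out.print(frame("Hello"));          // data: Hello\n\n
        System.out.print(frame("line1\nline2"));   // two data: lines, one event
    }
}
```

A multi-line payload becomes several `data:` lines within a single event, which is why newline-containing tokens still arrive at the client as one unit.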
3. About Spring WebFlux
See the Spring WebFlux tutorial on spring 中文網 (the Chinese Spring community site). We won't go into detail here; there are plenty of tutorials online.
4. Overall Design
In outline: the frontend sends a POST request; the controller returns a Flux<String> over an SSE connection; the service assembles the prompt (optionally retrieving reference passages from Milvus first) and calls the streaming model client; a response handler then pushes each generated token into a Reactor sink, whose Flux relays the tokens to the client.
5. Implementation Steps
1. Maven dependencies
```xml
<dependency>
    <groupId>io.milvus</groupId>
    <artifactId>milvus-sdk-java</artifactId>
    <version>2.5.1</version>
</dependency>
<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-milvus</artifactId>
    <version>0.36.2</version>
</dependency>
<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-embeddings-all-minilm-l6-v2</artifactId>
    <version>0.36.2</version>
</dependency>
<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-open-ai</artifactId>
    <version>0.36.2</version>
</dependency>
<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-open-ai-spring-boot-starter</artifactId>
    <version>0.36.2</version>
</dependency>
<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-reactor</artifactId>
    <version>0.36.2</version>
</dependency>
```
2. Controller layer
The endpoint returns a Flux<String> and sets the response Content-Type to text/event-stream, so the client receives the body as an SSE stream rather than a single buffered response.
```java
@ApiOperation(value = "Streaming chat")
@PostMapping(value = "", produces = TEXT_EVENT_STREAM_VALUE) // MediaType.TEXT_EVENT_STREAM_VALUE
public Flux<String> chat(@RequestBody @Validated ChatReq chatReq) {
    log.info("--streaming chat request: {}--", chatReq);
    return chatService.chat(chatReq);
}
```
```java
@ApiModel(value = "Chat request")
public class ChatReq {

    @ApiModelProperty(value = "chat id")
    private Long chatId;

    @ApiModelProperty(value = "chat type")
    private Integer type;

    @ApiModelProperty(value = "question")
    private String question;

    @ApiModelProperty(value = "external (reference document) ids")
    private List<Long> externalIds;

    @ApiModelProperty(value = "vector retrieval threshold", example = "0.5")
    @Min(value = 0)
    @Max(value = 1)
    private Double retrievalThreshold;

    @ApiModelProperty(value = "number of vector matches", example = "5")
    @Min(value = 1)
    private Integer topK;

    ...
}
```
3. Service layer
(1) Main request
```java
public Flux<String> chat(ChatReq chatReq) {
    // Create a sink that will emit items to the Flux
    Sinks.Many<ApiResponse<String>> sink = Sinks.many().multicast().onBackpressureBuffer();
    // Flag used to stop the data-generation logic when the client cancels
    AtomicBoolean isCancelled = new AtomicBoolean(false);
    // Distinguish new vs. existing chats
    if (isNewChat(chatReq.getChatId())) {
        // New chat; business-specific handling omitted
        chatReq.setHasHistory(false);
        chatModelHandle(chatReq, sink, isCancelled);
    } else {
        // Existing chat: look up the chat type and history by chatId
        chatReq.setHasHistory(true);
        chatModelHandle(chatReq, sink, isCancelled);
    }
    return sink.asFlux()
            .doOnCancel(() -> {
                log.info("Stopping stream processing");
                isCancelled.set(true);  // set the cancel flag
                sink.tryEmitComplete(); // complete the stream
            });
}
```

Note that the sink and the cancel flag are passed into chatModelHandle, which creates the streaming response handler; they are local to this request, not shared fields.
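The sink-plus-flag pattern above can be illustrated without Reactor: tokens are accepted until a shared AtomicBoolean is flipped by the cancel callback. A framework-free sketch (the class and method names are illustrative; `tryEmitNext`/`cancel` only mirror the roles of the Reactor calls):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelableStream {

    private final List<String> emitted = new ArrayList<>();
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    // Mirrors sink.tryEmitNext(): drop tokens once the stream is cancelled
    void tryEmitNext(String token) {
        if (!cancelled.get()) {
            emitted.add(token);
        }
    }

    // Mirrors doOnCancel(): flip the flag so the producer stops emitting
    void cancel() {
        cancelled.set(true);
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        CancelableStream stream = new CancelableStream();
        stream.tryEmitNext("Hel");
        stream.tryEmitNext("lo");
        stream.cancel();         // client disconnected
        stream.tryEmitNext("!"); // ignored after cancellation
        System.out.println(stream.emitted()); // [Hel, lo]
    }
}
```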
(2) Building the request parameters
If the conversation has history, fetch it (the past questions and answers) and wrap it as ChatMessages: each question becomes a UserMessage and each answer an AiMessage.
```java
private void chatModelHandle(ChatReq chatReq, Sinks.Many<ApiResponse<String>> sink, AtomicBoolean isCancelled) {
    List<ChatMessage> history = new ArrayList<>();
    if (chatReq.getHasHistory()) {
        // Assemble the conversation history: questions become UserMessage, answers AiMessage
        history = getHistory(chatReq.getChatId());
    }
    Integer chatType = chatReq.getType();
    // Ids of the reference documents, if any
    List<Long> externalIds = chatReq.getExternalIds();
    // Branch on chat type
    if (ChatType.NORMAL.getCode().equals(chatType)) { // plain chat
        history.add(UserMessage.from(chatReq.getQuestion()));
        ChatStreamingResponseHandler handler = new ChatStreamingResponseHandler(sink, chatReq, isCancelled);
        // With no prior history this is simply a single-message list
        ChatModelClient.getStreamingChatLanguageModel(chatReq.getTemperature())
                .generate(history, handler);
    } else if (ChatType.DOCUMENT_DB.getCode().equals(chatType)) { // document-grounded chat
        Prompt prompt = geneRagPrompt(chatReq);
        history.add(UserMessage.from(prompt.text()));
        ChatStreamingResponseHandler handler = new ChatStreamingResponseHandler(sink, chatReq, isCancelled);
        ChatModelClient.getStreamingChatLanguageModel(chatReq.getTemperature())
                .generate(history, handler);
    } else {
        throw new BizException("Feature not implemented yet");
    }
}
```

(The question, or the RAG prompt, is always appended to `history` and the list overload of `generate` is used; the original mixed a `List<ChatMessage>` and a `String` in one ternary, which does not compile.)
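The shape of the assembled history (alternating user and AI turns, oldest first) can be shown with a framework-free sketch, using a simple record instead of LangChain4j's UserMessage/AiMessage (names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class HistoryBuilder {

    // Stand-in for LangChain4j's ChatMessage hierarchy
    record Msg(String role, String text) {}

    // Each past turn contributes a user message and an AI message, in order
    static List<Msg> toHistory(List<String[]> turns) {
        List<Msg> history = new ArrayList<>();
        for (String[] turn : turns) {
            history.add(new Msg("user", turn[0])); // question -> UserMessage
            history.add(new Msg("ai", turn[1]));   // answer   -> AiMessage
        }
        return history;
    }

    public static void main(String[] args) {
        List<Msg> h = toHistory(List.of(
                new String[]{"What is SSE?", "A server push technique."},
                new String[]{"And WebFlux?", "A reactive web framework."}));
        System.out.println(h.size()); // 4 messages: user/ai/user/ai
    }
}
```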
(3) Retrieving reference passages
If reference documents are provided, query the vector store for the top-K passages, constrained by the reference document ids and the retrieval threshold.
```java
private List<PPid> search(ChatReq chatReq, MilvusClientV2 client, MilvusConfig config, EmbeddingModel model) {
    // Embed the question
    TextSegment segment = TextSegment.from(chatReq.getQuestion());
    Embedding queryEmbedding = model.embed(segment).content();
    SearchResp searchResp = client.search(SearchReq.builder()
            .collectionName(config.getCollectionName())
            .data(Collections.singletonList(new FloatVec(queryEmbedding.vector())))
            // Restrict hits to the given document ids; the List renders as e.g. [1, 2, 3].
            // ARRAY_CONTAINS_ANY accepts a list operand, which plain ARRAY_CONTAINS does not.
            .filter(String.format("ARRAY_CONTAINS_ANY(documentIdList, %s)", chatReq.getExternalIds()))
            .topK(chatReq.getTopK() == null ? config.getTopK() : chatReq.getTopK())
            .outputFields(Arrays.asList("pid", "documentId"))
            .build());
    // Drop results scoring below the threshold, best match first
    List<SearchResp.SearchResult> searchResults = searchResp.getSearchResults().get(0);
    Double minScore = chatReq.getRetrievalThreshold() == null ? config.getMinScore() : chatReq.getRetrievalThreshold();
    return searchResults.stream()
            .filter(item -> item.getScore() >= minScore)
            .sorted((item1, item2) -> Double.compare(item2.getScore(), item1.getScore()))
            .map(item -> new PPid(
                    (Long) item.getEntity().get("documentId"),
                    (Long) item.getEntity().get("pid")))
            .toList();
}
```
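The post-filtering at the end of `search` (drop hits below the threshold, sort by score descending) is easy to verify in isolation. A minimal sketch with a plain record standing in for Milvus's `SearchResult` (names are illustrative):

```java
import java.util.Comparator;
import java.util.List;

public class RetrievalFilter {

    record Hit(long documentId, long pid, double score) {}

    // Keep hits scoring at or above minScore, best match first
    static List<Hit> filter(List<Hit> hits, double minScore) {
        return hits.stream()
                .filter(h -> h.score() >= minScore)
                .sorted(Comparator.comparingDouble(Hit::score).reversed())
                .toList();
    }

    public static void main(String[] args) {
        List<Hit> hits = List.of(
                new Hit(1, 10, 0.42),
                new Hit(2, 20, 0.91),
                new Hit(3, 30, 0.67));
        System.out.println(filter(hits, 0.5)); // keeps the 0.91 hit, then the 0.67 hit
    }
}
```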
After obtaining the ids of the matching passages, load their text and splice it into the prompt template.
```java
private Prompt genePrompt(String context) {
    ...
}
```
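genePrompt is elided above; a common shape for a RAG prompt is to splice the retrieved context and the user's question into a fixed template. A hedged sketch (the template wording below is illustrative, not the project's actual prompt):

```java
public class RagPrompt {

    // Illustrative RAG template: grounded answering with an explicit fallback
    private static final String TEMPLATE = """
            Answer the question using only the context below.
            If the context is insufficient, say you don't know.

            Context:
            %s

            Question:
            %s
            """;

    static String build(String context, String question) {
        return TEMPLATE.formatted(context, question);
    }

    public static void main(String[] args) {
        System.out.println(build("SSE uses text/event-stream.", "What Content-Type does SSE use?"));
    }
}
```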
(4) Connecting to the LLM client
```java
public static StreamingChatLanguageModel getStreamingChatLanguageModel(Double temperature) {
    ChatModelConfig config = ChatConfig.getInstance().getChatModelConfig();
    return OpenAiStreamingChatModel.builder()
            .baseUrl(config.getBaseUrl())
            .modelName(config.getModelName())
            .apiKey(config.getApiKey())
            .temperature(temperature)
            .maxTokens(config.getMaxTokens())
            .timeout(Duration.ofSeconds(config.getTimeout()))
            .build();
}
```

(The temperature parameter is added here because the service layer calls `getStreamingChatLanguageModel(chatReq.getTemperature())`.)
(5) Handling model output
```java
@Slf4j
@Data
@NoArgsConstructor
public class ChatStreamingResponseHandler implements StreamingResponseHandler<AiMessage> {

    private Sinks.Many<ApiResponse<String>> sink;
    private ChatReq chatReq;
    private AtomicBoolean isCancelled;

    public ChatStreamingResponseHandler(Sinks.Many<ApiResponse<String>> sink, ChatReq chatReq, AtomicBoolean isCancelled) {
        this.sink = sink;
        this.chatReq = chatReq;
        this.isCancelled = isCancelled;
    }

    @Override
    public void onNext(String answer) {
        // Stop emitting once the stream has been cancelled
        if (isCancelled.get()) {
            return;
        }
        sink.tryEmitNext(BaseController.success(answer));
    }

    @Override
    public void onComplete(Response<AiMessage> response) {
        if (!isCancelled.get()) {
            // End-of-stream marker agreed with the frontend; wrapped so the
            // emitted type matches the sink's ApiResponse<String>
            sink.tryEmitNext(BaseController.success("[END]"));
            sink.tryEmitComplete();
        }
        // business-specific handling
    }

    @Override
    public void onError(Throwable error) {
        if (!isCancelled.get()) {
            sink.tryEmitError(error);
        }
        // business-specific handling
    }
}
```
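The handler above follows a plain callback contract: one onNext call per token, then exactly one onComplete or onError. The contract can be exercised without LangChain4j (the interface and names below are illustrative stand-ins, not the library's API):

```java
import java.util.List;

public class CallbackDemo {

    // Simplified stand-in for a streaming response handler
    interface StreamingHandler {
        void onNext(String token);
        void onComplete();
        void onError(Throwable error);
    }

    // Simulates a model streaming tokens into the handler
    static void stream(List<String> tokens, StreamingHandler handler) {
        try {
            for (String t : tokens) {
                handler.onNext(t);
            }
            handler.onComplete();
        } catch (RuntimeException e) {
            handler.onError(e);
        }
    }

    public static void main(String[] args) {
        StringBuilder answer = new StringBuilder();
        stream(List.of("Hel", "lo", "!"), new StreamingHandler() {
            public void onNext(String token) { answer.append(token); }
            public void onComplete() { answer.append("[DONE]"); }
            public void onError(Throwable e) { answer.append("[ERR]"); }
        });
        System.out.println(answer); // Hello![DONE]
    }
}
```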
6. Demo
7. Closing Remarks
The above is only a brief outline of the implementation steps; feel free to leave a comment for a deeper discussion.
Several aspects of the experience still need polishing. Taking Doubao as a reference:
1. Support manually stopping a response.
2. Stop the streaming output automatically when the page is refreshed or closed, and resume streaming after the client reconnects.
3. Pack multiple tokens into each SSE frame to reduce the number of frames sent.
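Improvement 3 can be prototyped with a small buffer that flushes one frame once a size threshold is reached; a production version would also flush on a short timer so the typewriter effect is preserved. A sketch with illustrative names:

```java
import java.util.ArrayList;
import java.util.List;

public class TokenBatcher {

    private final int batchSize;
    private final StringBuilder buffer = new StringBuilder();
    private final List<String> frames = new ArrayList<>();

    TokenBatcher(int batchSize) {
        this.batchSize = batchSize;
    }

    // Accumulate tokens; emit one frame whenever batchSize characters are buffered
    void onToken(String token) {
        buffer.append(token);
        if (buffer.length() >= batchSize) {
            flush();
        }
    }

    // Emit whatever is buffered (also call this when the stream completes)
    void flush() {
        if (buffer.length() > 0) {
            frames.add(buffer.toString());
            buffer.setLength(0);
        }
    }

    List<String> frames() {
        return frames;
    }

    public static void main(String[] args) {
        TokenBatcher b = new TokenBatcher(5);
        for (String t : List.of("He", "llo", ", ", "wor", "ld")) {
            b.onToken(t);
        }
        b.flush();
        System.out.println(b.frames()); // 3 frames instead of 5 SSE events
    }
}
```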