1. 添加依賴 (pom.xml)
<dependencies><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- SSE 支持 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><!-- HTTP客戶端 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-json</artifactId></dependency>
</dependencies>
2. 配置類 (WebClientConfig.java
)
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;@Configuration
public class WebClientConfig {@Beanpublic WebClient webClient() {return WebClient.builder().baseUrl("https://api.deepseek.com/v1").defaultHeader("Authorization", "Bearer YOUR_API_KEY") // 替換為你的API密鑰.build();}
}
3. 請求/響應DTO
import lombok.Data;
import java.util.List;@Data
public class DeepSeekRequest {private String model = "deepseek-chat";private List<Message> messages;private boolean stream = true;@Datapublic static class Message {private String role;private String content;public Message(String role, String content) {this.role = role;this.content = content;}}
}@Data
public class DeepSeekResponse {private List<Choice> choices;@Datapublic static class Choice {private Delta delta;}@Datapublic static class Delta {private String content;}
}
4. SSE服務實現 (DeepSeekService.java
)
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;import java.util.Collections;@Service
public class DeepSeekService {private final WebClient webClient;public DeepSeekService(WebClient webClient) {this.webClient = webClient;}public Flux<String> streamCompletion(String userMessage) {// 使用 FluxProcessor 替代 SinksFluxProcessor<String, String> processor = DirectProcessor.<String>create().serialize();FluxSink<String> sink = processor.sink();DeepSeekRequest request = new DeepSeekRequest();request.setMessages(Collections.singletonList(new DeepSeekRequest.Message("user", userMessage)));webClient.post().uri("/chat/completions").contentType(MediaType.APPLICATION_JSON).bodyValue(request).accept(MediaType.TEXT_EVENT_STREAM).retrieve().bodyToFlux(String.class).subscribe(data -> {ObjectMapper objectMapper = new ObjectMapper();try {String jsonString = objectMapper.writeValueAsString(data);sink.next(jsonString);} catch (JsonProcessingException e) {sink.error(e);}},sink::error,sink::complete);return processor;}
}
5. SSE控制器 (SseController.java
)
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.publisher.Flux;@RestController
@RequestMapping("/sse")
public class SseController {private final DeepSeekService deepSeekService;public SseController(DeepSeekService deepSeekService) {this.deepSeekService = deepSeekService;}@GetMapping(path = "/deepseek", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter streamDeepSeekResponse(@RequestParam String message) {SseEmitter emitter = new SseEmitter(60 * 1000L); // 60秒超時Flux<String> responseStream = deepSeekService.streamCompletion(message);responseStream.subscribe(content -> {try {// 發送SSE事件emitter.send(SseEmitter.event().data(content).name("message"));} catch (Exception e) {emitter.completeWithError(e);}},emitter::completeWithError,emitter::complete);return emitter;}
}
6. 前端實現 (HTML + JavaScript)
<!DOCTYPE html>
<html>
<head><title>DeepSeek SSE Demo</title>
</head>
<body><input type="text" id="message" placeholder="輸入你的問題"><button onclick="startSSE()">開始對話</button><div id="output" style="white-space: pre-wrap; margin-top: 20px;"></div><script>let eventSource;function startSSE() {const message = document.getElementById('message').value;const outputDiv = document.getElementById('output');outputDiv.innerHTML = ''; // 清空之前的內容if (eventSource) eventSource.close();// 創建SSE連接eventSource = new EventSource(`/sse/deepseek?message=${encodeURIComponent(message)}`);eventSource.addEventListener("message", (event) => {// 實時追加內容outputDiv.innerHTML += event.data;});eventSource.addEventListener("error", (err) => {console.error("SSE error:", err);outputDiv.innerHTML += "\n\n[連接已關閉]";eventSource.close();});}</script>
</body>
</html>
關鍵點說明:
????????SSE流式傳輸:
????????使用SseEmitter實現服務端推送
????????通過text/event-stream內容類型保持長連接
DeepSeek API集成:
設置stream=true啟用流式響應
? ? ? ? ?處理data: [DONE]結束標記
? ? ? ? ?解析JSON響應中的content字段
????????響應式編程:
????????使用WebClient處理HTTP流
????????使用Sinks進行背壓管理
????????Flux實現響應式流處理
????????前端實現:
????????使用EventSource?API接收SSE
????????實時追加內容到DOM
????????處理連接錯誤和關閉
測試步驟:
1.啟動Spring Boot應用
2.訪問前端頁面(默認端口8080)
3.輸入問題并點擊按鈕
4.查看實時輸出的思考過程
注意事項:
1.替換YOUR_API_KEY為實際的DeepSeek API密鑰
2.生產環境建議:
3.添加JSON解析庫(如Jackson)處理響應
4.增加錯誤處理和重試機制
5.添加API速率限制
6.實現更健壯的SSE連接管理
此實現能讓前端實時接收并顯示DeepSeek API返回的流式響應,實現"思考過程"的逐字顯示效果。