用spring-webmvc包實現AI(Deepseek)事件流(SSE)推送

?前后端:? Spring Boot + Angular

spring-webmvc-5.2.2包

代碼片段如下:

控制層:

@GetMapping(value = "/realtime/page/ai/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)@ApiOperation(value = "獲取告警記錄進行AI分析")public SseEmitter getRealTimeAlarmAi(AlarmRecordQueryParam queryParam) {final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");IPage<AlarmRecord> page = alarmRecordService.findRealTimeByParam(queryParam);StringBuilder alarmInfo = new StringBuilder();try {// 根據狀態設置前綴String prefix = queryParam.getStatus() == 1 ?"最近十條歷史告警記錄:" : "當前十條實時告警信息:";String emptyMessage = queryParam.getStatus() == 1 ?"暫無歷史告警" : "當前無實時告警";if (page.getRecords() != null && !page.getRecords().isEmpty()) {alarmInfo.append(prefix);sseService.buildAlarmContent(page, alarmInfo, timeFormatter);} else {alarmInfo.append(emptyMessage);}sseService.validatePromptLength(alarmInfo, maxPromptLength);} catch (Exception e) {log.error("告警信息處理異常", e);}return sseService.createStreamConnection(alarmInfo.toString(), "告警");}
    @ApiOperation("查詢圖表數據用AI分析數據詳情")@GetMapping("/chart/ai/sse")@OpLog(inputExpression = "開始時間:{#queryParam.startTime},結束時間:{#queryParam.endTime},圖表ID:{#queryParam.chartId}",outputExpression = "{#code}")public SseEmitter chartAiSSEData(@Validated ChartDataQueryParam queryParam) throws Exception {String ChartAi = "報表";ChartInstance chart = Optional.ofNullable(chartService.getById(queryParam.getChartId())).orElseThrow(() -> new Exception("找不到:" + queryParam.getChartId() + "的圖表定義"));List<ChartDeviceSensor> deviceSensors = ChartInstance.toChartDeviceSensorList(chart);String endTime = DataQueryParam.endTime(queryParam.getEndTime());DataQueryParam dataQueryParam = new DataQueryParam(queryParam.getStartTime(), endTime, deviceSensors);IChartDataService chartDataService = chartDataServiceManager.getInstance(chart.getChartTypeId());List dataList = chartDataService.getChartData(dataQueryParam);List<String> times = dataQueryParam.getDateType().getTimes(dataQueryParam.getStartTime(), dataQueryParam.getEndTime());ChartData chartData = new ChartData<>(chart.getId(), chart.getName(), chart.getChartFormat(), chart.getChartTypeId(), chart.getShowType(), chart.getCategoryId(), times, dataList);// 將 ChartData 轉換為壓縮字符串String csvData = ChartDataFormatter.formatChartData(chartData);log.info("當前請求字符長度:" + csvData.length());try {if (csvData.length() > maxPromptLength) {OpLogAspect.setCode(400); // 設置錯誤碼throw new IllegalArgumentException("數據長度超過限制,最大允許長度:" + maxPromptLength);}OpLogAspect.setCode(200);return sseService.createStreamConnection(csvData,ChartAi);} catch (IllegalArgumentException e) {OpLogAspect.setCode(400); // 參數錯誤throw e;} catch (Exception e) {OpLogAspect.setCode(500); // 系統錯誤throw new RuntimeException("處理請求失敗", e);}}

業務層代碼:

package com.keydak.project.core.chart.ai.service.impl;import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.keydak.project.core.alarm.data.vo.AlarmRecord;
import com.keydak.project.core.chart.ai.dto.KeydakAiConfigDTO;
import com.keydak.project.core.chart.ai.exception.BalanceException;
import com.keydak.project.core.chart.ai.service.SSEService;
import com.keydak.repository.core.enums.SystemGlobalConfigEnum;
import com.keydak.repository.core.service.ISystemGlobalConfigService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import com.fasterxml.jackson.core.type.TypeReference;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** AI服務實現** @author xyt*/
@Service
@Slf4j
public class SSEServiceImpl implements SSEService {@Autowiredprivate ISystemGlobalConfigService systemGlobalConfigService;private final ObjectMapper objectMapper = new ObjectMapper();private RateLimiter rateLimiter;@PostConstructpublic void init() {try {// 初始化限流器時動態獲取配置KeydakAiConfigDTO initialConfig = getConfig();rateLimiter = new RateLimiter(initialConfig.getRateLimit());} catch (Exception e) {throw new RuntimeException("初始化失敗,無法獲取Keydak AI配置", e);}}// 線程池配置private static final int CORE_POOL_SIZE = 5; // 核心線程數private static final int MAX_POOL_SIZE = 8; // 最大線程數private static final long KEEP_ALIVE_TIME = 30;  // 線程空閑時間private static final int QUEUE_CAPACITY = 30; //隊列private final ExecutorService executor = new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME,TimeUnit.SECONDS,new LinkedBlockingQueue<>(QUEUE_CAPACITY),new ThreadPoolExecutor.AbortPolicy() // 使用 AbortPolicy 直接拒絕任務而不執行);/*** 刷新限流器配置*/@Overridepublic synchronized void refreshRateLimiter(Integer rate) {try {if (rateLimiter == null) {rateLimiter = new RateLimiter(rate);} else {rateLimiter.updateRate(rate);}log.info("限流器已更新,新速率限制: {}", rate);} catch (Exception e) {log.error("刷新限流器配置失敗", e);}}@PreDestroypublic void destroy() {executor.shutdown();try {if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow();}} catch (InterruptedException e) {executor.shutdownNow();Thread.currentThread().interrupt();}}/*** 獲取Keydak AI配置信息。** @return 返回Keydak AI配置信息的DTO對象*/private KeydakAiConfigDTO getConfig() throws Exception {KeydakAiConfigDTO config = systemGlobalConfigService.getTag(SystemGlobalConfigEnum.KEYDAK_AI_CONFIG,KeydakAiConfigDTO.class);if (config == null) {throw new Exception("Keydak AI配置不存在");}return config;}@Overridepublic SseEmitter createStreamConnection(String message, String aiType) {SseEmitter emitter = new SseEmitter(120_000L); // 2分鐘超時try {KeydakAiConfigDTO config = getConfig();double balance = getBalance();log.info("當前余額: {} 元", balance);log.warn("當前可用令牌數: {}", rateLimiter.tokens.get());if (!rateLimiter.tryAcquire()) {log.warn("請求被限流 | 當前允許的QPS:{}", config.getRateLimit());handleRateLimitError(emitter);return emitter;}} catch (BalanceException e) {handleBalanceError(emitter, e.getMessage());return emitter;} catch (Exception e) {handleBalanceError(emitter, "系統錯誤: " + e.getMessage());return emitter;}// 保持原有事件監聽emitter.onCompletion(() -> log.info("SSE連接完成"));emitter.onTimeout(() -> {log.warn("SSE連接超時");rateLimiter.refill(); // 超時請求返還令牌});emitter.onError(e -> log.error("SSE連接錯誤", e));// 保持原有線程池處理executor.execute(() -> {try {processSSEStream(message, aiType, emitter);} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}/*** 新增限流錯誤處理方法** @param emitter 事件發射器* @throws IOException 如果發送失敗*/private void handleRateLimitError(SseEmitter emitter) {try {Map<String, Object> error = new LinkedHashMap<>();error.put("error", "rate_limit_exceeded");error.put("message", "請求過于頻繁,請稍后再試");error.put("timestamp", System.currentTimeMillis());emitter.send(SseEmitter.event().data(objectMapper.writeValueAsString(error)).name("rate-limit-error").reconnectTime(5000L));emitter.complete();} catch (IOException e) {log.error("發送限流錯誤失敗", e);}}private void handleBalanceError(SseEmitter emitter, String errorMsg) {try {JSONObject error = new JSONObject();error.put("error", "balance_insufficient");error.put("message", errorMsg);emitter.send(SseEmitter.event().data(error.toJSONString()).name("balance-error"));emitter.complete();} catch (Exception e) {log.error("發送余額錯誤信息失敗", e);}}private void processSSEStream(String message, String aiType, SseEmitter emitter) throws Exception {HttpURLConnection connection = null;try {connection = createConnection();String jsonBody = buildRequestBody(message, aiType);log.info("發送AI請求數據: {}", jsonBody); // 記錄請求體sendRequestData(connection, jsonBody);validateResponse(connection);try (InputStream inputStream = connection.getInputStream();BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {String line;while ((line = reader.readLine()) != null) {if (Thread.currentThread().isInterrupted()) {throw new InterruptedException("處理被中斷");}if (line.startsWith("data: ")) {String jsonData = line.substring(6).trim();log.debug("AI響應數據: {}", jsonData);if ("[DONE]".equals(jsonData)) {log.info("收到流結束標記");sendCompletionEvent(emitter);  // 發送完成事件break;  // 結束循環}try {processStreamData(emitter, jsonData);} catch (Exception e) {log.error("數據處理失敗,終止連接", e);emitter.completeWithError(e);break;}}}}} catch (Exception e) {log.error("SSE處理發生異常", e);throw e;} finally {if (connection != null) connection.disconnect();}}private void processStreamData(SseEmitter emitter, String jsonData) throws Exception {try {Map<String, Object> apiResponse = objectMapper.readValue(jsonData,new TypeReference<Map<String, Object>>() {});List<Map<String, Object>> choices = (List<Map<String, Object>>) apiResponse.get("choices");if (choices == null || choices.isEmpty()) return;Map<String, Object> choice = choices.get(0);Map<String, Object> delta = (Map<String, Object>) choice.get("delta");Map<String, Object> chunk = new LinkedHashMap<>();chunk.put("timestamp", System.currentTimeMillis());chunk.put("messageId", UUID.randomUUID().toString());// 處理思考過程if (delta.containsKey("reasoning_content")) {String reasoning = (String) delta.get("reasoning_content");if (reasoning != null && !reasoning.trim().isEmpty()) {chunk.put("type", "reasoning");chunk.put("content", reasoning);sendChunk(emitter, chunk);}}// 處理正式回答if (delta.containsKey("content")) {String content = (String) delta.get("content");if (content != null) {chunk.put("type", "answer");chunk.put("content", content);sendChunk(emitter, chunk);}}} catch (JsonProcessingException e) {log.error("JSON解析失敗 | 原始數據: {} | 錯誤: {}", jsonData, e.getMessage());throw new IOException("Failed to process stream data", e);} catch (ClassCastException e) {log.error("數據結構類型錯誤 | 原始數據: {} | 錯誤: {}", jsonData, e.getMessage());throw new IllegalStateException("Invalid data structure", e);} catch (Exception e) {log.error("處理數據塊時發生未知錯誤 | 原始數據: {}", jsonData, e);throw e;}}private void sendChunk(SseEmitter emitter, Map<String, Object> chunk) throws IOException {String chunkJson = objectMapper.writeValueAsString(chunk);log.debug("發送數據塊: {}", chunkJson);SseEmitter.SseEventBuilder event = SseEmitter.event().data(chunkJson).id(UUID.randomUUID().toString()).name("ai-message").reconnectTime(5000L);emitter.send(event);}private void sendCompletionEvent(SseEmitter emitter) {try {Map<String, Object> completionEvent = new LinkedHashMap<>();completionEvent.put("event", "done");completionEvent.put("timestamp", System.currentTimeMillis());completionEvent.put("messageId", UUID.randomUUID().toString());String eventJson = objectMapper.writeValueAsString(completionEvent);emitter.send(SseEmitter.event().data(eventJson).id("COMPLETION_EVENT").name("stream-end").reconnectTime(0L));  // 停止重連log.info("已發送流結束事件");} catch (IOException e) {log.error("發送完成事件失敗", e);} finally {emitter.complete();log.info("SSE連接已關閉");}}private HttpURLConnection createConnection() throws Exception {KeydakAiConfigDTO config = getConfig();HttpURLConnection connection = (HttpURLConnection) new URL(config.getUrl()).openConnection();connection.setRequestMethod("POST");connection.setDoOutput(true);connection.setRequestProperty("Content-Type", "application/json");connection.setRequestProperty("Authorization", "Bearer " + config.getKey());connection.setRequestProperty("Accept", "text/event-stream");connection.setConnectTimeout(30_000);connection.setReadTimeout(120_000);return connection;}private void sendRequestData(HttpURLConnection connection, String jsonBody) throws Exception {try (OutputStream os = connection.getOutputStream()) {os.write(jsonBody.getBytes(StandardCharsets.UTF_8));os.flush();}}private void validateResponse(HttpURLConnection connection) throws Exception {if (connection.getResponseCode() != 200) {String errorMsg = readErrorStream(connection);throw new RuntimeException("API請求失敗: " + connection.getResponseCode() + " - " + errorMsg);}}private String readErrorStream(HttpURLConnection connection) throws IOException {try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getErrorStream(), StandardCharsets.UTF_8))) {StringBuilder response = new StringBuilder();String line;while ((line = reader.readLine()) != null) {response.append(line);}return response.toString();}}private String buildRequestBody(String userMessage, String aiType) throws IOException {KeydakAiConfigDTO config = null;try {config = getConfig();} catch (Exception e) {e.printStackTrace();}Map<String, Object> request = new HashMap<>();request.put("model", config.getModelType());request.put("stream", true);List<Map<String, String>> messages = new ArrayList<>();Map<String, String> message = new HashMap<>();message.put("role", "user");if ("報表".equals(aiType)) {//報表提問詞message.put("content", buildPrompt(config.getPrompt(), userMessage));} else {//告警提問詞message.put("content", buildPrompt(config.getPromptAlarm(), userMessage));}messages.add(message);request.put("messages", messages);return objectMapper.writeValueAsString(request);}private String buildPrompt(String basePrompt, String userMessage) {return String.format("%s\n%s\n", basePrompt, userMessage);}/*** 查詢當前余額** @return 當前余額* @throws IOException 如果請求失敗*/@Override@SneakyThrowspublic double getBalance() {HttpURLConnection connection = null;try {KeydakAiConfigDTO config = getConfig();URL url = new URL(config.getBalanceUrl());connection = (HttpURLConnection) url.openConnection();connection.setRequestMethod("GET");connection.setRequestProperty("Authorization", "Bearer " + config.getKey());connection.setConnectTimeout(5000);connection.setReadTimeout(5000);int responseCode = connection.getResponseCode();if (responseCode != 200) {String errorBody = readErrorStream(connection); // 復用已有的錯誤流讀取方法throw new IOException("HTTP Error: " + responseCode + " - " + errorBody);}try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {StringBuilder response = new StringBuilder();String line;while ((line = reader.readLine()) != null) {response.append(line);}JSONObject jsonObject = JSON.parseObject(response.toString());// 以下解析邏輯保持原樣if (!jsonObject.containsKey("is_available") || !jsonObject.containsKey("balance_infos")) {throw new IOException("Invalid balance response format");}JSONArray balanceInfos = jsonObject.getJSONArray("balance_infos");if (jsonObject.getBoolean("is_available") && balanceInfos != null && !balanceInfos.isEmpty()) {JSONObject balanceInfo = balanceInfos.getJSONObject(0);if (!balanceInfo.containsKey("total_balance")) {throw new IOException("Missing total_balance field");}return balanceInfo.getDouble("total_balance");} else {throw new IOException("Balance information is not available");}}} finally {if (connection != null) {connection.disconnect();}}}/*** 限流器實現**/private static class RateLimiter {private volatile int capacity;private final AtomicInteger tokens;private volatile long lastRefillTime;private final Object lock = new Object();RateLimiter(int rate) {this.capacity = rate;this.tokens = new AtomicInteger(rate);this.lastRefillTime = System.currentTimeMillis();}public void refill() {synchronized (lock) {long now = System.currentTimeMillis();long elapsed = now - lastRefillTime;if (elapsed >= 1000) {tokens.set(capacity); // 直接重置為最大容量lastRefillTime = now;}}}public boolean tryAcquire() {synchronized (lock) {refill();if (tokens.get() > 0) {tokens.decrementAndGet();return true;}return false;}}public void updateRate(int newRate) {synchronized (lock) {this.capacity = newRate;tokens.set(Math.min(tokens.get(), newRate));lastRefillTime = System.currentTimeMillis();}}}/*** 告警內容構建方法**/@Overridepublic void buildAlarmContent(IPage<AlarmRecord> page,StringBuilder alarmInfo,DateTimeFormatter formatter) {page.getRecords().forEach(record -> {// 時間格式化(使用首次告警時間)String time = Optional.ofNullable(record.getFirstTime()).map(t -> t.format(formatter)).orElse("時間未知");// 設備名稱空值處理String device = StringUtils.defaultString(record.getDeviceName(), "未知設備");// 狀態/數值處理邏輯String state = resolveStateValue(record);// 告警描述處理String desc = StringUtils.defaultString(record.getContent(), "未知告警類型");// 按規范格式拼接alarmInfo.append(String.format("%s %s %s %s;", time, device, state, desc));});}/*** 狀態值解析方法*/private String resolveStateValue(AlarmRecord record) {if (record.getValue() != null) {return record.getValue().stripTrailingZeros().toPlainString();}return record.getStatus() != null ?(record.getStatus() ? "1" : "0") : "狀態未知";}/*** 長度校驗方法**/@Overridepublic void validatePromptLength(StringBuilder content, int maxLength) {if (content.length() > maxLength) {throw new IllegalArgumentException("告警數據過長,請縮小查詢范圍");}}}

前端代碼:

<div class="modal-area"><form name="formNg" novalidate><div class="modal-header"><h3 class="modal-title" style="color: #FFFFFF">AI分析</h3></div><div class="modal-body"><div class="form"><!-- 加載狀態 - 修改為動態效果 --><div ng-if="connectionStatus === 'connecting'" class="loading"><div class="ai-thinking-container"><span>AI思考中</span><div class="ai-typing-indicator"><div class="typing-dot"></div><div class="typing-dot"></div><div class="typing-dot"></div></div></div></div><!-- 思考過程 --><div class="thinking-panel" ng-if="thinkingContent"><div class="thinking-header"><i class="fa fa-brain"></i> 思考過程<!-- 總用時顯示(完成后保留) --><span ng-if="thinkingTime">({{thinkingTime}}秒)</span></div><div class="thinking-content"ng-bind-html="thinkingContent"scroll-to-bottom="thinkingContent"></div></div><!-- 正式回答 --><div class="answer-panel" ng-if="answerContent"><div class="answer-header"><i class="fa fa-comment"></i> 以下是AI的分析</div><div class="answer-content"ng-bind-html="answerContent"scroll-to-bottom="answerContent"></div></div><!-- 錯誤提示 --><div ng-if="connectionStatus === 'error'" class="alert alert-danger"><i class="fa fa-exclamation-triangle"></i> 連接異常,請嘗試重新分析</div></div></div><div class="modal-footer"><button ng-click="retry()"class="btn btn-warning"ng-disabled="connectionStatus === 'connecting'"><i class="fa fa-redo"></i> 重新分析</button><button ng-click="cancel()" class="btn btn-danger"><i class="fa fa-times"></i> 關閉</button></div></form>
</div><style>.thinking-header span {margin-left: 5px;font-size: 0.9em;opacity: 0.8;color: #a0c4ff;}.modal-body {height: 6rem; /* 設置固定高度 */overflow-y: auto; /* 內容超出時顯示滾動條 */padding: 15px;background: #1B448A; /* 背景改為藍色 */border-radius: 4px;font-family: 'Consolas', monospace;color: #FFFFFF;}/* 思考過程樣式 */.thinking-panel {margin-bottom: 20px;border-left: 3px solid #4a90e2;padding-left: 15px;}.thinking-header {color: #4a90e2;font-size: 16px;margin-bottom: 10px;}.thinking-content {background: rgba(255, 255, 255, 0.05);padding: 12px;border-radius: 4px;color: #e0e0e0;line-height: 1.6;}/* 正式回答樣式 */.answer-panel {margin-top: 25px;border-top: 1px solid #00c85333;padding-top: 15px;}.answer-header {color: #00c853;font-size: 16px;margin-bottom: 10px;}.answer-content {background: rgba(255, 255, 255, 0.05);padding: 12px;border-radius: 4px;color: #ffffff;line-height: 1.6;}/* 圖標樣式 */.fa-brain {color: #4a90e2;margin-right: 8px;}.fa-comment {color: #00c853;margin-right: 8px;}/* 新的加載動畫樣式 */.loading {color: #FFF;text-align: left;padding: 15px;font-size: 16px;}.ai-thinking-container {display: flex;align-items: center;gap: 8px;}.ai-typing-indicator {display: flex;align-items: center;gap: 4px;height: 20px;}.typing-dot {width: 8px;height: 8px;background-color: #FFFFFF;border-radius: 50%;opacity: 0.4;animation: typing-animation 1.4s infinite ease-in-out;}.typing-dot:nth-child(1) {animation-delay: 0s;}.typing-dot:nth-child(2) {animation-delay: 0.2s;}.typing-dot:nth-child(3) {animation-delay: 0.4s;}@keyframes typing-animation {0%, 60%, 100% {transform: translateY(0);opacity: 0.4;}30% {transform: translateY(-5px);opacity: 1;}}
</style>
// 報表分析
UI.Controllers.controller("AiTipsCtrl", ["$scope", "$sce", "$uibModalInstance", "parent", "SSEService", "$timeout",function($scope, $sce, $uibModalInstance, parent, SSEService, $timeout) {// 狀態管理$scope.connectionStatus = 'connecting'; // connecting | connected | error | completed$scope.thinkingContent = null;$scope.answerContent = null;$scope.thinkingTime = null; // 新增:思考時間變量$scope.startTime = null; // 新增:開始時間戳let eventSource = null;let thinkingBuffer = "";let answerBuffer = "";// 自動滾動指令$scope.scrollToBottom = function() {$timeout(() => {const container = document.querySelector('.modal-body');if (container) {container.scrollTop = container.scrollHeight + 120;}}, 50);};// 內容更新方法function processChunkData(data) {if (data.type === 'reasoning') {// 如果是第一條思考內容,記錄開始時間if (!thinkingBuffer && !$scope.startTime) {$scope.startTime = new Date().getTime();}thinkingBuffer += data.content;$scope.thinkingContent = $sce.trustAsHtml(thinkingBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' &nbsp;'));// 更新思考時間updateThinkingTime();}else if (data.type === 'answer') {answerBuffer += data.content;$scope.answerContent = $sce.trustAsHtml(answerBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' &nbsp;'));}$scope.scrollToBottom();}function updateThinkingTime() {if ($scope.startTime) {const currentTime = new Date().getTime();$scope.thinkingTime = ((currentTime - $scope.startTime) / 1000).toFixed(1);}}// 初始化SSE連接function initSSE() {const url = '/data/chart/ai/sse?' + $.param(parent.queryParam);eventSource = new EventSource(url);eventSource.onopen = () => {$scope.$apply(() => {$scope.connectionStatus = 'connected';});};// 處理消息事件eventSource.addEventListener('ai-message', e => {$scope.$apply(() => {try {const data = JSON.parse(e.data);processChunkData(data);} catch (err) {console.error('消息解析錯誤:', err);$scope.answerContent = $sce.trustAsHtml('<div class="text-danger">數據格式錯誤</div>');}});});// 處理結束事件eventSource.addEventListener('stream-end', () => {$scope.$apply(() => {$scope.connectionStatus = 'completed';//最終更新一次思考時間updateThinkingTime();safeClose();});});// 錯誤處理eventSource.onerror = (err) => {$scope.$apply(() => {console.error('SSE連接錯誤:', err);$scope.connectionStatus = 'error';safeClose();});};}// 安全關閉連接function safeClose() {if (eventSource) {eventSource.close();eventSource = null;}}// 重新嘗試$scope.retry = () => {safeClose();thinkingBuffer = "";answerBuffer = "";$scope.thinkingContent = null;$scope.answerContent = null;$scope.thinkingTime = null; //重置思考時間$scope.startTime = null; //重置開始時間$scope.connectionStatus = 'connecting';initSSE();};// 關閉模態框$scope.cancel = () => {safeClose();$uibModalInstance.dismiss();};// 初始化initSSE();// 清理$scope.$on('$destroy', () => {safeClose();});}
]);
// 告警分析
UI.Controllers.controller("AiAlarmTipsCtrl", ["$scope", "$sce", "$uibModalInstance", "parent", "SSEService", "$timeout",function($scope, $sce, $uibModalInstance, parent, SSEService, $timeout) {// 狀態管理$scope.connectionStatus = 'connecting'; // connecting | connected | error | completed$scope.thinkingContent = null;$scope.answerContent = null;$scope.thinkingTime = null; // 新增:思考時間變量$scope.startTime = null; // 新增:開始時間戳let eventSource = null;let thinkingBuffer = "";let answerBuffer = "";// 自動滾動指令$scope.scrollToBottom = function() {$timeout(() => {const container = document.querySelector('.modal-body');if (container) {container.scrollTop = container.scrollHeight + 120;}}, 50);};// 內容更新方法function processChunkData(data) {if (data.type === 'reasoning') {// 如果是第一條思考內容,記錄開始時間if (!thinkingBuffer && !$scope.startTime) {$scope.startTime = new Date().getTime();}thinkingBuffer += data.content;$scope.thinkingContent = $sce.trustAsHtml(thinkingBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' &nbsp;'));// 更新思考時間updateThinkingTime();}else if (data.type === 'answer') {answerBuffer += data.content;$scope.answerContent = $sce.trustAsHtml(answerBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' &nbsp;'));}$scope.scrollToBottom();}// 更新思考時間function updateThinkingTime() {if ($scope.startTime) {const currentTime = new Date().getTime();$scope.thinkingTime = ((currentTime - $scope.startTime) / 1000).toFixed(1);}}// 初始化SSE連接function initSSE() {const url = '/alarm/record/realtime/page/ai/sse?' + $.param(parent.queryParam);eventSource = new EventSource(url);eventSource.onopen = () => {$scope.$apply(() => {$scope.connectionStatus = 'connected';});};// 處理消息事件eventSource.addEventListener('ai-message', e => {$scope.$apply(() => {try {const data = JSON.parse(e.data);processChunkData(data);} catch (err) {console.error('消息解析錯誤:', err);$scope.answerContent = $sce.trustAsHtml('<div class="text-danger">數據格式錯誤</div>');}});});// 處理結束事件eventSource.addEventListener('stream-end', () => {$scope.$apply(() => {$scope.connectionStatus = 'completed';// 最終更新一次思考時間updateThinkingTime();safeClose();});});// 錯誤處理eventSource.onerror = (err) => {$scope.$apply(() => {console.error('SSE連接錯誤:', err);$scope.connectionStatus = 'error';safeClose();});};}// 安全關閉連接function safeClose() {if (eventSource) {eventSource.close();eventSource = null;}}// 重新嘗試$scope.retry = () => {safeClose();thinkingBuffer = "";answerBuffer = "";$scope.thinkingContent = null;$scope.answerContent = null;$scope.thinkingTime = null; // 重置思考時間$scope.startTime = null; // 重置開始時間$scope.connectionStatus = 'connecting';initSSE();};// 關閉模態框$scope.cancel = () => {safeClose();$uibModalInstance.dismiss();};// 初始化initSSE();// 清理$scope.$on('$destroy', () => {safeClose();});}
]);showAiTips: function (resolve) {this.showDialog("Template/AiTips.html", "AiTipsCtrl", resolve, 600);},showAiAlarmTips: function (resolve) {this.showDialog("Template/AiAlarmTips.html", "AiAlarmTipsCtrl", resolve, 600);}

數據庫結構:

INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'balanceUrl', 'https://api.deepseek.com/user/balance', '余額查詢');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'enable', 'true', '啟用AI報表助手');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'key', '', 'API密鑰');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'modelType', 'deepseek-reasoner', 'deepseek模型類型');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'prompt', '', 'AI提問詞');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'promptAlarm', '', 'AI提問詞');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'rateLimit', '3', '限制每秒多少次請求');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'url', 'https://api.deepseek.com/v1/chat/completions', 'API接口');

實體類(使用AES加密 密鑰):

package com.keydak.project.core.chart.ai.dto;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.util.*;/*** AI配置信息** @author xyt*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class KeydakAiConfigDTO {private Boolean enable;/*** API_URL地址**/private String url;/*** 查詢余額地址**/private String balanceUrl;/*** API密鑰**/private String key;/*** 限流次數**/private Integer rateLimit;/*** AI提問詞(報表)**/private String prompt;/*** AI提問詞(告警)**/private String promptAlarm;/*** 模型類型**/private String modelType;private static final String SALT = ""; // 16 bytes for AES-128private static final String ALGORITHM = "AES/ECB/PKCS5Padding";public void validate() {List<String> missingFields = new ArrayList<>();if (url == null) missingFields.add("API_URL地址");if (balanceUrl == null) missingFields.add("查詢余額地址");if (key == null) missingFields.add("API密鑰");if (rateLimit == null) missingFields.add("限流次數");if (prompt == null) missingFields.add("AI提問詞");if (!missingFields.isEmpty()) {throw new IllegalStateException("參數不能為空: " + String.join(", ", missingFields));}}/*** 判斷密鑰是否已經加密*/public boolean isEncryptedKey(String key) {try {// 嘗試解密,如果能成功解密則認為已經是加密過的decryptKey(key);return true;} catch (Exception e) {return false;}}/*** 加密密鑰**/public String encryptKey(String key) throws Exception {SecretKeySpec secretKey = new SecretKeySpec(SALT.getBytes(StandardCharsets.UTF_8), "AES");Cipher cipher = Cipher.getInstance(ALGORITHM);cipher.init(Cipher.ENCRYPT_MODE, secretKey);byte[] encryptedKey = cipher.doFinal(key.getBytes(StandardCharsets.UTF_8));return Base64.getEncoder().encodeToString(encryptedKey);}/*** 解密密鑰*/public String decryptKey(String encryptedKey) throws Exception {SecretKeySpec secretKey = new SecretKeySpec(SALT.getBytes(StandardCharsets.UTF_8), "AES");Cipher cipher = Cipher.getInstance(ALGORITHM);cipher.init(Cipher.DECRYPT_MODE, secretKey);byte[] decryptedKey = cipher.doFinal(Base64.getDecoder().decode(encryptedKey));return new String(decryptedKey, StandardCharsets.UTF_8);}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/75805.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/75805.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/75805.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

基于Python的招聘推薦數據可視化分析系統

【Python】基于Python的招聘推薦數據可視化分析系統&#xff08;完整系統源碼開發筆記詳細部署教程&#xff09;? 目錄 一、項目簡介二、項目界面展示三、項目視頻展示 一、項目簡介 &#x1f680;&#x1f31f; 基于Python的招聘推薦數據可視化分析系統&#xff01;&#x1…

使用注解開發springMVC

引言 在學習過第一個springMVC項目建造過后&#xff0c;讓我們直接進入真實開發中所必需的注解開發&#xff0c; 是何等的簡潔高效&#xff01;&#xff01; 注&#xff1a;由于Maven可能存在資源過濾的問題&#xff0c;在maven依賴中加入 <build><resources>&l…

linux專題3-----禁止SSH的密碼登錄

要在linux系統中禁止密碼登錄&#xff0c;您可以通過修改 SSH 配置來實現。請按照以下步驟操作(此處以 Ubuntu為例)&#xff1a; 1、SSH 登錄到您的服務器&#xff08;或直接在命令行模式下&#xff09;。 2、備份 SSH 配置文件&#xff1a; 在終端中運行以下命令以備份現有的…

基于LangChain和通義(Tongyi)實現NL2SQL的智能檢索(無需訓練)

在數據驅動的時代,如何高效地從數據庫中獲取信息成為了一個重要的挑戰。自然語言到SQL(NL2SQL)技術提供了一種便捷的解決方案,使用戶能夠用自然語言查詢數據庫,而無需深入了解SQL語法。本文將探討如何利用LangChain和通義(Tongyi)實現NL2SQL的智能檢索,具體步驟如下: …

深度學習處理文本(10)

保存自定義層 在編寫自定義層時&#xff0c;一定要實現get_config()方法&#xff1a;這樣我們可以利用config字典將該層重新實例化&#xff0c;這對保存和加載模型很有用。該方法返回一個Python字典&#xff0c;其中包含用于創建該層的構造函數的參數值。所有Keras層都可以被序…

機器視覺3D中激光偏鏡的優點

機器視覺的3D應用中,激光偏鏡(如偏振片、波片、偏振分束器等)通過其獨特的偏振控制能力,顯著提升了系統的測量精度、抗干擾能力和適應性。以下是其核心優點: 1. 提升3D成像精度 抑制環境光干擾:偏振片可濾除非偏振的環境雜光(如日光、室內照明),僅保留激光偏振信號,大…

線程同步的學習與應用

1.多線程并發 1).多線程并發引例 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <pthread.h>int wg0; void *fun(void *arg) {for(int i0;i<1000;i){wg;printf("wg%d\n",wg);} } i…

寫.NET可以指定運行SUB MAIN嗎?調用任意一個里面的類時,如何先執行某段初始化代碼?

VB.NET 寫.NET可以指定運行SUB MAIN嗎?調用任意一個里面的類時,如何先執行某段初始化代碼? 分享 1. 在 VB.NET 中指定運行 Sub Main 在 VB.NET 里&#xff0c;你能夠指定 Sub Main 作為程序的入口點。下面為你介紹兩種實現方式&#xff1a; 方式一&#xff1a;在項目屬性…

【AI插件開發】Notepad++ AI插件開發實踐(代碼篇):從Dock窗口集成到功能菜單實現

一、引言 上篇文章已經在Notepad的插件開發中集成了選中即問AI的功能&#xff0c;這一篇文章將在此基礎上進一步集成&#xff0c;支持AI對話窗口以及常見的代碼功能菜單&#xff1a; 顯示AI的Dock窗口&#xff0c;可以用自然語言向 AI 提問或要求執行任務選中代碼后使用&…

關聯容器-模板類pair數對

關聯容器 關聯容器和順序容器有著根本的不同:關聯容器中的元素是按關鍵字來保存和訪問的,而順序容器中的元素是按它們在容器中的位置來順序保存和訪問的。 關聯容器支持高效的關鍵字查找和訪問。 兩個主要的關聯容器(associative-container),set和map。 set 中每個元素只包…

京東運維面試題及參考答案

目錄 OSPF 實現原理是什么? 請描述 TCP 三次握手的過程。 LVS 的原理是什么? 闡述 Nginx 七層負載均衡的原理。 Nginx 與 Apache 有什么區別? 如何查看監聽在 8080 端口的是哪個進程(可舉例:netstat -tnlp | grep 8080)? OSI 七層模型是什么,請寫出各層的協議。 …

輸入框輸入數字且保持精度

在項目中如果涉及到金額等需要數字輸入且保持精度的情況下&#xff0c;由于輸入框是可以隨意輸入文本的&#xff0c;所以一般情況下可能需要監聽輸入框的change事件&#xff0c;然后通過正則表達式去替換掉不匹配的文本部分。 由于每次文本改變都會被監聽&#xff0c;包括替換…

使用 requests 和 BeautifulSoup 解析淘寶商品

以下將詳細解釋如何通過這兩個庫來實現按關鍵字搜索并解析淘寶商品信息。 一、準備工作 1. 安裝必要的庫 在開始之前&#xff0c;確保已經安裝了 requests 和 BeautifulSoup 庫。如果尚未安裝&#xff0c;可以通過以下命令進行安裝&#xff1a; bash pip install requests…

C#調用ACCESS數據庫,解決“Microsoft.ACE.OLEDB.12.0”未注冊問題

C#調用ACCESS數據庫&#xff0c;解決“Microsoft.ACE.OLEDB.12.0”未注冊問題 解決方法&#xff1a; 1.將C#采用的平臺從AnyCpu改成X64 2.將官網下載的“Microsoft Access 2010 數據庫引擎可再發行程序包AccessDatabaseEngine_X64”文件解壓 3.安裝解壓后的文件 點擊下載安…

【文獻閱讀】Vision-Language Models for Vision Tasks: A Survey

發表于2024年2月 TPAMI 摘要 大多數視覺識別研究在深度神經網絡&#xff08;DNN&#xff09;訓練中嚴重依賴標注數據&#xff0c;并且通常為每個單一視覺識別任務訓練一個DNN&#xff0c;這導致了一種費力且耗時的視覺識別范式。為應對這兩個挑戰&#xff0c;視覺語言模型&am…

【Kubernetes】StorageClass 的作用是什么?如何實現動態存儲供應?

StorageClass 使得用戶能夠根據不同的存儲需求動態地申請和管理存儲資源。 StorageClass 定義了如何創建存儲資源&#xff0c;并指定了存儲供應的配置&#xff0c;例如存儲類型、質量、訪問模式等。為動態存儲供應提供了基礎&#xff0c;使得 Kubernetes 可以在用戶創建 PVC 時…

Muduo網絡庫介紹

1.Reactor介紹 1.回調函數 **回調&#xff08;Callback&#xff09;**是一種編程技術&#xff0c;允許將一個函數作為參數傳遞給另一個函數&#xff0c;并在適當的時候調用該函數 1.工作原理 定義回調函數 注冊回調函數 觸發回調 2.優點 異步編程 回調函數允許在事件發生時…

Debian編譯安裝mysql8.0.41源碼包 筆記250401

Debian編譯安裝mysql8.0.41源碼包 以下是在Debian系統上通過編譯源碼安裝MySQL 8.0.41的完整步驟&#xff0c;包含依賴管理、編譯參數優化和常見問題處理&#xff1a; 準備工作 1. 安裝編譯依賴 sudo apt update sudo apt install -y \cmake gcc g make libssl-dev …

Git常用問題收集

gitignore 忽略文件夾 不生效 有時候我們接手別人的項目時&#xff0c;發現有的忽略不對想要修改&#xff0c;但發現修改忽略.gitignore后無效。原因是如果某些文件已經被納入版本管理在.gitignore中忽略路徑是不起作用的&#xff0c;這時候需要先清除本地緩存&#xff0c;然后…

編程哲學——TCP可靠傳輸

TCP TCP可靠傳輸 TCP的可靠傳輸表現在 &#xff08;1&#xff09;建立連接時三次握手&#xff0c;四次揮手 有點像是這樣對話&#xff1a; ”我們開始對話吧“ ”收到“ ”好的&#xff0c;我收到你收到了“ &#xff08;2&#xff09;數據傳輸時ACK應答和超時重傳 ”我們去吃…