構建RAG混合開發---PythonAI+JavaEE+Vue.js前端的實踐

7GB顯存如何部署bf16精度的DeepSeek-R1 70B大模型?-CSDN博客

服務容錯治理框架resilience4j&sentinel基礎應用---微服務的限流/熔斷/降級解決方案-CSDN博客

conda管理python環境-CSDN博客

快速搭建對象存儲服務 - Minio,并解決臨時地址暴露ip、短鏈接請求改變瀏覽器地址等問題-CSDN博客

大模型LLMs的MCP入門-CSDN博客

使用LangGraph構建多代理Agent、RAG-CSDN博客

大模型LLMs框架Langchain之鏈詳解_langchain.llms.base.llm詳解-CSDN博客

大模型LLMs基于Langchain+FAISS+Ollama/Deepseek/Qwen/OpenAI的RAG檢索方法以及優化_faiss ollamaembeddings-CSDN博客

大模型LLM基于PEFT的LoRA微調詳細步驟---第二篇:環境及其詳細流程篇-CSDN博客

大模型LLM基于PEFT的LoRA微調詳細步驟---第一篇:模型下載篇_vocab.json merges.txt資源文件下載-CSDN博客?使用docker-compose安裝Redis的主從+哨兵模式_使用docker部署redis 一主一從一哨兵模式 csdn-CSDN博客

docker-compose安裝canal并利用rabbitmq同步多個mysql數據_docker-compose canal-CSDN博客

目錄

技術體系

PythonAI ---- 本文主要是本地Ollama加載模型

流式輸出核心代碼

?完整的代碼

JavaAI

引入依賴

返回Flux流 --- 測試

解決中文亂碼問題

結合Python流接口

初始化WebClient

接收String流

接收JSON流

日志&錯誤處理

移除多余的前綴

保存歷史記錄

完整代碼

SpringBoot啟動---瀏覽器請求

Test/Main啟動

Vue前端

創建項目

HBuilderX創建

使用命令創建

?修改HelloWorld.vue

請求測試


寫在前文:之所以設計這一套流程,是因為 Python在前沿的科技前沿的生態要比Java好,而Java在企業級應用層開發比較活躍;

畢竟許多企業的后端服務、應用程序均采用Java開發,涵蓋權限管理、后臺應用、緩存機制、中間件集成及數據庫交互等方面。但是現在的AI技術生態發展得很快,而Python在科研(數據科學/機器學習領域)語言,在這方面有天然的優勢;所以為了接入大模型LLMs,選擇用Python接入大模型LLMs,然后通過FastAPI發布HTTP接口,讓Java層負責與前端Vue.js應用及Python流接口進行交互,這樣的話,前端直接訪問Java應用,企業應用只需要保持現有生態即可,當前的權限、后臺應用、緩存、中間件等流程都不用再Python端再次開發,省去了很多工作;

整個流程如下: python負責和模型交互---Java作為中間層負責和前端Vue以及Python流接口交互-----Vue負責展示;

技術體系

PythonAI端:
- LLM模型:本地ChatOllama+Qwen VLLm+Qwen、本地通過HF的Transformer加載- Embedding向量:OllamaEmbedding + nomic-embed-text:latest- 向量庫FAISS:使用本地版本的Faiss庫- 檢索優化:混合(二階段)檢索:similarity_score_threshold相似性打分(向量檢索)+BM25(關鍵字)檢索構成的混合檢索 結合 FlashRank重排序優化檢索繼續優化,多階段檢索:多查詢檢索(LLM擴展)、混合檢索(向量檢索+BM25關鍵字檢索)、重排序優化、LLM壓縮;---之所以不使用多查詢LLM擴展和LLM壓縮,是因為性能問題---在使用LLM壓縮時,最好結合微調效果會好很多,不然可能會排除掉一些問題和答案關聯性不強但實際上是一一對應的問題答案;使用單獨使用混合檢索的時候滿足絕大多數情況;- 重排序:離線的FlashRankRerank+默認ms-marco-MultiBERT-L-12模型
- 流輸出:使用StreamingResponse包裝結合yield關鍵字;
- 性能優化:調用astream的異步執行方法/如果要使用stream同步方法,那么使用iterate_in_threadpool轉異步/也可以使用async+with來管理異步執行
JavaAI端:
- 核心:Springboot
- 請求流接口:WebClient
- 返回流結果:Flux前端:vue3+vite構建項目

核心接口主要包含下面的功能:

Python的流輸出:Python通過yield定義一個生成器函數(可以不間斷的返回數據),然后通過“StreamingResponse”包裝后流式返回;

---注意:return是一次性返回;

Java請求流接口:在Java端我們使用WebClient請求Python的流接口;

Java流輸出:將結果轉為Flux類型的數據返回到前端頁面;

---?此時這兩個接口,都是可以直接通過瀏覽器訪問接口查看效果

--- 如果使用Postman,必須返回標準的SSE格式的數據,不然是看不到效果的;

SSE數據格式:每個數據塊以"data: "開頭,結尾加兩個換行符

PythonAI ---- 本文主要是本地Ollama加載模型

下一篇更新:云服務通過VLLm部署模型,然后本地使用OpenAI加載云端的VLLm模型;以及“使用HuggingFace的原生Transformer加載LLM”

流式輸出核心代碼

主要是通過yield定義一個生成器函數,再通過StreamingResponse包裝返回,注意設置media_type="text/event-stream;charset=utf-8"

def llm_astream(self, faiss_query):from fastapi.concurrency import iterate_in_threadpoolsync_generator = self.chain.astream(faiss_query)# 如果 chain.stream 是同步生成器,使用 iterate_in_threadpool 轉換為異步async for chunk in iterate_in_threadpool(sync_generator):# yield chunk.content# 標準的SEE數據格式;如果不修改為下面這個,那么在使用Java的WebClient請求時,返回的是空白/報錯。# # 包裝成SSE格式,每個數據塊以"data: "開頭,結尾加兩個換行符# yield f"data: {chunk.content}\n\n" # 使用String字符串返回import json# 統一返回Json格式,并且禁止Unicode編碼---不然返回的就是Unicode編碼后的代碼yield f"data: {json.dumps({'content': chunk.content}, ensure_ascii=False)}\n\n" # # 如果 chain.stream 本身是異步生成器,直接使用:# async for chunk in self.chain.stream(faiss_query):#     yield chunk.content@app.get("/stream")
async def llm_stream(query: str):from starlette.responses import StreamingResponsereturn StreamingResponse( # 使用StreamingResponse包裝,流返回retriever.llm_astream(query),media_type="text/event-stream;charset=utf-8" # text/plain、text/event-stream;強制響應頭charset=utf-8)

?注意:此種方法可以使用瀏覽器看到效果;但是用Postman---如果不是標準的SSE格式數據就看不到效果

注意:我們返回數據時一定要返回SSE格式的數據,不然Java端要報錯“java.lang.NullPointerException: The mapper [xxxxxxx$$Lambda$880/0x0000000801118d08] returned a null value”;

SSE標準格式,返回JSON格式版本:yield f"data: {json.dumps({'content': chunk.content})}\n\n"

????????Java端使用“ServerSentEvent”事件接收;

SSE標準格式,返回字符串格式版本:yield f"data: {chunk.content}\n\n"

????????Java端使用“String”事件接收;

?完整的代碼

下面關于檢索優化、構建鏈、llm交互、rag知識入庫、向量庫、Ollama加載云端API等...技術知識見我之前發的幾篇文章;---- 安裝了依賴以后,可以直接運行下面代碼;

請求地址:http://localhost:8000/astream?query=xxxxx

---還可以訪問stream接口看看使用異步協程、同步返回的不同效果;

"""檢索agent"""
import asynciofrom langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_ollama import ChatOllama
from langserve import add_routesdef log_retrieved_docs(ctx):# print(f"[{ctx['msgid']}] [{ctx['query']}] Retrieved documents:[{ctx['content']['content']}]")print(f"Retrieved documents:[{ctx}]")return ctx  # 確保返回原數據繼續鏈式傳遞class RetrieverLoad:def __init__(self):print("加載向量庫...")from langchain_ollama import OllamaEmbeddingsself.faiss_persist_directory = 'faiss路徑'self.embedding = OllamaEmbeddings(model='nomic-embed-text:latest')self.faiss_index_name = 'faiss_index名稱'self.faiss_vector_store = self.load_embed_store()self.llm = ChatOllama(model="qwen2.5:3b")self.prompt = ChatPromptTemplate.from_template(retriver_template)self.retriever = self.retriever()self.chain = self.llm_chain()def load_embed_store(self):return FAISS.load_local(self.faiss_persist_directory,embeddings=self.embedding,index_name=self.faiss_index_name,  # 需與保存時一致allow_dangerous_deserialization=True)def retriever(self, score_threshold: float = 0.5, k: int = 5):sst_retriever = self.faiss_vector_store.as_retriever(search_type='similarity_score_threshold',search_kwargs={"score_threshold": score_threshold, "k": k})# 初始化BM25檢索# (使用公共方法獲取文檔)documents = list(self.faiss_vector_store.docstore._dict.values())from langchain_community.retrievers import BM25Retrieverbm25_retriever = BM25Retriever.from_documents(documents,k=20,  # 返回數量k1=1.5,  # 默認1.2,增大使高頻詞貢獻更高b=0.8  # 默認0.75,減小以降低文檔長度影響)# 混合檢索:BM25+embedding的from langchain.retrievers import EnsembleRetrieverensemble_retriever = EnsembleRetriever(retrievers=[bm25_retriever, sst_retriever],weights=[0.3, 0.7])# 混合檢索后 重排序# 構建壓縮管道:重排序 + 內容提取from flashrank import Rankerfrom langchain_community.document_compressors import FlashrankRerankflashrank_rerank = FlashrankRerank(client=Ranker(cache_dir='D://A4Project//LLM//flash_rankRerank//'),top_n=8)# 三階段檢索:粗檢索、重排序、內容壓縮from langchain.retrievers import ContextualCompressionRetrieverbase_retriever = ContextualCompressionRetriever(# step1. 粗檢索---30% BM25+70% Embedding向量檢索base_retriever=ensemble_retriever,# step2. 重排序---FlashRankbase_compressor=flashrank_rerank)# step3、內容壓縮---LLM---部分場景不推薦使用LLM內容壓縮,壓縮可能會刪除RAG里面原本Q&A對應但是答案中不包含問題,導致關聯性小的數據;而且很影響性能# from langchain.retrievers.document_compressors import LLMChainExtractor# # 重排序后 壓縮上下文# compressor_prompt = """#     鑒于以下問題和內容,提取與回答問題相關的背景*按原樣*的任何部分。如果上下文都不相關,則返回{no_output_str}。#     記住,*不要*編輯提取上下文的部分。#     問題: {{question}}#     內容: {{context}}#     提取相關部分:#     """# from langchain_core.prompts import PromptTemplate# compressor_prompt_template = PromptTemplate(#     input_variables=['question', 'context'],#     template=compressor_prompt.format(no_output_str='NO_OUTPUT'))# compressor = LLMChainExtractor.from_llm(prompt=compressor_prompt_template, llm=self.llm)# # compressor = LLMChainExtractor.from_llm(llm=self.llm)# pipeline_retriever = ContextualCompressionRetriever(#     base_retriever=base_retriever,#     base_compressor=compressor# )return base_retrieverdef llm_chain(self):# 處理檢索結果的函數(將文檔列表轉換為字符串)from langchain_core.runnables import RunnableLambda# process_docs = RunnableLambda(lambda docs: "\n".join([doc.page_content for doc in docs]))from langchain_core.runnables import RunnablePassthroughprompt = """請根據以下內容回答問題,內容中如果沒有的那就回答“請咨詢人工...”,內容中如果有其他不相干的內容,直接刪除即可。內容:{content}問題:{query}回答:"""prompt_template = ChatPromptTemplate.from_template(prompt)from operator import itemgetterchain = (# RunnableLambda(log_retrieved_docs) | # 直接打印傳遞進來的參數{# 這個content和query會繼續往下傳遞,直到prompt --->{content}、{query}"content": RunnableLambda(lambda x: x["query"])  # 必須,不然要報錯“TypeError: Expected a Runnable, callable or dict.Instead got an unsupported type: <class 'str'”| self.retriever  # 檢索# | RunnableLambda(log_retrieved_docs) # 打印出檢索到的文檔,檢索后未處理# 先檢索再處理文檔| RunnableLambda(lambda docs: "\n".join([doc.page_content for doc in docs])),# | process_docs  # 先檢索再處理文檔 --- 和上面方法二選一# | RunnableLambda(log_retrieved_docs),  # 打印出檢索后的文檔 --- 這里傳遞的僅僅是檢索到的內容且預處理后的內容"query": itemgetter("query"),  # 直接傳遞用戶原始問題"msgid": RunnableLambda(lambda x: x["msgid"]),  # 顯示傳遞msgid --- 和itemgetter同樣的效果}| RunnableLambda(log_retrieved_docs)  # 傳遞的是前面整個content、query、msgid的值到日志中| prompt_template  # 組合成完整 prompt| self.llm  # 傳給大模型生成回答# | RunnableLambda(log_retrieved_docs)  # 傳遞的是LLM生成的內容 --- 但是在這一步以后,系統會同步返回---不推薦在這里打印日志)return chainasync def llm_invoke(self, query):return self.chain.invoke(query)async def retriever_stream(self, query):return self.retriever.stream(query)async def llm_astream(self, query: str, msgid: str):# 直接使用astream 異步執行chunks = []result = ""async for chunk in self.chain.astream({"query": query, "msgid": msgid}):chunks.append(chunk.content)result += chunk.content# yield chunk.content# 標準的SEE數據格式;如果不修改為下面這個,那么在使用Java的WebClient請求時,返回的是空白/報錯。# # 包裝成SSE格式,每個數據塊以"data: "開頭,結尾加兩個換行符import json# 統一返回Json格式,并且禁止Unicode編碼---不然返回的就是Unicode編碼后的代碼yield f"data: {json.dumps({'content': chunk.content}, ensure_ascii=False)}\n\n"print(f"query:{query} msgid:{msgid} llm:{result}")# # 如果 chain.stream 本身是異步生成器,直接使用:# async for chunk in self.chain.stream(query):#     yield chunk.contentasync def llm_stream(self, query):from fastapi.concurrency import iterate_in_threadpool# 如果 chain.stream 是同步生成器,使用 iterate_in_threadpool 轉換為異步async for chunk in iterate_in_threadpool(self.chain.stream(query)):# yield chunk.content# 標準的SEE數據格式;如果不修改為下面這個,那么在使用Java的WebClient請求時,返回的是空白/報錯。# # 包裝成SSE格式,每個數據塊以"data: "開頭,結尾加兩個換行符import json# 統一返回Json格式,并且禁止Unicode編碼---不然返回的就是Unicode編碼后的代碼yield f"data: {json.dumps({'content': chunk.content}, ensure_ascii=False)}\n\n"# # 如果 chain.stream 本身是異步生成器,直接使用:# async for chunk in self.chain.stream(query):#     yield chunk.contentfrom fastapi import FastAPIapp = FastAPI(title='ruozhiba', version='1.0.0', description='ruozhiba檢索')
# 添加 CORS --- 跨域 中間件
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(CORSMiddleware,allow_origins=["*"],  # 允許所有來源,生產環境建議指定具體域名allow_credentials=True,  # 允許攜帶憑證(如cookies)allow_methods=["*"],  # 允許所有HTTP方法(可選:["GET", "POST"]等)allow_headers=["*"],  # 允許所有HTTP頭
)
# 同步返回
@app.get("/invoke")
async def llm_invoke(query: str):results = await retriever.llm_invoke(query)return {"results": results.content}@app.get("/retriever")
async def retriever_stream(query: str):return await retriever.retriever_stream(query)# 流式輸出——異步執行
@app.get("/astream")
async def astream(query: str, msgid: str):from starlette.responses import StreamingResponseprint(f"請求開始:query:{query} msgid:{msgid}")return StreamingResponse(retriever.llm_astream(query, msgid),media_type="text/event-stream;charset=utf-8"  # text/plain、text/event-stream;強制響應頭charset=utf-8)# 流式輸出——同步執行
@app.get("/stream")
async def llm_stream(query: str):from starlette.responses import StreamingResponsereturn StreamingResponse(retriever.llm_stream(query),media_type="text/event-stream;charset=utf-8"  # text/plain、text/event-stream;強制響應頭charset=utf-8)
if __name__ == '__main__':# asyncio.run(main())import uvicornretriever = RetrieverLoad()uvicorn.run(app, host='localhost', port=8000)

JavaAI

引入依賴

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.1</version><relativePath/> <!-- lookup parent from repository --></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency></dependencies>

返回Flux流 --- 測試

---瀏覽器訪問就可以看到流效果...但是中文的話,會是亂碼;

    @GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> chat() {
//        Thread.sleep()會阻塞線程,改用Flux.interval實現非阻塞延遲:return Flux.interval(Duration.ofMillis(100)) // 每100ms生成一個數字.map(i -> "消息:" + i) // 轉換為消息字符串.take(10); // 限制總數量為10000條
//        return Flux.create(emitter -> {
//            // 模擬數據流
//            for (int i = 0; i < 10000; i++) {
//                emitter.next("Message " + i);
//                try {
//                    Thread.sleep(100); // 模擬延遲
//                } catch (InterruptedException e) {
//                    emitter.error(e);
//                }
//            }
//            emitter.complete();
//        });}

解決中文亂碼問題

如果不使用Filter,那么在返回前端頁面的時候會是中文亂碼;

@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class FluxPreProcessorFilter implements Filter {@Overridepublic void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)throws IOException, ServletException {response.setCharacterEncoding("UTF-8");chain.doFilter(request, response);}
}

結合Python流接口

初始化WebClient

    private final WebClient webClient;public ChatMsgController(WebClient.Builder webClientBuilder) {this.webClient = webClientBuilder.clientConnector(new ReactorClientHttpConnector(HttpClient.create().responseTimeout(Duration.ofSeconds(30)) // 第一次請求可以設置長點.compress(false)  // 關閉壓縮(如果開啟可能緩沖)))// .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(256 * 1024 * 1024))// .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAi)// 網上說:必須設置為JSON格式;初始設置MediaType.TEXT_EVENT_STREAM_VALUE會導致請求失敗,必須使用APPLICATION_JSON_VALUE// 實際測試,與這無關
//                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).baseUrl("http://localhost:8000") // python項目地址.build();}

接收String流

   public Flux<Object> stream(String query) {return webClient.get().uri("/stream?query={query}", query).accept(MediaType.TEXT_EVENT_STREAM).retrieve().bodyToFlux(String.class).doOnError(e->System.err.println("發生錯誤: " + e.getMessage()));}

接收JSON流

public Flux<String> stream(String query) {return webClient.get().uri("/stream?query={query}", query).accept(MediaType.TEXT_EVENT_STREAM).retrieve().bodyToFlux(ServerSentEvent.class)// 解析為標準的SSE事件.mapNotNull(ServerSentEvent::data)  // 提取標準的SEE數據格式的 數據部分.doOnError(e->System.err.println("發生錯誤: " + e.getMessage())).map(Object::toString); // 返回是否需要根據情況來定 --- 如果不需要,返回“Flux<Object>”即可}

日志&錯誤處理

    public Flux<String> stream(@PathVariable("query") String query) {String msgid = UUID.randomUUID().toString().replace("-", "");log.info("請求開始。msgid:{},query:{}", msgid, query);return webClient.get().uri("/stream?query={query}", query).accept(MediaType.TEXT_EVENT_STREAM).retrieve()// 請求失敗返回錯誤。// 狀態碼 --- 當未請求成功時的異常.onStatus(HttpStatusCode::isError, response -> {log.error("錯誤狀態碼。msgid:{},query:{},errorCode:{}", msgid, query, response.statusCode());return response.bodyToMono(String.class).flatMap(body -> Mono.error(new RuntimeException("HTTP錯誤: " + body)));}).bodyToFlux(String.class).log() // 打印日志.doOnNext(data -> System.out.println("接收到數據塊:" + data)) // 打印接收到的數據.doOnError(e -> log.error("發生錯誤。msgid:{},query:{},error:{}", msgid, query, e.getMessage())) // 請求成功后,返回來的異常}

移除多余的前綴

    @GetMapping(value = "/astream/{query}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> astream(@PathVariable("query") String query) {String msgid = UUID.randomUUID().toString().replace("-", "");log.info("請求開始。msgid:{},query:{}", msgid, query);return webClient.get().uri("/astream?query={query}&msgid={msgid}", query, msgid).accept(MediaType.TEXT_EVENT_STREAM).retrieve().onStatus(HttpStatusCode::isError, response -> {log.error("錯誤狀態碼。msgid:{},query:{},errorCode:{}", msgid, query, response.statusCode());return response.bodyToMono(String.class).flatMap(body -> Mono.error(new RuntimeException("HTTP錯誤: " + body)));}).bodyToFlux(String.class) // 轉為String// 移除多余的Python返回的前綴; --- 因為我們返回給前端的也是SSE數據格式,所以返回的數據也是默認會有data:.map(x -> x.substring(0, x.length() - 2).replace("{\"content\": \"", "")) .doOnError(e -> log.error("發生錯誤。msgid:{},query:{},error:{}", msgid, query, e.getMessage())) // 請求成功后,返回來的異常.map(Object::toString);}

保存歷史記錄

    public Flux<String> astream(@PathVariable("query") String query) {String msgid = UUID.randomUUID().toString().replace("-", "");log.info("請求開始。msgid:{},query:{}", msgid, query);Flux<String> dataFlux = webClient.get().uri("/astream?query={query}&msgid={msgid}", query, msgid).accept(MediaType.TEXT_EVENT_STREAM).retrieve()// 通過cache()共享數據流,單獨訂閱以保存數據,避免重復請求,確保保存操作僅觸發一次。// 必須開啟,不然在后續執行"dataFlux.collectList()/ dataFlux.reduce"會再次發起請求// 不開啟cache的話---相當于前端請求一次,后端發起了三次請求.cache().doOnError(e -> log.error("發生錯誤。msgid:{},query:{},error:{}", msgid, query, e.getMessage())) // 請求成功后,返回來的異常.map(Object::toString);// 單獨訂閱以收集并保存數據 --- 下面是測試方法,二選一// 使用.collectList() 收集成一個完整的 List<String>,打印出來的list是一個chunk,["你好", ",世界", "!"]// 自己拼裝ListdataFlux.collectList() //.flatMap(list -> {String join = String.join("", list);log.info("觸發collectList保存。msgid:{},query:{},llm:{}", msgid, query, join);return Mono.just(join);}).subscribeOn(Schedulers.boundedElastic()).subscribe(null,error -> log.error("collectList保存失敗。msgid:{},query:{}", msgid, query, error),() -> log.info("collectList數據已保存。msgid:{},query:{}", msgid, query));// 使用reduce,合并ChunkdataFlux.reduce((a, b) -> a + b).flatMap(list -> {log.info("觸發reduce保存。msgid:{},query:{},llm:{}", msgid, query, list);return Mono.just(list);}).subscribeOn(Schedulers.boundedElastic()).subscribe(null,error -> log.error("reduce保存失敗。msgid:{},query:{}", msgid, query, error),() -> log.info("reduce數據已保存。msgid:{},query:{}", msgid, query));return dataFlux;}

完整代碼

SpringBoot啟動---瀏覽器請求

@RestController
@RequestMapping("/ai")
@CrossOrigin("*")
@Slf4j
public class ChatMsgController {private final WebClient webClient;public ChatMsgController(WebClient.Builder webClientBuilder) {this.webClient = webClientBuilder.clientConnector(new ReactorClientHttpConnector(HttpClient.create().responseTimeout(Duration.ofSeconds(60)).compress(false)  // 關閉壓縮(如果開啟可能緩沖)))
//                .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(256 * 1024 * 1024))// .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAi)// ?? 必須設置為JSON格式;初始設置MediaType.TEXT_EVENT_STREAM_VALUE會導致請求失敗,必須使用APPLICATION_JSON_VALUE
//                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).baseUrl("http://localhost:8000").build();}@GetMapping(value = "/astream/{query}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> astream(@PathVariable("query") String query) {String msgid = UUID.randomUUID().toString().replace("-", "");log.info("請求開始。msgid:{},query:{}", msgid, query);Flux<String> dataFlux = webClient.get().uri("/astream?query={query}&msgid={msgid}", query, msgid).accept(MediaType.TEXT_EVENT_STREAM).retrieve()// 請求失敗返回錯誤。// 狀態碼 --- 當未請求成功時的異常.onStatus(HttpStatusCode::isError, response -> {log.error("錯誤狀態碼。msgid:{},query:{},errorCode:{}", msgid, query, response.statusCode());return response.bodyToMono(String.class).flatMap(body -> Mono.error(new RuntimeException("HTTP錯誤: " + body)));}).bodyToFlux(String.class) // 轉為String
//                .bodyToFlux(ServerSentEvent.class)  // 解析為標準的SSE事件
//                .mapNotNull(ServerSentEvent::data)  // 提取標準的SEE數據格式的 數據部分.map(x -> x.substring(0, x.length() - 2).replace("{\"content\": \"", ""))
//                .log() // 打印日志 --- 這個日志是一個chunk一個chunk打印的.cache()// 通過cache()共享數據流,單獨訂閱以保存數據,避免重復請求,確保保存操作僅觸發一次。必須開啟,不然在后續執行"dataFlux.collectList()/ dataFlux.reduce"會再次發起請求---相當于前端請求一次,后端發起了三次請求
//                .doOnNext(data -> System.out.println("接收到數據塊:" + data)) // 打印接收到的數據.doOnError(e -> log.error("發生錯誤。msgid:{},query:{},error:{}", msgid, query, e.getMessage())) // 請求成功后,返回來的異常.map(Object::toString);// 單獨訂閱以收集并保存數據// 使用.collectList() 收集成一個完整的 List<String>,打印出來的list是一個chunk,["你好", ",世界", "!"]// 自己拼裝ListdataFlux.collectList() //.flatMap(list -> {String join = String.join("", list);log.info("觸發collectList保存。msgid:{},query:{},llm:{}", msgid, query, join);return Mono.just(join);}).subscribeOn(Schedulers.boundedElastic()).subscribe(null,error -> log.error("collectList保存失敗。msgid:{},query:{}", msgid, query, error),() -> log.info("collectList數據已保存。msgid:{},query:{}", msgid, query));// 使用reduce,合并ChunkdataFlux.reduce((a, b) -> a + b).flatMap(list -> {log.info("觸發reduce保存。msgid:{},query:{},llm:{}", msgid, query, list);return Mono.just(list);}).subscribeOn(Schedulers.boundedElastic()).subscribe(null,error -> log.error("reduce保存失敗。msgid:{},query:{}", msgid, query, error),() -> log.info("reduce數據已保存。msgid:{},query:{}", msgid, query));return dataFlux;}
}

Test/Main啟動

public class StreamApiClient {public void streamData(String query) {
//        WebClient client = WebClient.create("http://localhost:8000");WebClient client = WebClient.builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create().responseTimeout(Duration.ofSeconds(10))
//                                        .compress(false)  // 關閉壓縮(如果開啟可能緩沖)))
//                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).baseUrl("http://localhost:8000").build();Flux<String> stream = client.get().uri("/stream?query={query}", query).accept(MediaType.TEXT_EVENT_STREAM).retrieve().onStatus(HttpStatusCode::isError, response -> { // 請求失敗返回錯誤System.out.println("錯誤狀態碼: " + response.statusCode());return response.bodyToMono(String.class).flatMap(body -> Mono.error(new RuntimeException("HTTP錯誤: " + body)));}).bodyToFlux(String.class)
//                .log()
//                .doOnNext(data -> System.out.println("接收到數據塊:" + data)).doOnError(e -> System.err.println("發生錯誤: " + e.getMessage()));stream.subscribe(chunk -> System.out.println("Received chunk: " + chunk),error -> System.err.println("Error: " + error),() -> System.out.println("Stream completed"));}public static void main(String[] args) throws InterruptedException {StreamApiClient streamApiClient = new StreamApiClient();streamApiClient.streamData("五塊能娶幾個老婆");Thread.sleep(40000); // 可以適當提升時間,不然可能程序還沒得到返回就結束了,看不到效果}
}

Vue前端

前端使用vue3+vite構建

創建項目

HBuilderX創建

使用命令創建

npm create vite@latest my-vue-app -- --template vue    # 創建項目
cd my-vue-app    # 進入到項目中
npm install      # 安裝依賴
npm run dev      # 運行dev環境項目<!-- 其中: -->
<!-- index.html 主要是標簽---包含標簽頁itme樣式; -->
<!-- src/App.vue ---主頁訪問,系統 import HelloWorld from './components/HelloWorld.vue'導入了HelloWorld.vue? -->
<!-- src/components/HelloWorld.vue ---剛開始是廣告頁,可以修改內容 -->
<!-- 所以啟動成功后,訪問"http://localhost:3000/index"會直接進入HelloWorld.vue; -->
<!-- 我們現在只需要修改HelloWorld.vue即可 -->

?修改HelloWorld.vue

<template><div><input v-model="query" placeholder="輸入查詢內容" class="query-input" /><button @click="startStream" class="action-button start-button">調用Java接收流</button><button @click="chatStream" class="action-button start-button">調用Chat接口測試</button><button @click="pythonStream" class="action-button start-button">調用python接口</button><button @click="closeStream" class="action-button stop-button">停止接收</button><div class="result-container"><pre class="response-data">{{ responseData }}</pre><div>這個數據來源于App的MSG組件:{{msg}}</div></div></div>
</template>
<script setup>
import { ref, onBeforeUnmount } from 'vue'// Props 定義
const props = defineProps({msg: String
})// 響應式數據
const query = ref('為什么砍頭不找死刑犯來演')
const responseData = ref('')
const eventSource = ref(null)// 方法
const pythonStream = () => {closeStream()responseData.value = ''const url = `http://localhost:8000/astream?query=${encodeURIComponent(query.value)}&msgid=123456`eventSource.value = new EventSource(url)eventSource.value.onmessage = (event) => {const data = JSON.parse(event.data)responseData.value += data.content + "\n"}eventSource.value.onerror = (error) => {console.warn("EventSource warn:", error)closeStream()}
}const chatStream = () => {closeStream()responseData.value = ''const url = `http://localhost:8080/ai/chat`eventSource.value = new EventSource(url)eventSource.value.onmessage = (event) => {responseData.value += event.data + "\n"}eventSource.value.onerror = (error) => {console.warn("EventSource warn:", error)closeStream()}
}const startStream = () => {closeStream()responseData.value = ''const url = `http://localhost:8080/ai/astream/${encodeURIComponent(query.value)}`eventSource.value = new EventSource(url)eventSource.value.onmessage = (event) => {responseData.value += event.data + "-"}eventSource.value.onerror = (error) => {console.warn("EventSource warn:", error)closeStream()}
}const closeStream = () => {if (eventSource.value) {eventSource.value.close()eventSource.value = null}
}// 生命周期鉤子
onBeforeUnmount(() => {closeStream()
})
</script><style scoped>.container {max-width: 600px;margin: 50px auto;padding: 20px;border-radius: 10px;box-shadow: 0 4px 8px rgba(0, 0, 0, .1);background-color: #fff;}.query-input {width: calc(100% - 22px);padding: 10px;margin-bottom: 15px;border: 1px solid #ccc;border-radius: 5px;font-size: 16px;}.action-button {padding: 10px 20px;margin-right: 10px;border: none;border-radius: 5px;cursor: pointer;font-size: 16px;transition: all .3s ease;}.start-button {background-color: #4CAF50;color: white;}.stop-button {background-color: #f44336;color: white;}.start-button:hover {background-color: #45a049;}.stop-button:hover {background-color: #e53935;}.result-container {margin-top: 20px;padding: 15px;background: #f9f9f9;border-radius: 5px;overflow-x: auto;}.response-data {white-space: pre-wrap;word-wrap: break-word;font-size: 14px;color: #333;}
</style>

請求測試

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/81482.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/81482.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/81482.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Java ee初階】jvm(3)

一、雙親委派機制&#xff08;類加載機制中&#xff0c;最經常考到的問題&#xff09; 類加載的第一個環節中&#xff0c;根據類的全限定類名&#xff08;包名類名&#xff09;找到對應的.class文件的過程。 JVM中進行類加載的操作&#xff0c;需要以來內部的模塊“類加載器”…

wps excel將表格輸出pdf時所有列在一張紙上

記錄&#xff1a;wps excel將表格輸出pdf時所有列在一張紙上 1&#xff0c;調整縮放比例&#xff0c;或選擇將所有列打印在一頁 2&#xff0c;將表格的所有鋪滿到這套虛線

分布式微服務系統架構第134集:筆記1運維服務器經驗,高并發,大數據量系統

加群聯系作者vx&#xff1a;xiaoda0423 倉庫地址&#xff1a;https://webvueblog.github.io/JavaPlusDoc/ https://1024bat.cn/ https://github.com/webVueBlog/fastapi_plus https://webvueblog.github.io/JavaPlusDoc/ ? 一、查看端口是否被占用的常用命令 1?? lsof 命令&…

IS-IS 中間系統到中間系統

前言&#xff1a; 中間系統到中間系統IS-IS&#xff08;Intermediate System to Intermediate System&#xff09;屬于內部網關協議IGP&#xff08;Interior Gateway Protocol&#xff09;&#xff0c;用于自治系統內部 IS-IS也是一種鏈路狀態協議&#xff0c;使用最短路徑優先…

前端安全:XSS、CSRF 防御與最佳實踐

引言 隨著互聯網應用的普及&#xff0c;前端安全問題日益凸顯。作為開發者&#xff0c;了解并防范常見的安全威脅至關重要。本文將深入探討兩種最常見的前端安全威脅&#xff1a;跨站腳本攻擊&#xff08;XSS&#xff09;和跨站請求偽造&#xff08;CSRF&#xff09;&#xff…

uniapp 彈窗封裝(上、下、左、右、中五個方位)

無腦復制即可&#xff01;&#xff01;&#xff01; <template><view><viewv-if"mask"class"tui-drawer-mask":class"{ tui-drawer-mask_show: visible }":style"{ zIndex: maskZIndex }"tap"handleMaskClick&qu…

Axure制作可視化大屏動態滾動列表教程

在可視化大屏設計中&#xff0c;動態滾動列表是一種常見且實用的展示方式&#xff0c;能夠有效地展示大量信息。本文將詳細介紹如何使用Axure制作一個動態滾動的列表展示模塊。 一、準備工作 打開Axure軟件&#xff1a;確保你已經安裝并打開了Axure RP軟件。創建新項目&#x…

零基礎玩轉Apache Superset可視化部署

根據官方Quick Start Guide&#xff0c;你可以按照以下步驟進行部署&#xff1a; 1. 確認環境2. 獲取代碼3. 獲取官方最新代碼4. 啟動服務5. 訪問Superset Web界面6. 接入數據源 前提條件&#xff1a; dockerdocker compose 1. 確認環境 安裝Docker和Docker Compose 確保你…

服務器數據恢復—XFS文件系統分區消失的數據恢復案例

服務器數據恢復環境&故障&#xff1a; 服務器上有一組由raid卡組建的raid5磁盤陣列。上層安裝linux才做系統&#xff0c;采用XFS文件系統&#xff0c;劃分了3個分區。 管理員將服務器的操作系統重裝后&#xff0c;發現服務器上的分區發生了改變&#xff1a;一個分區消失&am…

2025/5/18

繼續研究一下大佬的RAG項目。開始我的碎碎念。 RAG可以分成兩部分&#xff1a;一個是問答&#xff0c;一個是數據處理。 問答是人提問&#xff0c;然后查數據庫&#xff0c;把查的東西用大模型組織成人話&#xff0c;回答人的提問。 數據處理是把當下知識庫里的東西&#xf…

在 Vue 中插入 B 站視頻

前言 在 Vue 項目中&#xff0c;有時我們需要嵌入 B 站視頻來豐富頁面內容&#xff0c;為用戶提供更直觀的信息展示。本文將詳細介紹在 Vue 中插入 B 站視頻的多種方法。 使用<iframe>標簽直接嵌入,<iframe>標簽是一種簡單直接的方式&#xff0c;可將 B 站視頻嵌…

OpenCv高階(八)——攝像頭調用、攝像頭OCR

文章目錄 前言一、攝像頭調用通用方法1、導入必要的庫2、創建攝像頭接口 二、攝像頭OCR1.引入庫2、定義函數&#xff08;1&#xff09;定義顯示opencv顯示函數&#xff08;2&#xff09;保持寬高比的縮放函數&#xff08;3&#xff09;坐標點排序函數&#xff08;4&#xff09;…

特斯拉虛擬電廠:能源互聯網時代的分布式革命

在雙碳目標與能源轉型的雙重驅動下&#xff0c;特斯拉虛擬電廠&#xff08;Virtual Power Plant, VPP&#xff09;通過數字孿生技術與能源系統的深度融合&#xff0c;重構了傳統電力系統的運行范式。本文從系統架構、工程實踐、技術挑戰三個維度&#xff0c;深度解析這一顛覆性…

【漫話機器學習系列】258.拐點(Inflection Point)

拐點&#xff08;Inflection Point&#xff09;詳解&#xff1a;定義、原理與應用 在數學分析與數據建模中&#xff0c;“拐點&#xff08;Inflection Point&#xff09;”是一個非常重要的概念。今天這篇文章&#xff0c;我們將結合圖示&#xff0c;深入理解拐點的定義、數學…

語音識別——聲紋識別

通過將說話人的聲音與數據庫中的記錄聲音進行比對&#xff0c;判斷說話人是否為數據庫白名單中的同一人&#xff0c;從而完成語音驗證。目前&#xff0c;3D-Speaker 聲紋驗證的效果較為出色。 3D-Speaker 是一個開源工具包&#xff0c;可用于單模態和多模態的說話人驗證、說話…

DeepSeek 賦能軍事:重塑現代戰爭形態的科技密碼

目錄 一、引言&#xff1a;AI 浪潮下的軍事變革與 DeepSeek 崛起二、DeepSeek 技術原理與特性剖析2.1 核心技術架構2.2 獨特優勢 三、DeepSeek 在軍事偵察中的應用3.1 海量數據快速處理3.2 精準目標識別追蹤3.3 預測潛在威脅 四、DeepSeek 在軍事指揮決策中的應用4.1 戰場態勢實…

uWSGI是什么?

uWSGI 是一個功能強大的應用服務器&#xff0c;專為部署高性能 Web 應用設計&#xff0c;尤其適合 Python 生態系統。以下是對其核心介紹及適用場景的總結&#xff1a; uWSGI 是什么&#xff1f; uWSGI 是一個實現了 WSGI&#xff08;Web Server Gateway Interface&#xff09…

Digi XBee XR 系列介紹

Digi 延續了 20 多年來亞 GHz 射頻模塊的傳統&#xff0c;推出了 Digi XBee XR 系列遠距離模塊&#xff0c;包括 Digi XBee XR 900 - 已通過多個地區的預先認證 - 以及 Digi XBee XR 868 - 已通過歐洲地區應用的預先認證。 這些先進的射頻模塊專為遠距離抗干擾無線通信而設計。…

RabbitMq C++客戶端的使用

介紹 RabbitMQ 是一個開源的消息代理和隊列服務器&#xff0c;用于在分布式系統之間傳遞消息。它實現了高級消息隊列協議(AMQP)&#xff0c;同時也支持其他協議如 STOMP、MQTT 等。 核心概念 Producer(生產者): 發送消息的應用程序 Consumer(消費者): 接收消息的應用程序 Q…

HTML 中的 input 標簽詳解

HTML 中的 input 標簽詳解 一、基礎概念 1. 定義與作用 HTML 中的 <input> 標簽是表單元素的核心組件&#xff0c;用于創建各種用戶輸入字段。作為一個空標簽&#xff08;沒有閉合標簽&#xff09;&#xff0c;它通過 type 屬性來決定呈現何種輸入控件&#xff0c;是實…