Spring AI Advisors API 提供了一種靈活且強大的方式來攔截、修改和增強 Spring 應用程序中的 AI 驅動交互。其核心思想類似于 Spring AOP(面向切面編程)中的“通知”(Advice),允許開發者在不修改核心業務邏輯的情況下,在 AI 調用前后或過程中插入自定義邏輯。
Advisors 的作用
Advisors 的主要作用包括:
- 封裝重復的生成式 AI 模式:將常見的 AI 交互模式(如聊天記憶、RAG 檢索增強生成)封裝成可重用的組件。
- 數據轉換與增強:在發送到大型語言模型(LLMs)的數據和從 LLMs 返回的數據之間進行轉換和增強。
- 跨模型和用例的可移植性:提供一種機制,使得 AI 邏輯可以在不同的模型和用例之間輕松移植。
- 可觀測性:Advisors 參與到 Spring AI 的可觀測性棧中,可以查看與其執行相關的指標和跟蹤。
核心接口
Spring AI Advisors API 主要圍繞以下幾個核心接口構建:
Advisor
:所有 Advisor 的父接口,繼承自 Spring 的Ordered
接口,用于定義 Advisor 的執行順序和名稱。CallAdvisor
:用于非流式(non-streaming)場景的 Advisor 接口,其adviseCall
方法用于攔截和處理同步的 AI 調用。StreamAdvisor
:用于流式(streaming)場景的 Advisor 接口,其adviseStream
方法用于攔截和處理異步的 AI 流式 AI 調用。AdvisorChain
:定義了執行 Advisor 鏈的上下文。CallAdvisorChain
:CallAdvisor
的鏈,用于編排同步 Advisor 的執行。StreamAdvisorChain
:StreamAdvisor
的鏈,用于編排流式 Advisor 的執行。BaseAdvisor
:一個抽象基類,實現了CallAdvisor
和StreamAdvisor
的通用方面,減少了實現 Advisor 所需的樣板代碼,并提供了before
和after
方法來簡化邏輯的插入。
這些接口共同構成了 Spring AI Advisors 的核心骨架,允許開發者以統一的方式處理不同類型的 AI 交互。
架構設計與執行流程
Spring AI Advisors 的設計借鑒了 Spring 框架中經典的責任鏈模式和 AOP 思想,使得 AI 請求的處理流程高度可擴展和可定制。理解其內部架構和執行流程對于有效利用 Advisors 至關重要。
類圖概覽
首先,我們通過一個簡化的類圖來概覽 Spring AI Advisors 的核心接口和它們之間的關系:
從類圖中可以看出:
Advisor
是所有顧問的基石,它繼承了Ordered
接口,這意味著每個顧問都有一個排序值,用于確定其在鏈中的執行順序。CallAdvisor
和StreamAdvisor
分別處理同步和異步(流式)的 AI 調用。CallAdvisorChain
和StreamAdvisorChain
是顧問鏈的抽象,它們負責按順序調用鏈中的下一個顧問。BaseAdvisor
提供了一個方便的抽象,它實現了CallAdvisor
和StreamAdvisor
的默認行為,并引入了before
和after
方法,使得開發者可以更容易地在請求處理前后插入邏輯。
請求處理流程
Spring AI Advisors 的請求處理流程是一個典型的責任鏈模式。當一個 AI 請求(ChatClientRequest
)被發起時,它會依次通過配置好的 Advisor 鏈,每個 Advisor 都有機會在請求發送到 LLM 之前對其進行修改或增強,并在 LLM 返回響應之后對其進行處理。
以下是請求處理的詳細流程圖:
- 請求初始化:Spring AI 框架從用戶的
Prompt
創建一個AdvisedRequest
對象,并附帶一個空的AdvisorContext
對象。AdvisorContext
用于在整個 Advisor 鏈中共享狀態和數據。 - Advisor 鏈處理(請求階段):
AdvisedRequest
沿著 Advisor 鏈依次傳遞。每個 Advisor 都會執行其adviseCall
(同步)或adviseStream
(流式)方法。在這個階段,Advisor 可以:
-
- 檢查未密封的 Prompt 數據。
- 自定義和增強 Prompt 數據(例如,添加系統消息、上下文信息、函數定義等)。
- 調用鏈中的下一個實體(
chain.nextCall()
或chain.nextStream()
)。 - 選擇性地阻塞請求,即不調用
nextCall/nextStream
,而是直接生成響應。在這種情況下,該 Advisor 負責填充響應。
- 發送到 LLM:鏈中的最后一個 Advisor(通常由框架自動添加)負責將處理后的請求發送到實際的
Chat Model
(大型語言模型)。 - LLM 響應:
Chat Model
處理請求并返回一個響應。 - Advisor 鏈處理(響應階段):LLM 的響應被傳遞回 Advisor 鏈,并轉換為
AdvisedResponse
。AdvisedResponse
同樣包含共享的AdvisorContext
實例。每個 Advisor 都有機會處理或修改響應,例如,進行后處理、日志記錄、數據提取等。 - 返回客戶端:最終的
AdvisedResponse
返回給客戶端,并從中提取ChatCompletion
。
時序圖直觀地展示了請求如何從 Client
經過 ChatClient
,然后逐級深入到 Advisor
鏈,最終到達 ChatModel
。響應則以相反的順序回溯,每個 Advisor
再次有機會處理響應。這種“洋蔥式”的結構確保了請求和響應在到達核心 AI 模型之前和之后都能被靈活地處理。
Advisor 的執行順序
Advisor 在鏈中的執行順序由其 getOrder()
方法返回的值決定。Spring 框架中的 Ordered
接口定義了以下語義:
- 值越小,優先級越高:
Ordered.HIGHEST_PRECEDENCE
(Integer.MIN_VALUE) 表示最高優先級,Ordered.LOWEST_PRECEDENCE
(Integer.MAX_VALUE) 表示最低優先級。 - 相同值不保證順序:如果多個 Advisor 具有相同的
order
值,它們的執行順序是不確定的。
Advisor 鏈的操作類似于一個棧:
- 請求處理階段:
order
值最低(優先級最高)的 Advisor 最先處理請求。它位于棧的頂部,因此在請求向下傳遞時,它首先被調用。 - 響應處理階段:
order
值最低(優先級最高)的 Advisor 最后處理響應。當響應從 LLM 返回并沿著鏈回溯時,它會最后被調用。
這種設計使得開發者可以精確控制 Advisor 的執行時機。例如,如果你希望一個 Advisor 在所有其他 Advisor 之前處理請求并在所有其他 Advisor 之后處理響應(例如,用于全局日志記錄或異常處理),你應該給它設置一個非常低的 order
值(接近 Ordered.HIGHEST_PRECEDENCE
)。反之,如果你希望它最后處理請求并最先處理響應(例如,用于最終的數據格式化),則設置一個較高的 order
值(接近 Ordered.LOWEST_PRECEDENCE
)。
對于需要同時在請求和響應階段都處于鏈條最前端的用例,可以考慮使用兩個獨立的 Advisor,并為它們配置不同的 order
值,并通過 AdvisorContext
共享狀態。
代碼分析:深入理解 BaseAdvisor
在 Spring AI Advisors API 中,BaseAdvisor
接口扮演著至關重要的角色,它極大地簡化了自定義 Advisor 的實現。通過提供 CallAdvisor
和 StreamAdvisor
的默認實現,BaseAdvisor
將復雜的攔截邏輯抽象為兩個核心方法:before
和 after
。
讓我們再次回顧 BaseAdvisor
的定義:
public interface BaseAdvisor extends CallAdvisor, StreamAdvisor {Scheduler DEFAULT_SCHEDULER = Schedulers.boundedElastic();@Overridedefault ChatClientResponse adviseCall(ChatClientRequest chatClientRequest, CallAdvisorChain callAdvisorChain) {Assert.notNull(chatClientRequest, "chatClientRequest cannot be null");Assert.notNull(callAdvisorChain, "callAdvisorChain cannot be null");ChatClientRequest processedChatClientRequest = before(chatClientRequest, callAdvisorChain);ChatClientResponse chatClientResponse = callAdvisorChain.nextCall(processedChatClientRequest);return after(chatClientResponse, callAdvisorChain);}@Overridedefault Flux<ChatClientResponse> adviseStream(ChatClientRequest chatClientRequest,StreamAdvisorChain streamAdvisorChain) {Assert.notNull(chatClientRequest, "chatClientRequest cannot be null");Assert.notNull(streamAdvisorChain, "streamAdvisorChain cannot be null");Assert.notNull(getScheduler(), "scheduler cannot be null");Flux<ChatClientResponse> chatClientResponseFlux = Mono.just(chatClientRequest).publishOn(getScheduler()).map(request -> this.before(request, streamAdvisorChain)).flatMapMany(streamAdvisorChain::nextStream);return chatClientResponseFlux.map(response -> {if (AdvisorUtils.onFinishReason().test(response)) {response = after(response, streamAdvisorChain);}return response;}).onErrorResume(error -> Flux.error(new IllegalStateException("Stream processing failed", error)));}@Overridedefault String getName() {return this.getClass().getSimpleName();}/*** Logic to be executed before the rest of the advisor chain is called.* 在調用顧問鏈的其余部分之前執行的邏輯*/ChatClientRequest before(ChatClientRequest chatClientRequest, AdvisorChain advisorChain);/*** Logic to be executed after the rest of the advisor chain is called.*/ChatClientResponse after(ChatClientResponse chatClientResponse, AdvisorChain advisorChain);/*** Scheduler used for processing the advisor logic when streaming.* 流式傳輸時用于處理顧問邏輯的調度器。*/default Scheduler getScheduler() {return DEFAULT_SCHEDULER;}}
關鍵點分析:
- 統一的攔截點:
BaseAdvisor
通過adviseCall
和adviseStream
方法,為同步和流式調用提供了統一的攔截邏輯。這兩個方法內部都調用了before
方法來處理請求,然后調用chain.nextCall()
或chain.nextStream()
將請求傳遞給鏈中的下一個 Advisor 或最終的 AI 模型,最后調用after
方法來處理響應。 before
方法:
-
ChatClientRequest before(ChatClientRequest chatClientRequest, AdvisorChain advisorChain)
- 作用:在請求發送到鏈中的下一個 Advisor 或 AI 模型之前執行的邏輯。你可以在這里修改
chatClientRequest
,例如添加或修改消息、參數等。返回的ChatClientRequest
將被傳遞給鏈的后續部分。
after
方法:
-
ChatClientResponse after(ChatClientResponse chatClientResponse, AdvisorChain advisorChain)
- 作用:在鏈中的下一個 Advisor 或 AI 模型返回響應之后執行的邏輯。你可以在這里對
chatClientResponse
進行后處理,例如解析響應、提取信息、日志記錄等。返回的ChatClientResponse
將被傳遞回鏈的上游。
- 流式處理的復雜性:對于流式處理 (
adviseStream
),BaseAdvisor
使用 Reactor 的Flux
和Mono
來處理異步流。before
方法在Mono.just(chatClientRequest).map(...)
中被調用,確保在流開始之前對請求進行預處理。after
方法則在Flux.map(...)
中被調用,并且通過AdvisorUtils.onFinishReason().test(response)
判斷是否是流的最終響應,以確保after
邏輯在整個流完成時才執行,而不是對流中的每個分塊都執行。 - 調度器 (
Scheduler
):BaseAdvisor
提供了getScheduler()
方法,默認使用Schedulers.boundedElastic()
。這允許在處理流式 Advisor 邏輯時指定一個調度器,以避免阻塞主線程,這對于響應式編程至關重要。
通過實現 BaseAdvisor
,開發者只需要關注 before
和 after
這兩個業務邏輯相關的核心方法,而無需關心 Advisor 鏈的內部調用機制和流式處理的復雜性,大大降低了開發難度。
自定義 Advisor 示例
理解了 Spring AI Advisors 的核心概念和架構后,我們來看幾個實際的自定義 Advisor 示例,它們將幫助你更好地掌握如何在自己的應用中利用這一強大機制。
1. 日志記錄 Advisor (SimpleLoggerAdvisor
)
一個常見的需求是在 AI 請求處理的各個階段進行日志記錄,以便于調試和監控。我們可以實現一個簡單的日志記錄 Advisor,它在請求發送前和響應返回后打印相關信息。這個 Advisor 只觀察請求和響應,不進行修改。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;import org.springframework.ai.chat.client.ChatClientRequest;
import org.springframework.ai.chat.client.ChatClientResponse;
import org.springframework.ai.chat.client.advisor.api.BaseAdvisor;
import org.springframework.ai.chat.client.advisor.api.AdvisorChain;
import org.springframework.ai.chat.client.advisor.api.CallAdvisorChain;
import org.springframework.ai.chat.client.advisor.api.StreamAdvisorChain;
import org.springframework.ai.chat.client.advisor.AdvisorUtils;public class SimpleLoggerAdvisor implements BaseAdvisor {private static final Logger logger = LoggerFactory.getLogger(SimpleLoggerAdvisor.class);@Overridepublic String getName() {return this.getClass().getSimpleName();}@Overridepublic int getOrder() {return 0; // 默認順序,可以根據需要調整}@Overridepublic ChatClientRequest before(ChatClientRequest chatClientRequest, AdvisorChain advisorChain) {logger.debug("BEFORE: {}", chatClientRequest);return chatClientRequest; // 不修改請求}@Overridepublic ChatClientResponse after(ChatClientResponse chatClientResponse, AdvisorChain advisorChain) {logger.debug("AFTER: {}", chatClientResponse);return chatClientResponse; // 不修改響應}
}
代碼分析:
SimpleLoggerAdvisor
實現了BaseAdvisor
接口,因此它同時支持同步和流式調用。getName()
方法返回 Advisor 的名稱,通常是類名。getOrder()
方法返回 Advisor 的執行順序。這里設置為0
,表示一個中等的優先級。你可以根據需要在Ordered.HIGHEST_PRECEDENCE
和Ordered.LOWEST_PRECEDENCE
之間調整。before()
方法在請求處理前被調用,我們在這里記錄了ChatClientRequest
的內容。由于這個 Advisor 只是觀察者,所以直接返回了原始的chatClientRequest
。after()
方法在響應返回后被調用,我們在這里記錄了ChatClientResponse
的內容。同樣,直接返回了原始的chatClientResponse
。
如何使用:
你可以通過 ChatClient.builder().defaultAdvisors()
方法將 SimpleLoggerAdvisor
配置到 ChatClient
中:
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.model.ChatModel;// 假設 chatModel 已經注入
ChatModel chatModel = ...;ChatClient chatClient = ChatClient.builder(chatModel)
.defaultAdvisors(new SimpleLoggerAdvisor()
)
.build();String response = chatClient.prompt()
.user("你好,Spring AI!")
.call()
.content();System.out.println(response);
當上述代碼執行時,你將在日志中看到 BEFORE
和 AFTER
的輸出,展示了請求和響應的詳細信息。
2. 重讀 Advisor (ReReadingAdvisor
)
“重讀”(Re-Reading)是一種提高大型語言模型推理能力的技術,其核心思想是將用戶的輸入查詢重復一次,例如:
{Input_Query}
Read the question again: {Input_Query}
我們可以實現一個 Advisor 來自動將用戶的輸入查詢轉換為這種格式:
import java.util.HashMap;
import java.util.Map;import reactor.core.publisher.Flux;import org.springframework.ai.chat.client.ChatClientRequest;
import org.springframework.ai.chat.client.ChatClientResponse;
import org.springframework.ai.chat.client.advisor.api.BaseAdvisor;
import org.springframework.ai.chat.client.advisor.api.AdvisorChain;
import org.springframework.ai.chat.client.advisor.api.CallAdvisorChain;
import org.springframework.ai.chat.client.advisor.api.StreamAdvisorChain;public class ReReadingAdvisor implements BaseAdvisor {@Overridepublic String getName() {return this.getClass().getSimpleName();}@Overridepublic int getOrder() {return 0; // 默認順序,可以根據需要調整}@Overridepublic ChatClientRequest before(ChatClientRequest chatClientRequest, AdvisorChain advisorChain) {Map<String, Object> advisedUserParams = new HashMap<>(chatClientRequest.userParams());advisedUserParams.put("re2_input_query", chatClientRequest.userText());return ChatClientRequest.from(chatClientRequest).userText("""{re2_input_query}Read the question again: {re2_input_query}""").userParams(advisedUserParams).build();}@Override public ChatClientResponse after(ChatClientResponse chatClientResponse, AdvisorChain advisorChain) {return chatClientResponse; // 不修改響應}
}
代碼分析:
ReReadingAdvisor
也實現了BaseAdvisor
。- 關鍵邏輯在于
before()
方法。它首先獲取原始的userText
,并將其作為參數re2_input_query
放入advisedUserParams
中。 - 然后,它使用
ChatClientRequest.from(chatClientRequest).userText(...)
來構建一個新的ChatClientRequest
,其中userText
被修改為包含重讀指令的模板字符串。這里利用了 Spring AI 的模板功能,{re2_input_query}
會被實際的用戶輸入替換。 after()
方法同樣不修改響應。
如何使用:
將 ReReadingAdvisor
配置到 ChatClient
中:
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.model.ChatModel;// 假設 chatModel 已經注入
ChatModel chatModel = ...;ChatClient chatClient = ChatClient.builder(chatModel).defaultAdvisors(new ReReadingAdvisor()).build();String response = chatClient.prompt().user("請解釋一下量子力學").call().content();System.out.println(response);
當用戶輸入“請解釋一下量子力學”時,實際發送給 LLM 的 Prompt 將會是:
請解釋一下量子力學
Read the question again: 請解釋一下量子力學
這有助于 LLM 更好地理解和處理復雜的查詢,從而提高響應質量。
3. 結合使用:聊天記憶 Advisor (MessageChatMemoryAdvisor) 和 RAG Advisor (QuestionAnswerAdvisor)
Spring AI 提供了開箱即用的 Advisors,例如 MessageChatMemoryAdvisor
用于管理聊天記憶,QuestionAnswerAdvisor
用于實現 RAG(檢索增強生成)。這些 Advisor 同樣遵循上述設計模式,可以方便地集成到你的應用中。
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.client.advisor.MessageChatMemoryAdvisor;
import org.springframework.ai.chat.client.advisor.QuestionAnswerAdvisor;
import org.springframework.ai.chat.memory.ChatMemory;
import org.springframework.ai.chat.memory.InMemoryChatMemory;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.vectorstore.VectorStore;// 假設 chatModel 和 vectorStore 已經注入
ChatModel chatModel = ...;
VectorStore vectorStore = ...;
ChatMemory chatMemory = new InMemoryChatMemory();ChatClient chatClient = ChatClient.builder(chatModel)
.defaultAdvisors(MessageChatMemoryAdvisor.builder(chatMemory).build(), // 聊天記憶 AdvisorQuestionAnswerAdvisor.builder(vectorStore).build() // RAG Advisor
)
.build();String conversationId = "user-123"; // 假設的用戶會話ID// 第一次對話
String response1 = chatClient.prompt()
.advisors(advisor -> advisor.param(ChatMemory.CONVERSATION_ID, conversationId))
.user("你好,我叫小明。")
.call()
.content();
System.out.println("Response 1: " + response1);// 第二次對話,會利用聊天記憶
String response2 = chatClient.prompt()
.advisors(advisor -> advisor.param(ChatMemory.CONVERSATION_ID, conversationId))
.user("我剛才說了什么?")
.call()
.content();
System.out.println("Response 2: " + response2);
代碼分析:
MessageChatMemoryAdvisor
會根據ChatMemory.CONVERSATION_ID
參數管理會話歷史,將其添加到發送給 LLM 的 Prompt 中,從而實現多輪對話的上下文感知。QuestionAnswerAdvisor
會利用VectorStore
進行信息檢索,并將檢索到的相關文檔片段添加到 Prompt 中,以增強 LLM 的回答能力,實現 RAG 模式。- 通過
advisors(advisor -> advisor.param(ChatMemory.CONVERSATION_ID, conversationId))
,我們可以在運行時為 Advisor 提供參數,這使得 Advisor 更加靈活和可配置。
這些示例展示了 Spring AI Advisors 的強大功能和靈活性,無論是簡單的日志記錄,還是復雜的 Prompt 增強,甚至集成高級的 AI 模式,Advisors 都提供了一個優雅的解決方案。
最佳實踐
在使用 Spring AI Advisors 時,遵循一些最佳實踐可以幫助你構建更健壯、高效和可維護的 AI 應用。
- 明確 Advisor 的職責:每個 Advisor 應該只負責一個單一的、明確的功能。例如,一個 Advisor 負責日志記錄,另一個負責 Prompt 增強,而不是將所有邏輯都塞到一個 Advisor 中。這有助于提高代碼的可讀性、可測試性和可重用性。
- 合理設置
order
值:仔細考慮每個 Advisor 的執行順序。例如,如果一個 Advisor 需要依賴另一個 Advisor 修改后的 Prompt,那么它應該在依賴的 Advisor 之后執行。對于全局性的操作(如日志、異常處理),通常設置較高的優先級(較低的order
值),以便它們在請求處理的最早和最晚階段介入。 - 利用
BaseAdvisor
簡化實現:對于大多數自定義 Advisor,優先考慮實現BaseAdvisor
接口。它提供了before
和after
兩個清晰的擴展點,大大減少了樣板代碼,讓你能夠專注于核心業務邏輯。 - 善用
AdvisorContext
共享狀態:如果多個 Advisor 之間需要共享數據或狀態,請使用AdvisorContext
。這比在 Advisor 之間傳遞復雜對象或使用全局變量更優雅和安全。 - 區分同步和流式處理:如果你的 Advisor 需要同時支持同步和流式 AI 調用,確保正確實現
CallAdvisor
和StreamAdvisor
的邏輯。BaseAdvisor
已經為你處理了大部分復雜性,但仍需注意流式處理中after
方法的觸發時機(通常是流結束時)。 - 錯誤處理:在 Advisor 中實現適當的錯誤處理機制。如果一個 Advisor 拋出異常,它可能會中斷整個鏈的執行。考慮如何優雅地處理這些異常,例如通過日志記錄、回退機制或將錯誤信息傳遞給
AdvisorContext
。
結論
Spring AI Advisors 提供了一個強大而靈活的機制,用于在 Spring AI 應用程序中攔截、修改和增強 AI 驅動的交互。通過深入理解其核心概念、架構設計、執行流程以及 BaseAdvisor
的實現細節,開發者可以有效地利用這一特性來構建更具可擴展性、可維護性和智能化的 AI 應用。
無論是簡單的日志記錄、復雜的 Prompt 增強,還是集成如聊天記憶和 RAG 等高級 AI 模式,Advisors 都提供了一個優雅且符合 Spring 哲學的設計模式。掌握 Advisors 的使用,將使你能夠更好地控制 AI 模型的行為,為用戶提供更智能、更個性化的體驗。