[spring-cloud: 負載均衡]-源碼分析

獲取服務列表

ServiceInstanceListSupplier

ServiceInstanceListSupplier 接口是一個提供 ServiceInstance 列表的供應者,返回一個響應式流 Flux<List<ServiceInstance>>,用于服務發現。

public interface ServiceInstanceListSupplier extends Supplier<Flux<List<ServiceInstance>>> {String getServiceId();default Flux<List<ServiceInstance>> get(Request request) {return get();}static ServiceInstanceListSupplierBuilder builder() {return new ServiceInstanceListSupplierBuilder();}}

DelegatingServiceInstanceListSupplier

DelegatingServiceInstanceListSupplier 是一個抽象類,繼承自 ServiceInstanceListSupplier,它通過委托給另一個 ServiceInstanceListSupplier 實例來實現其功能,同時支持選定服務實例的回調、初始化和銷毀操作。

public abstract class DelegatingServiceInstanceListSupplier implements ServiceInstanceListSupplier, SelectedInstanceCallback, InitializingBean, DisposableBean {protected final ServiceInstanceListSupplier delegate;public DelegatingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate) {Assert.notNull(delegate, "delegate may not be null");this.delegate = delegate;}public ServiceInstanceListSupplier getDelegate() {return delegate;}@Overridepublic String getServiceId() {return delegate.getServiceId();}@Overridepublic void selectedServiceInstance(ServiceInstance serviceInstance) {if (delegate instanceof SelectedInstanceCallback selectedInstanceCallbackDelegate) {selectedInstanceCallbackDelegate.selectedServiceInstance(serviceInstance);}}@Overridepublic void afterPropertiesSet() throws Exception {if (delegate instanceof InitializingBean) {((InitializingBean) delegate).afterPropertiesSet();}}@Overridepublic void destroy() throws Exception {if (delegate instanceof DisposableBean) {((DisposableBean) delegate).destroy();}}}

負載均衡實現

ReactorLoadBalancer

ReactorLoadBalancer 是基于 Reactor 實現的響應式負載均衡器,通過 Mono<Response<T>> 異步選擇服務實例。

public interface ReactorLoadBalancer<T> extends ReactiveLoadBalancer<T> {@SuppressWarnings("rawtypes")Mono<Response<T>> choose(Request request);default Mono<Response<T>> choose() {return choose(REQUEST);}}

ReactorServiceInstanceLoadBalancer

ReactorServiceInstanceLoadBalancer 是一個標記接口,繼承自 ReactorLoadBalancer,專門用于選擇 ServiceInstance 對象的負載均衡器。

// RandomLoadBalancer, RoundRobinLoadBalancer
public interface ReactorServiceInstanceLoadBalancer extends ReactorLoadBalancer<ServiceInstance> {}

核心代碼邏輯

推薦閱讀:[spring-cloud: @LoadBalanced & @LoadBalancerClient]-源碼分析

1. BlockingLoadBalancerInterceptor

// LoadBalancerInterceptor, RetryLoadBalancerInterceptor 
public interface BlockingLoadBalancerInterceptor extends ClientHttpRequestInterceptor {}

LoadBalancerInterceptor

public class LoadBalancerInterceptor implements BlockingLoadBalancerInterceptor {private final LoadBalancerClient loadBalancer;private final LoadBalancerRequestFactory requestFactory;public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {this.loadBalancer = loadBalancer;this.requestFactory = requestFactory;}public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {// for backwards compatibilitythis(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));}// 重點!@Overridepublic ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution)throws IOException {URI originalUri = request.getURI();String serviceName = originalUri.getHost();Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);return loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));}}

2. BlockingLoadBalancerClient

ServiceInstanceChooser

ServiceInstanceChooser 接口用于通過負載均衡器選擇與指定服務ID對應的服務實例,支持帶請求上下文的選擇。

public interface ServiceInstanceChooser {ServiceInstance choose(String serviceId);<T> ServiceInstance choose(String serviceId, Request<T> request);}

LoadBalancerClient

LoadBalancerClient 接口用于客戶端負載均衡,選擇服務實例并執行請求,同時提供將邏輯服務名重構為實際服務實例的 URI 的功能。

public interface LoadBalancerClient extends ServiceInstanceChooser {<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;URI reconstructURI(ServiceInstance instance, URI original);}
// BlockingLoadBalancerClientAutoConfiguration
@SuppressWarnings({ "unchecked", "rawtypes" })
public class BlockingLoadBalancerClient implements LoadBalancerClient {// org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration// LoadBalancerClientFactoryprivate final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;public BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {this.loadBalancerClientFactory = loadBalancerClientFactory;}// 重點!@Overridepublic <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {String hint = getHint(serviceId);LoadBalancerRequestAdapter<T, TimedRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(request, buildRequestContext(request, hint));Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));// 通過 choose 方法來選擇一個合適的 ServiceInstanceServiceInstance serviceInstance = choose(serviceId, lbRequest);if (serviceInstance == null) {supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, new EmptyResponse())));throw new IllegalStateException("No instances available for " + serviceId);}return execute(serviceId, serviceInstance, lbRequest);}private <T> TimedRequestContext buildRequestContext(LoadBalancerRequest<T> delegate, String hint) {if (delegate instanceof HttpRequestLoadBalancerRequest) {HttpRequest request = ((HttpRequestLoadBalancerRequest) delegate).getHttpRequest();if (request != null) {RequestData requestData = new RequestData(request);return new RequestDataContext(requestData, hint);}}return new DefaultRequestContext(delegate, hint);}// 通過生命周期鉤子函數來管理負載均衡請求的開始與結束,并處理可能的異常,確保負載均衡的執行過程有序@Overridepublic <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {if (serviceInstance == null) {throw new IllegalArgumentException("Service Instance cannot be null, serviceId: " + serviceId);}DefaultResponse defaultResponse = new DefaultResponse(serviceInstance);Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);Request lbRequest = request instanceof Request ? (Request) request : new DefaultRequest<>();supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, new DefaultResponse(serviceInstance)));try {T response = request.apply(serviceInstance);Object clientResponse = getClientResponse(response);supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS, lbRequest, defaultResponse, clientResponse)));return response;}catch (IOException iOException) {supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.FAILED, iOException, lbRequest, defaultResponse)));throw iOException;}catch (Exception exception) {supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, defaultResponse)));ReflectionUtils.rethrowRuntimeException(exception);}return null;}private <T> Object getClientResponse(T response) {ClientHttpResponse clientHttpResponse = null;if (response instanceof ClientHttpResponse) {clientHttpResponse = (ClientHttpResponse) response;}if (clientHttpResponse != null) {try {return new ResponseData(clientHttpResponse, null);}catch (IOException ignored) {}}return response;}private Set<LoadBalancerLifecycle> getSupportedLifecycleProcessors(String serviceId) {return LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),DefaultRequestContext.class, Object.class, ServiceInstance.class);}@Overridepublic URI reconstructURI(ServiceInstance serviceInstance, URI original) {return LoadBalancerUriTools.reconstructURI(serviceInstance, original);}@Overridepublic ServiceInstance choose(String serviceId) {return choose(serviceId, REQUEST);}// 重點!通過負載均衡器同步選擇一個服務實例并返回@Overridepublic <T> ServiceInstance choose(String serviceId, Request<T> request) {ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);if (loadBalancer == null) {return null;}Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();if (loadBalancerResponse == null) {return null;}return loadBalancerResponse.getServer();}private String getHint(String serviceId) {LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);String defaultHint = properties.getHint().getOrDefault("default", "default");String hintPropertyValue = properties.getHint().get(serviceId);return hintPropertyValue != null ? hintPropertyValue : defaultHint;}}

3. LoadBalancerRequestFactory

LoadBalancerRequestFactory 類用于創建封裝負載均衡請求的 LoadBalancerRequest 實例,支持請求轉換器和負載均衡客戶端的配置。

public class LoadBalancerRequestFactory {private final LoadBalancerClient loadBalancer;private final List<LoadBalancerRequestTransformer> transformers;public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer,List<LoadBalancerRequestTransformer> transformers) {this.loadBalancer = loadBalancer;this.transformers = transformers;}public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {this.loadBalancer = loadBalancer;transformers = new ArrayList<>();}public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body,final ClientHttpRequestExecution execution) {return new BlockingLoadBalancerRequest(loadBalancer, transformers,new BlockingLoadBalancerRequest.ClientHttpRequestData(request, body, execution));}}
LoadBalancerRequestTransformer

LoadBalancerRequestTransformer 接口允許在負載均衡過程中根據不同的服務實例自定義轉換 HttpRequest,如修改請求頭、URL 等,同時通過 @Order 注解控制其執行順序。

@Order(LoadBalancerRequestTransformer.DEFAULT_ORDER)
public interface LoadBalancerRequestTransformer {int DEFAULT_ORDER = 0;HttpRequest transformRequest(HttpRequest request, ServiceInstance instance);
}

4. BlockingLoadBalancerRequest

BlockingLoadBalancerRequest 類實現了負載均衡請求接口,負責將原始 HTTP 請求封裝為負載均衡請求,并支持應用請求轉換器和執行負載均衡操作。

class BlockingLoadBalancerRequest implements HttpRequestLoadBalancerRequest<ClientHttpResponse> {private final LoadBalancerClient loadBalancer;private final List<LoadBalancerRequestTransformer> transformers;private final ClientHttpRequestData clientHttpRequestData;BlockingLoadBalancerRequest(LoadBalancerClient loadBalancer, List<LoadBalancerRequestTransformer> transformers,ClientHttpRequestData clientHttpRequestData) {this.loadBalancer = loadBalancer;this.transformers = transformers;this.clientHttpRequestData = clientHttpRequestData;}@Overridepublic ClientHttpResponse apply(ServiceInstance instance) throws Exception {HttpRequest serviceRequest = new ServiceRequestWrapper(clientHttpRequestData.request, instance, loadBalancer);if (this.transformers != null) {for (LoadBalancerRequestTransformer transformer : this.transformers) {serviceRequest = transformer.transformRequest(serviceRequest, instance);}}return clientHttpRequestData.execution.execute(serviceRequest, clientHttpRequestData.body);}@Overridepublic HttpRequest getHttpRequest() {return clientHttpRequestData.request;}static class ClientHttpRequestData {private final HttpRequest request;private final byte[] body;private final ClientHttpRequestExecution execution;ClientHttpRequestData(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) {this.request = request;this.body = body;this.execution = execution;}}}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/92061.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/92061.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/92061.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Oracle 在線重定義

Oracle 在線重定義&#xff08;Online Redefinition&#xff09; 是一種功能&#xff0c;通過DBMS_REDEFINITION 包提供&#xff0c;允許DBA在不需要停止或顯著影響數據庫正常操作的情況下&#xff0c;對數據庫表進行結構化修改。可以實現的功能將表移動到其它表空間增加、修改…

Web 開發 12

1 網址里的 “搜索請求” 結構 這張圖是在教你怎么看懂 網址里的 “搜索請求” 結構&#xff0c;特別基礎但超重要&#xff0c;對你學前端幫別人做搜索功能超有用&#xff0c;用大白話拆成 3 步講&#xff1a; 1. 先看「協議&#xff08;Protocol&#xff09;」 HTTPS 就是瀏…

網絡安全 | 如何構建一個有效的企業安全響應團隊

網絡安全 | 如何構建一個有效的企業安全響應團隊 一、前言 二、團隊組建的基礎要素 2.1 人員選拔 2.2 角色定位 三、團隊應具備的核心能力 3.1 技術專長 3.2 應急處置能力 3.3 溝通協作能力 四、團隊的運作機制 4.1 威脅監測與預警流程 4.2 事件響應流程 4.3 事后復盤與改進機制…

HTTP、WebSocket、TCP、Kafka等通訊渠道對比詳解

在當今互聯的數字世界中&#xff0c;通信渠道是系統、應用程序和設備之間數據交換的支柱。從傳統的HTTP和TCP協議到專為特定場景設計的Kafka和MQTT等平臺&#xff0c;這些通信方式滿足了從實時消息傳遞到大規模數據流處理的多樣化需求。本文將深入探討主要的通信協議和平臺。一…

臭氧、顆粒物和霧霾天氣過程的大氣污染物計算 CAMx模型

隨著我國經濟快速發展&#xff0c;我國面臨著日益嚴重的大氣污染問題。大氣污染是工農業生產、生活、交通、城市化等方面人為活動的綜合結果&#xff0c;同時氣象因素是控制大氣污染的關鍵自然因素。大氣污染問題既是局部、當地的&#xff0c;也是區域的&#xff0c;甚至是全球…

數據結構(13)堆

目錄 1、堆的概念與結構 2、堆的實現 2.1 向上調整算法&#xff08;堆的插入&#xff09; 2.2 向下調整算法&#xff08;堆的刪除&#xff09; 2.3 完整代碼 3、堆的應用 3.1 堆排序 3.2 Top-K問題 1、堆的概念與結構 堆是一種特殊的二叉樹&#xff0c;根結點最大的堆稱…

C++模板知識點3『std::initializer_list初始化時逗號表達式的執行順序』

std::initializer_list初始化時逗號表達式的執行順序 在使用Qt Creator4.12.2&#xff0c;Qt5.12.9 MinGW開發的過程中發現了一個奇怪的現象&#xff0c;std::initializer_list<int>在初始化構造時的執行順序反了&#xff0c;經過一番測試發現&#xff0c;其執行順序可正…

【Unity3D】Shader圓形弧度裁剪

片元著色器&#xff1a; float3 _Center float3(0, 0, 0); float3 modelPos i.modelPos;// float angle atan2(modelPos.y - _Center.y, modelPos.x - _Center.x); // 計算角度&#xff0c;范圍-π到π float angle atan2(modelPos.y - _Center.y, modelPos.z - _Center.z)…

curl發送文件bodyParser無法獲取請求體的問題分析

問題及現象 開發過程使用curlPUT方式發送少量數據, 后端使用NodeJSexpress框架bodyParser,但測試發現無法獲取到請求體內容,現象表現為req.body 為空對象 {} 代碼如下: const bodyParser require(body-parser); router.use(/api/1, bodyParser.raw({limit: 10mb, type: */*}))…

Vue3 學習教程,從入門到精通,Vue 3 內置屬性語法知識點及案例代碼(25)

Vue 3 內置屬性語法知識點及案例代碼 Vue 3 提供了豐富的內置屬性&#xff0c;幫助開發者高效地構建用戶界面。以下將詳細介紹 Vue 3 的主要內置屬性&#xff0c;并結合詳細的案例代碼進行說明。每個案例代碼都包含詳細的注釋&#xff0c;幫助初學者更好地理解其用法。1. data …

機器學習基石:深入解析線性回歸

線性回歸是機器學習中最基礎、最核心的算法之一&#xff0c;它為我們理解更復雜的模型奠定了基礎。本文將帶你全面解析線性回歸的方方面面。1. 什么是回歸&#xff1f; 回歸分析用于預測連續型數值。它研究自變量&#xff08;特征&#xff09;與因變量&#xff08;目標&#xf…

OneCodeServer 架構深度解析:從組件設計到運行時機制

一、架構概覽與設計哲學1.1 系統定位與核心價值OneCodeServer 作為 OneCode 平臺的核心服務端組件&#xff0c;是連接前端設計器與后端業務邏輯的橋梁&#xff0c;提供了從元數據定義到應用程序執行的完整解決方案。它不僅是一個代碼生成引擎&#xff0c;更是一個全生命周期管理…

Jwts用于創建和驗證 ??JSON Web Token(JWT)?? 的開源庫詳解

Jwts用于創建和驗證 ??JSON Web Token&#xff08;JWT&#xff09;?? 的開源庫詳解在 Java 開發中&#xff0c;提到 Jwts 通常指的是 ??JJWT&#xff08;Java JWT&#xff09;庫??中的核心工具類 io.jsonwebtoken.Jwts。JJWT 是一個專門用于創建和驗證 ??JSON Web To…

如果發送的數據和接受的數據不一致時,怎么辦?

那ART4222這個板卡舉例&#xff0c;我之間輸入一個原始數據“6C532A14”&#xff0c;但是在選擇偶校驗時&#xff0c;接收的是“6C532B14”&#xff0c;我發送的碼率&#xff08;運行速度&#xff09;是100000&#xff0c;但接受的不穩定&#xff0c;比如&#xff1b;“100100.…

ISCC認證:可持續生產的新標桿。ISCC如何更快認證

在全球可持續發展浪潮中&#xff0c;ISCC&#xff08;國際可持續與碳認證&#xff09;體系已成為企業綠色轉型的重要工具。這一國際公認的認證系統覆蓋農業、林業、廢棄物處理等多個領域&#xff0c;通過嚴格的可持續性標準、供應鏈可追溯性要求和碳排放計算規范&#xff0c;建…

想對學習自動化測試的一些建議

Python接口自動化測試零基礎入門到精通&#xff08;2025最新版&#xff09;接觸了不少同行&#xff0c;由于他們之前一直做手工測試&#xff0c;現在很迫切希望做自動化測試&#xff0c;其中不乏工作5年以上的人。 本人從事軟件自動化測試已經近5年&#xff0c;從server端到web…

電子電氣架構 ---智能電動汽車嵌入式軟件開發過程中的block點

我是穿拖鞋的漢子,魔都中堅持長期主義的汽車電子工程師。 老規矩,分享一段喜歡的文字,避免自己成為高知識低文化的工程師: 做到欲望極簡,了解自己的真實欲望,不受外在潮流的影響,不盲從,不跟風。把自己的精力全部用在自己。一是去掉多余,凡事找規律,基礎是誠信;二是…

createAsyncThunk

下面&#xff0c;我們來系統的梳理關于 Redux Toolkit 異步操作&#xff1a;createAsyncThunk 的基本知識點&#xff1a;一、createAsyncThunk 概述 1.1 為什么需要 createAsyncThunk 在 Redux 中處理異步操作&#xff08;如 API 調用&#xff09;時&#xff0c;傳統方法需要手…

STM32F103C8T6 BC20模塊NBIOT GPS北斗模塊采集溫濕度和經緯度發送到EMQX

云平臺配置 訪問下載頁面&#xff1a;免費試用 EMQX Cloud 或 EMQX Enterprise | 下載 EMQX&#xff0c;根據需求選擇對應版本下載。將下載的壓縮包上傳至服務器&#xff08;推薦存放于C盤根目錄&#xff0c;便于后續操作&#xff09;&#xff0c;并解壓至指定路徑&#xff08…

YOLO11漲點優化:自研檢測頭, 新創新點(SC_C_11Detect)檢測頭結構創新,實現有效漲點

目標檢測領域迎來重大突破!本文揭秘原創SC_C_11Detect檢測頭,通過空間-通道協同優化與11層深度結構,在YOLO系列上實現mAP最高提升5.7%,小目標檢測精度暴漲9.3%!創新性結構設計+即插即用特性,為工業檢測、自動駕駛等場景帶來革命性提升! 一、傳統檢測頭的三大痛點 在目…