上篇文章:
Spring Cloud系列—Eureka服務注冊/發現https://blog.csdn.net/sniper_fandc/article/details/149937589?fromshare=blogdetail&sharetype=blogdetail&sharerId=149937589&sharerefer=PC&sharesource=sniper_fandc&sharefrom=from_link
目錄
1 如何在IDEA中啟動一個服務的多個實例
2 負載均衡
3 Spring Cloud LoadBalancer實現負載均衡
3.1 添加注解@LoadBalanced
3.2 修改遠程調用的ip:端口號為服務名稱
4 Spring Cloud LoadBalancer負載均衡策略
5 Spring Cloud LoadBalancer負載均衡原理
????????在Eureka篇章中,使用了如下代碼獲取服務的實例:
List<ServiceInstance> productService = discoveryClient.getInstances("product-service");EurekaServiceInstance serviceInstance = (EurekaServiceInstance) productService.get(0);
????????由于只有一個服務實例,因此并不會有問題,但是如果一個服務有多個實例,就會出現問題。
1 如何在IDEA中啟動一個服務的多個實例
????????點擊頁面下方的Services:
????????點擊Add service,選擇正在運行的SpringBoot服務:
????????右鍵要復制實例的服務,點擊復制:
????????在打開的界面點擊Modify options,選擇Add VM options:
????????輸入-Dserver.port=端口號,這里的端口號注意不要重復,之后選中創建的實例右鍵運行即可:
2 負載均衡
????????創建多個實例后,多次訪問接口就會出現始終訪問端口號為同一個的實例,這是因為服務發現時Eureka給我們提供隨機的服務列表,但是每次都只獲取其中下標為0的服務實例,這就會導致某個實例負載過大,因此需要負載均衡。
????????如果不借助組件,可以用hash取余的方式來輪詢訪問每個服務實例:
private static AtomicInteger atomicInteger = new AtomicInteger(1);private static List<ServiceInstance> instances;@PostConstructpublic void init(){//根據應用名稱獲取服務列表instances = discoveryClient.getInstances("product-service");}public OrderInfo selectOrderById(Integer orderId) {OrderInfo orderInfo = orderMapper.selectOrderById(orderId);//String url = "http://127.0.0.1:8081/product/"+ orderInfo.getProductId();//服務可能有多個, 輪詢獲取實例int index = atomicInteger.getAndIncrement() % instances.size();ServiceInstance instance =instances.get(index);log.info(instance.getInstanceId());//拼接urlString url = instance.getUri()+"/product/"+ orderInfo.getProductId();ProductInfo productInfo = restTemplate.getForObject(url, ProductInfo.class);orderInfo.setProductInfo(productInfo);return orderInfo;}
????????這里把discoveryClient.getInstances()放到了方法外面,類加載時只獲取一次,防止每次獲取的服務列表順序都不一樣,同時節省網絡資源。由于多線程環境下,為避免線程安全問題,使用原子類來計算hash取余。這種方式就是一種負載均衡,是一種客戶端負載均衡。
????????但是上述代碼有一些不足之處:服務一旦啟動,服務發現一次,其余時間不再服務發現,因此對于服務的注冊和下線是無感知的。于是需要一些專業實現負載均衡的組件,分為客戶端負載均衡和服務端負載均衡:
????????服務端負載均衡:在服務端進行負載均衡算法分配。比如使用Nginx作為負載均衡器,請求先進入Nginx再由Nginx進行負載均衡算法選擇服務來進行訪問。
????????客戶端負載均衡:由客戶端服務發現后,根據負載均衡算法選擇一個服務,并向該服務發送請求。比如Spring Cloud LoadBalancer(Spring Cloud維護)。
3 Spring Cloud LoadBalancer實現負載均衡
3.1 添加注解@LoadBalanced
????????在負責遠程調用的對象restTemplate上添加@LoadBalanced注解,表示客戶端調用時開啟負載均衡(即客戶端負載均衡):
@Configurationpublic class BeanConfig {@Bean@LoadBalancedpublic RestTemplate restTemplate() {return new RestTemplate();}}
3.2 修改遠程調用的ip:端口號為服務名稱
@Servicepublic class OrderService {@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate RestTemplate restTemplate;public OrderInfo selectOrderById(Integer orderId) {OrderInfo orderInfo = orderMapper.selectOrderById(orderId);//負載均衡String url = "http://product-service/product/" + orderInfo.getProductId();ProductInfo productInfo = restTemplate.getForObject(url, ProductInfo.class);orderInfo.setProductInfo(productInfo);return orderInfo;}}
????????多次發送請求,發現請求被負載均衡到了各個服務上:
4 Spring Cloud LoadBalancer負載均衡策略
????????LoadBalancer默認采用輪詢方式進行負載均衡,但是也支持隨機選擇策略。要使用隨機選擇策略,需要自定義負載均衡策略器:
public class LoadBalancerConfig {@BeanReactorLoadBalancer<ServiceInstance> randomLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);System.out.println("==============" + name);return new RandomLoadBalancer(loadBalancerClientFactory.getLazyProvider(name,ServiceInstanceListSupplier.class), name);}}
????????注意:該策略器不能加@Configuration注解,并且要在Spring組件掃描范圍中(即默認和啟動類同一級目錄下)。
????????接著,在RestTemplate配置類上面添加@LoadBalancerClient注解(一個服務提供者使用)或@LoadBalancerClients注解(多個服務提供者使用):
@LoadBalancerClient(name = "product-service", configuration = LoadBalancerConfig.class)@Configurationpublic class BeanConfig {@Bean@LoadBalancedpublic RestTemplate restTemplate() {return new RestTemplate();}}
????????@LoadBalancerClient的name表示服務名稱,configuration則是定義的負載均衡策略器。
5 Spring Cloud LoadBalancer負載均衡原理
????????LoadBalancer最關鍵的源碼是LoadBalancerInterceptor類,該類定義攔截器,將所有請求進行攔截并解析處理。具體的調用流程圖如下:
????????具體是LoadBalancerInterceptor類的intercept()發揮作用:
????public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {URI originalUri = request.getURI();String serviceName = originalUri.getHost();//解析URL是否合法(.-等連接方式)Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);//execute()方法根據服務名稱來對請求進行增強(負載均衡)return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));}
????????execute()的實現是BlockingLoadBalancerClient類,具體作用就是根據服務實例名稱(serviceId)來服務發現,并選擇合適的負載均衡策略來選擇對應的服務實例:
????public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {String hint = this.getHint(serviceId);LoadBalancerRequestAdapter<T, TimedRequestContext> lbRequest = new LoadBalancerRequestAdapter(request, this.buildRequestContext(request, hint));Set<LoadBalancerLifecycle> supportedLifecycleProcessors = this.getSupportedLifecycleProcessors(serviceId);supportedLifecycleProcessors.forEach((lifecycle) -> {lifecycle.onStart(lbRequest);});//choose()是核心方法,就是獲取服務實例并根據負載均衡策略來返回具體請求的實例。ServiceInstance serviceInstance = this.choose(serviceId, lbRequest);if (serviceInstance == null) {supportedLifecycleProcessors.forEach((lifecycle) -> {lifecycle.onComplete(new CompletionContext(Status.DISCARD, lbRequest, new EmptyResponse()));});throw new IllegalStateException("No instances available for " + serviceId);} else {return this.execute(serviceId, serviceInstance, lbRequest);}}
????????這個choose()方法也是BlockingLoadBalancerClient類實現的,內部調用了ReactiveLoadBalancer接口的choose()方法來進行負載均衡策略的選擇:
????public <T> ServiceInstance choose(String serviceId, Request<T> request) {//獲取服務實例列表loadBalancer,也就是負載均衡器ReactiveLoadBalancer<ServiceInstance> loadBalancer = this.loadBalancerClientFactory.getInstance(serviceId);if (loadBalancer == null) {return null;} else {//根據負載均衡算法選擇合適的實例Response<ServiceInstance> loadBalancerResponse = (Response)Mono.from(loadBalancer.choose(request)).block();return loadBalancerResponse == null ? null : (ServiceInstance)loadBalancerResponse.getServer();}}
????????loadBalancer.choose()的choose()方法是ReactiveLoadBalancer接口的choose()方法,該方法的實現有RandomLoadBalancer類實現的方法和RoundRobinLoadBalancer類實現的方法,這兩個類實現的choose()方法分別對應隨機選擇策略和輪詢策略。
????????在RandomLoadBalancer類中,choose()方法調用processInstanceResponse()方法,processInstanceResponse()調用getInstanceResponse()方法,最終在getInstanceResponse()方法可以看到通過隨機數來選擇隨機的服務實例進行訪問,即隨機選擇策略:
????public Mono<Response<ServiceInstance>> choose(Request request) {ServiceInstanceListSupplier supplier = (ServiceInstanceListSupplier)this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);return supplier.get(request).next().map((serviceInstances) -> {return this.processInstanceResponse(supplier, serviceInstances);});}private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances) {Response<ServiceInstance> serviceInstanceResponse = this.getInstanceResponse(serviceInstances);if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {((SelectedInstanceCallback)supplier).selectedServiceInstance((ServiceInstance)serviceInstanceResponse.getServer());}return serviceInstanceResponse;}private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {if (instances.isEmpty()) {if (log.isWarnEnabled()) {log.warn("No servers available for service: " + this.serviceId);}return new EmptyResponse();} else {//此處就是隨機選擇策略最關鍵的幾行代碼int index = ThreadLocalRandom.current().nextInt(instances.size());ServiceInstance instance = (ServiceInstance)instances.get(index);return new DefaultResponse(instance);}}
????????RoundRobinLoadBalancer類的choose方法也采用了一樣的方法調用鏈,最終在getInstanceResponse()方法中,實現了本文的“負載均衡”部分的hash取余來輪詢選擇服務實例的方式:
????public Mono<Response<ServiceInstance>> choose(Request request) {ServiceInstanceListSupplier supplier = (ServiceInstanceListSupplier)this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);return supplier.get(request).next().map((serviceInstances) -> {return this.processInstanceResponse(supplier, serviceInstances);});}private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances) {Response<ServiceInstance> serviceInstanceResponse = this.getInstanceResponse(serviceInstances);if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {((SelectedInstanceCallback)supplier).selectedServiceInstance((ServiceInstance)serviceInstanceResponse.getServer());}return serviceInstanceResponse;}private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {if (instances.isEmpty()) {if (log.isWarnEnabled()) {log.warn("No servers available for service: " + this.serviceId);}return new EmptyResponse();} else if (instances.size() == 1) {return new DefaultResponse((ServiceInstance)instances.get(0));} else {//通過hash取余的方式來輪詢選擇服務實例int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;ServiceInstance instance = (ServiceInstance)instances.get(pos % instances.size());return new DefaultResponse(instance);}}
下篇文章:
Spring Cloud系列—Nacos服務注冊/發現https://blog.csdn.net/sniper_fandc/article/details/149938785?fromshare=blogdetail&sharetype=blogdetail&sharerId=149938785&sharerefer=PC&sharesource=sniper_fandc&sharefrom=from_link