版本
- Spring Booot 版本 3.2.4
- Spring Cloud 版本 2023.0.1
- Spring Cloud Alibaba 版本 2023.0.1.2
依賴
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring-cloud-alibaba.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
配置文件
- bootstrap.yml配置
spring:application:name: ${app_name:${APP_NAME:demo}} cloud:nacos:username: ${config_username:${CONFIG_USERNAME:nacos}}password: ${config_password:${CONFIG_PASSWORD:nacos}}discovery:server-addr: ${config_server_uri:${CONFIG_SERVER_URI:10.244.36.167:8848}}namespace: ${config_profiles_active:${CONFIG_PROFILES_ACTIVE:dev}}group: ${config_group:${CONFIG_GROUP:demo}}service: ${spring.application.name}config:server-addr: ${config_server_uri:${CONFIG_SERVER_URI:10.244.36.167:8848}}namespace: ${config_profiles_active:${CONFIG_PROFILES_ACTIVE:dev}}group: ${config_group:${CONFIG_GROUP:demo}}file-extension: ymlrefresh-enabled: true # 開啟配置自動刷新,默認開啟,此處顯示聲明
- ?application-dev.yml配置
thread-pool:core-size: 2 # 核心線程數max-size: 2 # 最大線程數queue-capacity: 100 # 隊列容量keep-alive-time: 60 # 空閑線程存活時間(秒)
配置類
- NacosThreadPoolProperties
package com.xxx.xxx.amedia.media.properties;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;/*** nacos 線程池配置類* @author xxx* @date 2025-5-23 13:54:21*/
@Data
@Configuration
@ConfigurationProperties(prefix = "thread-pool")
public class NacosThreadPoolProperties {/*** 核心線程數*/private int coreSize;/*** 最大線程數*/private int maxSize;/*** 隊列容量*/private int queueCapacity;/*** 空閑線程活躍時間*/private int keepAliveTime;
}
- NacosDiscoveryProperties
package com.xxx.xxx.amedia.media.properties;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;/*** nacos discovery 配置類* @author xxx* @date 2025-5-23 13:54:21*/
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.cloud.nacos.discovery")
public class NacosDiscoveryProperties {/*** nacos服務地址*/private String serverAddr;/*** 命名空間*/private String namespace;/*** 分組*/private String group;
}
- NacosTokenProperties
package com.xxx.xxx.amedia.media.properties;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;/*** nacos 線程池配置類* @author xxx* @date 2025-5-23 13:54:21*/
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.cloud.nacos")
public class NacosTokenProperties {/*** 用戶名*/private String username;/*** 密碼*/private String password;
}
常量類
package com.xxx.xxx.amedia.media.constant;/*** nacos 常量類* @author xxx* @date 2025-5-26 10:07:23*/
public final class NacosConstants {private NacosConstants() {}/*** nacos配置 username*/public static final String USERNAME = "username";/*** nacos配置 password*/public static final String PASSWORD = "password";/*** nacos配置 server-addr*/public static final String SERVER_ADDR = "serverAddr";/*** nacos配置 namespace*/public static final String NAMESPACE = "namespace";/*** nacos配置 group*/public static final String GROUP = "group";}
nacos 核心類 NamingService 配置
package com.xxx.amedia.media.config;import com.xxx.amedia.media.constant.NacosConstants;
import com.xxx.amedia.media.properties.NacosDiscoveryProperties;
import com.xxx.amedia.media.properties.NacosTokenProperties;import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Properties;/*** nacos 核心類 NamingService 配置* @author xxx* @date 2025-5-23 15:18:24*/
@Configuration
public class NamingServiceConfig {@Autowiredprivate NacosTokenProperties nacosTokenProperties;@Autowiredprivate NacosDiscoveryProperties nacosDiscoveryProperties;@Beanpublic NamingService namingService() throws NacosException {Properties properties = new Properties();// nacos 服務器地址properties.put(NacosConstants.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr());// 命名空間properties.put(NacosConstants.NAMESPACE, nacosDiscoveryProperties.getNamespace());properties.put(NacosConstants.GROUP, nacosDiscoveryProperties.getGroup());properties.put(NacosConstants.USERNAME, nacosTokenProperties.getUsername());properties.put(NacosConstants.PASSWORD, nacosTokenProperties.getPassword());return NacosFactory.createNamingService(properties);}
}
?nacos實例 線程池管理類
package com.xxx.amedia.media.manager;import com.xxx.amedia.media.properties.NacosDiscoveryProperties;
import com.xxx.amedia.media.properties.NacosThreadPoolProperties;
import com.xxx.amedia.rpc.config.MediaRpcProperties;import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;/*** nacos實例 線程池管理類* @author xxx* @date 2025-5-23 10:30:21*/
@Slf4j
@Component
public class NacosThreadPoolManager {@Autowiredprivate MediaRpcProperties mediaRpcProperties;@Autowiredprivate NacosDiscoveryProperties nacosDiscoveryProperties;@Autowiredprivate NacosThreadPoolProperties nacosThreadPoolConfig;/*** 訂閱服務上線和下線事件*/@Autowiredprivate NamingService namingService;/*** 存儲 IP 和線程池的映射關系*/private final Map<String, ThreadPoolExecutor> ipThreadPoolMap = new ConcurrentHashMap<>();@PostConstructpublic void init() {// 注冊監聽器registerServiceListener(mediaRpcProperties.getServiceName());}/*** 注冊服務監聽器,監聽指定服務的實例變化** @param serviceName 服務名稱*/private void registerServiceListener(String serviceName) {try {namingService.subscribe(serviceName, nacosDiscoveryProperties.getGroup(), event -> {if (event instanceof NamingEvent namingEvent) {String serviceNameFromEvent = namingEvent.getServiceName();if (StringUtils.equals(serviceNameFromEvent, serviceName)) {handleInstanceChange(namingEvent);}}});} catch (NacosException ex) {// 處理異常,例如記錄日志log.error("Register service listener occur exception.", ex);}}/*** 處理實例變化事件* @param event NamingEvent*/private void handleInstanceChange(NamingEvent event) {// 獲取當前所有實例的 IPSet<String> currentIps = event.getInstances().stream().map(Instance::getIp).collect(Collectors.toSet());// 處理實例變化事件handleInstanceChange(currentIps);}/*** 處理實例變化事件* @param currentIps 當前所有實例的 IP*/private void handleInstanceChange(Set<String> currentIps) {// 舊實例IP集合Set<String> previousIps = new HashSet<>(ipThreadPoolMap.keySet());// 新增的 IPSet<String> newIps = new HashSet<>(currentIps);newIps.removeAll(previousIps);// 移除的 IPSet<String> removedIps = new HashSet<>(previousIps);removedIps.removeAll(currentIps);// 對新增的 IP 創建線程池for (String ip : newIps) {createThreadPoolForIp(ip);}// 對移除的 IP 銷毀線程池for (String ip : removedIps) {destroyThreadPoolForIp(ip);}}/*** 創建指定 IP 的線程池* @param ip 實例 IP*/public ThreadPoolExecutor createThreadPoolForIp(String ip) {// ThreadFactoryBuilder 設置線程名稱ThreadFactory customThreadFactory = new ThreadFactoryBuilder().setNamePrefix("nacos-pool-" + ip + "-thread-%d").setDaemon(false).setPriority(Thread.NORM_PRIORITY).build();// 創建線程池,超出隊列長度由調用者執行ThreadPoolExecutor executor = new ThreadPoolExecutor(nacosThreadPoolConfig.getCoreSize(),nacosThreadPoolConfig.getMaxSize(),nacosThreadPoolConfig.getKeepAliveTime(),TimeUnit.SECONDS,new LinkedBlockingQueue<>(nacosThreadPoolConfig.getQueueCapacity()),customThreadFactory,new ThreadPoolExecutor.CallerRunsPolicy());log.info("Created thread pool for IP: {}", ip);ipThreadPoolMap.put(ip, executor);return executor;}/*** 銷毀指定 IP 的線程池* @param ip 實例 IP*/private void destroyThreadPoolForIp(String ip) {ThreadPoolExecutor executor = ipThreadPoolMap.remove(ip);if (executor == null) {return;}executor.shutdown(); // 關閉線程池,不再接受新任務try {if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {executor.shutdownNow(); // 強制關閉}} catch (InterruptedException e) {executor.shutdownNow();Thread.currentThread().interrupt();}log.info("Destroyed thread pool for IP: {}", ip);}/*** 獲取指定 IP 的線程池* @param ip 實例 IP* @return 對應的線程池,如果不存在則返回 null*/public ThreadPoolExecutor getThreadPoolForIp(String ip) {ThreadPoolExecutor executor = ipThreadPoolMap.get(ip);// 獲取不到線程池手動創建if (executor == null) {executor = createThreadPoolForIp(ip);}return executor;}/*** 應用關閉時銷毀所有線程池*/@PreDestroypublic void shutdownAllThreadPools() {for (Map.Entry<String, ThreadPoolExecutor> entry : ipThreadPoolMap.entrySet()) {String ip = entry.getKey();ThreadPoolExecutor executor = entry.getValue();executor.shutdown();try {if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {executor.shutdownNow();}} catch (InterruptedException e) {executor.shutdownNow();Thread.currentThread().interrupt();}}ipThreadPoolMap.clear();log.info("succeed shutdown all thread pools.");}/*** 每分鐘監控一次線程池的狀態*/@Scheduled(fixedRate = 60000)public void monitorThreadPool() {for (Map.Entry<String, ThreadPoolExecutor> entry : ipThreadPoolMap.entrySet()) {String ip = entry.getKey();ThreadPoolExecutor executor = entry.getValue();log.info("Monitor thread pool. ip: {}, Active Threads: {}, Queue Size: {}",ip,executor.getActiveCount(),executor.getQueue().size());}}
}
執行線程任務 管理類?
package com.xxx.amedia.media.manager;import com.xxx.amedia.media.bo.FailureTaskBO;
import com.xxx.amedia.rpc.enums.RpcTypeEnum;
import com.xxx.amedia.rpc.model.RpcReloadResponse;
import com.xxx.amedia.rpc.proxy.RpcReloadProxy;
import com.xxx.amedia.system.api.NacosService;import cn.hutool.core.lang.Pair;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;/*** 執行線程任務 管理類* @author xxx* @date 2025-5-23 11:01:07*/
@Slf4j
@Service
public class RpcReloadManager {@Autowiredprivate NacosService nacosService;@Autowiredprivate RpcReloadProxy rpcReloadProxy;@Autowiredprivate NacosThreadPoolManager threadPoolManager;/*** 異步執行任務 所有節點 無參* @param rpcTypeEnum rpc類型枚舉* @return 失敗節點列表*/public List<FailureTaskBO> execute(RpcTypeEnum rpcTypeEnum) {return execute(rpcTypeEnum, Collections.emptyList());}/*** 異步執行任務 所有節點 無參* @param rpcTypeEnum rpc類型枚舉* @param params rpc接口參數* @return 失敗節點列表*/public List<FailureTaskBO> execute(RpcTypeEnum rpcTypeEnum, Object params) {List<String> ips = nacosService.getInstanceIps();List<FailureTaskBO> failureList = new ArrayList<>();if (CollectionUtils.isEmpty(ips)) {return failureList;}CountDownLatch countDownLatch = new CountDownLatch(ips.size());ips.forEach(execNodeIp -> {try {execute(execNodeIp, rpcTypeEnum, params, failureList);} catch (Exception ex) {FailureTaskBO failureTaskBO = FailureTaskBO.builder().ip(execNodeIp).code(500).msg("rpc任務執行失敗").build();failureList.add(failureTaskBO);log.error("execute task occur exception: ip is {}, rpcTypeEnum is {}, msg is {}.", execNodeIp, rpcTypeEnum, ex.getMessage(), ex);} finally {countDownLatch.countDown();}});try {// 主線程等待所有任務完成countDownLatch.await();} catch (InterruptedException e) {// 恢復中斷狀態interrupt();}return failureList;}/*** 異步執行任務 指定節點列表* @param rpcTypeEnum rpc類型枚舉* @param ips 指定節點列表* @return 失敗節點列表*/public List<FailureTaskBO> executeForIps(RpcTypeEnum rpcTypeEnum, List<String> ips) {return executeForIps(rpcTypeEnum, ips, Collections.emptyList());}/*** 異步執行任務 指定節點列表* @param rpcTypeEnum rpc類型枚舉* @param ips 指定節點列表* @param params rpc接口參數* @return 失敗節點列表*/public List<FailureTaskBO> executeForIps( RpcTypeEnum rpcTypeEnum, List<String> ips, Object params) {List<FailureTaskBO> failureList = new ArrayList<>();if (CollectionUtils.isEmpty(ips)) {return failureList;}CountDownLatch countDownLatch = new CountDownLatch(ips.size());ips.forEach(execNodeIp -> {try {execute(execNodeIp, rpcTypeEnum, params, failureList);} catch (Exception ex) {FailureTaskBO failureTaskBO = FailureTaskBO.builder().ip(execNodeIp).code(500).msg("rpc reload error").build();failureList.add(failureTaskBO);log.error("execute task occur exception: ip is {}, rpcTypeEnum is {}, msg is {}.", execNodeIp, rpcTypeEnum, ex.getMessage(), ex);} finally {countDownLatch.countDown();}});try {// 主線程等待所有任務完成countDownLatch.await();} catch (InterruptedException e) {// 恢復中斷狀態interrupt();}return failureList;}/*** 恢復中斷狀態*/private void interrupt() {// 恢復中斷狀態Thread.currentThread().interrupt();log.info("Thread {} is interrupted.", Thread.currentThread().getName());}/*** 異步線程池執行rpc任務* @param execNodeIp 執行節點IP* @param rpcTypeEnum rpc類型枚舉* @param failureList 失敗節點信息* @throws ExecutionException 執行異常* @throws InterruptedException 中斷異常*/private void execute(String execNodeIp, RpcTypeEnum rpcTypeEnum, Object params, List<FailureTaskBO> failureList)throws ExecutionException, InterruptedException {ThreadPoolExecutor executor = threadPoolManager.getThreadPoolForIp(execNodeIp);FutureTask<Pair<Boolean, RpcReloadResponse>> futureTask = new FutureTask<>(() -> {return rpcReloadProxy.reload(execNodeIp, rpcTypeEnum, params);});executor.submit(futureTask);Pair<Boolean, RpcReloadResponse> pair = futureTask.get();if (!pair.getKey()) {FailureTaskBO failureTaskBO = bulidFailureTaskBO(pair, execNodeIp);failureList.add(failureTaskBO);}}/*** 構建失敗信息* @param pair rpc響應信息* @param execNodeIp 執行節點IP* @return FailureTaskBO*/private FailureTaskBO bulidFailureTaskBO(Pair<Boolean, RpcReloadResponse> pair, String execNodeIp) {RpcReloadResponse res = pair.getValue();FailureTaskBO failureTaskBO;if (res == null) {failureTaskBO = FailureTaskBO.builder().ip(execNodeIp).code(500).msg("rpc reload error").build();} else {failureTaskBO = FailureTaskBO.builder().ip(execNodeIp).code(res.getCode()).msg(res.getMsg()).build();}return failureTaskBO;}
}
nacos服務實例service類
package com.xxx.amedia.system.api;import com.xxx.amedia.system.converter.ServiceInstanceConverter;
import com.xxx.amedia.system.vo.ServiceInstanceVO;
import com.xxx.amedia.tool.common.constants.CommonConstants;import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;/*** nacos服務實例service類* @author xxx* @date 2025-5-17 11:38:22*/
@Service
public class NacosService{@Value("${media.rpc.service-name}")private String serviceName;@Autowiredprivate DiscoveryClient discoveryClient;@Autowiredprivate ServiceInstanceConverter serviceInstanceConverter;@Overridepublic List<ServiceInstanceVO> list() {List<ServiceInstance> serviceInstances = getInstances();return serviceInstanceConverter.toServiceInstanceVOList(serviceInstances);}/*** 從nacos獲取指定服務名的實例IP集合* @return 實例IP集合*/@Overridepublic List<String> getInstanceIps() {List<ServiceInstance> instances = getInstances();if (CollectionUtils.isEmpty(instances)) {return Collections.emptyList();}List<String> instanceIps = new ArrayList<>(instances.size());instances.forEach(instance -> {instanceIps.add(instance.getHost());});return instanceIps;}/*** 檢查icc-amedia-admin服務中是否存在指定 IP 的節點* @param instanceIps 獲取 icc-amedia-rpc 服務的所有實例Ip* @param targetIp 目標 IP(如 10.199.1.1)* @return true 存在 / false 不存在*/@Overridepublic boolean checkNodeExists(List<String> instanceIps, String targetIp) {// 遍歷實例,檢查 IPfor (String instanceIp : instanceIps) {if (StringUtils.equals(targetIp, instanceIp)) {return true;}}return false;}/*** 從nacos獲取指定服務名的實例集合* @return 實例集合*/private List<ServiceInstance> getInstances() {List<ServiceInstance> serviceInstances = discoveryClient.getInstances(serviceName);// 按host升序排列serviceInstances.sort(Comparator.comparing(ServiceInstance::getHost));return serviceInstances;}}