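I. Overall Architecture Design
1. Core Components
- Load balancer: selects an available service node for each request
- Health checker: periodically checks whether each service node is available
- Service node management: maintains state information for all service nodes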
2. Load Balancing Strategies
- Round Robin
- Random
- Weighted Round Robin
- Least Connection
- Consistent Hash
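Of the strategies listed, consistent hashing is probably the least obvious, so a minimal sketch may help. It assumes nodes are identified by plain "ip:port" strings and uses a hash ring with virtual nodes. The class name `ConsistentHashRing`, the constant `VIRTUAL_NODES`, and the MD5-based hash are all illustrative assumptions, not part of the code in this article.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: a hash ring with virtual nodes, keyed by "ip:port" strings.
class ConsistentHashRing {
    private static final int VIRTUAL_NODES = 100; // assumed number of ring positions per node
    private final TreeMap<Long, String> ring = new TreeMap<>();

    ConsistentHashRing(List<String> nodes) {
        for (String node : nodes) {
            addNode(node);
        }
    }

    void addNode(String node) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    void removeNode(String node) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            ring.remove(hash(node + "#" + i));
        }
    }

    // Walk clockwise from the key's hash to the first virtual node, wrapping at the end.
    String select(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("No available nodes");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // Uses the first 8 bytes of the MD5 digest as the ring position.
    static long hash(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is available on every Java platform
        }
    }
}
```

The same key always maps to the same node, and removing a node only moves the keys that node owned. That property is what makes this strategy a reasonable fit for session affinity or per-node caches.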
II. Complete Code
1. Define the Service Node Class
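public class ServiceNode {
    // Circuit breaker settings; public because the load balancers and the health checker read them
    public static final int FAILURE_THRESHOLD = 3;
    public static final long CIRCUIT_BREAKER_TIMEOUT = 30000; // 30 seconds

    private String ip;
    private int port;
    private boolean active;
    private int weight;
    private int currentConnections;
    private long lastActiveTime;

    // Circuit breaker state
    private int failureCount;
    private long circuitOpenedTime;

    // Matches the new ServiceNode(ip, port, weight) calls in the Spring configuration
    public ServiceNode(String ip, int port, int weight) {
        this.ip = ip;
        this.port = port;
        this.weight = weight;
    }

    // Whether the node may receive traffic, taking the circuit breaker into account
    public boolean isAvailable() {
        if (failureCount >= FAILURE_THRESHOLD) {
            // Circuit is open: allow traffic again only after the timeout (half-open state)
            return System.currentTimeMillis() - circuitOpenedTime > CIRCUIT_BREAKER_TIMEOUT;
        }
        return active; // Normal (closed) state
    }

    public void recordFailure() {
        failureCount++;
        if (failureCount >= FAILURE_THRESHOLD) {
            circuitOpenedTime = System.currentTimeMillis();
        }
    }

    public void recordSuccess() {
        failureCount = 0;
        circuitOpenedTime = 0;
    }

    // Other getters/setters...
}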
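One point to keep in mind: a node object is touched by request threads and by the health check thread at the same time, and an update such as `setCurrentConnections(getCurrentConnections() + 1)` is a read-modify-write that can lose increments under contention. A minimal sketch of thread-safe counters follows, assuming they would be embedded in the node class. The class name `NodeCounters` and its method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: thread-safe counters that a node class could embed.
class NodeCounters {
    private final AtomicInteger currentConnections = new AtomicInteger();
    private final AtomicInteger failureCount = new AtomicInteger();

    // Called when a request is routed to the node.
    int acquire() {
        return currentConnections.incrementAndGet();
    }

    // Called when a request finishes; never drops below zero,
    // even if release() is called once too often.
    int release() {
        return currentConnections.updateAndGet(c -> Math.max(0, c - 1));
    }

    int recordFailure() {
        return failureCount.incrementAndGet();
    }

    void recordSuccess() {
        failureCount.set(0);
    }

    int connections() {
        return currentConnections.get();
    }

    int failures() {
        return failureCount.get();
    }
}
```

Atomic counters make each individual update safe. Decisions that combine several fields, such as the failure count together with the time the circuit opened, would still need a lock or a single immutable state object.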
2. Load Balancer Interface
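import java.util.List;

public interface LoadBalancer {
    ServiceNode selectNode(List<ServiceNode> nodes);

    default void onRequestSuccess(ServiceNode node) {
        node.setLastActiveTime(System.currentTimeMillis());
        node.setCurrentConnections(node.getCurrentConnections() - 1);
    }

    default void onRequestFail(ServiceNode node) {
        node.setActive(false);
        // Release the connection slot taken in selectNode, as onRequestSuccess does
        node.setCurrentConnections(node.getCurrentConnections() - 1);
    }
}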
3. Implement the Load Balancing Strategies
Round robin strategy, taking the circuit breaker state into account
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class RoundRobinLoadBalancer implements LoadBalancer {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public ServiceNode selectNode(List<ServiceNode> nodes) {
        // isAvailable() already admits nodes whose circuit breaker timeout has elapsed
        // (half-open state), so no separate pass over half-open nodes is needed.
        List<ServiceNode> availableNodes = nodes.stream()
                .filter(ServiceNode::isAvailable)
                .collect(Collectors.toList());
        if (availableNodes.isEmpty()) {
            throw new RuntimeException("No available nodes");
        }
        // floorMod keeps the index non-negative after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), availableNodes.size());
        ServiceNode selected = availableNodes.get(index);
        selected.setCurrentConnections(selected.getCurrentConnections() + 1);
        return selected;
    }

    @Override
    public void onRequestSuccess(ServiceNode node) {
        node.recordSuccess();
        node.setLastActiveTime(System.currentTimeMillis());
        node.setCurrentConnections(node.getCurrentConnections() - 1);
    }

    @Override
    public void onRequestFail(ServiceNode node) {
        node.recordFailure();
        node.setActive(false);
        node.setCurrentConnections(node.getCurrentConnections() - 1);
    }
}
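A counter-based round robin index needs some care once the `AtomicInteger` wraps past `Integer.MAX_VALUE`: the `%` operator then yields a negative remainder, which would make `List.get` throw, while `Math.floorMod` stays in range. A minimal sketch of the index computation in isolation follows. The class name `RoundRobinIndex` is hypothetical, and the start value is a constructor parameter only so that the wrap-around can be exercised.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: an overflow-safe round robin index.
class RoundRobinIndex {
    private final AtomicInteger counter;

    RoundRobinIndex(int start) {
        this.counter = new AtomicInteger(start);
    }

    // Math.floorMod returns a value in [0, size) for any counter value when size > 0,
    // whereas % keeps the sign of the dividend and can go negative after overflow.
    int next(int size) {
        return Math.floorMod(counter.getAndIncrement(), size);
    }
}
```

The rotation is briefly uneven at the moment the counter wraps, which is usually acceptable for load balancing purposes.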
Weighted round robin strategy
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class WeightedRoundRobinLoadBalancer implements LoadBalancer {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public ServiceNode selectNode(List<ServiceNode> nodes) {
        // isAvailable() also skips nodes whose circuit breaker is open
        List<ServiceNode> activeNodes = nodes.stream()
                .filter(ServiceNode::isAvailable)
                .collect(Collectors.toList());
        if (activeNodes.isEmpty()) {
            throw new RuntimeException("No available nodes");
        }
        int totalWeight = activeNodes.stream().mapToInt(ServiceNode::getWeight).sum();
        int ticket = counter.getAndIncrement();
        if (totalWeight > 0) {
            // Map the ticket onto the cumulative weight ranges of the nodes
            int current = Math.floorMod(ticket, totalWeight);
            for (ServiceNode node : activeNodes) {
                if (current < node.getWeight()) {
                    node.setCurrentConnections(node.getCurrentConnections() + 1);
                    return node;
                }
                current -= node.getWeight();
            }
        }
        // All weights are zero: fall back to simple round robin
        ServiceNode selected = activeNodes.get(Math.floorMod(ticket, activeNodes.size()));
        selected.setCurrentConnections(selected.getCurrentConnections() + 1);
        return selected;
    }
}
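The counter-based weighted strategy above sends consecutive requests to the same node: with weights 5, 1, 1 the first node receives five requests in a row. A common alternative is smooth weighted round robin, the variant used by Nginx, which interleaves the picks while keeping the same proportions. The sketch below is a swapped-in technique, not the approach of this article. The class name `SmoothWeightedRoundRobin` is hypothetical and nodes are represented by plain strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: smooth weighted round robin over named nodes.
class SmoothWeightedRoundRobin {
    private final Map<String, Integer> weights = new LinkedHashMap<>();
    private final Map<String, Integer> current = new LinkedHashMap<>();
    private int totalWeight;

    SmoothWeightedRoundRobin(Map<String, Integer> nodeWeights) {
        for (Map.Entry<String, Integer> e : nodeWeights.entrySet()) {
            if (e.getValue() <= 0) {
                throw new IllegalArgumentException("weight must be positive: " + e.getKey());
            }
            weights.put(e.getKey(), e.getValue());
            current.put(e.getKey(), 0);
            totalWeight += e.getValue();
        }
    }

    // Each round: raise every node's current weight by its configured weight,
    // pick the node with the highest current weight (first one wins ties),
    // then lower the winner's current weight by the total weight.
    synchronized String next() {
        String best = null;
        int bestWeight = 0;
        for (Map.Entry<String, Integer> e : weights.entrySet()) {
            int raised = current.get(e.getKey()) + e.getValue();
            current.put(e.getKey(), raised);
            if (best == null || raised > bestWeight) {
                best = e.getKey();
                bestWeight = raised;
            }
        }
        if (best == null) {
            throw new IllegalStateException("No available nodes");
        }
        current.put(best, bestWeight - totalWeight);
        return best;
    }
}
```

With weights a=5, b=1, c=1 inserted in that order, seven consecutive calls to `next()` return a, a, b, a, c, a, a rather than five a's followed by b and c.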
4. Health Checker
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HealthChecker {
    private final List<ServiceNode> nodes;
    private final ScheduledExecutorService scheduler;
    private final HttpClient httpClient;

    public HealthChecker(List<ServiceNode> nodes) {
        this.nodes = nodes;
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        this.httpClient = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1)
                .connectTimeout(Duration.ofSeconds(2))
                .build();
    }

    public void start() {
        // Check every node immediately, then every 10 seconds
        scheduler.scheduleAtFixedRate(this::checkAllNodes, 0, 10, TimeUnit.SECONDS);
    }

    public void stop() {
        scheduler.shutdown();
    }

    private void checkAllNodes() {
        nodes.parallelStream().forEach(this::checkNode);
    }

    private void checkNode(ServiceNode node) {
        // Skip nodes whose circuit is open and whose timeout has not yet elapsed
        if (node.getFailureCount() >= ServiceNode.FAILURE_THRESHOLD
                && System.currentTimeMillis() - node.getCircuitOpenedTime() < ServiceNode.CIRCUIT_BREAKER_TIMEOUT) {
            return;
        }
        try {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://" + node.getIp() + ":" + node.getPort() + "/health"))
                    .timeout(Duration.ofSeconds(2))
                    .GET()
                    .build();
            HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() == 200) {
                node.setActive(true);
                node.recordSuccess(); // Reset the circuit breaker state
                node.setLastActiveTime(System.currentTimeMillis());
            } else {
                node.recordFailure();
                node.setActive(false);
            }
        } catch (InterruptedException e) {
            // The checker is shutting down; restore the flag instead of blaming the node
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            node.recordFailure();
            node.setActive(false);
        }
    }
}
5. Service Invocation Wrapper
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;

public class ExternalServiceInvoker {
    private final LoadBalancer loadBalancer;
    private final List<ServiceNode> nodes;
    private final HttpClient httpClient;

    public ExternalServiceInvoker(LoadBalancer loadBalancer, List<ServiceNode> nodes) {
        this.loadBalancer = loadBalancer;
        this.nodes = nodes;
        this.httpClient = HttpClient.newHttpClient();
    }

    public String invokeService(String path, String requestBody) throws Exception {
        // Throws if no node is available; nothing to report to the load balancer in that case
        ServiceNode node = loadBalancer.selectNode(nodes);
        HttpResponse<String> response;
        try {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://" + node.getIp() + ":" + node.getPort() + path))
                    .header("Content-Type", "application/json")
                    .timeout(Duration.ofSeconds(5))
                    .POST(HttpRequest.BodyPublishers.ofString(requestBody))
                    .build();
            response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
        } catch (Exception e) {
            // Connection error, timeout, or interruption
            loadBalancer.onRequestFail(node);
            throw e;
        }
        if (response.statusCode() >= 200 && response.statusCode() < 300) {
            loadBalancer.onRequestSuccess(node);
            return response.body();
        }
        // Non-2xx response: report the failure exactly once, outside the try block
        loadBalancer.onRequestFail(node);
        throw new RuntimeException("Service call failed with status: " + response.statusCode());
    }
}
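Because `invokeService` selects a node on every call, wrapping it in a retry loop will typically move a failed request on to another node. The following is a minimal sketch of retry with exponential backoff, assuming the call is safe to repeat (a POST should only be retried if the target endpoint is idempotent). The class name `Retry`, the method `withBackoff`, and its parameters are hypothetical.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch: retry a call with exponentially growing pauses between attempts.
class Retry {
    static <T> T withBackoff(Callable<T> call, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (InterruptedException e) {
                // Do not retry an interrupted call; restore the flag and give up
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                if (attempt == maxAttempts) {
                    throw e; // Out of attempts: surface the last failure
                }
                Thread.sleep(delay);
                delay *= 2; // Double the pause before the next attempt
            }
        }
        throw new IllegalStateException("unreachable");
    }
}
```

A call site could then look like `Retry.withBackoff(() -> invoker.invokeService("/external-api", body), 3, 200)`, where `invoker` and `body` stand for an `ExternalServiceInvoker` instance and a request payload.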
III. Using It in Spring Boot
1. Register the Components as Spring Beans
import java.util.Arrays;
import java.util.List;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class LoadBalancerConfiguration {
    @Bean
    public List<ServiceNode> serviceNodes() {
        return Arrays.asList(
                new ServiceNode("192.168.1.101", 8080, 1),
                new ServiceNode("192.168.1.102", 8080, 1),
                new ServiceNode("192.168.1.103", 8080, 1),
                new ServiceNode("192.168.1.104", 8080, 1));
    }

    @Bean
    public LoadBalancer loadBalancer() {
        return new WeightedRoundRobinLoadBalancer();
    }

    // Spring calls start() after creation and stop() on shutdown,
    // so the scheduler thread does not outlive the application context
    @Bean(initMethod = "start", destroyMethod = "stop")
    public HealthChecker healthChecker(List<ServiceNode> nodes) {
        return new HealthChecker(nodes);
    }

    @Bean
    public ExternalServiceInvoker externalServiceInvoker(LoadBalancer loadBalancer, List<ServiceNode> nodes) {
        return new ExternalServiceInvoker(loadBalancer, nodes);
    }
}
2. Use It in a Controller
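import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api")
public class MyController {
    @Autowired
    private ExternalServiceInvoker serviceInvoker;

    @PostMapping("/call-external")
    public ResponseEntity<String> callExternalService(@RequestBody String request) {
        try {
            String response = serviceInvoker.invokeService("/external-api", request);
            return ResponseEntity.ok(response);
        } catch (Exception e) {
            return ResponseEntity.status(500).body("Service call failed: " + e.getMessage());
        }
    }
}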
IV. Summary and Best Practices
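- Choose a suitable load balancing strategy:
  - For nodes with similar capacity, use round robin or random
  - For nodes with widely differing capacity, use weighted round robin
  - For services with long-lived connections, consider the least connection algorithm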
- Recommended health check settings:
  - Check interval: 5-30 seconds
  - Timeout: 1-3 seconds
  - Check endpoint: /health or /actuator/health
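- Considerations for production:
  - Log the node selection process and every change in node health
  - Degrade gracefully: when all nodes are unavailable, return cached data or a friendly error
  - Consider a distributed configuration center for adjusting the node list dynamically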
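- Performance optimization:
  - Use a connection pool to reduce the cost of establishing connections
  - Implement request retries with a backoff strategy
  - Use the circuit breaker pattern to prevent cascading failures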
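- Combining with the circuit breaker:
  - Automatic tripping and recovery based on a failure count and a timeout
  - Three states:
    - Closed (CLOSED): normal state
    - Open (OPEN): circuit tripped, requests rejected
    - Half-open (HALF-OPEN): trial state while attempting recovery
  - Integration with load balancing:
    - Node selection automatically skips nodes whose circuit is open
    - A successful health check resets the node's circuit breaker state
    - Trial requests are supported in the half-open state
  - Visual monitoring: node and circuit breaker state can be viewed through a REST endpoint (the code above does not include one)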
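The three states listed above can also be modelled as an explicit state machine. The sketch below is one possible shape, not the implementation used in `ServiceNode`: it lets exactly one trial request through in the half-open state, whereas the node class in this article admits all traffic once the timeout has elapsed. The class name `CircuitBreaker`, its constructor parameters, and the injected clock are hypothetical.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: an explicit CLOSED / OPEN / HALF_OPEN circuit breaker.
class CircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long openTimeoutMillis;
    private final LongSupplier clock; // injected so the timeout can be tested without sleeping
    private State state = State.CLOSED;
    private int failureCount;
    private long openedAt;

    CircuitBreaker(int failureThreshold, long openTimeoutMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openTimeoutMillis = openTimeoutMillis;
        this.clock = clock;
    }

    // Returns true if a request may be sent now.
    synchronized boolean allowRequest() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= openTimeoutMillis) {
            state = State.HALF_OPEN; // Timeout elapsed: let a single trial request through
            return true;
        }
        // HALF_OPEN returns false here because a trial request is already in flight
        return state == State.CLOSED;
    }

    synchronized void recordSuccess() {
        failureCount = 0;
        state = State.CLOSED;
    }

    synchronized void recordFailure() {
        failureCount++;
        // A failed trial request reopens the circuit immediately
        if (state == State.HALF_OPEN || failureCount >= failureThreshold) {
            state = State.OPEN;
            openedAt = clock.getAsLong();
        }
    }

    synchronized State state() {
        return state;
    }
}
```

In production the clock would be `System::currentTimeMillis`. One limitation of this sketch: if the trial request never reports a result, the breaker stays half-open, so a real implementation would also want a timeout on the trial itself.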