在 Kubernetes 為主流注冊發現的今天,給出如何在 Spring Boot 中基于 ZooKeeper 實現服務注冊/發現、分布式鎖、配置中心以及集群協調的完整代碼與最佳實踐。所有示例均可直接復制運行。
1. ZooKeeper 架構與核心原理
1.1 角色
- Leader:處理寫請求,廣播事務(ZAB 協議)。
- Follower / Observer:處理讀請求,Follower 參與選舉,Observer 僅擴展讀能力。
1.2 一致性協議:ZAB(ZooKeeper Atomic Broadcast)
- 所有寫請求統一由 Leader 生成全局遞增的
zxid
。 - 兩階段提交(Proposal → ACK → Commit)。
- 崩潰恢復階段:根據
zxid
選舉新 Leader,保證已 Commit 的事務不丟失。
1.3 數據模型
/
├── services
│ ├── user-service
│ │ ├── 192.168.1.10#8080 (EPHEMERAL_SEQUENTIAL)
│ │ └── 192.168.1.11#8080
│ └── order-service
├── configs
│ └── application.yml
└── locks├── pay_lock_0000000001 (EPHEMERAL_SEQUENTIAL)└── pay_lock_0000000002
- EPHEMERAL:會話斷則節點自動刪除,天然適合心跳/服務實例。
- SEQUENTIAL:節點名后綴自增,用于公平鎖、隊列。
2. Spring Boot 集成 ZooKeeper
場景:K8s 已有 Service 發現,但團隊需要異構語言互通、強一致配置、分布式鎖,于是引入 ZooKeeper。
2.1 依賴
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.5.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-x-discovery</artifactId><version>5.5.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.5.0</version>
</dependency>
2.2 自動配置(Spring Boot 3.x)
@Configuration
@EnableConfigurationProperties(ZkProps.class)
public class ZkConfig {@Bean(initMethod = "start", destroyMethod = "close")public CuratorFramework curator(ZkProps p) {return CuratorFrameworkFactory.builder().connectString(p.getUrl()).sessionTimeoutMs(30_000).connectionTimeoutMs(10_000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();}@Beanpublic ServiceDiscovery<InstanceDetails> discovery(CuratorFramework client) throws Exception {ServiceDiscovery<InstanceDetails> sd = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath("/services").serializer(new JsonInstanceSerializer<>(InstanceDetails.class)).build();sd.start();return sd;}
}
2.3 服務注冊(應用啟動時自動注冊)
@Component
@RequiredArgsConstructor
public class ZkRegistrar implements ApplicationRunner {private final ServiceDiscovery<InstanceDetails> discovery;private final ZkProps props;@Overridepublic void run(ApplicationArguments args) throws Exception {InstanceDetails payload = new InstanceDetails(props.getProfile());ServiceInstance<InstanceDetails> instance = ServiceInstance.<InstanceDetails>builder().name(props.getAppName()).id(props.getPodIp() + ":" + props.getPort()).address(props.getPodIp()).port(props.getPort()).payload(payload).build();discovery.registerService(instance);}
}
2.4 服務發現(負載均衡示例)
@Component
@RequiredArgsConstructor
public class ZkLoadBalancer {private final ServiceDiscovery<InstanceDetails> discovery;public InstanceDetails choose(String serviceName) throws Exception {Collection<ServiceInstance<InstanceDetails>> instances =discovery.queryForInstances(serviceName);if (instances.isEmpty()) throw new IllegalStateException("No instances");// 輪詢return instances.stream().skip(ThreadLocalRandom.current().nextInt(instances.size())).findFirst().orElseThrow().getPayload();}
}
3. 分布式鎖:Curator Recipes
Curator 提供 InterProcessMutex(可重入)、InterProcessSemaphoreMutex(不可重入)等實現。
3.1 配置
@Bean
public InterProcessMutex payLock(CuratorFramework client) {return new InterProcessMutex(client, "/locks/pay");
}
3.2 業務中使用
@Service
@RequiredArgsConstructor
public class PayService {private final InterProcessMutex payLock;public void pay(String orderId) throws Exception {if (payLock.acquire(10, TimeUnit.SECONDS)) {try {// 冪等扣款邏輯} finally {payLock.release();}} else {throw new RuntimeException("獲取鎖超時");}}
}
3.3 高級:讀寫鎖
@Bean
public InterProcessReadWriteLock rwLock(CuratorFramework client) {return new InterProcessReadWriteLock(client, "/locks/rw");
}
4. 配置中心(動態刷新)
4.1 存儲
/configs/application.yml
4.2 監聽與熱更新
@Component
@RequiredArgsConstructor
public class ConfigWatcher {private final CuratorFramework client;private final Environment env;@PostConstructpublic void watch() throws Exception {TreeCache cache = TreeCache.newBuilder(client, "/configs").build();cache.getListenable().addListener((cf, event) -> {if (event.getType() == TreeCacheEvent.Type.NODE_UPDATED) {String path = event.getData().getPath();if (path.endsWith("application.yml")) {byte[] data = event.getData().getData();// 這里觸發 Spring Environment 刷新((ConfigurableEnvironment) env).getPropertySources().replace("zk-config", new MapPropertySource("zk-config",new Yaml().load(new String(data))));}}});cache.start();}
}
5. 最佳實踐與注意事項
維度 | 建議 |
---|---|
部署 | 3 或 5 節點奇數集群,獨立 SSD,JVM 堆 4-8G,開啟快照自動清理。 |
會話 | 會話超時 < 客戶端 GC 時間;避免長時間 STW。 |
節點 | 數據節點 < 1MB,子節點 < 10 萬;使用 Observer 擴展讀。 |
鎖 | 鎖路徑獨立;鎖內邏輯冪等、可重試;設置超時避免死鎖。 |
K8s | 用 StatefulSet 部署 ZooKeeper;Headless Service 使 Pod 穩定 DNS。 |
遷移 | 若未來遷到 etcd,可通過 Curator-to-etcd Bridge 逐步替換。 |
6. 小結
功能 | K8s 原生 | ZooKeeper 方案優勢 |
---|---|---|
服務發現 | CoreDNS | 跨語言、精細權重、健康檢查可擴展 |
分布式鎖 | ? | 強一致、可重入、讀寫鎖 |
配置中心 | ConfigMap | 監聽粒度細、版本化、變更審計 |
集群協調 | ? | Leader 選舉、隊列、屏障(Barrier) |
在 K8s 為主的今天,ZooKeeper 并非過時,而是作為強一致協調層的補充,特別適合金融交易、庫存扣減、大規模異構系統。
參考閱讀
- Curator 官方文檔
- ZooKeeper Internals
如需進一步探討性能壓測腳本或K8s Operator 部署方案,歡迎留言!