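Key steps:
- On startup, Spring Boot creates a `LettuceConnectionFactory`.
- The client is initialized according to the configuration type (cluster, sentinel, or standalone).
- In cluster mode:
  - A `RedisClusterClient` is created.
  - `setOptions(getClusterClientOptions(configuration))` is called to apply the configuration.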
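The selection by configuration type can be pictured with a small, dependency-free sketch. It follows the order of checks in the `createLettuceConnectionFactory` method quoted in the source section (sentinel first, then cluster, otherwise standalone). The class `ClientSelectionSketch`, the `Mode` enum, and the `select` method are hypothetical names used only for illustration. Treating an empty node list as "not configured" is also an assumption of this sketch.

```java
import java.util.List;

// Hypothetical illustration only: these names are not Spring or Lettuce APIs.
class ClientSelectionSketch {

    enum Mode { SENTINEL, CLUSTER, STANDALONE }

    // Same order of checks as the quoted factory method:
    // sentinel first, then cluster, and standalone as the fallback.
    static Mode select(List<String> sentinelNodes, List<String> clusterNodes) {
        if (sentinelNodes != null && !sentinelNodes.isEmpty()) {
            return Mode.SENTINEL;
        }
        if (clusterNodes != null && !clusterNodes.isEmpty()) {
            return Mode.CLUSTER;
        }
        return Mode.STANDALONE;
    }

    public static void main(String[] args) {
        // Only cluster nodes configured: the cluster branch is taken.
        System.out.println(select(null, List.of("10.0.0.1:6379", "10.0.0.2:6379")));
        // Nothing configured: falls back to standalone.
        System.out.println(select(null, null));
    }
}
```

Only the cluster branch ends up with a `RedisClusterClient`, which is why the topology refresh logic described below applies to cluster mode only.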
2. Node status check mechanism
Loading the topology refresh options:
```java
private ClusterTopologyRefreshOptions getClusterTopologyRefreshOptions() {
    ClusterClientOptions clusterClientOptions = redisClusterClient.getClusterClientOptions();
    // Fall back to default options when no cluster client options were set
    return clusterClientOptions != null ? clusterClientOptions.getTopologyRefreshOptions() : FALLBACK_OPTIONS;
}
```
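Conditions that trigger a status check:
```java
private boolean isEnabled(RefreshTrigger refreshTrigger) {
    // A trigger is active only if it is in the configured set of adaptive triggers
    return getClusterTopologyRefreshOptions().getAdaptiveRefreshTriggers().contains(refreshTrigger);
}
```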
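The check above is a plain set-membership test. The sketch below reproduces it without Lettuce on the classpath. `TriggerCheckSketch` and its local `RefreshTrigger` enum are hypothetical stand-ins that reuse the trigger names listed in this article; they are not the library's classes.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the trigger check; not Lettuce's own classes.
class TriggerCheckSketch {

    enum RefreshTrigger { MOVED_REDIRECT, ASK_REDIRECT, PERSISTENT_RECONNECTS, UNCOVERED_SLOT, UNKNOWN_NODE }

    private final Set<RefreshTrigger> enabledTriggers = EnumSet.noneOf(RefreshTrigger.class);

    TriggerCheckSketch(Set<RefreshTrigger> enabled) {
        // Defensive copy so later changes to the caller's set have no effect here
        enabledTriggers.addAll(enabled);
    }

    // Same idea as isEnabled(...): a trigger fires only if it was configured.
    boolean isEnabled(RefreshTrigger trigger) {
        return enabledTriggers.contains(trigger);
    }

    public static void main(String[] args) {
        TriggerCheckSketch check = new TriggerCheckSketch(
                EnumSet.of(RefreshTrigger.PERSISTENT_RECONNECTS, RefreshTrigger.MOVED_REDIRECT));
        System.out.println(check.isEnabled(RefreshTrigger.PERSISTENT_RECONNECTS));
        System.out.println(check.isEnabled(RefreshTrigger.UNKNOWN_NODE));
    }
}
```

With an empty trigger set every check returns false, so no adaptive refresh is ever signalled, whatever the cluster does.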
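3. How reconnection on failure works
Handling reconnect events:
```java
@Override
public void onReconnectAttempt(int attempt) {
    // Signal a refresh only when the trigger is enabled and the attempt count reached the threshold
    if (isEnabled(RefreshTrigger.PERSISTENT_RECONNECTS)
            && attempt >= getClusterTopologyRefreshOptions().getRefreshTriggersReconnectAttempts()) {
        if (indicateTopologyRefreshSignal()) {
            emitAdaptiveRefreshScheduledEvent();
        }
    }
}
```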
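Workflow:
- When the network connection drops, Lettuce automatically tries to reconnect.
- Every reconnect attempt calls `onReconnectAttempt`.
- The method checks whether the `PERSISTENT_RECONNECTS` trigger is enabled.
- It then checks whether the number of reconnect attempts has reached the threshold (`refreshTriggersReconnectAttempts`).
- If both conditions hold, a topology refresh event is triggered.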
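The workflow can be replayed in a minimal, self-contained sketch. `ReconnectTriggerSketch` is a hypothetical class. It replaces `indicateTopologyRefreshSignal()` and the event emission with a simple counter, so it shows only the threshold logic and none of the library's scheduling.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the reconnect threshold check; not Lettuce code.
class ReconnectTriggerSketch {

    private final boolean persistentReconnectsEnabled;
    private final int reconnectAttemptsThreshold;
    private final AtomicInteger refreshSignals = new AtomicInteger();

    ReconnectTriggerSketch(boolean persistentReconnectsEnabled, int reconnectAttemptsThreshold) {
        this.persistentReconnectsEnabled = persistentReconnectsEnabled;
        this.reconnectAttemptsThreshold = reconnectAttemptsThreshold;
    }

    // Called once per reconnect attempt with the running attempt number.
    void onReconnectAttempt(int attempt) {
        if (persistentReconnectsEnabled && attempt >= reconnectAttemptsThreshold) {
            // Stand-in for signalling a topology refresh
            refreshSignals.incrementAndGet();
        }
    }

    int refreshSignals() {
        return refreshSignals.get();
    }

    public static void main(String[] args) {
        ReconnectTriggerSketch sketch = new ReconnectTriggerSketch(true, 3);
        for (int attempt = 1; attempt <= 5; attempt++) {
            sketch.onReconnectAttempt(attempt);
        }
        // Attempts 3, 4 and 5 reach the threshold of 3
        System.out.println("refresh signals: " + sketch.refreshSignals());
    }
}
```

Attempts below the threshold are ignored, so a single short network blip does not by itself cause a topology refresh.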
4. Topology refresh execution
Refresh activation:
```java
private void activateTopologyRefreshIfNeeded() {
    if (getOptions() instanceof ClusterClientOptions) {
        ClusterClientOptions options = (ClusterClientOptions) getOptions();
        ClusterTopologyRefreshOptions topologyRefreshOptions = options.getTopologyRefreshOptions();
        if (!topologyRefreshOptions.isPeriodicRefreshEnabled() || clusterTopologyRefreshActivated.get()) {
            return;
        }
        if (clusterTopologyRefreshActivated.compareAndSet(false, true)) {
            // Create the scheduled refresh task
            ScheduledFuture<?> scheduledFuture = genericWorkerPool.scheduleAtFixedRate(
                    clusterTopologyRefreshScheduler,
                    options.getRefreshPeriod().toNanos(),
                    options.getRefreshPeriod().toNanos(),
                    TimeUnit.NANOSECONDS);
            clusterTopologyRefreshFuture.set(scheduledFuture);
        }
    }
}
```
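Refresh process:
- Check whether periodic refresh is enabled.
- Make sure only one refresh task is activated (an atomic compare-and-set).
- Create a scheduled task that runs the refresh at the configured interval.
- On each refresh, fetch the cluster topology again.
- Update the client's internal node routing table.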
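The "activate at most once" step relies on `compareAndSet`. The sketch below shows the same pattern using only `java.util.concurrent`. `PeriodicRefreshSketch` and its methods are hypothetical names, and the refresh task is a placeholder `Runnable`, not a real topology lookup.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of one-time activation of a periodic task; not Lettuce code.
class PeriodicRefreshSketch {

    private final ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean activated = new AtomicBoolean();
    private final AtomicReference<ScheduledFuture<?>> future = new AtomicReference<>();

    // Returns true only for the call that actually scheduled the task.
    boolean activateIfNeeded(boolean periodicRefreshEnabled, Duration period, Runnable refreshTask) {
        if (!periodicRefreshEnabled || activated.get()) {
            return false;
        }
        // compareAndSet lets exactly one caller win, even under concurrent calls
        if (activated.compareAndSet(false, true)) {
            long nanos = period.toNanos();
            future.set(pool.scheduleAtFixedRate(refreshTask, nanos, nanos, TimeUnit.NANOSECONDS));
            return true;
        }
        return false;
    }

    // Cancels the scheduled task (if any) and stops the worker thread.
    void shutdown() {
        ScheduledFuture<?> scheduled = future.get();
        if (scheduled != null) {
            scheduled.cancel(false);
        }
        pool.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        PeriodicRefreshSketch sketch = new PeriodicRefreshSketch();
        Runnable task = () -> System.out.println("refreshing topology view");
        System.out.println(sketch.activateIfNeeded(true, Duration.ofMillis(100), task)); // schedules the task
        System.out.println(sketch.activateIfNeeded(true, Duration.ofMillis(100), task)); // already active, no-op
        Thread.sleep(350);
        sketch.shutdown();
    }
}
```

The early `activated.get()` check is only a fast path. The `compareAndSet` call is what guarantees a single scheduled task.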
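5. Other trigger mechanisms
Besides reconnects, several other conditions can trigger a refresh:
```java
public enum RefreshTrigger {
    MOVED_REDIRECT,        // MOVED redirection
    ASK_REDIRECT,          // ASK redirection
    PERSISTENT_RECONNECTS, // persistent reconnects
    UNCOVERED_SLOT,        // slot not covered by any node
    UNKNOWN_NODE           // unknown node
}
```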
The corresponding event handler, shown here for MOVED redirections:
```java
@Override
public void onMovedRedirection() {
    if (isEnabled(RefreshTrigger.MOVED_REDIRECT)) {
        if (indicateTopologyRefreshSignal()) {
            emitAdaptiveRefreshScheduledEvent();
        }
    }
}
```
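6. Key points for making the configuration take effect
To make sure the configuration takes effect in Spring Boot:
- Cluster mode must be enabled: the logic above only runs when a cluster configuration is used.
- Set `ClientOptions` correctly: inject the configuration through a `LettuceClientConfigurationBuilderCustomizer`.
- Topology refresh options: the `PERSISTENT_RECONNECTS` trigger must be included for reconnect-driven refresh.
- Set a sensible reconnect threshold: `refreshTriggersReconnectAttempts`.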
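A customizer bean for these points might look like the sketch below. It assumes Spring Boot's auto-configured `LettuceConnectionFactory` is in use, because customizers are applied only there and not to a connection factory bean you define yourself. It also assumes the customizer lives in the `org.springframework.boot.autoconfigure.data.redis` package, the same package this article imports `RedisProperties` from. The class name `TopologyRefreshCustomizerConfig` and the values 3 and 10 are illustrative.

```java
import java.time.Duration;

import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions.RefreshTrigger;
import org.springframework.boot.autoconfigure.data.redis.LettuceClientConfigurationBuilderCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical class name; values are illustrative, not recommendations from the library.
@Configuration
public class TopologyRefreshCustomizerConfig {

    @Bean
    public LettuceClientConfigurationBuilderCustomizer topologyRefreshCustomizer() {
        ClusterTopologyRefreshOptions refreshOptions = ClusterTopologyRefreshOptions.builder()
                // Include PERSISTENT_RECONNECTS so repeated reconnects can signal a refresh
                .enableAdaptiveRefreshTrigger(RefreshTrigger.PERSISTENT_RECONNECTS, RefreshTrigger.MOVED_REDIRECT)
                // Reconnect attempts required before the trigger fires
                .refreshTriggersReconnectAttempts(3)
                .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(10))
                .build();

        // Replaces the client options on the auto-configured builder
        return builder -> builder.clientOptions(
                ClusterClientOptions.builder()
                        .topologyRefreshOptions(refreshOptions)
                        .build());
    }
}
```

Because the customizer replaces the client options as a whole, any other client options you rely on have to be set on the same `ClusterClientOptions` builder.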
7. Complete workflow
[Flowchart: complete workflow]
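Best-practice recommendations:
- Combine several triggers: enabling multiple triggers improves the client's awareness of cluster changes.
```java
.enableAdaptiveRefreshTrigger(
        RefreshTrigger.PERSISTENT_RECONNECTS,
        RefreshTrigger.MOVED_REDIRECT,
        RefreshTrigger.UNKNOWN_NODE)
```
- Set a sensible reconnect threshold: tune it to the network environment.
```java
.refreshTriggersReconnectAttempts(3) // 3 to 5 is suggested for production
```
- Configure the adaptive refresh timeout: it rate-limits how often adaptive triggers can start a topology refresh, which keeps a burst of errors from causing a burst of refreshes.
```java
.adaptiveRefreshTriggersTimeout(Duration.ofSeconds(10))
```
- Enable periodic refresh: a complement to trigger-based failure recovery.
```java
.enablePeriodicRefresh(Duration.ofMinutes(10))
```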
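With these mechanisms, Lettuce can update its topology view automatically when cluster nodes fail or the network misbehaves, which keeps the client in sync with the cluster and the connection highly available.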
## Reconnect configuration
```java
import java.time.Duration;

import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisClusterConfig {

    @Bean
    public LettuceClientConfiguration lettuceClientConfiguration() {
        // Configure the cluster topology refresh options
        ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
                .enablePeriodicRefresh(true) // enable periodic refresh
                .refreshPeriod(Duration.ofSeconds(30)) // refresh every 30 seconds
                .enableAllAdaptiveRefreshTriggers()
                // Alternatively, enable individual triggers:
                // .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.ASK_REDIRECT)
                // .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT)
                // .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.UNCOVERED_SLOT)
                // .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.UNKNOWN_NODE)
                // .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS)
                .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(10))
                .refreshTriggersReconnectAttempts(5)
                .build();

        return LettuceClientConfiguration.builder()
                .clientOptions(ClusterClientOptions.builder()
                        .topologyRefreshOptions(topologyRefreshOptions)
                        .build())
                .build();
    }

    @Bean
    public LettuceConnectionFactory redisConnectionFactory(RedisProperties redisProperties,
            LettuceClientConfiguration lettuceClientConfiguration) {
        RedisProperties.Cluster clusterProperties = redisProperties.getCluster();
        RedisClusterConfiguration config = new RedisClusterConfiguration(clusterProperties.getNodes());
        if (clusterProperties.getMaxRedirects() != null) {
            config.setMaxRedirects(clusterProperties.getMaxRedirects());
        }
        if (redisProperties.getPassword() != null) {
            config.setPassword(RedisPassword.of(redisProperties.getPassword()));
        }
        return new LettuceConnectionFactory(config, lettuceClientConfiguration);
    }

    // Optional: configure a RedisTemplate
    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}
```
## Source code
```java
// Spring Boot auto-configuration: builds the connection factory
@Bean
@ConditionalOnMissingBean(RedisConnectionFactory.class)
LettuceConnectionFactory redisConnectionFactory(
        ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
        ClientResources clientResources) throws UnknownHostException {
    LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(builderCustomizers, clientResources,
            getProperties().getLettuce().getPool());
    return createLettuceConnectionFactory(clientConfig);
}

private LettuceConnectionFactory createLettuceConnectionFactory(LettuceClientConfiguration clientConfiguration) {
    if (getSentinelConfig() != null) {
        return new LettuceConnectionFactory(getSentinelConfig(), clientConfiguration);
    }
    if (getClusterConfiguration() != null) {
        return new LettuceConnectionFactory(getClusterConfiguration(), clientConfiguration);
    }
    return new LettuceConnectionFactory(getStandaloneConfig(), clientConfiguration);
}

// LettuceConnectionFactory: constructor used for cluster mode
public LettuceConnectionFactory(RedisClusterConfiguration clusterConfiguration,
        LettuceClientConfiguration clientConfig) {
    this(clientConfig);
    Assert.notNull(clusterConfiguration, "RedisClusterConfiguration must not be null!");
    this.configuration = clusterConfiguration;
}

// createClient() creates the client
public void afterPropertiesSet() {
    this.client = createClient();
    this.connectionProvider = createConnectionProvider(client, CODEC);
    this.reactiveConnectionProvider = createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC);
    if (isClusterAware()) {
        this.clusterCommandExecutor = new ClusterCommandExecutor(
                new LettuceClusterTopologyProvider((RedisClusterClient) client),
                new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),
                EXCEPTION_TRANSLATION);
    }
    if (getEagerInitialization() && getShareNativeConnection()) {
        initConnection();
    }
}

// In cluster mode the configuration is applied to the cluster client via
// clusterClient.setOptions(getClusterClientOptions(configuration));
protected AbstractRedisClient createClient() {
    if (isStaticMasterReplicaAware()) {
        RedisClient redisClient = clientConfiguration.getClientResources() //
                .map(RedisClient::create) //
                .orElseGet(RedisClient::create);
        clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
        return redisClient;
    }
    if (isRedisSentinelAware()) {
        RedisURI redisURI = getSentinelRedisURI();
        RedisClient redisClient = clientConfiguration.getClientResources() //
                .map(clientResources -> RedisClient.create(clientResources, redisURI)) //
                .orElseGet(() -> RedisClient.create(redisURI));
        clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
        return redisClient;
    }
    if (isClusterAware()) {
        List<RedisURI> initialUris = new ArrayList<>();
        ClusterConfiguration configuration = (ClusterConfiguration) this.configuration;
        for (RedisNode node : configuration.getClusterNodes()) {
            initialUris.add(createRedisURIAndApplySettings(node.getHost(), node.getPort()));
        }
        RedisClusterClient clusterClient = clientConfiguration.getClientResources() //
                .map(clientResources -> RedisClusterClient.create(clientResources, initialUris)) //
                .orElseGet(() -> RedisClusterClient.create(initialUris));
        clusterClient.setOptions(getClusterClientOptions(configuration));
        return clusterClient;
    }
    RedisURI uri = isDomainSocketAware()
            ? createRedisSocketURIAndApplySettings(((DomainSocketConfiguration) configuration).getSocket())
            : createRedisURIAndApplySettings(getHostName(), getPort());
    RedisClient redisClient = clientConfiguration.getClientResources() //
            .map(clientResources -> RedisClient.create(clientResources, uri)) //
            .orElseGet(() -> RedisClient.create(uri));
    clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
    return redisClient;
}

// Lettuce: adaptive refresh triggers
private ClusterTopologyRefreshOptions getClusterTopologyRefreshOptions() {
    ClusterClientOptions clusterClientOptions = redisClusterClient.getClusterClientOptions();
    if (clusterClientOptions != null) {
        return clusterClientOptions.getTopologyRefreshOptions();
    }
    return FALLBACK_OPTIONS;
}

public ClusterTopologyRefreshOptions getTopologyRefreshOptions() {
    return topologyRefreshOptions;
}

private boolean isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger refreshTrigger) {
    return getClusterTopologyRefreshOptions().getAdaptiveRefreshTriggers().contains(refreshTrigger);
}

@Override
public void onReconnectAttempt(int attempt) {
    if (isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS)
            && attempt >= getClusterTopologyRefreshOptions().getRefreshTriggersReconnectAttempts()) {
        if (indicateTopologyRefreshSignal()) {
            emitAdaptiveRefreshScheduledEvent();
        }
    }
}

@Override
public void onMovedRedirection() {
    if (isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT)) {
        if (indicateTopologyRefreshSignal()) {
            emitAdaptiveRefreshScheduledEvent();
        }
    }
}

// Lettuce: periodic refresh activation
private void activateTopologyRefreshIfNeeded() {
    if (getOptions() instanceof ClusterClientOptions) {
        ClusterClientOptions options = (ClusterClientOptions) getOptions();
        ClusterTopologyRefreshOptions topologyRefreshOptions = options.getTopologyRefreshOptions();
        if (!topologyRefreshOptions.isPeriodicRefreshEnabled() || clusterTopologyRefreshActivated.get()) {
            return;
        }
        if (clusterTopologyRefreshActivated.compareAndSet(false, true)) {
            ScheduledFuture<?> scheduledFuture = genericWorkerPool.scheduleAtFixedRate(
                    clusterTopologyRefreshScheduler,
                    options.getRefreshPeriod().toNanos(),
                    options.getRefreshPeriod().toNanos(),
                    TimeUnit.NANOSECONDS);
            clusterTopologyRefreshFuture.set(scheduledFuture);
        }
    }
}
```