Spring 和 Lettuce 源碼分析 Redis 節點狀態檢查與失敗重連的工作原理

關鍵步驟:

  1. Spring Boot 啟動時創建?LettuceConnectionFactory

  2. 根據配置類型(集群/哨兵/單機)初始化客戶端

  3. 對于集群模式:

    • 創建?RedisClusterClient

    • 調用?setOptions(getClusterClientOptions(configuration))?應用配置

2. 節點狀態檢查機制

拓撲刷新選項加載:

java

private ClusterTopologyRefreshOptions getClusterTopologyRefreshOptions() {ClusterClientOptions clusterClientOptions = redisClusterClient.getClusterClientOptions();return clusterClientOptions != null ? clusterClientOptions.getTopologyRefreshOptions() : FALLBACK_OPTIONS;
}
狀態檢查觸發條件:

java

private boolean isEnabled(RefreshTrigger refreshTrigger) {return getClusterTopologyRefreshOptions().getAdaptiveRefreshTriggers().contains(refreshTrigger);
}

3. 失敗重連原理

重連事件處理:

java

@Override
public void onReconnectAttempt(int attempt) {if (isEnabled(RefreshTrigger.PERSISTENT_RECONNECTS) &&attempt >= getRefreshTriggersReconnectAttempts()) {if (indicateTopologyRefreshSignal()) {emitAdaptiveRefreshScheduledEvent();}}
}
工作流程:
  1. 當發生網絡斷開時,Lettuce 自動嘗試重連

  2. 每次重連嘗試都會調用?onReconnectAttempt?方法

  3. 檢查是否啟用了?PERSISTENT_RECONNECTS?觸發器

  4. 檢查重連次數是否達到閾值(refreshTriggersReconnectAttempts

  5. 如果滿足條件,觸發拓撲刷新事件

4. 拓撲刷新執行過程

刷新激活機制:

java

private void activateTopologyRefreshIfNeeded() {if (getOptions() instanceof ClusterClientOptions) {ClusterClientOptions options = (ClusterClientOptions) getOptions();ClusterTopologyRefreshOptions topologyRefreshOptions = options.getTopologyRefreshOptions();if (!topologyRefreshOptions.isPeriodicRefreshEnabled() || clusterTopologyRefreshActivated.get()) {return;}if (clusterTopologyRefreshActivated.compareAndSet(false, true)) {// 創建定時刷新任務ScheduledFuture<?> scheduledFuture = genericWorkerPool.scheduleAtFixedRate(clusterTopologyRefreshScheduler,options.getRefreshPeriod().toNanos(),options.getRefreshPeriod().toNanos(),TimeUnit.NANOSECONDS);clusterTopologyRefreshFuture.set(scheduledFuture);}}
}
刷新過程:
  1. 檢查周期性刷新是否啟用

  2. 確保只有一個刷新任務被激活(原子操作)

  3. 創建定時任務,按配置的時間間隔執行刷新

  4. 刷新時重新獲取集群拓撲信息

  5. 更新客戶端內部節點路由表

5. 其他觸發機制

除了重連觸發外,還有多種觸發條件:

java

public enum RefreshTrigger {MOVED_REDIRECT,    // MOVED 重定向ASK_REDIRECT,      // ASK 重定向PERSISTENT_RECONNECTS, // 持續重連UNCOVERED_SLOT,    // 未覆蓋的槽位UNKNOWN_NODE       // 未知節點
}

對應的事件處理方法:

java

@Override
public void onMovedRedirection() {if (isEnabled(RefreshTrigger.MOVED_REDIRECT)) {if (indicateTopologyRefreshSignal()) {emitAdaptiveRefreshScheduledEvent();}}
}

6. 配置生效關鍵點

在 Spring Boot 中確保配置生效的關鍵:

  1. 集群模式必須啟用:僅當使用集群配置時相關邏輯才會執行

  2. 正確設置 ClientOptions:通過?LettuceClientConfigurationBuilderCustomizer?注入

  3. 配置拓撲刷新選項:必須包含?PERSISTENT_RECONNECTS?觸發器

  4. 設置合理的重連閾值adaptiveRefreshTriggersReconnectAttempts

7. 完整工作流程

圖表

代碼

最佳實踐建議:

  1. 多觸發器組合配置:同時啟用多種觸發器提高集群感知能力

    java

    .enableAdaptiveRefreshTrigger(RefreshTrigger.PERSISTENT_RECONNECTS,RefreshTrigger.MOVED_REDIRECT,RefreshTrigger.UNKNOWN_NODE
    )
  2. 合理設置重連閾值:根據網絡環境調整

    java

    .adaptiveRefreshTriggersReconnectAttempts(3) // 生產環境建議 3-5
  3. 配置超時保護:防止拓撲刷新阻塞

    java

    .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(10))
  4. 啟用周期性刷新:作為故障恢復的補充

    java

    .enablePeriodicRefresh(Duration.ofMinutes(10))

通過以上機制,Lettuce 能夠在集群節點故障或網絡問題時自動更新拓撲信息,保持客戶端與集群的同步,實現高可用連接。

##配置重連

import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Configuration;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;import java.time.Duration;@Configuration
public class RedisClusterConfig {@Beanpublic LettuceClientConfiguration lettuceClientConfiguration() {// 👇 配置集群拓撲刷新選項ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder().enablePeriodicRefresh(true)  // 開啟周期刷新.refreshPeriod(Duration.ofSeconds(30))  // 每30秒刷新一次.enableAllAdaptiveRefreshTriggers()
//                .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.ASK_REDIRECT)
//                .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT)
//                .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.UNCOVERED_SLOT)
//                .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.UNKNOWN_NODE)
//                .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS).adaptiveRefreshTriggersTimeout(Duration.ofSeconds(10)).refreshTriggersReconnectAttempts(5).build();return LettuceClientConfiguration.builder().clientOptions(ClusterClientOptions.builder().topologyRefreshOptions(topologyRefreshOptions).build()).build();}@Beanpublic LettuceConnectionFactory redisConnectionFactory(RedisProperties redisProperties, LettuceClientConfiguration lettuceClientConfiguration) {RedisProperties.Cluster clusterProperties = redisProperties.getCluster();RedisClusterConfiguration config = new RedisClusterConfiguration(clusterProperties.getNodes());if (clusterProperties.getMaxRedirects() != null) {config.setMaxRedirects(clusterProperties.getMaxRedirects());}if (redisProperties.getPassword() != null) {config.setPassword(RedisPassword.of(redisProperties.getPassword()));}return new LettuceConnectionFactory(config, lettuceClientConfiguration);}// 可選:配置 RedisTemplate@Beanpublic RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new GenericJackson2JsonRedisSerializer());template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());return template;}}

##源碼

@Bean@ConditionalOnMissingBean(RedisConnectionFactory.class)LettuceConnectionFactory redisConnectionFactory(ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,ClientResources clientResources) throws UnknownHostException {LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(builderCustomizers, clientResources,getProperties().getLettuce().getPool());return createLettuceConnectionFactory(clientConfig);}private LettuceConnectionFactory createLettuceConnectionFactory(LettuceClientConfiguration clientConfiguration) {if (getSentinelConfig() != null) {return new LettuceConnectionFactory(getSentinelConfig(), clientConfiguration);}if (getClusterConfiguration() != null) {return new LettuceConnectionFactory(getClusterConfiguration(), clientConfiguration);}return new LettuceConnectionFactory(getStandaloneConfig(), clientConfiguration);}public LettuceConnectionFactory(RedisClusterConfiguration clusterConfiguration,LettuceClientConfiguration clientConfig) {this(clientConfig);Assert.notNull(clusterConfiguration, "RedisClusterConfiguration must not be null!");this.configuration = clusterConfiguration;}//createClient創建client 
public void afterPropertiesSet() {this.client = createClient();this.connectionProvider = createConnectionProvider(client, CODEC);this.reactiveConnectionProvider = createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC);if (isClusterAware()) {this.clusterCommandExecutor = new ClusterCommandExecutor(new LettuceClusterTopologyProvider((RedisClusterClient) client),new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),EXCEPTION_TRANSLATION);}if (getEagerInitialization() && getShareNativeConnection()) {initConnection();}}//clusterClient設置configuration配置  clusterClient.setOptions(getClusterClientOptions(configuration));
protected AbstractRedisClient createClient() {if (isStaticMasterReplicaAware()) {RedisClient redisClient = clientConfiguration.getClientResources() //.map(RedisClient::create) //.orElseGet(RedisClient::create);clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);return redisClient;}if (isRedisSentinelAware()) {RedisURI redisURI = getSentinelRedisURI();RedisClient redisClient = clientConfiguration.getClientResources() //.map(clientResources -> RedisClient.create(clientResources, redisURI)) //.orElseGet(() -> RedisClient.create(redisURI));clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);return redisClient;}if (isClusterAware()) {List<RedisURI> initialUris = new ArrayList<>();ClusterConfiguration configuration = (ClusterConfiguration) this.configuration;for (RedisNode node : configuration.getClusterNodes()) {initialUris.add(createRedisURIAndApplySettings(node.getHost(), node.getPort()));}RedisClusterClient clusterClient = clientConfiguration.getClientResources() //.map(clientResources -> RedisClusterClient.create(clientResources, initialUris)) //.orElseGet(() -> RedisClusterClient.create(initialUris));clusterClient.setOptions(getClusterClientOptions(configuration));return clusterClient;}RedisURI uri = isDomainSocketAware()? createRedisSocketURIAndApplySettings(((DomainSocketConfiguration) configuration).getSocket()): createRedisURIAndApplySettings(getHostName(), getPort());RedisClient redisClient = clientConfiguration.getClientResources() //.map(clientResources -> RedisClient.create(clientResources, uri)) //.orElseGet(() -> RedisClient.create(uri));clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);return redisClient;}private ClusterTopologyRefreshOptions getClusterTopologyRefreshOptions() {ClusterClientOptions clusterClientOptions = redisClusterClient.getClusterClientOptions();if (clusterClientOptions != null) {return clusterClientOptions.getTopologyRefreshOptions();}return FALLBACK_OPTIONS;}public ClusterTopologyRefreshOptions getTopologyRefreshOptions() {return topologyRefreshOptions;}private boolean isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger refreshTrigger) {return getClusterTopologyRefreshOptions().getAdaptiveRefreshTriggers().contains(refreshTrigger);}@Overridepublic void onReconnectAttempt(int attempt) {if (isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS)&& attempt >= getClusterTopologyRefreshOptions().getRefreshTriggersReconnectAttempts()) {if (indicateTopologyRefreshSignal()) {emitAdaptiveRefreshScheduledEvent();}}}@Overridepublic void onMovedRedirection() {if (isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT)) {if (indicateTopologyRefreshSignal()) {emitAdaptiveRefreshScheduledEvent();}}}private void activateTopologyRefreshIfNeeded() {if (getOptions() instanceof ClusterClientOptions) {ClusterClientOptions options = (ClusterClientOptions) getOptions();ClusterTopologyRefreshOptions topologyRefreshOptions = options.getTopologyRefreshOptions();if (!topologyRefreshOptions.isPeriodicRefreshEnabled() || clusterTopologyRefreshActivated.get()) {return;}if (clusterTopologyRefreshActivated.compareAndSet(false, true)) {ScheduledFuture<?> scheduledFuture = genericWorkerPool.scheduleAtFixedRate(clusterTopologyRefreshScheduler,options.getRefreshPeriod().toNanos(), options.getRefreshPeriod().toNanos(), TimeUnit.NANOSECONDS);clusterTopologyRefreshFuture.set(scheduledFuture);}}}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/96148.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/96148.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/96148.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

從ChatGPT到智能助手:Agent智能體如何顛覆AI應用

從ChatGPT到智能助手&#xff1a;Agent智能體如何顛覆AI應用 更多大模型知識分享&#xff0c;盡在>>>GitHub<<< Agent 智能體是什么 簡單來說&#xff0c;Agent 智能體是一種能夠感知環境&#xff0c;并根據自身目標自主采取行動的智能實體。它就像是一個擁…

Spring Boot應用實現圖片資源服務

在這篇文章中&#xff0c;我們將介紹如何使用Spring Boot創建一個REST API來提供服務器上的靜態圖片資源。該API包括路徑安全檢查、文件存在性驗證以及緩存控制等功能&#xff0c;并且代碼包含詳細的注釋以幫助理解。Maven依賴 首先&#xff0c;在您的pom.xml文件中添加以下依賴…

Word 中 MathType 公式編號問題與解決

注&#xff1a;本文為 “Word 中 MathType 公式編號” 相關合輯。 圖片清晰度受引文原圖所限。 略作重排&#xff0c;未整理去重。 如有內容異常&#xff0c;請看原文。 【Word】解決 MathType 已插入公式按新章節開始編號的問題 Allan326 于 2020-03-25 15:30:08 發布 問題…

19. 大數據-產品概念

文章目錄前言一、數據庫1. 簡介2. 使用場景3. 數據庫類型4. 數據類型二、數據倉庫1. 簡介2. 使用場景3. 數據倉庫架構三、數據平臺1. 簡介2. 使用場景3. 數據倉庫架構四、數據中臺1. 簡介2. 使用場景3. 數據中臺架構五、數據湖1. 簡介2. 使用場景3. 數據湖架構六、總結1. 區別2…

python學習DAY46打卡

DAY 46 通道注意力(SE注意力) 內容&#xff1a; 不同CNN層的特征圖&#xff1a;不同通道的特征圖什么是注意力&#xff1a;注意力家族&#xff0c;類似于動物園&#xff0c;都是不同的模塊&#xff0c;好不好試了才知道。通道注意力&#xff1a;模型的定義和插入的位置通道注意…

Ansible 中的文件包含與導入機制

Ansible 中的文件包含與導入機制本文介紹了在 Ansible 中如何通過模塊化方式管理復雜的 Playbook&#xff0c;包括使用 include 和 import 系列語句來拆分和重用代碼。概述 當 Playbook 變得冗長或復雜時&#xff0c;可以將其拆分為多個小文件以提高可管理性。Ansible 提供了模…

OpenCV-循環讀取視頻幀,對每一幀進行處理

原型代碼 內存模型&#xff1a; 核心變量&#xff1a;frame&#xff0c;Numpy ndarray&#xff0c;每次會被覆蓋&#xff0c;大小保持恒定import cv2video_path your_video.mp4cap cv2.VideoCapture(video_path)if not cap.isOpened():print("Cant open Video")exi…

決策樹的學習(二)

一、整體框架本 PPT 聚焦機器學習中的決策樹算法&#xff0c;圍繞 “核心算法&#xff08;ID3、C4.5、CART&#xff09;→ 特殊問題&#xff08;連續值處理&#xff09;→ 優化策略&#xff08;剪枝&#xff09;→ 代碼實現→ 課堂練習” 展開&#xff0c;系統補充決策樹的進階…

粗糧廠的基于spark的通用olap之間的同步工具項目

粗糧廠的基于spark的通用olap之間的同步工具項目1 項目背景2 項目實現2.1 實現原理2.2 細節要點3 抽樣說明4 項目運行狀態4.1 運行速度4.2 項目吞吐4.3 穩定性說的比較簡單&#xff0c;有需要的可以留言&#xff0c;我不斷補充完善1 項目背景 我們公司內部的需要一款&#xff…

C# 時間戳

在C#中&#xff0c;獲取當前時間的毫秒級時間戳可以通過多種方式實現。以下是幾種常見的方法&#xff1a;方法1&#xff1a;使用DateTime和DateTimeOffsetlong timestamp (long)(DateTimeOffset.Now.ToUnixTimeMilliseconds()); Console.WriteLine(timestamp);方法2&#xff1…

【牛客刷題】REAL792 小O的平面畫圓

文章目錄 一、題目介紹 1.1 輸入描述 1.2 輸出描述 1.3 示例 二、算法設計思路 2.1 核心問題分析 2.2 圖解兩個圓的位置關系 2.2.1. 相離 (Separate) 2.2.2. 外切 (Externally Tangent) 2.2.3. 相交 (Intersecting) 2.2.4. 內切 (Internally Tangent) 2.2.5. 包含 (Containing)…

uniapp:微信小程序使用Canvas 和Canvas 2D繪制圖形

一、Canvas 畫布 canvas 組件 提供了繪制界面&#xff0c;可以在之上進行任意繪制 功能描述 Canvas 畫布。2.9.0 起支持一套新 Canvas 2D 接口&#xff08;需指定 type 屬性&#xff09;&#xff0c;同時支持同層渲染&#xff0c;原有接口不再維護。 二、Canvas 和Canvas 2D 區…

word如何轉換為pdf

pip install pywin32import os import win32com.client import pythoncom # 新增&#xff1a;用于處理COM線程 import sysdef docx_to_pdf(docx_path, pdf_pathNone):"""將Word文檔轉換為PDF格式&#xff0c;修復退出時的COM錯誤"""if not os.p…

服務器Linux防火墻怎樣實現訪問控制

在互聯網世界里&#xff0c;Linux服務器就像一座城池&#xff0c;而防火墻便是城池的守衛者。沒有防火墻&#xff0c;外部的任何流量都能毫無阻攔地進入服務器;而有了防火墻&#xff0c;就可以像設關卡一樣&#xff0c;對進出城門的人進行盤查和控制。對企業運維人員來說&#…

【原創理論】Stochastic Coupled Dyadic System (SCDS):一個用于兩性關系動力學建模的隨機耦合系統框架

【原創理論】Stochastic Coupled Dyadic System (SCDS)&#xff1a;一個用于兩性關系動力學建模的隨機耦合系統框架 作者&#xff1a;[望月&#xff0c;GPT5,GPT-O3,Gemini2.5pro] 分類&#xff1a; 人工智能 理論模型 交叉學科 系統科學 人性 愛情 標簽&#xff1a; 關系動力…

星圖云開發者平臺新功能速遞 | 微服務管理器:無縫整合異構服務,釋放云原生開發潛能

在構建現代數字化應用的過程中&#xff0c;開發者常常面臨一個關鍵挑戰&#xff1a;如何高效、安全地集成和復用既有的復雜服務或自有業務系統&#xff1f;這些服務可能是核心算法引擎、遺留業務邏輯模塊&#xff0c;或是特定的SaaS能力。傳統方式下&#xff0c;將它們融入新的…

數據結構:構建 (create) 一個二叉樹

目錄 問題的本質——什么信息才能唯一確定一棵樹&#xff1f; 推導“最佳拍檔”——哪兩種遍歷序列能行&#xff1f; 遞歸思想——如何構建一棵樹&#xff1f; 第1步&#xff1a;確定整棵樹的根節點 第2步&#xff1a;劃分左右子樹的成員 第3步&#xff1a;遞歸構建左右子…

【STM32】HAL庫中的實現(五):ADC (模數轉換)

什么是 ADC&#xff08;模數轉換器&#xff09; ADC&#xff08;Analog to Digital Converter&#xff09;是將 模擬信號&#xff08;電壓&#xff09;轉換成數字信號&#xff08;數值&#xff09; 的器件。 在 STM32 中&#xff0c;ADC 通常具有以下特性&#xff1a;特性描述分…

智慧校園中IPTV融合對講:構建高效溝通新生態

在智慧校園的建設浪潮里&#xff0c;IPTV融合對講系統宛如一顆璀璨的新星&#xff0c;以其獨特的功能和強大的優勢&#xff0c;為校園的溝通與管理帶來了全新的變革&#xff0c;構建起一個高效、便捷、智能的溝通新生態。從日常溝通層面來看&#xff0c;IPTV融合對講系統打破了…

智能合約里的 “拒絕服務“ 攻擊:讓你的合約變成 “死機的手機“

你有沒有遇到過手機突然卡死&#xff0c;點什么都沒反應的情況&#xff1f;在區塊鏈世界里&#xff0c;智能合約也可能遭遇類似的 "罷工"—— 這就是 "拒絕服務攻擊"&#xff08;Denial of Service&#xff0c;簡稱 DoS&#xff09;。今天用大白話講講合約…