生產環境中如何使用Caffeine+Redis實現二級緩存(詳細分析了各種情況)
本篇主要講解的是實現Caffeine+Redis實現一個現成的使用流程。下一篇講解什么是Caffeine以及caffeine的使用
00背景:
使用Caffeine和Redis的二級緩存方案源自于分布式系統中對高性能、高可用性、低延遲、數據一致性的需求。二級緩存結合了本地緩存的快速訪問能力和分布式緩存的數據共享與持久化特性,解決了單極緩存的局限性,可用于高并發及分布式場景。
1.設計目標
高性能:利用 Caffeine 的低延遲和 Redis 的高吞吐量。
一致性:確保 L1 和 L2 緩存與數據源的數據一致。
高可用性:處理緩存失效、Redis 宕機等異常情況(下面會分析解決方案)。
擴展性:支持多實例部署和水平擴展。
可監控:提供命中率、延遲等指標,便于調優。
2.架構設計
架構描述
L1 緩存(Caffeine)
- 部署在每個應用程序實例的 JVM 內存中,存儲熱點數據。
- 特點:納秒級訪問延遲,適合高頻訪問的少量數據。
- 限制:數據僅限當前實例,無法跨實例共享。
L2 緩存(Redis)
- 部署為獨立的分布式緩存服務(單機、主從或集群模式)。
- 特點:支持跨實例共享、持久化、復雜數據結構。
- 限制:微秒到毫秒級延遲,受網絡影響。
數據源
- 數據庫 MySQL)
- 當 L1 和 L2 緩存均未命中時,從數據源加載數據。
工作流程
-
讀取數據:
客戶端請求數據,應用程序首先查詢 L1 緩存(Caffeine)。
若 L1 未命中(cache miss),查詢 L2 緩存(Redis)。
若 L2 也未命中,從數據源(如數據庫)加載數據。
將數據寫入 L2(Redis),并回填到 L1(Caffeine)。
-
更新數據
數據更新時,先更新數據庫
然后通過Redis的發布/訂閱機制通知所有應用實例,每個實例刪除或更新本地L1緩存。由于發布/訂閱不會持久化消息,可以使用消息隊列替換Redis中的發布/訂閱 -
緩存同步
使用Redis的Pub/Sub或其他消息隊列廣播失效消息
確保L1和L2緩存與數據源保持一致
3.實現代碼
首先加入依賴
<dependency><groupId>com.github.ben-manes.caffeine</groupId><artifactId>caffeine</artifactId><version>2.9.3</version>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
代碼如下:
package com.example;import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.time.Duration;
@Service
public class TwoLevelCacheService1 {// Caffeine 緩存(L1)private final LoadingCache<String, User> caffeineCache;// Redis 連接(L2)private final StatefulRedisConnection<String, String> redisConnection;// Redis Pub/Sub 連接(用于緩存失效通知)private final StatefulRedisPubSubConnection<String, String> redisPubSubConnection;// 數據源(模擬數據庫)private final UserRepository userRepository;// Redis 緩存前綴private static final String CACHE_PREFIX = "user:";// Redis Pub/Sub 頻道private static final String INVALIDATE_CHANNEL = "user:invalidate";@Autowiredpublic TwoLevelCacheService1(StatefulRedisConnection<String, String> redisConnection,StatefulRedisPubSubConnection<String, String> redisPubSubConnection,UserRepository userRepository) {this.redisConnection = redisConnection;this.redisPubSubConnection = redisPubSubConnection;this.userRepository = userRepository;// 配置 Caffeine 緩存this.caffeineCache = Caffeine.newBuilder().maximumSize(1000) // 最大緩存 1000 個用戶.expireAfterWrite(Duration.ofMinutes(10)) // 寫入后 10 分鐘過期.recordStats() // 開啟統計.build(this::loadFromRedisOrDb); // 加載邏輯}// 初始化 Pub/Sub 監聽@PostConstructpublic void initPubSub() {//添加一個監聽器,處理接收的消息redisPubSubConnection.addListener(new RedisPubSubAdapter<String,String>() {@Overridepublic void message(String channel, String message) {if (INVALIDATE_CHANNEL.equals(channel)) {caffeineCache.invalidate(message); // 失效 L1 緩存}}});//獲取Redis Pub/Sub連接的異步命令接口,并訂閱指定的頻道RedisPubSubAsyncCommands<String, String> async = redisPubSubConnection.async();async.subscribe(INVALIDATE_CHANNEL);}// 獲取用戶(先查 L1,再查 L2,最后查數據庫)public User getUser(String userId) {return caffeineCache.get(userId);}// 更新用戶并失效緩存public void updateUser(User user) {// 更新數據庫userRepository.save(user);// 失效 L2 緩存RedisCommands<String, String> commands = redisConnection.sync();commands.del(CACHE_PREFIX + user.getId());// 廣播失效消息,通知所有實例失效 L1 緩存commands.publish(INVALIDATE_CHANNEL, user.getId());}// 從 Redis 或數據庫加載數據private User loadFromRedisOrDb(String userId) {// 查 Redis (L2)RedisCommands<String, String> commands = redisConnection.sync();String cachedUser = commands.get(CACHE_PREFIX + userId);if (cachedUser != null) {return deserializeUser(cachedUser); // 反序列化}// Redis 未命中,查數據庫,查詢到的數據存到Redis,并且隱式的存入到caffeineCache中User user = userRepository.findById(userId);if (user != null) {// 回填 Redis,設置 1 小時過期commands.setex(CACHE_PREFIX + userId, 3600, serializeUser(user));}return user;}// 序列化用戶對象(示例使用 JSON)private String serializeUser(User user) {return "{\"id\":\"" + user.getId() + "\",\"name\":\"" + user.getName() + "\"}";}// 反序列化用戶對象private User deserializeUser(String data) {// 簡單解析 JSON,生產環境建議使用 Jackson 或 GsonString[] parts = data.replaceAll("[{}\"]", "").split(",");String id = parts[0].split(":")[1];String name = parts[1].split(":")[1];return new User(id, name);}// 獲取緩存統計信息public CacheStats getCacheStats() {return caffeineCache.stats();}
}
4.配置代碼
spring:redis:host: localhostport: 6379lettuce:pool:max-active: 100max-idle: 10min-idle: 5timeout: 2000
5.異常處理
-
Redis宕機的話直接回退到數據庫查詢
在loadFromRedisOrDb方法中捕獲Redis異常:try {String cachedUser = commands.get(CACHE_PREFIX + userId);if (cachedUser != null) {return deserializeUser(cachedUser);} } catch (Exception e) {// 記錄日志,降級到數據庫邏輯log.error("Redis error, fallback to DB", e); }
-
Caffeine加載失敗
若加載邏輯拋出異常,返回默認值或拋出自定義異常caffeineCache = Caffeine.newBuilder().build(key -> {try {return loadFromRedisOrDb(key);} catch (Exception e) {throw new CacheException("Failed to load key: " + key, e);}});
6.監控與調優
-
caffeine監控
通過caffeineCache.stats()獲取命中率、驅逐率、加載時間
集成Prometheus或者Micrometer,暴露指標:
7.補充
- 不知道大家會有這樣的疑問沒,因為Caffeine未命中后會查Redis,Redis命中后會將數據寫入Caffeine中,Redis沒有命中就會查數據庫,然后將數據分別寫入Redis和Caffeine中,那么Caffeine和Redis中的數據不就是高度重合了嗎?
答:其實并不會高度重合,因為在Caffeine中會設置容量,比如我們這里設置的1000條,并且Caffeine達到1000后會通過LRU(最近最少使用)驅逐策略去刪除舊數據。因為根據LRU驅逐策略留下的數據都是高訪問量的數據。 - 對于代碼中的 buid函數
this.caffeineCache = Caffeine.newBuilder().maximumSize(1000) // 最大緩存 1000 個用戶.expireAfterWrite(Duration.ofMinutes(10)) // 寫入后 10 分鐘過期.recordStats() // 開啟統計.build(this::loadFromRedisOrDb); // 加載邏輯
這里的build需要接收一個CacheLoader類型的參數,CacheLoader是接口(函數式接口)
@NonNullpublic <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(@NonNull CacheLoader<? super K1, V1> loader) {...........}
CacheLoader使用了@FunctionalInterface注解,說明是函數時接口,其中只有一個load抽象方法
@FunctionalInterface
@SuppressWarnings({"PMD.SignatureDeclareThrowsException", "FunctionalInterfaceMethodChanged"})
public interface CacheLoader<K, V> extends AsyncCacheLoader<K, V> {@NullableV load(@NonNull K key) throws Exception;
}
那么buid的代碼就可以進行優化,Java編譯器根據caffeineCache的類型(LoadingCache<String,User>)推斷出key=String,value=User,發現與loadFromRedisOrDb方法一致,因此可以使用this::loadFromRedisOrDb作為參數
.build(new CacheLoader<String, User>() {@Overridepublic @Nullable User load(@NonNull String key) throws Exception {return loadFromRedisOrDb(key);}}); // 加載邏輯
//----->>>
.build((userId)->loadFromRedisOrDb(userId)); // 加載邏輯
//------>>>
.build(this::loadFromRedisOrDb); // 加載邏輯
Java 的函數式接口機制允許將方法引用直接賦值給接口類型,只要簽名匹配。
this::loadFromRedisOrDb 的簽名 User (String) 滿足 CacheLoader<String, User> 的要求,編譯器自動適配。
8.使用RedisTemplate實現
主要代碼:
package com.example;import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.time.Duration;@Service
public class TwoLevelCacheService_RedisTemplate {// Caffeine 緩存(L1)private final LoadingCache<String, User> caffeineCache;//RedisTemplateprivate final RedisTemplate<String, String> redisTemplate;// 數據源(模擬數據庫)private final UserRepository userRepository;// Redis 緩存前綴private static final String CACHE_PREFIX = "user:";// Redis Pub/Sub 頻道private static final String INVALIDATE_CHANNEL = "user:invalidate";@Autowiredpublic TwoLevelCacheService_RedisTemplate(RedisTemplate<String, String> redisTemplate,UserRepository userRepository) {this.redisTemplate = redisTemplate;this.userRepository = userRepository;// 配置 Caffeine 緩存this.caffeineCache = Caffeine.newBuilder().maximumSize(1000) // 最大緩存 1000 個用戶.expireAfterWrite(Duration.ofMinutes(10)) // 寫入后 10 分鐘過期.recordStats() // 開啟統計.build(this::loadFromRedisOrDb); // 加載邏輯}// 獲取用戶(先查 L1,再查 L2,最后查數據庫)public User getUser(String userId) {return caffeineCache.get(userId);}// 更新用戶并失效緩存public void updateUser(User user) {// 更新數據庫userRepository.save(user);// 失效 L2 緩存redisTemplate.delete(CACHE_PREFIX + user.getId());// 廣播失效消息,通知所有實例失效 L1 緩存redisTemplate.convertAndSend(INVALIDATE_CHANNEL, user.getId());}// 從 Redis 或數據庫加載數據private User loadFromRedisOrDb(String userId) {// 查 Redis (L2)String cachedUser = redisTemplate.opsForValue().get(CACHE_PREFIX + userId);if (cachedUser != null) {return deserializeUser(cachedUser); // 反序列化}// Redis 未命中,查數據庫,查詢到的數據存到Redis,并且隱式的存入到caffeineCache中User user = userRepository.findById(userId);if (user != null) {// 回填 Redis,設置 1 小時過期redisTemplate.opsForValue().set(CACHE_PREFIX+user.getId(),serializeUser(user),Duration.ofHours(1));}return user;}// 序列化用戶對象(示例使用 JSON)private String serializeUser(User user) {return "{\"id\":\"" + user.getId() + "\",\"name\":\"" + user.getName() + "\"}";}// 反序列化用戶對象private User deserializeUser(String data) {// 簡單解析 JSON,生產環境建議使用 Jackson 或 GsonString[] parts = data.replaceAll("[{}\"]", "").split(",");String id = parts[0].split(":")[1];String name = parts[1].split(":")[1];return new User(id, name);}// 獲取緩存統計信息public CacheStats getCacheStats() {return caffeineCache.stats();}
}
配置類 (redisTempalte和訂閱channel)
@Configuration
public class RedisConfig {/*** 配置Redis模板*/@Beanpublic RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, String> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 設置序列化器,確保鍵值是字符串template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new StringRedisSerializer());template.afterPropertiesSet();return template;}/*** 配置監聽容器*/@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,LoadingCache<String, User> caffeineCache) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(new CacheInvalidationListener(caffeineCache),new ChannelTopic("user:invalidate"));return container;}
}
配置消息監聽處理邏輯
public class CacheInvalidationListener implements MessageListener {private final LoadingCache<String, User> caffeineCache;private static final String INVALIDATE_CHANNEL = "user:invalidate";public CacheInvalidationListener(LoadingCache<String, User> caffeineCache) {this.caffeineCache = caffeineCache;}@Overridepublic void onMessage(Message message, byte[] pattern) {String channel = message.getChannel().toString();if (INVALIDATE_CHANNEL.equals(channel)){String userId = message.getBody().toString();caffeineCache.invalidate(userId);}}
}
需要補充的地方請大家在下面留言一起討論