Redis 7.0中5種新特性及實戰應用

Redis 7.0引入了多項革命性的新特性,不僅在性能和可靠性方面有所提升,更在功能和使用體驗上有了質的飛躍。本文將介紹Redis 7.0的五大關鍵新特性,可以根據實際情況利用Redis 7.0的強大功能,構建更高效、更可靠的應用系統。

特性一:Redis Functions(函數存儲)

技術原理

Redis Functions是Redis 7.0引入的重量級特性,它允許開發者將Lua腳本作為命名函數存儲在Redis服務器中。與傳統的EVAL命令不同,Redis Functions支持創建庫(Library)的概念,可以將相關功能的函數組織在一起,提供更好的可管理性。

關鍵優勢:

  • 函數持久化存儲在Redis中,無需每次連接時重新加載
  • 支持函數版本管理和庫的概念
  • 提供更好的權限控制和可觀測性
  • 減少網絡傳輸開銷,提高執行效率

實現示例

創建和注冊函數庫
# 創建一個簡單的計數器函數庫
FUNCTION LOAD "
#!lua name=mycounterredis.register_function('incr_by_and_get', function(keys, args)local key = keys[1]local increment = tonumber(args[1])local result = redis.call('INCRBY', key, increment)return result
end)redis.register_function('get_and_incr_by', function(keys, args)local key = keys[1]local increment = tonumber(args[1])local current = tonumber(redis.call('GET', key)) or 0redis.call('INCRBY', key, increment)return current
end)
"
調用函數
# 使用FCALL命令調用函數
FCALL incr_by_and_get 1 my_counter 5
# 返回遞增后的值,比如 5FCALL get_and_incr_by 1 my_counter 3
# 返回遞增前的值,比如 5(然后遞增到 8)
Java客戶端示例
@Service
public class RedisCounterService {private final StringRedisTemplate redisTemplate;public RedisCounterService(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}public void initializeCounterFunctions() {String functionScript = """#!lua name=mycounterredis.register_function('incr_by_and_get', function(keys, args)local key = keys[1]local increment = tonumber(args[1])local result = redis.call('INCRBY', key, increment)return resultend)redis.register_function('get_and_incr_by', function(keys, args)local key = keys[1]local increment = tonumber(args[1])local current = tonumber(redis.call('GET', key)) or 0redis.call('INCRBY', key, increment)return currentend)""";try {redisTemplate.execute((RedisCallback<Object>) connection -> {connection.serverCommands().functionLoad(functionScript);return null;});} catch (Exception e) {// 處理已存在的情況if (e.getMessage().contains("already exists")) {log.info("Counter functions already loaded");} else {throw e;}}}public Long incrementAndGet(String counterKey, int increment) {return redisTemplate.execute((RedisCallback<Long>) connection -> (Long) connection.serverCommands().functionCall("incr_by_and_get", Arrays.asList(counterKey), Arrays.asList(String.valueOf(increment))));}public Long getAndIncrement(String counterKey, int increment) {return redisTemplate.execute((RedisCallback<Long>) connection -> (Long) connection.serverCommands().functionCall("get_and_incr_by", Arrays.asList(counterKey), Arrays.asList(String.valueOf(increment))));}
}

實際應用場景

1. 原子計數器與限流器

函數庫可以實現復雜的計數邏輯,如分布式限流器、訪問頻率控制等。

#!lua name=ratelimiter-- 令牌桶限流算法
redis.register_function('acquire_token', function(keys, args)local key = keys[1]local capacity = tonumber(args[1])local rate = tonumber(args[2])local requested = tonumber(args[3]) or 1local now = tonumber(redis.call('TIME')[1])-- 獲取當前桶狀態local bucket = redis.call('HMGET', key, 'last_refill', 'tokens')local last_refill = tonumber(bucket[1]) or nowlocal tokens = tonumber(bucket[2]) or capacity-- 計算令牌補充local elapsed = now - last_refilllocal new_tokens = math.min(capacity, tokens + elapsed * rate)-- 嘗試獲取令牌if new_tokens >= requested thennew_tokens = new_tokens - requestedredis.call('HMSET', key, 'last_refill', now, 'tokens', new_tokens)return 1 -- 成功elseredis.call('HMSET', key, 'last_refill', now, 'tokens', new_tokens)return 0 -- 失敗end
end)
2. 復雜業務邏輯封裝

電商場景中的下單流程,涉及庫存檢查、價格計算、訂單創建等多個步驟:

#!lua name=ordersystem-- 下單流程
redis.register_function('create_order', function(keys, args)local product_key = keys[1]local order_key = keys[2]local product_id = args[1]local quantity = tonumber(args[2])local user_id = args[3]-- 檢查庫存local stock = tonumber(redis.call('HGET', product_key, 'stock'))if not stock or stock < quantity thenreturn {err = "Insufficient stock"}end-- 獲取價格local price = tonumber(redis.call('HGET', product_key, 'price'))if not price thenreturn {err = "Product price not found"}end-- 創建訂單local order_id = redis.call('INCR', 'order:id:counter')local total = price * quantity-- 減庫存redis.call('HINCRBY', product_key, 'stock', -quantity)-- 保存訂單local order_data = {id = order_id,user_id = user_id,product_id = product_id,quantity = quantity,price = price,total = total,status = "created",create_time = redis.call('TIME')[1]}redis.call('HMSET', order_key .. order_id, 'id', order_data.id,'user_id', order_data.user_id,'product_id', order_data.product_id,'quantity', order_data.quantity,'price', order_data.price,'total', order_data.total,'status', order_data.status,'create_time', order_data.create_time)-- 添加到用戶訂單列表redis.call('SADD', 'user:' .. user_id .. ':orders', order_id)return {order_id = order_id,total = total}
end)
3. 數據一致性保證

在需要保證多個操作原子性的場景中特別有用,如積分兌換:

#!lua name=pointsystem-- 積分兌換
redis.register_function('redeem_points', function(keys, args)local user_points_key = keys[1]local reward_key = keys[2]local user_rewards_key = keys[3]local user_id = args[1]local reward_id = args[2]local required_points = tonumber(args[3])-- 檢查用戶積分local current_points = tonumber(redis.call('GET', user_points_key)) or 0if current_points < required_points thenreturn {success = false, reason = "Insufficient points"}end-- 檢查獎勵是否有效local reward_exists = redis.call('EXISTS', reward_key)if reward_exists == 0 thenreturn {success = false, reason = "Reward not found"}end-- 扣減積分redis.call('DECRBY', user_points_key, required_points)-- 記錄兌換歷史local redeem_id = redis.call('INCR', 'redeem:id:counter')redis.call('HMSET', 'redeem:' .. redeem_id,'user_id', user_id,'reward_id', reward_id,'points', required_points,'time', redis.call('TIME')[1])-- 添加到用戶獎勵列表redis.call('SADD', user_rewards_key, reward_id)return {success = true,redeem_id = redeem_id,remaining_points = current_points - required_points}
end)

最佳實踐

  1. 功能分組:按業務功能將相關函數組織到同一個庫中,提高代碼可維護性
  2. 版本管理:為函數庫添加版本信息,便于升級和回滾
  3. 錯誤處理:在Lua函數中添加完善的錯誤處理邏輯
  4. 權限控制:結合ACL限制函數的調用權限
  5. 單一職責:每個函數保持功能單一,避免過于復雜的邏輯

特性二:分片發布/訂閱(Sharded Pub/Sub)

技術原理

Redis 7.0引入了分片發布/訂閱功能,這是對傳統Pub/Sub模型的重要增強。傳統的Pub/Sub在集群環境下存在效率和可擴展性問題,因為消息需要在所有節點間廣播。分片Pub/Sub通過將頻道分布到特定的節點,實現了更高效的消息傳遞。

關鍵優勢:

  • 消息只在特定節點處理,減少網絡開銷
  • 頻道數據和訂閱信息只存儲在特定節點,降低內存使用
  • 更好的可擴展性,適合大規模Redis集群
  • 避免了全局廣播帶來的性能問題

實現示例

Redis命令
# 訂閱分片頻道
SSUBSCRIBE news.sports# 向分片頻道發布消息
SPUBLISH news.sports "Team A won the championship"# 退訂分片頻道
SUNSUBSCRIBE news.sports
Java實現
@Service
public class ShardedPubSubService {private final RedisTemplate<String, String> redisTemplate;private final Map<String, MessageListener> subscriptions = new ConcurrentHashMap<>();public ShardedPubSubService(RedisTemplate<String, String> redisTemplate) {this.redisTemplate = redisTemplate;}public void publish(String channel, String message) {redisTemplate.execute((RedisCallback<Long>) connection -> {// 使用底層連接直接執行SPUBLISH命令return connection.execute("SPUBLISH", channel.getBytes(), message.getBytes());});}public void subscribe(String channel, Consumer<String> messageHandler) {MessageListener listener = (message, pattern) -> messageHandler.accept(new String(message.getBody()));RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(redisTemplate.getConnectionFactory());container.afterPropertiesSet();// 注冊分片訂閱container.addMessageListener(listener, new PatternTopic("__shard__:" + channel));// 存儲引用以便后續取消訂閱subscriptions.put(channel, listener);}public void unsubscribe(String channel) {MessageListener listener = subscriptions.remove(channel);if (listener != null) {redisTemplate.execute((RedisCallback<Void>) connection -> {connection.execute("SUNSUBSCRIBE", channel.getBytes());return null;});}}
}

實際應用場景

1. 地理位置感知的消息推送

在基于地理位置的應用中,可以使用分片Pub/Sub向特定區域的用戶推送消息:

@Service
public class LocationBasedNotificationService {private final ShardedPubSubService pubSubService;private final UserLocationService locationService;// 發送區域通知public void sendAreaNotification(String areaCode, String message) {String channel = "location.area." + areaCode;pubSubService.publish(channel, message);}// 用戶訂閱自己所在區域的通知public void subscribeUserToAreaNotifications(String userId) {String userArea = locationService.getUserAreaCode(userId);String channel = "location.area." + userArea;pubSubService.subscribe(channel, message -> {// 處理接收到的區域通知notifyUser(userId, message);});}
}
2. 實時聊天系統

分片Pub/Sub非常適合大規模聊天應用,減輕服務器負擔:

@Service
public class ChatService {private final ShardedPubSubService pubSubService;// 發送聊天消息public void sendChatMessage(String roomId, ChatMessage message) {String channel = "chat.room." + roomId;String messageJson = objectMapper.writeValueAsString(message);pubSubService.publish(channel, messageJson);}// 用戶加入聊天室public void joinChatRoom(String userId, String roomId) {String channel = "chat.room." + roomId;// 訂閱聊天室消息pubSubService.subscribe(channel, messageJson -> {ChatMessage message = objectMapper.readValue(messageJson, ChatMessage.class);// 將消息發送到用戶WebSocketwebSocketService.sendToUser(userId, message);});// 發送加入通知ChatMessage joinMessage = new ChatMessage("system", userId + " joined the room", System.currentTimeMillis());pubSubService.publish(channel, objectMapper.writeValueAsString(joinMessage));}
}
3. 分布式系統狀態同步

使用分片Pub/Sub實現微服務間的高效狀態同步:

@Service
public class SystemStateManager {private final ShardedPubSubService pubSubService;@PostConstructpublic void init() {// 訂閱配置更新pubSubService.subscribe("system.config", this::handleConfigUpdate);// 訂閱服務狀態變更pubSubService.subscribe("system.service." + getCurrentServiceName(), this::handleServiceCommand);}// 發布配置變更public void publishConfigChange(String configKey, String configValue) {ConfigChangeEvent event = new ConfigChangeEvent(configKey, configValue, System.currentTimeMillis());pubSubService.publish("system.config", objectMapper.writeValueAsString(event));}// 發送服務指令public void sendServiceCommand(String serviceName, String command, Map<String, Object> params) {ServiceCommand cmd = new ServiceCommand(command, params, System.currentTimeMillis());pubSubService.publish("system.service." + serviceName, objectMapper.writeValueAsString(cmd));}private void handleConfigUpdate(String message) {ConfigChangeEvent event = objectMapper.readValue(message, ConfigChangeEvent.class);// 更新本地配置configManager.updateConfig(event.getKey(), event.getValue());}private void handleServiceCommand(String message) {ServiceCommand command = objectMapper.readValue(message, ServiceCommand.class);// 執行命令commandExecutor.execute(command.getCommand(), command.getParams());}
}

最佳實踐

  1. 頻道命名規范:使用層次化的命名方式(如"category.subcategory.id")
  2. 消息序列化:使用JSON或其他格式序列化消息,便于跨語言使用
  3. 錯誤處理:在訂閱處理程序中添加異常處理邏輯
  4. 合理分片:根據業務特性合理設計頻道分布
  5. 組合傳統Pub/Sub:某些需要全局廣播的場景可以繼續使用傳統Pub/Sub

特性三:多部分AOF(Multi-part AOF)

技術原理

Redis 7.0對AOF(Append Only File)持久化機制進行了重大改進,引入了多部分AOF文件結構。傳統AOF是單一文件,在重寫時會導致磁盤壓力和性能波動。新的多部分AOF由基礎文件(base files)和增量文件(incremental files)組成,提供更高效的持久化機制。

關鍵優勢:

  • 減少AOF重寫期間的磁盤I/O壓力
  • 降低內存使用峰值
  • 更快的重寫過程,減少性能波動
  • 可靠性提升,減少數據丟失風險

配置示例

# redis.conf 配置# 啟用AOF持久化
appendonly yes# 使用新的多部分AOF格式
aof-use-rdb-preamble yes# 設置AOF目錄
dir /data/redis# 文件名前綴 (Redis 7.0新增)
appendfilename "appendonly.aof"# AOF自動重寫
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

工作原理詳解

多部分AOF的工作流程如下:

  1. 初始化:Redis在啟動時創建一個基礎(base)AOF文件和一個增量(incr)AOF文件
  2. 命令記錄:新的寫入命令追加到增量AOF文件中
  3. 重寫觸發:當滿足重寫條件時,Redis創建一個新的基礎文件
  4. 文件管理:重寫完成后,歷史增量文件被清理,新的增量文件開始接收命令

文件命名:

  • 基礎文件:appendonly_base.aof
  • 增量文件:appendonly_incr.aof.{seq}
  • 清單文件:appendonly.manifest.json(包含所有AOF文件的信息)

實際應用場景

1. 高寫入量數據庫優化

對于高頻寫入的Redis實例,多部分AOF可以顯著減少寫入峰值帶來的性能波動:

# 針對高寫入量的配置優化# 使用更激進的AOF fsync策略,減少數據丟失風險
appendfsync everysec# 優化內存使用
aof-rewrite-incremental-fsync yes# 增加重寫閾值,減少重寫頻率
auto-aof-rewrite-percentage 200
auto-aof-rewrite-min-size 128mb
2. 快速恢復方案實現

利用多部分AOF特性實現更快的數據恢復策略:

@Service
public class RedisPersistenceManager {@Value("${redis.data.dir}")private String redisDataDir;// 執行AOF分析public AOFAnalysisResult analyzeAOF() {File manifestFile = new File(redisDataDir, "appendonly.manifest.json");if (!manifestFile.exists()) {return new AOFAnalysisResult(false, "Manifest file not found");}// 解析清單文件AOFManifest manifest = objectMapper.readValue(manifestFile, AOFManifest.class);// 分析AOF文件信息long totalSize = 0;long commandCount = 0;File baseFile = new File(redisDataDir, manifest.getBaseAofName());totalSize += baseFile.length();for (String incrFile : manifest.getIncrAofNames()) {File f = new File(redisDataDir, incrFile);totalSize += f.length();commandCount += countCommands(f);}return new AOFAnalysisResult(true, "Base: " + manifest.getBaseAofName() + ", Incremental: " + manifest.getIncrAofNames().size() + ", Total size: " + formatSize(totalSize) + ", Commands: " + commandCount);}// 手動觸發AOF重寫public void triggerAofRewrite() {redisTemplate.execute((RedisCallback<String>) connection -> {connection.serverCommands().bgrewriteaof();return null;});}
}
3. 系統升級與遷移策略

在系統升級或Redis遷移場景中,利用多部分AOF簡化流程:

@Service
public class RedisMigrationService {// 準備Redis遷移public MigrationPlan prepareMigration(String sourceRedisUrl, String targetRedisUrl) {MigrationPlan plan = new MigrationPlan();// 1. 分析源Redis AOF狀態AOFAnalysisResult aofAnalysis = analyzeSourceRedisAOF(sourceRedisUrl);plan.setAofAnalysis(aofAnalysis);// 2. 如果未使用多部分AOF,建議升級if (!aofAnalysis.isMultiPartAof()) {plan.addStep("Enable multi-part AOF on source Redis");}// 3. 觸發AOF重寫以創建干凈的基礎文件plan.addStep("Trigger AOF rewrite to create clean base file");// 4. 設置數據傳輸步驟plan.addStep("Copy base AOF file to target server");plan.addStep("Start target Redis with base file");plan.addStep("Continue copying incremental files during migration");return plan;}// 執行遷移過程中的增量同步public void syncIncrementalAOF(String sourceRedisDataDir, String targetRedisDataDir) {// 讀取源Redis的清單文件AOFManifest sourceManifest = readManifest(sourceRedisDataDir);// 讀取目標Redis的清單文件AOFManifest targetManifest = readManifest(targetRedisDataDir);// 找出目標缺少的增量文件List<String> filesToSync = new ArrayList<>();for (String incrFile : sourceManifest.getIncrAofNames()) {if (!targetManifest.getIncrAofNames().contains(incrFile)) {filesToSync.add(incrFile);}}// 同步缺少的文件for (String file : filesToSync) {copyFile(new File(sourceRedisDataDir, file), new File(targetRedisDataDir, file));}// 更新目標Redis的清單文件writeManifest(targetRedisDataDir, sourceManifest);}
}

最佳實踐

  1. 磁盤規劃:為AOF文件分配足夠的磁盤空間,預留增長空間
  2. 監控AOF狀態:定期檢查AOF文件大小和重寫頻率
  3. 備份策略:將AOF文件納入常規備份計劃
  4. fsync策略選擇:根據數據重要性和性能需求選擇合適的fsync策略
  5. 與RDB結合:在某些場景下同時啟用RDB快照,提供額外保護

特性四:訪問控制列表(ACL)增強

技術原理

Redis 7.0對ACL(Access Control List)系統進行了重要增強,提供了更精細的權限控制機制。新增的ACL功能包括Pub/Sub頻道權限控制、KEYS命令的模式匹配權限,以及針對Redis Functions的權限管理。

關鍵增強:

  • 支持對Pub/Sub頻道的讀寫權限控制
  • 能夠定義KEYS命令可查詢的鍵模式
  • 對Redis Functions的調用權限控制
  • 改進的權限繼承和組合模式

實現示例

ACL規則配置
# 創建只能訪問特定前綴鍵且只讀的用戶
ACL SETUSER readonly ON >secret123 ~product:* +get +scan +keys +zrange +hgetall# 創建有Pub/Sub特定頻道權限的用戶
ACL SETUSER publisher ON >pubpassword ~notification:* +@all &channel:notifications:*# 創建可以調用特定函數的用戶
ACL SETUSER func_user ON >funcpass ~* +@all %f:mycounter:incr_by_and_get# 使用鍵模式限制keys命令
ACL SETUSER admin ON >adminpass ~* +@all %K~user:*
Java配置示例
@Configuration
public class RedisSecurityConfig {@Beanpublic RedisConnectionFactory redisConnectionFactory() {LettuceConnectionFactory factory = new LettuceConnectionFactory();factory.setUsername("app_user");factory.setPassword("app_password");return factory;}@Bean@Profile("admin")public RedisSecurityService redisSecurityService(StringRedisTemplate redisTemplate) {return new RedisSecurityService(redisTemplate);}
}@Service
public class RedisSecurityService {private final StringRedisTemplate redisTemplate;public RedisSecurityService(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}public void createReadOnlyUser(String username, String password, List<String> keyPatterns) {StringBuilder aclCommand = new StringBuilder();aclCommand.append("ACL SETUSER ").append(username).append(" ON >").append(password);// 添加鍵模式for (String pattern : keyPatterns) {aclCommand.append(" ~").append(pattern);}// 添加只讀命令權限aclCommand.append(" +get +mget +scan +keys +hget +hgetall +zrange +scard +smembers +lrange +info");redisTemplate.execute((RedisCallback<Object>) connection -> connection.execute(aclCommand.toString()));}public void createFunctionUser(String username, String password, String libraryName, List<String> functions) {StringBuilder aclCommand = new StringBuilder();aclCommand.append("ACL SETUSER ").append(username).append(" ON >").append(password).append(" ~*"); // 允許訪問所有鍵// 添加基本命令權限aclCommand.append(" +@read +@write");// 添加函數調用權限for (String function : functions) {aclCommand.append(" %f:").append(libraryName).append(":").append(function);}redisTemplate.execute((RedisCallback<Object>) connection -> connection.execute(aclCommand.toString()));}public void createPubSubUser(String username, String password, List<String> channelPatterns, boolean publishOnly) {StringBuilder aclCommand = new StringBuilder();aclCommand.append("ACL SETUSER ").append(username).append(" ON >").append(password).append(" ~*"); // 通常Pub/Sub用戶不需要鍵訪問權限,但根據需要可以調整// 添加Pub/Sub權限if (publishOnly) {aclCommand.append(" +publish");} else {aclCommand.append(" +publish +subscribe +psubscribe");}// 添加頻道權限for (String pattern : channelPatterns) {aclCommand.append(" &").append(pattern);}redisTemplate.execute((RedisCallback<Object>) connection -> connection.execute(aclCommand.toString()));}public List<Map<String, Object>> listUsers() {List<Object> result = (List<Object>) redisTemplate.execute((RedisCallback<Object>) connection -> connection.execute("ACL LIST"));List<Map<String, Object>> users = new ArrayList<>();if (result != null) {for (Object userInfo : result) {String[] parts = userInfo.toString().split(" ");Map<String, Object> user = new HashMap<>();user.put("username", parts[1]);// 解析權限信息List<String> flags = new ArrayList<>();List<String> commands = new ArrayList<>();List<String> keys = new ArrayList<>();List<String> channels = new ArrayList<>();List<String> functions = new ArrayList<>();for (int i = 2; i < parts.length; i++) {String part = parts[i];if (part.startsWith("+") || part.startsWith("-")) {commands.add(part);} else if (part.startsWith("~")) {keys.add(part);} else if (part.startsWith("&")) {channels.add(part);} else if (part.startsWith("%")) {functions.add(part);} else if (part.equals("on") || part.equals("off")) {flags.add(part);}}user.put("flags", flags);user.put("commands", commands);user.put("keys", keys);user.put("channels", channels);user.put("functions", functions);users.add(user);}}return users;}
}

實際應用場景

1. 多租戶SaaS應用

為不同租戶創建隔離的Redis訪問權限:

@Service
public class TenantRedisManager {private final RedisSecurityService redisSecurityService;public void setupNewTenant(String tenantId) {// 為租戶創建只能訪問自己數據的用戶String username = "tenant_" + tenantId;String password = generateSecurePassword();// 鍵模式限制List<String> keyPatterns = Arrays.asList("tenant:" + tenantId + ":*","shared:public:*");// 創建用戶redisSecurityService.createReadOnlyUser(username, password, keyPatterns);// 保存憑證(安全存儲)credentialManager.storeTenantRedisCredentials(tenantId, username, password);// 創建具有寫入權限的用戶String adminUsername = "tenant_" + tenantId + "_admin";String adminPassword = generateSecurePassword();StringBuilder aclCommand = new StringBuilder();aclCommand.append("ACL SETUSER ").append(adminUsername).append(" ON >").append(adminPassword);// 添加鍵模式for (String pattern : keyPatterns) {aclCommand.append(" ~").append(pattern);}// 添加完整讀寫權限aclCommand.append(" +@all");// 執行ACL命令redisTemplate.execute((RedisCallback<Object>) connection -> connection.execute(aclCommand.toString()));// 保存管理員憑證credentialManager.storeTenantRedisAdminCredentials(tenantId, adminUsername, adminPassword);}
}
2. 消息系統中的發布者與訂閱者分離

在消息系統中創建不同角色的用戶:

@Service
public class NotificationSystemManager {private final RedisSecurityService redisSecurityService;// 設置消息發布者public void setupPublisherAccount(String system, String password) {List<String> channels = Arrays.asList("notifications:" + system + ":*");redisSecurityService.createPubSubUser("publisher_" + system, password, channels, true  // 只有發布權限);}// 設置訂閱者public void setupSubscriberAccount(String subscriberId, String password, List<String> systems) {List<String> channels = systems.stream().map(system -> "notifications:" + system + ":*").collect(Collectors.toList());StringBuilder aclCommand = new StringBuilder();aclCommand.append("ACL SETUSER subscriber_").append(subscriberId).append(" ON >").append(password).append(" ~*")  // 不需要鍵訪問權限.append(" +subscribe +psubscribe");  // 只有訂閱權限// 添加頻道權限for (String channel : channels) {aclCommand.append(" &").append(channel);}redisTemplate.execute((RedisCallback<Object>) connection -> connection.execute(aclCommand.toString()));}
}
3. API網關限流功能

為API網關創建有限制的Redis Functions調用權限:

@Service
public class ApiGatewayRateLimiterService {private final RedisSecurityService redisSecurityService;private final StringRedisTemplate adminRedisTemplate;@PostConstructpublic void setupRateLimiter() {// 部署限流函數庫String rateLimiterScript = """#!lua name=ratelimiterredis.register_function('check_rate_limit', function(keys, args)local key = keys[1]local limit = tonumber(args[1])local window = tonumber(args[2])local current = redis.call('INCR', key)if current == 1 thenredis.call('EXPIRE', key, window)endreturn current <= limitend)""";adminRedisTemplate.execute((RedisCallback<Object>) connection -> connection.serverCommands().functionLoad(rateLimiterScript));// 為API網關創建專用用戶String gatewayUser = "api_gateway";String gatewayPassword = secureRandomPassword();List<String> functions = Arrays.asList("check_rate_limit");redisSecurityService.createFunctionUser(gatewayUser, gatewayPassword, "ratelimiter", functions);// 安全保存憑證configService.saveApiGatewayRedisCredentials(gatewayUser, gatewayPassword);}
}

最佳實踐

  1. 最小權限原則:為每個用戶僅分配必要的權限
  2. 密碼復雜性:使用強密碼,并定期輪換
  3. 功能分割:按照功能角色創建不同的用戶,避免單一用戶擁有過多權限
  4. 監控ACL操作:記錄和審計ACL變更
  5. 分層權限模型:實現權限繼承和組合,簡化管理
  6. 定期審核:定期檢查和清理不再使用的賬戶

特性五:客戶端緩存增強

技術原理

Redis 7.0對客戶端緩存(Client-side Caching)進行了增強,使其更加實用和高效。客戶端緩存允許Redis客戶端在本地緩存數據,通過服務器通知機制在數據變更時使緩存失效。Redis 7.0添加了對集群環境和哈希子字段的支持,擴展了這一功能的應用范圍。

關鍵優勢:

  • 減少網絡請求和Redis服務器負載
  • 降低讀取操作延遲
  • 支持分片集群環境的客戶端緩存
  • 支持哈希字段級別的緩存控制
  • 改進的內存效率和緩存追蹤

實現原理

客戶端緩存有兩種模式:

  1. 跟蹤模式:Redis服務器記錄每個客戶端緩存的鍵,當鍵變更時發送通知
  2. 廣播模式:服務器廣播所有鍵的變更,客戶端根據自己的緩存內容決定是否使緩存失效

Redis 7.0中的改進包括:

  • 集群環境中的緩存一致性
  • 哈希字段級別的追蹤
  • 優化的內存使用

實現示例

使用Lettuce客戶端實現
@Configuration
public class RedisClientCacheConfig {@Beanpublic ClientResources clientResources() {return ClientResources.builder().build();}@Beanpublic RedisClient redisClient(ClientResources clientResources) {return RedisClient.create(clientResources, RedisURI.create("redis://localhost:6379"));}@Beanpublic StatefulRedisConnection<String, String> connection(RedisClient redisClient) {return redisClient.connect();}@Beanpublic RedisCommands<String, String> redisCommands(StatefulRedisConnection<String, String> connection) {return connection.sync();}
}@Service
public class CachingRedisClient {private final RedisCommands<String, String> redis;private final StatefulRedisConnection<String, String> connection;private final Map<String, String> localCache = new ConcurrentHashMap<>();private final Set<String> trackedKeys = ConcurrentHashMap.newKeySet();public CachingRedisClient(RedisCommands<String, String> redis, StatefulRedisConnection<String, String> connection) {this.redis = redis;this.connection = connection;setupClientCaching();}private void setupClientCaching() {// 設置失效通知處理器connection.addListener(message -> {if (message instanceof PushMessage) {PushMessage pushMessage = (PushMessage) message;if ("invalidate".equals(pushMessage.getType())) {List<Object> invalidations = pushMessage.getContent();// 處理失效通知processInvalidations(invalidations);}}});// 啟用客戶端緩存,使用跟蹤模式redis.clientTracking(ClientTrackingArgs.Builder.enabled().bcast().prefixes("user:", "product:").optIn());}public String get(String key) {// 先檢查本地緩存String cachedValue = localCache.get(key);if (cachedValue != null) {return cachedValue;}// 本地緩存未命中,從Redis獲取String value = redis.get(key);if (value != null) {// 存入本地緩存localCache.put(key, value);trackedKeys.add(key);}return value;}public void set(String key, String value) {// 更新Redisredis.set(key, value);// 更新本地緩存localCache.put(key, value);trackedKeys.add(key);}private void processInvalidations(List<Object> invalidations) {if (invalidations.size() >= 2) {String invalidationType = new String((byte[]) invalidations.get(0));if ("key".equals(invalidationType)) {// 單個鍵失效String key = new String((byte[]) invalidations.get(1));localCache.remove(key);trackedKeys.remove(key);} else if ("prefix".equals(invalidationType)) {// 前綴失效String prefix = new String((byte[]) invalidations.get(1));// 移除所有匹配前綴的緩存項Iterator<Map.Entry<String, String>> it = localCache.entrySet().iterator();while (it.hasNext()) {String key = it.next().getKey();if (key.startsWith(prefix)) {it.remove();trackedKeys.remove(key);}}}}}// 手動使緩存失效public void invalidateCache(String key) {localCache.remove(key);trackedKeys.remove(key);}// 獲取緩存統計信息public Map<String, Object> getCacheStats() {Map<String, Object> stats = new HashMap<>();stats.put("cacheSize", localCache.size());stats.put("trackedKeysCount", trackedKeys.size());// 簡單的統計信息Map<String, Integer> prefixCounts = new HashMap<>();for (String key : localCache.keySet()) {String prefix = key.split(":")[0] + ":";prefixCounts.put(prefix, prefixCounts.getOrDefault(prefix, 0) + 1);}stats.put("prefixCounts", prefixCounts);return stats;}
}

哈希字段級別緩存

@Service
public class HashFieldCachingService {private final RedisCommands<String, String> redis;private final StatefulRedisConnection<String, String> connection;private final Map<String, Map<String, String>> hashCache = new ConcurrentHashMap<>();public HashFieldCachingService(RedisCommands<String, String> redis, StatefulRedisConnection<String, String> connection) {this.redis = redis;this.connection = connection;setupClientCaching();}private void setupClientCaching() {// 設置失效通知處理器connection.addListener(message -> {if (message instanceof PushMessage) {PushMessage pushMessage = (PushMessage) message;if ("invalidate".equals(pushMessage.getType())) {List<Object> invalidations = pushMessage.getContent();// 處理失效通知processInvalidations(invalidations);}}});// 啟用客戶端緩存,使用跟蹤模式redis.clientTracking(ClientTrackingArgs.Builder.enabled().prefixes("user:", "product:").optIn());}// 獲取哈希字段public String hget(String key, String field) {// 先檢查本地緩存Map<String, String> cachedHash = hashCache.get(key);if (cachedHash != null && cachedHash.containsKey(field)) {return cachedHash.get(field);}// 本地緩存未命中,從Redis獲取String value = redis.hget(key, field);if (value != null) {// 存入本地緩存cachedHash = hashCache.computeIfAbsent(key, k -> new ConcurrentHashMap<>());cachedHash.put(field, value);}return value;}// 獲取整個哈希public Map<String, String> hgetall(String key) {// 先檢查本地緩存是否有完整哈希Map<String, String> cachedHash = hashCache.get(key);// 如果不存在或者不確定是否完整,從Redis獲取Map<String, String> redisHash = redis.hgetall(key);if (!redisHash.isEmpty()) {// 更新本地緩存hashCache.put(key, new ConcurrentHashMap<>(redisHash));return redisHash;}return cachedHash != null ? cachedHash : new HashMap<>();}// 設置哈希字段public void hset(String key, String field, String value) {// 更新Redisredis.hset(key, field, value);// 更新本地緩存Map<String, String> cachedHash = hashCache.computeIfAbsent(key, k -> new ConcurrentHashMap<>());cachedHash.put(field, value);}private void processInvalidations(List<Object> invalidations) {if (invalidations.size() >= 2) {String invalidationType = new String((byte[]) invalidations.get(0));if ("key".equals(invalidationType)) {// 單個鍵失效String key = new String((byte[]) invalidations.get(1));hashCache.remove(key);} else if ("prefix".equals(invalidationType)) {// 前綴失效String prefix = new String((byte[]) invalidations.get(1));// 移除所有匹配前綴的緩存項hashCache.keySet().removeIf(key -> key.startsWith(prefix));}}}
}

實際應用場景

1. 用戶配置文件管理

在需要頻繁讀取用戶個人信息但寫入較少的場景中:

@Service
public class UserProfileService {private final CachingRedisClient redisClient;// 獲取用戶資料public UserProfile getUserProfile(String userId) {String cacheKey = "user:" + userId + ":profile";// 利用客戶端緩存獲取數據String profileJson = redisClient.get(cacheKey);if (profileJson != null) {return objectMapper.readValue(profileJson, UserProfile.class);}return null;}// 更新用戶資料public void updateUserProfile(String userId, UserProfile profile) {String cacheKey = "user:" + userId + ":profile";// 序列化為JSONString profileJson = objectMapper.writeValueAsString(profile);// 更新Redis,客戶端緩存會自動更新redisClient.set(cacheKey, profileJson);// 記錄審計日志logProfileUpdate(userId, profile);}// 批量獲取多個用戶資料public Map<String, UserProfile> getUserProfiles(List<String> userIds) {Map<String, UserProfile> results = new HashMap<>();for (String userId : userIds) {UserProfile profile = getUserProfile(userId);if (profile != null) {results.put(userId, profile);}}return results;}
}
2. 產品目錄展示

電商平臺中的產品信息緩存:

@Service
public class ProductCatalogService {private final HashFieldCachingService hashCache;// 獲取產品基本信息public Product getProductBasicInfo(String productId) {String key = "product:" + productId;// 獲取基本信息字段String name = hashCache.hget(key, "name");String price = hashCache.hget(key, "price");String category = hashCache.hget(key, "category");if (name != null && price != null) {Product product = new Product();product.setId(productId);product.setName(name);product.setPrice(Double.parseDouble(price));product.setCategory(category);return product;}return null;}// 獲取產品完整信息public ProductDetails getProductDetails(String productId) {String key = "product:" + productId;// 獲取完整哈希Map<String, String> productData = hashCache.hgetall(key);if (productData.isEmpty()) {return null;}// 構建產品詳情對象ProductDetails details = new ProductDetails();details.setId(productId);details.setName(productData.get("name"));details.setPrice(Double.parseDouble(productData.get("price")));details.setCategory(productData.get("category"));details.setDescription(productData.get("description"));details.setBrand(productData.get("brand"));// 處理可選字段if (productData.containsKey("stock")) {details.setStock(Integer.parseInt(productData.get("stock")));}if (productData.containsKey("rating")) {details.setRating(Double.parseDouble(productData.get("rating")));}// 處理圖片列表if (productData.containsKey("images")) {details.setImages(Arrays.asList(productData.get("images").split(",")));}return details;}// 更新產品價格public void updateProductPrice(String productId, double newPrice) {String key = "product:" + productId;hashCache.hset(key, "price", String.valueOf(newPrice));// 記錄價格變更日志logPriceChange(productId, newPrice);}
}
3. 分布式配置管理

管理應用配置并實時同步更新:

@Service
public class DistributedConfigService {private final CachingRedisClient redisClient;private final Map<String, Map<String, ConfigValue>> configCache = new ConcurrentHashMap<>();// 獲取配置值public String getConfigValue(String application, String key) {String cacheKey = "config:" + application + ":" + key;// 使用客戶端緩存獲取值String value = redisClient.get(cacheKey);if (value != null) {// 解析JSON值ConfigValue configValue = objectMapper.readValue(value, ConfigValue.class);return configValue.getValue();}return null;}// 更新配置值public void setConfigValue(String application, String key, String value) {String cacheKey = "config:" + application + ":" + key;// 創建帶版本的配置值對象ConfigValue configValue = new ConfigValue();configValue.setValue(value);configValue.setVersion(System.currentTimeMillis());configValue.setUpdatedBy(getCurrentUser());// 序列化為JSONString valueJson = objectMapper.writeValueAsString(configValue);// 更新RedisredisClient.set(cacheKey, valueJson);// 發布配置變更事件publishConfigChangeEvent(application, key, value);}// 獲取應用的所有配置public Map<String, String> getAllConfig(String application) {// 使用SCAN命令查找所有應用配置鍵Set<String> configKeys = scanKeys("config:" + application + ":*");Map<String, String> config = new HashMap<>();for (String fullKey : configKeys) {String key = fullKey.substring(("config:" + application + ":").length());String value = getConfigValue(application, key);if (value != null) {config.put(key, value);}}return config;}
}

最佳實踐

  1. 選擇合適的緩存模式:根據數據訪問模式選擇追蹤或廣播模式
  2. 控制緩存粒度:對于頻繁變動的數據,考慮使用更細粒度的緩存
  3. 緩存大小管理:使用LRU或其他策略控制本地緩存大小
  4. 設置過期策略:為本地緩存設置合理的過期時間
  5. 優雅處理失效通知:實現健壯的失效處理邏輯
  6. 監控緩存效率:跟蹤緩存命中率和內存使用情況

總結

Redis 7.0通過這五大核心特性:Redis Functions、分片Pub/Sub、多部分AOF、ACL增強以及客戶端緩存優化,顯著提升了Redis的功能性、性能和可靠性。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/79994.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/79994.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/79994.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

PHP實現PDF自動簽名

技術要點&#xff1a;在PDF中找到一個固定錨點&#xff0c;在需要放置圖片的地方找到測試出錨點對應的XY位 // 使用了poppler方法&#xff0c;其他PDF庫在獲取坐標方面有各種問題&#xff0c;他的安裝是在Linux底層&#xff0c;比在PHP項目中用Composer安裝的庫看上去更穩定&a…

中達瑞和便攜式高光譜相機:珠寶鑒定領域的“光譜之眼”

在珠寶行業中&#xff0c;真偽鑒定始終是核心需求。隨著合成技術與優化處理手段的日益精進&#xff0c;傳統鑒定方法逐漸面臨挑戰。中達瑞和推出的便攜式高光譜相機&#xff0c;憑借其獨特的“圖譜合一”技術&#xff0c;為珠寶真假鑒定提供了科學、高效且無損的解決方案&#…

2025年滲透測試面試題總結-某戰隊紅隊實習面經(附回答)(題目+回答)

網絡安全領域各種資源&#xff0c;學習文檔&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各種好玩的項目及好用的工具&#xff0c;歡迎關注。 目錄 某戰隊紅隊實習面經 個人經歷與技術能力 2. HVV/攻防演練成績 3. 上一個工作主要內容 4. 有意思的邏…

【PostgreSQL數據分析實戰:從數據清洗到可視化全流程】5.1 描述性統計分析(均值/方差/分位數計算)

&#x1f449; 點擊關注不迷路 &#x1f449; 點擊關注不迷路 &#x1f449; 點擊關注不迷路 文章大綱 5.1 描述性統計分析&#xff1a;均值、方差與分位數計算實戰5.1.1 數據準備與分析目標數據集介紹分析目標 5.1.2 均值計算&#xff1a;從整體到分組分析總體均值計算加權均值…

npm下載插件無法更新package.json和package-lock.json文件的解決辦法

經過多番查證&#xff0c;使用npm config ls查看相關配置等方式&#xff0c;最后發現全局的.npmrc文件的配置多寫了globaltrue&#xff0c;去掉就好了 如果參數很多&#xff0c;不知道是哪個參數引起的&#xff0c;先只保留registryhttp://xxx/&#xff0c;試試下載&#xff0…

基于Anaconda的Pycharm環境配置

一、前提條件&#xff1a; 1、默認已安裝完Anaconda&#xff0c;且創建虛擬環境&#xff0c;參見https://blog.csdn.net/XIAOWEI_JIN/article/details/147657029?spm1001.2014.3001.5501 2、已安裝pycharm&#xff0c;下載鏈接見Pycharm官網&#xff0c;以下以PyCharm 2024.…

Word域操作記錄(從1開始的畢業論文格式排版)

傻逼Word。 寫在最前面 如果你的文章不包括&#xff1a;自動目錄、交叉引用、自動題注。請關閉此頁面。繼續閱讀本文是在浪費您用于跟格式如泥潭里纏斗的時間。 本文內容概述 從指導手冊到畢設初稿 基于多級列表的自動目錄生成方法 正片開始 關于文字 拿到畢設手冊&#…

Linux中的web服務

什么是www www是world wide web的縮寫&#xff0c;及萬維網&#xff0c;也就是全球信息廣播的意思 通常說的上網就是使用www來查詢用戶所需要的信息。 www可以結合文字、圖形、影像以及聲音等多媒體&#xff0c;超鏈接的方式將信息以Internet傳遞到世界各 處去。 當你連接w…

linux -c程序開發

目的是在linux中創建可執行的c語言程序的步驟 和gcc,make和git的簡單運用 建立可執行程序的步驟: -1:預處理: --:頭文件展開;--去掉注釋;--宏替換;--條件編譯 -2:編譯 --:將預處理之后的c語言替換為匯編語言帶阿米 --:語法分析,語義分析,代碼生成 --:檢查語法正確性并且優…

Netty 是一個基于 Java NIO 的高性能網絡通信框架

Netty 是一個基于 Java NIO 的高性能網絡通信框架&#xff0c;廣泛應用于構建分布式系統、RPC 框架、即時通信系統等場景。它的核心設計目標是 異步、非阻塞、高可擴展性&#xff0c;其底層原理涉及 事件驅動模型、線程模型、內存管理 等關鍵技術。以下是 Netty 的核心原理和架…

UI 庫 Ant Design 中的 Table 表格和分頁器:快速實現數據展示和分頁功能

&#x1f90d; 前端開發工程師、技術日更博主、已過CET6 &#x1f368; 阿珊和她的貓_CSDN博客專家、23年度博客之星前端領域TOP1 &#x1f560; 牛客高級專題作者、打造專欄《前端面試必備》 、《2024面試高頻手撕題》、《前端求職突破計劃》 &#x1f35a; 藍橋云課簽約作者、…

Java實現堆排序算法

1. 堆排序原理圖解 堆排序是一種基于二叉堆&#xff08;通常使用最大堆&#xff09;的排序算法。其核心思想是利用堆的性質&#xff08;父節點的值大于或等于子節點的值&#xff09;來高效地進行排序。堆排序分為兩個主要階段&#xff1a;建堆和排序。 堆排序步驟&#xff1a; …

【Hive入門】Hive安全管理與權限控制:審計日志全解析,構建完善的操作追蹤體系

目錄 引言 1 Hive審計日志概述 1.1 審計日志的核心價值 1.2 Hive審計日志類型 2 HiveServer2操作日志配置 2.1 基礎配置方案 2.2 日志格式解析 2.3 日志輪轉配置 3 Metastore審計配置 3.1 Metastore審計啟用 3.2 審計事件類型 4 高級審計方案 4.1 與Apache Ranger…

力扣-hot100 (缺失的第一個正數)

41. 缺失的第一個正數 困難 給你一個未排序的整數數組 nums &#xff0c;請你找出其中沒有出現的最小的正整數。 請你實現時間復雜度為 O(n) 并且只使用常數級別額外空間的解決方案。 示例 1&#xff1a; 輸入&#xff1a;nums [1,2,0] 輸出&#xff1a;3 解釋&#xff…

13前端項目----購物車修改

購物車修改 uuid臨時游客身份購物車部分功能全選修改商品數量修改商品勾選狀態刪除產品 uuid臨時游客身份 請求數據倉庫發起請求 ->問題&#xff1a;獲取不到購物車數據&#xff1f; 所以需要一個身份&#xff0c;告訴服務器是誰存的數據&#xff1f;是要獲取誰的數據&…

Mac電腦,idea突然文件都展示成了文本格式,導致ts,tsx文件都不能正常加載或提示異常,解決方案詳細說明如下

有一天使用clean my mac軟件清理電腦 突然發現idea出現了文件都以文本格式展示&#xff0c;如圖所示 然后就卸載&#xff0c;計劃重新安裝&#xff0c;安裝了好幾個版本&#xff0c;并且setting->file types怎么設置都展示不對&#xff0c;考慮是否idea沒卸載干凈&#xff…

Nginx搭建test服務器

創建test域名 進入阿里云添加解析 創建域名:test.xxxxx.com 服務器復制項目代碼 新建目錄,Git拉取項目代碼,安裝上插件包 修改配置文件,啟動測試服務 修改配置文件“服務器接口” 開啟服務pm2 start app.js --name "test" 表格含義: 列名含義說明id進程在…

MyBatis-Plus 非 Spring 環境使用時 `GenericTypeResolver` 缺失問題總結

MyBatis-Plus 非 Spring 環境使用時 GenericTypeResolver 缺失問題總結 問題描述 在非 Spring 環境中使用 MyBatis-Plus 3.4.3.1 及以上版本時&#xff0c;啟動程序會拋出以下錯誤&#xff1a; Exception in thread "main" java.lang.NoClassDefFoundError: org/s…

綜合案例:使用vuex對購物車的商品數量和價格等公共數據進行狀態管理

文章目錄 0.實現需求1.新建購物車模塊cart2.使用json-server模擬向后端請求數據3.在vuex請求獲取并存入數據,并映射到組件中,在組件中渲染【重點】3.1.安裝axios3.2.準備actions和mutations,獲取和存入數據到vuex中3.3.動態渲染:先用mapState映射list到組件頁面 4.點擊修改數量…

《數據結構初階》【順序表 + 單鏈表 + 雙向鏈表】

《數據結構初階》【順序表 單鏈表 順序表】 前言&#xff1a;先聊些其他的東西&#xff01;&#xff01;&#xff01;什么是線性表&#xff1f;什么是順序表&#xff1f;順序表的種類有哪些&#xff1f; 什么是鏈表&#xff1f;鏈表的種類有哪些&#xff1f; ---------------…