目錄
什么是冪等?
解決冪等的常見解決方案:
唯一標識符案例
數據庫唯一約束 案例
樂觀鎖案例
分布式鎖(Distributed Locking)
實踐精選方案
首先 為什么不直接使用分布式鎖呢?
自定義實現冪等組件!
RepeatExecuteLimitAutoConfiguration
repeatExecuteLimitAspect防重復冪等 切面
本地鎖
localLockCache是本地鎖緩存,可根據鎖名和鎖類型(公平鎖/非公平鎖)來獲得ReentrantLock的實例
本地鎖的作用:
分布式鎖
設置冪等標識的作用
注意
什么是冪等?
在web項目中,冪等性同樣是一個重要的概念。這主要是因為在網絡和分布式系統中,由于網絡的不穩定性和其他潛在問題,可能會導致請求被重復發送。如果一個操作不是冪等的,那么重復執行該操作可能會產生不一致的結果或副作用。例如,一個非冪等的操作可能會導致數據被重復添加、更新或刪除,從而破壞數據的一致性。
因此,JavaWeb項目需要保證冪等性,主要是為了確保無論請求被發送一次還是多次,系統都能產生相同的結果。這有助于避免由于重復請求導致的數據不一致或其他潛在問題。實現冪等性的方法有很多種,包括但不限于使用數據庫唯一索引、樂觀鎖、分布式鎖、令牌等技術。
總的來說,冪等性是JavaWeb項目中一個非常重要的概念,它有助于確保系統的穩定性和數據的一致性。通過實現冪等性,我們可以有效地處理重復請求,并減少由于網絡不穩定或其他原因導致的潛在問題。
解決冪等的常見解決方案:
-
唯一標識符(Unique Identifiers): 為每個請求生成一個唯一的標識符(如UUID),并將其作為請求的一部分發送。當接收到請求時,服務器可以檢查該標識符是否已處理過。如果已處理,則拒絕或忽略該請求;如果未處理,則處理該請求并記錄標識符。
-
數據庫唯一約束: 使用數據庫的唯一約束(如主鍵或唯一索引)來確保即使多次嘗試插入相同的數據,也只有一條記錄會被保存。如果嘗試插入重復的數據,數據庫會拋出異常,服務器可以捕獲這個異常并返回冪等性的響應。
-
樂觀鎖(Optimistic Locking): 使用版本號或時間戳來檢查數據是否已被其他操作修改過。在更新數據時,如果版本號或時間戳與預期的不符,則拒絕更新并返回沖突信息。這樣,即使多次嘗試更新相同的數據,也只有一次會成功。
-
分布式鎖(Distributed Locking): 在分布式系統中,可以使用分布式鎖來確保同一時間只有一個節點能夠執行某個操作。這可以防止多個節點同時處理相同的請求,從而實現冪等性。
唯一標識符案例
-
使用 setex 命令存儲ID并設置過期時間,避免內存泄漏。
-
適用于高并發場景,如支付、訂單創建等。
// 生成唯一ID(客戶端)
String requestId = UUID.randomUUID().toString();// 服務端校驗(基于Redis)
public class IdempotencyService {private Jedis jedis; // Redis客戶端public boolean checkRequest(String requestId) {if (jedis.exists(requestId)) {return false; // 已處理,拒絕重復請求}jedis.setex(requestId, 3600, "processed"); // 存儲ID并設置過期時間return true;}
}
數據庫唯一約束 案例
-
場景:用戶注冊防重復。
實現思路:數據庫表中為關鍵字段(如用戶名、郵箱)添加唯一索引,插入重復數據時拋出異常。
-
數據庫自動拒絕重復數據,無需額外代碼判斷
-
適用于新增操作(如注冊、訂單號生成)
// JPA實體類定義
@Entity
@Table(name = "users", uniqueConstraints = @UniqueConstraint(columnNames = "email"))
public class User {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;private String email; // 唯一字段
}// 插入邏輯
try {userRepository.save(user); // 嘗試插入
} catch (DataIntegrityViolationException e) {throw new DuplicateKeyException("郵箱已存在"); // 捕獲唯一約束異常
}
樂觀鎖案例
場景:庫存扣減防超賣。
實現思路:通過版本號字段控制并發更新,僅當版本匹配時才允許操作。
-
JPA的
@Version
注解自動管理版本號 -
更新失敗時拋出
OptimisticLockingFailureException
,需重試或提示用戶。
// 實體類添加版本號字段
@Entity
public class Product {@Idprivate Long id;private int stock;@Versionprivate int version; // 樂觀鎖版本號
}// 更新邏輯(JPA)
@Transactional
public void deductStock(Long productId, int quantity) {Product product = productRepository.findById(productId).orElseThrow();if (product.getStock() >= quantity) {product.setStock(product.getStock() - quantity);productRepository.save(product); // 自動檢查版本號} else {throw new InsufficientStockException();}
}
分布式鎖(Distributed Locking)
場景:分布式系統中全局資源操作(如優惠券發放)。
實現思路:使用Redisson實現分布式鎖,確保同一時刻僅一個節點執行關鍵邏輯。
-
tryLock
設置超時時間防止死鎖 -
適用于跨服務或集群環境下的資源競爭場景
// 基于Redisson的分布式鎖
public class CouponService {private RedissonClient redissonClient;public void grantCoupon(String userId) {RLock lock = redissonClient.getLock("COUPON_GRANT_" + userId);try {if (lock.tryLock(0, 10, TimeUnit.SECONDS)) { // 嘗試獲取鎖// 執行業務邏輯(如發放優惠券)grantCouponToUser(userId);}} finally {if (lock.isHeldByCurrentThread()) {lock.unlock();}}}
}
方案 | 適用場景 | 優點 | 缺點 |
唯一標識符 | 高并發請求(如支付) | 實現簡單,適合分布式環境 | 需維護ID存儲(如Redis) |
數據庫唯一約束 | 數據唯一性要求(如注冊) | 依賴數據庫特性,無需額外邏輯 | 不適用于更新操作 |
樂觀鎖 | 低沖突更新(如庫存扣減) | 無鎖競爭,性能較高 | 需處理版本沖突和重試邏輯 |
分布式鎖 | 跨服務資源競爭(如秒殺) | 強一致性保證 | 實現復雜,可能影響性能 |
實踐精選方案
除了單純實現冪等,而是要在保證實現冪等的前提下,還要考慮高并發下的高效率執行,不能影響程序的性能和吞吐量
基于上述這些要求,最終選擇利用redis來實現,而在對使用redis上,Redisson又是非常優秀的開源中間件,其中的分布式鎖是非常的經典,項目中也對分布式鎖做了封裝,使用起來靈活而方便,而這次冪等組件也是對Redisson基礎上進行封裝,保證了性能,支持MQ中間件和用戶請求的冪等。
首先 為什么不直接使用分布式鎖呢?
為什么還要額外設計出冪等組件?首先直接使用分布式鎖是可以實現冪等的,當然業務邏輯驗證也要做驗證,但其實分布式鎖會浪費一些性能。
分布式鎖的特點是多個請求并發執行,這些請求是來自不同的用戶,也就是這些請求雖然要依次等待鎖執行,但最終還是要把這些請求都執行完的(執行時間太長超時的異常情況排除),總結起來就是都要獲得鎖,沒有獲得鎖的請求,也要爭取獲得鎖接著執行。
冪等的特點也是多個請求并發執行,但這些請求是來自同一個用戶,也就是說這些請求只要保證第一個請求能執行,其余的請求要直接拒絕掉,總結起來就是只有第一個請求獲得鎖執行就可以,其余的請求看到已經上了鎖,那么就要直接結束掉。
自定義實現冪等組件!
通過定義注解實現哦!
@Target(value = {ElementType.TYPE, ElementType.METHOD})
@Retention(value = RetentionPolicy.RUNTIME)
public @interface RepeatExecuteLimit {/*** 業務名稱** @return name*/String name() default "";/*** key設置** @return key*/String[] keys();/*** 在多長時間內一直保持冪等,如果不配置則以執行方法為準*/long durationTime() default 0L;/*** 當消息執行已經出發防重復執行的限制時,提示信息*/String message() default "提交頻繁,請稍后重試";}
RepeatExecuteLimitAutoConfiguration
public class RepeatExecuteLimitAutoConfiguration {@Bean(LockInfoType.REPEAT_EXECUTE_LIMIT)public LockInfoHandle repeatExecuteLimitHandle(){return new RepeatExecuteLimitLockInfoHandle();}@Beanpublic RepeatExecuteLimitAspect repeatExecuteLimitAspect(LocalLockCache localLockCache,LockInfoHandleFactory lockInfoHandleFactory,ServiceLockFactory serviceLockFactory,RedissonDataHandle redissonDataHandle){return new RepeatExecuteLimitAspect(localLockCache, lockInfoHandleFactory,serviceLockFactory,redissonDataHandle);}
}
RepeatExecuteLimitAutoConfiguration是自動裝配類,用于加載需要的對象,repeatExecuteLimitHandle是鎖鍵名處理器、repeatExecuteLimitAspect是冪等切面。
repeatExecuteLimitAspect防重復冪等 切面
@Slf4j
@Aspect
@Order(-11)
@AllArgsConstructor
public class RepeatExecuteLimitAspect {private final LocalLockCache localLockCache;private final LockInfoHandleFactory lockInfoHandleFactory;private final ServiceLockFactory serviceLockFactory;private final RedissonDataHandle redissonDataHandle;@Around("@annotation(repeatLimit)")public Object around(ProceedingJoinPoint joinPoint, RepeatExecuteLimit repeatLimit) throws Throwable {// 指定保持冪等的時間long durationTime = repeatLimit.durationTime();// 提示信息String message = repeatLimit.message();Object obj;// 獲取鎖信息LockInfoHandle lockInfoHandle = lockInfoHandleFactory.getLockInfoHandle(LockInfoType.REPEAT_EXECUTE_LIMIT);// 獲取鎖名String lockName = lockInfoHandle.getLockName(joinPoint,repeatLimit.name(), repeatLimit.keys());// 冪等標識String repeatFlagName = PREFIX_NAME + lockName;// 獲得冪等標識String flagObject = redissonDataHandle.get(repeatFlagName);//如果冪等標識的值為success,說明已經有請求在執行了,這次請求直接結束if (SUCCESS_FLAG.equals(flagObject)) {throw new DaMaiFrameException(message);}// 獲得本地鎖ReentrantLock localLock = localLockCache.getLock(lockName,true);// 本地鎖嘗試獲取boolean localLockResult = localLock.tryLock();// 如果本地鎖獲取失敗,說明有其他請求在執行,這次請求直接結束if (!localLockResult) {throw new DaMaiFrameException(message);}try {// 獲得分布式鎖ServiceLocker lock = serviceLockFactory.getLock(LockType.Fair);// 嘗試獲取分布式鎖boolean result = lock.tryLock(lockName, TimeUnit.SECONDS, 0);// 加鎖成功執行if (result) {try{// 再次獲取冪等標識flagObject = redissonDataHandle.get(repeatFlagName);// 如果冪等標識的值為success,說明已經有請求在執行了,這次請求直接結束if (SUCCESS_FLAG.equals(flagObject)) {throw new DaMaiFrameException(message);}obj = joinPoint.proceed();if (durationTime > 0) {try {// 業務邏輯執行成功后,設置 指定冪等保持時間 設置請求標識redissonDataHandle.set(repeatFlagName,SUCCESS_FLAG,durationTime,TimeUnit.SECONDS);}catch (Exception e) {log.error("getBucket error",e);}}return obj;} finally {lock.unlock(lockName);}}else{// 獲得鎖失敗,說明有其他請求在執行,這次請求直接結束throw new DaMaiFrameException(message);}}finally {localLock.unlock();}}
}
RepeatExecuteLimitAspect是負責冪等執行的切面也是核心流程。
負責冪等的切面順序要優先于分布式鎖前,所以這里是-11。
這個實踐中給我最大的震撼是本地鎖、分布式鎖、還有本地緩存的使用!
本地鎖
ReentrantLock localLock = localLockCache.getLock(lockName,true);
boolean localLockResult = localLock.tryLock();
if (!localLockResult) {throw new DaMaiFrameException(message);
}
localLockCache是本地鎖緩存,可根據鎖名和鎖類型(公平鎖/非公平鎖)來獲得ReentrantLock的實例
public class LocalLockCache {/*** 本地鎖緩存* */private Cache<String, ReentrantLock> localLockCache;/*** 本地鎖的過期時間(小時單位)* */@Value("${durationTime:2}")private Integer durationTime;@PostConstructpublic void localLockCacheInit(){localLockCache = Caffeine.newBuilder().expireAfterWrite(durationTime, TimeUnit.HOURS).build();}/*** 獲得鎖,Caffeine的get是線程安全的* */public ReentrantLock getLock(String lockKey,boolean fair){return localLockCache.get(lockKey, key -> new ReentrantLock(fair));}
}
LocalLockCache其實是用Caffeine緩存來保存的鎖信息,并可以設置鎖實例的保存時間,默認是2小時,這個時間可以根據durationTime
來進行配置,如果時間過大,那么鎖的實例就會過多,對項目的內存就會有壓力。如果時間過小,那么構建鎖的頻率就會增加,性能就會受到影響,使用時,可根據業務特點進行靈活配置
Caffeine是基于Java 1.8的高性能本地緩存庫,由Guava改進而來,而且在Spring5開始的默認緩存實現就將Caffeine代替原來的Google Guava,官方說明指出,其緩存命中率已經接近最優值。實際上Caffeine這樣的本地緩存和ConcurrentMap很像,即支持并發,并且支持O(1)時間復雜度的數據存取。二者的主要區別在于:
-
ConcurrentMap將存儲所有存入的數據,直到你顯式將其移除;
-
Caffeine將通過給定的配置,自動移除“不常用”的數據,以保持內存的合理占用。
因此,一種更好的理解方式是:Cache是一種帶有存儲和移除策略的Map。
本地鎖的作用:
之所以先使用本地鎖去加鎖的原因是,可以很大程度上節省分布式鎖的資源,雖然分布式鎖是利用reids實現的,redis的性能又非常的高,但是它再高,依舊存在網絡損耗,而本地鎖的操作都是基于內存中,一個是內存中操作,一個是網絡操作,前者的效率可是后者的幾十倍差距。
如果一秒內有100個請求,服務的實例有5個,那么每個實例就有20個請求,這20個請求就可以靠本地鎖來攔截掉,那么到分布式鎖那里,就有5個請求來獲得鎖了,其余的95個請求都可以被提前結束掉。
這是一個經典的思想,優先考慮本地內存操作,經過本地內存操作后,再去操作第三方中間件。
分布式鎖
當本地鎖獲得了鎖之后,還要用分布式鎖去嘗試獲得鎖,因為本地鎖只能保證當前自己的實例范圍內能鎖住請求,微服務多個實例部署的話,就需要分布式鎖了
//加鎖成功執行
if (result) {try{//再次獲取冪等標識flagObject = redissonDataHandle.get(repeatFlagName);//如果冪等標識的值為success,說明已經有請求在執行了,這次請求直接結束if (SUCCESS_FLAG.equals(flagObject)) {throw new DaMaiFrameException(message);}//執行業務邏輯obj = joinPoint.proceed();if (durationTime > 0) {try {//業務邏輯執行成功 并且 指定了設置冪等保持時間 設置請求標識redissonDataHandle.set(repeatFlagName,SUCCESS_FLAG,durationTime,TimeUnit.SECONDS);}catch (Exception e) {log.error("getBucket error",e);}}return obj;} finally {lock.unlock(lockName);}
}else{//獲取鎖失敗,說明已經有請求在執行了,這次請求直接結束throw new DaMaiFrameException(message);
}
當通過分布式鎖工廠獲取到客戶端實例后,就會嘗試去獲取分布式鎖了,如果加鎖失敗,說明之前已經有請求獲得了鎖在執行中沒有釋放掉,那么這次請求直接結束
如果加鎖成功則執行業務邏輯joinPoint.proceed()
-
如果執行業務邏輯成功,如果設置了冪等保持時間,那么設置冪等標識
-
如果執行業務邏輯失敗,那么直接釋放鎖
設置冪等標識的作用
有的小伙伴可能會好奇,為什么要設置冪等標識?直接使用本地鎖+分布式鎖不就可以實現冪等了嗎?為什么多此一舉?只使用本地鎖+分布式鎖的方法確實能實現冪等,但此項目是為了高并發的,考慮的細節要全面。
冪等包括用戶請求冪等和MQ消息冪等
要介紹這兩種的特點
-
用戶請求的冪等特點是用戶在短時間內多次的點擊,比如說一秒內發出了10次請求,那么就第1次請求正常執行,而剩余9次請求要全部被攔截將請求直接結束掉,特點是短時間內的多次請求
-
MQ消息冪等特點是MQ為了保證消息的可靠性。在有些異常情況確實會重復投遞的,比如說 某個服務監聽到了消息,接著執行業務邏輯,但在執行過程中,這個服務宕機了,沒有給MQ發送消息提交機制,MQ就會認為消息沒有消費成功,就會再次投遞。但其實這個邏輯都執行成功了,就差給MQ提交確認了。
這就需要保證冪等。而MQ的重試就沒有用戶多次重復請求那么頻繁,可能會1分鐘 5分鐘 10分鐘,這種情況就需要冪等標識,當有了標識后邏輯直接結束。
注意
有的小伙伴剛接觸開發,對并發產生的細節問題不太清楚,比如冪等和分布式鎖,直接使用就覺得沒有問題了,但其實并不是這樣,我們來舉一個例子,比如說添加用戶的操作
-
請求獲得鎖得到執行
-
開啟事務
-
向數據庫中添加用戶
-
提交事務
這個流程其實是有問題的,比如說第一個請求添加了用戶后釋放了鎖,第二個請求是重復提交的,也添加了用戶,這兩個用戶就是重復的。所以要在第2步后,要有驗證用戶是否存在的步驟,這個屬于業務驗證,需要業務來實現,組件只能防止并發重復,并不能防止業務重復。正確的流程:
-
請求獲得鎖得到執行
-
開啟事務
-
查詢用戶是否已存在
-
向數據庫中添加用戶
-
提交事務