目錄
- 一、前言
- 二、如何通過Redis設計一個分布式全局唯一ID生成工具
- 2.1 使用 Redis 計數器實現
- 2.2 使用 Redis Hash結構實現
- 三、通過代碼實現分布式全局唯一ID工具
- 3.1 導入依賴配置
- 3.2 配置yml文件
- 3.3 序列化配置
- 3.4 編寫獲取工具
- 3.5 測試獲取工具
- 四、運行結果
一、前言
在很多項目中生成類似訂單編號、用戶編號等有唯一性數據時還用的UUID工具,或者自己根據時間戳+隨機字符串等組合來生成,在并發小的時候很少出問題,當并發上來時就很可能出現重復編號的問題了,單體項目和分布式項目都是如此,要想解決這個問題也有很多種方法,可以自己寫一個唯一ID生成規則,也可以通過數據庫來實現全局ID生成這個和使用Redis實現其實類似,還可以使用比較成熟的雪花算法工具實現,每種方法都有各自的優缺點這里不展開說明,這里詳細說明如何使用Redis實現生成分布式全局唯一ID。
還有一個問題為什么不能直接使用數據庫的自增ID,而是需要單獨生成一個分布式全局唯一ID,類似訂單IDON202311090001
,在數據庫中有自增ID,對于當前業務來說就是唯一的為什么不能用,還要去生成一個獨立的訂單ID,對于這個問題要從幾個方面分析:
1、數據庫自增ID是有序增長的很容易就被人猜到,比如我現在下一單看到的訂單ID為999那么就知道你的系統里最多只有999單,還有如果接口設計不合理,比如取消訂單接口只校驗了用戶是否登錄沒有校驗訂單是否屬于該用戶,接收一個訂單ID就能將訂單取消,那么這樣很容易就被人抓住漏洞,類似的情況有很多,也很多人寫接口是不會注意這個問題。2、這種自增ID沒有意義,而且不同業務的自增ID是重合的,對于信息區分度很低,而且考慮到多業務交互和用戶端展示也都是不合適的,想想看要是你在某寶下單,訂單ID是999,或者在對接別人訂單系統時,給你的訂單ID是999是不是很奇怪。3、分庫分表時自增ID會重復
全局ID生成器:是一種在【分布式系統下
】用來生成全局唯一ID的工具;
全局ID需要滿足的特性:
1.唯一性
2.高可用:集群、哨兵
機制;
3.高性能
4.遞增性:Redis中的String數據類型的有自增特性!
5.安全性:將自增數值進行拼接,不容易猜出來;
ID結構:
符號位(1位) + 時間戳(31位)
+ 序列號(32位)
;
時間戳為從起始時間
到現在的時間差;
理論上支持1秒鐘2^32個訂單;
二、如何通過Redis設計一個分布式全局唯一ID生成工具
用戶下單調用下單邏輯,先進行業務邏輯處理,然后攜帶訂單ID標識通過分布式全局唯一ID工具獲取一個唯一的訂單ID,這個訂單ID標識就是用于區分業務的,獲取到訂單ID后將數據組裝入庫,分布式全局唯一ID工具可以做成一個內嵌的utils,也可以封裝成一個獨立的jar,還可以做成一個分布式全局唯一ID生成服務供其它業務服務調用。
2.1 使用 Redis 計數器實現
Redis
的String
結構提供了計數器自增功能,類似Java中的原子類,還要優于Java的原子類,因為Redis是單線程執行的緩存讀寫本身就是線程安全的,也不用進行原子類的樂觀鎖操作,每一次獲取分布式全局唯一ID時就將自增序列加1
。
# 給key為GENERATEID:NO的value自增1,如果這key不存在則會添加到Redis中并且設置value為1
## GENERATEID:key前綴
## NO:訂單ID標識
127.0.0.1:6379> incr GENERATEID:NO
(integer) 1
2.2 使用 Redis Hash結構實現
Redis Hash結構中的每一個field也可以進行自增操作,可以用一個Hash結構存儲所有的標識信息和自增序列,方便管理,比較適合并發不高的小項目所有服務都是用的一個Redis,如果并發較高就不合適了,畢竟Redis操作普通String結構肯定比操作Hash結構快。
# 給key為GENERATEID,field為no的value自增1,如果這key不存在則會添加到Redis中并且設置value為1
## GENERATEID:分布式全局唯一ID Hash key
## NO:Hash結構中的field
127.0.0.1:6379> hincrby GENERATEID NO 1
(integer) 1
三、通過代碼實現分布式全局唯一ID工具
這里使用Redis 計數器實現,自增序列以天為單位存儲,在實際業務中,比如生成訂單編號組成規則都類似NO1699631999000-1(業務標識key+當前時間戳
+自增序列),這個規則可以自己定義,保證最終生成的訂單編號不重復即可,不建議直接一個自增序列干到底,訂單編號這類型的數據都是有長度限制的,或者是要求生成20字符的訂單編號,如果增長的過長反而不好處理。
3.1 導入依賴配置
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.1</version><scope>test</scope>
</dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.15.2</version>
</dependency>
3.2 配置yml文件
spring:#redis配置信息redis:## Redis數據庫索引(默認為0)database: 0## Redis服務器地址host: 127.0.0.1## Redis服務器連接端口port: 6379## Redis服務器連接密碼(默認為空)password:## 連接超時時間(毫秒)timeout: 1200lettuce:pool:## 連接池最大連接數(使用負值表示沒有限制)max-active: 8## 連接池最大阻塞等待時間(使用負值表示沒有限制)max-wait: -1## 連接池中的最大空閑連接max-idle: 8## 連接池中的最小空閑連接min-idle: 1
3.3 序列化配置
@Configuration
public class RedisConfig {//編寫我們自己的配置redisTemplate@Bean@SuppressWarnings("all")public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(redisConnectionFactory);// JSON序列化配置Jackson2JsonRedisSerializer jsonRedisSerializer=new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper=new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jsonRedisSerializer.setObjectMapper(objectMapper);// String的序列化StringRedisSerializer stringRedisSerializer=new StringRedisSerializer();//key和hash的key都采用String的序列化方式template.setKeySerializer(stringRedisSerializer);template.setHashKeySerializer(stringRedisSerializer);//value和hash的value都采用jackson的序列化方式template.setValueSerializer(jsonRedisSerializer);template.setHashValueSerializer(jsonRedisSerializer);template.afterPropertiesSet();return template;}
}
3.4 編寫獲取工具
@Component
public class RedisGenerateIDUtils {@Resourceprivate RedisTemplate<String, Object> redisTemplate;// key前綴private String PREFIX = "GENERATEID:";/*** 獲取全局唯一ID* @param key 業務標識key*/public String generateId(String key) {// 獲取對應業務自增序列Long incr = getIncr(key);// 組裝最后的結果,這里可以根據需要自己定義,這里是按照業務標識key+當前時間戳+自增序列進行組裝String resultID = key + System.currentTimeMillis() + "-" + incr;return resultID;}/*** 獲取對應業務自增序列*/private Long getIncr(String key) {String cacheKey = getCacheKey(key);Long increment = 0L;// 判斷Redis中是否存在這個自增序列,如果不存在添加一個序列并且設置一個過期時間if (!redisTemplate.hasKey(cacheKey)) {// 這里存在線程安全問題,需要加分布式鎖,這里做簡單實現String lockKey = cacheKey + "_LOCK";// 設置分布式鎖boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, 1, 30, TimeUnit.SECONDS);if (!lock) {// 如果沒有拿到鎖進行自旋return getIncr(key);}increment = redisTemplate.opsForValue().increment(cacheKey);// 我這里設置24小時,可以根據實際情況設置當前時間到當天結束時間的插值redisTemplate.expire(cacheKey, 24, TimeUnit.HOURS);// 釋放鎖redisTemplate.delete(lockKey);} else {increment = redisTemplate.opsForValue().increment(cacheKey);}return increment;}/*** 組裝緩存key*/private String getCacheKey(String key) {return PREFIX + key + ":" + getYYYYMMDD();}/*** 獲取當前YYYYMMDD格式年月日*/private String getYYYYMMDD() {LocalDate currentDate = LocalDate.now();int year = currentDate.getYear();int month = currentDate.getMonthValue();int day = currentDate.getDayOfMonth();return "" + year + month + day;}
}
3.5 測試獲取工具
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RedisUniqueIdDemoApplication.class)
class RedisUniqueIdDemoApplicationTests {@Resourceprivate RedisGenerateIDUtils redisGenerateIDUtils;@Testpublic void test() throws InterruptedException {// 定義一個線程池 設置核心線程數和最大線程數都為100,隊列根據需要設置ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10000));CountDownLatch countDownLatch = new CountDownLatch(10000);long beginTime = System.currentTimeMillis();// 獲取10000個全局唯一ID 看看是否有重復CopyOnWriteArraySet<String> ids = new CopyOnWriteArraySet<>();for (int i = 0; i < 10000; i++) {executor.execute(() -> {// 獲取全局唯一IDlong beginTime02 = System.currentTimeMillis();String orderNo = redisGenerateIDUtils.generateId("NO");System.out.println(orderNo);System.out.println("獲取單個ID耗時 time=" + (System.currentTimeMillis() - beginTime02));if (ids.contains(orderNo)) {System.out.println("重復ID=" + orderNo);} else {ids.add(orderNo);}countDownLatch.countDown();});}countDownLatch.await();// 打印獲取到的全局唯一ID集合數量System.out.println("獲取到全局唯一ID count=" + ids.size());System.out.println("耗時毫秒 time=" + (System.currentTimeMillis() - beginTime));}
}
知識小貼士:關于countdownlatch
countdownlatch
名為信號槍:主要的作用是同步協調在多線程的等待于喚醒問題
我們如果沒有CountDownLatch ,那么由于程序是異步的,當異步程序沒有執行完時,主線程就已經執行完了,然后我們期望的是分線程全部走完之后,主線程再走,所以我們此時需要使用到CountDownLatch
CountDownLatch 中有兩個最重要的方法
-
countDown
-
await
await 方法 是阻塞方法,我們擔心分線程沒有執行完時,main線程就先執行,所以使用await可以讓main線程阻塞,那么什么時候main線程不再阻塞呢?當CountDownLatch 內部維護的 變量變為0時,就不再阻塞,直接放行,那么什么時候CountDownLatch 維護的變量變為0 呢,我們只需要調用一次countDown ,內部變量就減少1,我們讓分線程和變量綁定, 執行完一個分線程就減少一個變量,當分線程全部走完,CountDownLatch 維護的變量就是0,此時await就不再阻塞,統計出來的時間也就是所有分線程執行完后的時間。
四、運行結果
redis結果
代碼運行結果,id沒有出現重復:
代碼地址:Github
覺得有用的話還請來個三連!!!