文章目錄
- 1. 前言
- 2. 項目啟動示例 - MYSQL 和 Zookeepr
- 2.1 Leaf-segment 模式
- 2.2 Leaf-snowflake 模式 - 單節點
- 2.3 Leaf-snowflake 模式 - 多節點
- 3. Leaf-segment 詳細講解
- 4. Leaf-segment 源碼解析
- 4.1 SegmentBuffer 號段緩存
- 4.2 Segment 號段
- 4.3 初始化號段服務 SegmentIDGenImpl
- 4.4 核心方法 get 獲取 ID
- 4.4.1 updateSegmentFromDb 更新 Segment
- 4.4.2 getIdFromSegmentBuffer 從號段中獲取 ID
- 5. Leaf-snowflake 源碼解析
- 5.1 SnowflakeIDGenImpl 的初始化
- 5.2 SnowflakeZookeeperHolder 的初始化
- 5.3 doService 構建定時任務
- 5.4 updateLocalWorkerID 更新本地文件
- 5.5 get 獲取 ID
- 5.6 差異
- 6. 小結
1. 前言
上一篇文章:【分布式 ID】生成唯一 ID 的幾種方式。前一篇文章我們介紹了分布式 ID 生成的幾種方式,這篇文章就來看下美團開源項目 Leaf 是如何生成 ID 的。Leaf 這個名字是來自德國哲學家、數學家萊布尼茨的一句話: There are no two identical leaves in the world 也就是 “世界上沒有兩片相同的樹葉”,首先如果大家想要詳細了解一些設計思路,可以去看美團官方的文章:Leaf——美團點評分布式ID生成系統,項目地址:Meituan-Dianping/Leaf。
2. 項目啟動示例 - MYSQL 和 Zookeepr
2.1 Leaf-segment 模式
首先我們用 git clone 把項目拉下來,然后按照項目下的文檔 README_CN.md 里面的介紹開始啟動,Leaf 有兩種模式,分別是數據庫號段和雪花算法,首先我們來看下數據庫號段模式,如果你的 MYSQL 版本是 8.0 以上,pom 文件中需要修改下 mysql 連接和 druid 的版本。
然后在 leaf.properties 中配置好 MYSQL 的地址,在項目啟動之前,先提前在數據庫中創建好號段表。
CREATE DATABASE leaf
CREATE TABLE `leaf_alloc` (`biz_tag` varchar(128) NOT NULL DEFAULT '',`max_id` bigint(20) NOT NULL DEFAULT '1',`step` int(11) NOT NULL,`description` varchar(256) DEFAULT NULL,`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`biz_tag`)
) ENGINE=InnoDB;insert into leaf_alloc(biz_tag, max_id, step, description) values('leaf-segment-test', 1, 2000, 'Test leaf Segment Mode Get Id')
之后啟動就可以了,然后可以通過接口調用。這里我們就直接用代碼里面的測試類來測試就行。
如果要啟動 test 里面的測試類,注意如果用的是 IDGenServiceTest,則需要修改測試包下面的 leaf.properties,同時 pom 里面的 mysql 版本也改下,改成 8.0 版本的,然后就可以啟動了,如果你用的是上面我圈出來的這個 Test,就不需要改了,因為這個用的是 SpringBoot 啟動,下面的 leaf-server 的配置如果之前就改好了,這里就不用再改這個 test 包下面的 leaf.properties,啟動結果如下。
這個是數據庫里面的數據。
2.2 Leaf-snowflake 模式 - 單節點
上面是數據庫號段模式,下面是 Leaf-snowflake 方案,簡單來說就是使用雪花算法 + Zookeeper 來生成 ID,Zookeeper 用來生成機器 ID,Leaf 集群啟動的時候機器會向 Zookeeper 注冊當前的信息,包括 ip
、port
、上報的時間
,然后第一次注冊會在 zookeeper 的根目錄下生成一個 ip:port-WorkerID 的路徑,由于 zookeeper 可以自動遞增生成,所以不需要手動去配置每一臺 Leaf 的 WorkerID,直接用 zookeeper 生成的遞增 id 標識當前機器。
要啟動 Leaf-snowflake 模式就比較簡單了,首先你得確保啟動了 zookeeper 服務,然后如果是 idea 啟動的,需要配置下 VM 參數:-Djava.io.tmpdir=D:\javaCode\Leaf
。配置這個參數是因為 Leaf 除了會向 zookeeper 注冊信息之外,獲取到了 WorkerID 之后還會把這個 WorkerID 存一份數據到本地,避免 zookeeper 出問題的時候沒辦法獲取到 WorkerID。按官方的說法就是弱依賴 ZooKeeper。
這里我就直接用 leaf-server 服務來演示,還是一樣,需要修改配置文件 leaf-properties,在里面配置上你的 zk 地址和端口,把 leaf.snowflake.enable 設置為 true。
然后啟動 LeafServerApplication。
啟動成功之后使用接口工具請求發起請求,或者直接把這個鏈接在瀏覽器請求。
然后打開 zookeeper,就可以發現節點已經注冊上去了,/snowflake 下面的 forever 節點下面的 ip:port-workerID,可以看到我們注冊上去的節點 ID 為 0。
2.3 Leaf-snowflake 模式 - 多節點
那我們要怎么模擬一個集群呢?我們可以在 IDEA 里面啟動多個 LeafServerApplication,但是在這之前,我們要修改下配置。
首先,這里的配置 leaf.snowflake.port 改一下,我們把這個配置給注釋掉,因為這個 port 不是用來連接 zookeeper 的,只是用來標記當前 Leaf 的啟動端口,由于我們要在 IDEA 啟動多個 LeafServerApplication,這里的 port 肯定不能寫死。
接下來我們修改下源碼,主要就是修改 SnowflakeService 的構造器,原本是直接從配置文件里面獲取 port, 現在我們改成如果配置文件里面獲取不到,就從系統變量中獲取 server.port,也就是 Spring 項目的啟動端口,接下來我們構建兩個啟動 Application 服務。
接下來我們就可以啟動兩個項目了。
然后到 zookeeper 下面看下有沒有注冊上去,可以看到 8080 和 8081 都注冊上去了,且后面的 workerID 是遞增的。
上面我們也說了,項目啟動的時候不單單會注冊到 Zookeeper,也會在本地存儲一個 WorkerID 文件,防止 Zookeeper 出問題的時候獲取不到 WorkerID,也就是上面 -Djava.io.tmpdir
這個路徑。
可以看到本地存儲確實也沒什么問題,到這里我們就演示完了。不過大家會不會好奇,就是 zookeeper 下面這玩意是順序遞增的,那超過 1024 臺 Leaf 就注冊不下了?其實可以看到這個路徑是 /snowflake/${leaf.name}/forever
,leaf.name
是配置文件里面可以配置的,所以不是說固定存到某個路徑,每一個 Leaf 都可以配置自己的 leaf.name。
3. Leaf-segment 詳細講解
這種模式就是數據庫號段模式,區別于數據庫自增字段,這種模式下每一次都會從數據庫獲取一個號段,這個號段長度是業務自己設置,接著分配 ID 就在內存中分配這些號段 ID,而不是每一次都需要請求數據庫。
上面是官方的圖,可以看到,biz_tag 就是用來區分業務的,max_id 指當前號段下能夠分配的最大值 + 1,step 代表號段長度,比如 1000,那么分配的時候就從 1 開始分配,分配到 1000。
上面圖中 test_tag 在不同機器上面的號段不同,這時候如果第一臺機器的 1~ 1000 分配完了,再次去數據庫獲取號段,就會獲取到 3001~4000 的號段。這種方式可以將請求數據庫的頻率減少到 1/step
,每一次請求數據庫都是在分配的 step 長度的號段被消耗完之后才去請求數據庫再次分配,大大提高了性能。
這種方式有一定的容災性,在 DB 宕機的時候由于本地還有號段,所以還可以繼續提供服務,當然了這種強依賴數據庫的方式一旦 DB 宕機,服務不可用也是時間問題,而且每一次消耗完號段之后還得立刻去請求數據庫,這樣一來獲取 ID 的請求就會阻塞住,一旦請求數據庫有延遲,那么業務也會收到影響,所以為了解決這個問題,Leaf 設計了雙 Buffer 的方式來解決這個問題。
設計了雙 Buffer 之后,當當前號段消費到 10% 的時候,如果下一個號段還沒有準備好并且更新號段的線程沒有在執行,那么立刻使用線程去更新下一個號段,由于更新是使用了線程去執行,所以當前業務是不受影響的。
而且這里的設計是當當前號段消費到 10% 就立刻去請求處理下一個號段,當然這個比例我們沒辦法得知為什么要設置到 10%,如果可以設置成動態配置那就更好了,不過我們也可以發現如果這個比例設置的太高,假設設置到了 90%,那么會導致如果某一個時刻有大批量請求到來瞬間把這 10% 的號段消耗了,那么剩下的請求又要阻塞等待,所以設置的小一點是沒問題的。
那么 segment 長度設置成多少合適呢?官方推薦 segment 長度設置為服務高峰期發號 QPS 的 600倍(10分鐘),這樣即使 DB 宕機,Leaf 仍能持續發號 10-20 分鐘不受影響。實際上源碼中也會根據請求數據庫的頻率來對 segment 進行減半或者是加倍。
4. Leaf-segment 源碼解析
Leaf-segment 的源碼主要集中在 leaf-core 下面的 SegmentIDGenImpl。
先來看下號段模式下的一些對象。
4.1 SegmentBuffer 號段緩存
SegmentBuffer 就是號段 Buffer 緩存,里面存儲了雙 Segment 交替來使用。
上面是大致的結構,下面來看下里面的屬性。
/*** 雙 buffer*/
public class SegmentBuffer {private String key;private Segment[] segments; // 雙 buffer, 交換來使用private volatile int currentPos; // 當前的使用的 segment 的 indexprivate volatile boolean nextReady; //下一個 segment 是否處于可切換狀態private volatile boolean initOk; // 是否初始化完成private final AtomicBoolean threadRunning; // 線程是否在運行中private final ReadWriteLock lock;// 步長private volatile int step;// 最小步長private volatile int minStep;// 更新時間private volatile long updateTimestamp;...
}
其中 threadRunning
表示第二個 segment 是不是在線程中執行準備邏輯,然后 step
就是步長,由于步長是可以根據請求數據庫的時間來擴容的和縮容的,所以有一個 minStep
代表最小步長,也就是說這個步長就算再怎么縮小也不會小于這個數,最后一個 updateTimestamp
就是當請求數據庫獲取號段的時候會更新。
4.2 Segment 號段
public class Segment {// 當前要分配的 ID 值private AtomicLong value = new AtomicLong(0);// 這個號段可以分配的 ID 的最大值 + 1private volatile long max;// 號段分配的步長, 也是號段的大小private volatile int step;// 這個段所屬的 bufferprivate SegmentBuffer buffer;...
}
Segment 的注釋寫的比較清楚了,下面是里面的結構,注意一下 max 是這個號段下可以分配的最大值 + 1,比如這個號段 step 是 1000
,value 是 2001
,那么當前號段可分配的范圍就是 [2001,3000]
,共 1000 個數(step), max 就是 3001
。
4.3 初始化號段服務 SegmentIDGenImpl
回到 SegmentIDGenImpl 方法,里面的 init 方法是在創建 SegmentService 的時候創建出來的,SegmentService 提供了方法,比如 getId,getIdGen,里面的實現則是在 SegmentIDGenImpl 中完成。
public SegmentService() throws SQLException, InitException {Properties properties = PropertyFactory.getProperties();// 準備 MYSQLboolean flag = Boolean.parseBoolean(properties.getProperty(Constants.LEAF_SEGMENT_ENABLE, "true"));if (flag) {// Config dataSourcedataSource = new DruidDataSource();dataSource.setUrl(properties.getProperty(Constants.LEAF_JDBC_URL));dataSource.setUsername(properties.getProperty(Constants.LEAF_JDBC_USERNAME));dataSource.setPassword(properties.getProperty(Constants.LEAF_JDBC_PASSWORD));dataSource.init();// 準備 DAOIDAllocDao dao = new IDAllocDaoImpl(dataSource);// 創建 SegmentIDGenImplidGen = new SegmentIDGenImpl();((SegmentIDGenImpl) idGen).setDao(dao);// 初始化if (idGen.init()) {logger.info("Segment Service Init Successfully");} else {throw new InitException("Segment Service Init Fail");}} else {// 如果 leaf.segment.enable 這個配置沒有設置為 true, 那么 ID 生成器的實現就是 ZeroIDGen, ID 直接返回 0idGen = new ZeroIDGen();logger.info("Zero ID Gen Service Init Successfully");}
}
SegmentService 的創建主要是完成兩部分操作:
- 準備數據庫配置。
- 創建 SegmentIDGenImpl,前提是
leaf.segment.enable
這個配置設置為 true,代表使用號段模式,如果沒有設置,那么創建的 ID 生成器就是ZeroIDGen
,這個生成器是統一返回 0 的。
而 idGen.init()
方法就是核心的邏輯,在里面會去初始化 SegmentIDGenImpl,下面來看下源碼。
@Override
public boolean init() {logger.info("Init ...");// 確保加載到 kv 后才初始化成功updateCacheFromDb();initOK = true;// 初始化定時任務, 啟動后 60s 開始執行, 之后每隔 1min 執行一次updateCacheFromDbAtEveryMinute();return initOK;
}private void updateCacheFromDbAtEveryMinute() {// 創建檢測緩存的任務線程池ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("check-idCache-thread");t.setDaemon(true);return t;}});service.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {// 定時更新數據庫的 tag 到緩存中updateCacheFromDb();}}, 60, 60, TimeUnit.SECONDS);
}
初始化主要完成兩個操作,首先是從 DB 中拉取 biz_tag 來為這些業務創建 SegmentBuffer,然后啟動一個定時任務,啟動后 60s 開始執行, 之后每隔 1min 執行一次,同樣調用 updateCacheFromDb 方法去維護新增的 tag,所以根據這一段介紹就可以明顯得知了 updateCacheFromDb 就是去維護數據庫的 biz_tag 的。
上面的兩個步驟就是這個方法要做的,主要就是同步數據庫和緩存的一致性,注意這里 init 之后 SegmentBuffer 里面的 initOk 還是 false,相當于說這里只是同步了一下數據庫和緩存,創建緩存中不存在的 SegmentBuffer,同時將數據庫中已經刪掉的 SegmentBuffer,緩存也刪掉。
// 從數據庫拉取所有業務的號段下來
// 1. 初始化調用
// 2. 定時任務調用
private void updateCacheFromDb() {logger.info("update cache from db");StopWatch sw = new Slf4JStopWatch();try {// 首先拉取所有的業務 tagList<String> dbTags = dao.getAllTags();if (dbTags == null || dbTags.isEmpty()) {return;}// 緩存里面現有的 tagList<String> cacheTags = new ArrayList<String>(cache.keySet());// 需要新增的 tag, 也就是數據庫有但是緩存里面沒有的Set<String> insertTagsSet = new HashSet<>(dbTags);// 需要刪除的 tag, 也就是數據庫沒有但是緩存有的Set<String> removeTagsSet = new HashSet<>(cacheTags);// 遍歷緩存里面的 tag, 這里是定時任務遍歷的時候才不會為空了for(int i = 0; i < cacheTags.size(); i++){String tmp = cacheTags.get(i);// 如果緩存里面的 tag 在數據庫中出現了, 就刪掉, 剩下的就是緩存里面沒有的// 比如緩存里面是 tag1、tag2、tag3, 數據庫里面是 tag2、tag3、tag4// 遍歷完成之后 insertTagsSet 里面就只剩下 tag4 了if(insertTagsSet.contains(tmp)){insertTagsSet.remove(tmp);}}// 遍歷需要新增的 tagfor (String tag : insertTagsSet) {// 新建一個 SegmentBuffer 對象, 一個 buffer 里面有兩個 segment 交替來使用, 注意這里只是新建 SegmentBuffer 和// Segment, 不是說初始化完成了就可以立馬開始使用里面的號段SegmentBuffer buffer = new SegmentBuffer();buffer.setKey(tag);// 創建第一個號段Segment segment = buffer.getCurrent();// 當前要分配的值, 初始化為 0segment.setValue(new AtomicLong(0));// 當前號段能分配的最大值 + 1segment.setMax(0);// 分配步長segment.setStep(0);// 添加到緩存中cache.put(tag, buffer);logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);}// 將緩存中已經失效的 tag 刪掉, 遍歷數據庫的 tagfor(int i = 0; i < dbTags.size(); i++){String tmp = dbTags.get(i);// 如果緩存里面 tag 還在數據庫中, 就從集合中刪掉// 比如緩存里面是 tag1、tag2、tag3, 數據庫里面是 tag2、tag3、tag4// 遍歷完成之后 removeTagsSet 就剩下 tag1 了if(removeTagsSet.contains(tmp)){removeTagsSet.remove(tmp);}}// 將失效的 tag 刪掉for (String tag : removeTagsSet) {cache.remove(tag);logger.info("Remove tag {} from IdCache", tag);}} catch (Exception e) {logger.warn("update cache from db exception", e);} finally {sw.stop("updateCacheFromDb");}
}
可以看到在處理 insertTagsSet 的時候,創建出了 buffer 之后只是設置了一個 key,而構造里里面也除了創建出兩個 Segment,其他屬性都沒有設置。
public SegmentBuffer() {segments = new Segment[]{new Segment(this), new Segment(this)};currentPos = 0;nextReady = false;initOk = false;threadRunning = new AtomicBoolean(false);lock = new ReentrantReadWriteLock();
}
4.4 核心方法 get 獲取 ID
下面就是核心方法,如何通過 get 去獲取號段 ID。
/*** 獲取 id* @param key 業務 key* @return*/
@Override
public Result get(final String key) {if (!initOK) {// 還沒初始化好return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);}// 如果緩存里面有這個 key, 就是數據庫里面的 biz_tagif (cache.containsKey(key)) {// 獲取 SegmentBufferSegmentBuffer buffer = cache.get(key);// 雙重檢查鎖if (!buffer.isInitOk()) {synchronized (buffer) {if (!buffer.isInitOk()) {try {// 如果 buffer 還沒有初始化, 就初始化, 注意這里 updateCacheFromDb 方法中通過 new// 創建出來的 SegmentBuffer 是還沒有初始化的, 會在這里去初始化updateSegmentFromDb(key, buffer.getCurrent());logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());buffer.setInitOk(true);} catch (Exception e) {logger.warn("Init buffer {} exception", buffer.getCurrent(), e);}}}}// 從號段中獲取 idreturn getIdFromSegmentBuffer(cache.get(key));}// 沒找到這個業務有號段信息return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
}
首先如果 cache 緩存里面沒找到這個 tag,說明這個業務沒有設置到數據庫,或者設置到數據庫了但是還沒有同步到緩存中。
而對于剛同步過來的 SegmentBuffer,由于 initOk 是 false,所以會通過 updateSegmentFromDb
真正初始化 SegmentBuffer,這里真正初始化的意思就是讀取數據里面的號段信息設置到 SegmentBuffer 中。
最后再通過 getIdFromSegmentBuffer
從號段中獲取下一個 ID 返回。
4.4.1 updateSegmentFromDb 更新 Segment
/*** 從數據庫獲取這個 tag 的信息來更新號段* @param key* @param segment*/
public void updateSegmentFromDb(String key, Segment segment) {StopWatch sw = new Slf4JStopWatch();// 獲取這個號段所屬的 SegmentBufferSegmentBuffer buffer = segment.getBuffer();LeafAlloc leafAlloc;// 如果還沒有初始化if (!buffer.isInitOk()) {// 通過業務 tag 查詢到號段信息leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);// 設置號段長度buffer.setStep(leafAlloc.getStep());// 設置最小號段長度buffer.setMinStep(leafAlloc.getStep());} else if (buffer.getUpdateTimestamp() == 0) {// 第一次更新, 比如前一個號段使用已經超過 10% 了, 就會去提前更新下一個號段的信息leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);// 設置更新時間是當前時間buffer.setUpdateTimestamp(System.currentTimeMillis());// 設置號段長度buffer.setStep(leafAlloc.getStep());// 設置號段的最小長度buffer.setMinStep(leafAlloc.getStep());} else {// 距離上一次更新的時間有多久long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();// 當前 buffer 的步長int nextStep = buffer.getStep();// 如果更新時間小于號段的過期時間, 說明消費的有點快if (duration < SEGMENT_DURATION) {// 號段擴容不能超過 1000000if (nextStep * 2 > MAX_STEP) {//do nothing} else {// 擴容成 2 倍nextStep = nextStep * 2;}} else if (duration < SEGMENT_DURATION * 2) {// 這里是更新時間超過了號段過期時間, 說明當前的號段長度夠用了} else {// 這里就是消費得太慢了, 縮容成原來的一半, 這里的最小步長是一開始創建的時候設置的, 后面不會再更新了nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;}logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);// 更新數據庫的號段信息LeafAlloc temp = new LeafAlloc();temp.setKey(key);temp.setStep(nextStep);leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);// 設置當前 buffer 的更新時間、號段大小和最小號段buffer.setUpdateTimestamp(System.currentTimeMillis());buffer.setStep(nextStep);buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step為DB中的step}// 設置下一次要分配的 id 值, 上面在 updateMaxIdByCustomStepAndGetLeafAlloc 之后 maxId 已經更新成 maxId + step 了// 比如 step 原來是 2000, 現在擴容成 4000 之后, maxId 也會被更新成 maxId + step, maxId 在創建數據庫的時候默認是 1,// 也就是說現在擴容之后 maxId 就變成了 4001, value = 2001long value = leafAlloc.getMaxId() - buffer.getStep();segment.getValue().set(value);segment.setMax(leafAlloc.getMaxId());segment.setStep(buffer.getStep());sw.stop("updateSegmentFromDb", key + " " + segment);
}
這個方法就是從數據庫獲取這個 tag 的信息來更新號段,主要分為三個階段,每個階段有不同的邏輯。
- 還沒有初始化的時候。
- 初始化了,但是是第一次更新信息,比如前一個號段使用已經超過 10% 了, 就會去提前更新下一個號段的信息。
- 已經更新很多次了。
首先是還沒有初始化的時候,也就是 !buffer.isInitOk()
,會去調用 updateMaxIdAndGetLeafAlloc
更新 max_id,我們之前說過 max_id 在創建數據表的時候就已經指定了默認是 1,而 updateMaxIdAndGetLeafAlloc 的邏輯就是先 updateMaxId 再 getLeafAlloc,就是下面的兩個 SQL。
所以假設設置了 step = 2000,那么 max_id 會被更新成 2001。當獲取到 leafAlloc 后,設置號段長度和最小號段長度為數據庫的 step,然后初始化完成了。
然后假設是已經初始化過了,但是當前是第一次更新,可以看到上面初始化的邏輯里面沒有設置 buffer 的 updateTimestamp,所以這一次更新會進入 else if (buffer.getUpdateTimestamp() == 0)
這個分支,邏輯跟上面的都一樣,只是多了一個 buffer.setUpdateTimestamp(System.currentTimeMillis())
設置更新時間。
最后是已經初始化過并且不是第一次更新了,就會進入最后的 else 的邏輯。首先會獲取當前距離上一次更新的時間有多久(duration),然后獲取當前 buffer 的步長 nextStep,接下來根據更新的間隔時間去判斷步長是否需要擴大還是縮小。
- 如果說更新的時間小于號段時間的過期時間(15 分鐘),那么說明消費有點快了,這種情況下將號段擴容成原來的 2 倍,但是最大不能超過 1000000。
- 如果距離上一次更新的時間超過了號段過期時間但是小于兩倍的號段過期時間,說明現在這個長度適合業務的消費速度,不需要修改。
- 如果說距離上一次更新時間超過了號段過期時間的 2 倍,說明這個號段長度太長了,有可能是擴容之后太大了,業務不需要消費這么快,這時候縮容成原來的 1/2。
為什么不將號段設置的很大,這樣就不需要擴容了呢? 下面是我的理解,不一定對,因為可能有多臺 Leaf 部署來承擔多個業務的 ID 生成,集群可能很大,如果號段設置得很大,就會導致每一臺機器都分配到一個很大的號段,但是業務消費速度慢,這種情況下如果集群有問題,重新啟動又得重新分配號段,就會比較浪費,當然這是我的理解,如果還有其他方面的問題也可以討論。
回到源碼,最后計算下 value 值,也就是下一次要分配的 id 值,上面在 updateMaxIdByCustomStepAndGetLeafAlloc 之后 maxId 已經更新成 maxId + step 了,比如 step 原來是 2000,現在擴容成 4000 之后,maxId 也會被更新成 maxId + step,maxId 在創建數據庫的時候默認是 1,也就是說現在擴容之后 maxId 就變成了 4001,value = 2001,本次從 2001 開始繼續分配。然后設置 Segment 的 value
、max
、step
,結束。
4.4.2 getIdFromSegmentBuffer 從號段中獲取 ID
當更新完 Segment 后,調用 getIdFromSegmentBuffer 從號段中獲取 ID,下面是全部邏輯。
/*** 從號段中獲取 ID* @param buffer* @return*/
public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {// 死循環獲取while (true) {// 加讀鎖, 防止業務并發對同一個號段獲取buffer.rLock().lock();try {// 獲取當前正在使用的號段final Segment segment = buffer.getCurrent();// 如果當前的 buffer 下一個號段還沒有準備好, 同時當前號段還剩下的 ID 數小于 90%, 開始準備下一個號段if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {service.execute(new Runnable() {@Overridepublic void run() {// 獲取下一個要使用的號段, SegmentBuffer 是兩個 buffer 交替使用Segment next = buffer.getSegments()[buffer.nextPos()];boolean updateOk = false;try {// 從數據庫獲取下一個號段信息來更新 SegmentupdateSegmentFromDb(buffer.getKey(), next);updateOk = true;logger.info("update segment {} from db {}", buffer.getKey(), next);} catch (Exception e) {logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);} finally {// 更新下一個號段成功if (updateOk) {// 加寫鎖buffer.wLock().lock();// 設置屬性buffer.setNextReady(true);buffer.getThreadRunning().set(false);// 解除寫鎖buffer.wLock().unlock();} else {// 解除失敗buffer.getThreadRunning().set(false);}}}});}// 上面更新下一個號段成功后, 獲取當前要分配的 IDlong value = segment.getValue().getAndIncrement();// 返回結果if (value < segment.getMax()) {return new Result(value, Status.SUCCESS);}} finally {// 解除讀鎖buffer.rLock().unlock();}// 這里就是獲取號段失敗, 比如當前號段已經 >= segment.getMax, 在里面會去阻塞等到下一個號段準備成功waitAndSleep(buffer);// 加寫鎖buffer.wLock().lock();try {// 獲取當前的號段final Segment segment = buffer.getCurrent();// 再次判斷如果當前的號段是符合要求的, 就返回,可能是因為這里兩個線程同時來獲取,然后當前線程上下文切換給其他線程,其他線程先一步完成了 switchPos,那么當當前線程再獲取 buffer.getCurrent() 的時候獲取到的就是有值的號段long value = segment.getValue().getAndIncrement();if (value < segment.getMax()) {return new Result(value, Status.SUCCESS);}// 到這里當前號段用完了, 判斷下一個是否準備好了if (buffer.isNextReady()) {// 切換到下一個 Segmentbuffer.switchPos();// 設置下一個 Segment 的 nextReady 為 falsebuffer.setNextReady(false);} else {logger.error("Both two segments in {} are not ready!", buffer);return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);}} finally {// 解除寫鎖buffer.wLock().unlock();}}
}
獲取 ID 就是在一個 while 循環里面去獲取的,首先加讀鎖,然后重點來了,判斷如果當前的 buffer 下一個號段還沒有準備好,同時當前號段還剩下的 ID 數小于 90%,開始準備下一個號段,所謂的準備下一個號段就是提前給下一個號段分配好值,分配的方法就是上面的 updateSegmentFromDb
,當分配好之后當前號段如果用完了,就可以無縫切換到下一個號段。當線程中更新完號段了, nextReady 就會設置為 true,同時將 threadRunning 設置為 false,防止并發更新下一個號段。
上面是更新號段的流程,如果說當前號段分配的 value 比最大值要小,就可以直接返回結果,但是如果不是,就說明當前號段用完了,這種情況下需要阻塞等到下一個號段準備成功。上面我們也說了如果下一個號段準備成功 threadRunning 會設置為 false,所以 waitAndSleep 就是在 while 循環里面不斷判斷這個標記。
private void waitAndSleep(SegmentBuffer buffer) {int roll = 0;// 一直死循環阻塞等待下一個號段準備成功while (buffer.getThreadRunning().get()) {roll += 1;if(roll > 10000) {try {TimeUnit.MILLISECONDS.sleep(10);break;} catch (InterruptedException e) {logger.warn("Thread {} Interrupted",Thread.currentThread().getName());break;}}}
}
當然了準備好后,繼續判斷當前號段分配的值是否是符合要求的,再次判斷猜測是因為如果當前的號段是符合要求的, 就返回,可能是因為這里兩個線程同時來獲取,然后當前線程上下文切換給其他線程,其他線程先一步完成了 switchPos,那么當當前線程再獲取 buffer.getCurrent() 的時候獲取到的就是有值的號段,因為用的是讀寫鎖,并非單一的鎖?
如果還是不符合,就切換到下一個號段,然后設置 nextReady 為 false,下一次獲取 ID 就繼續去提前分配號段,最后解除寫鎖。
5. Leaf-snowflake 源碼解析
上面是數據庫號段模式,下面是 Leaf-snowflake,Leaf-snowflake 相比號段模式就簡單不少了,因為不設計數據庫的讀取,邏輯上比較簡單,核心就是上一篇文章的雪花算法,而 Leaf 在節點的持久化做了不少工作,下面來看下這個模式的源碼。
5.1 SnowflakeIDGenImpl 的初始化
snowflake 模式的具體實現是 SnowflakeIDGenImpl,這個類的 init 方法是會返回 true 的,所以不用管。
@Override
public boolean init() {return true;
}
我們主要看下構造器,看看這個類型的 ID 生成器創建的時候做了什么。
public SnowflakeIDGenImpl(String zkAddress, int port) {// Thu Nov 04 2010 09:42:54 GMT+0800 (中國標準時間)this(zkAddress, port, 1288834974657L);
}/*** @param zkAddress zk地址* @param port snowflake 監聽端口* @param twepoch 起始的時間戳*/
public SnowflakeIDGenImpl(String zkAddress, int port, long twepoch) {// 啟動時間設置為 Thu Nov 04 2010 09:42:54 GMT+0800 (中國標準時間)this.twepoch = twepoch;Preconditions.checkArgument(timeGen() > twepoch, "Snowflake not support twepoch gt currentTime");// 獲取當前的服務器 IPfinal String ip = Utils.getIp();// 監聽端口默認是 8080SnowflakeZookeeperHolder holder = new SnowflakeZookeeperHolder(ip, String.valueOf(port), zkAddress);LOGGER.info("twepoch:{} ,ip:{} ,zkAddress:{} port:{}", twepoch, ip, zkAddress, port);// 初始化boolean initFlag = holder.init();if (initFlag) {workerId = holder.getWorkerID();LOGGER.info("START SUCCESS USE ZK WORKERID-{}", workerId);} else {Preconditions.checkArgument(initFlag, "Snowflake Id Gen is not init ok");}Preconditions.checkArgument(workerId >= 0 && workerId <= maxWorkerId, "workerID must gte 0 and lte 1023");
}
首先就是基準時間,因為雪花算法需要一個基準時間,這樣設置 41 位時間戳的時候就可以用當前時間 - 基準時間來設置,我們上一篇文章說過,雪花算法完整可以用 69 年,也就是可以用到 基準時間 + 69 年,Leaf 默認是 Thu Nov 04 2010 09:42:54 GMT+0800 (中國標準時間)
。
接下來創建 SnowflakeZookeeperHolder,這里面就是去維護節點信息的,比如節點上報 Zookeeper 等,port 不是 Zookeeper 的端口,而是標識當前 Leaf 服務的監聽端口,用于創建 Zookeeper 用的。
接下來就調用 init 去初始化,初始化完了之后獲取 workerId。
5.2 SnowflakeZookeeperHolder 的初始化
private String zk_AddressNode = null;// 保存自身的key ip:port-000000001
private String listenAddress = null;// 保存自身的key ip:port
// WorkerID, 也就是上面 ip:port-000000001 后面的數字
private int workerID;
// 持久化到 Zookeeper 的哪個路徑下
private static final String PREFIX_ZK_PATH = "/snowflake/" + PropertyFactory.getProperties().getProperty("leaf.name");
private static final String PROP_PATH = System.getProperty("java.io.tmpdir") + File.separator + PropertyFactory.getProperties().getProperty("leaf.name") + "/leafconf/{port}/workerID.properties";
private static final String PATH_FOREVER = PREFIX_ZK_PATH + "/forever";//保存所有數據持久的節點// ip 地址
private String ip;// 當前服務監聽端口
private String port;// Zookeeper 連接信息
private String connectionString;// 上一次上報到 Zookeeper 的時間
private long lastUpdateTime;
在看 init 方法的源碼之前,我們先來看下這個類里面的一些屬性。
- PATH_FOREVER:持久化的 Zookeeper 路徑,最終持久化的路徑是
/snowflake/${leaf.name}/forever
,leaf.name 可以在配置文件 leaf.properties 里面去配置。 - PROP_PATH :持久化的本地文件路徑,需要避免 Zookeeper 不可用時拿不到 workerID 的情況,因此需要將 workerID 在本地持久化一份,持久化的路徑就是
${java.io.tmpdir}/${leaf.name}//leafconf/{port}/workerID.properties
, 2.3 小節也有演示。 - zk_AddressNode:保存自身的 key,ip:port-000000001。
- workerID:上面 zk_AddressNode 中
-
后面的 000000001 就是工作 ID。 - lastUpdateTime:上一次上報到 Zookeeper 的時間。
上面是一些比較重要的屬性,下面就可以來看下初始化的源碼了。
/*** 初始化 zookeeper 配置* @return*/
public boolean init() {try {CuratorFramework curator = createWithOptions(connectionString, new RetryUntilElapsed(1000, 4), 10000, 6000);curator.start();Stat stat = curator.checkExists().forPath(PATH_FOREVER);if (stat == null) {// 不存在根節點,機器第一次啟動,創建 /snowflake/ip:port-000000000,并上傳數據zk_AddressNode = createNode(curator);// worker id 默認是0updateLocalWorkerID(workerID);// 定時上報本機時間給 forever 節點ScheduledUploadData(curator, zk_AddressNode);return true;} else {Map<String, Integer> nodeMap = Maps.newHashMap();//ip:port->00001Map<String, String> realNode = Maps.newHashMap();//ip:port->(ipport-000001)// 存在根節點,先檢查是否有屬于自己的根節點List<String> keys = curator.getChildren().forPath(PATH_FOREVER);for (String key : keys) {String[] nodeKey = key.split("-");realNode.put(nodeKey[0], key);nodeMap.put(nodeKey[0], Integer.parseInt(nodeKey[1]));}// 是否存在當前機器的節點, 如果存在說明不是第一次上報Integer workerid = nodeMap.get(listenAddress);if (workerid != null) {// 有自己的節點,zk_AddressNode=ip:portzk_AddressNode = PATH_FOREVER + "/" + realNode.get(listenAddress);workerID = workerid;// 啟動 worder 時使用會使用if (!checkInitTimeStamp(curator, zk_AddressNode)) {// 判斷是否發生了時間回退throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time");}// 準備創建臨時節點doService(curator);// 在本地節點文件系統上緩存一個 workid 值,zk 失效,機器重啟時保證能夠正常啟動updateLocalWorkerID(workerID);LOGGER.info("[Old NODE]find forever node have this endpoint ip-{} port-{} workid-{} childnode and start SUCCESS", ip, port, workerID);} else {// 表示新啟動的節點, 創建持久節點, 不用 check 時間String newNode = createNode(curator);zk_AddressNode = newNode;String[] nodeKey = newNode.split("-");// 節點 IDworkerID = Integer.parseInt(nodeKey[1]);doService(curator);// 在本地節點文件系統上緩存一個 workid 值,zk 失效,機器重啟時保證能夠正常啟動updateLocalWorkerID(workerID);LOGGER.info("[New NODE]can not find node on forever node that endpoint ip-{} port-{} workid-{},create own node on forever node and start SUCCESS ", ip, port, workerID);}}} catch (Exception e) {LOGGER.error("Start node ERROR {}", e);try {// 啟動異常, 從本地文件中加載 workerIdProperties properties = new Properties();properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));workerID = Integer.valueOf(properties.getProperty("workerID"));LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID);} catch (Exception e1) {LOGGER.error("Read file error ", e1);return false;}}return true;
}
可以看到首先如果 PATH_FOREVER 這個路徑在 Zookeeper 中不存在,說明是第一次上報信息,這種情況下創建一個節點下來,創建節點路徑是 /snowflake/${leaf.name}/forever/ip:port-
,由于是順序創建,所以會在路徑后面自動拼接上編號,就是我們當前機器的 WorkerID。
/*** 創建持久順序節點 ,并把節點數據放入 value** @param curator* @return* @throws Exception*/
private String createNode(CuratorFramework curator) throws Exception {try {// 由于是順序節點, 所以創建出來的路徑就是 /snowflake/${leaf.name}/forever/ip:port-00000// /snowflake/${leaf.name}/forever/ip:port-00001// ...return curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(PATH_FOREVER + "/" + listenAddress + "-", buildData().getBytes());} catch (Exception e) {LOGGER.error("create node error msg {} ", e.getMessage());throw e;}
}
當然了這里的 WorkerID 第一次創建就默認是 0 來著,所以不需要再設置。updateLocalWorkerID
就是剛剛說的,Leaf 為了解決 Zookeeper 不可用時獲取不到 workerID 的問題,會在初始化的時候也往本地文件 ${java.io.tmpdir}/${leaf.name}//leafconf/{port}/workerID.properties
中存儲一份 workerID 的信息,最后通過 ScheduledUploadData 定時上報本機的時間給 Zookeeper。
那如果不是第一次創建,就說明可能是 Leaf 重啟什么的,這種情況下先獲取這個 PATH_FOREVER 下面的所有節點,可以看下面圖。
獲取到這些節點之后,設置到 realNode
和 nodeMap
中,然后通過當前機器的 ip+port 來獲取對于的 workerID。
如果能獲取到就通過 checkInitTimeStamp
判斷上一次上報的時間有沒有大于當前時間,如果大于說明有問題,有可能發生了時鐘回撥,這種情況下拋出異常 CheckLastTimeException
。
private boolean checkInitTimeStamp(CuratorFramework curator, String zk_AddressNode) throws Exception {byte[] bytes = curator.getData().forPath(zk_AddressNode);Endpoint endPoint = deBuildData(new String(bytes));// 該節點的時間不能小于最后一次上報的時間return !(endPoint.getTimestamp() > System.currentTimeMillis());
}
如果都沒有問題,就通過 doService 啟動一個定時任務,定時上報當前節點的信息到 Zookeeper 中。然后通過 updateLocalWorkerID 更新本地的文件,在本地節點文件系統上緩存一個 workID 值,zk 失效,機器重啟時保證能夠正常啟動。
那如果是路徑創建了,但是獲取不到這個節點,說明這個 Leaf 節點也上報到了這個 PATH_FOREVER 路徑,但是是新上報的,需要新建一個新的節點,邏輯和上面基本一樣,不過這里就需要設置下 workerID,因為如果路徑不存在,當前節點的 workerID 肯定是 0,不用想,但是如果路徑存在,由于遞增的特性,當前節點不一定是 0,所以創建出節點之后需要設置下 workerID
,最后注意如果是新建節點是不需要 check 時間的。
最后如果說這個過程發生了異常,那么從本地文件中加載出 workerID,避免服務不可用。
5.3 doService 構建定時任務
private void doService(CuratorFramework curator) {ScheduledUploadData(curator, zk_AddressNode);// /snowflake_forever/ip:port-000000001
}private void ScheduledUploadData(final CuratorFramework curator, final String zk_AddressNode) {// 創建一個單線程的線程池Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, "schedule-upload-time");thread.setDaemon(true);return thread;}}).scheduleWithFixedDelay(new Runnable() {// 定時上報節點信息到 zookeeper@Overridepublic void run() {updateNewData(curator, zk_AddressNode);}}, 1L, 3L, TimeUnit.SECONDS);//每3s上報數據}
這里就是創建定時任務,初始化 1s 后執行,之后每隔 3s 上報一次當前機器的信息到 Zookeeper。
private void updateNewData(CuratorFramework curator, String path) {try {if (System.currentTimeMillis() < lastUpdateTime) {return;}curator.setData().forPath(path, buildData().getBytes());lastUpdateTime = System.currentTimeMillis();} catch (Exception e) {LOGGER.info("update init data error path is {} error is {}", path, e);}
}
我們來看下 buildData 的源碼,就是看下需要上報什么信息。
/*** 構建需要上傳的數據** @return*/
private String buildData() throws JsonProcessingException {Endpoint endpoint = new Endpoint(ip, port, System.currentTimeMillis());ObjectMapper mapper = new ObjectMapper();String json = mapper.writeValueAsString(endpoint);return json;
}
可以看到,就是上報的就是 ip 和 port 和當前時間,這個時間就是 5.2 小節看到的,當 Leaf 啟動的時候使用當前時間和 Zookeeper 中最后一次上報的時間做對比,如果發現時鐘回退了,就直接返回,因為這種情況下生成的 ID 有可能是重復的。
5.4 updateLocalWorkerID 更新本地文件
這個方法就是在本地節點文件系統上緩存一個 workerID 值,zk 失效,機器重啟時保證能夠正常啟動。
/*** 在節點文件系統上緩存一個 workid 值,zk 失效,機器重啟時保證能夠正常啟動** @param workerID*/
private void updateLocalWorkerID(int workerID) {File leafConfFile = new File(PROP_PATH.replace("{port}", port));boolean exists = leafConfFile.exists();LOGGER.info("file exists status is {}", exists);if (exists) {try {FileUtils.writeStringToFile(leafConfFile, "workerID=" + workerID, false);LOGGER.info("update file cache workerID is {}", workerID);} catch (IOException e) {LOGGER.error("update file cache error ", e);}} else {// 不存在文件,父目錄頁肯定不存在try {boolean mkdirs = leafConfFile.getParentFile().mkdirs();LOGGER.info("init local file cache create parent dis status is {}, worker id is {}", mkdirs, workerID);if (mkdirs) {if (leafConfFile.createNewFile()) {FileUtils.writeStringToFile(leafConfFile, "workerID=" + workerID, false);LOGGER.info("local file cache workerID is {}", workerID);}} else {LOGGER.warn("create parent dir error===");}} catch (IOException e) {LOGGER.warn("craete workerID conf file error", e);}}
}
本地緩存的地址是:${java.io.tmpdir}/${leaf.name}//leafconf/{port}/workerID.properties
。
5.5 get 獲取 ID
來到最后一個方法,通過 get 獲取 ID。
@Override
public synchronized Result get(String key) {// 獲取當前時間long timestamp = timeGen();// 時間回撥if (timestamp < lastTimestamp) {long offset = lastTimestamp - timestamp;if (offset <= 5) {// 如果回撥小于 5mstry {// 阻塞等待wait(offset << 1);// 再次獲取當前時間timestamp = timeGen();// 如果還是小于, 說明還是不行if (timestamp < lastTimestamp) {return new Result(-1, Status.EXCEPTION);}} catch (InterruptedException e) {LOGGER.error("wait interrupted");return new Result(-2, Status.EXCEPTION);}} else {return new Result(-3, Status.EXCEPTION);}}// 如果當前時間跟上一次時間相等, 說明在 1ms 內獲取了多次 idif (lastTimestamp == timestamp) {// 這里 & sequenceMask 是為了當 sequence 遞增到 4096 的時候重置成 0sequence = (sequence + 1) & sequenceMask;if (sequence == 0) {// seq 為 0 的時候表示是下一毫秒時間開始對 seq 做隨機, 做了隨機之后要想通過 id 去查詢一些信息比如訂單數量什么的就行不通了sequence = RANDOM.nextInt(100);// 一直等到下一毫秒timestamp = tilNextMillis(lastTimestamp);}} else {// 如果是新的 ms 開始sequence = RANDOM.nextInt(100);}// 設置本次訪問的時間lastTimestamp = timestamp;// 構造 idlong id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;// 返回結果return new Result(id, Status.SUCCESS);}
這里就是雪花算法 ID 生成的邏輯,但是要注意,由于時鐘回撥會導致 ID 重復,所以生成之前需要判斷如果當前時間小于上一次生成的時間,說明發生了時鐘回撥,但是如果回撥不超過 5ms,那么就可以阻塞等待,否則直接返回 Status.EXCEPTION
。
同時原來的雪花算法是如果 lastTimestamp != timestamp
,就說明到了下一 ms,繼續從 0 開始繼續生成 ID,但是這里是隨機了一下 100 以內的數字,避免連續生成 ID 導致一些業務上面的信息泄露。
其他的就跟雪花算法一樣的,感興趣可以看我的上一篇文章:【分布式 ID】生成唯一 ID 的幾種方式,里面有雪花算法的介紹。
5.6 差異
這個圖是美團官方的流程圖,當然這里獲取 Leaf_temporary 下面所有節點,然后做平均值這里代碼并沒有看到有哪里有寫,描述是如果是新服務節點,直接創建持久節點 leaf_forever/${self} 并寫入自身系統時間,接下來綜合對比其余 Leaf 節點的系統時間來判斷自身系統時間是否準確,具體做法是取 leaf_temporary 下的所有臨時節點(所有運行中的 Leaf-snowflake 節點)的服務IP:Port,然后通過 RPC 請求得到所有節點的系統時間,計算 sum(time)/nodeSize,可能這個是美團內部正在使用的,又或者是這項目很久沒有維護了,如果有知道的朋友也可以說下。
總之大家在看這篇文章的時候主要就是理解兩種 ID 生成方式的差異以及優化思想。
6. 小結
好了,這篇文章就到這了,差不多也 3w 字,屬實是花了不少篇幅去寫源碼的部分,主要還是里面的思想,包括說雪花算法如果解決時鐘回退的問題,又如何避免生成的 ID 總是 +1 遞增,還有就是數據庫號段模式下如何使用雙 buffer 在消耗完當前 Segment 的號段之后能夠不阻塞直接使用下一個號段的 ID。