MongoDB是一個非常出色的“ NoSQL”數據庫,具有廣泛的應用程序。 在SoftwareMill開發的一個項目中,我們將其用作復制的事件存儲,然后將事件從事件流傳輸到其他組件。
介紹
基本思想非常簡單(另請參閱Martin Fowler關于Event Sourcing的文章)。 我們的系統生成一系列事件。 這些事件將保留在事件存儲中。 系統中的其他組件遵循事件流并對其進行“處理”。 例如,可以將它們匯總并寫入報告數據庫(另一方面,它類似于CQRS )。 這種方法有很多優點:
- 事件的讀取和寫入是解耦的(異步的)
- 鑒于它沒有死得太久,任何后續組件都可能死亡,然后“追趕”
- 可能有多個關注者。 跟隨者可以從從屬副本讀取數據,以獲得更好的可伸縮性
- 事件活動的爆發對事件接收器的影響減少; 最壞的情況下,報告生成速度會變慢
這里的關鍵組件當然是快速可靠的事件存儲。 我們用來實現一個的MongoDB的三個關鍵功能是:
- 上限集合和尾部游標
- 快速收集附件
- 復制集
采集
作為基礎,我們使用有上限的集合 ,根據定義,該集合受大小限制。 如果編寫新事件將導致集合超出大小限制,則最早的事件將被覆蓋。 這給了我們類似于事件的循環緩沖區的功能。 (此外,我們也很安全地避免了磁盤空間不足錯誤。)
在2.2版之前,默認情況下,上限集合沒有_id字段(因此沒有索引)。 但是,由于我們希望事件能夠在整個副本集上可靠地寫入,因此_id字段及其上的索引都是必需的。
寫作活動
編寫事件是一個簡單的Mongo插入操作; 插入也可以分批完成。 根據我們對事件丟失的容忍度,我們可能會使用各種Mongo 寫入問題 (例如,等待來自單節點或多個節點的寫入確認)。
所有事件都是不可變的。 除了更好的,線程安全的Java代碼外,這是事件流的必要條件。 如果事件是可變的,事件接收器將如何知道更新的內容? 而且,這對Mongo的性能有很好的影響。 由于永遠不會更改數據,因此寫入磁盤的文檔永遠不會縮小或擴展,因此無需在磁盤上移動塊。 實際上,在具有上限的集合中,Mongo不允許增長曾經編寫的文檔。
閱讀活動
讀取事件流要復雜一些。 首先,可能有多個閱讀器,每個閱讀器在流中具有不同的進度。 其次,如果流中沒有事件,我們希望讀者等待一些事件可用,并避免主動輪詢。 最后,我們想分批處理事件,以提高性能。
有尾游標可以解決這些問題。 要創建這樣的游標,我們必須提供一個起點–事件的ID,我們將從該事件開始讀取; 如果未提供ID,則光標將返回最早的可用事件。 因此,每個讀取器必須存儲它已讀取和處理的最后一個事件。
更重要的是,如果沒有新數據可用,可尾光標可以有選擇地阻塞一段時間,從而解決了主動輪詢問題。
(順便說一下,mongo用于在副本集之間復制數據的oplog集合也是一個有上限的集合。從屬Mongo實例在該集合后面尾隨,流式傳輸“事件”(即數據庫操作),并按順序在本地應用它們。 )
讀取Java中的事件
使用Mongo Java驅動程序時 ,有一些“問題”。 首先,您需要初始化游標。 為此,我們需要提供(1)最后一個事件ID(如果存在); (2)我們要讀取事件的順序(此處為自然順序,即插入順序); (3)兩個關鍵的游標選項,我們希望游標是可拖尾的,并且如果沒有新數據,我們希望將其阻止:
DBObject query = lastReceivedEventId.isPresent()? BasicDBObjectBuilder.start('_id', BasicDBObjectBuilder.start('$gte', lastReceivedEventId.get()).get()).get(): null;DBObject sortBy = BasicDBObjectBuilder.start('$natural', 1).get();DBCollection collection = ... // must be a capped collection
DBCursor cursor = collection.find(query).sort(sortBy).addOption(Bytes.QUERYOPTION_TAILABLE).addOption(Bytes.QUERYOPTION_AWAITDATA);
您可能想知道為什么我們使用>= last_id
而不是>
。 由于生成Mongo ObjectId的方式在這里需要。 如果使用一個簡單的> last_id
我們可能會錯過一些與last_id
事件在同一秒之后但之后發生的事件。 這也意味著我們的Java代碼必須處理這一事實,并丟棄收到的第一個事件。
游標的類擴展了基本的Java Iterator
接口,因此非常易于使用。 因此,現在我們可以進行批處理了。 在游標上進行迭代時,驅動程序將批量從Mongo服務器接收數據; 因此我們可以像調用其他迭代器一樣簡單地調用hasNext()
和next()
來接收后續元素,并且只有某些調用會真正導致與服務器的網絡通信。
在Mongo Java驅動程序中,實際上可能阻塞的hasNext()
是hasNext()
。 如果我們要分批處理事件,我們需要(a)只要有可用的元素就讀取它們,并且(b)在被阻止沒有更多事件之前有某種了解的方式,并且我們可以處理事件已經批處理。 由于hasNext()
可以阻止,因此我們無法直接執行此操作。
這就是為什么我們引入了中間隊列( LinkedBlockingQueue
)的原因。 在單獨的線程中,從游標讀取的事件在到達時即被放入隊列中。 如果沒有事件,則線程將在cursor.hasNext()
上cursor.hasNext()
。 阻塞隊列有一個可選的大小限制,因此,如果隊列已滿,則放置一個元素也將阻塞,直到有可用空間為止。 在事件消費者線程中,我們首先嘗試以阻塞方式(使用.poll
從隊列中讀取單個元素,因此我們在這里等待所有事件可用。 然后,我們嘗試將隊列的全部內容消耗到一個臨時集合中(使用.drainTo
,構建批處理,并可能獲取0個元素,但我們始終擁有第一個)。
值得一提的是,如果集合為空,則Mongo不會阻止,因此我們必須回到主動輪詢。 我們還必須考慮到游標可能會在等待期間死亡的事實。 要對此進行檢查,我們應該驗證cursor.getCursorId() != 0
,其中0是“死光標”的ID。 在這種情況下,我們只需要重新實例化游標即可。
加起來
綜上所述,我們得到了一個非常快速的事件源/流解決方案。 從某種意義上說,這是“自我調節”,即如果事件活動達到高峰,則事件接收器將大批量讀取這些事件。 如果事件活動少,則將分批快速處理它們。
我們還將同一個Mongo實例用于其他目的。 從操作角度來看,擁有一個數據庫系統來聚簇和維護常規數據和事件肯定是一件好事。
參考: Adam Warski博客的Blog中來自我們的JCG合作伙伴 Adam Warski的MongoDB事件流 。
翻譯自: https://www.javacodegeeks.com/2012/11/event-streaming-with-mongodb.html