使用MongoDB進行事件流

MongoDB是一個非常出色的“ NoSQL”數據庫,具有廣泛的應用程序。 在SoftwareMill開發的一個項目中,我們將其用作復制的事件存儲,然后將事件從事件流傳輸到其他組件。

介紹

基本思想非常簡單(另請參閱Martin Fowler關于Event Sourcing的文章)。 我們的系統生成一系列事件。 這些事件將保留在事件存儲中。 系統中的其他組件遵循事件流并對其進行“處理”。 例如,可以將它們匯總并寫入報告數據庫(另一方面,它類似于CQRS )。 這種方法有很多優點:

  • 事件的讀取和寫入是解耦的(異步的)
  • 鑒于它沒有死得太久,任何后續組件都可能死亡,然后“追趕”
  • 可能有多個關注者。 跟隨者可以從從屬副本讀取數據,以獲得更好的可伸縮性
  • 事件活動的爆發對事件接收器的影響減少; 最壞的情況下,報告生成速度會變慢

這里的關鍵組件當然是快速可靠的事件存儲。 我們用來實現一個的MongoDB的三個關鍵功能是:

  • 上限集合和尾部游標
  • 快速收集附件
  • 復制集


采集

作為基礎,我們使用有上限的集合 ,根據定義,該集合受大小限制。 如果編寫新事件將導致集合超出大小限制,則最早的事件將被覆蓋。 這給了我們類似于事件的循環緩沖區的功能。 (此外,我們也很安全地避免了磁盤空間不足錯誤。)

在2.2版之前,默認情況下,上限集合沒有_id字段(因此沒有索引)。 但是,由于我們希望事件能夠在整個副本集上可靠地寫入,因此_id字段及其上的索引都是必需的。

寫作活動

編寫事件是一個簡單的Mongo插入操作; 插入也可以分批完成。 根據我們對事件丟失的容忍度,我們可能會使用各種Mongo 寫入問題 (例如,等待來自單節點或多個節點的寫入確認)。

所有事件都是不可變的。 除了更好的,線程安全的Java代碼外,這是事件流的必要條件。 如果事件是可變的,事件接收器將如何知道更新的內容? 而且,這對Mongo的性能有很好的影響。 由于永遠不會更改數據,因此寫入磁盤的文檔永遠不會縮小或擴展,因此無需在磁盤上移動塊。 實際上,在具有上限的集合中,Mongo不允許增長曾經編寫的文檔。

閱讀活動

讀取事件流要復雜一些。 首先,可能有多個閱讀器,每個閱讀器在流中具有不同的進度。 其次,如果流中沒有事件,我們希望讀者等待一些事件可用,并避免主動輪詢。 最后,我們想分批處理事件,以提高性能。

有尾游標可以解決這些問題。 要創建這樣的游標,我們必須提供一個起點–事件的ID,我們將從該事件開始讀取; 如果未提供ID,則光標將返回最早的可用事件。 因此,每個讀取器必須存儲它已讀取和處理的最后一個事件。

更重要的是,如果沒有新數據可用,可尾光標可以有選擇地阻塞一段時間,從而解決了主動輪詢問題。

(順便說一下,mongo用于在副本集之間復制數據的oplog集合也是一個有上限的集合。從屬Mongo實例在該集合后面尾隨,流式傳輸“事件”(即數據庫操作),并按順序在本地應用它們。 )

讀取Java中的事件

使用Mongo Java驅動程序時 ,有一些“問題”。 首先,您需要初始化游標。 為此,我們需要提供(1)最后一個事件ID(如果存在); (2)我們要讀取事件的順序(此處為自然順序,即插入順序); (3)兩個關鍵的游標選項,我們希望游標是可拖尾的,并且如果沒有新數據,我們希望將其阻止:

DBObject query = lastReceivedEventId.isPresent()? BasicDBObjectBuilder.start('_id', BasicDBObjectBuilder.start('$gte', lastReceivedEventId.get()).get()).get(): null;DBObject sortBy = BasicDBObjectBuilder.start('$natural', 1).get();DBCollection collection = ... // must be a capped collection
DBCursor cursor = collection.find(query).sort(sortBy).addOption(Bytes.QUERYOPTION_TAILABLE).addOption(Bytes.QUERYOPTION_AWAITDATA);

您可能想知道為什么我們使用>= last_id而不是> 。 由于生成Mongo ObjectId的方式在這里需要。 如果使用一個簡單的> last_id我們可能會錯過一些與last_id事件在同一秒之后但之后發生的事件。 這也意味著我們的Java代碼必須處理這一事實,并丟棄收到的第一個事件。

游標的類擴展了基本的Java Iterator接口,因此非常易于使用。 因此,現在我們可以進行批處理了。 在游標上進行迭代時,驅動程序將批量從Mongo服務器接收數據; 因此我們可以像調用其他迭代器一樣簡單地調用hasNext()next()來接收后續元素,并且只有某些調用會真正導致與服務器的網絡通信。

在Mongo Java驅動程序中,實際上可能阻塞的hasNext()hasNext() 。 如果我們要分批處理事件,我們需要(a)只要有可用的元素就讀取它們,并且(b)在被阻止沒有更多事件之前有某種了解的方式,并且我們可以處理事件已經批處理。 由于hasNext()可以阻止,因此我們無法直接執行此操作。

這就是為什么我們引入了中間隊列( LinkedBlockingQueue )的原因。 在單獨的線程中,從游標讀取的事件在到達時即被放入隊列中。 如果沒有事件,則線程將在cursor.hasNext()cursor.hasNext() 。 阻塞隊列有一個可選的大小限制,因此,如果隊列已滿,則放置一個元素也將阻塞,直到有可用空間為止。 在事件消費者線程中,我們首先嘗試以阻塞方式(使用.poll從隊列中讀取單個元素,因此我們在這里等待所有事件可用。 然后,我們嘗試將隊列的全部內容消耗到一個臨時集合中(使用.drainTo ,構建批處理,并可能獲取0個元素,但我們始終擁有第一個)。

值得一提的是,如果集合為空,則Mongo不會阻止,因此我們必須回到主動輪詢。 我們還必須考慮到游標可能會在等待期間死亡的事實。 要對此進行檢查,我們應該驗證cursor.getCursorId() != 0 ,其中0是“死光標”的ID。 在這種情況下,我們只需要重新實例化游標即可。

加起來

綜上所述,我們得到了一個非常快速的事件源/流解決方案。 從某種意義上說,這是“自我調節”,即如果事件活動達到高峰,則事件接收器將大批量讀取這些事件。 如果事件活動少,則將分批快速處理它們。

我們還將同一個Mongo實例用于其他目的。 從操作角度來看,擁有一個數據庫系統來聚簇和維護常規數據和事件肯定是一件好事。

參考: Adam Warski博客的Blog中來自我們的JCG合作伙伴 Adam Warski的MongoDB事件流 。

翻譯自: https://www.javacodegeeks.com/2012/11/event-streaming-with-mongodb.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/370872.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/370872.shtml
英文地址,請注明出處:http://en.pswp.cn/news/370872.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

hihocoder-Week173--A Game

hihocoder-Week173--A Game A Game 時間限制:10000ms單點時限:1000ms內存限制:256MB描述 Little Hi and Little Ho are playing a game. There is an integer array in front of them. They take turns (Little Ho goes first) to select a number from either the beginning …

php打亂數組二維數組、多維數組

//這個是針對二維數組的!下面針對多維數組的亂序方法<?php function shuffle_assoc($list) { if (!is_array($list)) return $list; $keys array_keys($list); shuffle($keys); $random array(); foreach ($keys as $key) $random[$key] $list[$key]; ret…

明明一樣的程序為啥有的系統就報錯有的就正常運行呢_SurfaceGo Android系統折騰筆記...

Surface Go平板在Win10系統下的表現我認為還是比較出色的&#xff0c;x86架構CPU意味著不考慮性能的情況下&#xff0c;臺式機上能跑的程序&#xff0c;這臺平板也能跑&#xff0c;新Galgame一出就能直接安裝上躺床上玩&#xff0c;妙哉。但遺憾的是現實世界還是要考慮性能問題…

c語言實訓作業總結,c語言程序設計上機實踐心得報告

c語言程序設計上機實踐心得報告 班級:11 電信 2 姓名:莫金波 學號:1107032242012.12.28 惠州學院 HUIZHOU UNIVERSITY 我們專業的學生在專業老師的帶領下進行了 c 語言設計基礎教程的 實踐學習。在這之前&#xff0c;我們已經對 c 語言這門課程學習了差不多一 個學期&#xff0…

JavaOne 2012:在JVM上診斷應用程序

值得參加Staffan Larsen &#xff08;Oracle Java Serviceability Architect&#xff09;的演講“ 在JVM上診斷應用程序 ”&#xff08;Hilton Plaza A / B&#xff09;&#xff0c;只是為了學習Oracle JVM 7隨附的新jcmd命令行工具。該演示對我來說是“獎金”&#xff0c;這對…

mysql慢查詢工具

GeorgeHao安裝過程&#xff1a; [rootlocalhost-centos6 ~]# wget percona.com/get/pt-query-digest [rootlocalhost-centos6 ~]# chmod ux pt-query-digest [rootlocalhost-centos6 ~]# mv /root/pt-query-digest /usr/bin/ 今天有在阿里云服務器跑分的時候出現"Cant loc…

python字符串轉date,在Python上將字符串轉換為Date類型

I have this string:2012-02-10 # (year-month-day)and I need it to be as date type for me to use the date function isoweekday().Does anyone know how I can convert this string into a date?解決方案You can do that with datetime.strptime()Example:>>> f…

文檔詞頻矩陣_論文理解:從詞嵌入到文檔距離

論文作者簡介本論文第一作者Matt J. Kusner是牛津大學的副教授&#xff0c;致力于設計適應現實世界問題需求的新機器學習模型&#xff08;例如&#xff0c;fair algorithms, discrete generative models, document distances, privacy, dataset compression, budgeted learning…

C# 線程理解

概念引用&#xff1a;http://blog.csdn.net/yujie_yang/article/details/53173752 多線程和多進程的區別&#xff1a;任務管理器里各種不同的進程就是多進程&#xff0c;或者是你同時運行多個”.exe’程序就可以理解為多進程&#xff0c;多進程是要更多消耗CPU資源的。 多線程是…

c語言主調函數和被調函數,在C語言中,何為主調函數和被調函數,他們之 – 手機愛問...

2007-08-30請詳細一些~最好舉出例子你好。評價寶寶的標準基本上是&#xff1a;技能>資質>成長因為寶寶的評價是一項 仁者見仁的活兒&#xff0c;但其中有些規律我想是可以具體話的&#xff0c;希望能對你有幫助&#xff1a;1&#xff1a;技能&#xff1a;技能的意義有多大…

學習關于display :flex 布局問題!

很多人不明白這個display:flex是到底是什么東西&#xff0c;如何使用的 。 1.什么是display&#xff1a;flex呢&#xff1f; 答&#xff1a;flex是 flexible box的縮寫&#xff0c;意為彈性布局 &#xff1b;這個東西的引入&#xff0c;為盒模型提供了最大的靈活性&#xf…

QT信號和槽函數學習筆記

//connect 函數有4個參數 分別是 發送者 信號。接受者 &#xff0c;槽 //connect(sender,signal,receiver,slot) /* * 信號和槽 * 信號 就是一個普通的函數 定義信號的時候需要在函數前面加上signals: &#xff0c;不需要實現 * 槽 函數 在QT5中科院是類的任意成員函數&#xf…

數據庫和Webapp安全

威脅模型 這是根據我網站上的快速參考頁松散地討論數據庫和Webapp安全的問題。 該頁面變得笨拙&#xff0c;并且使讀者無法輕松地與我或其他人進行交互。 威脅模型 所有安全分析都必須從檢查威脅模型開始。 威脅模型要求您回答四個問題&#xff1a; 我要保護的是什么&#…

note同步不及時 one_一輛理想ONE又“跪了”?理想官方緊急發文回應

汽車行業關注(autochat.com.cn)10月16日報道——10月15日&#xff0c;有網友在社交媒體上發布視頻&#xff0c;從視頻可以看到&#xff0c;一輛理想ONE在遭遇事故后&#xff0c;左前輪脫落在車外疑似斷軸,從視頻未能判定是斷軸引起的事故&#xff0c;還是事故引起的斷軸。針對該…

C語言連續多個空格合并一個,C語言合并連續空格

一開始自己寫的&#xff1a;a&#xff1a;#includemain(){int c;int state0;while (( cgetchar()) ! EOF) {if (c ){state1;continue;}if (state){state0;putchar( );putchar(c);}elseputchar(c);}}網上搜的&#xff1a;b:#include #define NONBLANK avoid main(){int c , last…

Skywalking 中 Agent 自動同步配置源碼解析

文章目錄 前言正文實現架構實現模型OAP 同步 ApolloConfigWatcherRegisterConfigChangeWatcher Agent 側 前言 本文代碼 OAP 基于 v9.7&#xff0c;Java Agent 基于 v9.1&#xff0c;配置中心使用 apollo。 看本文需要配合代碼“食用”。 正文 Skywalking 中就使用這種模型…

華為5720設置靜態路由不通_【干貨分享】交換機與路由器在環路中的處理機制了解一下!...

點擊藍字關注我們-今天小盟帶大家來討論一下交換機與路由器在環路中的處理機制-01基礎配置1---如圖配置路由器各接口地址&#xff0c;AR-2為PC-1的網關路由器2---AR-1配置靜態默認路由&#xff0c;下一跳地址指向AR-2&#xff1b;[AR-1]ip route-static 0.0.0.0 0 12.1.1.2AR-2…

IPC 進程間通信方式——信號量

信號量 本質上是共享資源的數目&#xff0c;用來控制對共享資源的訪問。用于進程間的互斥和同步每種共享資源對應一個信號量&#xff0c;為了便于大量共享資源的操作引入了信號量集&#xff0c;可對多對信號量一次性操作。對信號量集中所有的操作可以要求全部成功&#xff0c;也…

css選擇器的優先級

選擇器的優先級表述為4個部分&#xff0c;用0,0,0,0表示。 !important--1,0,0,0行內樣式ID選擇器--0,1,0,0類選擇器(例如,.example)、屬性選擇器&#xff08;例如, [type"radio"]&#xff09;或偽類&#xff08;例如, :hover&#xff09;--0,0,1,0元素&#xff08;例…

VisualVM介紹使用

1 打開VisualVM&#xff08;這個工具放在JDK安裝目錄的bin目錄下&#xff0c;雙擊jvisualvm.exe即可打開&#xff09;&#xff0c;如下圖所示 以VisualVM自身為例&#xff0c;VisualVM本身也是一個java程序&#xff0c;當然也而已用VisualVM來分析 2 概述頁面主要顯示程序…