流式計算的歷史:
? ? 早在7、8年前諸如UC伯克利、斯坦福等大學就開始了對流式數據處理的研究,但是由于更多的關注于金融行業的業務場景或者互聯網流量監控的業務場景,以及當時互聯網數據場景的限制,造成了研究多是基于對傳統數據庫處理的流式化,對流式框架本身的研究偏少。目前這樣的研究逐漸沒有了聲音,工業界更多的精力轉向了實時數據庫。
? ? 2010年Yahoo!對S4的開源,2011年twitter對Storm的開源,改變了這個情況。以前互聯網的開發人員在做一個實時應用的時候,除了要關注應用邏輯計算處理本身,還要為了數據的實時流轉、交互、分布大傷腦筋。但是現在情況卻大為不同,以Storm為例,開發人員可以快速的搭建一套健壯、易用的實時流處理框架,配合SQL產品或者NoSQL產品或者MapReduce計算平臺,就可以低成本的做出很多以前很難想象的實時產品:比如一淘數據部的量子恒道品牌旗下的多個產品就是構建在實時流處理平臺上的。
——————————————————————————————————————
流式計算的最新進展:
? ?在數據處理時間和方式上,Storm與Hadoop MapReduce基本上是兩個對立面,而這兩個技術具備整合可能性極大程度該歸結于YARN這個集群管理層。Hortonworks當下正在致力于通過新型處理框架Tez 來提高Hive的速度,同時YARN還允許Hadoop用戶運行Spark內存處理框架。同時,微軟也在使用YARN讓Hadoop更加適合機器學習用例。
? ? 此外,通過YARN,同集群上同時運行HBase、Giraph等不同技術也成為可能。此外,集群管理技術Mesos(加州大學伯克利分校出品,現已成為Apache項目) 同樣支持了類似YARN功能,盡管其不是像YARN這樣與HDFS捆綁。
? ? 更多技術的整合預示Hadoop這個大數據處理平臺絕不是曇花一現,同時也會讓Hadoop在大數據應用程序領域獲得更高的統治力。
——————————————————————————————————————
? ? 在大數據處理方面相信大家對hadoop已經耳熟能詳,基于Google Map/Reduce來實現的Hadoop為開發者提供了map、reduce原語,使并行批處理程序變得非常地簡單和優美。同樣,Storm也為大數據的實時計算提供了一些簡單優美的原語,這大大降低了開發并行實時處理的任務的復雜性,幫助你快速、高效的開發應用。
? ? 在Storm集群中真正運行topology的主要有三個實體:工作進程、線程和任務。Storm集群中的每臺機器上都可以運行多個工作進程,每個工作進程又可創建多個線程,每個線程可以執行多個任務,任務是真正進行數據處理的實體,我們開發的spout、bolt就是作為一個或者多個任務的方式執行的。因此,計算任務在多個線程、進程和服務器之間并行進行,支持靈活的水平擴展。
? ? Storm可以保證spout發出的每條消息都能被“完全處理”,這也是直接區別于其他實時系統的地方,如S4。
? ? spout發出的消息后續可能會觸發產生成千上萬條消息,可以形象的理解為一棵消息樹,其中spout發出的消息為樹根,Storm會跟蹤這棵消息樹的處理情況,只有當這棵消息樹中的所有消息都被處理了,Storm才會認為spout發出的這個消息已經被“完全處理”。如果這棵消息樹中的任 何一個消息處理失敗了,或者整棵消息樹在限定的時間內沒有“完全處理”,那么spout發出的消息就會重發。
? ? 如果在消息處理過程中出了一些異常,Storm會重新安排這個出問題的處理單元。Storm保證一個處理單元永遠運行(除非你顯式殺掉這個處理單元)。當然,如果處理單元中存儲了中間狀態,那么當處理單元重新被Storm啟動的時候,需要應用自己處理中間狀態的恢復。
? ? Hadoop擅長于分布式離線批處理,而Storm設計為支持分布式實時計算;
? ? Hadoop新的spark組件提供了在hadoop平臺上運行storm的可能性
——————————————————————————————————————
Storm的基本概念:
Topologies:拓撲,也俗稱一個任務
Spouts:拓撲的消息源
Bolts:拓撲的處理邏輯單元
tuple:消息元組
Streams:流
Stream groupings:流的分組策略
Tasks:任務處理單元
Executor:工作線程
Workers:工作進程
Configuration:topology的配置
——————————————————————————————————————
Storm與Hadoop的對比:
一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會運行(除非你手動kill掉)
Nimbus 與 ResourManager
在Storm的集群里面有兩種節點: 控制節點(master node)和工作節點(worker node)。控制節點上面運行一個叫Nimbus后臺程序,它的作用類似Hadoop里面的JobTracker。Nimbus負責在集群里面分發代碼,分配計算任務給機器,并且監控狀態。
Supervisor (worker進程)與NodeManager(YarnChild)
每一個工作節點上面運行一個叫做Supervisor的節點。Supervisor會監聽分配給它那臺機器的工作,根據需要啟動/關閉工作進程。每一個工作進程執行一個topology的一個子集;一個運行的topology由運行在很多機器上的很多工作進程組成。?
Nimbus進程和Supervisor進程都是快速失敗(fail-fast)和無狀態的。所有的狀態要么在zookeeper里面,要么在本地磁盤上。
這也就意味著你可以用kill -9來殺死Nimbus和Supervisor進程,然后再重啟它們,就好像什么都沒有發生過。這個設計使得Storm異常的穩定。
一個消息流是一個沒有邊界的tuple序列,而這些tuple序列會以一種分布式的方式并行地創建和處理;
通過對stream中tuple序列中每個字段命名來定義stream;
在默認的情況下,tuple的字段類型可以是:integer,long,short, byte,string,double,float,boolean和byte array;
可以自定義類型(只要實現相應的序列化器)。
一般來說消息源會從一個外部源讀取數據并且向topology里面發出消息:tuple;
Spouts可以是可靠的也可以是不可靠的:如果這個tuple沒有被storm成功處理,可靠的消息源spouts可以重新發射一個tuple,但是不可靠的消息源spouts一旦發出一個tuple就不能重發了;
消息源可以發射多條消息流stream:
? ? 使用OutputFieldsDeclarer.declareStream來定義多個stream,
? ? 然后使用SpoutOutputCollector來發射指定的stream。
Bolts可以做很多事情:過濾,聚合,查詢數據庫等等。
Bolts可以簡單的做消息流的傳遞,也可以通過多級Bolts的組合來完成復雜的消息流處理;比如求TopN、聚合操作等(如果要把這個過程做得更具有擴展性那么可能需要更多的步驟)。
? ? 使用OutputCollector.emit來選擇要發射的stream;
Bolts的主要方法是execute:
? ? 它以一個tuple作為輸入,使用OutputCollector來發射tuple;
? ? 通過調用OutputCollector的ack方法,以通知這個tuple的發射者spout;
Bolts一般的流程:?
? ? 處理一個輸入tuple, ?發射0個或者多個tuple, 然后調用ack通知storm自己已經處理過這個tuple了;
? ? storm提供了一個IBasicBolt會自動調用ack。
stream grouping就是用來定義一個stream應該如何分配數據給bolts;
Storm里面有7種類型的stream grouping:
Shuffle Grouping——隨機分組,隨機派發stream里面的tuple,保證每個bolt接收到的tuple數目大致相同;
Fields Grouping——按字段分組,比如按userid來分組,具有同樣userid的tuple會被分到相同的Bolts里的一個task,而不同的userid則會被分配到不同的bolts里的task;
Global Grouping——全局分組,這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task;
Non Grouping——不分組,這個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果, 有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程里面去執行;
消息處理者可以通過TopologyContext來獲取處理它的消息的task的id (OutputCollector.emit方法也會返回task的id);
Local or shuffle grouping——如果目標bolt有一個或者多個task在同一個工作進程中,tuple將會被隨機發生給這些tasks。否則,和普通的Shuffle Grouping行為一致。
每個worker是一個物理JVM并且執行整個topology的一部分;
比如,對于并行度是300的topology來說,如果我們使用50個工作進程來執行,那么每個工作進程會處理其中的6個tasks;
Storm會盡量均勻的工作分配給所有的worker;
每一個executor對應到一個線程,在這個線程上運行多個task
stream grouping則是定義怎么從一堆task發射tuple到另外一堆task
可以調用TopologyBuilder類的setSpout和setBolt來設置并行度(也就是有多少個task)