Flume數據傳輸事務分析[轉]

本文基于ThriftSource,MemoryChannel,HdfsSink三個組件,對Flume數據傳輸的事務進行分析,如果使用的是其他組件,Flume事務具體的處理方式將會不同。一般情況下,用MemoryChannel就好了,我們公司用的就是這個,FileChannel速度慢,雖然提供日志級別的數據恢復,但是一般情況下,不斷電MemoryChannel是不會丟數據的。

Flume提供事物操作,保證用戶的數據的可靠性,主要體現在:

  • 數據在傳輸到下個節點時(通常是批量數據),如果接收節點出現異常,比如網絡異常,則回滾這一批數據。因此有可能導致數據重發
  • 同個節點內,Source寫入數據到Channel,數據在一個批次內的數據出現異常,則不寫入到Channel。已接收到的部分數據直接拋棄,靠上一個節點重發數據。

編程模型

Flume在對Channel進行Put和Take操作的時候,必須要用事物包住,比如:

Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
//事物開始
txn.begin();
try {Event eventToStage = EventBuilder.withBody("Hello Flume!",Charset.forName("UTF-8")); //往臨時緩沖區Put數據 ch.put(eventToStage); //或者ch.take() //將這些數據提交到channel中 txn.commit(); } catch (Throwable t) { txn.rollback(); if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); } 

Put事務流程

Put事務可以分為以下階段:

  • doPut:將批數據先寫入臨時緩沖區putList
  • doCommit:檢查channel內存隊列是否足夠合并。
  • doRollback:channel內存隊列空間不足,拋棄數據

我們從Source數據接收到寫入Channel這個過程對Put事物進行分析。


ThriftSource會spawn多個Worker線程(ThriftSourceHandler)去處理數據,Worker處理數據的接口,我們只看batch批量處理這個接口:

    @Overridepublic Status appendBatch(List<ThriftFlumeEvent> events) throws TException {List<Event> flumeEvents = Lists.newArrayList();for(ThriftFlumeEvent event : events) {flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders())); } //ChannelProcessor,在Source初始化的時候傳進來.將數據寫入對應的Channel getChannelProcessor().processEventBatch(flumeEvents); ... return Status.OK; } 

事務邏輯都在processEventBatch這個方法里:

public void processEventBatch(List<Event> events) {...//預處理每行數據,有人用來做ETL嘛events = interceptorChain.intercept(events);...//分類數據,劃分不同的channel集合對應的數據 // Process required channels Transaction tx = reqChannel.getTransaction(); ... //事務開始,tx即MemoryTransaction類實例 tx.begin(); List<Event> batch = reqChannelQueue.get(reqChannel); for (Event event : batch) { // 這個put操作實際調用的是transaction.doPut reqChannel.put(event); } //提交,將數據寫入Channel的隊列中 tx.commit(); } catch (Throwable t) { //回滾 tx.rollback(); ... } } ... } 

每個Worker線程都擁有一個Transaction實例,保存在Channel(BasicChannelSemantics)里的ThreadLocal變量currentTransaction.

那么,事務到底做了什么?

實際上,Transaction實例包含兩個雙向阻塞隊列LinkedBlockingDeque(感覺沒必要用雙向隊列,每個線程寫自己的putList,又不是多個線程?),分別為:

  • putList
  • takeList

對于Put事物操作,當然是只用到putList了。putList就是一個臨時的緩沖區,數據會先put到putList,最后由commit方法會檢查channel是否有足夠的緩沖區,有則合并到channel的隊列。

channel.put -> transaction.doPut:

    protected void doPut(Event event) throws InterruptedException {//計算數據字節大小 int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize); //寫入臨時緩沖區putList if (!putList.offer(event)) { throw new ChannelException( "Put queue for MemoryTransaction of capacity " + putList.size() + " full, consider committing more frequently, " + "increasing capacity or increasing thread count"); } putByteCounter += eventByteSize; } 

transaction.commit:

@Overrideprotected void doCommit() throws InterruptedException { //檢查channel的隊列剩余大小是否足夠 ... int puts = putList.size(); ... synchronized(queueLock) { if(puts > 0 ) { while(!putList.isEmpty()) { //寫入到channel的隊列 if(!queue.offer(putList.removeFirst())) { throw new RuntimeException("Queue add failed, this shouldn't be able to happen"); } } } //清除臨時隊列 putList.clear(); ... } ... } 

如果在事務期間出現異常,比如channel剩余空間不足,則rollback:

@Overrideprotected void doRollback() {...//拋棄數據,沒合并到channel的內存隊列 putList.clear(); ... } 

Take事務

Take事務分為以下階段:

  • doTake:先將數據取到臨時緩沖區takeList
  • 將數據發送到下一個節點
  • doCommit:如果數據全部發送成功,則清除臨時緩沖區takeList
  • doRollback:數據發送過程中如果出現異常,rollback將臨時緩沖區takeList中的數據歸還給channel內存隊列。


Sink其實是由SinkRunner線程調用Sink.process方法來了處理數據的。我們從HdfsEventSink的process方法說起,Sink類都有個process方法,用來處理傳輸數據的邏輯。:

public Status process() throws EventDeliveryException {...Transaction transaction = channel.getTransaction();...//事務開始transaction.begin();...for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) { //take數據到臨時緩沖區,實際調用的是transaction.doTake Event event = channel.take(); if (event == null) { break; } ... //寫數據到HDFS bucketWriter.append(event); ... // flush all pending buckets before committing the transaction for (BucketWriter bucketWriter : writers) { bucketWriter.flush(); } //commit transaction.commit(); ... } catch (IOException eIO) { transaction.rollback(); ... } finally { transaction.close(); } } 

大致流程圖:

接著看看channel.take,作用是將數據放到臨時緩沖區,實際調用的是transaction.doTake:

protected Event doTake() throws InterruptedException {...//從channel內存隊列取數據synchronized(queueLock) {event = queue.poll();}...//將數據放到臨時緩沖區 takeList.put(event); ... return event; } 

接著,HDFS寫線程bucketWriter將take到的數據寫到HDFS,如果批數據都寫完了,則要commit了:

protected void doCommit() throws InterruptedException {...takeList.clear();...
}

很簡單,其實就是清空takeList而已。如果bucketWriter在寫數據到HDFS的時候出現異常,則要rollback:

protected void doRollback() {int takes = takeList.size();//檢查內存隊列空間大小,是否足夠takeList寫回去 synchronized(queueLock) { Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " + "queue to rollback takes. This should never happen, please report"); while(!takeList.isEmpty()) { queue.addFirst(takeList.removeLast()); } ... } ... }

轉載于:https://www.cnblogs.com/whtydn/p/4384199.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/259134.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/259134.shtml
英文地址,請注明出處:http://en.pswp.cn/news/259134.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

最近的一些校招試題摘錄

最近又參加了一些校招&#xff0c;真是馬不停蹄啊。多參加考試是好的&#xff0c;可以不斷發現一些新的問題。下面摘錄一些我不太會的題。 1.volatile的作用是什么&#xff1f; 答案&#xff1a;volatile是類型修飾符&#xff0c;用它修飾的類型變量可能會被編譯器未知的因素…

yii2中的rules驗證規則

2019獨角獸企業重金招聘Python工程師標準>>> Rules驗證規則&#xff1a;required : 必須值驗證屬性||CRequiredValidator 的別名, 確保了特性不為空.[[字段名],required,requiredValue>必填值,message>提示信息];email : 郵箱驗證||CEmailValidator 的別名,確…

weblogic數據源配置的問題,weblogic密碼破解

weblogic 報錯 please increase XXX,得知是連接池出了問題&#xff0c;查看weblogic配置&#xff0c;發現沒有設置超時 查看oracle 當前session&#xff0c;可以看到連接的機器&#xff0c;用戶&#xff0c;當前執行的sqlid select * from v$session; select v$sql where sql_i…

自己寫的簡易多任務系統---基于pic18fxxx

這個工程只是實現了最簡單的OS任務調度&#xff0c;對于理解任務調度有點幫助。其實就是從UC/OS-II里面摘出來的&#xff0c;沒有原來的那么復雜&#xff0c;很精簡&#xff0c;但道理上是一樣的。工程中的CPU.C文件時直接拿Nathan Brown寫好的&#xff0c;因為關于PIC任務切換…

python語言整數類型-Python 的內置數值類型

Python 是一種敏捷的、動態類型化的、極富表現力的開源編程語言&#xff0c;可以被自由地安裝到多種平臺上。Python 代碼是被解釋的。如果您對編輯、構建和執行循環較為熟悉&#xff0c;則 Python 代碼對您來說更簡單。但是&#xff0c;請不要搞錯&#xff1a;Python 器可以是簡…

滴滴出行2016校招編程題

1. 給定一個m*n的數組&#xff08;m,n>2,數組值>0&#xff09;&#xff0c;要求選出和最大的子2*2數組。例如&#xff1a; 1 2 3 4 5 6 7 8 9 顯然和最大的2*2子數組是5 6;8 9.下面完成這個功能。 Input: &#xff08;m*n的數組&#xff09; 1 2 3 ; 4 5 6 ; 7 8 9 …

每天一個linux命令(22):find 命令的參數詳解

find一些常用參數的一些常用實例和一些具體用法和注意事項。 1&#xff0e;使用name選項&#xff1a; 文件名選項是find命令最常用的選項&#xff0c;要么單獨使用該選項&#xff0c;要么和其他選項一起使用。 可以使用某種文件名模式來匹配文件&#xff0c;記住要用引號將文件…

(WPF) DataGrid之綁定

通過ObservableCollection 綁定到 DataGrid. 1. 前臺Xaml. <DataGrid x:Name"dgMeasurements"HorizontalAlignment"Left"Margin"10,69,0,10"ItemsSource"{Binding}"AutoGenerateColumns"False"Width"370">…

程序=數據結構+算法

這句名言&#xff0c;我現在品來很有感覺&#xff0c;看看uc/os-II里面那些就緒表、查找最高優先級任務等等&#xff0c;算法設計的非常巧妙&#xff0c;整個OS都是圍繞著OS_TCB來運轉的&#xff0c;任務需要通信&#xff0c;那就在建立個OS_EVENT&#xff0c;通過.*OSTCBEvent…

去哪筆試兩題

1&#xff0c;a是一個有序數組&#xff0c;但經過向右移動數位&#xff0c;現在預在a中查找元素key的位置&#xff0c;如不存在&#xff0c;返回0。例如a[5,6.7.8,1,2,3,4]. 實現&#xff1a; 1 #quna12 def findPos(a,key):3 mina[0];4 for i in range(len(a)):5 …

MySQL5.6主從復制搭建基于日志(binlog)

什么是MySQL主從復制 簡單來說&#xff0c;就是保證主SQL&#xff08;Master&#xff09;和從SQL&#xff08;Slave&#xff09;的數據是一致性的&#xff0c;向Master插入數據后&#xff0c;Slave會自動從Master把修改的數據同步過來&#xff08;有一定的延遲&#xff09;&…

opengl 如何加陰影_動漫嘴唇厚涂如何繪制?厚涂嘴唇正確畫法

動漫嘴唇厚涂如何繪制&#xff1f;厚涂嘴唇正確畫法&#xff01;嘴巴怎么畫&#xff1f;畫嘴巴真的很考驗一個畫師功力&#xff0c;好看的嘴巴生動而豐滿&#xff0c;可以給整幅畫作添上亮點&#xff0c;而畫的不好的嘴巴呢&#xff0c;就容易把畫面整體的風格打破。那么零基礎…

位運算

我們復習一下位運算&#xff0c;這里介紹一下(& ,|, ^)的用途。 按位與 ------------& 規則&#xff1a; 0&00 0&10 1&0 0 1&11 &#xff08; 兩位為1&#xff0c;才是1&#xff09;作用&#xff1a; 清零與保位。通常用來將特定的位清零&…

詳解JMeter函數和變量

詳解JMeter函數和變量&#xff08;1&#xff09; JMeter函數可以被認為是某種特殊的變量&#xff0c;它們可以被采樣器或者其他測試元件所引用。函數調用的語法如下&#xff1a; ${__functionName(var1,var2,var3)} 其中&#xff0c;__functionName匹配被調用的函數名稱。用圓括…

信號反射

突然想起來前幾天調試CAN通訊的時候出現的BUG&#xff0c;那就是傳說中的“信號反射”&#xff0c;也有稱“振鈴”的。錯誤剛出現的時候沒有意識過來&#xff0c;還說怎么出現重復出現這么多條消息呢&#xff1f;光在書本上看到過這個概念&#xff0c;沒有“實物”與之對應起來…

hdu 5199 map或二分或哈希

題目描述&#xff1a;給出n棵樹的高度&#xff0c;每棵樹上都站著一只鳥&#xff0c;槍手Jack站在最左邊那棵樹的左邊對鳥進行射擊&#xff0c;當Jack在高度為H的地方向右發射一顆子彈的時候&#xff0c;高度為H的樹上的鳥兒就會掉落&#xff08;注&#xff1a;其他樹上的鳥兒不…

數字電路實驗怎么接線視頻講解_家庭影院中音箱、功放、投影機、4K播放機不知道怎么連接?手把手教你...

家庭影院中音箱、功放、投影機、4K播放機不知道怎么連接&#xff1f;手把手教你有不少用戶收到從家庭影院器材之后&#xff0c;表示完全不會連接。翻看說明書也覺得頭大&#xff0c;知識太多&#xff0c;然而卻很難找到要點。今天主要跟大家講講如何連接音箱、功放、投影機和影…

.NET開發過程中的全文索引使用技巧之Solr

前言&#xff1a;相信許多人都聽說過.net開發過程中基于Lucene.net實現的全文索引&#xff0c;而Solr是一個高性能&#xff0c;基于Lucene的全文搜索服務器。同時對其進行了擴展&#xff0c;提供了比Lucene更為豐富的查詢語言&#xff0c;同時實現了可配置、可擴展并對查詢性能…

關于字符的讀入與輸出

在筆試中&#xff0c;經常見到字符的讀入與輸出的題目。逆序打印輸入時最常見、最基本的考題&#xff0c;復雜點的就是統計單詞、逆序打印單詞之類的。難點是如何判斷輸入的結束&#xff0c;如果用getchar函數&#xff0c;其輸入結束符為EOF&#xff08;其打印值為-1&#xff0…

修正discuz發帖首次換行無效的問題

找遍了百度和google都沒有解決方案&#xff0c;連discuz官方都沒有出來解決&#xff0c;至今其官網仍有這個問題。 那就自己動手解決吧&#xff0c;順手打個補丁。雖然走了小路&#xff0c;但是能解決問題。 解決方案&#xff1a;修改static/js/bbcode.js 找到 html2bbcode()方…