日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

Flume数据传输事务分析[转]

發布時間:2023/12/13 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flume数据传输事务分析[转] 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文基于ThriftSource,MemoryChannel,HdfsSink三個組件,對Flume數據傳輸的事務進行分析,如果使用的是其他組件,Flume事務具體的處理方式將會不同。一般情況下,用MemoryChannel就好了,我們公司用的就是這個,FileChannel速度慢,雖然提供日志級別的數據恢復,但是一般情況下,不斷電MemoryChannel是不會丟數據的。

Flume提供事物操作,保證用戶的數據的可靠性,主要體現在:

  • 數據在傳輸到下個節點時(通常是批量數據),如果接收節點出現異常,比如網絡異常,則回滾這一批數據。因此有可能導致數據重發
  • 同個節點內,Source寫入數據到Channel,數據在一個批次內的數據出現異常,則不寫入到Channel。已接收到的部分數據直接拋棄,靠上一個節點重發數據。

編程模型

Flume在對Channel進行Put和Take操作的時候,必須要用事物包住,比如:

Channel ch = new MemoryChannel(); Transaction txn = ch.getTransaction(); //事物開始 txn.begin(); try {Event eventToStage = EventBuilder.withBody("Hello Flume!",Charset.forName("UTF-8")); //往臨時緩沖區Put數據 ch.put(eventToStage); //或者ch.take() //將這些數據提交到channel中 txn.commit(); } catch (Throwable t) { txn.rollback(); if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); }

Put事務流程

Put事務可以分為以下階段:

  • doPut:將批數據先寫入臨時緩沖區putList
  • doCommit:檢查channel內存隊列是否足夠合并。
  • doRollback:channel內存隊列空間不足,拋棄數據

我們從Source數據接收到寫入Channel這個過程對Put事物進行分析。


ThriftSource會spawn多個Worker線程(ThriftSourceHandler)去處理數據,Worker處理數據的接口,我們只看batch批量處理這個接口:

@Overridepublic Status appendBatch(List<ThriftFlumeEvent> events) throws TException {List<Event> flumeEvents = Lists.newArrayList();for(ThriftFlumeEvent event : events) {flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders())); } //ChannelProcessor,在Source初始化的時候傳進來.將數據寫入對應的Channel getChannelProcessor().processEventBatch(flumeEvents); ... return Status.OK; }

事務邏輯都在processEventBatch這個方法里:

public void processEventBatch(List<Event> events) {...//預處理每行數據,有人用來做ETL嘛events = interceptorChain.intercept(events);...//分類數據,劃分不同的channel集合對應的數據 // Process required channels Transaction tx = reqChannel.getTransaction(); ... //事務開始,tx即MemoryTransaction類實例 tx.begin(); List<Event> batch = reqChannelQueue.get(reqChannel); for (Event event : batch) { // 這個put操作實際調用的是transaction.doPut reqChannel.put(event); } //提交,將數據寫入Channel的隊列中 tx.commit(); } catch (Throwable t) { //回滾 tx.rollback(); ... } } ... }

每個Worker線程都擁有一個Transaction實例,保存在Channel(BasicChannelSemantics)里的ThreadLocal變量currentTransaction.

那么,事務到底做了什么?

實際上,Transaction實例包含兩個雙向阻塞隊列LinkedBlockingDeque(感覺沒必要用雙向隊列,每個線程寫自己的putList,又不是多個線程?),分別為:

  • putList
  • takeList

對于Put事物操作,當然是只用到putList了。putList就是一個臨時的緩沖區,數據會先put到putList,最后由commit方法會檢查channel是否有足夠的緩沖區,有則合并到channel的隊列。

channel.put -> transaction.doPut:

protected void doPut(Event event) throws InterruptedException {//計算數據字節大小 int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize); //寫入臨時緩沖區putList if (!putList.offer(event)) { throw new ChannelException( "Put queue for MemoryTransaction of capacity " + putList.size() + " full, consider committing more frequently, " + "increasing capacity or increasing thread count"); } putByteCounter += eventByteSize; }

transaction.commit:

@Overrideprotected void doCommit() throws InterruptedException { //檢查channel的隊列剩余大小是否足夠 ... int puts = putList.size(); ... synchronized(queueLock) { if(puts > 0 ) { while(!putList.isEmpty()) { //寫入到channel的隊列 if(!queue.offer(putList.removeFirst())) { throw new RuntimeException("Queue add failed, this shouldn't be able to happen"); } } } //清除臨時隊列 putList.clear(); ... } ... }

如果在事務期間出現異常,比如channel剩余空間不足,則rollback:

@Overrideprotected void doRollback() {...//拋棄數據,沒合并到channel的內存隊列 putList.clear(); ... }

Take事務

Take事務分為以下階段:

  • doTake:先將數據取到臨時緩沖區takeList
  • 將數據發送到下一個節點
  • doCommit:如果數據全部發送成功,則清除臨時緩沖區takeList
  • doRollback:數據發送過程中如果出現異常,rollback將臨時緩沖區takeList中的數據歸還給channel內存隊列。


Sink其實是由SinkRunner線程調用Sink.process方法來了處理數據的。我們從HdfsEventSink的process方法說起,Sink類都有個process方法,用來處理傳輸數據的邏輯。:

public Status process() throws EventDeliveryException {...Transaction transaction = channel.getTransaction();...//事務開始transaction.begin();...for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) { //take數據到臨時緩沖區,實際調用的是transaction.doTake Event event = channel.take(); if (event == null) { break; } ... //寫數據到HDFS bucketWriter.append(event); ... // flush all pending buckets before committing the transaction for (BucketWriter bucketWriter : writers) { bucketWriter.flush(); } //commit transaction.commit(); ... } catch (IOException eIO) { transaction.rollback(); ... } finally { transaction.close(); } }

大致流程圖:

接著看看channel.take,作用是將數據放到臨時緩沖區,實際調用的是transaction.doTake:

protected Event doTake() throws InterruptedException {...//從channel內存隊列取數據synchronized(queueLock) {event = queue.poll();}...//將數據放到臨時緩沖區 takeList.put(event); ... return event; }

接著,HDFS寫線程bucketWriter將take到的數據寫到HDFS,如果批數據都寫完了,則要commit了:

protected void doCommit() throws InterruptedException {...takeList.clear();... }

很簡單,其實就是清空takeList而已。如果bucketWriter在寫數據到HDFS的時候出現異常,則要rollback:

protected void doRollback() {int takes = takeList.size();//檢查內存隊列空間大小,是否足夠takeList寫回去 synchronized(queueLock) { Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " + "queue to rollback takes. This should never happen, please report"); while(!takeList.isEmpty()) { queue.addFirst(takeList.removeLast()); } ... } ... }

轉載于:https://www.cnblogs.com/whtydn/p/4384199.html

總結

以上是生活随笔為你收集整理的Flume数据传输事务分析[转]的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。