日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flume 1.7 源码分析(五)从Channel获取数据写入Sink

發(fā)布時(shí)間:2024/2/28 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flume 1.7 源码分析(五)从Channel获取数据写入Sink 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Flume 1.7 源碼分析(一)源碼編譯
Flume 1.7 源碼分析(二)整體架構(gòu)
Flume 1.7 源碼分析(三)程序入口
Flume 1.7 源碼分析(四)從Source寫數(shù)據(jù)到Channel
Flume 1.7 源碼分析(五)從Channel獲取數(shù)據(jù)寫入Sink

6 從Channel獲取數(shù)據(jù)寫入Sink

6.1 Sink部分

Sink部分主要分為以下3個步驟:
1. 由SinkRunner不斷調(diào)用SinkProcessor的process方法。
2. 根據(jù)配置的SinkProcessor的不同,會使用不同的策略來選擇sink。SinkProcessor有3種,默認(rèn)是DefaultSinkProcessor。
3. 調(diào)用選擇的sink的process方法。

6.1.1 Sink的Process方法

以LoggerSink為例進(jìn)行說明。這個方法來自Sink接口,主要用于取出數(shù)據(jù)進(jìn)行處理,如果失敗則回滾(takeList中內(nèi)容退回quene):

public Status process() throws EventDeliveryException {Status result = Status.READY;Channel channel = getChannel();Transaction transaction = channel.getTransaction();Event event = null;try {transaction.begin();event = channel.take();//從channel中獲取一條數(shù)據(jù)if (event != null) {if (logger.isInfoEnabled()) {logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog)); //輸出event到日志}} else {result = Status.BACKOFF;}transaction.commit();//執(zhí)行提交操作} catch (Exception ex) {transaction.rollback();//執(zhí)行回滾操作throw new EventDeliveryException("Failed to log event: " + event, ex);} finally {transaction.close();}return result; }

6.2 Channel部分

6.2.1 doTake方法

這個方法中主要是從queue中取出事件,放到takeList中。

protected Event doTake() throws InterruptedException {channelCounter.incrementEventTakeAttemptCount();//獲取take列表容量的許可,如果沒有則報(bào)異常。if (takeList.remainingCapacity() == 0) {throw new ChannelException("");} //嘗試獲取queue數(shù)量的許可,如果沒有則代表沒有數(shù)據(jù)可以取,直接返回。if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {return null;}Event event;synchronized (queueLock) {event = queue.poll();//從queue中取出一條數(shù)據(jù)}Preconditions.checkNotNull(event, "");takeList.put(event);//放到takeList中int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);takeByteCounter += eventByteSize;//設(shè)置計(jì)數(shù)器return event; }

6.2.2 doCommit方法

前面說到put和take操作的提交都是通過這個方法來提交的。

這個步驟要做的事情有:
1. putList放入queue,完成后就代表eventList->putList->queue這個步驟完成。
2. 假如doTake過程沒報(bào)錯(能進(jìn)到這個方法說明沒報(bào)錯),說明sink那邊已經(jīng)獲取到了全部的event,這時(shí)可直接清空takeList,代表queue?takeList & sink這個步驟完成。

綜上,兩個事情合并在一起的話,要做的就是,把putList放入queue再清空takeList。

protected void doCommit() throws InterruptedException {int remainingChange = takeList.size() - putList.size();if (remainingChange < 0) {if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {throw new ChannelException("");}if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {bytesRemaining.release(putByteCounter);throw new ChannelFullException("");}}int puts = putList.size();int takes = takeList.size();synchronized (queueLock) {if (puts > 0) {while (!putList.isEmpty()) {if (!queue.offer(putList.removeFirst())) {throw new RuntimeException("");}}}putList.clear();takeList.clear();}//后面是重新設(shè)置相關(guān)計(jì)數(shù)器 }

這個方法一開始去比較takeList和putList的容量差,是為了簡化申請?jiān)S可的過程。正常的流程是清空takeList,釋放takeList.size個許可,再申請putList.size個許可,它是兩個步驟合并起來的。

6.2.3 doRollback方法

與doCommit方法類似,這里的回滾,也分為2種情況:
- 由take操作引起的
該transaction的流程如下:queue->takeList & sink,所以回滾操作要做的事情就是:把takeList放回queue。
- 由put操作引起的
該transaction的流程如下:eventList->putList->queue,由于doPut和doCommit執(zhí)行出現(xiàn)異常就直接跳出了,還沒執(zhí)行清空語句,也就是eventList還沒有清空,所以可以直接清空putList,這樣下次循環(huán)還會重新讀取該eventList中的數(shù)據(jù)。

綜上,兩種操作要合為一個方法的話,就把takeList放回queue,然后清理putList就可以了。代碼如下:

protected void doRollback() {int takes = takeList.size();synchronized (queueLock) {Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),"");while (!takeList.isEmpty()) {queue.addFirst(takeList.removeLast());}putList.clear();}//后面是重新設(shè)置相關(guān)計(jì)數(shù)器 }

附注:從目前的代碼看,在take操作的時(shí)候,應(yīng)該已經(jīng)獲取到了部分?jǐn)?shù)據(jù),如果這個時(shí)候異常了,把takeList返回queue的話,會導(dǎo)致重復(fù)數(shù)據(jù)。

總結(jié)

以上是生活随笔為你收集整理的Flume 1.7 源码分析(五)从Channel获取数据写入Sink的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。