日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flume 1.7 源码分析(四)从Source写数据到Channel

發布時間:2024/2/28 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flume 1.7 源码分析(四)从Source写数据到Channel 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Flume 1.7 源碼分析(一)源碼編譯
Flume 1.7 源碼分析(二)整體架構
Flume 1.7 源碼分析(三)程序入口
Flume 1.7 源碼分析(四)從Source寫數據到Channel
Flume 1.7 源碼分析(五)從Channel獲取數據寫入Sink

5 從Source寫數據到Channel

5.1 Source部分

5.1.1 SourceRunner

SourceRunner就是專門用于運行Source的一個類。
在”物化配置”一節獲取配置信息后,會根據Source去獲取具體的SourceRunner,調用的是SourceRunner的forSource方法。

public static SourceRunner forSource(Source source) {SourceRunner runner = null;if (source instanceof PollableSource) {runner = new PollableSourceRunner();((PollableSourceRunner) runner).setSource((PollableSource) source);} else if (source instanceof EventDrivenSource) {runner = new EventDrivenSourceRunner();((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);} else {throw new IllegalArgumentException("No known runner type for source " + source);}return runner; }

可以看到source分為了2種類型,并有對應的sourceRunner(PollableSourceRunner、EventDrivenSourceRunner)。這2種source區別在于是否需要外部的驅動去獲取數據,不需要外部驅動(采用自身的事件驅動機制)的稱為EventDrivenSource,需要外部驅動的稱為PollableSource。

  • 常見的EventDrivenSource:AvroSource、ExecSource、SpoolDirectorySource。
  • 常見的PollableSource:TaildirSource、kafkaSource、JMSSource。

以EventDrivenSourceRunner為例,由MonitorRunnable調用其start方法:

public void start() {Source source = getSource();ChannelProcessor cp = source.getChannelProcessor();cp.initialize();//用于初始化Interceptorsource.start();lifecycleState = LifecycleState.START; }

這里的ChannelProcessor是比較重要的一個類,后面會具體說。接下來調用了Source的start方法。可以對照一下之前的整體架構的圖,start方法實現的就是這個部分:

5.1.2 ExecSource

以ExecSource的start方法為例:

public void start() {executor = Executors.newSingleThreadExecutor();runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter, restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);runnerFuture = executor.submit(runner);sourceCounter.start();super.start(); }

主要啟動了一個線程runner,初始化了一下計數器。具體實現還是要看ExecRunable類的run方法:

public void run() {do {timedFlushService = Executors.newSingleThreadScheduledExecutor(…); //使用配置的參數啟動Shell命令String[] commandArgs = command.split("\\s+");process = new ProcessBuilder(commandArgs).start(); //設置標準輸入流reader = new BufferedReader(new InputStreamReader(process.getInputStream()…));//設置錯誤流 StderrReader stderrReader = new StderrReader(…);stderrReader.start(); //啟動定時任務,將eventList中數據批量寫入到Channelfuture = timedFlushService.scheduleWithFixedDelay(new Runnable() {public void run() {synchronized (eventList) {if (!eventList.isEmpty() && timeout()) {flushEventBatch(eventList);}}}},batchTimeout, batchTimeout, TimeUnit.MILLISECONDS); //按行讀取標準輸出流的內容,并寫入eventListwhile ((line = reader.readLine()) != null) {synchronized (eventList) {sourceCounter.incrementEventReceivedCount();eventList.add(EventBuilder.withBody(line.getBytes(charset))) //超出配置的大小或者超時后,將eventList寫到Channelif (eventList.size() >= bufferCount || timeout()) {flushEventBatch(eventList);} } }synchronized (eventList) {if (!eventList.isEmpty()){flushEventBatch(eventList);}}} while (restart);//如果配置了自動重啟,當Shell命令的進程結束時,自動重啟命令。 }

在該方法中啟動了2個reader,分別取讀取標準輸入流和錯誤流,將標準輸入流中的內容寫入eventList。

與此同時啟動另外一個線程,調用flushEventBatch方法,定期將eventList中的數據寫入到Channel。

private void flushEventBatch(List<Event> eventList) {channelProcessor.processEventBatch(eventList);//假如這里異常的話,eventList還沒有清空sourceCounter.addToEventAcceptedCount(eventList.size());eventList.clear();lastPushToChannel = systemClock.currentTimeMillis(); }

可以看到這里調用了channelProcessor.processEventBatch()來寫入Channel。

5.2 Channel部分

5.2.1 ChannelProcessor

ChannelProcessor的作用是執行所有interceptor,并將eventList中的數據,發送到各個reqChannel、optChannel。ReqChannel和optChannel是通過channelSelector來獲取的。

public interface ChannelSelector extends NamedComponent, Configurable {public void setChannels(List<Channel> channels);public List<Channel> getRequiredChannels(Event event);public List<Channel> getOptionalChannels(Event event);public List<Channel> getAllChannels();//獲取在當前Source中配置的全部Channel }

如果要自定義一個ChannelSelector,只需要繼承AbstractChannelSelector后,實現getRequiredChannels和getOptionalChannels即可。

ReqChannel代表一定保證存儲的Channel(失敗會不斷重試),optChannel代表可能存儲的Channel(即失敗后不重試)。

ReqChannel與optChannel的區別從代碼上來看,前者在出現異常時,會在執行完回滾后往上層拋,而optChannel則只執行回滾。注意到回滾操作只清空putList(5.2.4節會說明),而這一層如果沒有拋出異常的話,調用方(也就是上節的flushEventBatch)會清空eventList,也就是異常之后的數據丟失了。

發送其中一條數據的代碼如下:

try {tx.begin();reqChannel.put(event);tx.commit(); } catch (Throwable t) {tx.rollback();//省略部分代碼 }

其中put調用Channel的doPut方法,commit調用Channel的doCommit方法。
Channel主要包含4個主要方法:doPut、doTake、doCommit、doRollback。下面以MemoryChannel為例說明。

5.2.2 doPut方法

在這個方法中,只包含了遞增計數器和將事件添加到putList。

protected void doPut(Event event) throws InterruptedException {channelCounter.incrementEventPutAttemptCount();int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);if (!putList.offer(event)) {throw new ChannelException("");}putByteCounter += eventByteSize; }

假如這個方法中出現了異常,則會拋到ChannelProcessor中執行回滾操作。

5.2.3 doCommit方法

這個方法是比較復雜的方法之一,原因在于put和take操作的commit都是通過這個方法來進行的,所以代碼里面其實混合了2個功能(即put和take操作)所需的提交代碼。

單純從Source寫數據到Channel這件事情,流程為eventList->putList->queue。

由于前面已經完成了把數據放到putList中,那接下來要做的事情就是將putList中數據放入queue中就可以了。這個部分先說明到這里,下一個章節結合take操作一起看這個方法。

5.2.4 doRollback方法

與doCommit方法類似,這里的回滾,也分為2種情況:由take操作引起的和由put方法引起的。

這里先說由put發起的,該transaction的流程如下:
eventList->putList->queue

由于doPut和doCommit執行出現異常就直接跳出了,還沒執行清空語句(這里可以參考“ExecSource“章節的最后一段代碼的注釋部分),也就是eventList還沒有清空,所以可以直接清空putList,這樣下次循環還會重新讀取該eventList中的數據。

附注:在put操作commit的時候,如果部分數據已經放進queue的話,這個時候回滾,那是否存在數據重復問題呢?根據代碼,由于在放隊列這個操作之前已經做過很多判斷(容量等等),這個操作只是取出放進隊列的操作,而這個代碼之后,也只是一些設置計數器的操作,理論上不會出現異常導致回滾了。

總結

以上是生活随笔為你收集整理的Flume 1.7 源码分析(四)从Source写数据到Channel的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 亚洲成成品网站 | www国产亚洲精品久久网站 | 91性| 国产精品99精品 | 精品一区欧美 | 操xxxx| 午夜视频在线免费播放 | 久草这里只有精品 | 欧美日韩精品一区 | 亚洲图片欧美在线 | 日韩三区视频 | 国产精品乱码久久久 | 欧美黄色大片网站 | 久久久精品免费看 | 一本色道久久综合亚洲二区三区 | 超碰公开在线观看 | av大片网站| 亚洲熟妇av日韩熟妇在线 | av永久免费在线观看 | 狠狠操狠狠操 | 亚洲一区二区三区四区电影 | www.夜夜爱 | 国产夜夜夜| 午夜极品 | 国产91久久精品一区二区 | 久久久久久黄色片 | 好男人www| 国内精品人妻无码久久久影院蜜桃 | 国产视频一区二区三区四区五区 | 福利一区在线 | 亚洲国产精品久久久久 | 欧美你懂得 | 中文字幕日韩一区 | 三级亚洲欧美 | 性自由色xxxx免费视频 | 好吊妞一区二区三区 | www.嫩草.com | 亚洲我射 | 无码h肉动漫在线观看 | a级片网址| 男女啪啪软件 | 久久久一二三 | 9191久久 | 美女被到爽高潮视频 | 国产精品日韩在线 | 亚洲一本二本 | 日韩精品视频在线观看免费 | 91一区在线观看 | 日韩激情综合网 | 欧美一级视频 | 女人和拘做爰正片视频 | 亚洲精品蜜桃 | 狠狠操网址 | 精产国品一二三产品蜜桃 | 思思99精品视频在线观看 | zjzjzjzjzj亚洲女人 | 美女高潮黄又色高清视频免费 | 一级不卡毛片 | 男女午夜爽爽爽 | 久久久久久无码精品人妻一区二区 | 天天干天天草天天射 | 亚洲精品偷拍视频 | 日本免费黄网站 | 操少妇视频 | 亚洲在线观看免费 | wwwjizzzcom | 国产中文一区二区三区 | 麻豆av免费在线观看 | 自拍偷拍日韩精品 | 色婷婷久 | 欧美gv在线 | 好吊色视频在线观看 | 伦伦影院午夜理论片 | 在线观看的黄网 | 麻豆一区二区 | 精品国产欧美 | 欧美黄色短视频 | 中文日韩 | 久久人人爽人人爽人人片 | 91久久久久一区二区 | 激情黄色小说视频 | 尤物视频最新网址 | 西比尔在线观看完整视频高清 | 精品无码av一区二区三区不卡 | 91免费看片网站 | 一本一道久久a久久精品蜜桃 | 色日韩| 欧美特级视频 | 久久综合狠狠 | heyzo国产| 人人妻人人澡人人爽人人精品 | 精品成人一区二区三区久久精品 | 蜜臀av性久久久久蜜臀aⅴ四虎 | a免费视频| 性欧美最猛 | 日韩美女视频在线 | 夜夜草影院 | 日韩欧美国产一区二区 | 一本大道综合伊人精品热热 |