日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flume 1.7 源码分析(三)程序入口

發布時間:2024/2/28 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flume 1.7 源码分析(三)程序入口 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Flume 1.7 源碼分析(一)源碼編譯
Flume 1.7 源碼分析(二)整體架構
Flume 1.7 源碼分析(三)程序入口
Flume 1.7 源碼分析(四)從Source寫數據到Channel

4 程序入口

啟動Flume的過程可以簡單分為2個步驟:
1. 獲取相關配置文件(一般來說就是flume-conf.properties)。
2. 啟動各組件。不特別說明,本文中的組件是指實現了LifecycleAware接口的類的對象,一般就是Source、Channel、Sink這3種對象。

4.1 獲取啟動配置

4.1.1 Main函數

啟動Flume的Main函數在flume-ng-node模塊的org.apache.flume.node.Application。該函數的功能可以簡單劃分為以下三個步驟:
1. 使用commons.cli類獲取命令行參數(就是啟動時傳入的參數)
2. 根據啟動參數確定的讀取配置的方式。讀取配置的方式總共有4種,分別根據配置是保存在zookeeper上還是本地properties文件、以及是否reload(自動重載配置文件)分為4種方式。
3. 根據相應的配置啟動程序,并注冊關閉鉤子。
接下來以properties文件、不重載的方式為例,主要的代碼如下:

PropertiesFileConfigurationProvider configurationProvider =new PropertiesFileConfigurationProvider(agentName, configurationFile); //創建Application對象,包含初始化組件列表(components),初始化LifecycleSupervisor。 application = new Application(); application.handleConfigurationEvent(configurationProvider.getConfiguration()); //start方法用于檢查所有組件是否是啟動狀態,如果不是則啟動該組件。 application.start(); //監聽程序關閉事件,用于當程序被kill后能夠執行一些清理工作。 final Application appReference = application; Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {public void run() {appReference.stop();} });

上面的代碼,有兩處比較關鍵:

  • configurationProvider.getConfiguration()會返回一個MaterializedConfiguration類型的對象,用于從文件形式的配置轉為物化的配置,即包含實際的channel、sinkRunner等對象的實例,在“物化配置”一節分析。
  • handleConfigurationEvent用于停止所有components,并使用新的配置進行啟動,在“使用新配置重啟”一節分析。

4.1.2 物化配置

configurationProvider.getConfiguration()方法主要做了以下兩件事:
1. 讀取配置文件(flume-conf.properties),保存在AgentConfiguration對象中。

public static class AgentConfiguration {private final String agentName;private String sources;private String sinks;private String channels;private String sinkgroups;private final Map<String, ComponentConfiguration> sourceConfigMap;private final Map<String, ComponentConfiguration> sinkConfigMap;private final Map<String, ComponentConfiguration> channelConfigMap;private final Map<String, ComponentConfiguration> sinkgroupConfigMap;private Map<String, Context> sourceContextMap;private Map<String, Context> sinkContextMap;private Map<String, Context> channelContextMap;private Map<String, Context> sinkGroupContextMap;private Set<String> sinkSet;private Set<String> sourceSet;private Set<String> channelSet;private Set<String> sinkgroupSet; }

到這個步驟還僅僅是做好了分類的文本形式的配置項。
2. 創建出配置中的各組件實例,并添加到MaterializedConfiguration實例中。

public interface MaterializedConfiguration {public void addSourceRunner(String name, SourceRunner sourceRunner);public void addSinkRunner(String name, SinkRunner sinkRunner);public void addChannel(String name, Channel channel);public ImmutableMap<String, SourceRunner> getSourceRunners();public ImmutableMap<String, SinkRunner> getSinkRunners();public ImmutableMap<String, Channel> getChannels(); }

在這個實例中,可以獲取配置文件中配置的所有的source、channel、sink,并且是“物化”的,即可以直接取得相關組件的實例。

4.2 啟動所有組件

4.2.1 使用新配置重啟

有了上面的MaterializedConfiguration實例,我們就可以啟動組件了。
在handleConfigurationEvent方法中,首先會停止所有組件,然后再啟動所有組件。

stopAllComponents(); startAllComponents(conf); //這里的conf就是上節的MaterializedConfiguration。

在startAllComponents方法中,會遍歷組件列表(SourceRunners、SinkRunners、Channels),分別調用supervise方法。以Channel為例:

for (Entry<String, Channel> entry :materializedConfiguration.getChannels().entrySet()) {try {logger.info("Starting Channel " + entry.getKey());supervisor.supervise(entry.getValue(),new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);} catch (Exception e) {logger.error("Error while starting {}", entry.getValue(), e);} }

這個supervise方法簡單來說,就是將相應組件的狀態轉化為期望的狀態。例如上面代碼中的LifecycleState.START就是期望的狀態。

4.2.2 LifecycleSupervisor

上節的supervisor是一個LifecycleSupervisor對象。前面有說到,在創建Application的時候初始化了一個LifecycleSupervisor對象,就是這里的supervisor。這個對象,我理解為各組件生命周期的管理者,用于實時監控所有組件的狀態,如果不是期望的狀態(desiredState),則進行狀態轉換。

上節的代碼中調用了supervisor.supervise方法,接下來分析一下supervise這個方法:

public synchronized void supervise(LifecycleAware lifecycleAware,SupervisorPolicy policy, LifecycleState desiredState) {//省略狀態檢查的代碼 Supervisoree process = new Supervisoree();process.status = new Status();process.policy = policy;process.status.desiredState = desiredState;process.status.error = false;MonitorRunnable monitorRunnable = new MonitorRunnable();monitorRunnable.lifecycleAware = lifecycleAware;monitorRunnable.supervisoree = process;monitorRunnable.monitorService = monitorService;supervisedProcesses.put(lifecycleAware, process);ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(monitorRunnable, 0, 3, TimeUnit.SECONDS);monitorFutures.put(lifecycleAware, future); }

由于所有的組件都實現了LifecycleAware接口,所以這里的supervise方法傳入的是LifecycleAware接口的對象。

可以看到創建了一個Supervisoree對象,顧名思義,就是被監控的的對象,該對象有以下幾種狀態:IDLE, START, STOP, ERROR。
scheduleWithFixedDelay每隔3秒觸發一次監控任務(monitorRunnable)。

4.2.3 MonitorRunnable

在MonitorRunnable中主要是檢查組件的狀態,并實現從lifecycleState到desiredState的轉變。

switch (supervisoree.status.desiredState) {case START:try {lifecycleAware.start();} catch (Throwable e) {省略}break;case STOP:try {lifecycleAware.stop();} catch (Throwable e) {省略}break;default:logger.warn("I refuse to acknowledge {} as a desired state", supervisoree.status.desiredState); }

到這里為止,可以看到監控的進程,調用了組件自己的start和stop方法來啟動、停止。前面有提到有3種類型的組件,SourceRunner、Channel、SinkRunner,而Channel的start只做了初始化計數器,沒什么實質內容,所以接下來從SourceRunner的啟動(從Source寫數據到Channel)和SinkRunner的啟動(從Channel獲取數據寫入Sink)來展開說明。

總結

以上是生活随笔為你收集整理的Flume 1.7 源码分析(三)程序入口的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 在线观看亚洲大片短视频 | 99国产精品久久久久久久成人热 | 中国黄色网址 | 少妇高潮21p | 加勒比伊人 | 国产成人午夜精华液 | 久久精品国产亚洲 | 麻豆免费看片 | 国产白袜脚足j棉袜在线观看 | 免费黡色av | 六月丁香啪啪 | 视色av| 少妇又色又紧又爽又刺激视频 | 女尊高h男高潮呻吟 | 天天综合网在线 | 高潮av在线| 无遮挡在线观看 | 成人无码精品1区2区3区免费看 | 中国挤奶哺乳午夜片 | 爱爱免费网址 | 91在线视频| 免费欧美视频 | av美女在线观看 | 91在线超碰| 亚洲少妇色 | 熟妇人妻一区二区三区四区 | 久久婷婷国产麻豆91天堂 | 午夜免费网址 | 欧美变态口味重另类 | jzzijzzij亚洲成熟少妇在线播放 狠狠躁日日躁夜夜躁2022麻豆 | 亚洲一 | 色眯眯影视 | 高潮白浆女日韩av免费看 | 在线观看av一区二区 | 97xxxx| 艳妇乳肉豪妇荡乳av无码福利 | 一区二区三区精品在线观看 | 一级全黄毛片 | av一起看香蕉 | 日韩电影网站 | 一区二区三区四区精品视频 | 亚洲精华液一区二区 | 欧美日韩一区二区三区在线视频 | 黄色视屏在线免费观看 | 欧美日韩激情在线一区二区三区 | 天天射天天干天天色 | 精品久久一区二区三区 | 福利视频亚洲 | ass极品国模人体欣赏 | 精品国产理论 | 自拍偷拍亚洲精品 | 欧美自拍偷拍一区二区 | 国产精品第九页 | 精品国产无码一区二区 | 日本久久久久久久久久 | 亚州综合 | 欧美日韩激情在线一区二区三区 | 欧美久久久久久久久久久久 | 久草大| 欧美有码在线观看 | 国产成人短视频在线观看 | 日本三级中文字幕在线观看 | いいなり北条麻妃av101 | av大片免费 | jizz中国少妇 | 人人舔人人 | 伊人网av | 咪咪av| 国产精品精华液网站 | 91视频色 | 亚洲国产精品午夜久久久 | 极品美女av| 91看片网| 女人扒开腿让男人捅爽 | 超爽视频| 国产麻豆剧传媒精品国产 | 爱乃なみ加勒比在线播放 | 中文字幕一区二区在线老色批影视 | 色诱av手机版 | 丝袜老师办公室里做好紧好爽 | 长腿校花无力呻吟娇喘的视频 | 欧美亚洲国产精品 | 四虎影库在线播放 | 精品一区二区三区精华液 | 色91精品久久久久久久久 | av大全网站 | 91丨porny丨首页 | 久久国产主播 | 日韩成人无码影院 | 中文字幕一区二区三区免费视频 | 噜噜啪啪 | 国产精品欧美激情 | 草草影院网址 | 欧美成人影院 | 六月色婷婷 | 久久久国产精品成人免费 | 久久五十路 | 麻豆国产尤物av尤物在线观看 | 国产又粗又猛又爽又黄无遮挡 |