日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Storm系列(四)Topology提交校验过程

發(fā)布時間:2024/7/19 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Storm系列(四)Topology提交校验过程 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

功能:提交一個新的Topology,并為Topology創(chuàng)建storm-id(topology-id),校驗其結(jié)構(gòu),設(shè)置必要的元數(shù)據(jù),最后為Topology分配任務(wù).

實現(xiàn)源碼:

1? (^void?submitTopology
2? ????????[this?^String?storm-name?^String?uploadedJarLocation?^String?serializedConf?^StormTopology?topology]
3? ????????(.submitTopologyWithOpts?this?storm-name?uploadedJarLocation?serializedConf?topology
4? ?????????????????????????????????(SubmitOptions.?TopologyInitialStatus/ACTIVE)))

從以上源碼中看出submitTopology內(nèi)部是對submitTopologyWithOpts方法的調(diào)用。

submitTopologyWithOpts函數(shù)原型如下:

1? ^void?submitTopologyWithOpts
2? ????????[this?^String?storm-name?^String?uploadedJarLocation?^String?serializedConf?^StormTopology?topology
3? ?????????^SubmitOptions?submitOptions]

在submitTopologyWithOpts中主要做了以下幾件事情:

  • 校驗submitOptions參數(shù)不能為空。
  • 檢查storm-name中是否包含非法字符。
  • 校驗storm-name與正在運行的Topology是否有重名,重名將造成沖突。
  • 將nimbus(nimbus-data類型)中的submitted-count已提交Topology計數(shù)字段加1。
  • 為所提交的Topology創(chuàng)建唯一的storm-id(topology-id),格式:<storm-name>-<submitted-count>-<當前時間>
  • 通過normalize-conf獲取提交的Topology的Storm配置,首先將參數(shù)serializedConf進行反序列化,然后加入storm-name,storm-id等。
  • 將Storm默認的配置(conf)與第六步得到的Storm配置進行合并,合并原則為兩份配置中重復(fù)的配置項以第六步中的配置為準。
  • 調(diào)用normalize-topology計算提交的Topology中每個組件并行度及更新TOPOLOGY_TASKS配置項.
  • 獲取nimbus(nimbus-data類型)中storm-cluster-state對象。
  • 調(diào)用System-topology!方法對Topology結(jié)構(gòu)進行校驗。
  • 獲取nimbus中的submit-lock鎖。
  • 調(diào)用setup-storm-code為Topology創(chuàng)建對應(yīng)的本地文件夾、復(fù)制jar并寫入序列化后的Storm配置項和Topology信息.
  • 調(diào)用setup-hearbeats!為Topology在Zookeeper中創(chuàng)建心跳路徑,/storm/workerbeats/topology-id.
  • 定義一個從thrift-status到keyword-status的哈希表,該哈希表用來將傳入的submitOptions中的thrift-status轉(zhuǎn)化為對應(yīng)的keyword-status.
  • 調(diào)用start-storm設(shè)置stormBase,它在Zookeeper中路徑是/storm/storms/<topology-id>,stormBase的信息將做為該路徑所對應(yīng)的存儲值。
  • 調(diào)用mk-assignments為所提交的Topology分配資源.
  • normalize-topology

    實現(xiàn)源碼:

    1? (defn?normalize-topology?[storm-conf?^StormTopology?topology]
    2? ??(let?[ret?(.deepCopy?topology)]
    3? ????(doseq?[[_?component]?(all-components?ret)]
    4? ??????(.set_json_conf
    5? ????????(.get_common?component)
    6? ????????(->>?{TOPOLOGY-TASKS?(component-parallelism?storm-conf?component)}
    7? ?????????????(merge?(component-conf?component))
    8? ?????????????to-json?)))
    9? ret?))

    實現(xiàn)說明:

    • 調(diào)用deepCopy對topology進行深度拷貝,賦值給ret.
    • 遍歷topology(ret)所有組件,調(diào)用component-parallelism更新組件配置中的TOPOLOGY_TASKS信息。

    component-parallelism實現(xiàn)源碼(計算組件并行度):

    1? (defn-?component-parallelism?[storm-conf?component]
    2? ??(let?[storm-conf?(merge?storm-conf?(component-conf?component))
    3? ????????num-tasks?(or?(storm-conf?TOPOLOGY-TASKS)?(num-start-executors?component))
    4? ????????max-parallelism?(storm-conf?TOPOLOGY-MAX-TASK-PARALLELISM)
    5? ????????]
    6? ????(if?max-parallelism
    7? ??????(min?max-parallelism?num-tasks)
    8? ??????num-tasks)))
    實現(xiàn)說明:
    • 將Topology配置信息與組件(component)配置信息進行合并,兩者存在重復(fù)的配置項時以組件的配置項為準。
    • 計算組件并行度(num-tasks),若果配置storm-conf中配置了TOPOLOGY-TASKS信息,就以該配置值做為組件的并行度,否則通過調(diào)用num-start-executors獲取用戶對組件設(shè)置的并行度做為num-tasks.
    • 獲取storm-conf配置中TOPOLOGY-MAX-TASK-PARALLELISM配置項的值。
    • 返回TOPOLOGY-MAX-TASK-PARALLELISM與num-tasks較小的值做為組件的并行度。
    1? TopologyBuilder?builder?=?new?TopologyBuilder();
    2? //?4對應(yīng)對用用戶設(shè)置的組件并行度,10對應(yīng)TOPOLOGY-TASK配置項的值
    3? builder.setBolt("transfer",?new?TransferBolt(),?4).shuffleGrouping("random").setNumTasks(6);?Config?conf?=?new?Config();
    4? //?8對應(yīng)?TOPOLOGY-MAX-TASK-PARALLELISM配置項的值
    5? Conf.setMaxTaskParallelism(8);

    ?

    system-topology!

    功能:

    驗證用戶提交的Topology,同時為提交的topology添加一些系統(tǒng)組件和流。

    實現(xiàn)源碼:

    1? (defn?system-topology!?[storm-conf?^StormTopology?topology]
    2? ??(validate-basic!?topology)
    3? ??(let?[ret?(.deepCopy?topology)]
    4? ????(add-acker!?storm-conf?ret)
    5? ????(add-metric-components!?storm-conf?ret)????
    6? ????(add-system-components!?storm-conf?ret)
    7? ????(add-metric-streams!?ret)
    8? ????(add-system-streams!?ret)
    9? ????(validate-structure!?ret)
    10? ????ret
    11? ))

    實現(xiàn)說明:

    • 使用validate-basic!校驗所提交的Topology.
      主要用于確保topology中的組件id不重復(fù)而且不是系統(tǒng)id,以及確保每個組件的TOPOLOGY-TASKS配置項大于0時,組件的并行度設(shè)置也一定大于0.
    • 調(diào)用deepCopy對topology進行深度拷貝,賦值給ret.
    • 為Topology添加acker-bolt.
      用于追蹤發(fā)送出去的消息是否被成功處理。
    • 使用add-metric-components為Topology添加metric-bolt.
    • 為Topology添加system-bolt.
      System-bolt沒有輸入流只有輸出流分別為:SYSTEM-TICK-STREAM-ID,聲明字段是[“rate_secs”],非直接模式;另一個為METRICS-TICK-STREAM-ID,聲明字段為[“interval”]非直接模式,并行度為0.
    • 為Topology中的所有組件添加統(tǒng)計流。
      Stream-id為METRICS-STREAM-ID,聲明字段為[“task-info”,”data-points”],非直接流模式.
    • 為Topology中的所有組件添加系統(tǒng)流。
      stream-id為SYSTEM-STREAM-ID,聲明字段為[“event”],非直接流模式.
    • 使用validate-structure!檢驗以上步驟所組合后的Topology.

    驗證過程:
    獲取Topology中所有組件和組件的輸入(包括component-id、stream-id、Grouping),對輸入組件依次判斷輸入組件ID(component-id)是否在該Topology中,不存在則拋出異常,存在則再判斷該組件的流類型是否為所對應(yīng)的stream-id,若不存在則拋出異常,存在則繼續(xù)檢查該流的分組方式(Grouping)是否與能對應(yīng),所有組件檢查完畢后沒有異常拋出表示該Topology有效.

    轉(zhuǎn)載于:https://www.cnblogs.com/jianyuan/p/4792443.html

    創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎

    總結(jié)

    以上是生活随笔為你收集整理的Storm系列(四)Topology提交校验过程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。