
Storm - Source Code Analysis - Stats (backtype.storm.stats)

Published: 2025/5/22 · Programming Q&A · 豆豆

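You will notice that Storm currently has two metrics systems: the metrics framework and the stats framework.

Both are registered everywhere at the same time. It looks as though metrics is meant to replace stats, but in the current version the UI still uses stats.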


How is the data collected by this module used?

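1. In the worker, do-executor-heartbeats is called periodically to sync heartbeats (hb) to ZooKeeper (zk).
As the code shows, stats are synced to ZK as part of the heartbeat, under :executor-stats.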

(defnk do-executor-heartbeats [worker :executors nil]
  ;; stats is how we know what executors are assigned to this worker
  (let [stats (if-not executors
                (into {} (map (fn [e] {e nil}) (:executors worker)))
                (->> executors
                     (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))
                     (apply merge)))
        zk-hb {:storm-id (:storm-id worker)
               :executor-stats stats
               :uptime ((:uptime worker))
               :time-secs (current-time-secs)}]
    ;; do the zookeeper heartbeat
    (.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker) zk-hb)))
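The two branches of the stats binding can be illustrated with plain maps. In this sketch, get-executor-id and render-stats are hypothetical stand-ins for executor/get-executor-id and executor/render-stats, and executors are plain maps with :id and :stats keys; only the shape of the resulting map is meant to mirror the code above.

```clojure
;; Hypothetical stand-ins for executor/get-executor-id and executor/render-stats.
(defn get-executor-id [e] (:id e))
(defn render-stats [e] (:stats e))

(defn heartbeat-stats
  "Builds the :executor-stats part of a worker heartbeat (simplified sketch)."
  [worker-executor-ids executors]
  (if-not executors
    ;; No executor objects given: report only the assigned executor ids, with nil stats.
    (into {} (map (fn [e] {e nil}) worker-executor-ids))
    ;; Otherwise map each executor id to its rendered stats.
    (->> executors
         (map (fn [e] {(get-executor-id e) (render-stats e)}))
         (apply merge))))

;; Usage
(heartbeat-stats [[1 1] [2 2]] nil)
(heartbeat-stats [[1 1]] [{:id [1 1] :stats {:emitted 5}}])
```

Even without stats, the keys of the map tell Nimbus which executors the worker is running, which is what the comment "stats is how we know what executors are assigned to this worker" refers to.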

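2. Anyone can then retrieve this information through Nimbus's Thrift interface, getTopologyInfo, which reads the executor heartbeats back and takes :stats from each one: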

(^TopologyInfo getTopologyInfo [this ^String storm-id]
  ;; ...
  beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment))
  ;; ...
  stats (:stats heartbeat)
  ;; ...
  )

3. The most direct consumer is the Storm UI: when building the topology page, it calls getTopologyInfo to fetch the data.

(defn topology-page [id window include-sys?]
  (with-nimbus nimbus
    (let [summ (.getTopologyInfo ^Nimbus$Client nimbus id)]
      ;; ...
      )))


Stats

Spouts and bolts use this module to collect sampled statistics. The specific metrics are:

(def COMMON-FIELDS [:emitted :transferred])
(defrecord CommonStats [emitted transferred rate])

(def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies])
;; acked and failed count individual tuples
(defrecord BoltExecutorStats [common acked failed process-latencies executed execute-latencies])

(def SPOUT-FIELDS [:acked :failed :complete-latencies])
;; acked and failed count tuple completion
(defrecord SpoutExecutorStats [common acked failed complete-latencies])


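The sampling ratio is configured in storm-conf via TOPOLOGY_STATS_SAMPLE_RATE.

Why does each update add rate instead of 1?

Because the statistics are sampled: if the sampling ratio is 10%, every event that is actually recorded stands for 1/(10%) = 10 events, so 10 is added.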

(defn sampling-rate [conf]
  (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
       (/ 1)
       int))
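The "add rate instead of 1" arithmetic can be checked with a standalone copy of sampling-rate. This sketch assumes the string value of TOPOLOGY-STATS-SAMPLE-RATE is "topology.stats.sample.rate", and estimate-count is a hypothetical deterministic sampler that records one event out of every rate events; Storm's own sampler is not shown in this article.

```clojure
;; Assumed string value of Storm's TOPOLOGY-STATS-SAMPLE-RATE constant.
(def TOPOLOGY-STATS-SAMPLE-RATE "topology.stats.sample.rate")

(defn sampling-rate
  "Same threading as the original: (int (/ 1 sample-ratio))."
  [conf]
  (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
       (/ 1)
       int))

(defn estimate-count
  "Hypothetical sampler: records one event out of every `rate` events and adds
  `rate` for each recorded one, so the total estimates the true event count."
  [rate n-events]
  (reduce (fn [total i]
            (if (zero? (mod i rate)) (+ total rate) total))
          0
          (range n-events)))

;; Usage: a 5% sampling ratio gives rate 20; 50 of 1000 events are recorded, 50 * 20 = 1000.
(estimate-count (sampling-rate {TOPOLOGY-STATS-SAMPLE-RATE 0.05}) 1000)
```

Because int truncates, a ratio that does not divide 1 evenly (for example 0.3, where 1/0.3 is about 3.33) yields a rate of 3.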


The statistics are computed over time windows. Below are the default number of buckets and the time-window definitions:

(def NUM-STAT-BUCKETS 20) ;; number of buckets per window
;; 10 minutes, 3 hours, 1 day: the three time windows
(def STAT-BUCKETS [30 540 4320]) ;; bucket sizes in seconds: 30, 540 and 4320
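The window lengths in the comment follow from multiplying each bucket size by the number of buckets. A small sketch of that arithmetic (window-lengths-secs is a helper introduced here for illustration):

```clojure
(def NUM-STAT-BUCKETS 20)
(def STAT-BUCKETS [30 540 4320])

(defn window-lengths-secs
  "Hypothetical helper: the time span covered by each window, in seconds."
  []
  (map #(* % NUM-STAT-BUCKETS) STAT-BUCKETS))

;; 30 * 20 = 600 s (10 min), 540 * 20 = 10800 s (3 h), 4320 * 20 = 86400 s (1 day)
(window-lengths-secs)
```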


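The core data structure is RollingWindowSet, which contains:
the functions needed to compute the statistics, updater and extractor; they are kept here as well because all-time is also computed at this level;
a set of rolling windows, by default 3 time windows: 10 minutes, 3 hours and 1 day;
all-time, the statistics over the entire time range.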

(defrecord RollingWindowSet [updater extractor windows all-time])

(defn rolling-window-set [updater merger extractor num-buckets & bucket-sizes]
  (RollingWindowSet. updater
                     extractor
                     (dofor [s bucket-sizes]
                       (rolling-window updater merger extractor s num-buckets))
                     nil))


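Next, the definition of a rolling window:
the core data, buckets, a hash map from time bucket to that bucket's data (itself a {stream-id value} map), initialized to {};
the functions needed to compute the data: updater, merger and extractor;
the time window, described by the bucket size and the number of buckets.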

(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])

(defn rolling-window [updater merger extractor bucket-size-secs num-buckets]
  (RollingWindow. updater merger extractor bucket-size-secs num-buckets {}))
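To see what a freshly created set looks like, the two constructors can be run outside Storm. In this sketch (doall (for ...)) stands in for Storm's dofor helper, assumed here to behave like an eager for, and assoc/merge/identity are placeholder functions rather than the real updater, merger and extractor.

```clojure
;; Simplified copies of the two records described above.
(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])
(defrecord RollingWindowSet [updater extractor windows all-time])

(defn rolling-window [updater merger extractor bucket-size-secs num-buckets]
  ;; A fresh window has no buckets yet.
  (RollingWindow. updater merger extractor bucket-size-secs num-buckets {}))

(defn rolling-window-set [updater merger extractor num-buckets & bucket-sizes]
  ;; (doall (for ...)) replaces Storm's dofor (assumed: an eager for).
  (RollingWindowSet. updater
                     extractor
                     (doall (for [s bucket-sizes]
                              (rolling-window updater merger extractor s num-buckets)))
                     nil))

;; Placeholder functions only; one window per bucket size, 20 buckets each.
(def rws (rolling-window-set assoc merge identity 20 30 540 4320))
```

The result holds three windows (bucket sizes 30, 540 and 4320 seconds), each with empty buckets, and all-time starts as nil.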


1. mk-stats

Stats are created in mk-executor-data:

mk-executor-stats <> (sampling-rate storm-conf)


;; TODO: refactor this to be part of an executor-specific map
(defmethod mk-executor-stats :spout [_ rate]
  (stats/mk-spout-stats rate))

(defmethod mk-executor-stats :bolt [_ rate]
  (stats/mk-bolt-stats rate))

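The first argument is ignored; the two methods simply call stats/mk-spout-stats or stats/mk-bolt-stats. As the code shows, every statistic gets its own rolling-window-set wrapped in an atom: counts use keyed-counter-rolling-window-set and latencies use keyed-avg-rolling-window-set.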

(defn- mk-common-stats [rate]
  (CommonStats. (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ;; emitted
                (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ;; transferred
                rate))

(defn mk-bolt-stats [rate]
  (BoltExecutorStats. (mk-common-stats rate)
                      (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ;; acked
                      (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ;; failed
                      (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))     ;; process-latencies
                      (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ;; executed
                      (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))   ;; execute-latencies

(defn mk-spout-stats [rate]
  (SpoutExecutorStats. (mk-common-stats rate)
                       (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ;; acked
                       (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ;; failed
                       (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))   ;; complete-latencies
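The :spout / :bolt dispatch can be tried in isolation. In this sketch the dispatch function is hypothetical (it returns the first argument directly), and each field holds an atom of an empty map instead of a rolling-window-set. In Storm the first argument is the executor data (the <> passed in mk-executor-data), from which the dispatch value is presumably derived.

```clojure
;; Simplified stand-ins for the stats records.
(defrecord CommonStats [emitted transferred rate])
(defrecord SpoutExecutorStats [common acked failed complete-latencies])
(defrecord BoltExecutorStats [common acked failed process-latencies executed execute-latencies])

;; Hypothetical dispatch function: the executor type keyword itself.
(defmulti mk-executor-stats (fn [executor-type _rate] executor-type))

(defn- mk-common-stats [rate]
  ;; One independent atom per statistic; empty maps stand in for rolling-window-sets.
  (CommonStats. (atom {}) (atom {}) rate))

(defmethod mk-executor-stats :spout [_ rate]
  (SpoutExecutorStats. (mk-common-stats rate) (atom {}) (atom {}) (atom {})))

(defmethod mk-executor-stats :bolt [_ rate]
  (BoltExecutorStats. (mk-common-stats rate)
                      (atom {}) (atom {}) (atom {}) (atom {}) (atom {})))

;; Usage
(mk-executor-stats :spout 20)
(mk-executor-stats :bolt 20)
```

Keeping one atom per statistic means an update to :acked never contends with an update to :failed.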


2. Updating the data

(defmacro update-executor-stat! [stats path & args]
  (let [path (collectify path)]
    `(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)))

(defn spout-acked-tuple! [^SpoutExecutorStats stats stream latency-ms]
  (update-executor-stat! stats :acked stream (stats-rate stats))
  (update-executor-stat! stats :complete-latencies stream latency-ms))
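The macro can be exercised without the rest of Storm. Only the macro body below comes from the article; collectify here is a hypothetical simplified stand-in (it wraps a lone keyword in a vector and leaves vectors alone), and update-rolling-window-set is a stub that just adds a number to a per-stream counter in a plain map. The vector path [:common :emitted] illustrates why collectify is presumably there: nested fields such as those inside CommonStats can be reached with the same macro.

```clojure
(defn collectify
  "Hypothetical stand-in: wrap a single key in a vector, leave sequences alone."
  [obj]
  (if (sequential? obj) obj [obj]))

(defn update-rolling-window-set
  "Stub for illustration only: adds amt to the stream's counter in a plain map."
  [m stream amt]
  (update m stream (fnil + 0) amt))

(defmacro update-executor-stat! [stats path & args]
  (let [path (collectify path)]
    `(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)))

;; Usage: a plain map of atoms standing in for SpoutExecutorStats.
(def stats {:acked (atom {}) :common {:emitted (atom {})}})
(update-executor-stat! stats :acked "default" 20)            ; expands to (swap! (-> stats :acked) ...)
(update-executor-stat! stats [:common :emitted] "default" 20) ; expands to (swap! (-> stats :common :emitted) ...)
```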

Take (update-executor-stat! stats :acked stream (stats-rate stats)) as an example and see how it works:

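From SpoutExecutorStats it takes the atom holding the rolling-window-set that records spout acks,
then swap!s that atom with update-rolling-window-set; the call expands to (swap! (-> stats :acked) update-rolling-window-set stream (stats-rate stats)).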

How is the rolling-window-set that records acks defined?

keyed-counter-rolling-window-set predefines the updater, merger and extractor:
updater: incr-val [amap key amt], adds the given amount amt to the value stored under key in amap;
merger: (partial merge-with +), merges maps using +, so values under the same key are summed;
extractor: counter-extract, (if v v {}), returns v if it exists, otherwise {};
windows: the list of rolling windows;
all-time: initialized to nil.

(defn keyed-counter-rolling-window-set [num-buckets & bucket-sizes]
  (apply rolling-window-set incr-val (partial merge-with +) counter-extract num-buckets bucket-sizes))
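The three predefined functions can be exercised directly. This incr-val is a reconstruction from the description above rather than a copy of Storm's source, and merge-counters is a name introduced here for the (partial merge-with +) merger.

```clojure
(defn incr-val
  "Adds amt to the value stored under key in amap, treating a missing value as 0.
  Also works on nil, since (get nil k 0) is 0 and (assoc nil k v) is {k v}."
  [amap key amt]
  (assoc amap key (+ (get amap key 0) amt)))

;; The merger: values under the same key are summed.
(def merge-counters (partial merge-with +))

(defn counter-extract
  "Returns v if present, otherwise an empty map."
  [v]
  (if v v {}))

;; Usage
(incr-val nil "default" 20)                       ; first update on a nil all-time
(merge-counters {"s1" 20} {"s1" 40 "s2" 20})      ; combine two buckets
(counter-extract nil)
```

Handling nil matters because all-time starts as nil and the first update goes straight through the updater.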


Now let's see how :acked is updated when spout-acked-tuple! is called.

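First, every rolling window is updated, and the updated windows are stored back under :windows;
then :all-time is updated with (apply (:updater rws) (:all-time rws) args), where
the updater is incr-val [amap key amt],
args is (stream-id rate),
and all-time records the statistics of each stream over the entire time range.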

(defn update-rolling-window-set
  ([^RollingWindowSet rws & args]
     (let [now (current-time-secs)
           new-windows (dofor [w (:windows rws)]
                         (apply update-rolling-window w now args))]
       (assoc rws
         :windows new-windows
         :all-time (apply (:updater rws) (:all-time rws) args)))))

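Now look at how a single rolling window is updated:
from now, compute the bucket the current time falls into (time-bucket);
take buckets and update that bucket with :updater; the operation is again adding rate to the value stored under the stream id.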

(defn update-rolling-window
  ([^RollingWindow rw time-secs & args]
     ;; this is 2.5x faster than using update-in...
     (let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))
           buckets (:buckets rw)
           curr (get buckets time-bucket)
           curr (apply (:updater rw) curr args)]
       (assoc rw :buckets (assoc buckets time-bucket curr)))))
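Putting the update path together, the following sketch models a counter set end to end. The records and function names follow the article, but two details are assumptions: curr-time-bucket is taken to round time-secs down to a multiple of the bucket size, and update-rolling-window-set receives now as a parameter instead of calling current-time-secs, so the result is deterministic. Bucket cleanup and extraction are not modeled.

```clojure
(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])
(defrecord RollingWindowSet [updater extractor windows all-time])

(defn incr-val [amap key amt]
  ;; A missing (or nil) map counts as 0, so this also works on the initial nil all-time.
  (assoc amap key (+ (get amap key 0) amt)))

(defn counter-extract [v] (if v v {}))

(defn curr-time-bucket [time-secs bucket-size-secs]
  ;; Assumed rule: bucket key = time-secs rounded down to a multiple of the bucket size.
  (* bucket-size-secs (quot time-secs bucket-size-secs)))

(defn update-rolling-window [rw time-secs & args]
  (let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))
        buckets     (:buckets rw)
        curr        (apply (:updater rw) (get buckets time-bucket) args)]
    (assoc rw :buckets (assoc buckets time-bucket curr))))

(defn update-rolling-window-set [rws now & args]
  ;; `now` is an explicit parameter here (assumption, for testability).
  (assoc rws
         :windows (doall (for [w (:windows rws)]
                           (apply update-rolling-window w now args)))
         :all-time (apply (:updater rws) (:all-time rws) args)))

(defn keyed-counter-rolling-window-set [num-buckets & bucket-sizes]
  (RollingWindowSet. incr-val
                     counter-extract
                     (doall (for [s bucket-sizes]
                              (RollingWindow. incr-val (partial merge-with +) counter-extract
                                              s num-buckets {})))
                     nil))

;; Three acks on stream "default" with rate 20, at t = 100, 110 and 130 seconds.
(def rws (-> (keyed-counter-rolling-window-set 20 30 540)
             (update-rolling-window-set 100 "default" 20)
             (update-rolling-window-set 110 "default" 20)
             (update-rolling-window-set 130 "default" 20)))
```

With 30-second buckets, t = 100 and 110 both land in bucket 90 and t = 130 lands in bucket 120, giving {90 {"default" 40}, 120 {"default" 20}}; with 540-second buckets all three land in bucket 0, giving {0 {"default" 60}}; all-time ends up as {"default" 60}.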

Reprinted from: https://www.cnblogs.com/fxjwind/p/3223110.html
