A Look at Storm's LoggingMetricsConsumer

Published: 2025/4/5

This article takes a look at Storm's LoggingMetricsConsumer.

LoggingMetricsConsumer

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/LoggingMetricsConsumer.java

public class LoggingMetricsConsumer implements IMetricsConsumer {
    public static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsConsumer.class);
    static private String padding = "                       ";

    @Override
    public void prepare(Map<String, Object> topoConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
    }

    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s:%-4d\t%3d:%-11s\t",
                                      taskInfo.timestamp,
                                      taskInfo.srcWorkerHost, taskInfo.srcWorkerPort,
                                      taskInfo.srcTaskId,
                                      taskInfo.srcComponentId);
        sb.append(header);
        for (DataPoint p : dataPoints) {
            sb.delete(header.length(), sb.length());
            sb.append(p.name)
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.value);
            LOG.info(sb.toString());
        }
    }

    @Override
    public void cleanup() {
    }
}
  • LoggingMetricsConsumer implements the IMetricsConsumer interface; its handleDataPoints method logs the taskInfo and dataPoints. Which log file they end up in depends on Storm's log4j2 configuration
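The fixed-width layout above can be reproduced with plain JDK formatting. The following standalone sketch approximates the line LoggingMetricsConsumer logs (all field values are made up; the 23-character name padding mirrors the `header.length() + 23` truncation, though unlike the real consumer, `%-23s` pads but does not truncate longer names):

```java
// Approximates the log line layout of LoggingMetricsConsumer using only the JDK.
// All values below are hypothetical; Storm fills them from TaskInfo/DataPoint.
public class MetricsLineFormat {
    static String formatLine(long ts, String host, int port, int taskId,
                             String component, String name, Object value) {
        // Same format string as the consumer's header
        String header = String.format("%d\t%15s:%-4d\t%3d:%-11s\t",
                ts, host, port, taskId, component);
        // Pad the metric name to 23 characters, like the padding/delete trick
        return header + String.format("%-23s", name) + "\t" + value;
    }

    public static void main(String[] args) {
        System.out.println(formatLine(1541070680L, "node1.example.com", 6700,
                5, "wordCount", "__ack-count", 42));
    }
}
```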

log4j2/worker.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->
<configuration monitorInterval="60" shutdownHook="disable">
<properties>
    <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
    <property name="patternNoTime">%msg%n</property>
    <property name="patternMetrics">%d %-8r %m%n</property>
</properties>
<appenders>
    <RollingFile name="A1"
                 fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}"
                 filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.%i.gz">
        <PatternLayout>
            <pattern>${pattern}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <RollingFile name="STDOUT"
                 fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out"
                 filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out.%i.gz">
        <PatternLayout>
            <pattern>${patternNoTime}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="4"/>
    </RollingFile>
    <RollingFile name="STDERR"
                 fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err"
                 filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err.%i.gz">
        <PatternLayout>
            <pattern>${patternNoTime}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="4"/>
    </RollingFile>
    <RollingFile name="METRICS"
                 fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics"
                 filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics.%i.gz">
        <PatternLayout>
            <pattern>${patternMetrics}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="2 MB"/>
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514"
            protocol="UDP" appName="[${sys:storm.id}:${sys:worker.port}]" mdcId="mdc" includeMDC="true"
            facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}"
            messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" immediateFail="true" immediateFlush="true"/>
</appenders>
<loggers>
    <root level="info"> <!-- We log everything -->
        <appender-ref ref="A1"/>
        <appender-ref ref="syslog"/>
    </root>
    <Logger name="org.apache.storm.metric.LoggingMetricsConsumer" level="info" additivity="false">
        <appender-ref ref="METRICS"/>
    </Logger>
    <Logger name="STDERR" level="INFO">
        <appender-ref ref="STDERR"/>
        <appender-ref ref="syslog"/>
    </Logger>
    <Logger name="STDOUT" level="INFO">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="syslog"/>
    </Logger>
</loggers>
</configuration>
  • Taking worker.xml as the example, the logger named org.apache.storm.metric.LoggingMetricsConsumer is configured at info level, with additivity set to false
  • The METRICS appender writes to ${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics, e.g. workers-artifacts/tickDemo-1-1541070680/6700/worker.log.metrics
  • METRICS is a RollingFile appender whose SizeBasedTriggeringPolicy is set to 2 MB

Configuration

Topology configuration

conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
  • LoggingMetricsConsumer can be registered on the conf when the topology is submitted; configured this way, it takes effect only for that topology's workers, i.e. any metric data is written to that topology's worker.log.metrics file

storm.yaml configuration

topology.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingMetricsConsumer"
    max.retain.metric.tuples: 100
    parallelism.hint: 1
  - class: "org.apache.storm.metric.HttpForwardingMetricsConsumer"
    parallelism.hint: 1
    argument: "http://example.com:8080/metrics/my-topology/"
  • The storm.yaml configuration applies to all topologies. Note that the key here is topology.metrics.consumer.register, which is topology level; the data is written to the worker.log.metrics file
  • For cluster level, the key is storm.cluster.metrics.consumer.register, and it can only be configured through storm.yaml; with it enabled, metric data is written to the nimbus.log.metrics and supervisor.log.metrics files
  • nimbus and supervisor are started with the log4j parameter -Dlog4j.configurationFile=/apache-storm/log4j2/cluster.xml, while workers are started with -Dlog4j.configurationFile=/apache-storm/log4j2/worker.xml; the -Dlogfile.name parameter of each component is nimbus.log, supervisor.log, and worker.log respectively
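For comparison, a cluster-level registration in storm.yaml might look like the following sketch (LoggingClusterMetricsConsumer ships with storm-server; verify the class name against your Storm version):

```yaml
storm.cluster.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer"
```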

MetricsConsumerBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java

public class MetricsConsumerBolt implements IBolt {
    public static final Logger LOG = LoggerFactory.getLogger(MetricsConsumerBolt.class);
    private final int _maxRetainMetricTuples;
    private final Predicate<IMetricsConsumer.DataPoint> _filterPredicate;
    private final DataPointExpander _expander;
    private final BlockingQueue<MetricsTask> _taskQueue;
    IMetricsConsumer _metricsConsumer;
    String _consumerClassName;
    OutputCollector _collector;
    Object _registrationArgument;
    private Thread _taskExecuteThread;
    private volatile boolean _running = true;

    public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples,
                               Predicate<IMetricsConsumer.DataPoint> filterPredicate, DataPointExpander expander) {
        _consumerClassName = consumerClassName;
        _registrationArgument = registrationArgument;
        _maxRetainMetricTuples = maxRetainMetricTuples;
        _filterPredicate = filterPredicate;
        _expander = expander;
        if (_maxRetainMetricTuples > 0) {
            _taskQueue = new LinkedBlockingDeque<>(_maxRetainMetricTuples);
        } else {
            _taskQueue = new LinkedBlockingDeque<>();
        }
    }

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        try {
            _metricsConsumer = (IMetricsConsumer) Class.forName(_consumerClassName).newInstance();
        } catch (Exception e) {
            throw new RuntimeException("Could not instantiate a class listed in config under section " +
                                       Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e);
        }
        _metricsConsumer.prepare(topoConf, _registrationArgument, context, collector);
        _collector = collector;
        _taskExecuteThread = new Thread(new MetricsHandlerRunnable());
        _taskExecuteThread.setDaemon(true);
        _taskExecuteThread.start();
    }

    @Override
    public void execute(Tuple input) {
        IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo) input.getValue(0);
        Collection<IMetricsConsumer.DataPoint> dataPoints = (Collection) input.getValue(1);
        Collection<IMetricsConsumer.DataPoint> expandedDataPoints = _expander.expandDataPoints(dataPoints);
        List<IMetricsConsumer.DataPoint> filteredDataPoints = getFilteredDataPoints(expandedDataPoints);
        MetricsTask metricsTask = new MetricsTask(taskInfo, filteredDataPoints);
        while (!_taskQueue.offer(metricsTask)) {
            _taskQueue.poll();
        }
        _collector.ack(input);
    }

    private List<IMetricsConsumer.DataPoint> getFilteredDataPoints(Collection<IMetricsConsumer.DataPoint> dataPoints) {
        return Lists.newArrayList(Iterables.filter(dataPoints, _filterPredicate));
    }

    @Override
    public void cleanup() {
        _running = false;
        _metricsConsumer.cleanup();
        _taskExecuteThread.interrupt();
    }

    static class MetricsTask {
        private IMetricsConsumer.TaskInfo taskInfo;
        private Collection<IMetricsConsumer.DataPoint> dataPoints;

        public MetricsTask(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) {
            this.taskInfo = taskInfo;
            this.dataPoints = dataPoints;
        }

        public IMetricsConsumer.TaskInfo getTaskInfo() {
            return taskInfo;
        }

        public Collection<IMetricsConsumer.DataPoint> getDataPoints() {
            return dataPoints;
        }
    }

    class MetricsHandlerRunnable implements Runnable {
        @Override
        public void run() {
            while (_running) {
                try {
                    MetricsTask task = _taskQueue.take();
                    _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints());
                } catch (InterruptedException e) {
                    break;
                } catch (Throwable t) {
                    LOG.error("Exception occurred during handle metrics", t);
                }
            }
        }
    }
}
  • MetricsConsumerBolt creates _taskQueue in its constructor: if _maxRetainMetricTuples is greater than 0 it creates a bounded queue, otherwise an unbounded one. The value is read from max.retain.metric.tuples under topology.metrics.consumer.register and defaults to 100 when absent
  • In prepare, MetricsConsumerBolt starts the MetricsHandlerRunnable thread, which takes a MetricsTask from _taskQueue and calls _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints())
  • In execute, each received tuple is offered to _taskQueue; if the offer fails, one element is polled off and the offer is retried
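The offer/poll loop in execute implements a drop-oldest policy: when the bounded queue is full, the oldest pending MetricsTask is discarded to make room for the new one. A standalone JDK sketch of the same behaviour (queue capacity and items are made up):

```java
// Standalone sketch of the "drop oldest on overflow" behaviour MetricsConsumerBolt
// uses for its _taskQueue.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class DropOldestQueue {
    static <T> void offerDroppingOldest(BlockingQueue<T> q, T item) {
        // If the queue is full, discard the head (oldest element) until the new one fits
        while (!q.offer(item)) {
            q.poll();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> q = new LinkedBlockingDeque<>(2); // capacity 2
        offerDroppingOldest(q, 1);
        offerDroppingOldest(q, 2);
        offerDroppingOldest(q, 3); // full, so 1 is dropped
        System.out.println(q);     // [2, 3]
    }
}
```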

StormCommon.systemTopologyImpl

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java

protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
    validateBasic(topology);
    StormTopology ret = topology.deepCopy();
    addAcker(topoConf, ret);
    if (hasEventLoggers(topoConf)) {
        addEventLogger(topoConf, ret);
    }
    addMetricComponents(topoConf, ret);
    addSystemComponents(topoConf, ret);
    addMetricStreams(ret);
    addSystemStreams(ret);
    validateStructure(ret);
    return ret;
}

public static void addMetricComponents(Map<String, Object> conf, StormTopology topology) {
    Map<String, Bolt> metricsConsumerBolts = metricsConsumerBoltSpecs(conf, topology);
    for (Map.Entry<String, Bolt> entry : metricsConsumerBolts.entrySet()) {
        topology.put_to_bolts(entry.getKey(), entry.getValue());
    }
}

public static void addMetricStreams(StormTopology topology) {
    for (Object component : allComponents(topology).values()) {
        ComponentCommon common = getComponentCommon(component);
        StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points"));
        common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo);
    }
}

public static Map<String, Bolt> metricsConsumerBoltSpecs(Map<String, Object> conf, StormTopology topology) {
    Map<String, Bolt> metricsConsumerBolts = new HashMap<>();

    Set<String> componentIdsEmitMetrics = new HashSet<>();
    componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
    componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);

    Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
    for (String componentId : componentIdsEmitMetrics) {
        inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
    }

    List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
    if (registerInfo != null) {
        Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>();
        for (Map<String, Object> info : registerInfo) {
            String className = (String) info.get(TOPOLOGY_METRICS_CONSUMER_CLASS);
            Object argument = info.get(TOPOLOGY_METRICS_CONSUMER_ARGUMENT);
            Integer maxRetainMetricTuples = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES), 100);
            Integer phintNum = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT), 1);
            Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
            metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
            List<String> whitelist = (List<String>) info.get(TOPOLOGY_METRICS_CONSUMER_WHITELIST);
            List<String> blacklist = (List<String>) info.get(TOPOLOGY_METRICS_CONSUMER_BLACKLIST);
            FilterByMetricName filterPredicate = new FilterByMetricName(whitelist, blacklist);
            Boolean expandMapType = ObjectReader.getBoolean(info.get(TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE), false);
            String metricNameSeparator = ObjectReader.getString(info.get(TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR), ".");
            DataPointExpander expander = new DataPointExpander(expandMapType, metricNameSeparator);
            MetricsConsumerBolt boltInstance = new MetricsConsumerBolt(className, argument,
                                                                       maxRetainMetricTuples, filterPredicate, expander);
            Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs,
                                                                           boltInstance, null, phintNum, metricsConsumerConf);

            String id = className;
            if (classOccurrencesMap.containsKey(className)) {
                // e.g. ["a", "b", "a"] => ["a", "b", "a#2"]
                int occurrenceNum = classOccurrencesMap.get(className);
                occurrenceNum++;
                classOccurrencesMap.put(className, occurrenceNum);
                id = Constants.METRICS_COMPONENT_ID_PREFIX + className + "#" + occurrenceNum;
            } else {
                id = Constants.METRICS_COMPONENT_ID_PREFIX + className;
                classOccurrencesMap.put(className, 1);
            }
            metricsConsumerBolts.put(id, metricsConsumerBolt);
        }
    }
    return metricsConsumerBolts;
}
  • When StormCommon builds the system topology in systemTopologyImpl, it adds a number of system components; this is where addMetricComponents and addMetricStreams are called
  • addMetricComponents creates the MetricsConsumerBolt(s) from conf and wires every component to them as input sources, using shuffle grouping on Constants.METRICS_STREAM_ID
  • addMetricStreams configures every component to output to Constants.METRICS_STREAM_ID, with the output fields Arrays.asList("task-info", "data-points")
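The occurrence-numbering at the end of metricsConsumerBoltSpecs can be isolated into a small sketch. `__metrics` is assumed here to be the value of Constants.METRICS_COMPONENT_ID_PREFIX:

```java
// Standalone sketch of how duplicate consumer classes get unique bolt ids,
// e.g. ["a", "b", "a"] => ["__metricsa", "__metricsb", "__metricsa#2"].
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MetricsBoltIds {
    static final String PREFIX = "__metrics"; // assumed constant value

    static List<String> assignIds(List<String> classNames) {
        Map<String, Integer> occurrences = new HashMap<>();
        List<String> ids = new ArrayList<>();
        for (String className : classNames) {
            int n = occurrences.merge(className, 1, Integer::sum); // occurrence count so far
            ids.add(n == 1 ? PREFIX + className : PREFIX + className + "#" + n);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(assignIds(List.of("a", "b", "a")));
    }
}
```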

Executor.setupMetrics

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;

protected void setupMetrics() {
    for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
        StormTimer timerTask = workerData.getUserTimer();
        timerTask.scheduleRecurring(interval, interval,
            () -> {
                TupleImpl tuple =
                    new TupleImpl(workerTopologyContext, new Values(interval), Constants.SYSTEM_COMPONENT_ID,
                                  (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
                AddressedTuple metricsTickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
                try {
                    receiveQueue.publish(metricsTickTuple);
                    receiveQueue.flush(); // avoid buffering
                } catch (InterruptedException e) {
                    LOG.warn("Thread interrupted when publishing metrics. Setting interrupt flag.");
                    Thread.currentThread().interrupt();
                    return;
                }
            });
    }
}

public Map<Integer, Map<Integer, Map<String, IMetric>>> getIntervalToTaskToMetricToRegistry() {
    return intervalToTaskToMetricToRegistry;
}
  • In setupMetrics, the Executor sets up recurring timer tasks that periodically emit metricsTickTuple to METRICS_TICK_STREAM_ID, addressed with BROADCAST_DEST
  • The timers are driven by intervalToTaskToMetricToRegistry, whose key is the interval
  • intervalToTaskToMetricToRegistry is initialized in the Executor constructor: intervalToTaskToMetricToRegistry = new HashMap<>()
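The timer mechanics can be sketched with a plain ScheduledExecutorService standing in for StormTimer, and a queue standing in for the executor's receive queue (the interval, queue, and tick type here are stand-ins, not Storm APIs):

```java
// Standalone sketch of the recurring metrics tick: a timer periodically publishes
// a tick marker into a receive queue, the way setupMetrics schedules
// metricsTickTuple broadcasts per interval.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MetricsTicker {
    static Integer firstTick(int intervalMs) throws InterruptedException {
        BlockingQueue<Integer> receiveQueue = new LinkedBlockingQueue<>();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        // One recurring task per interval, like setupMetrics' scheduleRecurring
        timer.scheduleAtFixedRate(() -> receiveQueue.offer(intervalMs),
                intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        try {
            return receiveQueue.poll(5, TimeUnit.SECONDS); // wait for the first tick
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("first tick for interval " + firstTick(50) + " ms");
    }
}
```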

Task.mkTopologyContext

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java

private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
    Map<String, Object> conf = workerData.getConf();
    return new TopologyContext(topology,
                               workerData.getTopologyConf(),
                               workerData.getTaskToComponent(),
                               workerData.getComponentToSortedTasks(),
                               workerData.getComponentToStreamToFields(),
                               // This is updated by the Worker and the topology has shared access to it
                               workerData.getBlobToLastKnownVersion(),
                               workerData.getTopologyId(),
                               ConfigUtils.supervisorStormResourcesPath(
                                   ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())),
                               ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
                               taskId,
                               workerData.getPort(), workerData.getLocalTaskIds(),
                               workerData.getDefaultSharedResources(),
                               workerData.getUserSharedResources(),
                               executor.getSharedExecutorData(),
                               executor.getIntervalToTaskToMetricToRegistry(),
                               executor.getOpenOrPrepareWasCalled());
}
  • When mkTopologyContext creates the TopologyContext, it passes in executor.getIntervalToTaskToMetricToRegistry()

TopologyContext

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java

public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
    private Integer _taskId;
    private Map<String, Object> _taskData = new HashMap<>();
    private List<ITaskHook> _hooks = new ArrayList<>();
    private Map<String, Object> _executorData;
    private Map<Integer, Map<Integer, Map<String, IMetric>>> _registeredMetrics;

    public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
        if (_openOrPrepareWasCalled.get()) {
            throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
                                       "IBolt::prepare() or ISpout::open() method.");
        }
        if (metric == null) {
            throw new IllegalArgumentException("Cannot register a null metric");
        }
        if (timeBucketSizeInSecs <= 0) {
            throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs " +
                                               "greater than or equal to 1 second.");
        }
        if (getRegisteredMetricByName(name) != null) {
            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
        }

        Map<Integer, Map<Integer, Map<String, IMetric>>> m1 = _registeredMetrics;
        if (!m1.containsKey(timeBucketSizeInSecs)) {
            m1.put(timeBucketSizeInSecs, new HashMap<Integer, Map<String, IMetric>>());
        }

        Map<Integer, Map<String, IMetric>> m2 = m1.get(timeBucketSizeInSecs);
        if (!m2.containsKey(_taskId)) {
            m2.put(_taskId, new HashMap<String, IMetric>());
        }

        Map<String, IMetric> m3 = m2.get(_taskId);
        if (m3.containsKey(name)) {
            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
        } else {
            m3.put(name, metric);
        }

        return metric;
    }
    //......
}
  • The Executor's intervalToTaskToMetricToRegistry ends up as the TopologyContext's _registeredMetrics
  • registerMetric adds entries to _registeredMetrics, keyed by timeBucketSizeInSecs
  • For built-in metrics, timeBucketSizeInSecs comes from Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS (topology.builtin.metrics.bucket.size.secs), which defaults to 60 in defaults.yaml, i.e. the Executor emits a metricsTickTuple every 60 seconds on the streamId Constants.METRICS_TICK_STREAM_ID
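The nested interval → taskId → metricName structure can be exercised on its own; in the sketch below IMetric is replaced by Object so the code has no Storm dependency:

```java
// Standalone sketch of the interval -> taskId -> metricName registry that
// TopologyContext.registerMetric populates (IMetric is stubbed out as Object).
import java.util.HashMap;
import java.util.Map;

public class MetricRegistry {
    private final Map<Integer, Map<Integer, Map<String, Object>>> registered = new HashMap<>();

    Object register(int timeBucketSizeInSecs, int taskId, String name, Object metric) {
        if (timeBucketSizeInSecs <= 0) {
            throw new IllegalArgumentException("timeBucketSizeInSecs must be at least 1 second");
        }
        Map<String, Object> byName = registered
                .computeIfAbsent(timeBucketSizeInSecs, k -> new HashMap<>())
                .computeIfAbsent(taskId, k -> new HashMap<>());
        // Mirrors the duplicate-name check in registerMetric
        if (byName.putIfAbsent(name, metric) != null) {
            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
        }
        return metric;
    }

    public static void main(String[] args) {
        MetricRegistry r = new MetricRegistry();
        r.register(60, 5, "my-counter", new Object());
        System.out.println("registered `my-counter` under bucket 60, task 5");
    }
}
```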

Executor.metricsTick

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

public void metricsTick(Task task, TupleImpl tuple) {
    try {
        Integer interval = tuple.getInteger(0);
        int taskId = task.getTaskId();
        Map<Integer, Map<String, IMetric>> taskToMetricToRegistry = intervalToTaskToMetricToRegistry.get(interval);
        Map<String, IMetric> nameToRegistry = null;
        if (taskToMetricToRegistry != null) {
            nameToRegistry = taskToMetricToRegistry.get(taskId);
        }
        if (nameToRegistry != null) {
            IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(hostname, workerTopologyContext.getThisWorkerPort(),
                                                                               componentId, taskId, Time.currentTimeSecs(), interval);
            List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
            for (Map.Entry<String, IMetric> entry : nameToRegistry.entrySet()) {
                IMetric metric = entry.getValue();
                Object value = metric.getValueAndReset();
                if (value != null) {
                    IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value);
                    dataPoints.add(dataPoint);
                }
            }
            if (!dataPoints.isEmpty()) {
                task.sendUnanchored(Constants.METRICS_STREAM_ID,
                                    new Values(taskInfo, dataPoints), executorTransfer, pendingEmits);
                executorTransfer.flush();
            }
        }
    } catch (Exception e) {
        throw Utils.wrapInRuntime(e);
    }
}
  • When SpoutExecutor and BoltExecutor receive a tuple with streamId Constants.METRICS_TICK_STREAM_ID in tupleActionFn, they call the parent class's Executor.metricsTick method
  • metricsTick emits via task.sendUnanchored(Constants.METRICS_STREAM_ID, new Values(taskInfo, dataPoints), executorTransfer, pendingEmits), sending taskInfo and dataPoints to Constants.METRICS_STREAM_ID; the dataPoints are read from the TopologyContext's _registeredMetrics (this is the legacy metrics API, not V2)
  • When MetricsConsumerBolt receives the data it puts it on the _taskQueue; meanwhile the MetricsHandlerRunnable thread blocks taking data off _taskQueue and calls back _metricsConsumer.handleDataPoints to consume it
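The harvest loop inside metricsTick (call getValueAndReset on each metric, skip nulls, collect the rest as data points) can be sketched without Storm; CountMetric below is a minimal stand-in for Storm's IMetric implementations:

```java
// Standalone sketch of the metricsTick harvest loop: each metric is asked for
// getValueAndReset(), null values are skipped, and the rest become data points.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MetricsHarvest {
    interface IMetric { Object getValueAndReset(); }

    // Minimal stand-in for a counter metric: returns the count and resets it
    static class CountMetric implements IMetric {
        long value;
        void incr() { value++; }
        public Object getValueAndReset() { long v = value; value = 0; return v; }
    }

    static List<Map.Entry<String, Object>> harvest(Map<String, IMetric> nameToRegistry) {
        List<Map.Entry<String, Object>> dataPoints = new ArrayList<>();
        for (Map.Entry<String, IMetric> e : nameToRegistry.entrySet()) {
            Object value = e.getValue().getValueAndReset(); // resets for the next bucket
            if (value != null) {
                dataPoints.add(Map.entry(e.getKey(), value));
            }
        }
        return dataPoints;
    }

    public static void main(String[] args) {
        CountMetric acked = new CountMetric();
        acked.incr();
        acked.incr();
        System.out.println(harvest(Map.of("ack-count", acked)));
    }
}
```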

Summary

  • LoggingMetricsConsumer is provided by storm metric (there is no equivalent in metrics2); nimbus and supervisor use -Dlog4j.configurationFile=/apache-storm/log4j2/cluster.xml, workers use -Dlog4j.configurationFile=/apache-storm/log4j2/worker.xml; the -Dlogfile.name of each component is nimbus.log, supervisor.log, and worker.log respectively
  • When building a topology, storm adds system components, including the metricsConsumerBolt and the metric streams. The Executor's init method calls setupMetrics to periodically emit metricsTickTuple; when SpoutExecutor and BoltExecutor receive a metricsTickTuple in tupleActionFn, they call metricsTick to produce data and emit it to Constants.METRICS_STREAM_ID, after which MetricsConsumerBolt receives the data and calls back _metricsConsumer.handleDataPoints to consume it
  • Two parameters deserve attention. max.retain.metric.tuples, used by MetricsConsumerBolt and configured under topology.metrics.consumer.register, defaults to 100; it is the size of MetricsConsumerBolt's _taskQueue, and 0 means unbounded. The built-in metrics interval comes from Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS (topology.builtin.metrics.bucket.size.secs), which defaults to 60, i.e. a metricsTickTuple is emitted every 60 seconds

doc

  • Storm Metrics
  • New Metrics Reporting API
