Notes on the finishBatch method of Storm's TridentBoltExecutor

Published: 2025/3/21

This article looks at the finishBatch method of Storm's TridentBoltExecutor and at the call chain that leads up to it.

MasterBatchCoordinator.nextTuple

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java

    public void nextTuple() {
        sync();
    }

    private void sync() {
        // note that sometimes the tuples active may be less than max_spout_pending, e.g.
        // max_spout_pending = 3
        // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
        // and there won't be a batch for tx 4 because there's max_spout_pending tx active
        TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
        if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
            LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
        }

        if(_active) {
            if(_activeTx.size() < _maxTransactionActive) {
                Long curr = _currTransaction;
                for(int i=0; i<_maxTransactionActive; i++) {
                    if(!_activeTx.containsKey(curr) && isReady(curr)) {
                        // by using a monotonically increasing attempt id, downstream tasks
                        // can be memory efficient by clearing out state for old attempts
                        // as soon as they see a higher attempt id for a transaction
                        Integer attemptId = _attemptIds.get(curr);
                        if(attemptId==null) {
                            attemptId = 0;
                        } else {
                            attemptId++;
                        }
                        _attemptIds.put(curr, attemptId);
                        for(TransactionalState state: _states) {
                            state.setData(CURRENT_ATTEMPTS, _attemptIds);
                        }

                        TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                        final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                        _activeTx.put(curr, newTransactionStatus);
                        _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                        LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);
                        _throttler.markEvent();
                    }
                    curr = nextTransactionId(curr);
                }
            }
        }
    }
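  • MasterBatchCoordinator is the real spout of a Trident topology. Its nextTuple method calls sync(), which emits a tuple carrying the TransactionAttempt to TridentSpoutCoordinator on the MasterBatchCoordinator.BATCH_STREAM_ID ($batch) stream.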
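
The window and attempt-id bookkeeping in sync() can be isolated into a small sketch. The class BatchWindowSketch below is hypothetical and not part of Storm. It assumes every txid is ready (the real code also checks isReady and stores the attempt ids in TransactionalState), and its fail method is a simplification that drops only the failed txid.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of MasterBatchCoordinator.sync(); not Storm code.
class BatchWindowSketch {
    private final int maxActive;                    // plays the role of _maxTransactionActive
    private final long currTransaction = 1L;        // oldest uncommitted txid (never advanced here)
    private final Map<Long, Integer> activeTx = new HashMap<>();   // txid -> attempt id in flight
    private final Map<Long, Integer> attemptIds = new HashMap<>(); // txid -> last attempt id used

    BatchWindowSketch(int maxActive) {
        this.maxActive = maxActive;
    }

    // Returns "txid:attemptId" for every batch that would be emitted on $batch.
    List<String> sync() {
        List<String> emitted = new ArrayList<>();
        if (activeTx.size() < maxActive) {
            long curr = currTransaction;
            for (int i = 0; i < maxActive; i++) {
                if (!activeTx.containsKey(curr)) {
                    // First attempt is 0; every retry of the same txid gets a higher id.
                    Integer attemptId = attemptIds.get(curr);
                    attemptId = (attemptId == null) ? 0 : attemptId + 1;
                    attemptIds.put(curr, attemptId);
                    activeTx.put(curr, attemptId);
                    emitted.add(curr + ":" + attemptId);
                }
                curr++;
            }
        }
        return emitted;
    }

    // Simplified failure handling (an assumption): forget the txid so the next sync() retries it.
    void fail(long txid) {
        activeTx.remove(txid);
    }
}
```

With a window of 3, the first sync() emits attempts for txids 1 to 3, a second sync() emits nothing because the window is full, and after fail(2) the next sync() re-emits txid 2 with attempt id 1. The higher attempt id is what lets downstream tasks discard state kept for the older attempt.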

TridentSpoutCoordinator.execute

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutCoordinator.java

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);

        if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            _state.cleanupBefore(attempt.getTransactionId());
            _coord.success(attempt.getTransactionId());
        } else {
            long txid = attempt.getTransactionId();
            Object prevMeta = _state.getPreviousState(txid);
            Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
            _state.overrideState(txid, meta);
            collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
        }
    }
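  • TridentSpoutCoordinator receives the tuple that MasterBatchCoordinator emitted on MasterBatchCoordinator.BATCH_STREAM_ID ($batch), computes the metadata for that transaction, and then sends the batch instruction (attempt plus metadata) to the TridentBoltExecutor that wraps the user's spout.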
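
The two branches of this execute method can be sketched as follows. SpoutCoordinatorSketch is a hypothetical stand-in, not Storm code: it assumes the metadata is a start offset derived from the previous transaction's metadata, that the success stream is named "$success", and that cleanup drops the state of all earlier txids.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for TridentSpoutCoordinator plus its per-txid state; not Storm code.
class SpoutCoordinatorSketch {
    static final String BATCH_STREAM = "$batch";
    static final String SUCCESS_STREAM = "$success"; // assumed stream name

    // txid -> start offset of that batch (the metadata shape is an assumption)
    private final TreeMap<Long, Long> metaByTxid = new TreeMap<>();
    private final long batchSize;

    SpoutCoordinatorSketch(long batchSize) {
        this.batchSize = batchSize;
    }

    // Returns the metadata forwarded with the batch, or null when the tuple was a success notice.
    Long execute(String sourceStream, long txid) {
        if (SUCCESS_STREAM.equals(sourceStream)) {
            // Success branch: state for earlier transactions is no longer needed.
            metaByTxid.headMap(txid).clear();
            return null;
        }
        // Batch branch: derive this transaction's metadata from the previous one and store it.
        Map.Entry<Long, Long> prev = metaByTxid.lowerEntry(txid);
        long meta = (prev == null) ? 0L : prev.getValue() + batchSize;
        metaByTxid.put(txid, meta);
        return meta;
    }

    int trackedTransactions() {
        return metaByTxid.size();
    }
}
```

Because the metadata depends only on the previous transaction, replaying the same txid yields the same value, so a retried batch covers the same range as the original attempt.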

TridentBoltExecutor(TridentSpoutExecutor)

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

    public void execute(Tuple tuple) {
        if(TupleUtils.isTick(tuple)) {
            long now = System.currentTimeMillis();
            if(now - _lastRotate > _messageTimeoutMs) {
                _batches.rotate();
                _lastRotate = now;
            }
            return;
        }
        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
        if(batchGroup==null) {
            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
            _coordCollector.setCurrBatch(null);
            _bolt.execute(null, tuple);
            _collector.ack(tuple);
            return;
        }
        IBatchID id = (IBatchID) tuple.getValue(0);
        //get transaction id
        //if it already exists and attempt id is greater than the attempt there

        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
        // if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
        //     System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
        //             + " (" + _batches.size() + ")" +
        //             "\ntuple: " + tuple +
        //             "\nwith tracked " + tracked +
        //             "\nwith id " + id +
        //             "\nwith group " + batchGroup
        //             + "\n");
        //
        // }
        //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());

        // this code here ensures that only one attempt is ever tracked for a batch, so when
        // failures happen you don't get an explosion in memory usage in the tasks
        if(tracked!=null) {
            if(id.getAttemptId() > tracked.attemptId) {
                _batches.remove(id.getId());
                tracked = null;
            } else if(id.getAttemptId() < tracked.attemptId) {
                // no reason to try to execute a previous attempt than we've already seen
                return;
            }
        }

        if(tracked==null) {
            tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
        _coordCollector.setCurrBatch(tracked);

        //System.out.println("TRACKED: " + tracked + " " + tuple);

        TupleType t = getTupleType(tuple, tracked);
        if(t==TupleType.COMMIT) {
            tracked.receivedCommit = true;
            checkFinish(tracked, tuple, t);
        } else if(t==TupleType.COORD) {
            int count = tuple.getInteger(1);
            tracked.reportedTasks++;
            tracked.expectedTupleCount+=count;
            checkFinish(tracked, tuple, t);
        } else {
            tracked.receivedTuples++;
            boolean success = true;
            try {
                _bolt.execute(tracked.info, tuple);
                if(tracked.condition.expectedTaskReports==0) {
                    success = finishBatch(tracked, tuple);
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
            }
            if(success) {
                _collector.ack(tuple);
            } else {
                _collector.fail(tuple);
            }
        }
        _coordCollector.setCurrBatch(null);
    }

    private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
        boolean success = true;
        try {
            _bolt.finishBatch(tracked.info);
            String stream = COORD_STREAM(tracked.info.batchGroup);
            for(Integer task: tracked.condition.targetTasks) {
                _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
            }
            if(tracked.delayedAck!=null) {
                _collector.ack(tracked.delayedAck);
                tracked.delayedAck = null;
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
            success = false;
        }
        _batches.remove(tracked.info.batchId.getId());
        return success;
    }
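  • TridentBoltExecutor.execute first creates and initializes a TrackedBatch if none exists yet for the batch. When it then receives the batch instruction, it increments tracked.receivedTuples and calls _bolt.execute(tracked.info, tuple).
  • For the spout, _bolt here is a TridentSpoutExecutor, whose execute method emits one batch of tuples to the downstream TridentBoltExecutor. Because the spout's expectedTaskReports == 0, finishBatch is called immediately after TridentSpoutExecutor has emitted the tuples of the batch.
  • finishBatch uses emitDirect on COORD_STREAM to send an [id, count] tuple to each target task of the downstream TridentBoltExecutor, telling that task how many tuples were emitted to it for this batch (0 if none were).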
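
The per-task counting behind the [id, count] tuples can be sketched as below. CoordEmitSketch is hypothetical and not Storm code: it models only the counter map and the loop over target tasks, and formats each message as a string instead of emitting a tuple.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the COORD_STREAM emission in finishBatch; not Storm code.
class CoordEmitSketch {
    // Number of tuples sent to each downstream task while the batch was being emitted.
    private final Map<Integer, Integer> taskEmittedTuples = new HashMap<>();

    // Called once for every tuple emitted to a downstream task.
    void recordEmit(int targetTask) {
        taskEmittedTuples.merge(targetTask, 1, Integer::sum);
    }

    // One "task -> [batchId, count]" message per target task; tasks that received nothing get count 0.
    List<String> finishBatch(long batchId, List<Integer> targetTasks) {
        List<String> coordMessages = new ArrayList<>();
        for (int task : targetTasks) {
            int count = taskEmittedTuples.getOrDefault(task, 0);
            coordMessages.add(task + " -> [" + batchId + ", " + count + "]");
        }
        return coordMessages;
    }
}
```

Sending a count of 0 to tasks that received no data matters: every downstream task waits for a report from each upstream task before it can decide that its batch is complete.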

TridentBoltExecutor(SubtopologyBolt)

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

    @Override
    public void execute(Tuple tuple) {
        if(TupleUtils.isTick(tuple)) {
            long now = System.currentTimeMillis();
            if(now - _lastRotate > _messageTimeoutMs) {
                _batches.rotate();
                _lastRotate = now;
            }
            return;
        }
        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
        if(batchGroup==null) {
            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
            _coordCollector.setCurrBatch(null);
            _bolt.execute(null, tuple);
            _collector.ack(tuple);
            return;
        }
        IBatchID id = (IBatchID) tuple.getValue(0);
        //get transaction id
        //if it already exists and attempt id is greater than the attempt there

        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
        // if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
        //     System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
        //             + " (" + _batches.size() + ")" +
        //             "\ntuple: " + tuple +
        //             "\nwith tracked " + tracked +
        //             "\nwith id " + id +
        //             "\nwith group " + batchGroup
        //             + "\n");
        //
        // }
        //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());

        // this code here ensures that only one attempt is ever tracked for a batch, so when
        // failures happen you don't get an explosion in memory usage in the tasks
        if(tracked!=null) {
            if(id.getAttemptId() > tracked.attemptId) {
                _batches.remove(id.getId());
                tracked = null;
            } else if(id.getAttemptId() < tracked.attemptId) {
                // no reason to try to execute a previous attempt than we've already seen
                return;
            }
        }

        if(tracked==null) {
            tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
        _coordCollector.setCurrBatch(tracked);

        //System.out.println("TRACKED: " + tracked + " " + tuple);

        TupleType t = getTupleType(tuple, tracked);
        if(t==TupleType.COMMIT) {
            tracked.receivedCommit = true;
            checkFinish(tracked, tuple, t);
        } else if(t==TupleType.COORD) {
            int count = tuple.getInteger(1);
            tracked.reportedTasks++;
            tracked.expectedTupleCount+=count;
            checkFinish(tracked, tuple, t);
        } else {
            tracked.receivedTuples++;
            boolean success = true;
            try {
                _bolt.execute(tracked.info, tuple);
                if(tracked.condition.expectedTaskReports==0) {
                    success = finishBatch(tracked, tuple);
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
            }
            if(success) {
                _collector.ack(tuple);
            } else {
                _collector.fail(tuple);
            }
        }
        _coordCollector.setCurrBatch(null);
    }

    private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
        if(tracked.failed) {
            failBatch(tracked);
            _collector.fail(tuple);
            return;
        }
        CoordCondition cond = tracked.condition;
        boolean delayed = tracked.delayedAck==null &&
                              (cond.commitStream!=null && type==TupleType.COMMIT
                               || cond.commitStream==null);
        if(delayed) {
            tracked.delayedAck = tuple;
        }
        boolean failed = false;
        if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
            if(tracked.receivedTuples == tracked.expectedTupleCount) {
                finishBatch(tracked, tuple);
            } else {
                //TODO: add logging that not all tuples were received
                failBatch(tracked);
                _collector.fail(tuple);
                failed = true;
            }
        }

        if(!delayed && !failed) {
            _collector.ack(tuple);
        }
    }

    private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
        boolean success = true;
        try {
            _bolt.finishBatch(tracked.info);
            String stream = COORD_STREAM(tracked.info.batchGroup);
            for(Integer task: tracked.condition.targetTasks) {
                _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
            }
            if(tracked.delayedAck!=null) {
                _collector.ack(tracked.delayedAck);
                tracked.delayedAck = null;
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
            success = false;
        }
        _batches.remove(tracked.info.batchId.getId());
        return success;
    }
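  • TridentBoltExecutor(SubtopologyBolt) is the bolt downstream of the spout. Its _bolt is a SubtopologyBolt and its tracked.condition.expectedTaskReports is not 0, so it runs checkFinish only when it receives a tuple of type TupleType.COORD (TupleType.COMMIT is ignored here for now).
  • BoltExecutor uses Utils.asyncLoop to consume the receiveQueue one item at a time, so the tuples produced by emitBatch are received one by one, and the [id, count] tuple that TridentBoltExecutor(TridentSpoutExecutor) sends on COORD_STREAM during finishBatch arrives last. Note that COORD_STREAM tuples are sent directly to individual tasks: if TridentBoltExecutor runs with a parallelism greater than one, each task receives its own [id, count] tuple.
  • TridentBoltExecutor(SubtopologyBolt) therefore processes every data tuple first and only then reaches the TupleType.COORD tuple, which triggers checkFinish. When there is no commitStream, tracked.receivedCommit defaults to true, so once all expected task reports are in and the number of received tuples equals the expected number, it calls _bolt.finishBatch to complete the batch and then emits an [id, count] tuple to its own downstream TridentBoltExecutor so that the next bolt knows how many tuples to expect. If the counts do not match, the batch is failed.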
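
The decision made in checkFinish can be reduced to a few counters. BatchTrackerSketch below is hypothetical and not Storm code: it assumes there is no commit stream (so receivedCommit is treated as always true) and leaves out acking, delayedAck and the tracked.failed flag.

```java
// Hypothetical reduction of TrackedBatch and checkFinish to the counters that decide the outcome; not Storm code.
class BatchTrackerSketch {
    enum Outcome { WAITING, FINISHED, FAILED }

    private final int expectedTaskReports; // number of upstream tasks that must send a COORD tuple
    private int reportedTasks;             // COORD tuples seen so far
    private int expectedTupleCount;        // sum of the counts carried by those COORD tuples
    private int receivedTuples;            // data tuples actually processed

    BatchTrackerSketch(int expectedTaskReports) {
        this.expectedTaskReports = expectedTaskReports;
    }

    // A regular data tuple of the batch was processed.
    void onDataTuple() {
        receivedTuples++;
    }

    // A COORD tuple [id, count] arrived from one upstream task.
    Outcome onCoordTuple(int count) {
        reportedTasks++;
        expectedTupleCount += count;
        // No commit stream is assumed, so only the task reports gate the check.
        if (reportedTasks == expectedTaskReports) {
            return receivedTuples == expectedTupleCount ? Outcome.FINISHED : Outcome.FAILED;
        }
        return Outcome.WAITING;
    }
}
```

With two upstream tasks, three processed data tuples and COORD counts of 2 and 1, the first COORD tuple leaves the batch waiting and the second finishes it. If a COORD tuple announces more tuples than were processed, the batch fails, which is why the in-order consumption described above matters.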

Summary

  • In Trident the real spout is MasterBatchCoordinator. Its nextTuple starts a batch by sending the batch instruction to TridentSpoutCoordinator, which in turn triggers the execute method of TridentBoltExecutor(TridentSpoutExecutor). That call invokes emitBatch on the ITridentSpout's emitter, which emits the data of one batch.
  • TridentBoltExecutor(TridentSpoutExecutor) has expectedTaskReports == 0, so as soon as TridentSpoutExecutor has emitted the tuples of the batch it calls finishBatch, which sends [id, count] over COORD_STREAM to the downstream TridentBoltExecutor to tell it how many tuples were emitted.
  • The bolt downstream of the spout is TridentBoltExecutor(SubtopologyBolt). Its tracked.condition.expectedTaskReports is not 0, so it runs checkFinish only when it receives a TupleType.COORD tuple (TupleType.COMMIT is ignored here for now). Because the spout runs emitBatch first and sends [id, count] in finishBatch last, the tuples normally enter the receiveQueue of TridentBoltExecutor(SubtopologyBolt) in that order. The bolt consumes the data tuples one by one through SubtopologyBolt.execute and handles the [id, count] tuple last, which triggers checkFinish. If the number of received tuples equals the expected number, it calls SubtopologyBolt.finishBatch to complete the batch and then emits an [id, count] tuple to its own downstream TridentBoltExecutor.
