日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

twitter storm源码走读(五)

發(fā)布時間:2023/12/4 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 twitter storm源码走读(五) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

TridentTopology創(chuàng)建過程詳解

從用戶層面來看TridentTopology,有兩個重要的概念一是Stream,另一個是作用于Stream上的各種Operation。在實現(xiàn)層面來看,無論是stream,還是后續(xù)的operation都會轉(zhuǎn)變成為各個Node,這些Node之間的關系通過重要的數(shù)據(jù)結構來維護。具體到TridentTopology,實現(xiàn)圖的各種操作的組件是jgrapht。

說到圖,兩個基本的概念會閃現(xiàn)出來,一是結點,二是描述結點之間關系的邊。要想很好的理解TridentTopology就需要緊盯圖中結點和邊的變化。

TridentTopology在轉(zhuǎn)換成為普通的StormTopology時,需要將原始的圖分成各個group,每個group將運行于一個獨立的bolt中。TridentTopology又是如何知道哪些node應該在同一個group,哪些應該處在另一個group中的呢;如何來確定每個group的并發(fā)度(parallismHint)的呢。這些問題的解決都與jgrapht分不開。

關于jgrapht的更多信息,請參考其官方網(wǎng)站?http://jgrapht.org

概要

在TridentTopology中向圖中添加結點的api有三種:

  • addNode
  • addSourcedNode
  • addSourcedStateNode
  • 其中addNode在創(chuàng)建stream是使用,addSourcedStateNode在partitionPersist時使用到,其它的operation使用到的是addSourcedNode.

    addNode與其它兩個方法的一個重要區(qū)別還在于,addNode是不需要添加邊(Edge),而其它兩個API需要往圖中添加edge,以確定該node的源是哪個。

    TridentTopology

    1 2 3 4 public?TridentTopology() { ????????_graph =?new?DefaultDirectedGraph(new?ErrorEdgeFactory()); ????????_gen =?new?UniqueIdGen(); ????}

    ?在TridentTopology的構造函數(shù)中,創(chuàng)建了DAG(有向無環(huán)圖)。利用這個_graph來作為容器以存儲后續(xù)過程中創(chuàng)建的各個node及它們之間的關系。

    newStream

    ?newStream會為DAG(有向無環(huán)圖)中創(chuàng)建源結點,其調(diào)用關系如下所示。

    • newStream
      • addNode
        • registerNode
    1 protected void registerNode(Node n) { 2 _graph.addVertex(n); 3 if(n.stateInfo!=null) { 4 String id = n.stateInfo.id; 5 if(!_colocate.containsKey(id)) { 6 _colocate.put(id, new ArrayList()); 7 } 8 _colocate.get(id).add(n); 9 } 10 }

    ?

    each

    作用于stream上的Operation有很多,以each為例來看新的operation是如何轉(zhuǎn)換成為node添加到_graph中的。

    //Stream.java
    public Stream each(Fields inputFields, Function function, Fields functionFields) {projectionValidation(inputFields);return _topology.addSourcedNode(this,new ProcessorNode(_topology.getUniqueStreamId(),_name,TridentUtils.fieldsConcat(getOutputFields(), functionFields),functionFields,new EachProcessor(inputFields, function)));}

    調(diào)用關系描述如下

    • Stream::each
    • TridentTopology::addSourcedNode
    • TridentTopology::registerSourcedNode

    registerSourcedNode的實現(xiàn)如下

    protected void registerSourcedNode(List<Stream> sources, Node newNode) {registerNode(newNode);int streamIndex = 0;for(Stream s: sources) {_graph.addEdge(s._node, newNode, new IndexedEdge(s._node, newNode, streamIndex));streamIndex++;} }

    注意此處添加edge是,是有索引的,這樣可以區(qū)別處理的先后順序。

    在Stream中含有成員變量_node,表示stream最近停泊的node,有了該變量添加edge才成為了可能。

    ?

    partitionPersist

    public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater, Fields functionFields) {projectionValidation(inputFields);String id = _topology.getUniqueStateId();ProcessorNode n = new ProcessorNode(_topology.getUniqueStreamId(),_name,functionFields,functionFields,new PartitionPersistProcessor(id, inputFields, updater));n.committer = true;n.stateInfo = new NodeStateInfo(id, stateSpec);return _topology.addSourcedStateNode(this, n);}

    調(diào)用關系

    • Stream::partitionPersist
    • TridentTopology::addSourcedStateNode
    • TridentTopology::registerSourcedNode

    與addNode及addSourcedNode不同的是,addSourcedStateNode返回的是TridentState而非Stream

    既然談到了TridentState就不得不談到其另一面Stream::stateQuery,

    public Stream stateQuery(TridentState state, Fields inputFields, QueryFunction function, Fields functionFields) {projectionValidation(inputFields);String stateId = state._node.stateInfo.id;Node n = new ProcessorNode(_topology.getUniqueStreamId(),_name,TridentUtils.fieldsConcat(getOutputFields(), functionFields),functionFields,new StateQueryProcessor(stateId, inputFields, function));_topology._colocate.get(stateId).add(n);return _topology.addSourcedNode(this, n);}

    從此處可以看出stateQueryNode最起碼有兩個inputStream,一是從TridentState而來表示狀態(tài)已經(jīng)改變,另一個是處于drpcStream這個方面的上一跳結點。

    build

    TridentTopology::build是將TridentTopology轉(zhuǎn)變?yōu)镾tormTopology的過程,這一過程中最重要的一環(huán)就是將_graph中含有的node進行分組。

    grouping

    算法邏輯概述

    • 將boltNodes中的每個boltNode作為一個group加入全部加入initialGroups
    • 以graph和initialGroups作為入?yún)?chuàng)建GraphGrouper
    • 分組的過程其實就是進行合并的過程,詳見GraphGrouper::mergeFully()
      • 如果從當前group1的輸出目的地都是屬于group2,則將group1,group2合并
      • 如果當前group1的所有輸入源都是來自于group2,則將group1,group2合并
      • 將需要合并的group1,group2作為入?yún)?chuàng)建新的group,同時將group1,group2從已有的集合出移除
    public void mergeFully() {boolean somethingHappened = true;while(somethingHappened) {somethingHappened = false;for(Group g: currGroups) {Collection<Group> outgoingGroups = outgoingGroups(g);if(outgoingGroups.size()==1) {Group out = outgoingGroups.iterator().next();if(out!=null) {merge(g, out);somethingHappened = true;break;}}Collection<Group> incomingGroups = incomingGroups(g);if(incomingGroups.size()==1) {Group in = incomingGroups.iterator().next();if(in!=null) {merge(g, in);somethingHappened = true;break;}} }}}

    GraphGrouper::merge()

    private void merge(Group g1, Group g2) {Group newGroup = new Group(g1, g2);currGroups.remove(g1);currGroups.remove(g2);currGroups.add(newGroup);for(Node n: newGroup.nodes) {groupIndex.put(n, newGroup);}}

    在group之間添加partitionNode

    // add identity partitions between groupsfor(IndexedEdge<Node> e: new HashSet<IndexedEdge>(graph.edgeSet())) {if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) { Group g1 = grouper.nodeGroup(e.source);Group g2 = grouper.nodeGroup(e.target);// g1 being null means the source is a spout nodeif(g1==null && !(e.source instanceof SpoutNode))throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");if(g1==null || !g1.equals(g2)) {graph.removeEdge(e);PartitionNode pNode = makeIdentityPartition(e.source);graph.addVertex(pNode);graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index)); }}}


    _graph中所有的node在變換過后,變成兩組元素,一是spoutNodes,另一個是合并后的mergedGroup.

    spoutNodes中的每個元素作為spout添加到TridentTopologyBuilder的_spouts數(shù)組中,mergedGroup中的每個group添加到TridentTopologyBuilder的_bolt數(shù)組中。在TridentTopologyBuilder::build()中最主要的事情是為每個_spouts和_bolts數(shù)組中的成員添加grouping關系。

    小結

    到目前為止,通過兩篇文章分析了TridentTopology的創(chuàng)建過程及其運行時在每個TridentBoltExecutor中的消息傳遞情況。接下來會分析TridentTopology提供的API實現(xiàn)及其作用場景。

    總結

    以上是生活随笔為你收集整理的twitter storm源码走读(五)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。