高校文明建设网站ip 查询地址
news/
2025/9/24 1:34:34/
文章来源:
高校文明建设网站,ip 查询地址,大悟县建设局网站,网站改版 seoTridentTopology创建过程详解 从用户层面来看TridentTopology#xff0c;有两个重要的概念一是Stream,另一个是作用于Stream上的各种Operation。在实现层面来看#xff0c;无论是stream#xff0c;还是后续的operation都会转变成为各个Node#xff0c;这些Node之间的关系通… TridentTopology创建过程详解 从用户层面来看TridentTopology有两个重要的概念一是Stream,另一个是作用于Stream上的各种Operation。在实现层面来看无论是stream还是后续的operation都会转变成为各个Node这些Node之间的关系通过重要的数据结构图来维护。具体到TridentTopology实现图的各种操作的组件是jgrapht。 说到图两个基本的概念会闪现出来一是结点二是描述结点之间关系的边。要想很好的理解TridentTopology就需要紧盯图中结点和边的变化。 TridentTopology在转换成为普通的StormTopology时需要将原始的图分成各个group每个group将运行于一个独立的bolt中。TridentTopology又是如何知道哪些node应该在同一个group,哪些应该处在另一个group中的呢如何来确定每个group的并发度(parallismHint)的呢。这些问题的解决都与jgrapht分不开。 关于jgrapht的更多信息请参考其官方网站 http://jgrapht.org 概要 在TridentTopology中向图中添加结点的api有三种 addNodeaddSourcedNodeaddSourcedStateNode其中addNode在创建stream是使用addSourcedStateNode在partitionPersist时使用到其它的operation使用到的是addSourcedNode. addNode与其它两个方法的一个重要区别还在于addNode是不需要添加边(Edge)而其它两个API需要往图中添加edge以确定该node的源是哪个。 TridentTopology 1 2 3 4 public TridentTopology() { _graph new DefaultDirectedGraph(new ErrorEdgeFactory()); _gen new UniqueIdGen(); } 在TridentTopology的构造函数中创建了DAG(有向无环图)。利用这个_graph来作为容器以存储后续过程中创建的各个node及它们之间的关系。 newStream newStream会为DAG(有向无环图)中创建源结点其调用关系如下所示。 newStream addNode registerNode 1 protected void registerNode(Node n) {2 _graph.addVertex(n);3 if(n.stateInfo!null) {4 String id n.stateInfo.id;5 if(!_colocate.containsKey(id)) {6 _colocate.put(id, new ArrayList());7 }8 _colocate.get(id).add(n);9 }
10 } each 作用于stream上的Operation有很多以each为例来看新的operation是如何转换成为node添加到_graph中的。 //Stream.javapublic Stream each(Fields inputFields, Function function, Fields functionFields) {projectionValidation(inputFields);return _topology.addSourcedNode(this,new ProcessorNode(_topology.getUniqueStreamId(),_name,TridentUtils.fieldsConcat(getOutputFields(), functionFields),functionFields,new EachProcessor(inputFields, function)));} 调用关系描述如下 Stream::eachTridentTopology::addSourcedNodeTridentTopology::registerSourcedNoderegisterSourcedNode的实现如下 protected void registerSourcedNode(ListStream sources, Node newNode) {registerNode(newNode);int streamIndex 0;for(Stream s: sources) {_graph.addEdge(s._node, newNode, new IndexedEdge(s._node, newNode, streamIndex));streamIndex;} } 注意此处添加edge是是有索引的这样可以区别处理的先后顺序。 在Stream中含有成员变量_node表示stream最近停泊的node,有了该变量添加edge才成为了可能。 partitionPersist public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater, Fields functionFields) {projectionValidation(inputFields);String id _topology.getUniqueStateId();ProcessorNode n new ProcessorNode(_topology.getUniqueStreamId(),_name,functionFields,functionFields,new PartitionPersistProcessor(id, inputFields, updater));n.committer true;n.stateInfo new NodeStateInfo(id, stateSpec);return _topology.addSourcedStateNode(this, n);} 调用关系 Stream::partitionPersistTridentTopology::addSourcedStateNodeTridentTopology::registerSourcedNode与addNode及addSourcedNode不同的是addSourcedStateNode返回的是TridentState而非Stream。 既然谈到了TridentState就不得不谈到其另一面Stream::stateQuery, public Stream stateQuery(TridentState state, Fields inputFields, QueryFunction function, Fields functionFields) {projectionValidation(inputFields);String stateId state._node.stateInfo.id;Node n new ProcessorNode(_topology.getUniqueStreamId(),_name,TridentUtils.fieldsConcat(getOutputFields(), functionFields),functionFields,new StateQueryProcessor(stateId, inputFields, function));_topology._colocate.get(stateId).add(n);return _topology.addSourcedNode(this, n);} 从此处可以看出stateQueryNode最起码有两个inputStream一是从TridentState而来表示状态已经改变另一个是处于drpcStream这个方面的上一跳结点。 build TridentTopology::build是将TridentTopology转变为StormTopology的过程这一过程中最重要的一环就是将_graph中含有的node进行分组。 grouping 算法逻辑概述 将boltNodes中的每个boltNode作为一个group加入全部加入initialGroups以graph和initialGroups作为入参创建GraphGrouper分组的过程其实就是进行合并的过程详见GraphGrouper::mergeFully() 如果从当前group1的输出目的地都是属于group2则将group1,group2合并如果当前group1的所有输入源都是来自于group2则将group1group2合并将需要合并的group1,group2作为入参创建新的group同时将group1,group2从已有的集合出移除 public void mergeFully() {boolean somethingHappened true;while(somethingHappened) {somethingHappened false;for(Group g: currGroups) {CollectionGroup outgoingGroups outgoingGroups(g);if(outgoingGroups.size()1) {Group out outgoingGroups.iterator().next();if(out!null) {merge(g, out);somethingHappened true;break;}}CollectionGroup incomingGroups incomingGroups(g);if(incomingGroups.size()1) {Group in incomingGroups.iterator().next();if(in!null) {merge(g, in);somethingHappened true;break;}} }}} GraphGrouper::merge() private void merge(Group g1, Group g2) {Group newGroup new Group(g1, g2);currGroups.remove(g1);currGroups.remove(g2);currGroups.add(newGroup);for(Node n: newGroup.nodes) {groupIndex.put(n, newGroup);}} 在group之间添加partitionNode // add identity partitions between groupsfor(IndexedEdgeNode e: new HashSetIndexedEdge(graph.edgeSet())) {if(!(e.source instanceof PartitionNode) !(e.target instanceof PartitionNode)) { Group g1 grouper.nodeGroup(e.source);Group g2 grouper.nodeGroup(e.target);// g1 being null means the source is a spout nodeif(g1null !(e.source instanceof SpoutNode))throw new RuntimeException(Planner exception: Null source group must indicate a spout node at this phase of planning);if(g1null || !g1.equals(g2)) {graph.removeEdge(e);PartitionNode pNode makeIdentityPartition(e.source);graph.addVertex(pNode);graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index)); }}} _graph中所有的node在变换过后变成两组元素一是spoutNodes另一个是合并后的mergedGroup. spoutNodes中的每个元素作为spout添加到TridentTopologyBuilder的_spouts数组中mergedGroup中的每个group添加到TridentTopologyBuilder的_bolt数组中。在TridentTopologyBuilder::build()中最主要的事情是为每个_spouts和_bolts数组中的成员添加grouping关系。 小结 到目前为止通过两篇文章分析了TridentTopology的创建过程及其运行时在每个TridentBoltExecutor中的消息传递情况。接下来会分析TridentTopology提供的API实现及其作用场景。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/914493.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!