Storm系列(四)Topology提交校验过程

功能:提交一个新的Topology,并为Topology创建storm-id(topology-id),校验其结构,设置必要的元数据,最后为Topology分配任务.

实现源码:

(^void submitTopology
        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
        (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
                                 (SubmitOptions. TopologyInitialStatus/ACTIVE)))

从以上源码中看出submitTopology内部是对submitTopologyWithOpts方法的调用。

submitTopologyWithOpts函数原型如下:

^void submitTopologyWithOpts
        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
         ^SubmitOptions submitOptions]

在submitTopologyWithOpts中主要做了以下几件事情:

  1. 校验submitOptions参数不能为空。
  2. 检查storm-name中是否包含非法字符。
  3. 校验storm-name与正在运行的Topology是否有重名,重名将造成冲突。
  4. 将nimbus(nimbus-data类型)中的submitted-count已提交Topology计数字段加1。
  5. 为所提交的Topology创建唯一的storm-id(topology-id),格式:<storm-name>-<submitted-count>-<当前时间>
  6. 通过normalize-conf获取提交的Topology的Storm配置,首先将参数serializedConf进行反序列化,然后加入storm-name,storm-id等。
  7. 将Storm默认的配置(conf)与第六步得到的Storm配置进行合并,合并原则为两份配置中重复的配置项以第六步中的配置为准。
  8. 调用normalize-topology计算提交的Topology中每个组件并行度及更新TOPOLOGY_TASKS配置项.
  9. 获取nimbus(nimbus-data类型)中storm-cluster-state对象。
  10. 调用System-topology!方法对Topology结构进行校验。
  11. 获取nimbus中的submit-lock锁。
  12. 调用setup-storm-code为Topology创建对应的本地文件夹、复制jar并写入序列化后的Storm配置项和Topology信息.
  13. 调用setup-hearbeats!为Topology在Zookeeper中创建心跳路径,/storm/workerbeats/topology-id.
  14. 定义一个从thrift-status到keyword-status的哈希表,该哈希表用来将传入的submitOptions中的thrift-status转化为对应的keyword-status.
  15. 调用start-storm设置stormBase,它在Zookeeper中路径是/storm/storms/<topology-id>,stormBase的信息将做为该路径所对应的存储值。
  16. 调用mk-assignments为所提交的Topology分配资源.

normalize-topology

实现源码:

(defn normalize-topology [storm-conf ^StormTopology topology]
  (let [ret (.deepCopy topology)]
    (doseq [[_ component] (all-components ret)]
      (.set_json_conf
        (.get_common component)
        (->> {TOPOLOGY-TASKS (component-parallelism storm-conf component)}
             (merge (component-conf component))
             to-json )))
ret ))

实现说明:

  • 调用deepCopy对topology进行深度拷贝,赋值给ret.
  • 遍历topology(ret)所有组件,调用component-parallelism更新组件配置中的TOPOLOGY_TASKS信息。

component-parallelism实现源码(计算组件并行度):

(defn- component-parallelism [storm-conf component]
  (let [storm-conf (merge storm-conf (component-conf component))
        num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component))
        max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)
        ]
    (if max-parallelism
      (min max-parallelism num-tasks)
      num-tasks)))
实现说明:
  • 将Topology配置信息与组件(component)配置信息进行合并,两者存在重复的配置项时以组件的配置项为准。
  • 计算组件并行度(num-tasks),若果配置storm-conf中配置了TOPOLOGY-TASKS信息,就以该配置值做为组件的并行度,否则通过调用num-start-executors获取用户对组件设置的并行度做为num-tasks.
  • 获取storm-conf配置中TOPOLOGY-MAX-TASK-PARALLELISM配置项的值。
  • 返回TOPOLOGY-MAX-TASK-PARALLELISM与num-tasks较小的值做为组件的并行度。
TopologyBuilder builder = new TopologyBuilder();
// 4对应对用用户设置的组件并行度,10对应TOPOLOGY-TASK配置项的值
builder.setBolt("transfer", new TransferBolt(), 4).shuffleGrouping("random").setNumTasks(6); Config conf = new Config();
// 8对应 TOPOLOGY-MAX-TASK-PARALLELISM配置项的值
Conf.setMaxTaskParallelism(8);

 

system-topology!

功能:

验证用户提交的Topology,同时为提交的topology添加一些系统组件和流。

实现源码:

(defn system-topology! [storm-conf ^StormTopology topology]
  (validate-basic! topology)
  (let [ret (.deepCopy topology)]
    (add-acker! storm-conf ret)
    (add-metric-components! storm-conf ret)    
    (add-system-components! storm-conf ret)
    (add-metric-streams! ret)
    (add-system-streams! ret)
    (validate-structure! ret)
10      ret
11  ))

实现说明:

  • 使用validate-basic!校验所提交的Topology.
    主要用于确保topology中的组件id不重复而且不是系统id,以及确保每个组件的TOPOLOGY-TASKS配置项大于0时,组件的并行度设置也一定大于0.
  • 调用deepCopy对topology进行深度拷贝,赋值给ret.
  • 为Topology添加acker-bolt.
    用于追踪发送出去的消息是否被成功处理。
  • 使用add-metric-components为Topology添加metric-bolt.
  • 为Topology添加system-bolt.
    System-bolt没有输入流只有输出流分别为:SYSTEM-TICK-STREAM-ID,声明字段是[“rate_secs”],非直接模式;另一个为METRICS-TICK-STREAM-ID,声明字段为[“interval”]非直接模式,并行度为0.
  • 为Topology中的所有组件添加统计流。
    Stream-id为METRICS-STREAM-ID,声明字段为[“task-info”,”data-points”],非直接流模式.
  • 为Topology中的所有组件添加系统流。
    stream-id为SYSTEM-STREAM-ID,声明字段为[“event”],非直接流模式.
  • 使用validate-structure!检验以上步骤所组合后的Topology.

验证过程:
获取Topology中所有组件和组件的输入(包括component-id、stream-id、Grouping),对输入组件依次判断输入组件ID(component-id)是否在该Topology中,不存在则抛出异常,存在则再判断该组件的流类型是否为所对应的stream-id,若不存在则抛出异常,存在则继续检查该流的分组方式(Grouping)是否与能对应,所有组件检查完毕后没有异常抛出表示该Topology有效.

转载于:https://www.cnblogs.com/jianyuan/p/4792443.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/491829.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

附全文 |《数字中国指数报告2019》重磅发布,下一个数字经济增长点将由产业驱动...

来源&#xff1a;腾讯研究院5月21日&#xff0c;2019腾讯全球数字生态大会在昆明盛大开幕。在主峰会上&#xff0c;腾讯研究院发布《数字中国指数报告&#xff08;2019&#xff09;》&#xff08;以下简称“报告”&#xff09;&#xff0c;推出2019数字中国指数&#xff0c;全面…

将KEEL的数据集转换为CSV文件

使用MATLAB将KEEL的数据集&#xff08;以glass1数据集为例&#xff09;转换为CSV文件步骤如下&#xff1a; 预处理&#xff1a;首先将从KEEL下载的.dat文件用记事本方式打开&#xff0c;删除里面的非数据部分 点击MATLAB的"导入数据"按钮&#xff0c;选择要转换的.…

超维计算让AI有记忆和反应,还能解决自动驾驶难题

来源&#xff1a;雷锋网这可以打破我们与自动驾驶汽车和其他机器人之间的僵局&#xff0c;这也将促使更像人类AI模型的出现。马里兰大学的一个研究团队最近提出了超维计算理论&#xff08;hyperdimensional computing theory&#xff09;&#xff0c;能够让机器人拥有记忆和反应…

BZOJ 2150: 部落战争 最大流

2150: 部落战争 Time Limit: 1 Sec Memory Limit: 256 MB 题目连接 http://www.lydsy.com/JudgeOnline/problem.php?id2150Description lanzerb的部落在A国的上部&#xff0c;他们不满天寒地冻的环境&#xff0c;于是准备向A国的下部征战来获得更大的领土。 A国是一个M*N的矩…

数字图像处理-0.绪论

一、图像处理的概念 图像&#xff1a;是对客观物体的一种相似性的生动的写真或描述。 可见的图像&#xff1a;照片、图与画 、投影 /不可见光&#xff1a;不可见光成像&#xff08;红外、紫外等&#xff09;、数学模型的生成。 图像的类别&#xff1a;彩色和非彩色&#xff…

window下打开tensorboard

首先通过一个简单的 TensorFlow 程序&#xff0c;在这个程序中完成了TensorBoard 日志输出的功能。 import tensorflow as tfa tf.constant([1, 2, 3], nameinput1) b tf.constant([4, 5, 6], nameinput2) c tf.add_n([a, b], nameadd)with tf.Session() as sess:sess.run(…

数字图像处理-1.图像获取

一、图像数字化 原理&#xff1a;图像数字化是将一幅画面转化成计算机能处理的形式——数字图像的过程。 将一幅图像分割成如图的一个个小区域&#xff08;像元或像素&#xff09;&#xff0c;并将各小区域灰度用整数表示&#xff0c;形成一幅点阵式的数字图像。 包括采样和量…

刘炽平:腾讯要以生态方式进行开放,不做“颠覆者”

“我们希望做生态的‘共建者’&#xff0c;成为大家的‘帮助者’&#xff0c;而不是传统互联网行业的颠覆者。”在2019腾讯全球数字生态大会上&#xff0c;腾讯公司总裁刘炽平这样说道。本次大会由云南省人民政府指导&#xff0c;云南省文化和旅游厅、昆明市人民政府、腾讯公司…

iOS 翻译-UIWebView的基本简介-官方文档翻译

继承关系&#xff1a;NSObject-UIResponder-UIView-UIWebView 遵循&#xff1a;NSCoding NSObject UIAppearance UIAppearanceContainer UICoordinateSpace UIDynamicItem UIScrollViewDelegate UITraitEnvironment 你可以使用UIWebView类嵌入网页内容在您的应用程序。这样做&a…

np.squeeze()

作用&#xff1a;从数组的形状中删除单维条目&#xff0c;即把shape中为1的维度去掉 例子&#xff1a; import numpy as npa np.array([[1], [2], [3]]) print(a) print(a.shape) 输出&#xff1a; [[1] [2] [3]] shape:(3, 1) 应用squeeze()后&#xff1a; a1 np.sq…

数字图像处理--2.图像变换

一、图像的傅里叶变换 目的与用途 图像变换的目的&#xff1a; 使得图像处理问题简化&#xff1b;有利于图像特征提取&#xff1b;&#xff08;我们知道特征提取的目的是为了对影像进行分析&#xff0c;根据特征从影像中提取目标等有用信息&#xff0c;特征提取对影像中提取…

《腾讯数字生活报告2019》发布,互联网时代新马斯洛需求金字塔预示什么?

来源&#xff1a;腾讯研究院5月22日&#xff0c;腾讯研究院、腾讯应用宝、腾讯开放平台联合出品的《腾讯数字生活报告2019》在腾讯全球数字生态大会应用生态主题论坛上发布&#xff0c;报告从生存、关系、发展三个层级解读了大众数字生活版图中的新趋势及核心洞察。透过这份数字…

0909论编译原理

编译原理学什么&#xff1f; 我觉得学习编译原理可以帮助自己更加深层次的理解程序语言和内部机制&#xff0c;学习一种新的解决问题的方法&#xff0c;从各种算法中可以得到启发&#xff0c;更加深入的了解计算机思想&#xff0c;进一步培养计算机思维。 为什么学编译原理&…

python实现KNN算法

inX是待测样本&#xff0c;dataSet是训练样本集&#xff0c;labels是训练样本集的标签集 &#xff0c;k是近邻数 from numpy import * import operatordef knn(inX, dataSet, labels, k): m dataSet.shape[0] # 获得训练样本的样本个数diffMat tile(inX, (m,1)) - dataSet …

数字图像处理--3.图像增强

一、图像增强的点运算 图像增强&#xff1a;采用一系列技术&#xff0c;改善图像的视觉效果&#xff0c;或者将图像转换成一种更适合于人或者机器进行分析和处理的形式。 图像增强方法&#xff1a;1.空间域增强&#xff1a;直接对图像各像素进行处理&#xff1b;2.对图像进行…

腾讯姚星:两大科技矩阵助力两张网,立志攻克通用人工智能和多模态问题

来源&#xff1a;腾讯AI实验室5月21日&#xff0c;2019腾讯全球数字生态大会在昆明滇池国际会展中心开幕。本次大会由云南省人民政府指导&#xff0c;云南省文化和旅游厅、昆明市人民政府、腾讯公司共同主办&#xff0c;是腾讯战略升级后&#xff0c;整合互联网数字经济峰会、云…

nginx的学习(配置文件,以及部署的疑惑)

1、在windows下安装nginx&#xff0c;解压之后&#xff0c;在此目录下&#xff0c;dos进去&#xff0c;start nginx 2、配置文件&#xff1a; http { include mime.types; default_type application/octet-stream; sendfile on; keepalive_timeou…

python将数据集分成训练样本和类标签

这里假设 类标签为largeDoses, smallDoses, didntLike三类&#xff0c;假设训练样本有三个特征属性&#xff0c;类标签放在数据集的最后一列 import numpy as npdef file2matrix(filename): # filename是文件保存地址love_dictionary {largeDoses:3, smallDoses:2, didntLik…

图像处理-5

1.图像的数学变换 空间域&#xff1a;图像的代数运算和几何运算都是利用对输入图像进行加工而得到输出图像 转换空间&#xff1a;最典型的有离散傅里叶变换将原定义在图像空间的图像以某种形式转换到另外一些空间&#xff0c;并利用输入图像在这些空间的特有性质有效而快速地…

量子算法、DNA计算与后经典计算时代

来源&#xff1a;资本实验室二进制与伟大的计算机相结合&#xff0c;推动人类进入了信息化时代。在这个基于物质世界的&#xff0c;由0和1构成的新世界中&#xff0c;我们依靠算法和电子技术不断解决了大量曾经无法解决的问题。然而&#xff0c;好奇的人类总是善于提出新的、更…