【5】apollo编写python节点步骤及实例

  1. 在workspace/modules下新建包buildtool create --template component modules/test_one
    在这里插入图片描述

  2. 编译包 buildtool build -p modules/test_two/
    在这里插入图片描述

  3. 增加自己的proto消息
    在刚才自动生成的proto文件里面添加自己定义的消息,记得重新编译.

syntax = "proto2";package apollo;// message type of channel, just a placeholder for demo,
// you should use `--channel_message_type` option to specify the real message type
message TestTwoMsg {}message TestTwoConfig {optional string name = 1;
};
// new added
message TestTwoMymessage{optional string twoInfo = 1;
}

也可以新建proto文件, 修改BUILD文件;记得重新编译.
proto文件内容如下

syntax = "proto2";
package apollo;message TestTwoLwlgzy{optional string twoInfo = 1;optional string twoFault = 2;
}

修改对应BUILF的文件

load("//tools:apollo_package.bzl", "apollo_package")
load("//tools/proto:proto.bzl", "proto_library")
load("//tools:cpplint.bzl", "cpplint")package(default_visibility = ["//visibility:public"])proto_library(name = "test_two_proto",srcs = ["test_two.proto"],
)proto_library(name = "test_two_lwlgzy_proto",srcs = ["test_two_lwlgzy.proto"],
)
apollo_package()cpplint()
  1. 生成的pb2文件路径在/opt/apollo/neo/python/modules/test_two/proto
    a). 新建test.py文件加入如下内容 from modules.test_two.proto.test_two_pb2 import TestTwoMymessage
    b). 给运行权限sudo chmod 755 *.py
    运行没有报错即为正常.

本机没有编译or缺少需要引用的proto文件时候, 也可以直接将其他机器生成的pb2.py文件拷贝到类似/opt/apollo/neo/python/xxx/xxx/proto 下import才不会报错.

  1. python 文件中import 路径为from 包名路径.proto.xx_pb2 import xxx. 例如:from modules.test_two.proto.test_two_lwlgzy_pb2 import TestTwoLwlgzy
modules/test_two/
|-- BUILD
|-- conf
|   |-- test_two.conf
|   `-- test_two.pb.txt
|-- cyberfile.xml
|-- dag
|   `-- test_two.dag
|-- launch
|   `-- test_two.launch
|-- proto
|   |-- BUILD
|   |-- test_two.proto
|   `-- test_two_lwlgzy.proto
|-- test.py
|-- test_two_component.cc
`-- test_two_component.h4 directories, 12 files
  1. 播包cyber_recorder play -f modules/chassis.00000.20210622135417

订阅底盘话题处理故障实例

import time
import asyncio
from datetime import datetime
from cyber.python.cyber_py3 import cyber
from modules.canbus_vehicle.yt.proto.yt_pb2 import Yt
from od_health_test.proto.od_health_test_pb2 import OdErrorInfo, OdErrorMsgsimport logging
from modules.tools.common.logger import Logger
import osAPOLLO_ROOT = "/apollo"class xxHealthMonitor:def __init__(self) -> None:self.health_node_ = cyber.Node("xx_health_monitor_node")Logger.config(log_file = os.path.join(APOLLO_ROOT, 'data/log/xx_health.log'),use_stdout=True,log_level=logging.DEBUG)self.logger = Logger.get_logger("OdHealthMonitor")self.logger.info("od_health_monitor_node is started. ")self.health_node_.create_reader("/apollo/canbus/chassis_detail",Yt,self._chassis_cb_)self.health_writer = self.health_node_.create_writer("/od/health", OdErrorMsgs)self.has_data = Falseself.bat_total_voltage = 0self.bat_total_current = 0self.od_ErrMessages = OdErrorMsgs()e_info = OdErrorInfo()e_info.err_level = ''e_info.err_message = ''self.od_ErrMessages.error_msgs.append(e_info)self.last_data_time = None# /apollo/canbus/chassis_detail  100hzdef _chassis_cb_(self,msg):self.last_data_time = time.time()#...解析数据,添加逻辑判断def _padding_fault_msgs(self, level, msg):err_info = OdErrorInfo()err_info.err_level = levelerr_info.err_message = msgself.od_ErrMessages.error_msgs.append(err_info)def _error_checking(self):passdef run(self):while not cyber.is_shutdown():try:current_time = time.time()if self.last_data_time is None or (current_time -  self.last_data_time)>1:self.od_ErrMessages = OdErrorMsgs()e_info = OdErrorInfo()e_info.err_level = '1'e_info.err_message = 'No data received from chassis_detail'self.od_ErrMessages.error_msgs.append(e_info)self.health_writer.write(self.od_ErrMessages)else:self.od_ErrMessages = OdErrorMsgs()self._error_checking()self.health_writer.write(self.od_ErrMessages)                 # print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))self.dump2log()except Exception as e:print("Error:", e)time.sleep(1) # 1hzself.has_data = Falsedef dump2log(self):dumpMsg = ""for oo in self.od_ErrMessages.error_msgs:dumpMsg += f"[{oo.err_level}:{oo.err_message}],"# print(writemsg.strip(','))self.logger.info(dumpMsg.strip(','))if __name__=="__main__":cyber.init()print("health monitor is starting!")health_monitor = OdHealthMonitor()health_monitor.run()print("health monitor is closing!")cyber.shutdown()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/32737.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【python】美妆类商品跨境电商数据分析(源码+课程论文+数据集)【独一无二】

👉博__主👈:米码收割机 👉技__能👈:C/Python语言 👉公众号👈:测试开发自动化【获取源码商业合作】 👉荣__誉👈:阿里云博客专家博主、5…

使用RedisShake迁移自建Redis数据至阿里云Redis

一、前言 最近有个需求,需要把自建的Redis数据迁移至阿里云的Redis RDS。阿里云有官方的数据传输服务DTS(Data Transmission Service)。全量迁移是免费的,但增量迁移需要按量收费,遂放弃。经过一番搜索,发…

数据库的概念-数据库、数据库管理系统、数据库系统、数据库管理员、数据库设计人员、开发管理使用数据库系统的人员

一、数据库(DB) 1、数据库就是存储数据的仓库,只不过这个仓库是在计算机存储设备上 2、严格的说,数据库是长期存储在计算机内、有组织的、统一管理的、可共享的相关数据的集合 3、数据库应是为一个特定目标而设计、构建并装入数…

ClickHouse备份方案

ClickHouse备份方案主要包括以下几种方法: 一、使用clickhouse-backup工具: (参考地址:https://blog.csdn.net/qq_43510111/article/details/136570850) **安装与配置:**首先从GitHub获取clickhouse-bac…

利用MSSQL模拟提权

点击星标,即时接收最新推文 本文选自《内网安全攻防:红队之路》 扫描二维码五折购书 利用MSSQL模拟提权 在MS SQL数据库,可以使用EXECUTE AS语句,以其他用户的上下文执行SQL查询。需要注意的是只有明确授予模拟(Impers…

38.MessageToMessageCodec线程安全可被共享Handler

handler被注解@Sharable修饰的。 这样的handler,创建一个实例就够了。例如: ByteToMessageCodec的子类不能被@Sharable修饰 如果自定义类是MessageToMessageCodec的子类就是线程共享的,可以被@Sharable修饰的 package com.xkj.protocol;import com.xkj.message.Message; i…

Go日常分享 - error类型是指针类型吗?

背景 这个问题的产生来源于小泉在开发rpc接口时返回error遇到的问题,开发时想在defer里对err进行最终的统一处理赋值,发现外层接收一直都未生效。问题可以简化为成下面的小demo。 func returnError() error {var err errordefer func() {//err errors…

在 Oracle Linux 8.9 上安装中文和日文字体的完整指南

在 Oracle Linux 8.9 上安装中文和日文字体的完整指南 在 Oracle Linux 8.9 上安装中文和日文字体的完整指南前提条件安装步骤1. 更新系统2. 安装字体包安装中文字体安装日文字体 3. 安装字体配置工具4. 更新字体缓存5. 验证安装 可能遇到的问题及解决方案结语 在 Oracle Linux…

(一)SvelteKit教程:hello world

(一)SvelteKit教程:hello world sveltekit 的官方教程,在这里:Creating a project • Docs • SvelteKitCreating a project • Docs • SvelteKit 我们可以按照如下的步骤来创建一个项目: npm create s…

CentOs7 安装单机版redis

1.安装依赖 redis是由C语言开发,因此安装之前必须要确保服务器已经安装了gcc,可以通过如下命令查看机器是否安装: gcc -v如果没有安装则通过以下命令安装: yum install -y gcc如果安装gcc依赖报错则执行yum升级命令 # 先执行升…

NSIS 入门教程 (三)

引言 在教程的第二部分中,我们为安装程序增加了一个卸载程序,并查看了一些其他的向导页面以及安装部分的选择。第三部分的目标是使安装程序的外观更加现代化。 更现代的外观 为了给安装程序一个更现代的外观,我们要启用现代用户界面。要提…

Shell编辑之条件语句

一,条件测试操作 1:文件测试 文件测试操作用来检查文件的各种属性,如文件是否存在、是否可读、是否为空等。常用的文件测试操作符包括: -e 文件存在性测试-f 是否为普通文件-d 是否为目录-r 是否可读-w 是否可写-x 是否可执行-s…

学懂C#编程:常用高级技术——委托(Delegate)的概念及详细使用讲解

目录 委托的概念 常用应用场景 优势 C#中的委托(Delegate)是一种引用类型,它允许你封装一个方法的引用。委托类似于函数指针,但提供了更强大和类型安全的功能。委托在C#中扮演着多重角色,常用于实现回调方法、事件…

【栈和队列】

目录 1,栈(Stack) 1.1 概念 1.2 栈的使用 1.3 栈的模拟实现 1.4 栈的应用场景 1.5 概念区分 1.6 使用链表来实现栈 2, 队列(Queue) 2.1 概念 2.2 队列的使用 2.3 队列模拟实现 3,双端队列 (Deque) 4&…

【计算机组成原理】部分题目汇总

计算机组成原理 部分题目汇总 一. 简答题 RISC和CICS 简要说明,比较异同 RISC(精简指令集)注重简单快速的指令执行,使用少量通用寄存器,固定长度指令,优化硬件性能,依赖软件(如编译…

递归调用,将源路径下所有文件文件夹复制到目标路径中.

其实代码demo很简洁&#xff0c;只是逻辑有点绕&#xff0c;主要是要一层一层调用自己&#xff0c;要清楚当前是第几层调用&#xff0c;及递归调用时进的点和出的点在哪儿&#xff0c;一切就清晰明了了。 /// <summary>/// 删除指定目录下面的所有文件和文件夹/// </s…

C++学习合集

#整理到一块&#xff0c;方便查东西&#xff0c;顺便补充一些之前没有学习到的东西# 变量 char--1字节 short--2字节 int-4字节 long--4字节 long long(int)--8字节&#xff1b;准确来说变量的大小取决于编译器&#xff0c;1字节8个二进制位&#xff0c;其中最高位为符号位…

关于Mysql 的on duplicate key update操作,导致主键不连续自增的问题

一 相关说明 在实际的开发中,经常会遇到这样的场景:若数据库里面不存在数据,则插入;若存在,则更新。在Mysql中,可以使用ON DUPLICATE KEY UPDATE,一步就能完成上述操作。简单说,就是数据库中存在某个记录时,执行这个语句会更新,而不存在这条记录时,就会插入。 需要说…

CesiumJS【Basic】- #009 切换地形数据源

文章目录 切换地形数据源1 目标2 实现切换地形数据源 1 目标 切换地形数据源 2 实现 这段代码定义了一个名为 toggleTerrainProvider 的方法,用于在 CesiumJS 中切换地形数据源。如果当前的地形数据源是 EllipsoidTerrainProvider(一个简单的椭球体地形),则将其切换为 …

黑龙江等保测评

黑龙江等保测评概述 黑龙江等保测评是指根据《中华人民共和国网络安全法》及《信息安全等级保护管理办法》等相关法律法规&#xff0c;对信息系统安全保护能力进行评估和验证的过程。它旨在确保重要信息系统能够达到相应的安全保护级别&#xff0c;有效防范各种安全威胁&#…