什么是节点?
在ROS2(机器人操作系统2)中,节点(node)是执行程序的基本单元,也是构成整个机器人系统的核心“积木”。你可以把它理解为系统中一个独立、可执行的进程,每个节点都专注于完成一个特定的、单一的功能。这种设计哲学让复杂的机器人系统变得模块化,易于开发、维护和扩展。
节点的核心特性
-
模块化(Modularity) 这是节点最重要的特性。每个节点只做一件事,而且做得很好。例如,在一个移动机器人系统中,你不会把所有代码都写在一个庞大的程序里,而是会创建多个独立的节点来完成不同的任务:
-
感知节点:负责从摄像头或激光雷达等传感器收集数据。
-
规划节点:根据感知数据和目标,计算出一条最优路径。
-
控制节点:接收规划结果,并发送指令来控制机器人的电机。 这种分工合作的方式使得开发团队可以并行工作,并且当某个节点出现问题时,不会影响到整个系统的运行。
-
-
分布式(Distributed) 节点可以运行在不同的计算机上,甚至在不同的操作系统上。只要这些设备能通过网络连接,ROS2的通信机制就能让它们无缝地交换信息。例如,一个节点可以在机器人本体上的小型计算机上运行,而另一个负责复杂计算的节点则可以在远程的强大服务器上运行。这为构建复杂的、跨设备的机器人系统提供了极大的灵活性。
-
通信(Communication) 节点之间通过一系列标准的通信方式进行信息交换,这使得它们能够协同工作。最常见的通信方式是:
-
话题(Topics):这是一种发布/订阅(Publish/Subscribe)机制。一个节点(发布者)向一个特定的“话题”发送数据,而对这个话题感兴趣的节点(订阅者)会接收这些数据。例如,一个“激光雷达”节点可以持续向名为
/scan
的话题发布扫描数据,而“规划”节点会订阅这个话题来获取数据。 -
服务(Services):这是一种请求/响应(Request/Reply)机制。它类似于传统的函数调用,一个节点向另一个节点发送一个请求,并等待接收一个响应。这通常用于执行一次性的、同步的任务,比如请求机器人执行一个特定的动作。
-
动作(Actions):这是一种用于处理耗时任务的通信方式。一个节点向另一个节点发送一个目标,并能持续接收任务进度反馈,直到任务完成或被取消。
-
使用python代码简单创建节点
import rclpy
from rclpy.node import Nodedef main():rclpy.init() # 初始化node = Node('node_one') # 创建节点# 打印节点信息node.get_logger().info('node_one activate') node.get_logger().warn('node_one warn')node.get_logger().error('node_one error')rclpy.spin(node)rclpy.shutdown()if __name__=='__main__':main()
功能包
ROS2 中的功能包(package)是组织和管理代码的基本单位。你可以把它看作是一个装有所有相关文件的文件夹,这些文件共同实现了一个或多个特定的功能。
功能包的组成:
-
可执行文件(Nodes):实现具体功能的程序,也就是上面我们提到的节点。
-
库文件(Libraries):可供其他功能包使用的代码库。
-
配置文件:如 YAML 文件,用于设置参数和配置。
-
消息、服务和动作定义(
msg
、srv
、action
):定义了节点之间通信时所使用的标准数据格式。 -
配置文件(
CMakeLists.txt
或package.xml
):这是功能包的“身份证”和“构建脚本”,package.xml
描述了功能包的基本信息和依赖关系,而CMakeLists.txt
或其他构建脚本则告诉 ROS2 如何编译和安装这个功能包。 -
其他资源:如启动文件(launch files)、URDF 文件、图片和模型等。
进行简单的功能包的创建,打开终端输入以下命令,创建完功能包后需要先进行一些文件的改动才能进行功能包的构建
ros2 pkg create demo_python_pkg(包名) --build-type ament_python(构建类型) --license Apache-2.0(证书) (创建功能包)
colcon build (构建功能包)
source install/setup.bash (更新环境变量)
echo $AMENT_PREFIX_PATH (输出环境变量)
ros2 run demo_python_pkg python_node _main (执行可执行文件)
在功能包创建完成之后,先创建一个节点文件python_node
import rclpy
from rclpy.node import Node
def main():rclpy.init()node = Node("NodeOne")node.get_logger().info("1号节点启动")rclpy.spin(node)rclpy.shutdown()
之后修改setup.py文件和package.xml文件
构建之后得到的可执行文件
如果修改了源码则需要重新构建得到新的可执行文件,注:环境变量的更新只在本终端有效如果新开一个终端需要再次执行一次更新命令
面向对象的思想:将不同的事物分为不同的类,对于每一个类,进行实例化得到对象,同一个的类的对象变量和方法是一样的,但是变量储存的内容可能不一样。
Python常见类装饰器
1. @classmethod
@classmethod
装饰器用于将一个方法定义为类方法。类方法与普通实例方法不同,它接收的第一个参数是类本身,通常命名为 cls
。这使得你可以在不创建类的实例的情况下调用这个方法。类方法通常用于创建工厂方法,或者执行与类相关的操作。
class MyClass:count = 0def __init__(self, name):self.name = nameMyClass.count += 1@classmethoddef get_count(cls):return f"当前创建了 {cls.count} 个实例。"obj1 = MyClass("A")
obj2 = MyClass("B")
print(MyClass.get_count())
# 输出: 当前创建了 2 个实例。
2. @staticmethod
@staticmethod
装饰器用于将一个方法定义为静态方法。静态方法既不接收类 (cls
) 作为第一个参数,也不接收实例 (self
) 作为第一个参数。它与类或实例的状态完全无关,更像是一个普通的函数,只是被组织在类的命名空间内。
class Calculator:@staticmethoddef add(x, y):return x + y@staticmethoddef subtract(x, y):return x - yprint(Calculator.add(5, 3))
# 输出: 8
3. @property
@property
装饰器将一个方法转换为只读属性。这使得你可以像访问属性一样调用方法,而无需使用括号。它通常用于将类的内部数据封装起来,提供一个更干净的接口。同时,你还可以配合 @<property_name>.setter
和 @<property_name>.deleter
来定义设置和删除属性的行为。
class Person:def __init__(self, name):self._name = name # 通常用下划线表示这是内部属性@propertydef name(self):return self._name@name.setterdef name(self, value):if not isinstance(value, str):raise TypeError("姓名必须是字符串。")self._name = valuep = Person("Alice")
print(p.name) # 像访问属性一样调用
p.name = "Bob" # 使用setter修改
print(p.name)
4. @dataclass
@dataclass
装饰器是 Python 3.7 引入的,它主要用于快速创建数据类。数据类通常只包含数据,并且 dataclass
会自动为你生成许多有用的魔术方法,例如 __init__
、__repr__
、__eq__
等,从而大大减少样板代码。
from dataclasses import dataclass@dataclass
class Point:x: floaty: floatp1 = Point(1.0, 2.0)
p2 = Point(1.0, 2.0)
print(p1) # 自动生成 __repr__
# 输出: Point(x=1.0, y=2.0)print(p1 == p2) # 自动生成 __eq__
# 输出: True
5. @total_ordering
@total_ordering
装饰器位于 functools
模块中,它非常有用。如果你为一个类定义了至少一个富比较方法(__lt__
, __le__
, __gt__
, __ge__
),并且还定义了 __eq__
,@total_ordering
就会自动为你生成其余的比较方法。这避免了重复编写所有比较逻辑。
from functools import total_ordering@total_ordering
class Number:def __init__(self, value):self.value = valuedef __eq__(self, other):return self.value == other.valuedef __lt__(self, other):return self.value < other.valuen1 = Number(5)
n2 = Number(10)print(n1 < n2) # True
print(n1 > n2) # True (由 @total_ordering 自动生成)
print(n1 <= n2) # True (由 @total_ordering 自动生成)
Python之多线程
多线程:在同一个进程中同时执行多个线程,提高程序的并发能力。
由于 Python 解释器的 GIL(全局解释器锁),多线程不能真正实现 CPU 并行计算,但对 I/O 密集型任务(网络请求、文件读写)非常有用。
GIL 是 CPython 解释器 为了保证线程安全而引入的一种机制,在任意时刻,只允许 一个线程 执行 Python 字节码,其本质:它是一个互斥锁,确保解释器内部的数据结构在多线程情况下不会出错。
Python之回调函数
回调函数:就是把一个函数作为参数传递给另一个函数,等前者任务执行完后再调用这个函数,常用于 异步编程、事件驱动,比如按钮点击、网络请求完成、ROS 订阅消息等。
import threadingt = threading.Thread(target=函数名, args=(参数1, 参数2, ...))
t.start() # 启动线程
t.join() # 等待线程执行完成(可选)
一般多线程和回调函数的配合使用
import threading
import requests
from bs4 import BeautifulSoupclass Download:def download(self,url,callback_novel_name):response = requests.get(url)response.encoding = 'utf-8'soup = BeautifulSoup(response.text, "html.parser")title = soup.find("span", class_="txt").get_text()score = soup.find("span", class_="score").get_text()novel_name(title,score)# 多线程下载小说def strat(self,url,callback_novel_name):thread = threading.Thread(target = self.download,args=(url,callback_novel_name))thread.start()# 回调函数
def novel_name(title,score):print(f"title :{title},score :{score}")
if __name__=='__main__':download = Download()download.strat("https://www.qimao.com/shuku/1747899/",novel_name)download.strat("https://www.qimao.com/shuku/1672986/",novel_name) download.strat("https://www.qimao.com/shuku/1803136/",novel_name)