果园智能采摘机器人控制系统
实际应用场景描述
在现代农业生产中,水果采摘是一项劳动密集、季节性强、成本高昂的工作。传统的果园采摘依赖人工,存在效率低、成本高、劳动力短缺等问题。特别是在大型果园,成熟的果实需要在短时间内完成采摘,否则会造成严重的经济损失。
痛点分析
1. 劳动力短缺:农业劳动力老龄化,年轻劳动力不足
2. 采摘效率低:人工采摘速度慢,高峰期采摘不及时
3. 采摘质量不一:人工采摘容易造成果实损伤
4. 作业环境恶劣:高温、粉尘、农药残留等对工人健康不利
5. 采摘时机难把握:成熟度判断依赖经验,易误采或漏采
6. 果树损伤:不当采摘会损伤果树,影响来年产量
核心逻辑
1. 视觉感知:通过多传感器融合识别果实成熟度
2. 路径规划:基于A*算法和RRT算法规划最优采摘路径
3. 运动控制:使用PID和力控实现精准采摘动作
4. 避障防护:实时检测障碍物,避免碰撞损伤
5. 质量评估:采摘后自动分级和质量检测
6. 自主学习:通过强化学习优化采摘策略
系统架构
orchard_harvest_robot/
├── app.py # 主应用程序
├── config.py # 系统配置文件
├── requirements.txt # 依赖包列表
├── README.md # 系统说明文档
├── controllers/
│ ├── vision_controller.py # 视觉控制器
│ ├── motion_controller.py # 运动控制器
│ ├── path_planner.py # 路径规划器
│ └── harvest_controller.py # 采摘控制器
├── sensors/
│ ├── camera_system.py # 相机系统
│ ├── lidar_system.py # 激光雷达
│ └── force_sensor.py # 力传感器
├── actuators/
│ ├── robotic_arm.py # 机械臂控制
│ ├── gripper_control.py # 末端执行器
│ └── mobile_base.py # 移动底盘
├── models/
│ ├── fruit_detector.py # 果实检测模型
│ ├── ripeness_classifier.py # 成熟度分类器
│ └── tree_model.py # 果树三维模型
├── utils/
│ ├── calibration.py # 系统标定
│ └── logger.py # 日志记录
├── static/ # 静态资源
│ └── dashboard.html # 监控面板
└── data/ # 数据存储
├── fruit_dataset/ # 果实数据集
└── harvest_logs/ # 采摘日志
核心代码实现
1. 配置文件 (config.py)
"""
果园采摘机器人配置文件
基于智能控制原理的系统参数设置
"""
import json
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional
from enum import Enum
import numpy as np
class FruitType(Enum):
"""水果类型枚举"""
APPLE = "apple" # 苹果
PEAR = "pear" # 梨
ORANGE = "orange" # 橙子
PEACH = "peach" # 桃子
GRAPE = "grape" # 葡萄
CHERRY = "cherry" # 樱桃
STRAWBERRY = "strawberry" # 草莓
KIWI = "kiwi" # 猕猴桃
class RipenessLevel(Enum):
"""成熟度等级枚举"""
UNRIPE = "unripe" # 未成熟
NEAR_RIPE = "near_ripe" # 接近成熟
RIPE = "ripe" # 成熟
OVER_RIPE = "over_ripe" # 过熟
ROTTEN = "rotten" # 腐烂
@dataclass
class FruitProperties:
"""水果物理属性"""
fruit_type: FruitType
min_diameter: float # 最小直径(mm)
max_diameter: float # 最大直径(mm)
min_weight: float # 最小重量(g)
max_weight: float # 最大重量(g)
stem_length: float # 果梗长度(mm)
detachment_force: float # 脱离力(N)
color_profile: Dict[str, List[int]] # 颜色特征
def get_size_category(self, diameter: float) -> str:
"""根据直径获取大小分类"""
if diameter < self.min_diameter:
return "small"
elif diameter < (self.min_diameter + self.max_diameter) / 2:
return "medium"
else:
return "large"
@dataclass
class RobotSpecifications:
"""机器人规格参数"""
# 机械臂参数
arm_dof: int = 6 # 自由度数量
max_reach: float = 1500.0 # 最大工作半径(mm)
positioning_accuracy: float = 0.5 # 定位精度(mm)
repeatability: float = 0.1 # 重复定位精度(mm)
max_payload: float = 5.0 # 最大负载(kg)
# 末端执行器参数
gripper_type: str = "adaptive" # 夹爪类型
gripper_force_range: Tuple[float, float] = (0.1, 10.0) # 夹持力范围(N)
max_opening: float = 120.0 # 最大开口(mm)
min_opening: float = 10.0 # 最小开口(mm)
# 移动底盘参数
base_type: str = "differential" # 底盘类型
max_speed: float = 1.0 # 最大速度(m/s)
climbing_ability: float = 15.0 # 爬坡角度(度)
battery_capacity: float = 20000.0 # 电池容量(mAh)
# 传感器参数
camera_resolution: Tuple[int, int] = (1920, 1080) # 相机分辨率
lidar_range: float = 10.0 # 激光雷达范围(m)
force_sensor_range: float = 50.0 # 力传感器范围(N)
@dataclass
class HarvestParameters:
"""采摘参数配置"""
# 视觉检测参数
detection_confidence: float = 0.7 # 检测置信度阈值
ripeness_threshold: float = 0.8 # 成熟度阈值
min_fruit_size: float = 30.0 # 最小果实尺寸(mm)
# 采摘策略参数
harvest_sequence: str = "ripe_first" # 采摘顺序
batch_size: int = 10 # 批次大小
max_attempts: int = 3 # 最大尝试次数
# 力控制参数
approach_force: float = 2.0 # 接近力(N)
grip_force: float = 5.0 # 抓取力(N)
detach_force: float = 8.0 # 脱离力(N)
twist_angle: float = 15.0 # 扭转角度(度)
pull_distance: float = 20.0 # 拉动距离(mm)
# 安全参数
max_collision_force: float = 15.0 # 最大碰撞力(N)
obstacle_clearance: float = 50.0 # 障碍物安全距离(mm)
emergency_stop_time: float = 0.1 # 急停响应时间(s)
class OrchardRobotConfig:
"""果园机器人配置主类"""
def __init__(self, config_file: str = None):
"""
初始化配置
参数:
config_file: 配置文件路径
"""
# 水果属性数据库
self.fruit_properties = self._init_fruit_properties()
# 机器人规格
self.robot_specs = RobotSpecifications()
# 采摘参数
self.harvest_params = HarvestParameters()
# 果园环境参数
self.orchard_params = {
'row_spacing': 3000.0, # 行间距(mm)
'tree_spacing': 2000.0, # 树间距(mm)
'tree_height': 2500.0, # 树高(mm)
'canopy_diameter': 1800.0, # 树冠直径(mm)
'ground_slope': 5.0 # 地面坡度(度)
}
# 控制系统参数
self.control_params = {
'control_frequency': 100.0, # 控制频率(Hz)
'pid_gains': {
'position': {'Kp': 2.0, 'Ki': 0.1, 'Kd': 0.5},
'force': {'Kp': 1.0, 'Ki': 0.05, 'Kd': 0.2}
},
'filter_params': {
'kalman_q': 0.01, # 过程噪声协方差
'kalman_r': 0.1, # 测量噪声协方差
'lowpass_cutoff': 10.0 # 低通滤波器截止频率(Hz)
}
}
# 路径规划参数
self.planning_params = {
'search_resolution': 10.0, # 搜索分辨率(mm)
'safety_margin': 30.0, # 安全裕度(mm)
'max_planning_time': 5.0, # 最大规划时间(s)
'smooth_iterations': 50 # 路径平滑迭代次数
}
# 加载自定义配置
if config_file:
self.load_config(config_file)
def _init_fruit_properties(self) -> Dict[FruitType, FruitProperties]:
"""初始化水果属性数据库"""
properties = {}
properties[FruitType.APPLE] = FruitProperties(
fruit_type=FruitType.APPLE,
min_diameter=60.0,
max_diameter=90.0,
min_weight=150.0,
max_weight=300.0,
stem_length=20.0,
detachment_force=7.5,
color_profile={
'unripe': [100, 150, 50], # 绿色
'ripe': [200, 50, 50], # 红色
'over_ripe': [150, 100, 100] # 深红色
}
)
properties[FruitType.PEAR] = FruitProperties(
fruit_type=FruitType.PEAR,
min_diameter=55.0,
max_diameter=85.0,
min_weight=120.0,
max_weight=280.0,
stem_length=25.0,
detachment_force=6.5,
color_profile={
'unripe': [150, 200, 100],
'ripe': [200, 200, 100],
'over_ripe': [180, 150, 100]
}
)
properties[FruitType.ORANGE] = FruitProperties(
fruit_type=FruitType.ORANGE,
min_diameter=65.0,
max_diameter=85.0,
min_weight=100.0,
max_weight=200.0,
stem_length=10.0,
detachment_force=8.0,
color_profile={
'unripe': [150, 150, 50],
'ripe': [255, 165, 0],
'over_ripe': [200, 100, 0]
}
)
properties[FruitType.PEACH] = FruitProperties(
fruit_type=FruitType.PEACH,
min_diameter=60.0,
max_diameter=80.0,
min_weight=100.0,
max_weight=200.0,
stem_length=8.0,
detachment_force=5.0,
color_profile={
'unripe': [200, 220, 150],
'ripe': [255, 200, 150],
'over_ripe': [200, 150, 100]
}
)
return properties
def load_config(self, config_file: str):
"""
从文件加载配置
参数:
config_file: 配置文件路径
"""
try:
with open(config_file, 'r', encoding='utf-8') as f:
config_data = json.load(f)
# 更新机器人规格
if 'robot_specs' in config_data:
specs = config_data['robot_specs']
self.robot_specs.arm_dof = specs.get('arm_dof', 6)
self.robot_specs.max_reach = specs.get('max_reach', 1500.0)
self.robot_specs.max_payload = specs.get('max_payload', 5.0)
# 更新采摘参数
if 'harvest_params' in config_data:
params = config_data['harvest_params']
self.harvest_params.detection_confidence = params.get('detection_confidence', 0.7)
self.harvest_params.ripeness_threshold = params.get('ripeness_threshold', 0.8)
self.harvest_params.grip_force = params.get('grip_force', 5.0)
# 更新果园参数
if 'orchard_params' in config_data:
orchard = config_data['orchard_params']
self.orchard_params['row_spacing'] = orchard.get('row_spacing', 3000.0)
self.orchard_params['tree_spacing'] = orchard.get('tree_spacing', 2000.0)
print(f"配置已从 {config_file} 加载")
except FileNotFoundError:
print(f"配置文件 {config_file} 未找到,使用默认配置")
except json.JSONDecodeError as e:
print(f"配置文件解析错误: {e}")
def save_config(self, config_file: str):
"""
保存配置到文件
参数:
config_file: 配置文件路径
"""
config_data = {
'robot_specs': {
'arm_dof': self.robot_specs.arm_dof,
'max_reach': self.robot_specs.max_reach,
'max_payload': self.robot_specs.max_payload,
'max_speed': self.robot_specs.max_speed
},
'harvest_params': {
'detection_confidence': self.harvest_params.detection_confidence,
'ripeness_threshold': self.harvest_params.ripeness_threshold,
'grip_force': self.harvest_params.grip_force,
'detach_force': self.harvest_params.detach_force
},
'orchard_params': self.orchard_params,
'control_params': self.control_params
}
with open(config_file, 'w', encoding='utf-8') as f:
json.dump(config_data, f, indent=2, ensure_ascii=False)
print(f"配置已保存到 {config_file}")
def validate_config(self) -> Tuple[bool, List[str]]:
"""
验证配置有效性
返回:
(是否有效, 错误信息列表)
"""
errors = []
# 验证机械臂参数
if self.robot_specs.max_reach < 500:
errors.append("机械臂工作半径太小")
if self.robot_specs.max_payload < 1.0:
errors.append("机械臂负载能力不足")
# 验证采摘参数
if self.harvest_params.grip_force > self.robot_specs.gripper_force_range[1]:
errors.append("抓取力超出夹爪能力范围")
if self.harvest_params.detach_force > self.harvest_params.max_collision_force:
errors.append("脱离力超出安全限制")
# 验证果园参数
if self.orchard_params['row_spacing'] < 2000:
errors.append("行间距太小,机器人无法通过")
if self.orchard_params['tree_spacing'] < 1500:
errors.append("树间距太小,影响作业空间")
return len(errors) == 0, errors
def get_fruit_property(self, fruit_type: FruitType) -> Optional[FruitProperties]:
"""
获取水果属性
参数:
fruit_type: 水果类型
返回:
水果属性,如果未找到返回None
"""
return self.fruit_properties.get(fruit_type)
def get_detachment_strategy(self, fruit_type: FruitType) -> Dict:
"""
获取采摘策略
参数:
fruit_type: 水果类型
返回:
采摘策略参数
"""
fruit_prop = self.get_fruit_property(fruit_type)
if not fruit_prop:
return {}
# 根据水果类型制定采摘策略
strategies = {
FruitType.APPLE: {
'approach_speed': 50.0, # 接近速度(mm/s)
'grip_duration': 1.0, # 抓取持续时间(s)
'twist_required': True, # 是否需要扭转
'pull_required': True, # 是否需要拉动
'detach_angle': 90.0 # 脱离角度(度)
},
FruitType.PEACH: {
'approach_speed': 30.0,
'grip_duration': 0.5,
'twist_required': False,
'pull_required': True,
'detach_angle': 45.0
},
FruitType.GRAPE: {
'approach_speed': 20.0,
'grip_duration': 2.0,
'twist_required': False,
'pull_required': False,
'cut_required': True # 需要切割
}
}
return strategies.get(fruit_type, {
'approach_speed': 40.0,
'grip_duration': 1.0,
'twist_required': True,
'pull_required': True,
'detach_angle': 60.0
})
def calculate_grip_force(self, fruit_type: FruitType, fruit_size: float) -> float:
"""
计算合适抓取力
参数:
fruit_type: 水果类型
fruit_size: 果实尺寸(mm)
返回:
建议抓取力(N)
"""
fruit_prop = self.get_fruit_property(fruit_type)
if not fruit_prop:
return self.harvest_params.grip_force
# 基于果实大小和类型计算抓取力
size_ratio = (fruit_size - fruit_prop.min_diameter) / \
(fruit_prop.max_diameter - fruit_prop.min_diameter)
size_ratio = max(0.0, min(1.0, size_ratio))
# 不同水果的硬度系数
hardness_factors = {
FruitType.APPLE: 1.2,
FruitType.PEAR: 1.1,
FruitType.ORANGE: 1.5,
FruitType.PEACH: 0.8,
FruitType.GRAPE: 0.5
}
hardness = hardness_factors.get(fruit_type, 1.0)
# 计算抓取力
base_force = 3.0
size_adjustment = size_ratio * 2.0
hardness_adjustment = hardness * 2.0
grip_force = base_force + size_adjustment + hardness_adjustment
# 限制在夹爪能力范围内
grip_force = max(self.robot_specs.gripper_force_range[0],
min(self.robot_specs.gripper_force_range[1], grip_force))
return grip_force
def get_safety_limits(self) -> Dict:
"""
获取安全限制参数
返回:
安全限制字典
"""
return {
'max_velocity': self.robot_specs.max_speed * 0.8, # 速度限制
'max_acceleration': 2.0, # 最大加速度(m/s²)
'max_jerk': 10.0, # 最大加加速度(m/s³)
'force_limit': self.harvest_params.max_collision_force,
'torque_limit': 20.0, # 扭矩限制(Nm)
'collision_threshold': 5.0 # 碰撞检测阈值(N)
}
2. 视觉控制器 (controllers/vision_controller.py)
"""
视觉控制器 - 果实检测与成熟度识别
基于深度学习和计算机视觉的智能感知
"""
import cv2
import numpy as np
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass
from enum import Enum
import torch
import torchvision.transforms as transforms
from PIL import Image
import time
import json
from datetime import datetime
@dataclass
class FruitDetection:
"""果实检测结果"""
bbox: Tuple[float, float, float, float] # 边界框 [x1, y1, x2, y2]
confidence: float # 检测置信度
fruit_type: str # 水果类型
ripeness_level: str # 成熟度等级
ripeness_score: float # 成熟度分数(0-1)
position_3d: Optional[Tuple[float, float, float]] = None # 三维位置
size_estimate: Optional[float] = None # 尺寸估计(mm)
occlusion_level: float = 0.0 # 遮挡程度(0-1)
def to_dict(self) -> Dict:
"""转换为字典"""
return {
'bbox': self.bbox,
'confidence': self.confidence,
'fruit_type': self.fruit_type,
'ripeness_level': self.ripeness_level,
'ripeness_score': self.ripeness_score,
'position_3d': self.position_3d,
'size_estimate': self.size_estimate,
'occlusion_level': self.occlusion_level
}
@dataclass
class TreeAnalysis:
"""果树分析结果"""
tree_id: int
fruit_count: int
ripe_count: int
detection_map: List[FruitDetection] # 所有检测到的果实
canopy_volume: float # 树冠体积估计
health_score: float # 果树健康评分
def to_dict(self) -> Dict:
"""转换为字典"""
return {
'tree_id': self.tree_id,
'fruit_count': self.fruit_count,
'ripe_count': self.ripe_count,
'ripe_percentage': self.ripe_count / max(1, self.fruit_count) * 100,
'canopy_volume': self.canopy_volume,
'health_score': self.health_score
}
class VisionController:
"""
视觉控制器
实现果实检测、成熟度识别、三维定位等功能
"""
def __init__(self, config, model_path: str = None):
"""
初始化视觉控制器
参数:
config: 系统配置
model_path: 模型文件路径
"""
self.config = config
# 相机参数
self.camera_matrix = None
self.dist_coeffs = None
self.camera_pose = None
# 模型初始化
self.detection_model = None
self.classification_model = None
self._init_models(model_path)
# 图像预处理
self.transform = transforms.Compose([
transforms.Resize((640, 640)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
# 检测结果缓存
self.detection_cache = {}
self.cache_size = 100
# 性能统计
self.performance_stats = {
'total_frames': 0,
'avg_process_time': 0,
'detection_rate': 0,
'recall_rate': 0
}
# 颜色空间转换
self.color_ranges = self._init_color_ranges()
print("视觉控制器初始化完成")
def _init_models(self, model_path: str):
"""初始化深度学习模型"""
try:
# 加载检测模型
if model_path and model_path.endswith('.pt'):
# 使用YOLO模型
self.detection_model = torch.hub.load('ultralytics/yolov5', 'custom',
path=model_path, force_reload=False)
print(f"YOLO模型从 {model_path} 加载")
else:
# 使用默认模型
self.detection_model = torch.hub.load('ultralytics/yolov5', 'yolov5s',
pretrained=True)
# 微调模型参数用于水果检测
self._adjust_model_for_fruits()
print("使用YOLOv5s默认模型")
# 加载分类模型(成熟度分类)
self.classification_model = self._init_classification_model()
except Exception as e:
print(f"模型加载失败: {e}, 使用传统方法")
self.detection_model = None
def _init_classification_model(self):
"""初始化成熟度分类模型"""
# 这里可以加载预训练的成熟度分类模型
# 目前使用基于颜色的简单分类
return None
def _init_color_ranges(self) -> Dict:
"""初始化颜色空间范围"""
return {
'apple': {
'ripe': [(0, 50, 50), (10, 255, 255), (170, 50, 50), (180, 255, 255)],
如果你觉得这个工具好用,欢迎关注我!