串口关键字抓取

news/2025/12/4 15:51:32/文章来源:https://www.cnblogs.com/aker-whale/p/19307959

带环形缓冲区

import serial
import time
from datetime import datetime  # 用于获取精确到秒的系统时间class RingBuffer:"""自定义15字符长度的环形缓冲区"""def __init__(self, capacity=8):self.capacity = capacity  # 缓冲区容量固定为15self.buffer = []  # 存储字符的列表self.last_valid_time = None  # 记录上一次有效接收(U-Boot)的时间戳def add_char(self, char):"""添加字符到缓冲区,超出容量则移除最旧字符"""if len(self.buffer) >= self.capacity:self.buffer.pop(0)  # 移除头部最旧字符self.buffer.append(char)  # 新增字符到尾部def get_buffer_str(self):"""返回缓冲区拼接后的字符串"""return ''.join(self.buffer)def reset(self):"""清空缓冲区"""self.buffer = []def serial_monitor_demo(port="COM3", bps=38400, timeout=5,  # 串口单次读取超时(与wait_time保持一致)wait_time=100000,  # 每次接收的最大等待时间target_str="U-Boot",  # 指定接收的目标ASCII字符串
):ser = Nonei = 0  # 成功接收目标字符串计数j = 0  # 超时计数k = 0  # 接收非目标字符串计数total_tests = 0  # 总测试数(i+j+k)# 初始化15字符长度的环形缓冲区ring_buffer = RingBuffer(capacity=15)try:# 1. 初始化并打开串口(仅打开一次,持续保持连接)ser = serial.Serial(port=port, baudrate=bps, timeout=wait_time,parity=serial.PARITY_NONE,stopbits=serial.STOPBITS_ONE,bytesize=serial.EIGHTBITS,xonxoff=False,  # 关闭软件流控rtscts=False    # 关闭硬件流控)if ser.isOpen():open_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"RMC_AC压测_测试复位引脚台阶_程序\n[{open_time}] 串口 {port} 打开成功(波特率:{bps}),持续等待接收目标ASCII字符串:{target_str}(单次超时时间设定为:{wait_time}s)\n")# 2. 持续等待接收数据(串口保持打开,循环接收)while True:received_data = b""  # 存储单次接收的字节数据start_time = time.time()  # 记录本次接收开始时间has_received_data = False  # 标记是否接收到任何数据# 单次接收等待逻辑(超时时间=wait_time)while (time.time() - start_time) < wait_time:if ser.in_waiting > 0:received_data += ser.read(ser.in_waiting)has_received_data = True  # 标记已接收到数据# ASCII解码(忽略非ASCII无效字符)received_str = received_data.decode("ascii", errors="ignore")# 将接收字符逐个加入环形缓冲区for char in received_str:ring_buffer.add_char(char)# 检查缓冲区中是否包含目标字符串buffer_str = ring_buffer.get_buffer_str()if target_str in buffer_str:i += 1total_tests = i + j + kcurrent_time = datetime.now()success_time = current_time.strftime("%Y-%m-%d %H:%M:%S")# 计算与上一次有效接收的时间间隔time_interval = 0if ring_buffer.last_valid_time is not None:time_interval = (current_time - ring_buffer.last_valid_time).total_seconds()# 更新上一次有效接收时间ring_buffer.last_valid_time = current_time# 打印有效接收信息(含时间间隔)print(f"[{success_time}] RMC成功拉起,拉起数:{i} 总测试数:{total_tests} | 与上一次有效接收间隔:{time_interval:.1f}秒")ring_buffer.reset()  # 清空环形缓冲区ser.flushInput()  # 清空串口缓冲区#time.sleep(0.3)  # 隔离尾段字符break  # 退出本次等待,进入下一次接收循环# 3. 判断是否接收到非目标字符串if has_received_data and target_str not in received_data.decode("ascii", errors="ignore"):k += 1total_tests = i + j + knon_target_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"[{non_target_time}] ---非目标字符串---,非目标数:{k} 超时数:{j} 总测试数:{total_tests}")# 4. 判断本次接收是否超时(仅无任何数据时计数超时)elapsed_time = time.time() - start_timeif elapsed_time >= wait_time - 0.01 and not has_received_data:j += 1total_tests = i + j + ktimeout_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"[{timeout_time}] ---超时警告---:本次等待{wait_time}s未接收到任何数据,超时数:{j} 非目标字符串数:{k} 总测试数:{total_tests}")#time.sleep(0.1)  # 降低CPU占用except KeyboardInterrupt:# 手动终止程序(Ctrl+C)stop_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"\n[{stop_time}] ---程序手动终止---")print(f"最终统计:成功拉起数:{i},非目标字符串数:{k},超时数:{j},总测试数:{total_tests}")# 新增:计算各场景占比(便于分析)if total_tests > 0:print(f"成功率:{i/total_tests:.2%},非目标占比:{k/total_tests:.2%},超时占比:{j/total_tests:.2%}")except Exception as e:# 串口操作异常exception_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"\n[{exception_time}] ---串口操作异常---:{str(e)}")print(f"异常时统计:成功拉起数:{i},非目标字符串数:{k},超时数:{j} 总测试数:{total_tests}")finally:# 程序终止时必关串口(释放资源)if ser and ser.isOpen():ser.close()final_close_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"[{final_close_time}] 串口 {port} 已关闭")# 执行函数
if __name__ == "__main__":serial_monitor_demo()

 

延时等待:

import serial
import time
from datetime import datetime  # 用于获取精确到秒的系统时间def serial_monitor_demo(port="COM3", bps=38400, timeout=5,  # 串口单次读取超时(与wait_time保持一致)wait_time=0.3,  # 每次接收的最大等待时间(1秒)target_str="U-Boot",  # 指定接收的目标ASCII字符串(可修改)#write_data=""  # 初始写入数据(不需要可注释)
):ser = Nonei = 0  # 成功接收目标字符串计数j = 0  # 超时计数k = 0  # 接收非目标字符串计数(新增)total_tests = 0  # 总测试数(i+j+k)try:# 1. 初始化并打开串口(仅打开一次,持续保持连接)ser = serial.Serial(port=port, baudrate=bps, timeout=wait_time,parity=serial.PARITY_NONE,stopbits=serial.STOPBITS_ONE,bytesize=serial.EIGHTBITS,xonxoff=False,  # 关闭软件流控rtscts=False    # 关闭硬件流控)if ser.isOpen():open_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"RMC_AC压测_测试复位引脚台阶_程序\n[{open_time}] 串口 {port} 打开成功(波特率:{bps}),持续等待接收目标ASCII字符串:{target_str}(单次超时时间设定为:{wait_time}s)\n")# 可选:仅发送一次初始数据(不需要可注释)# write_bytes = write_data.encode("gbk")  # 中文用gbk,ASCII用encode("ascii")# ser.write(write_bytes)# send_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")# print(f"[{send_time}] 已发送初始数据:{write_data}\n")# 2. 持续等待接收数据(串口保持打开,循环接收)while True:received_data = b""  # 存储单次接收的字节数据start_time = time.time()  # 记录本次接收开始时间has_received_data = False  # 标记是否接收到任何数据(新增)# 单次接收等待逻辑(超时时间=wait_time)while (time.time() - start_time) < wait_time:if ser.in_waiting > 0:received_data += ser.read(ser.in_waiting)has_received_data = True  # 标记已接收到数据# ASCII解码(忽略非ASCII无效字符)received_str = received_data.decode("ascii", errors="ignore")# 检查目标字符串if target_str in received_str:i += 1total_tests = i + j + ksuccess_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"[{success_time}] RMC成功拉起,拉起数:{i} 非目标字符串数:{k} 超时数:{j} 总测试数:{total_tests}")ser.flushInput()  # 清空缓冲区,准备下一次接收time.sleep(0.3)  # 隔离尾段字符break  # 退出本次等待,进入下一次接收循环# 3. 新增:判断是否接收到非目标字符串if has_received_data and target_str not in received_data.decode("ascii", errors="ignore"):k += 1total_tests = i + j + knon_target_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")# 打印非目标字符串详情(含实际接收内容,便于调试)received_str = received_data.decode("ascii", errors="ignore")#print(f"[{non_target_time}] ---非目标字符串---:接收内容:{received_str.strip()},非目标数:{k} 超时数:{j} 总测试数:{total_tests}")print(f"[{non_target_time}] ---非目标字符串---,非目标数:{k} 超时数:{j} 总测试数:{total_tests}")# 4. 判断本次接收是否超时(仅无任何数据时计数超时)elapsed_time = time.time() - start_timeif elapsed_time >= wait_time - 0.01 and not has_received_data:  # 新增:仅无数据时算超时j += 1total_tests = i + j + ktimeout_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"[{timeout_time}] ---超时警告---:本次等待{wait_time}s未接收到任何数据,超时数:{j} 非目标字符串数:{k} 总测试数:{total_tests}")time.sleep(0.1)  # 降低CPU占用,不影响接收响应except KeyboardInterrupt:# 手动终止程序(Ctrl+C)stop_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"\n[{stop_time}] ---程序手动终止---")print(f"最终统计:成功拉起数:{i},非目标字符串数:{k},超时数:{j},总测试数:{total_tests}")# 新增:计算各场景占比(便于分析)if total_tests > 0:print(f"成功率:{i/total_tests:.2%},非目标占比:{k/total_tests:.2%},超时占比:{j/total_tests:.2%}")except Exception as e:# 串口操作异常exception_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"\n[{exception_time}] ---串口操作异常---:{str(e)}")print(f"异常时统计:成功拉起数:{i},非目标字符串数:{k},超时数:{j},总测试数:{total_tests}")finally:# 程序终止时必关串口(释放资源)if ser and ser.isOpen():ser.close()final_close_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"[{final_close_time}] 串口 {port} 已关闭")# 执行函数
if __name__ == "__main__":serial_monitor_demo()

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/986968.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

奇奇怪怪的特性

奇奇怪怪的特性 1、std::call_oncestd::call_once 是 C++11 引入的线程安全的一次性初始化机制,用于确保某个函数在多线程环境中只被调用一次。使用示例:    static std::once_flag flag;std::call_once(flag, […

postgres json数据处理

根据您提供的JSON结构,我将创建两个表和一个存储过程。以下是完整的SQL代码: 1. 创建两张表 -- 创建用户表 CREATE TABLE users (id INT PRIMARY KEY,name VARCHAR(100) NOT NULL,email VARCHAR(150) UNIQUE NOT NUL…

smart_IO

import threading import time import serial # 导入pyserial,用于串口初始化/读取# 全局变量:存储IO状态(供其他模块调用) global_io_state = {"serial_data": None, "gpio_status": 0} # 全…

2025年12月振动时效机TOP3实力厂商新盘点:技术适配与服务特色双视角

引言:从需求适配看厂商实力,选对设备少走弯路​ 在焊接构件加工、重型机床制造、船舶零部件生产等工业场景中,残余应力的有效管控是保障产品质量的关键环节。振动时效机、振动时效仪等设备的性能差异,直接影响应力…

2025年中国十大护眼照明品牌推荐:口碑好的声控护眼灯有哪些

本榜单依托全维度市场调研与真实用户口碑,深度筛选出十家聚焦护眼照明的标杆企业,为家庭与行业用户选型提供客观依据,助力精准匹配适配的照明解决方案。 TOP1 推荐:三雄极光 推荐指数:★★★★★ 口碑评分:国内护…

2025重质碳酸钙行业TOP5权威推荐:鼎成钙业,甄选企业助

碳酸钙作为工业生产的基石原料,广泛应用于塑料、造纸、涂料等30余个领域。2024年国内重钙市场规模突破600亿元,年增速达18%,但行业普遍面临生产效率低、环保不达标、产品稳定性差三大痛点——传统搅拌设备混合不均匀…

2025 低投入靠谱自习室加盟代理推荐

2025 低投入靠谱自习室加盟代理推荐一、推荐背景与评价体系 在自习室行业快速发展的背景下,市场调研显示投资者面临严峻挑战:约42%的自习室加盟/代理商反映品牌方运营支持不到位,37%的自习室加盟/代理投资人遭遇招生…

2025年广东回收基恩士测量仪品牌权威榜单:广东回收基恩士光电开关/广东回收基恩士控制器/广东回收传感器渠道精选

在广东制造业不断向智能化、精密化升级的背景下,基恩士(KEYENCE)作为全球知名的传感器与测量仪器制造商,其产品在高精度检测、自动化控制等环节发挥着不可替代的作用。随之而来的是,市场对二手设备流通、闲置资产…

2025博士申请套磁策略成功率榜单:谁的成功率最高?

博士申请的 “敲门砖” 非套磁莫属 —— 一封精准的套磁信能快速吸引导师关注,一套高效的套磁策略更是直接提升录取概率。优质机构的套磁服务,不是 “模板化写作”,而是 “精准定位导师需求、定制沟通逻辑、把控跟进…

八大质量管理核心工具

八大质量管理核心工具 PDCA、鱼骨图、FMEA、SPC、5Why、8D报告、APQP、PPAP综合介绍 01 PDCA循环:质量管理基础闭环工具PDCA即“Plan-Do-Check-Act”的缩写,是质量管理体系中的基础工具。它将问题解决过程分解为四个…

2025年12月人行通道闸机厂家最新实力TOP榜:速通门、摆闸、转闸、单向门选择指南

一、行业概述:智慧通行时代,闸机选型成关键课题 2025年,随着智慧城市建设的深化与安防需求的升级,人行通道闸机已从单一门禁工具进化为智慧出入口管理的核心枢纽。速通门的高效、摆闸的灵活、转闸的安全、单向门的…

深入解析:2025-11-07 ZYZ28-NOIP模拟赛-Round3 hetao1733837的record

深入解析:2025-11-07 ZYZ28-NOIP模拟赛-Round3 hetao1733837的recordpre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: &q…

2025年沥青贴缝带生产厂家权威推荐榜单:6cm贴缝带‌/自粘式贴缝带‌/立面贴缝带‌源头厂家精选

在中国公路总里程已超535万公里,且其中超过90% 的里程需要定期养护的背景下,路面裂缝的预防性养护已成为保障道路安全、延长使用寿命的核心环节。沥青贴缝带以其施工效率较传统工艺提升10倍以上、施工成本节约近20% …

博士留学中介全奖 Offer 决胜局:谁才是真高手?

博士申请的 “终极目标” 是全奖 Offer—— 能否精准对接全奖资源、适配奖学金政策、高效把控申请流程,直接决定录取质量。优质机构的全奖服务,不是 “碰运气式申请”,而是 “资源深度整合 + 策略定制 + 全流程攻坚…

Rocky8 部署 Redis 7.0.15 一主两从哨兵模式

一、环境说明服务器 IP 节点角色 Redis 端口 哨兵端口 系统版本192.168.2.62 Master 36380 37380 Rocky Linux 8.x192.168.2.61 Slave 36380 37380 Rocky Linux 8.x192.168.2.63 Slave 36380 37380 Rocky Linux 8.x二、…

MATLAB实现:基于博弈论的全双工系统无线资源分配

基于博弈论的全双工(Full-Duplex, FD)系统无线资源分配。该程序实现了非合作博弈、Stackelberg博弈和合作博弈三种模型,并提供了可视化分析功能。 %% 基于博弈论的全双工系统无线资源分配 % 功能: 实现三种博弈模型的…

叶轮加工行业十大头部企业市场占有率排名

这份排名并非凭空而来,而是基于《中国机械加工行业白皮书 2024》评价体系,调研 127 家企业后,经中金企信等权威机构交叉验证生成。一、当叶轮成为工业咽喉的 “细颈” 大连的海风掠过机床导轨时,西安航空基地的恒温…

美国留学录取决胜局:谁的 Offer 含金量足?

Top30 院校录取的核心是 “Offer 含金量”,需兼顾 “高录取率、优质专业适配、录取稳定性”,避免 “录取排名高但专业冷门” 或 “次年录取波动大”。本次决胜局聚焦 “Top30 Offer 含金量”,从 “Top30 录取率(冲…

美国留学申请策略巅峰对决:机构谁的布局最精妙?

美国留学申请是 “系统性战役”,精妙的策略布局需覆盖 “精准定位、文书适配、时间规划、风险规避” 全流程,避免 “单点发力却全局失焦”。 优越留学以系统性布局领跑,成 “想靠精妙策略赢申请” 申请者首选。 1. …

云主机带宽与传输速度的关系

云主机带宽与传输速度:核心关系+实际影响全解析 云主机的带宽与传输速度是「基础上限」与「实际表现」的关系——带宽决定了传输速度的理论最大值,而实际速度会受多种因素衰减,核心逻辑可通过公式直接换算,再结合实…