pymodbus模拟modbus slave从站(二)

news/2026/1/24 16:09:04/文章来源:https://www.cnblogs.com/51testing/p/19526786

pymodbus模拟modbus slave从站(二)

import asyncio
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from pymodbus.server import StartAsyncTcpServer
from aiohttp import web
import logging# 禁用 pymodbus 冗余日志(可选)
logging.getLogger("pymodbus").setLevel(logging.WARNING)# ===== 配置区 =====
SERVER_IP = "0.0.0.0"
PORTS = [5020, 5021, 5022]          # 每个端口一个从站
REGISTER_SIZE = 100                 # 寄存器数量 (0 ~ 99)
COMMAND_REGISTERS = {2, 3}          # 哪些地址是“命令寄存器”,读取后自动清零# 场景定义:每个场景是一个字典 {address: value}
SCENARIOS = {"reset": {},                    # 全零"call_car": {2: 1},             # 地址2=1 表示叫车"call_material": {2: 2},        # 地址2=2 表示叫料"custom_test": {2: 1, 3: 5, 10: 100},
}# ===== 自定义 Modbus Slave Context(核心:实现读取后清零)=====
class AutoResetModbusSlaveContext(ModbusSlaveContext):def __init__(self, command_registers, size=100, **kwargs):super().__init__(**kwargs)self.command_registers = command_registersself.register_size = size# 初始化所有寄存器为 0self.setValues(3, 0, [0] * size)def getValues(self, fc_as_hex, address, count=1):"""重写读取方法:若读取的是命令寄存器,则读完后清零"""values = super().getValues(fc_as_hex, address, count)# 仅处理 Holding Registers (功能码 3)if fc_as_hex == 3:for i in range(count):reg_addr = address + iif reg_addr in self.command_registers and reg_addr < self.register_size:# 读取后立即清零self.setValues(3, reg_addr, [0])return values# ===== 全局状态 =====
slave_states = {}  # port -> {context, scenarios, current_profile}
modbus_server_tasks = []# ===== 启动 Modbus 从站 =====
async def run_modbus_slave(port):print(f"[Modbus] Starting slave on port {port}...")# 创建自定义上下文(支持自动清零)store = AutoResetModbusSlaveContext(command_registers=COMMAND_REGISTERS,size=REGISTER_SIZE,zero_mode=True)# 构建完整寄存器列表(用于场景切换)def build_full_registers(scenario_dict):regs = [0] * REGISTER_SIZEfor addr, val in scenario_dict.items():if 0 <= addr < REGISTER_SIZE and 0 <= val <= 65535:regs[addr] = valreturn regs# 预计算所有场景的完整寄存器值scenario_data = {}for name, sparse in SCENARIOS.items():scenario_data[name] = build_full_registers(sparse)# 初始状态:全零store.setValues(3, 0, scenario_data["reset"])slave_states[port] = {"store": store,"scenarios": scenario_data,"current_profile": "reset"}# 启动服务器(pymodbus 3.6+)server_context = ModbusServerContext(slaves=store, single=True)server, task = await StartAsyncTcpServer(context=server_context,address=(SERVER_IP, port))modbus_server_tasks.append(task)print(f"[Modbus] Port {port} ready. Command registers: {sorted(COMMAND_REGISTERS)}")# ===== HTTP API =====
async def handle_list_ports(request):return web.json_response({"ports": list(slave_states.keys())})async def handle_list_profiles(request):port = int(request.query.get("port", PORTS[0]))if port not in slave_states:return web.json_response({"error": "Port not found"}, status=404)profiles = list(SCENARIOS.keys())current = slave_states[port]["current_profile"]return web.json_response({"port": port,"profiles": profiles,"current_profile": current})async def handle_switch_profile(request):try:port = int(request.query.get("port"))profile = request.query.get("profile")if port not in slave_states:return web.json_response({"error": "Port not found"}, status=404)if profile not in SCENARIOS:return web.json_response({"error": f"Profile '{profile}' not defined"}, status=400)state = slave_states[port]full_regs = state["scenarios"][profile]state["store"].setValues(3, 0, full_regs)state["current_profile"] = profilereturn web.json_response({"success": True,"port": port,"profile": profile,"message": f"Switched to profile '{profile}'"})except Exception as e:return web.json_response({"error": str(e)}, status=400)async def handle_write_register(request):try:data = await request.json()port = int(data["port"])address = int(data["address"])value = int(data["value"])if port not in slave_states:return web.json_response({"error": "Port not found"}, status=404)if not (0 <= address < REGISTER_SIZE):return web.json_response({"error": f"Address must be 0-{REGISTER_SIZE - 1}"}, status=400)if not (0 <= value <= 65535):return web.json_response({"error": "Value must be 0-65535"}, status=400)slave_states[port]["store"].setValues(3, address, [value])return web.json_response({"success": True})except Exception as e:return web.json_response({"error": str(e)}, status=400)async def start_http_server():app = web.Application()app.router.add_get("/ports", handle_list_ports)app.router.add_get("/profiles", handle_list_profiles)app.router.add_post("/switch_profile", handle_switch_profile)app.router.add_post("/write", handle_write_register)runner = web.AppRunner(app)await runner.setup()site = web.TCPSite(runner, "0.0.0.0", 8080)await site.start()print("[HTTP] API Server running on http://0.0.0.0:8080")print("  GET  /ports")print("  GET  /profiles?port=5020")print("  POST /switch_profile?port=5020&profile=call_car")print("  POST /write {\"port\":5020, \"address\":2, \"value\":1}")# ===== Main =====
async def main():print("🚀 Starting Multi-Slave Modbus Tester (pymodbus 3.6+)...")print(f"Command registers (auto-reset on read): {sorted(COMMAND_REGISTERS)}")for port in PORTS:asyncio.create_task(run_modbus_slave(port))await start_http_server()print("✅ Ready for CECC testing. Press Ctrl+C to stop.")try:await asyncio.Event().wait()except KeyboardInterrupt:print("\n🛑 Shutting down...")if __name__ == "__main__":try:asyncio.run(main())except KeyboardInterrupt:print("\n🛑 Exited.")

  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1210423.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

警惕“链经济”背后的风险陷阱

在线上线下融合、乡村振兴成为国家战略的背景下&#xff0c;一种名为“链经济”的新型商业模式正在悄然兴起。以“某了宝”为代表的平台&#xff0c;宣称以农特产品为纽带&#xff0c;打造“消费分享创业”的生态闭环。 这种模式听起来既符合政策导向&#xff0c;又具有商业创…

实用指南:鸿蒙Next振动开发指南:打造沉浸式触觉反馈体验

实用指南:鸿蒙Next振动开发指南:打造沉浸式触觉反馈体验2026-01-24 16:02 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important…

关节型机械手设计

2 机械手的总体设计 2.1 工业机械手的组成 工业机械手是由执行机构、驱动系统和控制系统所组成的&#xff0c;各部关系如图2.1所示。 图2.1 机械手的组成 2.1.1 执行机构 1.手部 即直接与工件接触的部分&#xff0c;一般是回转型或平移型(为回转型&#xff0c;因其结构简单&am…

实用指南:JavaEE-- 网络编程 Socket套接字

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

webapp公网——三大平台部署实战全指南

img { display: block; margin-left: auto; margin-right: auto } table { margin-left: auto; margin-right: auto } 随着大模型能力的不断增强,像 Google AI Studio(Gemini AI Studio) 这样的工具,已经可以直接生…

百联卡回收如何1分钟完成,实测3种渠道

闲置百联卡的回收操作,往往因渠道选择不同产生效率差异。不少人面对卡片闲置时,既不清楚可通过哪些路径处理,也对回收耗时存在疑问。我结合实测经验,梳理出三种主流回收方式,以下从实操角度逐一拆解细节。一、线上…

位运算及其技巧

位运算及其技巧 1.补码: 原码为二进制数前带一个符号位 负数为1 正数为0 如110 -2 010 -- > 2 正数的反码与原码相同 负数的反码是除符号位外原码的每一位都取反(1变0 0变1) 正数的补码与原码相同 负数的补码是…

【毕业设计】基于springboot的幼儿园管理系统(源码+文档+远程调试,全bao定制等)

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

2026 权威雅思培训测评|口碑 TOP5 排行榜 优质雅思网课精准推荐

本次测评由中国留学服务中心联合英国文化教育协会(BC)官方认证测评组开展,依据《2026雅思培训服务质量规范》,覆盖55个城市190个区县,结合28000+考生及家长实测反馈、210家雅思教育机构全维度考核结果,形成这份兼…

深入解析:STM32跑飞,进入HardFault_Handler如何精准的确定问题

深入解析:STM32跑飞,进入HardFault_Handler如何精准的确定问题pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "C…

《NMN怎么选?2026年NMN品牌吸收率与效果解析和对比选购指南》

随着抗衰老研究不断深入,NMN(烟酰胺单核苷)已经从“小众科研名词”,走向大众视野。越来越多研究证实,NMN通过提升体内NAD⁺水平,参与细胞能量代谢、DNA修复、线粒体功能维持,被认为是当前最具潜力的抗衰老补充方…

2026年全球NMN十大品牌最新排名:奥本元凭借十倍性价比成年度黑马

抗衰老一直是人类永恒关注的话题 。如果你的身体开始出现皮肤暗沉、皱纹增加、记忆力减退、反应迟缓,亦或是白发增多、体力不支,这往往是身体正在发出“断崖式衰老”的深度警告 。 透过这些生理表象深挖底层逻辑,医…

RS485发完数据后总是丢最后一个字节

为什么你的 RS485 总是在发完数据后丢最后一个字节&#xff1f;RS485 不是一种新的协议&#xff0c;它只是 UART 的物理层 (Physical Layer) 皮肤。 数据包还是 Start 8 Data Stop&#xff0c;但电压标准全变了。1. 物理层革命&#xff1a;差分信号 (Differential Signaling)…

PLC系统设计(设计源文件+万字报告+讲解)(支持资料、图片参考_相关定制)_文章底部可以扫码

PLC系统设计(设计源文件万字报告讲解)&#xff08;支持资料、图片参考_相关定制&#xff09;_文章底部可以扫码 用PLC控制整个控制装置要达到以下要求: 1)按下控制装置启动按钮后&#xff0c;传送带1和传送带2运转&#xff0c;传送包装物品到传送 带2. 2)传送带2上有3个物品后&…

自动装瓶机控制系统设计(设计源文件+万字报告+讲解)(支持资料、图片参考_相关定制)_文章底部可以扫码

自动装瓶机控制系统设计(设计源文件万字报告讲解)&#xff08;支持资料、图片参考_相关定制&#xff09;_文章底部可以扫码 源 2.3.5I/0电路 2.4软件的组成 2.4.1PLC系统软件与工作过程 2.4.2应用软件 2.4.3编程语言及编程支持工具软件 2.5PLC控制系统抗干扰措施 2.5.1.PLC控制…

基于PLC的灌装饮料控制系统设计【程序与文档】(设计源文件+万字报告+讲解)(支持资料、图片参考_相关定制)_文章底部可以扫码

基于PLC的灌装饮料控制系统设计【程序与文档】(设计源文件万字报告讲解)&#xff08;支持资料、图片参考_相关定制&#xff09;_文章底部可以扫码控制要求如下图所示&#xff0c;西门子1200博途V15(博途版本V15及以上都可以打开) 商品包括梯形图程序、触摸屏仿真、完整报告文档…

基于PLC的灌装饮料控制系统设计控制【程序与文档】(设计源文件+万字报告+讲解)(支持资料、图片参考_相关定制)_文章底部可以扫码

基于西门子plc博图1200药片自动装瓶机控制系统设计【程序与文档】(设计源文件万字报告讲解)&#xff08;支持资料、图片参考_相关定制&#xff09;_文章底部可以扫码 基于西门子plc博图1200药片自动装瓶机控制系统设计 1.仿真报告(1.5W字) 2.10表 3.接线图

【救命稻草】RAG开发者的春天来了!UltraRAG框架上线,解决4大落地痛点,代码量减少80%!

做AI开发、企业数智化的人都懂这种痛—— 想搭建RAG系统落地到业务&#xff0c;却卡在“数据构建乱、检索不准、微调复杂”&#xff1a;整理领域数据要手动标注&#xff0c;耗时耗力&#xff1b;检索模型调参全靠试错&#xff0c;准确率忽高忽低&#xff1b;微调模型要写大量代…

【扎心真相】RAG分块策略大反转:语义分块竟是“智商税“?简单粗暴方法吊打高级算法!

在构建 RAG 系统时&#xff0c;开发者面临的第一道关卡往往是&#xff1a;如何切分文档&#xff08;Chunking&#xff09;&#xff1f; 传统的做法是“一刀切”——每 200 个 Token 切一块&#xff0c;简单粗暴。但最近一年&#xff0c;**语义分块&#xff08;Semantic Chunki…

饮料灌装流水线控制画面【程序与文档】(设计源文件+万字报告+讲解)(支持资料、图片参考_相关定制)_文章底部可以扫码

饮料灌装流水线控制画面【程序与文档】(设计源文件万字报告讲解)&#xff08;支持资料、图片参考_相关定制&#xff09;_文章底部可以扫码 西门子PLC程序设计饮料罐装控制要求如下图所示&#xff0c;西门子1200博途V15(博途版本V15及以上都可以打开) 包括梯形图程序、触摸屏仿真…