目录
1.总体设计
2.自定义协议设计(简单版)
3.消息类型(1字节)
4.项目结构
5.核心功能代码
(1)pom.xml(Maven依赖)
(2)IotServer.java(服务器启动器)
(3)IotServerInitializer.java(Pipeline初始化)
(4)DeviceChannelManager.java(设备连接管理器)
(5)model/IotMessage.java(消息实体)
(6)IotMessageDecoder.java(自定义解码器)
(7)IotMessageEncoder.java(自定义编码器)
(8)IotServerHandler.java(核心业务逻辑处理)
这个项目已经支持了:
6.Netty物联网服务器常见问题及解决方案
(1)粘包/拆包问题
问题描述
解决方案
(2)高并发性能问题
问题描述
解决方案
(3)连接管理与高可用性
问题描述
解决方案
(4)协议兼容性与扩展性
问题描述
解决方案
(5)资源泄漏与稳定性
问题描述
解决方案
(6)弱网络环境适配
问题描述
解决方案
(7)安全与加密
问题描述
解决方案
(8)监控与日志
问题描述
解决方案
1.总体设计
模块 | 说明 |
设备连接(Login) | 设备上线时登录认证 |
心跳检测(Heartbeat) | 定时检测设备是否在线 |
消息上报(Upload) | 设备主动上传数据到平台 |
指令下发(PushCommand) | 平台推送命令到指定设备 |
连接管理(Channel管理) | 按 deviceId 管理连接 |
编解码器(自定义协议) | 处理消息粘包、拆包 |
日志监控 | 记录通信过程 |
2.自定义协议设计(简单版)
设备和平台约定的消息格式,统一规范通信!
+----------+--------+------------+--------------+
| 消息头部 | 消息类型 | 设备ID长度 | 消息体长度 |
| 4字节 | 1字节 | 1字节 | 4字节 |
+----------+--------+------------+--------------+
| 设备ID (变长) | 消息体 (变长) |
+-----------------------------------------------+
3.消息类型(1字节)
类型值 | 含义 |
0x01 | 登录请求 |
0x02 | 心跳 |
0x03 | 上报数据 |
0x04 | 平台推送命令 |
4.项目结构
iot-netty-server/├── IotServer.java // 启动服务器├── IotServerInitializer.java // 初始化Pipeline├── IotMessageDecoder.java // 解码器├── IotMessageEncoder.java // 编码器├── IotServerHandler.java // 核心业务处理├── DeviceChannelManager.java // 连接管理├── model/│ ├── IotMessage.java // 消息模型└── pom.xml // Maven依赖
5.核心功能代码
(1)pom.xml(Maven依赖)
<dependencies><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.109.Final</version></dependency>
</dependencies>
(2)IotServer.java(服务器启动器)
public class IotServer {public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new IotServerInitializer());ChannelFuture f = b.bind(9000).sync();System.out.println("IoT服务器启动成功,监听端口9000...");f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
(3)IotServerInitializer.java(Pipeline初始化)
public class IotServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new IotMessageDecoder());pipeline.addLast(new IotMessageEncoder());pipeline.addLast(new IotServerHandler());}
}
(4)DeviceChannelManager.java(设备连接管理器)
public class DeviceChannelManager {private static final Map<String, Channel> deviceChannels = new ConcurrentHashMap<>();public static void register(String deviceId, Channel channel) {deviceChannels.put(deviceId, channel);}public static void remove(Channel channel) {deviceChannels.values().removeIf(value -> value.equals(channel));}public static Channel get(String deviceId) {return deviceChannels.get(deviceId);}
}
(5)model/IotMessage.java(消息实体)
public class IotMessage {private byte type;private String deviceId;private byte[] payload;// getter、setter省略
}
(6)IotMessageDecoder.java(自定义解码器)
public class IotMessageDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {if (in.readableBytes() < 10) return; // 最小消息长度检查in.markReaderIndex();int header = in.readInt();byte type = in.readByte();byte deviceIdLength = in.readByte();int payloadLength = in.readInt();if (in.readableBytes(