NIO重构UDP收发模块

news/2025/9/21 13:07:59/文章来源:https://www.cnblogs.com/xiondun/p/19103556

本文大纲如下:

  • 1、写作背景
  • 2、基本的UDP包收发用法
  • 3、采用NIO方式处理UDP

一、背景

本篇内容,主要来源是在对公司代码重构。公司一个项目是采用UDP方式通信,在UDP的不可靠基础上,封装成可靠的通信协议。其本质是UDP+协议的方式,因今天的重点是UDP通信,所以只讲解UDP模块。由于APP有N个的通信对象,之前的代码中,也就有了N个线程监听接收的消息,N个线程发送消息。这样就会使用大量的线程,而且监听的线程一直处于阻塞状态,效率低下。在这种情况下,也就有必要对此模块进行重构了。

二、基本的UDP包收发用法

这也是公司之前的用法,比较简单粗暴,好处是开发成本低,但后期业务增加的时候,性能会有所下降

对于收发UDP包,需要localIp + localPort + remoteIp + remotePort,属于端对端的通信

1)、UDP发送数据
   public static void Send(byte[] data, int offset, int length, int localPort, InetAddress remoteAddress, int remotePort) throws Exception {if (remoteAddress == null || remotePort <= 0) {throw new Exception("Null remote address !!!");}if (data == null || offset < 0 || length <= 0) {throw new Exception("null send data !!!");}// 会分配一个可用的本地端口DatagramSocket socket = new DatagramSocket(null);// 多个UDP socket绑定相同的端口socket.setReuseAddress(true);// 绑定本地端口socket.bind(new InetSocketAddress(localPort));// 封装成PacketDatagramPacket packet = new DatagramPacket(data, offset, length, remoteAddress, remotePort);socket.send(packet);socket.close();}

发送UDP包流程:

  • 构建DatagramSocket
  • 绑定本地发送端口
  • 构建发送的UDP数据包
  • 发送
  • 关闭Socket
2)、UDP接收数据

DatagramSocket socket = new MulticastSocket(null);
socket.setReuseAddress(true);
socket.bind(new InetSocketAddress(listenPort));protected Runnable listenLoop = new Runnable() {@Overridepublic void run() {byte[] receiveBuffer = new byte[1024];DatagramPacket packet = new DatagramPacket(receiveBuffer, receiveBuffer.length);while (listenRunning) {if (socket != null && !socket.isClosed()) {try {socket.receive(packet);} catch (IOException e) {e.printStackTrace();}}}}};

接收UDP包流程:

  • 构建DatagramSocket
  • 绑定本地发送端口
  • 构建接收的UDP数据包
  • socket.receive(packet);
  • 关闭Socket

三、NIO重构UDP收发模块

1)、思路

NIO是同步非阻塞方式,将DatagramChannel向Selector选择器注册,使用一个Thread轮询Selector,当网卡准备数据时,就能告知用户开始处理发送或接收事件。总之,一切的数据发送和接收前,都得到Selector注册,得到了Selector的“允许”后,才能处理后续的工作。

2)、核心代码

// 发送接口
public interface Sender extends Closeable {// 触发异步的发送请求boolean postSendAsync() throws IOException;void send(String message,InetSocketAddress remoteAddress);
}
// 接收接口
public interface Receiver extends Closeable {// 触发异步的接收请求boolean postReceiveAsync() throws IOException;// 开始监听void start();
}
// 用于Channel向Selector注册
public interface IoProvider extends Closeable {boolean registerInput(DatagramChannel channel, HandleProviderCallback callback);boolean registerOutput(DatagramChannel channel, HandleProviderCallback callback);void unRegisterInput(DatagramChannel channel);void unRegisterOutput(DatagramChannel channel);abstract class HandleProviderCallback implements Runnable {@Overridepublic final void run() {onProviderIo();}/*** 可以进行接收或者发送时的回调**/protected abstract void onProviderIo();}}// 实现了Sender和Receiver
class DatagramChannelAdapter implements Sender,Receiver,Closeable {private final AtomicBoolean isClosed = new AtomicBoolean(false);private final AtomicBoolean isSending = new AtomicBoolean();private final DatagramChannel channel;private final IoProvider ioProvider;private final UdpDataDispatcher dispatcher;private final Queue<UDPSendSnapshot> queue = new ConcurrentLinkedQueue<>();private final ReceiveUdpListener receiverUdpListener;DatagramChannelAdapter(DatagramChannel channel, IoProvider ioProvider, ReceiveUdpListener receiverUdpListener) throws IOException {this.channel = channel;this.ioProvider = ioProvider;this.receiverUdpListener = receiverUdpListener;dispatcher = new UdpDataDispatcher(channel);// 非阻塞模式下操作channel.configureBlocking(false);}@Overridepublic boolean postReceiveAsync() throws IOException {if (isClosed.get()) {throw new IOException("Current channel is closed!");}// 注册能不能输入return ioProvider.registerInput(channel, inputCallback);}@Overridepublic void start() {try {postReceiveAsync();} catch (IOException e) {e.printStackTrace();}}@Overridepublic boolean postSendAsync() throws IOException {if (isClosed.get()) {throw new IOException("Current channel is closed!");}// 当前发送的数据附加到回调中return ioProvider.registerOutput(channel, outputCallback);}@Overridepublic void send(String message,InetSocketAddress remoteAddress) {queue.offer(new UDPSendSnapshot(message,remoteAddress));requestSend();}private void requestSend() {if (isSending.compareAndSet(false,true) ) {if (queue.size() <= 0){isSending.set(false);return;}try {if (!postSendAsync()) {isSending.set(false);}} catch (IOException e) {e.printStackTrace();CloseUtils.close(this);}}}@Overridepublic void close() throws IOException {if (isClosed.compareAndSet(false, true)) {// 解除注册回调ioProvider.unRegisterInput(channel);ioProvider.unRegisterOutput(channel);// 关闭CloseUtils.close(channel);}}// 输入的数据操作private final IoProvider.HandleProviderCallback inputCallback = new IoProvider.HandleProviderCallback() {@Overrideprotected void onProviderIo() {if (isClosed.get()) {return;}System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!inputCallback");ReceiveUdpData receiveUdp = dispatcher.receive();try {if (receiveUdp == null) {throw new IOException();}postReceiveAsync();receiverUdpListener.onReceiveUdpListener(receiveUdp.getBytes(),receiveUdp.getTotal(),receiveUdp.getAddress(),receiveUdp.getPort());} catch (IOException e) {CloseUtils.close(DatagramChannelAdapter.this);}}};// 输出的数据操作private final IoProvider.HandleProviderCallback outputCallback = new IoProvider.HandleProviderCallback() {@Overrideprotected void onProviderIo() {if (isClosed.get() || queue.size() == 0) {return;}System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!outputCallback");synchronized (isSending) {UDPSendSnapshot snapshot = queue.poll();dispatcher.sendMessage(snapshot.getMessage(),snapshot.getRemoteAddress());isSending.set(false);}}};/*** 收到监听UDP消息之后的回调*/interface ReceiveUdpListener {void onReceiveUdpListener(byte[] data, int length, InetSocketAddress address, int port);}
}
public class IoSelectorProvider implements IoProvider {private final AtomicBoolean isClosed = new AtomicBoolean(false);// 是否处于某个过程private final AtomicBoolean inRegInput = new AtomicBoolean(false);private final AtomicBoolean inRegOutput = new AtomicBoolean(false);// 读和写的数据选择器private final Selector readSelector;private final Selector writeSelector;private final ExecutorService dataHandlePool;private final HashMap<SelectionKey, Runnable> inputCallbackMap = new HashMap<>();private final HashMap<SelectionKey, Runnable> outputCallbackMap = new HashMap<>();public IoSelectorProvider() throws IOException {readSelector = Selector.open();writeSelector = Selector.open();dataHandlePool = Executors.newFixedThreadPool(4,new Factory.NameableThreadFactory("IoProvider-Thread-"));// 开始输出输入的监听startRead();startWrite();}private void startRead() {Runnable runnable = new Runnable() {@Overridepublic void run() {while (!isClosed.get()) {try {if (readSelector.select() == 0) {waitSelection(inRegInput);continue;} else if (inRegInput.get()) {waitSelection(inRegInput);}Set<SelectionKey> selectionKeys = readSelector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();if (selectionKey.isValid()) {// 对应着下面的两种形式 可读System.out.println("可读的回调");handleSelection(selectionKey,SelectionKey.OP_READ, inputCallbackMap, dataHandlePool, inRegInput);}iterator.remove();}} catch (IOException e) {e.printStackTrace();} catch (ClosedSelectorException ignored) {break;}}}};// 启动线程new Thread(runnable).start();}private void startWrite() {Runnable runnable = new Runnable() {@Overridepublic void run() {while (!isClosed.get()) {try {if (writeSelector.select() == 0) {waitSelection(inRegOutput);continue;} else if (inRegOutput.get()) {waitSelection(inRegOutput);}Set<SelectionKey> selectionKeys = writeSelector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();if (selectionKey.isValid()) {// 可写if (selectionKey.isWritable()) {System.out.println("可写的回调");handleSelection(selectionKey,SelectionKey.OP_WRITE, outputCallbackMap, dataHandlePool, inRegOutput);}}iterator.remove();}} catch (IOException e) {e.printStackTrace();} catch (ClosedSelectorException ignored) {break;}}}};// 启动线程new Thread(runnable).start();}private static void handleSelection(SelectionKey key, int keyOps,HashMap<SelectionKey, Runnable> map,ExecutorService pool, AtomicBoolean locker) {//noinspection SynchronizationOnLocalVariableOrMethodParametersynchronized (locker) {try {// 重点// 取消继续对keyOps的监听key.interestOps(key.readyOps() & ~keyOps);} catch (CancelledKeyException e) {return;}}Runnable runnable = null;try {runnable = map.get(key);} catch (Exception ignored) {}if (runnable != null && !pool.isShutdown()) {// 异步调度pool.execute(runnable);}}@Overridepublic boolean registerInput(DatagramChannel channel, HandleProviderCallback callback) {return registerSelection(channel, readSelector, SelectionKey.OP_READ, inRegInput,inputCallbackMap, callback) != null;}@Overridepublic boolean registerOutput(DatagramChannel channel, HandleProviderCallback callback) {return registerSelection(channel, writeSelector, SelectionKey.OP_WRITE, inRegOutput,outputCallbackMap, callback) != null;}@Overridepublic void unRegisterInput(DatagramChannel channel) {unRegisterSelection(channel, readSelector, inputCallbackMap, inRegInput);}@Overridepublic void unRegisterOutput(DatagramChannel channel) {unRegisterSelection(channel, writeSelector, outputCallbackMap, inRegOutput);}private static SelectionKey registerSelection(DatagramChannel channel, Selector selector,int registerOps, AtomicBoolean locker,HashMap<SelectionKey, Runnable> map,Runnable runnable) {//noinspection SynchronizationOnLocalVariableOrMethodParametersynchronized (locker) {// 设置锁定状态locker.set(true);try {// 唤醒当前的selector,让selector不处于select()状态selector.wakeup();SelectionKey key = null;if (channel.isRegistered()) {// 查询是否已经注册过key = channel.keyFor(selector);}if (key != null) {key.interestOps(key.readyOps() | registerOps);}if (key == null) {// 注册selector得到Keykey = channel.register(selector, registerOps);// 注册回调map.put(key, runnable);}return key;} catch (ClosedChannelException| CancelledKeyException| ClosedSelectorException e) {e.printStackTrace();return null;} finally {// 解除锁定状态locker.set(false);try {// 通知locker.notify();} catch (Exception ignored) {}}}}private static void unRegisterSelection(DatagramChannel channel, Selector selector,Map<SelectionKey, Runnable> map,AtomicBoolean locker) {//noinspection SynchronizationOnLocalVariableOrMethodParametersynchronized (locker) {locker.set(true);selector.wakeup();try {if (channel.isRegistered()) {SelectionKey key = channel.keyFor(selector);if (key != null) {// 取消监听的方法key.cancel();map.remove(key);}}} finally {locker.set(false);try {locker.notifyAll();} catch (Exception ignored) {}}}}private static void waitSelection(final AtomicBoolean locker) {//noinspection SynchronizationOnLocalVariableOrMethodParametersynchronized (locker) {if (locker.get()) {try {locker.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}@Overridepublic void close() throws IOException {if (isClosed.compareAndSet(false, true)) {dataHandlePool.shutdown();inputCallbackMap.clear();outputCallbackMap.clear();CloseUtils.close(readSelector);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/908775.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入解析:C语言:猜数字游戏

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

深入解析:深度学习从入门到精通 - AutoML与神经网络搜索(NAS):自动化模型设计未来

深入解析:深度学习从入门到精通 - AutoML与神经网络搜索(NAS):自动化模型设计未来2025-09-21 12:39 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; ove…

题解:SP6562 PRUBALL - Esferas

盲猜你们都是从 CSP-S 2025 初赛 来的…… 题目描述 给你 \(n\) 颗蛋和一个 \(m\) 层高的楼,定义蛋的硬度 \(k\) 为:在 \(<k\) 的楼层扔蛋不会碎,在 \(\ge k\) 的楼层扔蛋会碎。求在最坏情况下,最少需要扔多少次…

个人项目-文本查重

软工第二次作业之个人项目——论文查重 项目信息项目信息 详情课程 班级链接作业要求 作业要求项目目标 实现一个论文查重程序,规范软件开发流程,熟悉Github进行源代码管理和学习软件测试GitHub仓库 https://github.…

深入解析:[数据结构] LinkedList

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

US$34 MB ESL Emulator

MB ESL EmulatorTop 4 Reasons To Get MB ESL Emulator1. This device works with Mercedes EIS.2. It emulates both of old (W202, 208, 210) and new (203, 208, 211, 639).3. ESL types functioning.You can use t…

采用python test测试http接口

采用python test测试http接口pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco"…

CF2147 Codeforces Global Round 29 (Div. 1 + Div. 2) 解题报告

A 题挂机半天,B 题挂机半天,D 题脑子犯蠢,3t寄了。省流 A 题挂机半天,B 题挂机半天,D 题脑子犯蠢,3t寄了。9.20 内含剧透,请vp后再来。 赛前 白天刚打完失败的 ccpc 网络赛,不过心态已经调整的非常平和,然后抱…

US$29 Vag R250 VW Audi Dashboard Programmer Free Shipping

R250 VW Audi Dashboard Programmer You can use R250 to program Siemens/VDO new cryptography system Description:This product looks like a small box that needs to be connected to a PC running Win98/Me/XP…

数字图像基础知识

前言 数字图像(Digital Image),又称数码图像或数位图像,以数字形式存储于电子设备中。 有多种方式可以生成数字图像。 一种是物理收集,例如使用数码相机、扫描仪、卫星遥感器、红外/热成像仪、核磁共振 MRI 等设备…

详细介绍:农业XR数字融合工作站,赋能农业专业实践学习

详细介绍:农业XR数字融合工作站,赋能农业专业实践学习pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas&…

标题:分享一个值得推荐的免费云服务——阿贝云

最近在搭建个人网站时,无意中发现了一个提供免费虚拟主机和免费云服务器的平台——阿贝云。经过一段时间的使用,我真的被它的稳定性和易用性打动了! 阿贝云不仅提供了完全免费的云服务器资源,还支持多种常见环境,…

PPT2Note使用说明

PPT2Note使用说明 简介 PPT2Note是一个应用于教学的使用工具,可以自动抓取在教学大屏上打开的PPT文件并发送至绑定的用户笔记中。解决了PPT翻页太快漏截图问题。

第三周:面向对象入门2与类的识别

第三周:面向对象入门2与类的识别集美大学课程实验报告-第三周:面向对象入门2与类的识别项目名称 内容课程名称 Java程序设计班级 网安2412指导教师 郑如滨学生姓名 王嘉熙学号 202421336061实验项目名称 面向对象入门…

详细介绍:Flink-新增 Kafka source 引发状态丢失导致启动失败

详细介绍:Flink-新增 Kafka source 引发状态丢失导致启动失败2025-09-21 11:59 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !impor…

【面向接口编程(IOP)典型场景】底层组件如何实现回调通知上层应用系统? 另外一种实现方式

【面向接口编程(IOP)典型场景】底层组件如何实现回调通知上层应用系统? 另外一种实现方式偶然看到一篇文章, https://www.cnblogs.com/buguge/p/19055703 对这篇文章的设计进行了更改。 原来设计的类图 和流程图 :…

GEE训练教程:Sentinel-2卫星影像揭秘飓风奥蒂斯破坏力 - 指南

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

设置Redis在CentOS7上的自启动配置

在CentOS 7系统中,要设置Redis服务的自启动,需要配置Redis服务以便它能够在系统启动时自动运行。为此,我们将使用 systemctl命令,这是CentOS 7 中管理服务的推荐方法。 首先,确保已经正确地安装了Redis服务并且它…

挂载配置文件以Docker启动Redis服务

要使用Docker启动Redis服务,并挂载配置文件,首先需要确保已经安装好Docker环境。以下是具体步骤和相关的解释: 步骤1:准备Redis配置文件 您需要准备一个Redis配置文件,此文件会包含Redis服务器的配置指令。创建一…

abc418d

AtCoder ABC418 D XNOR Operation link 题意 给定一个长度为 \(n\) 的 01 串 \(s\),每次可以选择相邻的两个位置。如果两个位置字符相同,把它们缩成 \(1\),否则缩成 \(0\)。求 \(s\) 中有多少个子串经过操作可以变成…