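# UDP reception is done with socketserver, which keeps it simple and convenient.
# Python's threading module is used to quickly set up the UDP receive thread.
# PyQt5's QThread is used to emit signals to the UI thread so that it can update the UI.
# A queue receives the UDP data and passes it from the UDP receive thread to the data-parsing thread.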
from socketserver import*
import threading
import time
import sys
from queue import *
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5 import *
'''UDP receive thread'''
class MyUDPServer(UDPServer):
    """UDP server that carries a queue for handing datagrams to another thread.

    Args:
        server_address: ``(host, port)`` tuple to bind to.
        RequestHandlerClass: Handler class instantiated for every datagram.
        queue: Queue-like object (anything with ``put``) that handlers push
            received payloads onto. Stored as ``self.udp_queue``.

    Raises:
        OSError: If the address cannot be bound (the socket is bound and
            activated immediately).
    """

    def __init__(self, server_address, RequestHandlerClass, queue):
        # Extra constructor argument: handlers reach it via ``self.server``.
        self.udp_queue = queue
        super().__init__(
            server_address, RequestHandlerClass, bind_and_activate=True
        )


class Handler(BaseRequestHandler):
    """Request handler that forwards each received datagram to the server's queue."""

    def handle(self):
        """Put the datagram payload on ``self.server.udp_queue``.

        For a UDP server ``self.request`` is a ``(data, socket)`` pair. The
        payload is forwarded unmodified: stripping whitespace would silently
        drop leading/trailing 0x09-0x0D and 0x20 bytes from binary payloads.
        """
        self.data = self.request[0]
        # Blocks while the queue is full, so no datagram is dropped here.
        self.server.udp_queue.put(self.data)


# Data-parsing thread
class mythread(QThread):
    """Worker thread that takes datagrams off a queue and emits them as a signal.

    Args:
        queue: Queue-like object to read payloads from (must support
            ``get(timeout=...)``).
        rec_data_Instance: Object whose ``rec_data`` method is connected to
            ``signal``; pass a falsy value to skip the connection.
    """

    # Emitted once for every payload taken off the queue.
    signal = pyqtSignal(bytes)

    # Seconds to block on the queue before re-checking ``running``.
    _POLL_INTERVAL = 0.1

    def __init__(self, queue, rec_data_Instance):
        super().__init__()
        self.running = True
        self.queue = queue
        self.rec_data_Instance = rec_data_Instance
        if self.rec_data_Instance:
            self.signal.connect(self.rec_data_Instance.rec_data)

    def __del__(self):
        # Ask the loop to finish and wait for it before the object goes away.
        self.running = False
        self.wait()

    def run(self):
        """Forward queued payloads until ``stop()`` is called.

        Blocks on the queue with a short timeout instead of spinning on
        ``empty()``: the thread sleeps while idle yet still notices
        ``running`` being cleared within ``_POLL_INTERVAL`` seconds.
        """
        # Imported locally so the dependency on ``Empty`` is explicit.
        from queue import Empty

        while self.running:
            try:
                payload = self.queue.get(timeout=self._POLL_INTERVAL)
            except Empty:
                continue
            self.signal.emit(payload)

    def stop(self):
        """Request the ``run`` loop to exit; does not wait for it."""
        self.running = False


# UI display thread
class MY_Widget(QWidget):
    """Small window with a toggle button that starts/stops the UDP receiver.

    Received datagrams are shown in the text box as space-separated hex
    values (see ``rec_data``).
    """

    def __init__(self):
        super().__init__()
        # Click counter used as the toggle state: even -> stopped, odd -> running.
        # Set before the UI is built so the state exists before any click.
        self.button_num = 0
        self.initUI()

    def initUI(self):
        """Create the widgets, lay them out and show the window."""
        self.ip = "10.7.28.51"
        self.port = 9999
        # Window size (width, height). Passed directly rather than stored as
        # ``self.width`` / ``self.height``, which would shadow the inherited
        # QWidget.width() / QWidget.height() methods on this instance.
        self.resize(200, 100)
        # Window title.
        self.setWindowTitle("UDP ")

        v1layout = QVBoxLayout()
        self.open_button = QPushButton("打开")
        self.open_button.clicked.connect(self.open_button_func)
        self.edit = QTextEdit(self)
        self.edit.setText("test")
        v1layout.addWidget(self.open_button)
        v1layout.addWidget(self.edit)
        self.setLayout(v1layout)
        self.show()

    def open_udp_server(self, ip, port):
        """Start the UDP server thread and the queue-draining worker thread.

        Args:
            ip: Address to bind the UDP server to.
            port: Port to bind the UDP server to.

        Raises:
            OSError: If the address cannot be bound.
        """
        ADDR = ip, port
        # Receive queue shared by the server thread and the worker thread.
        self.udp_queue = Queue(maxsize=5)
        # UDP server.
        self.server = MyUDPServer(ADDR, Handler, self.udp_queue)
        # Background (daemon) thread that runs the server's receive loop.
        self.server_thread = threading.Thread(
            target=self.server.serve_forever, daemon=True
        )
        self.server_thread.start()
        # Worker that takes data off the queue and signals it back to this widget.
        self.up_mythread = mythread(self.udp_queue, self)
        self.up_mythread.start()

    def close_udp_server(self):
        """Stop the worker thread, then shut down and close the UDP server."""
        self.up_mythread.stop()
        self.server.shutdown()
        self.server.server_close()

    def print_hex(self, bytes):
        """Return the items of ``bytes`` as space-separated ``hex()`` strings."""
        return " ".join(hex(int(value)) for value in bytes)

    def rec_data(self, data):
        """Slot for received data: print it and show it in the text box as hex."""
        text = self.print_hex(data)
        print(text)
        self.edit.setText(text)

    def open_button_func(self):
        """Toggle the UDP server on button click and update the button label.

        If the server cannot be started (for example the address cannot be
        bound), the error text is shown in the text box and the toggle state
        is left unchanged, so the next click tries to open again.
        """
        if self.button_num % 2 == 0:
            try:
                self.open_udp_server(self.ip, self.port)
            except OSError as exc:
                self.edit.setText(str(exc))
                return
            self.open_button.setText("关闭")
        else:
            self.close_udp_server()
            self.open_button.setText("打开")
        self.button_num += 1

    def closeEvent(self, event):
        """Stop the server and worker thread if still running when the window closes."""
        if self.button_num % 2 == 1:
            self.open_button_func()
        super().closeEvent(event)


if __name__ == "__main__":
    app = QApplication(sys.argv)
    # Create the UI.
    ex = MY_Widget()
    # Propagate the event loop's return code as the process exit status.
    sys.exit(app.exec_())