基于corundumstudio建立websocket长连接

依赖

<!--socket io --><dependency><groupId>io.socket</groupId><artifactId>socket.io-client</artifactId><version>1.0.1</version></dependency><dependency><groupId>com.corundumstudio.socketio</groupId><artifactId>netty-socketio</artifactId><version>${netty-socketio.version}</version></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId><version>${kafka.version}</version></dependency>

代码

/*** socket io 资源管理器* @author ken* @date 2021/1/6 16:18*/
@Slf4j
@Component
public class SocketIOClientManager {@Autowiredprivate KafkaConnectionManager kafkaConnectionManager;@Autowiredprivate MqttConnectionManager mqttConnectionManager;@Resourceprivate WebSocketEventHandler webSocketEventHandler;// 用来存已连接的客户端唯一ID, <KAFKA+" : "+URL+" : "+topics, <sessionID, CLIENT>>private Map<String, Map<String, SocketIOClient>> clientMap = Collections.synchronizedMap(new HashMap<>());public void addClient(SocketIOClient client) {String sessionID = client.getSessionId().toString();String resourceID = getParamsByClient(client);if (resourceID == null) {log.error("客户端未配置参数");client.sendEvent("fail", 403, "Type" + splitStr + "address" + splitStr + "topic" + splitStr + "topic" + splitStr + "topic...");}if (clientMap.containsKey(resourceID)) {Map<String, SocketIOClient> subMap = clientMap.get(resourceID);if (!subMap.containsKey(sessionID)) {subMap.put(sessionID, client);clientMap.put(resourceID, subMap);}} else {final HashMap<String, SocketIOClient> subMap = new HashMap<>();subMap.put(sessionID, client);clientMap.put(resourceID, subMap);}log.info("在线客户端: " + clientMap.toString());}public void removeClient(SocketIOClient client) {String sessionID = client.getSessionId().toString();String resourceID = getParamsByClient(client);if (resourceID == null) {log.error("客户端未配置参数");client.sendEvent("fail", 403, "Type" + splitStr + "address" + splitStr + "topic" + splitStr + "topic" + splitStr + "topic...");return;}if (clientMap.containsKey(resourceID)) {final Map<String, SocketIOClient> subMap = clientMap.get(resourceID);final Iterator<Map.Entry<String, SocketIOClient>> iterator = subMap.entrySet().iterator();while (iterator.hasNext()) {final Map.Entry<String, SocketIOClient> clientEntry = iterator.next();if (clientEntry.getKey().equals(sessionID)) {iterator.remove();log.info("移除客户端: {}", sessionID);// 如果移除session后对应url没有对应session,那么移除urlif (subMap.size() == 0) {clientMap.remove(resourceID);log.info("移除ID: {}", resourceID);if (resourceID.startsWith(String.valueOf(ResourceType.KAFKA))) {kafkaConnectionManager.removeConnection(resourceID);}if (resourceID.startsWith(String.valueOf(ResourceType.MQTT))) {mqttConnectionManager.removeConnection(resourceID);}} else {clientMap.put(resourceID, subMap);}}}} else {log.info("没有 {} 对应的{} 客户端", resourceID, sessionID);}}public void pushClientMesg2Kafka(SocketIOClient client, String topic, String mesg) throws ExecutionException, InterruptedException {String resourceID = getParamsByClient(client);KafkaPubSubServer kafkaServer = (KafkaPubSubServer) kafkaConnectionManager.getServerByResourceID(resourceID);if (kafkaServer == null) {throw DataTException.asDataTException(CommonEnum.CONF_ERROR, "数据源未配置");}if (clientMap.containsKey(resourceID)) {kafkaServer.pushMesg(topic, mesg);}}public void pushKafkaMesg2Client(String resourceID, String mesg) {if (clientMap.containsKey(resourceID)) {Map<String, SocketIOClient> subMap = clientMap.get(resourceID);for (SocketIOClient ioClient : subMap.values()) {ioClient.sendEvent(webSocketEventHandler.getClientSubKafkaEvent(), mesg.toString());}}}public void pushClientMesg2MQTT(SocketIOClient client, String topic, String mesg) throws MqttException {String resourceID = getParamsByClient(client);MqttPubSubServer mqttServer = (MqttPubSubServer) mqttConnectionManager.getServerByResourceID(resourceID);if (mqttServer == null) {throw DataTException.asDataTException(CommonEnum.CONF_ERROR, "数据源未配置");}if (clientMap.containsKey(resourceID)) {mqttServer.pushMesg(topic, mesg);}}public void pushMQTTMesg2Client(String resourceID, String mesg) {if (clientMap.containsKey(resourceID)) {Map<String, SocketIOClient> subMap = clientMap.get(resourceID);for (SocketIOClient ioClient : subMap.values()) {ioClient.sendEvent(webSocketEventHandler.getClientSubEmqEvent(), mesg.toString());}}}/*** 此方法为获取client连接中的参数,可根据需求更改** @param client* @return*/private String getParamsByClient(SocketIOClient client) {// 从请求的连接中拿出参数(这里的loginUserNum必须是唯一标识)final String resourceID = client.getHandshakeData().getSingleUrlParam("resourceID");return resourceID;}}
@Configuration
public class SocketIOConfig {@Value("${socket-io.host}")private String host;@Value("${socket-io.port}")private int port;public String getUrl() {return "http://" + host + ":" + port;}public SocketIOConfig() {}@Beanpublic SocketIOServer socketIOServer() {//创建Socket,并设置监听端口com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();// 设置主机名,默认是0.0.0.0config.setHostname(host);// 设置监听端口config.setPort(port);// 协议升级超时时间(毫秒),默认10000。HTTP握手升级为ws协议超时时间config.setUpgradeTimeout(10000);// Ping消息间隔(毫秒),默认25000。客户端向服务器发送一条心跳消息间隔config.setPingInterval(25000);// Ping消息超时时间(毫秒),默认60000,这个时间间隔内没有接收到心跳消息就会发送超时事件config.setPingTimeout(60000);return new SocketIOServer(config);}@Beanpublic SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {return new SpringAnnotationScanner(socketServer);}
}
@Component
@Slf4j
public class WebSocketEventHandler {@Autowiredprivate SocketIOClientManager socketIOClientManager;public String getClientSubKafkaEvent() {return clientSubKafkaEvent;}public String getClientPubKafkaEvent() {return clientPubKafkaEvent;}public String getClientSubEmqEvent() {return clientSubEmqEvent;}public String getClientPubEmqEvent() {return clientPubEmqEvent;}private final String clientSubKafkaEvent = "subKafka";private final String clientPubKafkaEvent = "pubKafka";private final String clientSubEmqEvent = "subEmq";private final String clientPubEmqEvent = "pubEmq";@OnConnectpublic void onConnect(SocketIOClient client) {log.info("客户端发起连接. sessionId->{}", client.getSessionId());socketIOClientManager.addClient(client);}@OnDisconnectpublic void onDisconnect(SocketIOClient client) {final String sessionID = client.getSessionId().toString();log.info("客户端断开连接, sessionId->{}" + sessionID);socketIOClientManager.removeClient(client);client.disconnect();}// kafka消息接收入口@OnEvent(value = clientPubKafkaEvent)public void pushKafka(SocketIOClient client, AckRequest ackRequest, String topic, String mesg) {if (StrUtil.isEmpty(topic)) {ackRequest.sendAckData(400, "topic不能为空");}if (StrUtil.isEmpty(mesg)) {ackRequest.sendAckData(400, "mesg不能为空");}try {socketIOClientManager.pushClientMesg2Kafka(client, topic, mesg);ackRequest.sendAckData(200, "id");} catch (Exception e) {e.printStackTrace();ackRequest.sendAckData(500, e.getMessage());}}// emq信息接收入口@OnEvent(value = clientPubEmqEvent)public void pushEmq(SocketIOClient client, AckRequest ackRequest, String topic, String mesg) {if (StrUtil.isEmpty(topic)) {ackRequest.sendAckData(400, "topic不能为空");}if (StrUtil.isEmpty(mesg)) {ackRequest.sendAckData(400, "mesg不能为空");}try {socketIOClientManager.pushClientMesg2MQTT(client, topic, mesg);ackRequest.sendAckData(200, "id");} catch (Exception e) {e.printStackTrace();ackRequest.sendAckData(500, e.getMessage());}}}
@Component
@Order(1)
public class ServerRunner implements CommandLineRunner {private final SocketIOServer server;private static final Logger logger = LoggerFactory.getLogger(ServerRunner.class);@Autowiredpublic ServerRunner(SocketIOServer server) {this.server = server;}@Overridepublic void run(String... args) {logger.info("SocketIO 启动...");server.start();}
}
@Slf4j
public class SocketClientEMQTest {public static void main(String[] args) {final SocketClientEMQTest socketClientTest = new SocketClientEMQTest();try {socketClientTest.run();} catch (Exception e) {e.printStackTrace();}}public void run(String... args) throws Exception {URI uri = URI.create("http://127.0.0.1:9201");IO.Options options = new IO.Options();options.transports = new String[]{"websocket"};options.reconnectionAttempts = 2;options.query = "resourceID=" + "mqtt$$tcp://localhost:1883$$test";Socket socket = IO.socket(uri, options);socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("connect: {}", args);}});socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("disconnect: {}", args);}});socket.on("subEmq", new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("push_mqtt {}", args);}});/*  socket.on("push_kafka", new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("push_kafka {}" , args);}});*/final ArrayList<String> arrayList = new ArrayList<>();
//        arrayList.add("")int i = 0;while (true) {i += 1;socket.emit("pubEmq", "test", "testmesg" + i, new Ack() {@Overridepublic void call(Object... objects) {log.info("userChat ack:{}|{}", objects[0], objects[1]);}});if (i >= 10) {break;}Thread.sleep(2000);}socket.connect();LockSupport.park();}
}
@Slf4j
public class SocketClientKAFKATest {public static void main(String[] args) {final SocketClientKAFKATest socketClientTest = new SocketClientKAFKATest();try {socketClientTest.run();} catch (Exception e) {e.printStackTrace();}}public void run(String... args) throws Exception {URI uri = URI.create("http://127.0.0.1:9201");IO.Options options = new IO.Options();options.transports = new String[]{"websocket"};options.reconnectionAttempts = 2;options.query = "resourceID=" + "kafka$$localhost:9092$$test12399";
//        options.query = "loginUserNum=" + "mqtt$$tcp://localhost:1883$$test";Socket socket = IO.socket(uri, options);socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("connect: {}", args);}});socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("disconnect: {}", args);}});socket.on("subKafka", new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("push_kafka {}", args);}});/*  socket.on("push_kafka", new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("push_kafka {}" , args);}});*/final ArrayList<String> arrayList = new ArrayList<>();
//        arrayList.add("")int i = 0;while (true) {i += 1;socket.emit("pubKafka", "TEST", "testmesg" + i, new Ack() {@Overridepublic void call(Object... objects) {log.info("userChat ack:{}|{}", objects[0], objects[1]);}});if (i >= 10) {break;}Thread.sleep(2000);}socket.connect();LockSupport.park();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/508159.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql查询开启事务_MySQL中的查询事务问题

之前帮同学做个app的后台&#xff0c;使用了MySQLMyBatis&#xff0c;遇到了一个查询提交的问题&#xff0c;卡了很久&#xff0c;现在有时间了来复盘下环境情况假设有学生表&#xff1a;USE test;CREATE TABLE student (Id int NOT NULL PRIMARY KEY AUTO_INCREMENT,Name varc…

通过gparted 调整 ubuntu 磁盘

1. 启动和安装 1. 安装 sudo apt-get install gparted 2. 启动 sudo gparted2.配置 如果想扩充磁盘,需要有未分配空间,且该未分配空间位于partition相邻的格子

mysql增加布尔字段_JDBC对MySQL数据库布尔字段的操作方法

本文实例讲述了JDBC对MySQL数据库布尔字段的操作方法。分享给大家供大家参考。具体分析如下&#xff1a;在Mysql数据库如果要使用布尔字段&#xff0c;而应该设置为BIT(1)类型此类型在Mysql中不能通过MySQLQueryBrowser下方的Edit与Apply Changed去编辑只能通过语句修改&#x…

linux查看mysql表空间使用率_Oracle查看数据库表空间使用情况sql语句

Oracle查看数据库表空间使用情况sql语句SELECT UPPER(F.TABLESPACE_NAME) "表空间名",D.TOT_GROOTTE_MB "表空间大小(M)",D.TOT_GROOTTE_MB - F.TOTAL_BYTES "已使用空间(M)",TO_CHAR(ROUND((D.TOT_GROOTTE_MB - F.TOTAL_…

mysql 不同服务器不同库之间的访问_不同服务器数据库之间的数据操作

--创建链接服务器exec sp_addlinkedserver ITSV , , SQLOLEDB , 远程服务器名或ip地址 exec sp_addlinkedsrvlogin ITSV , false ,null, 用户名 , 密码 --查询示例select * from ITSV.数据库名.dbo.表名--导入示例select * into 表 from ITSV.数据库名.dbo.表名--以后不再使用…

mysql 禁止转义_必须转义哪些字符才能阻止(我的)SQL注入?

6 个答案:答案 0 :(得分&#xff1a;46)关于退格字符的猜测&#xff1a;想象一下&#xff0c;我发送了一封电子邮件“嗨&#xff0c;这是根据需要更新数据库的查询”和带有的附加文本文件INSERT INTO students VALUES ("Bobby Tables",12,"abc",3.6);你捕获…

mysql 失效转移_MySQL基于MHA的FailOver过程

大家好&#xff0c;我是anyux。本文介绍MySQL基于MHA的FailOver过程。MHA FailOver过程详解什么是FailOver故障转移主库宕机&#xff0c;一直到业务恢复正常的处理过程如何处理FailOver1.快速监控到主库宕机2.选择新主节点&#xff0c;选择策略mysqladmin ping检查数据库状态&a…

mysql 设置 character_set_server_MySQL:简单记录character_set_server影响参数

Waiting for global read lock&#xff1a;由于flush table with read lock调用函数lock_global_read_lock导致DML操作堵塞。Waiting for commit lock &#xff1a;由于flush table with read lock 调用函数make_global_read_lock_block_commit导致事务不能提交现象堵塞COMMIT和…

mysql3.5.2 下载_mybatis 3.5.2 jar 下载

本文更新日期&#xff1a;2019年9月21日很多人找不到mybatis jar或者下载mybatis jar需要付积分&#xff0c;所以本页面给大家提供一个便捷的下载通道&#xff0c;敬请关注。一、mybatis 3.5.2版本下载&#xff1a;此压缩包包含文件&#xff1a;(1)mybatis-3.5.2.jar(2)mybatis…

kali 切换图形界面_kali Linux 文本图形界面切换遇到的怪问题

前段装了在Virtual Box上装一个Kali Linux玩&#xff0c;然后设为了开机进入文本界面&#xff0c;后来遇到无法上网的问题&#xff0c;网上找到解决方法&#xff0c;说是NAT地址转换和host-only双网卡顺序问题&#xff0c;按照网上的说法调整顺序后一切正常。问题及调整方法详见…

linux mysql更改生效_linux下面MySQL变量修改及生效

今天在访问mysql项目的时候突然报500错误&#xff0c;没有找到连接&#xff0c;因此想到mysql的连接时间。mysql> show global variables;主要就是连接时间是28800(8小时)&#xff0c;而且任务调度也没打开&#xff0c;因此想到修改全局变量的值。1.修改任务调度装:1.1具体的…

linux刻录win10u盘_手把手教你装系统之【制作官方win10安装U盘】

本帖最后由 蚂蚁炒花甲 于 2019-11-11 22:58 编辑很多粉粉在收到linux版本的笔记本后用不惯&#xff0c;但又不知道如何装win10系统下面我就来教大家&#xff0c;如何自己动手 制作win10 安装U盘想学习的粉粉们&#xff0c;可以跟着我 学习下手把手教你装系统之【官方win10 U盘…

mysql 数据仓库 元数据_数据仓库中的元数据管理

1. 引言元数据是数据仓库中的一个重要组成部分&#xff0c;元数据管理系统则是构建&#xff0c;管理&#xff0c;维护和使用数据仓库系统的核心部件。2. 基础知识2.1 元数据的定义元数据是指来自企业内外的所有物理数据和知识&#xff0c;包括物理数据的格式&#xff0c;技术和…

python列表有固定大小吗_如何在python中创建固定大小的列表?

(tl&#xff1b;dr&#xff1a;对您的问题的确切答案是numpy.empty或numpy.empty_like&#xff0c;但是您可能不在乎&#xff0c;可以使用myList [None]*10000。)简单方法您可以将列表初始化为所有相同的元素。使用一个非数字值(如果以后使用它会给出一个错误&#xff0c;这是…

mysql 魔术设置_mysql主从复制实践

1.master服务器上安装mysql&#xff0c;正常安装mysql参考2.slave服务器上安装mysql&#xff0c;正常安装mysql参考3.配置3.1master服务器配置cnf文件vim /etc/my.cnf加入配置[mysqld]log-bin master-binlog-bin-index master-bin.indexserver-id 1重启mysql服务service mys…

mysql将时间戳转化为天数_mysql 将时间戳直接转换成日期时间

我的应用&#xff1a;select *,FROM_UNIXTIME(create_at, %Y-%m-%d) as date from stock转载原文&#xff1a;FROM_UNIXTIME( ):转为时间戳类型时间UNIX_TIMESTAMP( ) :返回长类型时间from_unixtime()是MySQL里的时间函数select uid,userid,username,email,FROM_UNIXTIME(addti…

centeros7安装mysql5.6_CentOS7安装MySQL5.6

1.安装包准备(MySQL官网下载)(1)查看MySQL是否安装&#xff0c;如果存在就先卸载[roothadoop101 桌面]# rpm -qa|grepMySQLmysql-libs-5.1.73-7.el6.x86_64[roothadoop101 桌面]# rpm-e --nodeps mysql-libs-5.1.73-7.el6.x86_64(2)查看mariadb是否安装&#xff0c;如果存在就先…

python中int和eval的区别_python中eval与int的区别浅析

python中eval和int的区别是什么&#xff1f;下面给大家介绍一下:1.eval()函数eval()能够以Python表达式的方式解析并执行字符串&#xff0c;并将返回结果输出。eval()函数将去掉字符串的两个引号&#xff0c;将其解释为一个变量。作用&#xff1a;a. 处理数字单引号&#xff0c…

scrapy爬取天气存MySQL_Scrapy实战篇(五)之爬取历史天气数据

本篇文章我们以抓取历史天气数据为例&#xff0c;简单说明数据抓取的两种方式&#xff1a;1、一般简单或者较小量的数据需求&#xff0c;我们以requests(selenum)beautiful的方式抓取数据2、当我们需要的数据量较多时&#xff0c;建议采用scrapy框架进行数据采集&#xff0c;sc…

mysql 第二天数据_MySQL入门第二天------数据库操作

一、基本命令1、启动服务器cmdnet start [服务器名称]net start mysql572、停止服务器cmdnet stop [服务器名称]net stop mysql573、链接数据库mysql -u 用户名 -p 登录密码mysql -u root -p4、退出登录quitexit\q5、查看版本(连接后执行)select version();6、查看当前时间(连接…