依赖
<!--socket io --><dependency><groupId>io.socket</groupId><artifactId>socket.io-client</artifactId><version>1.0.1</version></dependency><dependency><groupId>com.corundumstudio.socketio</groupId><artifactId>netty-socketio</artifactId><version>${netty-socketio.version}</version></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId><version>${kafka.version}</version></dependency>
代码
/*** socket io 资源管理器* @author ken* @date 2021/1/6 16:18*/
@Slf4j
@Component
public class SocketIOClientManager {@Autowiredprivate KafkaConnectionManager kafkaConnectionManager;@Autowiredprivate MqttConnectionManager mqttConnectionManager;@Resourceprivate WebSocketEventHandler webSocketEventHandler;// 用来存已连接的客户端唯一ID, <KAFKA+" : "+URL+" : "+topics, <sessionID, CLIENT>>private Map<String, Map<String, SocketIOClient>> clientMap = Collections.synchronizedMap(new HashMap<>());public void addClient(SocketIOClient client) {String sessionID = client.getSessionId().toString();String resourceID = getParamsByClient(client);if (resourceID == null) {log.error("客户端未配置参数");client.sendEvent("fail", 403, "Type" + splitStr + "address" + splitStr + "topic" + splitStr + "topic" + splitStr + "topic...");}if (clientMap.containsKey(resourceID)) {Map<String, SocketIOClient> subMap = clientMap.get(resourceID);if (!subMap.containsKey(sessionID)) {subMap.put(sessionID, client);clientMap.put(resourceID, subMap);}} else {final HashMap<String, SocketIOClient> subMap = new HashMap<>();subMap.put(sessionID, client);clientMap.put(resourceID, subMap);}log.info("在线客户端: " + clientMap.toString());}public void removeClient(SocketIOClient client) {String sessionID = client.getSessionId().toString();String resourceID = getParamsByClient(client);if (resourceID == null) {log.error("客户端未配置参数");client.sendEvent("fail", 403, "Type" + splitStr + "address" + splitStr + "topic" + splitStr + "topic" + splitStr + "topic...");return;}if (clientMap.containsKey(resourceID)) {final Map<String, SocketIOClient> subMap = clientMap.get(resourceID);final Iterator<Map.Entry<String, SocketIOClient>> iterator = subMap.entrySet().iterator();while (iterator.hasNext()) {final Map.Entry<String, SocketIOClient> clientEntry = iterator.next();if (clientEntry.getKey().equals(sessionID)) {iterator.remove();log.info("移除客户端: {}", sessionID);// 如果移除session后对应url没有对应session,那么移除urlif (subMap.size() == 0) {clientMap.remove(resourceID);log.info("移除ID: {}", resourceID);if (resourceID.startsWith(String.valueOf(ResourceType.KAFKA))) {kafkaConnectionManager.removeConnection(resourceID);}if (resourceID.startsWith(String.valueOf(ResourceType.MQTT))) {mqttConnectionManager.removeConnection(resourceID);}} else {clientMap.put(resourceID, subMap);}}}} else {log.info("没有 {} 对应的{} 客户端", resourceID, sessionID);}}public void pushClientMesg2Kafka(SocketIOClient client, String topic, String mesg) throws ExecutionException, InterruptedException {String resourceID = getParamsByClient(client);KafkaPubSubServer kafkaServer = (KafkaPubSubServer) kafkaConnectionManager.getServerByResourceID(resourceID);if (kafkaServer == null) {throw DataTException.asDataTException(CommonEnum.CONF_ERROR, "数据源未配置");}if (clientMap.containsKey(resourceID)) {kafkaServer.pushMesg(topic, mesg);}}public void pushKafkaMesg2Client(String resourceID, String mesg) {if (clientMap.containsKey(resourceID)) {Map<String, SocketIOClient> subMap = clientMap.get(resourceID);for (SocketIOClient ioClient : subMap.values()) {ioClient.sendEvent(webSocketEventHandler.getClientSubKafkaEvent(), mesg.toString());}}}public void pushClientMesg2MQTT(SocketIOClient client, String topic, String mesg) throws MqttException {String resourceID = getParamsByClient(client);MqttPubSubServer mqttServer = (MqttPubSubServer) mqttConnectionManager.getServerByResourceID(resourceID);if (mqttServer == null) {throw DataTException.asDataTException(CommonEnum.CONF_ERROR, "数据源未配置");}if (clientMap.containsKey(resourceID)) {mqttServer.pushMesg(topic, mesg);}}public void pushMQTTMesg2Client(String resourceID, String mesg) {if (clientMap.containsKey(resourceID)) {Map<String, SocketIOClient> subMap = clientMap.get(resourceID);for (SocketIOClient ioClient : subMap.values()) {ioClient.sendEvent(webSocketEventHandler.getClientSubEmqEvent(), mesg.toString());}}}/*** 此方法为获取client连接中的参数,可根据需求更改** @param client* @return*/private String getParamsByClient(SocketIOClient client) {// 从请求的连接中拿出参数(这里的loginUserNum必须是唯一标识)final String resourceID = client.getHandshakeData().getSingleUrlParam("resourceID");return resourceID;}}
@Configuration
public class SocketIOConfig {@Value("${socket-io.host}")private String host;@Value("${socket-io.port}")private int port;public String getUrl() {return "http://" + host + ":" + port;}public SocketIOConfig() {}@Beanpublic SocketIOServer socketIOServer() {//创建Socket,并设置监听端口com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();// 设置主机名,默认是0.0.0.0config.setHostname(host);// 设置监听端口config.setPort(port);// 协议升级超时时间(毫秒),默认10000。HTTP握手升级为ws协议超时时间config.setUpgradeTimeout(10000);// Ping消息间隔(毫秒),默认25000。客户端向服务器发送一条心跳消息间隔config.setPingInterval(25000);// Ping消息超时时间(毫秒),默认60000,这个时间间隔内没有接收到心跳消息就会发送超时事件config.setPingTimeout(60000);return new SocketIOServer(config);}@Beanpublic SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {return new SpringAnnotationScanner(socketServer);}
}
@Component
@Slf4j
public class WebSocketEventHandler {@Autowiredprivate SocketIOClientManager socketIOClientManager;public String getClientSubKafkaEvent() {return clientSubKafkaEvent;}public String getClientPubKafkaEvent() {return clientPubKafkaEvent;}public String getClientSubEmqEvent() {return clientSubEmqEvent;}public String getClientPubEmqEvent() {return clientPubEmqEvent;}private final String clientSubKafkaEvent = "subKafka";private final String clientPubKafkaEvent = "pubKafka";private final String clientSubEmqEvent = "subEmq";private final String clientPubEmqEvent = "pubEmq";@OnConnectpublic void onConnect(SocketIOClient client) {log.info("客户端发起连接. sessionId->{}", client.getSessionId());socketIOClientManager.addClient(client);}@OnDisconnectpublic void onDisconnect(SocketIOClient client) {final String sessionID = client.getSessionId().toString();log.info("客户端断开连接, sessionId->{}" + sessionID);socketIOClientManager.removeClient(client);client.disconnect();}// kafka消息接收入口@OnEvent(value = clientPubKafkaEvent)public void pushKafka(SocketIOClient client, AckRequest ackRequest, String topic, String mesg) {if (StrUtil.isEmpty(topic)) {ackRequest.sendAckData(400, "topic不能为空");}if (StrUtil.isEmpty(mesg)) {ackRequest.sendAckData(400, "mesg不能为空");}try {socketIOClientManager.pushClientMesg2Kafka(client, topic, mesg);ackRequest.sendAckData(200, "id");} catch (Exception e) {e.printStackTrace();ackRequest.sendAckData(500, e.getMessage());}}// emq信息接收入口@OnEvent(value = clientPubEmqEvent)public void pushEmq(SocketIOClient client, AckRequest ackRequest, String topic, String mesg) {if (StrUtil.isEmpty(topic)) {ackRequest.sendAckData(400, "topic不能为空");}if (StrUtil.isEmpty(mesg)) {ackRequest.sendAckData(400, "mesg不能为空");}try {socketIOClientManager.pushClientMesg2MQTT(client, topic, mesg);ackRequest.sendAckData(200, "id");} catch (Exception e) {e.printStackTrace();ackRequest.sendAckData(500, e.getMessage());}}}
@Component
@Order(1)
public class ServerRunner implements CommandLineRunner {private final SocketIOServer server;private static final Logger logger = LoggerFactory.getLogger(ServerRunner.class);@Autowiredpublic ServerRunner(SocketIOServer server) {this.server = server;}@Overridepublic void run(String... args) {logger.info("SocketIO 启动...");server.start();}
}
@Slf4j
public class SocketClientEMQTest {public static void main(String[] args) {final SocketClientEMQTest socketClientTest = new SocketClientEMQTest();try {socketClientTest.run();} catch (Exception e) {e.printStackTrace();}}public void run(String... args) throws Exception {URI uri = URI.create("http://127.0.0.1:9201");IO.Options options = new IO.Options();options.transports = new String[]{"websocket"};options.reconnectionAttempts = 2;options.query = "resourceID=" + "mqtt$$tcp://localhost:1883$$test";Socket socket = IO.socket(uri, options);socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("connect: {}", args);}});socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("disconnect: {}", args);}});socket.on("subEmq", new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("push_mqtt {}", args);}});/* socket.on("push_kafka", new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("push_kafka {}" , args);}});*/final ArrayList<String> arrayList = new ArrayList<>();
// arrayList.add("")int i = 0;while (true) {i += 1;socket.emit("pubEmq", "test", "testmesg" + i, new Ack() {@Overridepublic void call(Object... objects) {log.info("userChat ack:{}|{}", objects[0], objects[1]);}});if (i >= 10) {break;}Thread.sleep(2000);}socket.connect();LockSupport.park();}
}
@Slf4j
public class SocketClientKAFKATest {public static void main(String[] args) {final SocketClientKAFKATest socketClientTest = new SocketClientKAFKATest();try {socketClientTest.run();} catch (Exception e) {e.printStackTrace();}}public void run(String... args) throws Exception {URI uri = URI.create("http://127.0.0.1:9201");IO.Options options = new IO.Options();options.transports = new String[]{"websocket"};options.reconnectionAttempts = 2;options.query = "resourceID=" + "kafka$$localhost:9092$$test12399";
// options.query = "loginUserNum=" + "mqtt$$tcp://localhost:1883$$test";Socket socket = IO.socket(uri, options);socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("connect: {}", args);}});socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("disconnect: {}", args);}});socket.on("subKafka", new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("push_kafka {}", args);}});/* socket.on("push_kafka", new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("push_kafka {}" , args);}});*/final ArrayList<String> arrayList = new ArrayList<>();
// arrayList.add("")int i = 0;while (true) {i += 1;socket.emit("pubKafka", "TEST", "testmesg" + i, new Ack() {@Overridepublic void call(Object... objects) {log.info("userChat ack:{}|{}", objects[0], objects[1]);}});if (i >= 10) {break;}Thread.sleep(2000);}socket.connect();LockSupport.park();}
}