一、介绍
在项目开发过程中,很多时候,我们不可避免的需要实现的一个功能:
服务端实时发送信息给客户端。比如实时公告、实时订单通知、实时报警推送等等,登录后的客户端需要知道与它相关的实时信息,以便进行下一步处理。
从事服务端开发的特别是C/C++开发的技术人员都知道,客户端可以通过套接口与服务端保持套接口长连接。这样就服务端就可以实时给客户端推送信息了,但是这是针对TCP的长连接,如果是针对HTTP协议(在TCP层之上的实现了超文本协议的短链接--一般情况下短链接),实现服务端与客户端通知一般有一下两种方式:
1、HTTP轮询
一般情况下,http是短链接,也就是请求响应式的,每一次请求都对应一次回复,回复完成后连接断开,这样做的好处就是不需要保持与服务端的长连接,因为HTTP协议底层还是TCP协议,服务端根据机器性能都有一个最大的套接口连接数限制。以windows为例子,如windows下TCP连接数受多个参数影响:
- 最大tcp连接数 
[HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\Tcpip\Parameters]
TcpNumConnections = 0x00fffffe (Default = 16,777,214)
以上注册表信息配置单机的最大允许的TCP连接数,默认为 16M。这个数值看似很大,这个并不是限制最大连接数的唯一条件,还有其他条件会限制到TCP 连接的最大连接数。
- 最大动态端口数 
 TCP客户端和服务器连接时,客户端必须分配一个动态端口,默认情况下这个动态端口的分配范围为 1024-5000 ,也就是说默认情况下,客户端最多可以同时发起3977个Socket连接。我们可以修改如下注册表来调整这个动态端口的范围.
[HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\Tcpip\Parameters]
MaxUserPort = 5000 (Default = 5000, Max = 65534)
最大调整值65535,也就是最大能有6w多tcp连接。
- 最大TCB数量 
 系统为每个TCP连接分配一个TCP控制块(TCP control block or TCB),这个控制块用于缓存TCP连接的一些参数,每个TCB需要分配 0.5 KB的pagepool 和 0.5KB 的Non-pagepool,也就说,每个TCP连接会占用 1KB 的系统内存。换句话说TCP的连接数也受到系统的内存的限制。系统的最大TCB数量由如下注册表设置决定:
[HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\Tcpip\Parameters]
MaxFreeTcbs = 2000 (Default = RAM dependent, but usual Pro = 1000, Srv=2000)
非Server版本,MaxFreeTcbs 的默认值为1000(64M 以上物理内存),Server 版本,这个的默认值为 2000。也就是说,默认情况下,Server版本最多同时可以建立并保持2000个TCP连接。
- 最大TCB Hash table数量 
 TCB是通过Hash table来管理的,下面注册表设置决定了这个Hash table的大小
HKEY_LOCAL_MACHINE \System \CurrentControlSet \services \Tcpip \Parameters]
MaxHashTableSize = 512 (Default = 512, Range = 64-65536)
指明分配pagepool内存的数量,也就是说,如果MaxFreeTcbs = 1000, 则pagepool的内存数量为500KB那么 MaxHashTableSize应大于500才行。这个数量越大,则Hash table的冗余度就越高,每次分配和查找TCP.这里 MaxHashTableSize被配置为比MaxFreeTcbs大4倍,这样可以大大增加TCP建立的速度。
    知道了底层TCP限制之后,我们可以知道实际上长连接在普通的windows机器上最多大概是1000路左右,也就并发是1000个http长连接。如果将长连接改为http短链接,http请求完成后立即释放,那么服务端的并发就会大大增加,如果请求速度不太耗时,服务端的并发量有可能达到1w或者更大!!!c
    使用HTTP轮询,就是使用HTTP短链接模式,定期与服务端进行通信主动获取服务端信息的方式实现服务端“推送”信息至客户端,它有如下特点:
- 避免与服务端的长连接,减低服务端压力,提升服务端的并发访问能力 
- 客户端主动与服务端通信,需要定期与服务端进行轮询查询获取信息,但对客户端而言存在延迟,延迟时间最大为轮询时间。 
- 服务端需要做额外的工作包保存一些实时数据,等待客户端拉取。 
2、websocket
http长轮询因为存在信息延迟的问题,有时候,我们需要实时收到服务端推送的信息就无法避免使用websocket了。在前面我已经说到,websocket实际上也是http升级upgrade之后的tcp长连接,长连接的数量限制经过调整后最大能有(65525-1024)= 64501个长连接(在内存、句柄数等不设限情况下)。但实际测试,可能服务端的websocket连接数可能维持的2w左右(经过实际测试),如果改为云主机,连接数可能达到6w左右。如果需要更多了连接,我们可以考虑集群的方式,如n台高性能机器能支持最大n*6w的websocket连接!!它有如下优点:
- WebSocket一次握手就可以使客户端和服务端建立长连接,并进行双向数据传输 
- 服务端可主动向客户端发送信息,实时性很高 
- 与HTTP协议比起来,WebSocket协议每次数据传输的头信息都较小,节约带宽 
针对浏览器本身,连接后台最大的websocket数量也是有限制的,以下是我搜索到的各个浏览器支持的最大websocket连接数:
IE        6个
chrome  256个
Firefox 200个
safari  1273个(MAC版本)
超过各个浏览器最大数,后台就收不到请求。
二、 websocket实现
说明了原理之后,接下来就是如何实现websocket,这里我提供了几种实现方式
1、J2EE7自带的原始实现
服务端实现
WebSocket是JavaEE7新支持的,Javax.websocket.server包含注解、类、接口用于创建和配置服务端点;Javax.websocket包则包含服务端点和客户断电公用的注解、类、接口、异常,创建一个注解式的端点,将自己的写的类以及类中的一些方法用前面提到的包中的注解装饰,这里我提供了一个基本的websocket实现接口,提供了连接、关闭、接收消息、发送消息接口:
package com.easystudy.websocket;
import java.io.IOException;
import javax.websocket.Session;
import lombok.extern.slf4j.Slf4j;
/**
 * @欢迎加入群聊,一起分享,一起合作,一起进步
 * QQ交流群:961179337
 * 微信账号:lixiang6153
 * 微信公众号:IT技术快餐
 * 电子邮箱:lixx2048@163.com
 */
@Slf4j
public abstract class BaseWS {
    /**
     * 终端初次连接
     * @param userId  userId
     * @param session session
     * @throws IOException IOException
     */
    abstract void onOpen(Session session, Long userId) throws IOException;
    /**
     * 终端断开连接
     */
    abstract void onClose();
    /**
     * 终端传递参数
     * @param session session
     * @param message message
     */
    abstract void onMessage(String message, Session session);
    /**
     * 报错
     * @param session session
     * @param error   error
     */
    abstract void onError(Session session, Throwable error);
    /**
     * 向终端发送
     * @param message message
     * @throws IOException IOException
     */
    abstract void sendMessage(String message) throws IOException;
    void heartBeat(Long user, String signal, Session session) {
        if ("ping".equalsIgnoreCase(signal)) {
            try {
                log.info("heart beat=====> {},user:{}, sessionId:{}", signal, user, session.getId());
                session.getBasicRemote().sendText("pong");
                log.info("heart beat<====> {},user:{}, sessionId:{}", "pong", user, session.getId());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
websocket端点实现:
package com.easystudy.websocket;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;
/**
 * @ServerEndpoint 该注解可以将类定义成一个WebSocket服务器端,
 * @OnOpen 表示有浏览器链接过来的时候被调用
 * @OnClose 表示浏览器发出关闭请求的时候被调用
 * @OnMessage 表示浏览器发消息的时候被调用
 * @OnError 表示报错了
 * @欢迎加入群聊,一起分享,一起合作,一起进步
 * QQ交流群:961179337
 * 微信账号:lixiang6153
 * 微信公众号:IT技术快餐
 * 电子邮箱:lixx2048@163.com
 */
@Component
@ServerEndpoint("/ws/msg/{userid}")
public class MessageEndPoint extends BaseWS {
    // concurrent包下线程安全的Set
    private static final CopyOnWriteArraySet SESSIONS = new CopyOnWriteArraySet<>();
    private Session session;
    @Override
    @OnOpen
    public void onOpen(Session session, @PathParam("userid") Long userid) {
        this.session = session;
        SESSIONS.add(this);
        System.out.println(String.format(userid + "成功建立连接~ 当前总连接数为:%s", SESSIONS.size()));
        System.out.println(this);
    }
    @Override
    @OnClose
    public void onClose() {
        SESSIONS.remove(this);
        System.out.println(String.format("成功关闭连接~ 当前总连接数为:%s", SESSIONS.size()));
    }
    @Override
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("收到客户端【" +session.getId()+ "】消息:" + message);
    }
    @Override
    @OnError
    public void onError(Session session, Throwable error) {
        System.out.println("发生错误");
        error.printStackTrace();
    }
    /**
     * 指定发消息
     * @param message
     */
    public void sendMessage(String message) {
        try {
            this.session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 群发消息: 静态方法
     * @param message
     */
    public static void fanoutMessage(String message) {
        SESSIONS.forEach(ws -> ws.sendMessage(message));
    }
}我们监听的端点是
/ws/msg/{userid}
端点携带了一个参数userid,表示长连接的用户id,我们在对应的方法实现中通过注解引用对应参数即可。
我们使用J2EE7标准注解,必须注入相应的ServerEndpointExporter类:
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
最后,我们提供一个controller用于测试服务端往发送客户端信息:
package com.easystudy.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.easystudy.websocket.MessageEndPoint;
@RestController
@RequestMapping("/response")
public class TestController {
    @GetMapping("/send")
    public String reponseMsgToClient(@RequestParam(name="content", required = true)String content){
        System.out.println("发送消息:[" + content + "]给客户端!");
        MessageEndPoint.fanoutMessage(content);
        return "消息【" +content+ "】发送成功!";
    }
}
客户端实现
在实现websocket服务端之后,我们就需要实现websocket客户端,连接到服务器,接收服务端消息,实现代码如下所示:
websocket测试
WebSocket Demo
 服务器回复内容:
发送
启动浏览器,打开控制台,可以看到连接到服务器字样,输入内容,点击发送,服务端后台打印接收到客户端信息并广播到客户端,客户端控制台也打印了相同字样。
2、springboot实现
除了J2EE原始实现之外,使用springboot之后,功能就更强大了,它提供了一个核心的配置类WebSocketConfigurer用于注册各种websocket端点、拦截器、处理器信息。如下我们通过继承WebSocketConfigurer配置对应端点、处理器和拦截器:
package com.easystudy.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import com.easystudy.websocket.MsgWebSocketHandler;
import com.easystudy.websocket.MsgWebSocketInterceptor;
@Configuration
@EnableWebSocket
public class WebsocketConfig implements WebSocketConfigurer {
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // 设置端点连接路径和处理器
        registry.addHandler(new MsgWebSocketHandler(), "/ws/msg/{userid}")
                .setAllowedOrigins("*")
                // 设置拦截器
                .addInterceptors(new MsgWebSocketInterceptor());
    }
}
我们配置了自己的处理器处理对应端点、配置了拦截器进行信息拦截。
我这里的拦截器主要拦截请求参数,限定请求参数必须携带用户名作为连接的唯一标识:
package com.easystudy.websocket;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
/**
 * 自定义拦截器拦截WebSocket请求
 * @author Administrator
 * QQ交流群:961179337
 * 微信账号:lixiang6153
 * 微信公众号:IT技术快餐
 * 电子邮箱:lixx2048@163.com
 */
public class MsgWebSocketInterceptor implements HandshakeInterceptor{
    /**
     * 前置拦截一般用来注册用户信息,绑定 WebSocketSession
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
            Map attributes) throws Exception {
         System.out.println("前置拦截~~");
         if (!(request instanceof ServletServerHttpRequest))
             return true;
         // 获取用户名信息
         HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
         String path = servletRequest.getServletPath();
         System.out.println("path:" + path);
         String userName = servletRequest.getParameter("userName");
         //String userName = (String) servletRequest.getSession().getAttribute("userName");
         if (null == userName) {
             userName = "lixx";
         }
         // 保存属性到session属性信息中
         attributes.put("userName", userName);
         return true;
    }
    /**
     * 后置拦截器
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
        System.out.println("后置拦截~~");
    }
}拦截器获取到对应属性之后存入到session的会话属性之中,连接之后可以通过session获取会话属性。
处理器实现:
package com.easystudy.websocket;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * QQ交流群:961179337
 * 微信账号:lixiang6153
 * 微信公众号:IT技术快餐
 * 电子邮箱:lixx2048@163.com
 */
public class MsgWebSocketHandler implements WebSocketHandler{
    private static final Map SESSIONS = new ConcurrentHashMap<>();
    /**
     * 建立新的 socket 连接后回调的方法
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String userName = session.getAttributes().get("userName").toString();
        SESSIONS.put(userName, session);
        System.out.println(String.format("成功建立连接~ userName: %s", userName));
    }
    /**
     * 连接关闭时,回调的方法
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        System.out.println("连接已关闭,status:" + closeStatus);
    }
    /**
     * 接收客户端发送的 Socket
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage> message) throws Exception {
        String msg = message.getPayload().toString();
        System.out.println("接收到消息:" + msg);
    }
    /**
     * 连接出错时,回调的方法
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        System.out.println("连接出错");
        if (session.isOpen()) {
            session.close();
        }
    }
    /**
     * 这个是 WebSocketHandler是否处理部分消息,返回 false就完事了
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
    /**
     * 指定发消息
     * @param userName
     * @param message
     */
    public static void sendMessage(String userName, String message) {
        WebSocketSession webSocketSession = SESSIONS.get(userName);
        if (webSocketSession == null || !webSocketSession.isOpen())
            return;
        try {
            webSocketSession.sendMessage(new TextMessage(message));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 群发消息
     * @param message
     */
    public static void fanoutMessage(String message) {
        SESSIONS.keySet().forEach(us -> sendMessage(us, message));
    }
}在建立连接之后,我们通过:session.getAttributes().get("userName").toString();获取到连接时候提供的用户参数,用于后续指定用户P2P发送信息。
客户端实现代码如下:
测试
WebSocket Demo
 服务器回复内容:
发送
最后测试发送信息如下:

3、socketJS实现
一些浏览器中缺少对WebSocket的支持,因此,回退选项是必要的,而Spring框架提供了基于SockJS协议的透明的回退选项。SockJS的一大好处在于提供了浏览器兼容性。优先使用原生WebSocket,如果在不支持websocket的浏览器中,会自动降为轮询的方式。
SockJS是一个浏览器JavaScript库,它提供了一个类似于网络的对象。SockJS提供了一个连贯的、跨浏览器的Javascript API,它在浏览器和web服务器之间创建了一个低延迟、全双工、跨域通信通道。除此之外,spring也对socketJS提供了支持。此处实现与springboot实现相似,这里不具体介绍,只给出对应代码,实现如下(后续提供的代码是实际项目上的代码,请各位保持修改个更新,谢谢!!!)。
端点、拦截器、通道等配置如下
package com.donwait.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
/**
 * 同 HTTP 在 TCP 套接字上添加 请求-响应 模型层一样,STOMP 在 WebSocket 之上提供了一个基于 帧的线路格式层,用来定义消息语义;
 * (STOMP在 WebSocket 之上提供了一个基于 帧的线路格式层,用来定义消息语义)
 * STOMP 帧:该帧由命令,一个或多个 头信息 以及 负载所组成。如下就是发送 数据的一个 STOMP帧:
 * SEND
 * destination:/app/marco
 * content-length:20
 *
 * {\"message\":\"Marco!\"}
 *
 * 分析:
 * A1)SEND:STOMP命令,表明会发送一些内容;
 * A2)destination:头信息,用来表示消息发送到哪里;
 * A3)content-length:头信息,用来表示 负载内容的 大小;
 * A4)空行:
 * A5)帧内容(负载)内容:
 */
@Configuration
@EnableWebSocketMessageBroker       // 能够在 WebSocket 上启用 STOMP
public class WebSocketAutoConfig implements WebSocketMessageBrokerConfigurer {
    /*
     * 将 "/dys" 路径 注册为 STOMP 端点,即客户端在订阅或发布消息 到目的地址前,要连接该端点,
     * 就是说用户发送请求 url='/项目名/dys'与 STOMP server进行连接,之后再转发到订阅url
     * 端点的作用:客户端在订阅或发布消息 到目的地址前,要连接该端点
     * 备注:client连接地址和发送地址是不同的,以本例为例,前者是/项目名/dys, 后者是/项目名/app/XX,先连接后发送
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 在网页上我们就可以通过这个链接 /demon/websocket ==来和服务器的WebSocket连接
        // 连接:new SockJS("http://127.0.0.1:7019/websocket/dys");
        registry.addEndpoint("/dys")                                // 开启 /dys端点
                .setAllowedOrigins("*")                             // 允许跨域访问
                .setHandshakeHandler(new HandshakeHandler())        // 握手处理器
                .addInterceptors(new HandshakeInterceptor())        // 握手拦截器
                .withSockJS();                                      // 允许使用socketJs方式访问
    }
    /*
     * 消息传输参数配置
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        registry.setMessageSizeLimit(8192)                          // 设置消息字节数大小
                .setSendBufferSizeLimit(8192)                       // 设置消息缓存大小
                .setSendTimeLimit(10000);                           // 设置消息发送时间限制毫秒
    }
    /*
     * 输入通道参数设置
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.taskExecutor().corePoolSize(8)                 // 设置消息输入通道的线程池线程数
                    .maxPoolSize(16)                                // 最大线程数
                    .keepAliveSeconds(60);                          // 线程活动时间
        registration.interceptors(createUserInterceptor());         // 注入用户入站通道拦截器
    }
    /*
     * 输出通道参数设置
     */
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        registration.taskExecutor().corePoolSize(8)
                    .maxPoolSize(16);
    }
    /*
     * 配置broker:
     * 配置了一个 简单的消息代理。如果不重载,默认case下,会自动配置一个简单的内存消息代理,
     * 用来处理 "/topic"为前缀的消息。但经过重载后,消息代理将会处理前缀为 "/topic" and "/queue"消息
     * 分析:
     * (1)应用程序的目的地 以 "/app" 为前缀,而代理的目的地以 "/topic" 和 "/queue" 作为前缀
     * (2)以应用程序为目的地的消息将会直接路由到 带有 @MessageMapping注解的控制器方法中
     * (3)而发送到代理上的消息,包括 @MessageMapping注解方法的返回值所形成的消息,将会路由到代理上,并最终发送到订阅这些目的地客户端
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 代理的目的地址为topic或queque(代理目的地以 /topic为前缀)
        // 广播消息订阅:stompClient.subscribe('/topic/alarm', function (response)
        registry.enableSimpleBroker("/topic", "/queue");
        // 全局使用的消息前缀(客户端订阅路径上会体现出来):应用程序前缀:js.url = "/demon/app/hello" -> @MessageMapping("/hello") 注释的方法.
        // 客户端发送端点前缀:stompClient.send("/app/hello", {}, JSON.stringify({ 'name': name }));
        registry.setApplicationDestinationPrefixes("/app");
        // 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
        // registry.setUserDestinationPrefix("/user/");
        /*
        // 启用了STOMP代理中继功能,并将其代理目的地前缀设置为 /topic and /queue,并将所有目的地前缀为 "/topic" or "/queue"的消息都会发送到STOMP代理中[真正消息代理activeMQ或RabbitMQ]
        registry.enableStompBrokerRelay("/topic", "/queue")                             // 设置可以订阅的地址,也就是服务器可以发送的地址
                .setRelayHost("192.168.12.18")
                .setRelayPort(5672)
                .setClientLogin("admin")
                .setClientPasscode("admin")
                .setSystemHeartbeatReceiveInterval(2000)                                // 设置心跳信息接收时间间隔
                .setSystemHeartbeatSendInterval(2000);                                  // 设置心跳信息发送时间间隔
        // 应用程序前缀:js.url = "/demon/app/hello" -> @MessageMapping("/hello") 注释的方法.
        registry.setApplicationDestinationPrefixes("/app");
        */
    }
    /**
     *
     * @Title: createUserInterceptor
     * @Description: 将客户端渠道拦截器加入spring ioc容器
     * @return
     */
    @Bean
    public UserInterceptor createUserInterceptor() {
        return new UserInterceptor();
    }
}握手拦截器配置:
package com.donwait.websocket;
import java.util.Map;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HandshakeInterceptor extends HttpSessionHandshakeInterceptor {
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,Map attributes) throws Exception {
        log.info("============握手前===========");
        /*
        // 解决The extension [x-webkit-deflate-frame] is not supported问题
        if(request.getHeaders().containsKey("Sec-WebSocket-Extensions")) {
            request.getHeaders().set("Sec-WebSocket-Extensions", "permessage-deflate");
        }
        // 检查session的值是否存在
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
            HttpSession session = servletRequest.getServletRequest().getSession(false);
            String accountId = (String) session.getAttribute(Constants.SKEY_ACCOUNT_ID);
            //把session和accountId存放起来
            attributes.put(Constants.SESSIONID, session.getId());
            attributes.put(Constants.SKEY_ACCOUNT_ID, accountId);
        }
        */
        return super.beforeHandshake(request, response, wsHandler, attributes);
    }
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,Exception ex) {
        log.info("============握手后===========");
        super.afterHandshake(request, response, wsHandler, ex);
    }
}用户拦截器配置:
package com.donwait.websocket;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.messaging.support.MessageHeaderAccessor;
import com.donwait.amqp.RabbitMQ;
import com.donwait.model.RtmpInviteInfo;
import com.donwait.model.User;
import com.donwait.protobuf.RTMP_INVITE_PARAM;
import com.donwait.redis.RtmpInviteService;
/**
 * @ClassName: UserInterceptor
 * @Description: 客户端渠道拦截适配器
 */
@SuppressWarnings("deprecation")
public class UserInterceptor extends ChannelInterceptorAdapter {
    @Autowired
    private RtmpInviteService redisRtmpInviteService;
    @Autowired
    private RabbitMQ rabbitMQ;
    //@Autowired
    //private UserCacheService userCacheService;
    /**
     * 获取包含在stomp中的用户信息
     */
    @SuppressWarnings("rawtypes")
    @Override
    public Message> preSend(Message> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
            if (raw instanceof Map) {
                Object name = ((Map) raw).get("name");
                if (name instanceof LinkedList) {
                    // 设置当前访问器的认证用户
                    accessor.setUser(new User(((LinkedList) name).get(0).toString()));
                }
            }
        }
        return message;
    }
    @Override
    public void postSend(Message> message, MessageChannel channel, boolean sent) {
        StompHeaderAccessor sha = StompHeaderAccessor.wrap(message);
        // ignore non-STOMP messages like heartbeat messages
        if(sha.getCommand() == null) {
            return;
        }
        //这里的sessionId和accountId对应HttpSessionIdHandshakeInterceptor拦截器的存放key
        //String sessionId = sha.getSessionAttributes().get(Constants.SESSIONID).toString();
        //String accountId = sha.getSessionAttributes().get(Constants.SKEY_ACCOUNT_ID).toString();
        //判断客户端的连接状态
        switch(sha.getCommand()) {
            case CONNECT:
                connect(sha);
                break;
            case CONNECTED:
                break;
            case DISCONNECT:
                disconnect(sha);
                break;
            default:
                break;
        }
    }
    // 连接成功
    private void connect(StompHeaderAccessor sha){
        System.out.println(" STOMP 连接成功:" + sha.getUser().getName());
    }
    // 断开连接
    private void disconnect(StompHeaderAccessor sha){
        System.out.println(" STOMP 连接断开" + sha.getUser().getName());
        // 移除用户信息
        //userCacheService.delete(sha.getUser().getName());
        String strKey = String.format("rtmp_invite_info::%s_*", sha.getUser().getName());
        List invite_list = redisRtmpInviteService.findByKeyEx(strKey);
        if (invite_list != null) {
            for(RtmpInviteInfo rtmpInviteInfo : invite_list){
                // 通知接入服务器
                RTMP_INVITE_PARAM.Builder builder = RTMP_INVITE_PARAM.newBuilder();
                builder.setRtmpIP(rtmpInviteInfo.getRtmpIp());
                builder.setRtmpPort(rtmpInviteInfo.getRtmpPort());
                builder.setDevID(rtmpInviteInfo.getDevId());
                builder.setProtocolType(rtmpInviteInfo.getProtoType());
                builder.setStreamType(rtmpInviteInfo.getStreamType());
                rabbitMQ.send(rtmpInviteInfo.getExchangeName(), rtmpInviteInfo.getRouteKey(), builder.build().toByteArray());
                strKey = String.format("%s::%s_%s_%d_%d_%d", rtmpInviteInfo.getCacheName(), sha.getUser().getName(), rtmpInviteInfo.getDevId(), rtmpInviteInfo.getChannelNum().longValue(), rtmpInviteInfo.getProtoType().longValue(), rtmpInviteInfo.getStreamType().longValue());
                redisRtmpInviteService.deleteByKey(strKey);
            }
        }
    }
}处理器代码:
package com.donwait.websocket;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HandshakeHandler extends DefaultHandshakeHandler{
    public HandshakeHandler(){
        log.debug("new HandshakeHandler");
    }
}
配置完成之后,需要封装一个消息服务实现点对点和广播形式发送:
package com.donwait.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Service;
/**
 * websocket广播推送服务
 * @author Administrator
 *
 */
@Service
public class MessageService {
    @Autowired
    SimpMessageSendingOperations sendOperation;         // 消息发送模板
    @Autowired
    private SimpUserRegistry userRegistry;              // 用户列表【连接的客户端信息】
    /**
     * 广播形式发送报警信息
     * @param
     */
    public void broadcast(String destination,String message) {
        sendOperation.convertAndSend(destination, message);
        System.out.println("路由:"+ destination + "   推送消息:" + message);
    }
    /**
     * 单独发送信息给某用户
     * 客户端发起连接时候必须携带用户名参数
     * stompClient.connect(
     * {
     *      name: 'lixx' // 携带客户端信息
     * }
     * @param
     */
    public void send(String destination,String username, String message) {
        for (SimpUser user : userRegistry.getUsers()) {
            if (user.getName().equals(username)){
                sendOperation.convertAndSendToUser(username, destination, message);
                System.out.println("路由:"+ destination + "   推送消息:" + message);
                break;
            }
        }
    }
}
最后,送上测试的html客户端页面:
stomp
        Welcome
发送消息
订阅用户消息/user/queue/message
订阅报警消息/topic/alarm
至此,websocket的具体介绍与实例都已送上,如果需要源码或者技术交流或者合作请联系一下方式
源码获取、合作、技术交流请获取如下联系方式:
QQ交流群:961179337

微信账号:lixiang6153
公众号:IT技术快餐
电子邮箱:lixx2048@163.com