Spring Boot·Stomp协议

CY 2019年04月12日 2,537次浏览

项目中要实现一个实时数据和用户聊天功能,想了很久觉得还是用WebSocket比较合适,查了一下Spring官网,无意间看到了一篇文章,了解到了Stomp协议,看了很多的文章了解了一下Stomp协议,简单记录一下我对Stomp协议的理解和如何使用Spring封装好的Stomp来完成一些功能。

【参考链接】Stomp的官网

【参考链接】WebSocket全系列教程

简单介绍

WebSocket只是定义了文字字节两种形式的消息格式,传输过程中并没有像HTTP协议那样丰富的内容和规范,StompWebSocket实现了一种规范,也就相当于WebSocket的子协议,很多的消息中间件都是用了Stomp协议,Stomp协议的构成就不说了,网上到处都有在讲,大概的形式类似于下面的样子:

COMMAND
header1:value1
header2:value2
Body^@

其中的COMMAND包含 CONNECTSENDSUBSCRIBEUNSUBSCRIBEBEGINCOMMITABORTACKNACKDISCONNECT

服务器端可以接收和广播消息,客户端可以订阅和发送消息

接收:服务端行为,可以接收由客户端发过来的消息

广播:服务端行为,可以将消息广播到已经订阅该消息的用户

订阅:客户端行为,订阅服务器的消息,如果服务器有消息发送到这个订阅,客户端就会收到消息

发送:客户端行为,将消息发送给服务器

基本使用

使用方法

Spring在服务端实现了Stomp协议,使用注解就可以用到Stomp协议了,和SpringMVC进行无缝结合,甚至可以使用HTTP请求去发送WebSocket信息。

例如下面的样子就可以使用基于Stomp协议的WebSocket

前提是导入了spring-boot-starter-websocket

@MessageMapping("/send")
@SendTo("/notice/message")
public ChatMessage send(ChatMessage message) throws Exception {
    return message;
}

@MessageMapping的意思和@RequestMapping非常类似,客户端需要往这个路径里面发送消息

@SendTo是服务器发送给客户端消息使用的路径,客户端需要订阅这个路径

自定义方法中可以包含的参数

Message这其中包含了payloadheaderspayload用来存储具体的消息内容,headers用来存储头信息

Principal存放用户名,这个用户名需要用户手动设置

SimpMessageHeaderAccessor 这个类用来封装Stomp协议的头信息

@Payload 消息体内容,使用方法例如:@Payload Map<String, Object> message

@Header 可以用来获取某个headerkey

@Headers 用来获取全部的Header@Headers Map<String, List<String>> header

@DestinationVariable类似SpringMvc中的@PathVariable

除了上面的参数以外,可以放用户自定义的任意参数,Spring会帮用户初始化这些参数对象


注册Stomp协议到Spring

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        /*
         * 客户端订阅来自 "/notice" 和 "/chat" 为前缀的消息
         * 在Controller中,可通过@SendTo注解指明发送目标,就可以发送信息到已经订阅的用户了
         * 客户端只可以订阅以这两个字符串为前缀的主题
         */
        config.enableSimpleBroker("/notice", "/chat");

        /*
         * 客户端发送过来的消息,需要以"/socket"为前缀
         * 经过Broker转发给相应的Controller. @MessageMapping中的地址
         * 客户端写成/socket + @MessageMapping中的地址
         */
        config.setApplicationDestinationPrefixes("/socket");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        /*
         * 路径"/socketConnect"被注册为STOMP端点并对外暴露
         * 客户端通过该路径接入WebSocket服务
         * 可以通过setAllowedOrigins("*")来设置跨域
         */
        registry.addEndpoint("/socketConnect").withSockJS();
    }
}

上面的路径一旦配置,就意味着@SendTo中的路径一定要写成/notice/xxx或者/chat/xxx

客户端订阅的时候直接写@SendTo中的地址,客户端发送要使用/socket + @MessageMapping中的地址

客户端连接WebSocket就要使用/socketConnect连接

客户端的写法

// 连接
var socket = new SockJS('/socketConnect');
stompClient = Stomp.over(socket);
stompClient.connect({
    // 头信息...
}, function (frame) {
    setConnected(true);
    console.log('Connected: ' + frame);
    // 订阅
    stompClient.subscribe(`/notice/message`, function (greeting) {
        showGreeting(JSON.parse(greeting.body).content);
    });
});

// 省略一点代码...

// 发送
stompClient.send(
    `/socket/send`, // 地址
    {...}, // 头信息
    JSON.stringify({'content': $("#name").val()}) // 发送内容
);

前提是客户端已经引入了SockJSStompJS

拦截器的使用

写一个拦截器只需要在配置类中添加以下的代码:

@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.interceptors(new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            // 获取头信息
            StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
            // 返回空就代表取消客户端的访问
            if (accessor == null) return null;
            // 判断指令是否为CONNECT,意味着首次连接
            if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                // 从头中获取信息
                String password = accessor.getFirstNativeHeader("password");
                if (...) { // 进行判断
                    accessor.setUser((UserPrincipal) () -> accessor.getFirstNativeHeader("username")); // 放用户名,以便Controller层取出
                    return message;
                }
                return null;
            }
            // 不是首次连接就直接通过
            return message;
        }
    });
}

可以在Controller层方法的参数中直接注入SimpMessageHeaderAccessor对象,就可以通过getUser()方法取出上面放进去的用户名了。

或者注入Principal对象也是可以的,通过getName()取出用户名。

实现一对一的消息发送

这个方法就很多了,不用Stomp协议的实现就不说了,SimpMessagingTemplate类可以发送一对多也可以发送一对一,其中的convertAndSendToUser()方法就可以实现一对一的发送

第一个参数是对方的名字,当然这个名字必须在线,也就是对方在服务器上有Principal对象

第二个参数就是订阅地址,客户端在订阅这个地址的时候一定要加上前缀/user,如果需要自定义这个前缀,可以在配置类中使用下面的方式来自定义前缀

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    
    // ...

    /*
     * 一对一发送的前缀
     * 订阅主题:/custom + 后端convertAndSendToUser()方法中的订阅地址
     * 推送方式:messagingTemplate.convertAndSendToUser(destUsername, "/notice/message", message);
     */
    config.setUserDestinationPrefix("/custom");
}
  • 客户端发送消息的时候使用的地址

    setApplicationDestinationPrefixes()方法配置好的地址 + @MessageMapping()中配置的地址

  • 客户端订阅的时候使用的地址

    setUserDestinationPrefix()方法配置的地址 + convertAndSendToUser()方法中的地址

使用上面的方式来实现点对点发送信息的时候,必须要有拦截器来拦截用户以获取该用户的用户名,并且使用方法setUser()将用户名设置上去,不然不起作用,或者可以使用安全框架,让每个用户必须有一个不为空的Principal对象即可

一对多消息的实现

使用SimpMessagingTemplate类中的convertAndSend()方法,方法中有两个参数:

参数一:客户端可以订阅的路径

参数二:要发送的消息

这样的话,客户端就可以直接订阅参数一中规定的路径了

使用SimpMessagingTemplate对象甚至可以在方法上指定@RequestMapping来实现HTTP形式的访问通知。

@SendToUser注解

@SendToUser注解的用途就是用来模拟请求相应的,意思就是客户端发送一个消息,服务端对这个消息进行处理之后再返回给客户端,和请求响应不同的是,服务端可以将这个消息发送给同一个账号的多处登录(同一个账号的不同Session),也可以使用@SendToUser注解中的broadcast = false来禁止发送给多个Session

如果使用@SendToUser注解的话,客户端的订阅地址为:

setUserDestinationPrefix()方法的参数 + @SendToUser注解中的地址

注意@SendToUser中地址的前缀依然要满足enableSimpleBroker()方法中定义的前缀

自己感觉可以用这个功能做服务器到客户端的连续推送,也可以用这个功能来做另一端登录这一端被挤下线

客户端地址的所有写法

  • 客户端连接WebSocket

    addEndpoint()方法中规定的地址

  • 客户端发送消息的地址

    setApplicationDestinationPrefixes()方法中的地址 + @MessageMapping()中的地址

  • 客户端订阅@SendTo()地址

    直接写sendTo中的地址,sendTo中的地址前缀必须是enableSimpleBroker()方法中配置的

  • 客户端订阅convertAndSendToUser()方法发送的消息的地址

    setUserDestinationPrefix()方法配置的地址 + convertAndSendToUser()方法中的地址

  • 客户端订阅convertAndSend()方法发送的消息的地址

    直接使用convertAndSend()中的地址,convertAndSend()中的地址前缀必须是enableSimpleBroker()方法中配置的

  • 如果不使用@SendTo()注解,也不使用SimpMessagingTemplate的时候使用的地址

    /topic + @MessageMapping()中的地址,前提是需要在enableSimpleBroker()方法中配置/topic

原生WebSocket实现功能总结

很多的时候不会使用到Stomp协议,所以就需要使用原生的WebSocket了,原生的WebSocket需要用户自己在发送的消息里面定义用户标识,比如说用户的ID,然后后台使用用户的IDkey,用户的Session对象做值,就可以存储当前的用户,其他用户也一样,如果其他用户要给所有的用户发送消息的话,可以循环取出Map中的所有的Session,然后使用session.sendMessage()方法来发送消息,如果只需要给一个用户发消息的话,首先需要知道这个用户的userId,然后后台就可以通过这个userId取出对应的Session发送消息就可以了。

关于SpringBoot使用原生的WebSocket相关的配置,这里不做记录,可以参照网上的很多教程

【参考链接】WebSocket的故事(五)—— Springboot中,实现网页聊天室之自定义消息代理

【参考连接】WebSocket的故事(六)—— Springboot中,实现更灵活的WebSocket