Stomp协议 关键点(持续更新)
监听
如果需要添加监听,我们的监听类需要实现 ChannelInterceptor 接口,在 springframework 包 5.0.7 之前这一步我们一般是实现 ChannelInterceptorAdapter 抽象类,不过这个类已经废弃了,文档也推荐直接实现接口。
https://www.jianshu.com/p/4762494d42f1
https://www.jianshu.com/p/9103c9c7e128
https://spring.io/guides/gs/messaging-stomp-websocket/
1package org.springframework.messaging.support;
2
3import org.springframework.messaging.Message;
4import org.springframework.messaging.MessageChannel;
5
6public interface ChannelInterceptor {
7 // 在消息发送之前调用,方法中可以对消息进行修改,如果此方法返回值为空,则不会发生实际的消息发送调用
8 Message<?> preSend(Message<?> var1, MessageChannel var2);
9
10 // 在消息发送后立刻调用,boolean值参数表示该调用的返回值
11 void postSend(Message<?> var1, MessageChannel var2, boolean var3);
12
13 /*
14 * 1. 在消息发送完成后调用,而不管消息发送是否产生异常,在次方法中,我们可以做一些资源释放清理的工作
15 * 2. 此方法的触发必须是preSend方法执行成功,且返回值不为null,发生了实际的消息推送,才会触发
16 */
17 void afterSendCompletion(Message<?> var1, MessageChannel var2, boolean var3, Exception var4);
18
19 /* 1. 在消息被实际检索之前调用,如果返回false,则不会对检索任何消息,只适用于(PollableChannels),
20 * 2. 在websocket的场景中用不到
21 */
22 boolean preReceive(MessageChannel var1);
23
24 /*
25 * 1. 在检索到消息之后,返回调用方之前调用,可以进行信息修改,如果返回null,就不会进行下一步操作
26 * 2. 适用于PollableChannels,轮询场景
27 */
28 Message<?> postReceive(Message<?> var1, MessageChannel var2);
29
30 /*
31 * 1. 在消息接收完成之后调用,不管发生什么异常,可以用于消息发送后的资源清理
32 * 2. 只有当preReceive 执行成功,并返回true才会调用此方法
33 * 2. 适用于PollableChannels,轮询场景
34 */
35 void afterReceiveCompletion(Message<?> var1, MessageChannel var2, Exception var3);
36}
1package com.wzh.demo.websocket.interceptor;
2
3import com.wzh.demo.domain.WebSocketUserAuthentication;
4import org.apache.log4j.Logger;
5import org.springframework.messaging.Message;
6import org.springframework.messaging.MessageChannel;
7import org.springframework.messaging.simp.stomp.StompCommand;
8import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
9import org.springframework.messaging.support.ChannelInterceptor;
10import org.springframework.messaging.support.MessageHeaderAccessor;
11
12import javax.servlet.http.HttpSession;
13
14import static org.springframework.messaging.simp.stomp.StompCommand.CONNECT;
15
16/**
17 * <websocke消息监听,用于监听websocket用户连接情况>
18 * <功能详细描述>
19 * @author wzh
20 * @version 2018-08-25 23:39
21 * @see [相关类/方法] (可选)
22 **/
23public class WebSocketChannelInterceptor implements ChannelInterceptor {
24
25 public WebSocketChannelInterceptor() {
26 }
27
28 Logger log = Logger.getLogger(WebSocketChannelInterceptor.class);
29
30 // 在消息发送之前调用,方法中可以对消息进行修改,如果此方法返回值为空,则不会发生实际的消息发送调用
31 @Override
32 public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
33
34 StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
35 /**
36 * 1. 判断是否为首次连接请求,如果已经连接过,直接返回message
37 * 2. 网上有种写法是在这里封装认证用户的信息,本文是在http阶段,websockt 之前就做了认证的封装,所以这里直接取的信息
38 */
39 if(StompCommand.CONNECT.equals(accessor.getCommand()))
40 {
41 /*
42 * 1. 这里获取就是JS stompClient.connect(headers, function (frame){.......}) 中header的信息
43 * 2. JS中header可以封装多个参数,格式是{key1:value1,key2:value2}
44 * 3. header参数的key可以一样,取出来就是list
45 * 4. 样例代码header中只有一个token,所以直接取0位
46 */
47 String token = accessor.getNativeHeader("token").get(0);
48
49 /*
50 * 1. 这里直接封装到StompHeaderAccessor 中,可以根据自身业务进行改变
51 * 2. 封装大搜StompHeaderAccessor中后,可以在@Controller / @MessageMapping注解的方法中直接带上StompHeaderAccessor
52 * 就可以通过方法提供的 getUser()方法获取到这里封装user对象
53 * 2. 例如可以在这里拿到前端的信息进行登录鉴权
54 */
55 WebSocketUserAuthentication user = (WebSocketUserAuthentication) accessor.getUser();
56
57 System.out.println("认证用户:" + user.toString() + " 页面传递令牌" + token);
58
59 }else if (StompCommand.DISCONNECT.equals(accessor.getCommand()))
60 {
61
62 }
63 return message;
64 }
65
66 // 在消息发送后立刻调用,boolean值参数表示该调用的返回值
67 @Override
68 public void postSend(Message<?> message, MessageChannel messageChannel, boolean b) {
69
70 StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
71
72 /*
73 * 拿到消息头对象后,我们可以做一系列业务操作
74 * 1. 通过getSessionAttributes()方法获取到websocketSession,
75 * 就可以取到我们在WebSocketHandshakeInterceptor拦截器中存在session中的信息
76 * 2. 我们也可以获取到当前连接的状态,做一些统计,例如统计在线人数,或者缓存在线人数对应的令牌,方便后续业务调用
77 */
78 HttpSession httpSession = (HttpSession) accessor.getSessionAttributes().get("HTTP_SESSION");
79
80 // 这里只是单纯的打印,可以根据项目的实际情况做业务处理
81 log.info("postSend 中获取httpSession key:" + httpSession.getId());
82
83 // 忽略心跳消息等非STOMP消息
84 if(accessor.getCommand() == null)
85 {
86 return;
87 }
88
89 // 根据连接状态做处理,这里也只是打印了下,可以根据实际场景,对上线,下线,首次成功连接做处理
90 System.out.println(accessor.getCommand());
91 switch (accessor.getCommand())
92 {
93 // 首次连接
94 case CONNECT:
95 log.info("httpSession key:" + httpSession.getId() + " 首次连接");
96 break;
97 // 连接中
98 case CONNECTED:
99 break;
100 // 下线
101 case DISCONNECT:
102 log.info("httpSession key:" + httpSession.getId() + " 下线");
103 break;
104 default:
105 break;
106 }
107
108
109 }
110
111 /*
112 * 1. 在消息发送完成后调用,而不管消息发送是否产生异常,在次方法中,我们可以做一些资源释放清理的工作
113 * 2. 此方法的触发必须是preSend方法执行成功,且返回值不为null,发生了实际的消息推送,才会触发
114 */
115 @Override
116 public void afterSendCompletion(Message<?> message, MessageChannel messageChannel, boolean b, Exception e) {
117
118 }
119
120 /* 1. 在消息被实际检索之前调用,如果返回false,则不会对检索任何消息,只适用于(PollableChannels),
121 * 2. 在websocket的场景中用不到
122 */
123 @Override
124 public boolean preReceive(MessageChannel messageChannel) {
125 return true;
126 }
127
128 /*
129 * 1. 在检索到消息之后,返回调用方之前调用,可以进行信息修改,如果返回null,就不会进行下一步操作
130 * 2. 适用于PollableChannels,轮询场景
131 */
132 @Override
133 public Message<?> postReceive(Message<?> message, MessageChannel messageChannel) {
134 return message;
135 }
136
137 /*
138 * 1. 在消息接收完成之后调用,不管发生什么异常,可以用于消息发送后的资源清理
139 * 2. 只有当preReceive 执行成功,并返回true才会调用此方法
140 * 2. 适用于PollableChannels,轮询场景
141 */
142 @Override
143 public void afterReceiveCompletion(Message<?> message, MessageChannel messageChannel, Exception e) {
144
145 }
146}
147
148
订阅完成后推送消息
需要创建 Controller
1@RestController
2public class WebSocketController {
3
4 @Autowired
5 private BaseSocketService baseSocketService;
6
7 @SubscribeMapping({"订阅地址"})
8 public void subscribe(MsgPrincipal msgPrincipal){
9 baseSocketService.connectSuccessPush(msgPrincipal);
10 }
11}