当前位置:   article > 正文

实现Websocket集群及通信的第二种方式(含拦截器)_websockethandler

websockethandler

一、第一种方式的缺点

        为了防止恶意占用网络连接资源,需要在websocket连接中加入拦截器。但是在查找了大量网络资源后,我没有找到针对基于注解@ServerEndpoint的websocket连接方式进行拦截的现成方案,只有一篇博文是在@ServerEndpoint中加入自定义的配置器。

附:文章出处

该博文的做法是去实现ServerEndpointConfig.Configurator内部类中的modifyHandshake方法进行拦截。我尝试了一下,发现它没有第二种方式简单,而且第二种方式具有通用性,较第一种方式要好一点,大家可以根据自己的情况进行选择。

二、第二种方式实现Websocket集群及通信

        集群只需要加入SpringCloud依赖并接入注册中心,再使用网关进行统一转发、负载均衡即可搭建,与上一篇博文一致,在此篇不做展示。

效果展示:

        如果说第一种方式是一个websocket连接一个解决方案,那么第二种方式就是websocket集体注册,共享解决方案。具体代码如下

1、目录结构

  

2、pom依赖

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.6.RELEASE</version>
  </parent>
  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-beans</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-aspects</artifactId>
    </dependency>
    <!-- spring-boot-starter-redis (last released as 1.4.x) was renamed to
         spring-boot-starter-data-redis; the version is inherited from the Boot parent
         so it can no longer drift away from the 2.1.6 line. -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter</artifactId>
      <version>2.1.0.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <!-- fastjson parses text received from WebSocket clients; old 1.2.x releases have
         known deserialization vulnerabilities, so use the last patched 1.2.x release. -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.83</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  </dependencies>

3、代码根据结构从上至下

①SpringWebSocketConfig

  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.web.servlet.config.annotation.EnableWebMvc;
  4. import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
  5. import org.springframework.web.socket.config.annotation.EnableWebSocket;
  6. import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
  7. import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
  8. import org.springframework.web.socket.handler.TextWebSocketHandler;
  9. @Configuration
  10. @EnableWebMvc
  11. @EnableWebSocket
  12. public class SpringWebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {
  13. public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
  14. //.setAllowedOrigins("*") 允许跨域访问
  15. registry.addHandler(webSocketHandler(),"/webSocket").addInterceptors(new SpringWebSocketHandlerInterceptor()).setAllowedOrigins("*");
  16. registry.addHandler(webSocketHandler(), "/sockjs/socketServer.do").addInterceptors(new SpringWebSocketHandlerInterceptor()).setAllowedOrigins("*");
  17. }
  18. @Bean
  19. public TextWebSocketHandler webSocketHandler(){
  20. return new SpringWebSocketHandler();
  21. }
  22. }

②Controller层没有实际意义,不写也可以

③SpringWebSocketHandler

  1. import com.alibaba.fastjson.JSONObject;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. import org.springframework.web.socket.CloseStatus;
  7. import org.springframework.web.socket.TextMessage;
  8. import org.springframework.web.socket.WebSocketMessage;
  9. import org.springframework.web.socket.WebSocketSession;
  10. import org.springframework.web.socket.handler.TextWebSocketHandler;
  11. import java.io.IOException;
  12. import java.util.Map;
  13. import java.util.Set;
  14. import java.util.concurrent.ConcurrentHashMap;
  15. import java.util.concurrent.atomic.AtomicInteger;
  16. /**
  17. * @author secondj
  18. * @Date 2021/11/15 10:26
  19. */
  20. @Component
  21. public class SpringWebSocketHandler extends TextWebSocketHandler {
  22. @Autowired
  23. FanoutSender fanoutSender;
  24. private static final AtomicInteger ati = new AtomicInteger();
  25. public static final ConcurrentHashMap<String,WebSocketSession> map = new ConcurrentHashMap<>();
  26. private static Logger logger = LoggerFactory.getLogger(SpringWebSocketHandler.class);
  27. public SpringWebSocketHandler() {
  28. // TODO Auto-generated constructor stub
  29. }
  30. /**
  31. * 连接成功后给前端发送的消息,会触发页面上onopen方法
  32. */
  33. public void afterConnectionEstablished(WebSocketSession session) throws Exception {
  34. // TODO Auto-generated method stub
  35. Object sid = session.getAttributes().get("tdt_sid");
  36. map.put(sid.toString(),session);
  37. int num = ati.incrementAndGet();
  38. logger.info("connect to the websocket success......当前数量:{}",num);
  39. //这块会实现自己业务,比如,当用户登录后,会把离线消息推送给用户
  40. TextMessage returnMessage = new TextMessage("连接成功");
  41. session.sendMessage(returnMessage);
  42. }
  43. /**
  44. * 关闭连接时触发
  45. * 调用session.close也会触发
  46. */
  47. public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
  48. logger.debug("websocket connection closed......");
  49. String sid= (String) session.getAttributes().get("tdt_sid");
  50. logger.info("用户"+sid+"已退出!");
  51. if(map.containsKey(sid)){
  52. map.remove(sid);
  53. }
  54. int num = ati.decrementAndGet();
  55. logger.info("剩余在线用户:{}"+num);
  56. }
  57. /**
  58. * js调用websocket.send时候,会调用该方法
  59. */
  60. @Override
  61. protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
  62. WebSocketMessage message2self = new TextMessage("该用户没有上线".getBytes());
  63. String payload = message.getPayload();
  64. WebSocketVO webSocketVO = JSONObject.parseObject(payload, WebSocketVO.class);
  65. String toSid = webSocketVO.getToUserId();
  66. if(map.containsKey(toSid)){
  67. map.get(toSid).sendMessage(message);
  68. }else {
  69. ConcurrentHashMap<String,String> sendMap = new ConcurrentHashMap();
  70. sendMap.put(toSid,message.getPayload());
  71. logger.info("getPayLoad():{}",message.getPayload());
  72. logger.info("messge:{}",message.toString());
  73. logger.info("map数据:{}",sendMap.toString());
  74. fanoutSender.sendMessage(sendMap.toString());
  75. }
  76. }
  77. public void sendMessage(TextMessage message) throws Exception {
  78. WebSocketMessage message2self = new TextMessage("该用户没有上线".getBytes());
  79. String payload = message.getPayload();
  80. WebSocketVO webSocketVO = JSONObject.parseObject(payload, WebSocketVO.class);
  81. String toSid = webSocketVO.getToUserId();
  82. if(map.containsKey(toSid)){
  83. map.get(toSid).sendMessage(message);
  84. }
  85. }
  86. /**
  87. * 代理发生异常时执行
  88. */
  89. public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
  90. if(session.isOpen()){session.close();}
  91. logger.debug("websocket connection closed......");
  92. Map<String, Object> attributes = session.getAttributes();
  93. Object sid = attributes.get("sid");
  94. map.remove(sid);
  95. WebSocketMessage webSocketMessage = new TextMessage("发生异常错误".getBytes());
  96. session.sendMessage(webSocketMessage);
  97. }
  98. public boolean supportsPartialMessages() {
  99. return false;
  100. }
  101. /**
  102. * 给某个用户发送消息
  103. *
  104. * @param sid
  105. * @param message
  106. */
  107. public void sendMessageToUser(String sid, TextMessage message) {
  108. Set<Map.Entry<String, WebSocketSession>> entries = map.entrySet();
  109. for (Map.Entry<String, WebSocketSession> entry : entries) {
  110. if (entry.getValue().getAttributes().get("tdt_sid").equals(sid)) {
  111. try {
  112. if (entry.getValue().isOpen()) {
  113. entry.getValue().sendMessage(message);
  114. }
  115. } catch (IOException e) {
  116. e.printStackTrace();
  117. }
  118. break;
  119. }
  120. }
  121. }
  122. public void sendMessageToUser(String sid, String message) {
  123. TextMessage messages = new TextMessage(message.getBytes());
  124. this.sendMessageToUser(sid,messages);
  125. }
  126. /**
  127. * 给所有在线用户发送消息
  128. *
  129. * @param message
  130. */
  131. public void sendMessageToUsers(TextMessage message) throws IOException {
  132. Set<Map.Entry<String, WebSocketSession>> entries = map.entrySet();
  133. for (Map.Entry<String, WebSocketSession> entry : entries) {
  134. WebSocketSession session = entry.getValue();
  135. if(session.isOpen()){
  136. session.sendMessage(message);
  137. }
  138. }
  139. }
  140. }

④SpringWebSocketHandlerInterceptor

  1. import org.springframework.http.server.ServerHttpRequest;
  2. import org.springframework.http.server.ServerHttpResponse;
  3. import org.springframework.http.server.ServletServerHttpRequest;
  4. import org.springframework.stereotype.Component;
  5. import org.springframework.web.socket.WebSocketHandler;
  6. import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
  7. import javax.servlet.http.HttpSession;
  8. import java.util.Map;
  9. /**
  10. * @author secondj
  11. * @Date 2021/11/11 15:52
  12. */
  13. @Component
  14. public class SpringWebSocketHandlerInterceptor extends HttpSessionHandshakeInterceptor {
  15. @Override
  16. public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
  17. Map<String, Object> attributes) throws Exception {
  18. //TODO 获取url传递的参数,通过attributes在Interceptor处理结束后传递给WebSocketHandler
  19. //TODO WebSocketHandler可以通过WebSocketSession的getAttributes()方法获取参数
  20. //设置session值这里根据自己需求设置即可
  21. if (request instanceof ServletServerHttpRequest) {
  22. ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
  23. HttpSession session = servletRequest.getServletRequest().getSession(true);
  24. String sid = servletRequest.getServletRequest().getParameter("userId");
  25. String token = servletRequest.getServletRequest().getParameter("token");
  26. if(!token.equals("EXAM_PERMISSION")){
  27. return false;
  28. }
  29. if (session != null) {
  30. String userName = (String) session.getAttribute("exam_sid");
  31. if (userName == null) {
  32. userName = sid;
  33. }
  34. attributes.put("exam_sid",userName);
  35. return true;
  36. }
  37. }
  38. return false;
  39. }
  40. @Override
  41. public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
  42. Exception ex) {
  43. // TODO Auto-generated method stub
  44. super.afterHandshake(request, response, wsHandler, ex);
  45. }
  46. }

⑤RetryCache该类是用来缓存队列消息和消息重发的,属于消息安全

  1. import lombok.AllArgsConstructor;
  2. import lombok.Data;
  3. import lombok.NoArgsConstructor;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.stereotype.Component;
  6. import java.util.Map;
  7. import java.util.concurrent.ConcurrentHashMap;
  8. import java.util.concurrent.atomic.AtomicInteger;
  9. /**
  10. * @author secondj
  11. * @Date 2021/11/15 16:28
  12. */
  13. @Slf4j
  14. @Component
  15. public class RetryCache {
  16. private SendMessage sendMessage;
  17. private boolean stop = false;
  18. private Map<String,MessageWithTime> map = new ConcurrentHashMap<>();
  19. private AtomicInteger id = new AtomicInteger();
  20. @Data
  21. @AllArgsConstructor
  22. @NoArgsConstructor
  23. private static class MessageWithTime{
  24. long time;
  25. Object message;
  26. }
  27. public void sender(SendMessage sendMessage){
  28. this.sendMessage = sendMessage;
  29. startRetry();
  30. }
  31. public String generaterId(){
  32. return ""+id.incrementAndGet();
  33. }
  34. public void add(String id,Object message){
  35. map.put(id,new MessageWithTime(System.currentTimeMillis(),message));
  36. }
  37. public void del(String id){
  38. map.remove(id);
  39. }
  40. //多线程发送消息
  41. private void startRetry(){
  42. new Thread(()->{
  43. while(!stop){
  44. try {
  45. Thread.sleep(System.currentTimeMillis());
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. }
  49. long now = System.currentTimeMillis();
  50. for (String key : map.keySet()) {
  51. MessageWithTime messageWithTime = map.get(key);
  52. if(null != messageWithTime){
  53. if(messageWithTime.getTime()+ 3 * Constant.VALID_TIME < now){
  54. log.info("send message failed after 3 min " + messageWithTime);
  55. del(key);
  56. }else if (messageWithTime.getTime() + Constant.VALID_TIME< now) {
  57. DetailResult detailRes = sendMessage.send(messageWithTime.getMessage());
  58. if (detailRes.isSuccess()) {
  59. del(key);
  60. }
  61. }
  62. }
  63. }
  64. }
  65. }).start();
  66. }
  67. }

⑥RabbitMQConfig

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  4. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. /**
  12. * @author secondj
  13. * @Date 2021/11/15 16:20
  14. */
  15. @Configuration
  16. @Slf4j
  17. public class RabbitMQConfig {
  18. @Value("${spring.rabbitmq.addresses}")
  19. public String addresses;
  20. @Value("${spring.rabbitmq.port}")
  21. public String port;
  22. @Value("${spring.rabbitmq.username}")
  23. private String username;
  24. @Value("${spring.rabbitmq.password}")
  25. private String password;
  26. @Value("${spring.rabbitmq.virtual-host}")
  27. private String virtualHost;
  28. @Value("${spring.rabbitmq.publisher-confirms}")
  29. private boolean publisherConfirms;
  30. @Value("${tdt.queue}")
  31. public String queue;
  32. @Value("${tdt.exchange}")
  33. public String exchange;
  34. @Autowired
  35. RetryCache retryCache;
  36. /**
  37. * 创建连接工厂
  38. * @return
  39. */
  40. @Bean
  41. public ConnectionFactory connectionFactory(){
  42. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  43. connectionFactory.setHost("127.0.0.1");
  44. connectionFactory.setPort(Integer.valueOf(port));
  45. connectionFactory.setUsername(username);
  46. connectionFactory.setPassword(password);
  47. connectionFactory.setVirtualHost(virtualHost);
  48. connectionFactory.setPublisherConfirms(publisherConfirms);
  49. return connectionFactory;
  50. }
  51. @Bean
  52. public Queue queueTdt(){
  53. log.info("创建队列成功:{}",queue);
  54. return new Queue(queue);
  55. }
  56. @Bean
  57. public FanoutExchange fanoutExchangeTdt(){
  58. log.info("创建交换机成功:{}",exchange);
  59. return new FanoutExchange(exchange);
  60. }
  61. @Bean
  62. public Binding bindingTdt(){
  63. Binding bind = BindingBuilder.bind(queueTdt()).to(fanoutExchangeTdt());
  64. log.info("交换机队列绑定成功");
  65. return bind;
  66. }
  67. @Bean
  68. public RabbitTemplate rabbitTemplate(){
  69. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
  70. //TODO 失败通知
  71. rabbitTemplate.setMandatory(true);
  72. //TODO 失败回调
  73. rabbitTemplate.setReturnCallback(returnCallback());
  74. //TODO 发送方确认
  75. rabbitTemplate.setConfirmCallback(confirmCallback());
  76. return rabbitTemplate;
  77. }
  78. //===============发送方确认===============
  79. public RabbitTemplate.ConfirmCallback confirmCallback(){
  80. return new RabbitTemplate.ConfirmCallback(){
  81. @Override
  82. public void confirm(CorrelationData correlationData,
  83. boolean ack, String cause) {
  84. if (ack) {
  85. log.info("发送者确认发送给mq成功");
  86. } else {
  87. //处理失败的消息
  88. log.info("发送者发送给mq失败,考虑重发:"+cause);
  89. }
  90. }
  91. };
  92. }
  93. //===============失败通知===============
  94. public RabbitTemplate.ReturnCallback returnCallback(){
  95. return new RabbitTemplate.ReturnCallback(){
  96. @Override
  97. public void returnedMessage(Message message,
  98. int replyCode,
  99. String replyText,
  100. String exchange,
  101. String routingKey) {
  102. log.info("无法路由的消息,需要考虑另外处理。");
  103. log.info("Returned replyText:"+replyText);
  104. log.info("Returned exchange:"+exchange);
  105. log.info("Returned routingKey:"+routingKey);
  106. String msgJson = new String(message.getBody());
  107. log.info("Returned Message:"+msgJson);
  108. }
  109. };
  110. }
  111. }

⑦Constant

  /**
   * Shared constants.
   *
   * @author secondj
   * @Date 2021/11/15 16:31
   */
  public class Constant {
    /**
     * Validity period of a cached message. {@code RetryCache} adds it to
     * {@code System.currentTimeMillis()} timestamps, so the unit is milliseconds.
     */
    public static final long VALID_TIME = 3600L;
  }

⑧ReceiveMessage和SendMessage

  1. /**
  2. * @author secondj
  3. * @Date 2021/11/15 16:45
  4. */
  5. public interface ReceiveMessage {
  6. DetailResult receive(Object obj);
  7. }
  1. /**
  2. * @author secondj
  3. * @Date 2021/11/15 16:29
  4. */
  5. public interface SendMessage {
  6. DetailResult send(Object obj);
  7. }

⑨FanoutReceiver

  1. import com.alibaba.fastjson.JSONException;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.rabbitmq.client.Channel;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  8. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  9. import org.springframework.amqp.support.AmqpHeaders;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.messaging.handler.annotation.Header;
  12. import org.springframework.stereotype.Component;
  13. import java.io.IOException;
  14. import java.util.Map;
  15. import java.util.Set;
  16. /**
  17. * @author secondj
  18. * @Date 2021/11/15 16:22
  19. */
  20. @Component
  21. @Slf4j
  22. public class FanoutReceiver {
  23. private static Logger logger = LoggerFactory.getLogger(FanoutReceiver.class);
  24. @Autowired
  25. SpringWebSocketHandler handler;
  26. @RabbitHandler
  27. @RabbitListener(queues = "queue_mqsocket")//动态绑定
  28. public void receiveMessage(String jsonObject, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
  29. //返回字符串
  30. try{
  31. log.info("队列接收到消息:{}",jsonObject);
  32. jsonObject = jsonObject.replace("=",":");
  33. Map<String,String> mapstr = JSONObject.parseObject(jsonObject, Map.class);
  34. Set<Map.Entry<String, String>> entries = mapstr.entrySet();
  35. for (Map.Entry<String, String> entry : entries) {
  36. Object obj = entry.getKey();
  37. String sid = obj.toString();
  38. Object objs = entry.getValue();
  39. String message = objs.toString();
  40. if(SpringWebSocketHandler.map.containsKey(sid)){
  41. handler.sendMessageToUser(sid,message);
  42. }
  43. }
  44. }catch (JSONException e){
  45. e.printStackTrace();
  46. return;
  47. }
  48. try {
  49. channel.basicAck(tag,false);
  50. } catch (IOException e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. }

⑩DetailResult

  1. import lombok.AllArgsConstructor;
  2. import lombok.Data;
  3. import lombok.NoArgsConstructor;
  4. /**
  5. * @author secondj
  6. * @Date 2021/11/15 16:30
  7. */
  8. @Data
  9. @AllArgsConstructor
  10. @NoArgsConstructor
  11. public class DetailResult {
  12. private boolean flag;
  13. public Object message;
  14. public boolean isSuccess() {
  15. return flag == true;
  16. }
  17. }

⑩①FanoutSender

  1. import cn.tdt.rabbitmq.cache.RetryCache;
  2. import cn.tdt.rabbitmq.function.SendMessage;
  3. import cn.tdt.rabbitmq.result.DetailResult;
  4. import com.alibaba.fastjson.JSONObject;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Component;
  9. /**
  10. * @author secondj
  11. * @Date 2021/11/15 16:22
  12. */
  13. @Component
  14. @Slf4j
  15. public class FanoutSender implements SendMessage {
  16. RetryCache retryCache = new RetryCache();
  17. @Autowired
  18. RabbitTemplate rabbitTemplate;
  19. //发送消息
  20. public void sendMessage(Object obj) {
  21. rabbitTemplate.setExchange("exchange_mqsocket");
  22. log.info("【消息发送者】发送消息到fanout交换机"+ JSONObject.toJSONString(obj));
  23. try{
  24. send(obj);
  25. }catch (RuntimeException ex){
  26. ex.printStackTrace();
  27. log.info("send failed"+ex);
  28. try{
  29. send(obj);
  30. }catch (RuntimeException e){
  31. e.printStackTrace();
  32. log.info("retry send failed"+e);
  33. }
  34. }
  35. }
  36. //客户端发送消息前,先在本地进行缓存
  37. @Override
  38. public DetailResult send(Object message) {
  39. try{
  40. String id = retryCache.generaterId();
  41. retryCache.add(id,message);
  42. rabbitTemplate.convertAndSend("exchange_mqsocket","",message);
  43. // rabbitTemplate.correlationConvertAndSend(message,new CorrelationData(id));
  44. // rabbitTemplate.correlationConvertAndSend(message,new CorrelationData(id));
  45. }catch (Exception e){
  46. return new DetailResult(false,"");
  47. }
  48. return new DetailResult(true,"");
  49. }
  50. }

⑩②WebSocketVO

  1. import lombok.Data;
  2. /**
  3. * @author secondj
  4. * @Date 2021/11/15 14:53
  5. */
  6. @Data
  7. public class WebSocketVO {
  8. private String toUserId;
  9. private String msgType;
  10. private String msgInfo;
  11. }

⑩③启动类

  1. import org.springframework.boot.SpringApplication;
  2. import org.springframework.boot.autoconfigure.SpringBootApplication;
  3. /**
  4. * @author secondj
  5. * @Date 2021/11/11 15:37
  6. */
  7. @SpringBootApplication
  8. public class SSOApplicationRun {
  9. public static void main(String[] args) {
  10. SpringApplication.run(SSOApplicationRun.class,args);
  11. }
  12. }

前端代码

  <!DOCTYPE html>
  <html lang="en">
  <head>
    <meta charset="UTF-8">
    <title>WebSocket</title>
  </head>
  <body>
    <h3>hello socket</h3>
    <p>【userId】:</p><div><input id="userId" name="userId" type="text"></div>
    <p>【toUserId】:</p><div><input id="toUserId" name="toUserId" type="text"></div>
    <p>【msgType】:</p><div><input id="msgType" name="msgType" type="text"></div>
    <p>【msgInfo】:</p><div><input id="msgInfo" name="msgInfo" type="text"></div>
    <p>【操作】:</p><div><button onclick="openSocket()">开启socket</button></div>
    <p>【操作】:</p><div><button onclick="sendMessage()">发送消息</button></div>
    <script>
      var socket;

      // Opens (or re-opens) the WebSocket connection for the user id typed into the form.
      function openSocket() {
        if (typeof(WebSocket) == "undefined") {
          console.log("您的浏览器不支持WebSocket");
          return;
        }
        console.log("您的浏览器支持WebSocket");
        var userId = document.getElementById('userId').value;
        // Parameters travel in the query string so the handshake interceptor can read them
        // with getParameter; the user id is encoded so special characters cannot break the URL.
        var socketUrl = "ws://localhost:8080/ctl/webSocket?token=TDT_PERMISSION&userId=" + encodeURIComponent(userId);
        console.log(socketUrl);
        if (socket != null) {
          socket.close();
          socket = null;
        }
        socket = new WebSocket(socketUrl);
        socket.onopen = function() {
          console.log("websocket已打开");
        };
        socket.onmessage = function(msg) {
          var serverMsg = "收到服务端信息:" + msg.data;
          console.log(serverMsg);
          // A message arrived: trigger front-end logic here.
        };
        socket.onclose = function() {
          console.log("websocket已关闭");
        };
        socket.onerror = function() {
          console.log("websocket发生了错误");
        };
      }

      // Sends the form content as a JSON message over the open socket.
      function sendMessage() {
        if (typeof(WebSocket) == "undefined") {
          console.log("您的浏览器不支持WebSocket");
          return;
        }
        if (!socket || socket.readyState !== WebSocket.OPEN) {
          console.log("websocket尚未连接,请先开启socket");
          return;
        }
        // JSON.stringify escapes quotes and backslashes that hand-built strings would not.
        var msg = JSON.stringify({
          toUserId: document.getElementById('toUserId').value,
          msgInfo: document.getElementById('msgInfo').value,
          msgType: document.getElementById('msgType').value
        });
        console.log(msg);
        socket.send(msg);
      }
    </script>
  </body>
  </html>

遇到的问题:

        在往消息队列中发送消息时,如果发送的消息是对象,会在接收消息时自动添加Properties字段属性,影响json转换,解决方案为把发送的消息对象转换为字符串对象或者json串。

待优化的问题:

        在单点登录系统中,拦截websocket请求后,未登录用户进行重定向。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/241983
推荐阅读
相关标签
  

闽ICP备14008679号