当前位置:   article > 正文

SpringBoot集成WebSocket实现客户端与服务端通信_websocket客户端和服务端

websocket客户端和服务端

话不多说,直接上代码看效果!

一、服务端:

1、引用依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-websocket</artifactId>
  4. </dependency>

2、添加配置文件 WebSocketConfig

  1. @Configuration
  2. public class WebSocketConfig{
  3. @Bean
  4. public ServerEndpointExporter serverEndpointExporter() {
  5. return new ServerEndpointExporter();
  6. }
  7. }

3、编写WebSocket服务端接收、发送功能

  声明接口代码:

  1. public interface IWebSocketService {
  2. void sendMessage(String message, List<String> toSids);
  3. String getMessage(String msg);
  4. }

  实现类代码:

  1. @Slf4j
  2. @Service
  3. @ServerEndpoint("/api/websocket/{sid}")
  4. public class IWebSocketServiceImpl implements IWebSocketService {
  5. @Autowired
  6. RabbitTemplate rabbitTemplate;
  7. //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
  8. private static AtomicInteger onlineCount = new AtomicInteger(0);
  9. //concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
  10. private static CopyOnWriteArraySet<IWebSocketServiceImpl> webSocketSet = new CopyOnWriteArraySet<>();
  11. //与某个客户端的连接会话,需要通过它来给客户端发送数据
  12. private Session session;
  13. //接收sid
  14. private String sid = "";
  15. @Override
  16. public void sendMessage(String message, List<String> toSids) {
  17. log.info("推送消息到客户端 " + toSids + ",推送内容:" + message);
  18. for (IWebSocketServiceImpl item : webSocketSet) {
  19. try {
  20. if (CollectionUtils.isEmpty(toSids)) {
  21. item.sendMessage(message);
  22. } else if (toSids.contains(item.sid)) {
  23. item.sendMessage(message);
  24. }
  25. } catch (IOException e) {
  26. continue;
  27. }
  28. }
  29. }
  30. @Override
  31. public String getMessage(String msg) {
  32. return null;
  33. }
  34. /**
  35. * 连接建立成功调用的方法
  36. */
  37. @OnOpen
  38. public void onOpen(Session session, @PathParam("sid") String sid) {
  39. this.session = session;
  40. webSocketSet.add(this); // 加入set中
  41. this.sid = sid;
  42. addOnlineCount(); // 在线数加1
  43. try {
  44. // sendMessage("conn_success");
  45. log.info("有新客户端开始监听,sid=" + sid + ",当前在线人数为:" + getOnlineCount());
  46. } catch (Exception e) {
  47. log.error("websocket IO Exception");
  48. }
  49. }
  50. /**
  51. * 连接关闭调用的方法
  52. */
  53. @OnClose
  54. public void onClose() {
  55. webSocketSet.remove(this); // 从set中删除
  56. subOnlineCount(); // 在线数减1
  57. // 断开连接情况下,更新主板占用情况为释放
  58. log.info("释放的sid=" + sid + "的客户端");
  59. releaseResource();
  60. }
  61. @OnMessage
  62. public void onMessage(String message, Session session) {
  63. log.info("收到来自客户端 sid=" + sid + " 的信息:" + message);
  64. //此处为MQ接收功能,如果不使用则可以屏蔽,如果有需要可使用
  65. // if (rabbitTemplate == null) {
  66. // ApplicationContext context = springUtils.getApplicationContext();
  67. // rabbitTemplate = context.getBean(RabbitTemplate.class);
  68. // }
  69. // rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, //RabbitmqConfig.ROUTINGKEY_RETURN, message);
  70. // rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, //RabbitmqConfig.ROUTINGKEY_RETURN, message);
  71. // 群发消息
  72. // List<String> sids = new ArrayList<>();
  73. // for (WebSocketServer item : webSocketSet) {
  74. // sids.add(item.sid);
  75. // }
  76. // try {
  77. // sendMessage("客户端 " + this.sid + "发布消息:" + message, sids);
  78. // } catch (IOException e) {
  79. // e.printStackTrace();
  80. // }
  81. }
  82. private void releaseResource() {
  83. // 这里写释放资源和要处理的业务
  84. log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
  85. }
  86. /**
  87. * 发生错误回调
  88. */
  89. @OnError
  90. public void onError(Session session, Throwable error) {
  91. log.error(session.getAsyncRemote() + "客户端发生错误");
  92. error.printStackTrace();
  93. }
  94. /**
  95. * 实现服务器主动推送消息到 指定客户端
  96. */
  97. public void sendMessage(String message) throws IOException {
  98. this.session.getAsyncRemote().sendText(message);
  99. }
  100. /**
  101. * 获取当前在线人数
  102. *
  103. * @return
  104. */
  105. public int getOnlineCount() {
  106. return onlineCount.get();
  107. }
  108. /**
  109. * 当前在线人数 +1
  110. *
  111. * @return
  112. */
  113. public void addOnlineCount() {
  114. onlineCount.getAndIncrement();
  115. }
  116. /**
  117. * 当前在线人数 -1
  118. *
  119. * @return
  120. */
  121. public void subOnlineCount() {
  122. onlineCount.getAndDecrement();
  123. }
  124. /**
  125. * 获取当前在线客户端对应的WebSocket对象
  126. *
  127. * @return
  128. */
  129. public CopyOnWriteArraySet<IWebSocketServiceImpl> getWebSocketSet() {
  130. return webSocketSet;
  131. }
  132. }

4、如果不需要实现客户端功能,此处可选择前端调用,奉上代码

  1. <!DOCTYPE html>
  2. <html>
  3. <head>
  4. <meta charset="utf-8">
  5. <title>Java后端WebSocket的Tomcat实现</title>
  6. <script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.min.js"></script>
  7. </head>
  8. <body>
  9. <div id="message"></div>
  10. <hr />
  11. <div id="main"></div>
  12. <div id="client"></div>
  13. <input id="text" type="text" />
  14. <button onclick="send()">发送消息</button>
  15. <hr />
  16. <button onclick="closeWebSocket()">关闭WebSocket连接</button>
  17. </body>
  18. <script type="text/javascript">
  19. var cid = Math.floor(Math.random() * 100); // 随机生成客户端id
  20. document.getElementById('client').innerHTML += "客户端 id = " + cid + '<br/>';
  21. var websocket = null;
  22. //判断当前浏览器是否支持WebSocket
  23. if ('WebSocket' in window) {
  24. // 改成你的地址
  25. websocket = new WebSocket("ws://127.0.0.1:9204/api/websocket/" + cid);
  26. } else {
  27. alert('当前浏览器 Not support websocket')
  28. }
  29. //连接发生错误的回调方法
  30. websocket.onerror = function () {
  31. setMessageInnerHTML("websocket.onerror: WebSocket连接发生错误");
  32. };
  33. //连接成功建立的回调方法
  34. websocket.onopen = function () {
  35. setMessageInnerHTML("websocket.onopen: WebSocket连接成功");
  36. }
  37. //接收到消息的回调方法
  38. websocket.onmessage = function (event) {
  39. setMessageInnerHTML("websocket.onmessage: " + event.data);
  40. }
  41. //连接关闭的回调方法
  42. websocket.onclose = function () {
  43. setMessageInnerHTML("websocket.onclose: WebSocket连接关闭");
  44. }
  45. //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
  46. window.onbeforeunload = function () {
  47. closeWebSocket();
  48. }
  49. //将消息显示在网页上
  50. function setMessageInnerHTML(innerHTML) {
  51. document.getElementById('message').innerHTML += innerHTML + '<br/>';
  52. }
  53. //关闭WebSocket连接
  54. function closeWebSocket() {
  55. websocket.close();
  56. alert('websocket.close: 关闭websocket连接')
  57. }
  58. //发送消息
  59. function send() {
  60. var message = document.getElementById('text').value;
  61. try {
  62. websocket.send('{"msg":"' + message + '"}');
  63. setMessageInnerHTML("websocket.send: " + message);
  64. } catch (err) {
  65. console.error("websocket.send: " + message + " 失败");
  66. }
  67. }
  68. </script>
  69. </html>

二、客户端:

1、引用依赖

  1. <dependency>
  2. <groupId>org.java-websocket</groupId>
  3. <artifactId>Java-WebSocket</artifactId>
  4. <version>1.3.8</version>
  5. </dependency>

2、自定义WebSocket客户端,继承WebSocketClient类,实现发送、接收等功能

  1. /**
  2. * 自定义WebSocket客户端
  3. */
  4. public class CustomizedWebSocketClient extends WebSocketClient {
  5. private static final Logger logger = LoggerFactory.getLogger(CustomizedWebSocketClient.class);
  6. //用来接收数据
  7. private String excptMessage;
  8. /**
  9. * 线程安全的Boolean -是否受到消息
  10. */
  11. public AtomicBoolean hasMessage = new AtomicBoolean(false);
  12. /**
  13. * 线程安全的Boolean -是否已经连接
  14. */
  15. private AtomicBoolean hasConnection = new AtomicBoolean(false);
  16. /**
  17. * 构造方法
  18. *
  19. * @param serverUri
  20. */
  21. public CustomizedWebSocketClient(URI serverUri) {
  22. super(serverUri);
  23. logger.info("CustomizeWebSocketClient init:" + serverUri.toString());
  24. }
  25. /**
  26. * 打开连接是方法
  27. *
  28. * @param serverHandshake
  29. */
  30. @Override
  31. public void onOpen(ServerHandshake serverHandshake) {
  32. logger.info("CustomizeWebSocketClient onOpen");
  33. }
  34. /**
  35. * 收到消息时
  36. *
  37. * @param s
  38. */
  39. @Override
  40. public void onMessage(String s) {
  41. hasMessage.set(true);
  42. logger.info("CustomizeWebSocketClient onMessage:" + s);
  43. }
  44. public void sendMessage(String message){
  45. this.send(message);
  46. logger.info("已发送消息:" + message);
  47. }
  48. /**
  49. * 当连接关闭时
  50. *
  51. * @param i
  52. * @param s
  53. * @param b
  54. */
  55. @Override
  56. public void onClose(int i, String s, boolean b) {
  57. this.hasConnection.set(false);
  58. this.hasMessage.set(false);
  59. logger.info("CustomizeWebSocketClient onClose:" + s);
  60. }
  61. /**
  62. * 发生error时
  63. *
  64. * @param e
  65. */
  66. @Override
  67. public void onError(Exception e) {
  68. logger.error("CustomizeWebSocketClient onError:" + e);
  69. }
  70. @Override
  71. public void connect() {
  72. if(!this.hasConnection.get()){
  73. super.connect();
  74. hasConnection.set(true);
  75. }
  76. }
  77. //获取接收到的信息
  78. public String getExcptMessage() {
  79. if(excptMessage != null){
  80. String message = new String(excptMessage);
  81. excptMessage = null;
  82. return message;
  83. }
  84. return null;
  85. }
  86. }

3、创建连接封装类,uri对应socket服务的ip和端口号

  1. public class WebSocketClientSingleton {
  2. private static CustomizedWebSocketClient client;
  3. private WebSocketClientSingleton(){}
  4. public static CustomizedWebSocketClient getInstance() {
  5. if (client == null || client.isClosed()) {
  6. String webSocketUri = "ws://localhost:9204/api/websocket/12345678";
  7. try {
  8. //实例WebSocketClient对象,并连接到WebSocket服务端
  9. client = new CustomizedWebSocketClient(new URI(webSocketUri));
  10. client.connect();
  11. //等待服务端响应
  12. while (!client.getReadyState().equals(WebSocket.READYSTATE.OPEN)) {
  13. System.out.println("连接中···请稍后");
  14. Thread.sleep(1000);
  15. }
  16. } catch (URISyntaxException e) {
  17. e.printStackTrace();
  18. } catch (InterruptedException e) {
  19. throw new RuntimeException(e);
  20. }
  21. // return client;
  22. }
  23. return client;
  24. }
  25. public static void closeClient(CustomizedWebSocketClient client){
  26. client.close();
  27. }
  28. }

三、测试

  1. @Autowired
  2. private IWebSocketService webSocketService;

调用代码:

  1. //懒汉模式创建客户端连接
  2. CustomizedWebSocketClient customizedWebSocketClient = WebSocketClientSingleton.getInstance();
  3. //客户端发送消息
  4. customizedWebSocketClient.sendMessage("----客户端发送消息啦-----1123");
  5. //服务端发送消息
  6. webSocketService.sendMessage("服务端发送消息啦----223333",null);
  7. //关闭连接
  8. WebSocketClientSingleton.closeClient(customizedWebSocketClient);

控制台打印效果:

总结完毕

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/892611
推荐阅读
相关标签
  

闽ICP备14008679号