当前位置:   article > 正文

SpringBoot整合配置WebSocket服务代码实现_spring-boot-starter-websocket

spring-boot-starter-websocket

基于Javax的实现

首先,在springboot 项目中的pom文件引入WebSocket的依赖,配置如下:

  1. <!--websocket-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-websocket</artifactId>
  5. <exclusions>
  6. <exclusion>
  7. <artifactId>spring-boot-starter-tomcat</artifactId>
  8. <groupId>org.springframework.boot</groupId>
  9. </exclusion>
  10. </exclusions>
  11. </dependency>

在springboot 项目中,新建WebSocketConfig配置类,代码如下:

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.web.socket.config.annotation.EnableWebSocket;
  5. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  6. /**
  7. *
  8. *
  9. * @version 1.0
  10. * @since JDK1.8
  11. * @author tarzan
  12. * @date 2023年10月26日 15:21:09
  13. */
  14. @Slf4j
  15. @Configuration
  16. public class WebSocketConfig{
  17. @Bean
  18. public ServerEndpointExporter serverEndpointExporter() {
  19. return new ServerEndpointExporter();
  20. }
  21. }

在springboot 项目中,新建WebSocketServer 服务器服务类

完整代码如下:

  1. import com.alibaba.fastjson.JSONObject;
  2. import lombok.EqualsAndHashCode;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springblade.core.tool.utils.CollectionUtil;
  5. import org.springblade.core.tool.utils.Func;
  6. import org.springframework.stereotype.Component;
  7. import javax.websocket.*;
  8. import javax.websocket.server.ServerEndpoint;
  9. import java.util.List;
  10. import java.util.concurrent.CopyOnWriteArraySet;
  11. /**
  12. * @author tarzan
  13. */
  14. @ServerEndpoint("/ws/automate")
  15. @Component
  16. @Slf4j
  17. @EqualsAndHashCode
  18. public class WebSocketServer {
  19. /** 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。 */
  20. private static final CopyOnWriteArraySet<WebSocketServer> WEBSOCKET_CLIENTS =new CopyOnWriteArraySet<>();
  21. /** 当前session */
  22. private Session session;
  23. /** 当前session订阅的话题 */
  24. private List<String> topics;
  25. public WebSocketServer() {
  26. }
  27. @OnOpen
  28. public void onOpen(Session session) {
  29. this.session = session;
  30. WEBSOCKET_CLIENTS.add(this);
  31. log.info("WebSocketServer有新客户端连接加入:{},当前在线人数为:{}", session.getId(),WEBSOCKET_CLIENTS.size());
  32. }
  33. @OnClose
  34. public void onClose() {
  35. WEBSOCKET_CLIENTS.remove(this);
  36. log.info("有一连接关闭:{},当前在线人数为:{}", this.session.getId(), WEBSOCKET_CLIENTS.size());
  37. }
  38. @OnMessage
  39. public void onMessage(String message, Session session) {
  40. this.topics= Func.toStrList(message);
  41. log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
  42. }
  43. @OnError
  44. public void onError(Session session, Throwable error) {
  45. log.error("发生错误:"+error.getMessage());
  46. WEBSOCKET_CLIENTS.remove(this);
  47. log.info("有一连接异常:{},当前在线人数为:{}", session.getId(), WEBSOCKET_CLIENTS.size());
  48. }
  49. public <T> void sendData(T data, String topicName) {
  50. JSONObject result=new JSONObject();
  51. result.put("topic",topicName);
  52. result.put("data",data);
  53. //显示值为null的字段
  54. // String jsonString = JSON.toJSONString(result, SerializerFeature.WriteMapNullValue);
  55. sendMessage(result.toString(),topicName);
  56. }
  57. /**
  58. * 发送消息广播
  59. *
  60. * @param message 消息文本
  61. * @author tarzan
  62. * @date 2022年12月08日 09:24:34
  63. */
  64. public void sendMessage(String message,String topic){
  65. try {
  66. WEBSOCKET_CLIENTS.forEach(client ->{
  67. if (client.session.isOpen()&&CollectionUtil.isNotEmpty(client.topics)&&client.topics.contains(topic)) {
  68. client.session.getAsyncRemote().sendText(message);
  69. }
  70. });
  71. } catch (Exception e) {
  72. log.error(e.getMessage());
  73. }
  74. }
  75. public void sendMessageToAllUsers(String message){
  76. try {
  77. WEBSOCKET_CLIENTS.forEach(client -> {
  78. if (client.session.isOpen()) {
  79. client.session.getAsyncRemote().sendText(message);
  80. }
  81. });
  82. } catch (Exception e) {
  83. log.error(e.getMessage());
  84. }
  85. }
  86. }

注意:配置号websocket服务后,访问端口和你自己web端口是一样的。 

发送使用代码示例

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springblade.coalface.websocket.OpcSocketServer;
  3. import org.springblade.coalface.websocket.TopicNameConstant;
  4. import org.springblade.coalface.websocket.WebSocketServer;
  5. import org.springframework.scheduling.annotation.Scheduled;
  6. import org.springframework.stereotype.Component;
  7. import javax.annotation.Resource;
  8. /**
  9. * @author tarzan
  10. */
  11. @Slf4j
  12. @Component
  13. public class WebSocketTask{
  14. @Resource
  15. private WebSocketServer webSocketServer;
  16. @Scheduled(cron = "0/5 * * * * ?")
  17. public void execute() {
  18. webSocketServer.sendMessageToAllUsers(TopicNameConstant.HEARTBEAT);
  19. }
  20. }

注:也可以把sendMessageToAllUsers方法改为静态方法,通过类去调用。 

代码是我项目中的代码,请按照自己的业务进行修改! 

WebSocket服务器类种注入serivice服务实现

完整代码如下(这里发送的消息的方法是静态方法):

  1. import lombok.EqualsAndHashCode;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springblade.coalface.calculate.enums.CollisionLevelEnum;
  4. import org.springblade.coalface.opcua.dto.CoalCutterDTO;
  5. import org.springblade.coalface.opcua.service.OpcIssueService;
  6. import org.springblade.core.tool.utils.SpringUtil;
  7. import org.springblade.core.tool.utils.StringUtil;
  8. import org.springframework.stereotype.Component;
  9. import javax.websocket.*;
  10. import javax.websocket.server.ServerEndpoint;
  11. import java.util.concurrent.CopyOnWriteArraySet;
  12. import java.util.concurrent.atomic.AtomicReference;
  13. /**
  14. * @author tarzan
  15. */
  16. @ServerEndpoint("/ws/opc/issue")
  17. @Slf4j
  18. @Component("opcWebSocketServer")
  19. @EqualsAndHashCode
  20. public class OpcSocketServer {
  21. /** 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。 */
  22. private static final CopyOnWriteArraySet<OpcSocketServer> WEBSOCKET_CLIENTS =new CopyOnWriteArraySet<>();
  23. public static final AtomicReference<CollisionLevelEnum> CMD = new AtomicReference<>(null);
  24. /** 当前session */
  25. private Session session;
  26. private final static String ALERT="alert";
  27. private final static String CONTROL="control";
  28. private final static OpcIssueService opcIssueService;
  29. static {
  30. opcIssueService= SpringUtil.getBean(OpcIssueService.class);
  31. }
  32. @OnOpen
  33. public void onOpen(Session session) {
  34. this.session = session;
  35. WEBSOCKET_CLIENTS.add(this);
  36. log.info("OpcSocketServer有新客户端连接加入:{},当前在线人数为:{}", session.getId(),WEBSOCKET_CLIENTS.size());
  37. }
  38. @OnClose
  39. public void onClose() {
  40. WEBSOCKET_CLIENTS.remove(this);
  41. log.info("有一连接关闭:{},当前在线人数为:{}", this.session.getId(), WEBSOCKET_CLIENTS.size());
  42. }
  43. @OnMessage
  44. public void onMessage(String message, Session session) {
  45. log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
  46. String[] params=message.split(":");
  47. if(StringUtil.isNotBlank(message)&&params.length>1){
  48. if(ALERT.equals(params[0])){
  49. CollisionLevelEnum levelEnum= CollisionLevelEnum.valueOf(params[1]);
  50. if(CollisionLevelEnum.SAFE.equals(levelEnum)){
  51. manualIssue(session,levelEnum);
  52. }
  53. }
  54. if(CONTROL.equals(params[0])){
  55. CollisionLevelEnum levelEnum= CollisionLevelEnum.valueOf(params[1]);
  56. //是否需要相等
  57. if(levelEnum.getStatus()>= CoalCutterDTO.currentLevel.getStatus()){
  58. manualIssue(session,levelEnum);
  59. }
  60. }
  61. }
  62. }
  63. private void manualIssue(Session session, CollisionLevelEnum levelEnum){
  64. CollisionLevelEnum execute=CMD.get();
  65. if(execute==null){
  66. verifyIssue(session,levelEnum);
  67. }else{
  68. if(levelEnum.getStatus()>execute.getStatus()){
  69. verifyIssue(session,levelEnum);
  70. }else{
  71. sendMessageToUser(session,execute+"指令正在执行!请稍后点击");
  72. }
  73. }
  74. }
  75. private void verifyIssue(Session session,CollisionLevelEnum levelEnum){
  76. if(opcIssueService.manualIssue(levelEnum)){
  77. opcIssueService.delayRecover(15,levelEnum);
  78. sendMessageToUser(session,levelEnum+"指令下发成功!");
  79. log.info(levelEnum+"指令下发成功!");
  80. }else {
  81. sendMessageToUser(session,levelEnum+"指令下发失败!");
  82. log.error(levelEnum+"指令下发失败!");
  83. }
  84. }
  85. @OnError
  86. public void onError(Session session, Throwable error) {
  87. log.error("发生错误:"+error.getMessage());
  88. WEBSOCKET_CLIENTS.remove(this);
  89. log.info("有一连接异常:{},当前在线人数为:{}", session.getId(), WEBSOCKET_CLIENTS.size());
  90. }
  91. /**
  92. * sendMessageToUser:发给指定用户
  93. *
  94. */
  95. public static void sendMessageToUser(Session session, String message) {
  96. if (session != null && session.isOpen()) {
  97. session.getAsyncRemote().sendText(message);
  98. }
  99. }
  100. public static void sendMessageToAllUsers(String message){
  101. try {
  102. WEBSOCKET_CLIENTS.forEach(client ->sendMessageToUser(client.session,message));
  103. } catch (Exception e) {
  104. log.error(e.getMessage());
  105. }
  106. }
  107. }

以上 通过  static {
        opcIssueService= SpringUtil.getBean(OpcIssueService.class);
    } 实现的service 服务注入,发现有时候打包的时候,会出现 getBean空指针问题。

建议改成:在方法的内使用

  1. private void verifyIssue(Session session,CollisionLevelEnum levelEnum){
  2. OpcIssueService opcIssueService = SpringUtil.getBean(OpcIssueService.class);
  3. if(opcIssueService.manualIssue(levelEnum)){
  4. opcIssueService.delayRecover(15,levelEnum);
  5. sendMessageToUser(session,levelEnum+"指令下发成功!");
  6. log.info(levelEnum+"指令下发成功!");
  7. }else {
  8. sendMessageToUser(session,levelEnum+"指令下发失败!");
  9. log.error(levelEnum+"指令下发失败!");
  10. }
  11. }

基于WebMVC的实现

首先,在springboot 项目中的pom文件引入WebSocket的依赖,配置如下:

  1. <!--websocket-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-websocket</artifactId>
  5. <exclusions>
  6. <exclusion>
  7. <artifactId>spring-boot-starter-tomcat</artifactId>
  8. <groupId>org.springframework.boot</groupId>
  9. </exclusion>
  10. </exclusions>
  11. </dependency>

在springboot 项目中,新建WebSocketConfig配置类,需要增加@EnableWebSocket注解和继承WebSocketConfigurer 类。

代码如下:

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.web.socket.config.annotation.EnableWebSocket;
  5. import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
  6. import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
  7. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  8. import javax.annotation.Resource;
  9. /**
  10. *
  11. *
  12. * @version 1.0
  13. * @since JDK1.8
  14. * @author tarzan
  15. * @date 2023年10月26日 15:21:09
  16. */
  17. @Slf4j
  18. @Configuration
  19. @EnableWebSocket
  20. public class WebSocketConfig implements WebSocketConfigurer {
  21. @Resource
  22. private OpcSessionHandler opcSessionHandler;
  23. //addHandler 映射关系可以添加多个
  24. @Override
  25. public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
  26. registry.addHandler(opcSessionHandler, "/ws/opc/issue").setAllowedOrigins("*");
  27. }
  28. //只使用webmvc的方式这个可以省略不写
  29. @Bean
  30. public ServerEndpointExporter serverEndpointExporter() {
  31. return new ServerEndpointExporter();
  32. }
  33. }

在springboot 项目中,新建OpcSessionHandler 会话处理类,代码如下:

  1. package org.springblade.coalface.websocket;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springblade.coalface.calculate.enums.CollisionLevelEnum;
  4. import org.springblade.coalface.opcua.dto.CoalCutterDTO;
  5. import org.springblade.coalface.opcua.service.OpcIssueService;
  6. import org.springblade.core.tool.utils.StringUtil;
  7. import org.springframework.lang.NonNull;
  8. import org.springframework.stereotype.Component;
  9. import org.springframework.web.socket.*;
  10. import javax.annotation.Resource;
  11. import java.io.IOException;
  12. import java.util.concurrent.CopyOnWriteArraySet;
  13. import java.util.concurrent.ScheduledExecutorService;
  14. import java.util.concurrent.ScheduledThreadPoolExecutor;
  15. import java.util.concurrent.TimeUnit;
  16. import java.util.concurrent.atomic.AtomicReference;
  17. /**
  18. * @author tarzan
  19. */
  20. @Component
  21. @Slf4j
  22. public class OpcSessionHandler implements WebSocketHandler {
  23. @Resource
  24. private OpcIssueService opcIssueService;
  25. private static final CopyOnWriteArraySet<WebSocketSession> SESSIONS=new CopyOnWriteArraySet<>();
  26. public final AtomicReference<CollisionLevelEnum> CMD = new AtomicReference<>(null);
  27. private final static String ALERT="alert";
  28. private final static String CONTROL="control";
  29. @Override
  30. public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
  31. SESSIONS.add(session);
  32. log.info(session.getId()+" OpcSessionHandler当前在线人数:"+SESSIONS.size());
  33. }
  34. @Override
  35. public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
  36. String msg = message.getPayload().toString();
  37. log.info("接收消息"+session.getId()+":"+msg);
  38. opcIssueService.method();
  39. }
  40. @Override
  41. public void handleTransportError(WebSocketSession session,@NonNull Throwable exception) throws Exception {
  42. log.error("OpcSessionHandler连接出错"+session.getId());
  43. SESSIONS.remove(session);
  44. if (!session.isOpen()) {
  45. session.close();
  46. }
  47. }
  48. @Override
  49. public void afterConnectionClosed(WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {
  50. log.info("OpcSessionHandler关闭连接"+session.getId());
  51. SESSIONS.remove(session);
  52. }
  53. @Override
  54. public boolean supportsPartialMessages() {
  55. return false;
  56. }
  57. public void sendMessageToUser(WebSocketSession session, String contents) {
  58. if (session != null && session.isOpen()) {
  59. TextMessage message = new TextMessage(contents);
  60. try {
  61. session.sendMessage(message);
  62. } catch (IOException e) {
  63. log.error(e.getMessage());
  64. }
  65. }
  66. }
  67. public void sendMessageToAllUsers(String contents) {
  68. SESSIONS.forEach(session->sendMessageToUser(session, contents));
  69. }
  70. }

配置以上代码即可。

注:通过WebMVC实现的websocket暂时没办法通过异步推送数据。 

相关知识

WebSocket是一种在单个TCP连接上进行全双工通信的网络协议,它可以在客户端和服务器之间建立持久性的连接,并提供实时的双向通信能力。相较于传统的HTTP协议,WebSocket具有更低的延迟和更高的效率,适用于需要实时数据传输和实时互动的应用场景。

下面将对WebSocket技术的几个关键概念进行详细解释。

  1. 握手过程(Handshake) WebSocket连接的建立需要经过一次握手过程。在客户端发起连接请求时,会发送一个特殊的HTTP请求头(Upgrade请求头字段),并告知服务器要升级协议为WebSocket。服务器在接收到这个请求后,如果同意升级,则会返回一个特殊的HTTP响应头(Upgrade和Connection响应头字段),表示已经切换到WebSocket协议。握手完成后,连接从HTTP协议切换到WebSocket协议。

  2. 数据帧(Data Frames) WebSocket使用数据帧来传输数据,数据帧有固定的格式。数据帧包含一个帧头和一个帧载荷。帧头包含了数据帧的控制信息,例如数据帧是否是最后一个帧、数据帧的类型和长度等。帧载荷是实际传输的数据。

  3. 实时双向通信 WebSocket协议支持实时的双向通信,客户端和服务器可以同时进行数据的发送和接收。客户端可以通过JavaScript的WebSocket API与服务器建立连接,并通过发送数据帧向服务器发送消息。服务器可以向客户端发送消息,客户端通过监听WebSocket的事件来接收服务器发送的消息。

  4. 心跳机制 WebSocket支持心跳机制来保持连接的稳定性。客户端和服务器可以定时发送心跳消息以保持连接的活跃状态。如果一段时间内没有收到心跳消息,可以判定连接已断开,并进行相应的处理。

  5. 安全性(Security) WebSocket可以使用TLS/SSL加密来保护数据的传输。通过使用加密的WebSocket连接,可以确保数据的机密性和完整性,以防止数据被窃取和篡改。

  6. 使用场景 WebSocket被广泛应用于需要实时通信的应用场景,例如:

  • 即时聊天应用:通过WebSocket实现实时的文字、语音、视频聊天功能。
  • 在线协作和共享:通过WebSocket实现实时的协作和共享功能,如实时编辑、实时白板等。
  • 多人游戏:通过WebSocket实现多人游戏中的实时通信和互动。
  • 实时监控和远程控制:通过WebSocket实现实时数据的监控和远程设备的控制。
  • 股票行情和实时数据更新:通过WebSocket实时传输股票行情和实时数据,以提供实时更新的服务。

总结: WebSocket是一种实现实时双向通信的网络协议,它可以在客户端和服务器之间建立持久性的连接。WebSocket具有低延迟、高效率的特点,适用于需要实时数据传输和实时互动的应用场景。通过使用WebSocket,可以实现即时聊天、在线协作、多人游戏、实时监控等一系列的实时应用。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/717194
推荐阅读
相关标签
  

闽ICP备14008679号