赞
踩
Javax的实现
首先,在springboot 项目中的pom文件引入WebSocket的依赖,配置如下:
- <!--websocket-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-websocket</artifactId>
- <exclusions>
- <exclusion>
- <artifactId>spring-boot-starter-tomcat</artifactId>
- <groupId>org.springframework.boot</groupId>
- </exclusion>
- </exclusions>
- </dependency>
在springboot 项目中,新建WebSocketConfig配置类,代码如下:
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.web.socket.config.annotation.EnableWebSocket;
- import org.springframework.web.socket.server.standard.ServerEndpointExporter;
-
- /**
- *
- *
- * @version 1.0
- * @since JDK1.8
- * @author tarzan
- * @date 2023年10月26日 15:21:09
- */
- @Slf4j
- @Configuration
- public class WebSocketConfig{
-
-
- @Bean
- public ServerEndpointExporter serverEndpointExporter() {
- return new ServerEndpointExporter();
- }
-
- }
在springboot 项目中,新建WebSocketServer 服务器服务类
完整代码如下:
-
- import com.alibaba.fastjson.JSONObject;
- import lombok.EqualsAndHashCode;
- import lombok.extern.slf4j.Slf4j;
- import org.springblade.core.tool.utils.CollectionUtil;
- import org.springblade.core.tool.utils.Func;
- import org.springframework.stereotype.Component;
-
- import javax.websocket.*;
- import javax.websocket.server.ServerEndpoint;
- import java.util.List;
- import java.util.concurrent.CopyOnWriteArraySet;
-
- /**
- * @author tarzan
- */
- @ServerEndpoint("/ws/automate")
- @Component
- @Slf4j
- @EqualsAndHashCode
- public class WebSocketServer {
-
- /** 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。 */
- private static final CopyOnWriteArraySet<WebSocketServer> WEBSOCKET_CLIENTS =new CopyOnWriteArraySet<>();
- /** 当前session */
- private Session session;
- /** 当前session订阅的话题 */
- private List<String> topics;
-
- public WebSocketServer() {
- }
-
- @OnOpen
- public void onOpen(Session session) {
- this.session = session;
- WEBSOCKET_CLIENTS.add(this);
- log.info("WebSocketServer有新客户端连接加入:{},当前在线人数为:{}", session.getId(),WEBSOCKET_CLIENTS.size());
- }
-
- @OnClose
- public void onClose() {
- WEBSOCKET_CLIENTS.remove(this);
- log.info("有一连接关闭:{},当前在线人数为:{}", this.session.getId(), WEBSOCKET_CLIENTS.size());
- }
-
- @OnMessage
- public void onMessage(String message, Session session) {
- this.topics= Func.toStrList(message);
- log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
- }
-
- @OnError
- public void onError(Session session, Throwable error) {
- log.error("发生错误:"+error.getMessage());
- WEBSOCKET_CLIENTS.remove(this);
- log.info("有一连接异常:{},当前在线人数为:{}", session.getId(), WEBSOCKET_CLIENTS.size());
- }
-
- public <T> void sendData(T data, String topicName) {
- JSONObject result=new JSONObject();
- result.put("topic",topicName);
- result.put("data",data);
- //显示值为null的字段
- // String jsonString = JSON.toJSONString(result, SerializerFeature.WriteMapNullValue);
- sendMessage(result.toString(),topicName);
- }
-
- /**
- * 发送消息广播
- *
- * @param message 消息文本
- * @author tarzan
- * @date 2022年12月08日 09:24:34
- */
- public void sendMessage(String message,String topic){
- try {
- WEBSOCKET_CLIENTS.forEach(client ->{
- if (client.session.isOpen()&&CollectionUtil.isNotEmpty(client.topics)&&client.topics.contains(topic)) {
- client.session.getAsyncRemote().sendText(message);
- }
- });
- } catch (Exception e) {
- log.error(e.getMessage());
- }
- }
-
- public void sendMessageToAllUsers(String message){
- try {
- WEBSOCKET_CLIENTS.forEach(client -> {
- if (client.session.isOpen()) {
- client.session.getAsyncRemote().sendText(message);
- }
- });
- } catch (Exception e) {
- log.error(e.getMessage());
- }
- }
-
-
- }
注意:配置号websocket服务后,访问端口和你自己web端口是一样的。
发送使用代码示例
-
- import lombok.extern.slf4j.Slf4j;
- import org.springblade.coalface.websocket.OpcSocketServer;
- import org.springblade.coalface.websocket.TopicNameConstant;
- import org.springblade.coalface.websocket.WebSocketServer;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
-
- /**
- * @author tarzan
- */
- @Slf4j
- @Component
- public class WebSocketTask{
-
- @Resource
- private WebSocketServer webSocketServer;
-
- @Scheduled(cron = "0/5 * * * * ?")
- public void execute() {
- webSocketServer.sendMessageToAllUsers(TopicNameConstant.HEARTBEAT);
- }
- }
注:也可以把sendMessageToAllUsers方法改为静态方法,通过类去调用。
代码是我项目中的代码,请按照自己的业务进行修改!
完整代码如下(这里发送的消息的方法是静态方法):
-
-
- import lombok.EqualsAndHashCode;
- import lombok.extern.slf4j.Slf4j;
- import org.springblade.coalface.calculate.enums.CollisionLevelEnum;
- import org.springblade.coalface.opcua.dto.CoalCutterDTO;
- import org.springblade.coalface.opcua.service.OpcIssueService;
- import org.springblade.core.tool.utils.SpringUtil;
- import org.springblade.core.tool.utils.StringUtil;
- import org.springframework.stereotype.Component;
-
- import javax.websocket.*;
- import javax.websocket.server.ServerEndpoint;
- import java.util.concurrent.CopyOnWriteArraySet;
- import java.util.concurrent.atomic.AtomicReference;
-
- /**
- * @author tarzan
- */
- @ServerEndpoint("/ws/opc/issue")
- @Slf4j
- @Component("opcWebSocketServer")
- @EqualsAndHashCode
- public class OpcSocketServer {
-
- /** 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。 */
- private static final CopyOnWriteArraySet<OpcSocketServer> WEBSOCKET_CLIENTS =new CopyOnWriteArraySet<>();
- public static final AtomicReference<CollisionLevelEnum> CMD = new AtomicReference<>(null);
- /** 当前session */
- private Session session;
-
- private final static String ALERT="alert";
- private final static String CONTROL="control";
-
- private final static OpcIssueService opcIssueService;
-
- static {
- opcIssueService= SpringUtil.getBean(OpcIssueService.class);
- }
-
- @OnOpen
- public void onOpen(Session session) {
- this.session = session;
- WEBSOCKET_CLIENTS.add(this);
- log.info("OpcSocketServer有新客户端连接加入:{},当前在线人数为:{}", session.getId(),WEBSOCKET_CLIENTS.size());
- }
-
- @OnClose
- public void onClose() {
- WEBSOCKET_CLIENTS.remove(this);
- log.info("有一连接关闭:{},当前在线人数为:{}", this.session.getId(), WEBSOCKET_CLIENTS.size());
- }
-
- @OnMessage
- public void onMessage(String message, Session session) {
- log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
- String[] params=message.split(":");
- if(StringUtil.isNotBlank(message)&¶ms.length>1){
- if(ALERT.equals(params[0])){
- CollisionLevelEnum levelEnum= CollisionLevelEnum.valueOf(params[1]);
- if(CollisionLevelEnum.SAFE.equals(levelEnum)){
- manualIssue(session,levelEnum);
- }
- }
- if(CONTROL.equals(params[0])){
- CollisionLevelEnum levelEnum= CollisionLevelEnum.valueOf(params[1]);
- //是否需要相等
- if(levelEnum.getStatus()>= CoalCutterDTO.currentLevel.getStatus()){
- manualIssue(session,levelEnum);
- }
- }
- }
- }
-
- private void manualIssue(Session session, CollisionLevelEnum levelEnum){
- CollisionLevelEnum execute=CMD.get();
- if(execute==null){
- verifyIssue(session,levelEnum);
- }else{
- if(levelEnum.getStatus()>execute.getStatus()){
- verifyIssue(session,levelEnum);
- }else{
- sendMessageToUser(session,execute+"指令正在执行!请稍后点击");
- }
- }
- }
-
-
- private void verifyIssue(Session session,CollisionLevelEnum levelEnum){
- if(opcIssueService.manualIssue(levelEnum)){
- opcIssueService.delayRecover(15,levelEnum);
- sendMessageToUser(session,levelEnum+"指令下发成功!");
- log.info(levelEnum+"指令下发成功!");
- }else {
- sendMessageToUser(session,levelEnum+"指令下发失败!");
- log.error(levelEnum+"指令下发失败!");
- }
- }
-
- @OnError
- public void onError(Session session, Throwable error) {
- log.error("发生错误:"+error.getMessage());
- WEBSOCKET_CLIENTS.remove(this);
- log.info("有一连接异常:{},当前在线人数为:{}", session.getId(), WEBSOCKET_CLIENTS.size());
- }
-
- /**
- * sendMessageToUser:发给指定用户
- *
- */
- public static void sendMessageToUser(Session session, String message) {
- if (session != null && session.isOpen()) {
- session.getAsyncRemote().sendText(message);
- }
- }
- public static void sendMessageToAllUsers(String message){
- try {
- WEBSOCKET_CLIENTS.forEach(client ->sendMessageToUser(client.session,message));
- } catch (Exception e) {
- log.error(e.getMessage());
- }
- }
-
-
- }
以上 通过 static {
opcIssueService= SpringUtil.getBean(OpcIssueService.class);
} 实现的service 服务注入,发现有时候打包的时候,会出现 getBean空指针问题。
建议改成:在方法的内使用
- private void verifyIssue(Session session,CollisionLevelEnum levelEnum){
- OpcIssueService opcIssueService = SpringUtil.getBean(OpcIssueService.class);
- if(opcIssueService.manualIssue(levelEnum)){
- opcIssueService.delayRecover(15,levelEnum);
- sendMessageToUser(session,levelEnum+"指令下发成功!");
- log.info(levelEnum+"指令下发成功!");
- }else {
- sendMessageToUser(session,levelEnum+"指令下发失败!");
- log.error(levelEnum+"指令下发失败!");
- }
- }
的实现
首先,在springboot 项目中的pom文件引入WebSocket的依赖,配置如下:
- <!--websocket-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-websocket</artifactId>
- <exclusions>
- <exclusion>
- <artifactId>spring-boot-starter-tomcat</artifactId>
- <groupId>org.springframework.boot</groupId>
- </exclusion>
- </exclusions>
- </dependency>
在springboot 项目中,新建WebSocketConfig配置类,需要增加@EnableWebSocket注解和继承WebSocketConfigurer 类。
代码如下:
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.web.socket.config.annotation.EnableWebSocket;
- import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
- import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
- import org.springframework.web.socket.server.standard.ServerEndpointExporter;
-
- import javax.annotation.Resource;
-
- /**
- *
- *
- * @version 1.0
- * @since JDK1.8
- * @author tarzan
- * @date 2023年10月26日 15:21:09
- */
- @Slf4j
- @Configuration
- @EnableWebSocket
- public class WebSocketConfig implements WebSocketConfigurer {
-
- @Resource
- private OpcSessionHandler opcSessionHandler;
-
- //addHandler 映射关系可以添加多个
- @Override
- public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
- registry.addHandler(opcSessionHandler, "/ws/opc/issue").setAllowedOrigins("*");
- }
-
- //只使用webmvc的方式这个可以省略不写
- @Bean
- public ServerEndpointExporter serverEndpointExporter() {
- return new ServerEndpointExporter();
- }
-
- }
在springboot 项目中,新建OpcSessionHandler 会话处理类,代码如下:
- package org.springblade.coalface.websocket;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springblade.coalface.calculate.enums.CollisionLevelEnum;
- import org.springblade.coalface.opcua.dto.CoalCutterDTO;
- import org.springblade.coalface.opcua.service.OpcIssueService;
- import org.springblade.core.tool.utils.StringUtil;
- import org.springframework.lang.NonNull;
- import org.springframework.stereotype.Component;
- import org.springframework.web.socket.*;
-
- import javax.annotation.Resource;
- import java.io.IOException;
- import java.util.concurrent.CopyOnWriteArraySet;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.ScheduledThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicReference;
-
-
- /**
- * @author tarzan
- */
- @Component
- @Slf4j
- public class OpcSessionHandler implements WebSocketHandler {
-
- @Resource
- private OpcIssueService opcIssueService;
-
- private static final CopyOnWriteArraySet<WebSocketSession> SESSIONS=new CopyOnWriteArraySet<>();
- public final AtomicReference<CollisionLevelEnum> CMD = new AtomicReference<>(null);
-
- private final static String ALERT="alert";
- private final static String CONTROL="control";
-
- @Override
- public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
- SESSIONS.add(session);
- log.info(session.getId()+" OpcSessionHandler当前在线人数:"+SESSIONS.size());
- }
-
- @Override
- public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
- String msg = message.getPayload().toString();
- log.info("接收消息"+session.getId()+":"+msg);
- opcIssueService.method();
- }
-
-
-
- @Override
- public void handleTransportError(WebSocketSession session,@NonNull Throwable exception) throws Exception {
- log.error("OpcSessionHandler连接出错"+session.getId());
- SESSIONS.remove(session);
- if (!session.isOpen()) {
- session.close();
- }
- }
-
- @Override
- public void afterConnectionClosed(WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {
- log.info("OpcSessionHandler关闭连接"+session.getId());
- SESSIONS.remove(session);
- }
-
-
- @Override
- public boolean supportsPartialMessages() {
- return false;
- }
-
-
- public void sendMessageToUser(WebSocketSession session, String contents) {
- if (session != null && session.isOpen()) {
- TextMessage message = new TextMessage(contents);
- try {
- session.sendMessage(message);
- } catch (IOException e) {
- log.error(e.getMessage());
- }
- }
- }
-
- public void sendMessageToAllUsers(String contents) {
- SESSIONS.forEach(session->sendMessageToUser(session, contents));
- }
-
- }
配置以上代码即可。
注:通过WebMVC实现的websocket暂时没办法通过异步推送数据。
WebSocket是一种在单个TCP连接上进行全双工通信的网络协议,它可以在客户端和服务器之间建立持久性的连接,并提供实时的双向通信能力。相较于传统的HTTP协议,WebSocket具有更低的延迟和更高的效率,适用于需要实时数据传输和实时互动的应用场景。
下面将对WebSocket技术的几个关键概念进行详细解释。
握手过程(Handshake) WebSocket连接的建立需要经过一次握手过程。在客户端发起连接请求时,会发送一个特殊的HTTP请求头(Upgrade请求头字段),并告知服务器要升级协议为WebSocket。服务器在接收到这个请求后,如果同意升级,则会返回一个特殊的HTTP响应头(Upgrade和Connection响应头字段),表示已经切换到WebSocket协议。握手完成后,连接从HTTP协议切换到WebSocket协议。
数据帧(Data Frames) WebSocket使用数据帧来传输数据,数据帧有固定的格式。数据帧包含一个帧头和一个帧载荷。帧头包含了数据帧的控制信息,例如数据帧是否是最后一个帧、数据帧的类型和长度等。帧载荷是实际传输的数据。
实时双向通信 WebSocket协议支持实时的双向通信,客户端和服务器可以同时进行数据的发送和接收。客户端可以通过JavaScript的WebSocket API与服务器建立连接,并通过发送数据帧向服务器发送消息。服务器可以向客户端发送消息,客户端通过监听WebSocket的事件来接收服务器发送的消息。
心跳机制 WebSocket支持心跳机制来保持连接的稳定性。客户端和服务器可以定时发送心跳消息以保持连接的活跃状态。如果一段时间内没有收到心跳消息,可以判定连接已断开,并进行相应的处理。
安全性(Security) WebSocket可以使用TLS/SSL加密来保护数据的传输。通过使用加密的WebSocket连接,可以确保数据的机密性和完整性,以防止数据被窃取和篡改。
使用场景 WebSocket被广泛应用于需要实时通信的应用场景,例如:
总结: WebSocket是一种实现实时双向通信的网络协议,它可以在客户端和服务器之间建立持久性的连接。WebSocket具有低延迟、高效率的特点,适用于需要实时数据传输和实时互动的应用场景。通过使用WebSocket,可以实现即时聊天、在线协作、多人游戏、实时监控等一系列的实时应用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。