赞
踩
前段时间接到一个任务,用Java重构一个nodejs项目,其中用到了websocket的功能了,在nodejs项目中用的是socketio框架来实现websocket的功能,前端对应的也使用了socketio jar包。
一开始对socketio的用法并不是很清楚,以为前后端分离,框架没必要统一。所以后端用的websocket来实现,开发时候很顺利,联调时候问题来了,用过socketio的伙伴可能知道,socketio可以通过自定义事件名称来实现浏览器和服务器之间的数据传输,很显然,websocket没有事件这一概念。为了让前端保持原有逻辑,最好后端沿用之前的socketio功能,通过查阅资料,发现Netty-socketio可以完美解决这一问题。
<!--netty-socket io-->
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.11</version>
</dependency>
引入socket server,并启动。通过实现 InitializingBean 接口,重写 afterPropertiesSet(),在初始化bean的时候,开启socketio服务。
import com.corundumstudio.socketio.SocketConfig; import com.corundumstudio.socketio.SocketIOServer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; import java.util.Map; @Configuration @Slf4j public class SocketIOConfig implements InitializingBean { @Value("${server.host}") private String host; @Resource private SocketIOServerHandler socketIOServerHandler; @Override public void afterPropertiesSet() { SocketConfig socketConfig = new SocketConfig(); // 为了确保一个进程被关闭后,即使它还没有释放该端口,其他进程可以立刻使用该端口,而不是提示端口被占用,注意此项设置必须在socket还没有绑定到本地端口之前设置,否则会导致失效 socketConfig.setReuseAddress(true); // 关闭Nagle算法,即关闭消息的ack确认 socketConfig.setTcpNoDelay(true); // 如果消息发送到一半,关闭连接,-1会等到消息发送完毕再执行tcp的四次挥手。如果是0,则会直接关闭 socketConfig.setSoLinger(-1); com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration(); configuration.setSocketConfig(socketConfig); // host在本地测试可以设置为localhost或者本机IP,在Linux服务器跑可换成服务器IP configuration.setHostname(host); configuration.setPort(9092); // socket连接数大小(只监听一个端口,设置为1即可) configuration.setBossThreads(1); configuration.setWorkerThreads(100); // 允许自定义请求 configuration.setAllowCustomRequests(true); // 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间 configuration.setUpgradeTimeout(1000000); // Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳则发送超时事件 configuration.setPingTimeout(6000000); // Ping消息间隔(毫秒),默认25秒,客户端向服务器发送一条心跳消息间隔 configuration.setPingInterval(25000); SocketIOServer socketIOServer = new SocketIOServer(configuration); // 添加事件监听器 socketIOServer.addListeners(socketIOServerHandler); // 启动SocketIOServer socketIOServer.start(); log.info("------- SocketIOServer start finished ------server hostIp: {}", host); } }
用来区分socket连接,我这里是根据number来区分的,也可以用userID,或者其他,具体根据业务场景来定。
import com.corundumstudio.socketio.SocketIOClient; import org.springframework.stereotype.Component; import java.util.concurrent.ConcurrentHashMap; @Component public class ClientCache { private static ConcurrentHashMap<String, SocketIOClient> connectMap = new ConcurrentHashMap<>(); public void saveClient(String number, SocketIOClient socketIOClient) { connectMap.put(number, socketIOClient); } public SocketIOClient getClient(String number) { return connectMap.get(number); } public void deleteCacheByNumber(String number) { connectMap.remove(number); } public Boolean isContainsNumber(String number) { return connectMap.containsKey(number); } }
其实就是服务端(socket server)和客户端(socket client)之间的交互
import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.annotation.OnConnect; import com.corundumstudio.socketio.annotation.OnDisconnect; import com.corundumstudio.socketio.annotation.OnEvent; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.websocket.OnError; import javax.websocket.Session; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @Slf4j @Component public class SocketIOServerHandler { @Autowired private ClientCache clientCache; // 记录当前在线连接数 private static AtomicInteger ONLINE_SOCKET_CLIENT_COUNT = new AtomicInteger(0); /** * 建立连接 客户端创建socket连接的时候,调用此事件 * * @param client 客户端的SocketIO */ @OnConnect public void onConnect(SocketIOClient client) { UUID sessionId = client.getSessionId(); String number = client.getHandshakeData().getSingleUrlParam("number"); // 第一次连接的时候返回connect_event_request事件给前端,前端监听到之后,发送connect_event事件给服务端 (和原nodejs逻辑保持一致) clientCache.saveClient(number, client); ONLINE_SOCKET_CLIENT_COUNT.incrementAndGet(); log.info("socket连接建立成功, 当前在线数为: {}, sessionId = {}, number = {}", ONLINE_SOCKET_CLIENT_COUNT, sessionId, number); } /** * 关闭连接 前端调用socket.disconnect()时触发改事件 * * @param client 客户端的SocketIO */ @OnDisconnect public void onDisconnect(SocketIOClient client) throws Exception { String number = client.getHandshakeData().getSingleUrlParam("number"); clientCache.deleteCacheByNumber(number); ONLINE_SOCKET_CLIENT_COUNT.decrementAndGet(); log.info("socket连接关闭成功, 当前在线数为: {} ==> 关闭连接信息: sessionId = {}, number = {}", ONLINE_SOCKET_CLIENT_COUNT, client.getSessionId(), number); } @OnEvent("ping") public void ping(SocketIOClient client, SocketMessage socketMessage) { String number = client.getHandshakeData().getSingleUrlParam("number"); SocketEventVo socketEventVo = new SocketEventVo("ping"); client.sendEvent("pong", socketEventVo); } /** * 自定义事件,前端socket.emit('event01', content)的时候,触发此事件 * * @param client 客户端的SocketIO */ @OnEvent("event01") public void event01(SocketIOClient client, SocketMessage socketMessage) throws Exception { clientCache.saveClient(socketMessage.getNumber(), client); if (clientCache.isContainsNumber(socketMessage.getNumber())) { SocketEventVo socketEventVo = new SocketEventVo("event01"); client.sendEvent("response", socketEventVo); } } /** * 报错时触发此事件 * * @param client 客户端的SocketIO */ @OnError public void onError(Session session, Throwable error) { log.error("SocketIO发生错误, session id = {}, 错误信息为:{}", session.getId(), error.getMessage()); } }
实体类
import lombok.Data;
@Data
public class SocketMessage {
private String number;
private String message;
private String req;
}
import lombok.Data; @Data public class SocketEventVo { private String status; private String req; private String desc; private String type; public SocketEventVo(String req) { this.status = "0"; this.req = req; this.desc = "success"; this.type = "response"; } public SocketEventVo() { } }
Springboot项目,在resources包下创建templates文件夹,然后创建index.html文件。
<!DOCTYPE html> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/> <title>SocketIO Client</title> <base> <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script> <script src="https://cdn.bootcss.com/socket.io/2.1.1/socket.io.js"></script> <style> body { padding: 20px; } #console { height: 450px; overflow: auto; } .connect-msg { color: green; } .disconnect-msg { color: red; } </style> </head> <body> <div style="width: 700px; float: left"> <h3>Socket Client 连接</h3> <div style="border: 1px;"> <label>socketio server ip:</label> <input type="text" id="url" value="http://127.0.0.1:9092?number=1626" style="width: 500px;"> <br> <br> <button id="connect" style="width: 100px;">建立连接</button> <button id="disconnect" style="width: 100px;">断开连接</button> </div> <hr style="height:1px;border:none;border-top:1px solid black;" /> <h3>Socket Client发送消息</h3> <div style="border: 1px;"> <label>socketEvent名称:</label><input type="text" id="socketEvent" value="event01"> <br><br> <button id="send" style="width: 100px;">1626发送消息</button> </div> <hr style="height:1px;border:none;border-top:1px solid black;" /> </div> <div style="float: left;margin-left: 50px;"> <h3>SocketIO 连接情况</h3> <div id="console" class="well"></div> </div> </body> <script type="text/javascript"> var socket ; var errorCount = 0; var isConnected = false; var maxError = 5; //连接 function connect(url) { // 建立socket连接 socket = io.connect(url); //socket.nsp = "/socketIO";//定义命名空间 console.log(socket) //监听本次连接回调函数 socket.on('connect', function () { isConnected =true; console.log("连接成功"); serverOutput('<span class="connect-msg"><font color="blue">'+getNowTime()+' </font>连接成功</span>'); errorCount=0; }); //监听断开 socket.on('disconnect', function () { isConnected =false; console.log("连接断开"); serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>' + '已下线! </span>'); }); //监听断开错误 socket.on('connect_error', function(data){ serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>;' + '连接错误-'+data+' </span>'); errorCount++; if(errorCount>=maxError){ socket.disconnect(); } }); //监听连接超时 socket.on('connect_timeout', function(data){ serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>' + '连接超时-'+data+' </span>'); errorCount++; if(errorCount>=maxError){ socket.disconnect(); } }); //监听错误 socket.on('error', function(data){ serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>' + '系统错误-'+data+' </span>'); errorCount++; if(errorCount>=maxError){ socket.disconnect(); } }); } function output(message) { var element = $("<div>" + " " + message + "</div>"); $('#console').prepend(element); } function serverOutput(message) { var element = $("<div>" + message + "</div>"); $('#console').prepend(element); } //连接 $("#connect").click(function(){ if(!isConnected){ var url = $("#url").val(); connect(url); }else { serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>' + '已经成功建立连接,不要重复建立!!! </span>'); } }) //断开连接 $("#disconnect").click(function(){ if(isConnected){ socket.disconnect(); } }) //发送消息 $("#send").click(function(){ //自定义的事件名称 var socketEvent = $("#socketEvent").val(); //发送的内容 var enData = { "number": "1626", "message": "浏览器发送给服务端的消息", "req": "event01" }; socket.emit(socketEvent,enData,function(data1,data2){ console.log("ack1:"+data1); console.log("ack2:"+data2); }); }) function getNowTime(){ var date=new Date(); var year=date.getFullYear(); //获取当前年份 var mon=date.getMonth()+1; //获取当前月份 var da=date.getDate(); //获取当前日 var h=date.getHours(); //获取小时 var m=date.getMinutes(); //获取分钟 var s=date.getSeconds(); //获取秒 var ms=date.getMilliseconds(); var d=document.getElementById('Date'); var date =year+'/'+mon+'/'+da+' '+h+':'+m+':'+s+':'+ms; return date; } </script> </html>
前端页面不生效的话,需要在yml配置中添加如下配置
thymeleaf:
mode: HTML
cache: true
prefix: classpath:/templates/
encoding: UTF-8
suffix: .html
check-template-location: true
template-resolver-order: 1
引入maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
<version>2.7.12</version>
</dependency>
首先启动socket服务端,启动之后,打开index.html文件,idea会自动提示一个浏览器标识,直接点击谷歌浏览器
即可打开html页面
接下来按 F12 看一下效果。
首先,点击【建立连接】,我们可以看到connection已经变成upgrade了,upgrade的值为websocket,说明连接建立成功
点击【发送消息】,在message的位置可以看到客户端和服务端之间的消息传输
每个浏览器页面对应一个sessionId,可以多开几个页面,看看后台日志打印的结果,测试一下。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。