赞
踩
本博客姊妹篇
背景:在即时通讯场景中,传统网站通常使用HTTP轮询的方式(如每3秒请求一次)来获取最新的数据,本文要解决的正是这种方式带来的问题。
缺点:
WebSocket:WebSocket是一种在单个TCP连接上进行全双工通信的协议。
优势:
<!-- Spring Boot starter for WebSocket support. No <version> is declared in this snippet;
     presumably it is inherited from the Spring Boot parent/BOM (confirm against the project's pom). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
package com.qiangesoft.im.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * WebSocket configuration.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Registers the {@link ServerEndpointExporter} bean, which scans for WebSocket endpoints
     * declared with {@code @ServerEndpoint} and registers them as WebSocket beans.
     *
     * <p>Note: when the application is deployed to an external servlet container instead of
     * Spring Boot's embedded one, do not register this bean, because the container provides
     * and manages endpoint registration itself.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
package com.qiangesoft.im.core;

import com.alibaba.fastjson2.JSONObject;
import com.qiangesoft.im.core.constant.ChatTypeEnum;
import com.qiangesoft.im.core.constant.ImBodyEnum;
import com.qiangesoft.im.pojo.bo.ImMessageBO;
import com.qiangesoft.im.pojo.dto.PingDTO;
import com.qiangesoft.im.pojo.vo.ImMessageVO;
import com.qiangesoft.im.pojo.vo.PongVO;
import com.qiangesoft.im.pojo.vo.SysUserVo;
import com.qiangesoft.im.service.IImGroupUserService;
import com.qiangesoft.im.util.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * Chat session endpoint.
 *
 * <p>Keeps at most one WebSocket session per user id in a static registry and offers static
 * entry points for pushing messages to connected users.
 *
 * @author qiangesoft
 * @date 2023-08-30
 */
@Slf4j
@ServerEndpoint("/ws/im/{userId}")
@Component
public class ImWebSocketServer {

    /**
     * Thread-safe map from user id to that user's current client session.
     */
    private static final ConcurrentHashMap<Long, Session> WEBSOCKET_MAP = new ConcurrentHashMap<>();

    /**
     * Connection opened: registers the session for the user and replies with a confirmation.
     *
     * @param session the newly opened session
     * @param userId  the user id taken from the endpoint path
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") Long userId) {
        log.info("User [{}] connection opened=====>", userId);

        // Register the new session atomically; put() hands back the session it replaced, which
        // avoids the containsKey/get/put check-then-act sequence.
        Session previous = WEBSOCKET_MAP.put(userId, session);
        if (previous != null && previous != session) {
            // close() removes the mapping only if it still points at 'previous',
            // so the session registered above is left in place.
            close(previous, userId);
        }

        // Online count
        log.info("User connection add 1, online num is [{}]", WEBSOCKET_MAP.size());

        // Reply
        PongVO pongVO = newPong(ImBodyEnum.PONG);
        pongVO.setContent("连接成功");
        doSendMessage(session, pongVO);
    }

    /**
     * Message received from the client. Only PING messages are accepted; anything that cannot
     * be parsed, or has another type, is answered with an "invalid message" reply.
     *
     * @param session the session the message arrived on
     * @param userId  the user id taken from the endpoint path
     * @param message the raw text payload
     */
    @OnMessage
    public void onMessage(Session session, @PathParam("userId") Long userId, String message) {
        log.info("User [{}] send a message, content is [{}]", userId, message);

        PingDTO pingDTO = null;
        try {
            pingDTO = JSONObject.parseObject(message, PingDTO.class);
        } catch (Exception e) {
            // Deliberately broad: any parse failure is handled as an invalid message below.
            log.error("消息解析失败", e);
        }
        if (pingDTO == null || !ImBodyEnum.PING.getCode().equals(pingDTO.getType())) {
            sendInValidMessage(session);
            return;
        }

        // Reply
        PongVO pongVO = newPong(ImBodyEnum.PONG);
        pongVO.setContent("已收到消息~");
        doSendMessage(session, pongVO);
    }

    /**
     * Connection closed: releases the session and unregisters it if it is still the user's
     * current one.
     *
     * @param session the session that was closed
     * @param userId  the user id taken from the endpoint path
     */
    @OnClose
    public void onClose(Session session, @PathParam("userId") Long userId) {
        close(session, userId);
        // The online count only dropped if the user has no registered session left; a stale
        // session that was already replaced by a newer one does not change the count.
        if (!WEBSOCKET_MAP.containsKey(userId)) {
            log.info("User connection reduce 1, online num is [{}]", WEBSOCKET_MAP.size());
        }
        log.info("User [{}] connection is closed<=====", userId);
    }

    /**
     * Connection error callback.
     *
     * @param session the affected session
     * @param userId  the user id taken from the endpoint path
     * @param error   the error that was raised
     */
    @OnError
    public void onError(Session session, @PathParam("userId") Long userId, Throwable error) {
        // A trailing Throwable argument is logged by SLF4J together with its stack trace.
        log.error("User [{}] connection is error!", userId, error);
    }

    /**
     * Pushes a message from the server to its target, dispatching on the chat type
     * (group or person). Other chat types are ignored.
     *
     * @param message the message to deliver
     */
    public static void sendMessage(ImMessageBO message) {
        String chatType = message.getChatType();
        if (ChatTypeEnum.GROUP.getCode().equals(chatType)) {
            sendGroupMessage(message);
        } else if (ChatTypeEnum.PERSON.getCode().equals(chatType)) {
            sendPersonMessage(message);
        }
    }

    /**
     * Forces a user offline: sends an OFFLINE notice to the user's current session, if any,
     * and then closes it.
     *
     * @param userId the user to take offline
     */
    public static void offline(Long userId) {
        Session session = WEBSOCKET_MAP.get(userId);
        if (session == null) {
            return;
        }
        // Notify the device before disconnecting it
        PongVO pongVO = newPong(ImBodyEnum.OFFLINE);
        pongVO.setContent("设备被挤下线");
        doSendMessage(session, pongVO);
        // Close
        close(session, userId);
    }

    /**
     * Closes the given session and unregisters it.
     *
     * <p>The registry entry is removed only when it still maps {@code userId} to this exact
     * session, so a late close of a replaced session cannot evict the user's newer session.
     *
     * @param session the session to close; {@code null} is ignored
     * @param userId  the id of the user owning the session
     */
    public static void close(Session session, Long userId) {
        if (session == null) {
            return;
        }
        try {
            session.close();
        } catch (IOException e) {
            log.error("Failed to close session of user [{}]", userId, e);
        }
        if (userId != null) {
            WEBSOCKET_MAP.remove(userId, session);
        }
    }

    /**
     * Replies that the received message was invalid.
     *
     * @param session the session to reply on
     */
    private static void sendInValidMessage(Session session) {
        PongVO pongVO = newPong(ImBodyEnum.PONG);
        pongVO.setContent("无效消息");
        doSendMessage(session, pongVO);
    }

    /**
     * Sends a group message to every user returned for the target group; users without a
     * registered session are skipped.
     *
     * @param message the group message
     */
    private static void sendGroupMessage(ImMessageBO message) {
        PongVO pongVO = buildMessagePong(message);

        // Deliver to the group members
        IImGroupUserService groupUserService = SpringUtil.getBean(IImGroupUserService.class);
        List<Long> userIdList = groupUserService.listGroupUser(message.getTargetId()).stream()
                .map(SysUserVo::getId)
                .collect(Collectors.toList());
        for (Long userId : userIdList) {
            doSendMessage(WEBSOCKET_MAP.get(userId), pongVO);
        }
    }

    /**
     * Sends a private message to the target user, if that user has a registered session.
     *
     * @param message the private message
     */
    private static void sendPersonMessage(ImMessageBO message) {
        PongVO pongVO = buildMessagePong(message);
        // Deliver to the friend
        doSendMessage(WEBSOCKET_MAP.get(message.getTargetId()), pongVO);
    }

    /**
     * Wraps a chat message into a MESSAGE-type envelope.
     *
     * @param message the message to wrap
     * @return the envelope carrying the message's view object as content
     */
    private static PongVO buildMessagePong(ImMessageBO message) {
        MessageHandlerService messageHandlerService = SpringUtil.getBean(MessageHandlerService.class);
        ImMessageVO messageVO = messageHandlerService.buildVo(message);
        PongVO pongVO = newPong(ImBodyEnum.MESSAGE);
        pongVO.setContent(messageVO);
        return pongVO;
    }

    /**
     * Creates an envelope of the given type stamped with the current time; the caller sets
     * the content.
     *
     * @param type the envelope type
     * @return a new envelope with type and timestamp populated
     */
    private static PongVO newPong(ImBodyEnum type) {
        PongVO pongVO = new PongVO();
        pongVO.setType(type.getCode());
        pongVO.setTimestamp(System.currentTimeMillis());
        return pongVO;
    }

    /**
     * Serializes the envelope to JSON and sends it on the session. Delivery is best-effort:
     * missing or closed sessions are skipped and send failures are logged, so one failing
     * recipient does not interrupt delivery to others.
     *
     * @param session the target session; may be {@code null}
     * @param message the envelope to send
     */
    private static void doSendMessage(Session session, PongVO message) {
        if (session == null || !session.isOpen()) {
            return;
        }
        String payload = JSONObject.toJSONString(message);
        // A blocking send may throw IllegalStateException if another thread is already sending
        // on the same session, so sends are serialized per session.
        synchronized (session) {
            try {
                session.getBasicRemote().sendText(payload);
            } catch (IOException | IllegalStateException e) {
                // IllegalStateException also covers a session that closed after the isOpen() check.
                log.error("Failed to send message to session [{}]", session.getId(), e);
            }
        }
    }
}
源码地址:https://gitee.com/qiangesoft/boot-business/tree/master/boot-business-im
后续内容见下章
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。