当前位置:   article > 正文

实用篇--SpringBoot整合Netty实现消息推送服务器_springboot+netty消息推送

springboot+netty消息推送

前言

由于今天是要练车的,所以的话只能写一点简单的东西了,做一个整合吧,刚好先前随便说了一下Netty是吧,那么我们就直接实战吧,我们来整合一下Netty。我的设想是使用Netty来实现客户端消息的实时推送,就是这个破玩意:

当然还有咱们的聊天,用户聊天,反正都做推送了,再加一个用户聊天有何不可。都TM是一个玩意。 那么既然咱们是使用SpringBoot或者是Spring的话,那么我们就刚好直接使用IOC来实例化咱们的Netty。那么这样的话,我们可以使用Netty来实现实时的消息推送,以及在线用户聊天功能,同时的话,对于咱们后台的管理系统也刚好有这个需求,有新的博文审核消息啥的这个要推送给后台的。那么同时为了提高整个实时在线的工作效率,数据的存储直接进行异步处理,这个方案很多,那么最省事的就是搞个线程池+异步任务就完了,或者直接MQ过去,然后存储,反正这种数据丢了也没事。像博文,问答之类的数据,这些注意一点就完了。

那么本文的话,两个目标,第一个是怎么整合,服务端和客户端怎么整合。第二个就是我们实际上就是说,创建一个服务端,可以完成websock协议。

项目创建

首先我们创建一个新的项目。 在我这里的话是已经创建好了:

可以看到,我这里的话,做戏做全套,那么这里有两个端,一个是Server,还有是Client,什么意思,就是说,你使用Netty做服务端可以,做客户端链接其他的服务器也可以,rpc嘛。 例如:

整合

导入依赖

首先是导入依赖,这里的话是SpringBoot,那么直接导入这个就完了:

  1. <!-- 导入Netty的依赖-->
  2. <dependency>
  3. <groupId>io.netty</groupId>
  4. <artifactId>netty-all</artifactId>
  5. </dependency>
  6. 复制代码

编写相关配置

既然是用到了SpringBoot,那么我们直接把对应的配置给提取出来:

  1. #Netty的一些配置
  2. netty:
  3. boss: 4
  4. worker: 2
  5. timeout: 6000
  6. port: 9000
  7. #多端口绑定
  8. portSalve: 9001
  9. host: 127.0.0.1
  10. 复制代码

这个看你自己,我这里演示就是多端口的。值得一提的是这个演示的,演示完就删了哈,所以这篇博文会尽可能详细。毕竟,我以后搞不好还会回来copy。

服务端创建

首先是我们的服务端,那么在这里的话,我们想要创建一个Netty服务,基本上就三个东西,一个是我们选择哪一个线程模型,这里我们显然选择主从模型。所以的话,那么我们就需要准备两个线程组,之后的话是我们的初始化器,用来初始化Handler,之后是我们自定义的Handler。这里为了更好地演示,我们这里以创建Http处理的为例子。之后的话我们以创建websocket为例子。

ok,既然如此我们知道了需要那些东西,那么我们直接去搞就好了。

读取配置

首先要做的就是读取配置,我这里准备了专门的配置类。

  1. import lombok.Data;
  2. import org.springframework.boot.context.properties.ConfigurationProperties;
  3. import org.springframework.context.annotation.Configuration;
  4. @ConfigurationProperties(prefix = "netty")
  5. @Data
  6. @Configuration
  7. public class HoleNettyProperties {
  8. // boss线程数量 默认为cpu线程数*4
  9. private Integer boss;
  10. // worker线程数量 默认为cpu线程数*2
  11. private Integer worker;
  12. // 连接超时时间 默认为30s
  13. private Integer timeout = 30000;
  14. // 服务器主端口 默认9000
  15. private Integer port = 9000;
  16. // 服务器备用端口
  17. private Integer portSalve = 9001;
  18. // 服务器地址 默认为本地
  19. private String host = "127.0.0.1";
  20. }
  21. 复制代码

配置类

之后的话,是我们的一个配置: 这个配置主要就是声明一个Bootstrap开启服务,之后绑定我们设定的配置和处理器。

  1. @Configuration
  2. @EnableConfigurationProperties
  3. public class NettyConfig {
  4. @Autowired
  5. HoleNettyProperties holeNettyProperties;
  6. /**
  7. * boss 线程池
  8. * 负责客户端连接
  9. * @return
  10. */
  11. @Bean
  12. public NioEventLoopGroup boosGroup(){
  13. return new NioEventLoopGroup(holeNettyProperties.getBoss());
  14. }
  15. /**
  16. * worker线程池
  17. * 负责业务处理
  18. * @return
  19. */
  20. @Bean
  21. public NioEventLoopGroup workerGroup(){
  22. return new NioEventLoopGroup(holeNettyProperties.getWorker());
  23. }
  24. /**
  25. * 服务器启动器
  26. * @return
  27. */
  28. @Bean
  29. public ServerBootstrap serverBootstrap(){
  30. ServerBootstrap serverBootstrap = new ServerBootstrap();
  31. serverBootstrap
  32. .group(boosGroup(),workerGroup()) // 指定使用的线程组
  33. .channel(NioServerSocketChannel.class) // 指定使用的通道
  34. .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,holeNettyProperties.getTimeout()) // 指定连接超时时间
  35. .childHandler(new ServerHandler()); // 指定worker处理器
  36. return serverBootstrap;
  37. }
  38. /**
  39. * 客户端启动器
  40. * @return
  41. */
  42. @Bean
  43. public Bootstrap bootstrap(){
  44. // 新建一组线程池
  45. NioEventLoopGroup eventExecutors = new NioEventLoopGroup(holeNettyProperties.getBoss());
  46. Bootstrap bootstrap = new Bootstrap();
  47. bootstrap
  48. .group(eventExecutors) // 指定线程组
  49. .option(ChannelOption.SO_KEEPALIVE, true)
  50. .channel(NioSocketChannel.class) // 指定通道
  51. .handler(new ClientHandler()); // 指定处理器
  52. return bootstrap;
  53. }
  54. }
  55. 复制代码

可以看到的话,我们这里是有两个的,一个是用来创建服务端的,还有一个是用来创建客户端的。在这里的话,我们主要还是做声明。

区别是 在于指定线程组。以及在我们后面真正实例化的时候,前者是监听,后者是连接。

服务处理器 ServerHandler

这个玩意其实就是用来做初始化的。

  1. public class ServerHandler extends ChannelInitializer<SocketChannel> {
  2. /**
  3. * 初始化通道以及配置对应管道的处理器
  4. * @param socketChannel
  5. * @throws Exception
  6. */
  7. @Override
  8. protected void initChannel(SocketChannel socketChannel) throws Exception {
  9. ChannelPipeline pipeline = socketChannel.pipeline();
  10. pipeline.addLast(new MessageDecodeHandler());
  11. pipeline.addLast(new MessageEncodeHandler());
  12. pipeline.addLast(new ServerListenerHandler());
  13. }
  14. }
  15. 复制代码

同时在这里指定了两个消息的编解码器。

  1. public class MessageDecodeHandler extends ByteToMessageDecoder {
  2. @Override
  3. protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
  4. int len = byteBuf.readInt();
  5. byte[] content = new byte[len];
  6. byteBuf.readBytes(content);
  7. MessageBean messageBean = new MessageBean();
  8. messageBean.setContent(content);
  9. messageBean.setLen(len);
  10. list.add(messageBean);
  11. }
  12. }
  13. 复制代码
  1. public class MessageEncodeHandler extends MessageToByteEncoder<MessageBean> {
  2. @Override
  3. protected void encode(ChannelHandlerContext channelHandlerContext, MessageBean messageBean, ByteBuf byteBuf) throws Exception {
  4. byteBuf.writeInt(messageBean.getLen());
  5. byteBuf.writeBytes(messageBean.getContent());
  6. }
  7. }
  8. 复制代码

那么同样的,我们声明了一个Bean,专门用来承载消息的,主要是转JSON用的。

  1. @Data
  2. @AllArgsConstructor
  3. @NoArgsConstructor
  4. public class MessageBean {
  5. /**
  6. * 数据长度
  7. */
  8. private Integer len;
  9. /**
  10. * 通讯数据
  11. */
  12. private byte[] content;
  13. public MessageBean(Object object) {
  14. content = JSONUtil.toJsonStr(object).getBytes(StandardCharsets.UTF_8);
  15. len = content.length;
  16. }
  17. }
  18. 复制代码

这里的话还是用到了JSONUtil,所以的话,还需要导入依赖:

  1. <dependency>
  2. <groupId>cn.hutool</groupId>
  3. <artifactId>hutool-all</artifactId>
  4. <version>${hutool.version}</version>
  5. <!-- <hutool.version>5.5.4</hutool.version>-->
  6. </dependency>
  7. 复制代码

注意自己对应的版本号,我这里是5.5.4

自定义处理器

最后是我们的自定义处理器,这个东西就是我们实现业务的核心。但是我们这边只是演示,所以的话就随便一点了。

  1. @Slf4j
  2. @ChannelHandler.Sharable
  3. public class ClientListenerHandler extends SimpleChannelInboundHandler<MessageBean> {
  4. /**
  5. * 服务端上线的时候调用
  6. * @param ctx
  7. * @throws Exception
  8. */
  9. @Override
  10. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  11. log.info("{}连上了服务器",ctx.channel().remoteAddress());
  12. }
  13. /**
  14. * 服务端掉线的时候调用
  15. * @param ctx
  16. * @throws Exception
  17. */
  18. @Override
  19. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  20. log.info("{}断开了服务器",ctx.channel().remoteAddress());
  21. ctx.fireChannelInactive();
  22. }
  23. /**
  24. * 读取服务端消息
  25. * @param channelHandlerContext
  26. * @param messageBean
  27. * @throws Exception
  28. */
  29. @Override
  30. protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageBean messageBean) throws Exception {
  31. log.info("来自服务端的消息:{}",new String(messageBean.getContent(), CharsetUtil.UTF_8));
  32. channelHandlerContext.channel().close();
  33. }
  34. /**
  35. * 异常发生时候调用
  36. * @param ctx
  37. * @param cause
  38. * @throws Exception
  39. */
  40. @Override
  41. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  42. log.error("{}连接出异常了",ctx.channel().remoteAddress());
  43. log.error(ExceptionUtil.printStackTrace((Exception) cause));
  44. ctx.close();
  45. }
  46. }
  47. 复制代码

同时这块也有一个异常写入类,用来返回这个错误的,这个看你自己,不要也罢

  1. public class ExceptionUtil {
  2. public static String printStackTrace(Exception e){
  3. Writer writer = new StringWriter();
  4. PrintWriter printWriter = new PrintWriter(writer);
  5. e.printStackTrace(printWriter);
  6. printWriter.close();
  7. return writer.toString();
  8. }
  9. }
  10. 复制代码

自定义启动类

ok,现在的话我们已经准备好了我们需要的东西(基本上),配置,处理器,线程组,用来开启服务的Bootstrap。那么既然是做SpringBoot的整合,那么我们就在更进一步,待会的的话,我们再搞一个注解。

  1. @Component
  2. @Slf4j
  3. public class ServerBoot {
  4. @Autowired
  5. ServerBootstrap serverBootstrap;
  6. @Resource
  7. NioEventLoopGroup boosGroup;
  8. @Resource
  9. NioEventLoopGroup workerGroup;
  10. @Autowired
  11. HoleNettyProperties holeNettyProperties;
  12. /**
  13. * 开机启动
  14. * @throws InterruptedException
  15. */
  16. @PostConstruct
  17. public void start() throws InterruptedException {
  18. // 绑定端口启动
  19. serverBootstrap.bind(holeNettyProperties.getPort()).sync();
  20. serverBootstrap.bind(holeNettyProperties.getPortSalve()).sync();
  21. log.info("启动Netty多端口服务器: {},{}",holeNettyProperties.getPort(),holeNettyProperties.getPortSalve());
  22. }
  23. /**
  24. * 关闭线程池
  25. */
  26. @PreDestroy
  27. public void close() throws InterruptedException {
  28. log.info("关闭Netty服务器");
  29. boosGroup.shutdownGracefully();
  30. workerGroup.shutdownGracefully();
  31. }
  32. }
  33. 复制代码

之后,我们创建一个启动注解。

  1. @Import(ServerBoot.class)
  2. @Target(ElementType.TYPE)
  3. @Retention(RetentionPolicy.RUNTIME)
  4. @Documented
  5. public @interface EnableNettyServer {
  6. }
  7. 复制代码

之后要用的时候就直接:

客户端

之后的话,就是我们的客户端的处理。首先配置类俺已经给出来了。区别呢就是咱们的处理器和启动类。

处理器 ClientHandler

首先一样的编解码处理

  1. public class ClientHandler extends ChannelInitializer<SocketChannel> {
  2. @Override
  3. protected void initChannel(SocketChannel socketChannel) throws Exception {
  4. ChannelPipeline pipeline = socketChannel.pipeline();
  5. pipeline.addLast(new MessageEncodeHandler());
  6. pipeline.addLast(new MessageDecodeHandler());
  7. pipeline.addLast(new ClientListenerHandler());
  8. }
  9. }
  10. 复制代码

之后的话是我们具体的业务逻辑:

  1. @Slf4j
  2. @ChannelHandler.Sharable
  3. public class ClientListenerHandler extends SimpleChannelInboundHandler<MessageBean> {
  4. /**
  5. * 服务端上线的时候调用
  6. * @param ctx
  7. * @throws Exception
  8. */
  9. @Override
  10. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  11. log.info("{}连上了服务器",ctx.channel().remoteAddress());
  12. }
  13. /**
  14. * 服务端掉线的时候调用
  15. * @param ctx
  16. * @throws Exception
  17. */
  18. @Override
  19. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  20. log.info("{}断开了服务器",ctx.channel().remoteAddress());
  21. ctx.fireChannelInactive();
  22. }
  23. /**
  24. * 读取服务端消息
  25. * @param channelHandlerContext
  26. * @param messageBean
  27. * @throws Exception
  28. */
  29. @Override
  30. protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageBean messageBean) throws Exception {
  31. log.info("来自服务端的消息:{}",new String(messageBean.getContent(), CharsetUtil.UTF_8));
  32. channelHandlerContext.channel().close();
  33. }
  34. /**
  35. * 异常发生时候调用
  36. * @param ctx
  37. * @param cause
  38. * @throws Exception
  39. */
  40. @Override
  41. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  42. log.error("{}连接出异常了",ctx.channel().remoteAddress());
  43. log.error(ExceptionUtil.printStackTrace((Exception) cause));
  44. ctx.close();
  45. }
  46. }
  47. 复制代码

启动类

之后还是我们的启动类,这个就是和服务的不一样的玩意:

  1. @Component
  2. public class ClientBoot {
  3. @Autowired
  4. Bootstrap bootstrap;
  5. @Autowired
  6. HoleNettyProperties holeNettyProperties;
  7. /**
  8. * 主端口连接
  9. * @return
  10. * @throws InterruptedException
  11. */
  12. public Channel connect() throws InterruptedException {
  13. // 连接服务器
  14. ChannelFuture channelFuture = bootstrap.connect(holeNettyProperties.getHost(), holeNettyProperties.getPort()).sync();
  15. // 监听关闭
  16. Channel channel = channelFuture.channel();
  17. return channel;
  18. }
  19. /**
  20. * 备用端口连接
  21. * @return
  22. * @throws InterruptedException
  23. */
  24. public Channel connectSlave() throws InterruptedException {
  25. // 连接服务器
  26. ChannelFuture channelFuture = bootstrap.connect(holeNettyProperties.getHost(), holeNettyProperties.getPort()).sync();
  27. // 监听关闭
  28. Channel channel = channelFuture.channel();
  29. channel.closeFuture().sync();
  30. return channel;
  31. }
  32. /**
  33. * 发送消息到服务器端
  34. * @return
  35. */
  36. public void sendMsg(MessageBean messageBean) throws InterruptedException {
  37. connect().writeAndFlush(messageBean);
  38. }
  39. }
  40. 复制代码

之后也是我们的启动注解

  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. @Import(ClientBoot.class)
  5. public @interface EnableNettyClient {
  6. }
  7. 复制代码

如果你想要同时开启也是可以的,但是注意改一下配置,否则的话,串了。

  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. @EnableNettyClient
  5. @EnableNettyServer
  6. public @interface EnableNetty {
  7. }
  8. 复制代码

ok,那么这块的就是最基本的整合。

聊天服务器创建实例

ok,那么现在的话,我们就来创建一下聊天服务器,我们使用的websocket 也就是ws协议。

首先还是我们的配置,这里的话,我就使用单端口了。 那么我们修改完成后一个样子是这样的:

我们甚至提取出了一个Controller。 这个Controller的作用很大,我们可以直接通过Controller对客户端进行消息推送,例如你的博文审核通过了,我们可以通过这个在线通知你(前提是你在线)。而且这里的话也有个好处嘛,就是说,我们的客户端只需要进行监听,发送消息可以通过正常的http请求,这样的话最起码作为一个分布式的项目,请求还是到了网关的,我们还是可以进行监控的,同时也可确保安全,因为我们对于很多的一些处理可以在Controller进行操作,只是用Netty进行消息转发。

工具类

这里的话,我们使用到这几个工具类:

  1. /**
  2. * 异常打印工具类
  3. */
  4. public class ExceptionUtil {
  5. public static String printStackTrace(Exception e){
  6. Writer writer = new StringWriter();
  7. PrintWriter printWriter = new PrintWriter(writer);
  8. e.printStackTrace(printWriter);
  9. printWriter.close();
  10. return writer.toString();
  11. }
  12. }
  13. 复制代码
  1. /**
  2. * @Description: 自定义响应结构, 转换类
  3. */
  4. public class JsonUtils {
  5. // 定义jackson对象
  6. private static final ObjectMapper MAPPER = new ObjectMapper();
  7. /**
  8. * 将对象转换成json字符串。
  9. * <p>Title: pojoToJson</p>
  10. * <p>Description: </p>
  11. * @param data
  12. * @return
  13. */
  14. public static String objectToJson(Object data) {
  15. try {
  16. String string = MAPPER.writeValueAsString(data);
  17. return string;
  18. } catch (JsonProcessingException e) {
  19. e.printStackTrace();
  20. }
  21. return null;
  22. }
  23. /**
  24. * 将json结果集转化为对象
  25. *
  26. * @param jsonData json数据
  27. * @param beanType 对象类型
  28. * @return
  29. */
  30. public static <T> T jsonToPojo(String jsonData, Class<T> beanType) {
  31. try {
  32. T t = MAPPER.readValue(jsonData, beanType);
  33. return t;
  34. } catch (Exception e) {
  35. e.printStackTrace();
  36. }
  37. return null;
  38. }
  39. /**
  40. * 将json数据转换成pojo对象list
  41. * <p>Title: jsonToList</p>
  42. * <p>Description: </p>
  43. * @param jsonData
  44. * @param beanType
  45. * @return
  46. */
  47. public static <T>List<T> jsonToList(String jsonData, Class<T> beanType) {
  48. JavaType javaType = MAPPER.getTypeFactory().constructParametricType(List.class, beanType);
  49. try {
  50. List<T> list = MAPPER.readValue(jsonData, javaType);
  51. return list;
  52. } catch (Exception e) {
  53. e.printStackTrace();
  54. }
  55. return null;
  56. }
  57. }
  58. 复制代码

那个StringUtils老朋友,一大把,我这里就不粘贴了,实际上我也没用到那个玩意。

消息转换

首先我们这个是直接基于websocket协议来的,因为要实现这个长连接监听嘛。 所以在消息转换这里略有不同:

这里的话和先前的区别就是这个:

  1. /**
  2. * 定义worker端的处理器
  3. */
  4. public class ServerHandler extends ChannelInitializer<SocketChannel> {
  5. /**
  6. * 初始化通道以及配置对应管道的处理器
  7. * @param socketChannel
  8. * @throws Exception
  9. */
  10. @Override
  11. protected void initChannel(SocketChannel socketChannel) throws Exception {
  12. ChannelPipeline pipeline = socketChannel.pipeline();
  13. // 流水线管理通道中的处理程序(Handler),用来处理业务
  14. // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
  15. pipeline.addLast(new HttpServerCodec());
  16. pipeline.addLast(new ChunkedWriteHandler());
  17. pipeline.addLast(new HttpObjectAggregator(1024*64));
  18. pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
  19. pipeline.addLast(new ServerListenerHandler());
  20. }
  21. }
  22. 复制代码

这个玩意我们改变了,同时我们也没有先前的编解码器了。

配置

这个配置的话,和先前一样,区别就是咱们是单接口嘛:

  1. #Netty的一些配置
  2. netty:
  3. boss: 1
  4. worker: 4
  5. timeout: 6000
  6. port: 9000
  7. host: 127.0.0.1
  8. 复制代码
  1. @ConfigurationProperties(prefix = "netty")
  2. @Data
  3. @Configuration
  4. public class HoleNettyProperties {
  5. /**
  6. * boss线程数量 默认为cpu线程数*2
  7. */
  8. private Integer boss;
  9. /**
  10. * worker线程数量 默认为cpu线程数*2
  11. */
  12. private Integer worker;
  13. /**
  14. * 连接超时时间 默认为30s
  15. */
  16. private Integer timeout = 30000;
  17. /**
  18. * 服务器主端口 默认9000
  19. */
  20. private Integer port = 9000;
  21. private String host = "127.0.0.1";
  22. }
  23. 复制代码

然后咱们的配置类就是这样了:

  1. @Configuration
  2. @EnableConfigurationProperties
  3. public class NettyConfig {
  4. @Autowired
  5. HoleNettyProperties holeNettyProperties;
  6. /**
  7. * boss 线程池
  8. * 负责客户端连接
  9. * @return
  10. */
  11. @Bean
  12. public NioEventLoopGroup boosGroup(){
  13. return new NioEventLoopGroup(holeNettyProperties.getBoss());
  14. }
  15. /**
  16. * worker线程池
  17. * 负责业务处理
  18. * @return
  19. */
  20. @Bean
  21. public NioEventLoopGroup workerGroup(){
  22. return new NioEventLoopGroup(holeNettyProperties.getWorker());
  23. }
  24. /**
  25. * 服务器启动器
  26. * @return
  27. */
  28. @Bean
  29. public ServerBootstrap serverBootstrap(){
  30. ServerBootstrap serverBootstrap = new ServerBootstrap();
  31. serverBootstrap
  32. .group(boosGroup(),workerGroup()) // 指定使用的线程组
  33. .channel(NioServerSocketChannel.class) // 指定使用的通道
  34. .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,holeNettyProperties.getTimeout()) // 指定连接超时时间
  35. .childHandler(new ServerHandler()); // 指定worker处理器
  36. return serverBootstrap;
  37. }
  38. }
  39. 复制代码

消息封装

为了这个更好的处理,咱们就把这个String转化为一个标准的Bean,也方便后面转化为Entity,也就是实体嘛。

  1. @Data
  2. @AllArgsConstructor
  3. @NoArgsConstructor
  4. public class ChatMsg implements Serializable {
  5. private String senderId;
  6. private String receiverId;
  7. private String msg;
  8. private String msgId;
  9. }
  10. 复制代码
  1. @Data
  2. @AllArgsConstructor
  3. @NoArgsConstructor
  4. public class DataContent implements Serializable {
  5. private Integer action;
  6. private ChatMsg chatMsg;
  7. private String extend;
  8. }
  9. 复制代码

这个的话,你可以自己看着改。 反正我这个样例就是这样的,后面我也要改。

消息处理

之后就是我们的服务处理的Handler了。 在这里的话,我这里还定义了一个枚举类,主要是用来这样的:

  1. public enum MessageActionEnum {
  2. //定义消息类型
  3. CONNECT(1,"第一次(或重连)初始化连接"),
  4. CHAT(2,"聊天消息"),
  5. SIGNED(3,"消息签收"),
  6. KEEPALIVE(4,"客户端保持心跳"),
  7. PULL_FRIEND(5, "拉取好友");
  8. public final Integer type;
  9. public final String content;
  10. MessageActionEnum(Integer type,String content) {
  11. this.type = type;
  12. this.content = content;
  13. }
  14. }
  15. 复制代码

定义消息的类型。这个在Controller可以用,在Netty的服务里面也可以使用,看你自己,这里咱们演示的话就,还是和正常的一样。全部在Netty的服务里面用的,后面怎么玩其实很好想像,待会看到Controller的代码就懂了。

那么这里还是看到这个代码

  1. @Component
  2. @ChannelHandler.Sharable
  3. public class ServerListenerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  4. private static final Logger log = LoggerFactory.getLogger(ServerBoot.class);
  5. /**
  6. * 当建立链接时将Channel放置在Group当中
  7. */
  8. @Override
  9. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  10. log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());
  11. // 添加到channelGroup 通道组
  12. UserConnectPool.getChannelGroup().add(ctx.channel());
  13. }
  14. /**
  15. * 读取数据
  16. */
  17. @Override
  18. protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
  19. /**
  20. * 1.接受到msg
  21. * 2.将msg转化为实体类
  22. * 3.解析消息类型
  23. * 将实体类当中的userid和连接的Channel进行对应
  24. * */
  25. String content = msg.text();
  26. Channel channel = ctx.channel();
  27. DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);
  28. assert dataContent != null;
  29. Integer action = dataContent.getAction();
  30. if(Objects.equals(action, MessageActionEnum.CONNECT.type)){
  31. //进行关联注册
  32. String senderId = dataContent.getChatMsg().getSenderId();
  33. UserConnectPool.getChannelMap().put(senderId,channel);
  34. // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
  35. AttributeKey<String> key = AttributeKey.valueOf("userId");
  36. ctx.channel().attr(key).setIfAbsent(senderId);
  37. }else if(Objects.equals(action, MessageActionEnum.CHAT.type)){
  38. /**
  39. * 解析你的消息,然后进行持久化,或者其他的操作,看你自己
  40. * */
  41. ChatMsg chatMsg = dataContent.getChatMsg();
  42. //发送消息
  43. Channel receiverChannel = UserConnectPool.getChannel(chatMsg.getReceiverId());
  44. if(receiverChannel==null){
  45. //用户不在线
  46. }else {
  47. //为了保险起见你还可以在你的Group里面去查看有没有这样的Channel
  48. //毕竟不太能够保证原子性操作嘛,反正底层也是CurrentMap
  49. Channel findChannel = UserConnectPool.getChannelGroup().find(ctx.channel().id());
  50. if(findChannel!=null){
  51. receiverChannel.writeAndFlush(
  52. new TextWebSocketFrame(
  53. JsonUtils.objectToJson(chatMsg)
  54. )
  55. );
  56. }else {
  57. //离线
  58. }
  59. }
  60. }else if (Objects.equals(action, MessageActionEnum.SIGNED.type)){
  61. }else if (Objects.equals(action, MessageActionEnum.KEEPALIVE.type)){
  62. }else if(Objects.equals(action, MessageActionEnum.PULL_FRIEND.type)){
  63. }
  64. }
  65. @Override
  66. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  67. log.info("用户下线了:{}", ctx.channel().id().asLongText());
  68. // 删除通道
  69. UserConnectPool.getChannelGroup().remove(ctx.channel());
  70. removeUserId(ctx);
  71. }
  72. @Override
  73. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  74. //打印异常
  75. log.info("异常:{}", cause.getMessage());
  76. // 删除通道
  77. UserConnectPool.getChannelGroup().remove(ctx.channel());
  78. removeUserId(ctx);
  79. ctx.close();
  80. }
  81. /**
  82. * 删除用户与channel的对应关系
  83. */
  84. private void removeUserId(ChannelHandlerContext ctx) {
  85. AttributeKey<String> key = AttributeKey.valueOf("userId");
  86. String userId = ctx.channel().attr(key).get();
  87. UserConnectPool.getChannelMap().remove(userId);
  88. }
  89. }
  90. 复制代码

同时在这里的话,我们还有这个玩意:

然后这个玩意是这样的,有一个存储Channel的列表,还有一个MAP。

  1. public class UserConnectPool {
  2. //主要是为了广播消息
  3. private static volatile ChannelGroup channelGroup = null;
  4. /**
  5. * 存放请求ID与channel的对应关系
  6. */
  7. private static volatile ConcurrentHashMap<String, Channel> channelMap = null;
  8. /**
  9. * 定义两把锁
  10. */
  11. private static final Object lock1 = new Object();
  12. private static final Object lock2 = new Object();
  13. public static ChannelGroup getChannelGroup() {
  14. if (null == channelGroup) {
  15. synchronized (lock1) {
  16. if (null == channelGroup) {
  17. channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  18. }
  19. }
  20. }
  21. return channelGroup;
  22. }
  23. public static ConcurrentHashMap<String, Channel> getChannelMap() {
  24. if (null == channelMap) {
  25. synchronized (lock2) {
  26. if (null == channelMap) {
  27. channelMap = new ConcurrentHashMap<>();
  28. }
  29. }
  30. }
  31. return channelMap;
  32. }
  33. public static Channel getChannel(String userId) {
  34. if (null == channelMap) {
  35. return getChannelMap().get(userId);
  36. }
  37. return channelMap.get(userId);
  38. }
  39. }
  40. 复制代码

之后的话,我们这个就ok了。 同时这块咱们在做一个Controller,我们可以直接处理消息:

Controller处理

我们直接看到实现类

  1. @Service
  2. public class PushMsgServiceImpl implements PushMsgService {
  3. @Override
  4. public void pushMsgToOne(DataContent dataContent) {
  5. ChatMsg chatMsg = dataContent.getChatMsg();
  6. Channel channel = UserConnectPool.getChannel(chatMsg.getReceiverId());
  7. if (Objects.isNull(channel)) {
  8. throw new RuntimeException("未连接socket服务器");
  9. }
  10. channel.writeAndFlush(
  11. new TextWebSocketFrame(
  12. JsonUtils.objectToJson(chatMsg)
  13. )
  14. );
  15. }
  16. @Override
  17. public void pushMsgToAll(DataContent dataContent) {
  18. ChatMsg chatMsg = dataContent.getChatMsg();
  19. Channel channel = UserConnectPool.getChannel(chatMsg.getReceiverId());
  20. UserConnectPool.getChannelGroup().writeAndFlush(
  21. new TextWebSocketFrame(
  22. JsonUtils.objectToJson(chatMsg)
  23. )
  24. );
  25. }
  26. }
  27. 复制代码

之后是这个:

  1. @RestController
  2. @RequestMapping("/push")
  3. public class PushMsgController {
  4. /**
  5. * 同时为了方便操作,我们还可以提取出
  6. * Controller
  7. * */
  8. @Autowired
  9. PushMsgService pushMsgService;
  10. @RequestMapping("/pushOne")
  11. public void pushOne(DataContent dataContent){
  12. pushMsgService.pushMsgToOne(dataContent);
  13. }
  14. @RequestMapping("/pushAll")
  15. public void pushAll(DataContent dataContent){
  16. pushMsgService.pushMsgToAll(dataContent);
  17. }
  18. }
  19. 复制代码

看到了吧,我们可以直接通过Controller去进行客户端的消息的推送。好处就是我们可以通过Controller做很多处理,同时方便实现一些功能,例如刚刚的那个消息类型判断,全部写在一块,显然是不太好的,当然这样有一定的性能损耗,看你的需求,要安全就走这个比如一些重要消息的推送啥的,不太需要,例如用户聊天就直接走Netty的。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/973864
推荐阅读
相关标签
  

闽ICP备14008679号