当前位置:   article > 正文

【Netty】利用Netty实现心跳检测和重连机制_netty心跳重连

netty心跳重连

一、前言

  心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制。
  我们用到的很多框架都用到了心跳检测,比如服务注册到 Eureka Server 之后会维护一个心跳连接,告诉 Eureka Server 自己还活着。本文就是利用 Netty 来实现心跳检测,以及客户端重连。

二、设计思路

  1. 分为客户端和服务端
  2. 建立连接后,客户端先发送一个消息询问服务端是否可以进行通信了。
  3. 客户端收到服务端 Yes 的应答后,主动发送心跳消息,服务端接收到心跳消息后,返回心跳应答,周而复始。
  4. 心跳超时利用 Netty 的 ReadTimeOutHandler 机制,当一定周期内(默认值50s)没有读取到对方任何消息时,需要主动关闭链路。如果是客户端,重新发起连接。
  5. 为了避免出现粘/拆包问题,使用 DelimiterBasedFrameDecoderStringDecoder 来处理消息。

三、编码

  1. 先编写客户端 NettyClient
public class NettyClient {

    private static final String HOST = "127.0.0.1";

    private static final int PORT = 9911;

    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

    EventLoopGroup group = new NioEventLoopGroup();


    private void connect(String host,int port){
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .remoteAddress(new InetSocketAddress(host,port))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_", CharsetUtil.UTF_8);
                            ch.pipeline()
                                    .addLast(new DelimiterBasedFrameDecoder(1024,delimiter))
                                    .addLast(new StringDecoder())
                                    // 当一定周期内(默认50s)没有收到对方任何消息时,需要主动关闭链接
                                    .addLast("readTimeOutHandler",new ReadTimeoutHandler(50))
                                    .addLast("heartBeatHandler",new HeartBeatReqHandler());
                        }
                    });
            // 发起异步连接操作
            ChannelFuture future = b.connect().sync();
            future.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            // 所有资源释放完之后,清空资源,再次发起重连操作
            executor.execute(()->{
                try {
                    TimeUnit.SECONDS.sleep(5);
                    //发起重连操作
                    connect(NettyClient.HOST,NettyClient.PORT);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }

    public static void main(String[] args) {
        new NettyClient().connect(NettyClient.HOST,NettyClient.PORT);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

这里稍微复杂点的就是38行开始的重连部分。
2. 心跳消息发送类 HeartBeatReqHandler

package cn.sp.heartbeat;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Created by 2YSP on 2019/5/23.
 */
@ChannelHandler.Sharable
public class HeartBeatReqHandler extends SimpleChannelInboundHandler<String> {

    private volatile ScheduledFuture<?> heartBeat;

    private static final String hello = "start notify with server$_";

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer(hello.getBytes()));
        System.out.println("================");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (heartBeat != null){
            heartBeat.cancel(true);
            heartBeat = null;
        }
        ctx.fireExceptionCaught(cause);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        if ("ok".equalsIgnoreCase(msg)){
            //服务端返回ok开始心跳
            heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatTask(ctx),0,5000, TimeUnit.MILLISECONDS);
        }else {
            System.out.println("Client receive server heart beat message : --->"+msg);
        }
    }

    private class HeartBeatTask implements Runnable{

        private final ChannelHandlerContext ctx;

        public HeartBeatTask(ChannelHandlerContext ctx){
            this.ctx = ctx;
        }


        @Override
        public void run() {
            String heartBeat = "I am ok";
            System.out.println("Client send heart beat message to server: ----->"+heartBeat);
            ctx.writeAndFlush(Unpooled.copiedBuffer((heartBeat+"$_").getBytes()));
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

channelActive()方法在首次建立连接后向服务端问好,如果服务端返回了 “ok” 就创建一个线程每隔5秒发送一次心跳消息。如果发生了异常,就取消定时任务并将其设置为 null,等待 GC 回收。
3. 服务端 NettyServer

public class NettyServer {

    public static void main(String[] args) {
        new NettyServer().bind(9911);
    }

    private void bind(int port){
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());

                            ch.pipeline()
                                    .addLast(new DelimiterBasedFrameDecoder(1024,delimiter))
                                    .addLast(new StringDecoder())
                                    .addLast("readTimeOutHandler",new ReadTimeoutHandler(50))
                                    .addLast("HeartBeatHandler",new HeartBeatRespHandler());
                        }
                    });
            // 绑定端口,同步等待成功
             b.bind(port).sync();
            System.out.println("Netty Server start ok ....");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  1. 心跳响应类 HeartBeatRespHandler
package cn.sp.heartbeat;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Created by 2YSP on 2019/5/23.
 */
@ChannelHandler.Sharable
public class HeartBeatRespHandler extends SimpleChannelInboundHandler<String> {

    private static final String resp = "I have received successfully$_";

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        if (msg.equals("start notify with server")){
            ctx.writeAndFlush(Unpooled.copiedBuffer("ok$_".getBytes()));
        }else {
            //返回心跳应答信息
            System.out.println("Receive client heart beat message: ---->"+ msg);
            ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
        }
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

第一次告诉客户端我已经准备好了,后面打印客户端发过来的信息并告诉客户端我已经收到你的消息了。

四、测试

启动服务端再启动客户端,可以看到心跳检测正常,如下图。
服务端控制台:
在这里插入图片描述
客户端控制台:
在这里插入图片描述
现在让服务端宕机一段时间,看客户端能否重连并开始正常工作。

关闭服务端后,客户端周期性的连接失败,控制台输出如图:
在这里插入图片描述
重新启动服务端,过一会儿就会发现重连成功了。
在这里插入图片描述

五、总结

总得来说,使用 Netty 实现心跳检测还是比较简单的,这里比较懒没有使用其他序列化协议(如 ProtoBuf 等),如果感兴趣的话大家可以自己试试。
代码地址,点击这里
有篇SpringBoot 整合长连接心跳机制的文章写的也很不错,地址https://crossoverjie.top/2018/05/24/netty/Netty(1)TCP-Heartbeat/

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/242049
推荐阅读
相关标签
  

闽ICP备14008679号