赞
踩
心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制。
我们用到的很多框架都用到了心跳检测,比如服务注册到 Eureka Server 之后会维护一个心跳连接,告诉 Eureka Server 自己还活着。本文就是利用 Netty 来实现心跳检测,以及客户端重连。
public class NettyClient { private static final String HOST = "127.0.0.1"; private static final int PORT = 9911; private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); EventLoopGroup group = new NioEventLoopGroup(); private void connect(String host,int port){ try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY,true) .remoteAddress(new InetSocketAddress(host,port)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer("$_", CharsetUtil.UTF_8); ch.pipeline() .addLast(new DelimiterBasedFrameDecoder(1024,delimiter)) .addLast(new StringDecoder()) // 当一定周期内(默认50s)没有收到对方任何消息时,需要主动关闭链接 .addLast("readTimeOutHandler",new ReadTimeoutHandler(50)) .addLast("heartBeatHandler",new HeartBeatReqHandler()); } }); // 发起异步连接操作 ChannelFuture future = b.connect().sync(); future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { // 所有资源释放完之后,清空资源,再次发起重连操作 executor.execute(()->{ try { TimeUnit.SECONDS.sleep(5); //发起重连操作 connect(NettyClient.HOST,NettyClient.PORT); } catch (InterruptedException e) { e.printStackTrace(); } }); } } public static void main(String[] args) { new NettyClient().connect(NettyClient.HOST,NettyClient.PORT); } }
这里稍微复杂点的就是38行开始的重连部分。
2. 心跳消息发送类 HeartBeatReqHandler
package cn.sp.heartbeat; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** * Created by 2YSP on 2019/5/23. */ @ChannelHandler.Sharable public class HeartBeatReqHandler extends SimpleChannelInboundHandler<String> { private volatile ScheduledFuture<?> heartBeat; private static final String hello = "start notify with server$_"; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer(hello.getBytes())); System.out.println("================"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (heartBeat != null){ heartBeat.cancel(true); heartBeat = null; } ctx.fireExceptionCaught(cause); } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { if ("ok".equalsIgnoreCase(msg)){ //服务端返回ok开始心跳 heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatTask(ctx),0,5000, TimeUnit.MILLISECONDS); }else { System.out.println("Client receive server heart beat message : --->"+msg); } } private class HeartBeatTask implements Runnable{ private final ChannelHandlerContext ctx; public HeartBeatTask(ChannelHandlerContext ctx){ this.ctx = ctx; } @Override public void run() { String heartBeat = "I am ok"; System.out.println("Client send heart beat message to server: ----->"+heartBeat); ctx.writeAndFlush(Unpooled.copiedBuffer((heartBeat+"$_").getBytes())); } } }
channelActive()方法在首次建立连接后向服务端问好,如果服务端返回了 “ok” 就创建一个线程每隔5秒发送一次心跳消息。如果发生了异常,就取消定时任务并将其设置为 null,等待 GC 回收。
3. 服务端 NettyServer
public class NettyServer { public static void main(String[] args) { new NettyServer().bind(9911); } private void bind(int port){ EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(group) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); ch.pipeline() .addLast(new DelimiterBasedFrameDecoder(1024,delimiter)) .addLast(new StringDecoder()) .addLast("readTimeOutHandler",new ReadTimeoutHandler(50)) .addLast("HeartBeatHandler",new HeartBeatRespHandler()); } }); // 绑定端口,同步等待成功 b.bind(port).sync(); System.out.println("Netty Server start ok ...."); }catch (Exception e){ e.printStackTrace(); } } }
package cn.sp.heartbeat; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * Created by 2YSP on 2019/5/23. */ @ChannelHandler.Sharable public class HeartBeatRespHandler extends SimpleChannelInboundHandler<String> { private static final String resp = "I have received successfully$_"; @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { if (msg.equals("start notify with server")){ ctx.writeAndFlush(Unpooled.copiedBuffer("ok$_".getBytes())); }else { //返回心跳应答信息 System.out.println("Receive client heart beat message: ---->"+ msg); ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes())); } } }
第一次告诉客户端我已经准备好了,后面打印客户端发过来的信息并告诉客户端我已经收到你的消息了。
启动服务端再启动客户端,可以看到心跳检测正常,如下图。
服务端控制台:
客户端控制台:
现在让服务端宕机一段时间,看客户端能否重连并开始正常工作。
关闭服务端后,客户端周期性的连接失败,控制台输出如图:
重新启动服务端,过一会儿就会发现重连成功了。
总得来说,使用 Netty 实现心跳检测还是比较简单的,这里比较懒没有使用其他序列化协议(如 ProtoBuf 等),如果感兴趣的话大家可以自己试试。
代码地址,点击这里。
有篇SpringBoot 整合长连接心跳机制的文章写的也很不错,地址https://crossoverjie.top/2018/05/24/netty/Netty(1)TCP-Heartbeat/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。