赞
踩
1.什么是心跳检测?
判断对方是否正常运行,一般采用定时发送简单的通讯包,如果在指定时间内未接收到对方响应,则判定对方已经宕掉。用于检测TCP的异常断开。
心跳包一般就是客户端发送给服务端的简单消息,如果服务端几分钟内没有收到客户端消息,则视为客户端已经断开,这个时候就主动关闭客户端的通道。
2.使用Netty实现服务端心跳检测
下面我们编写服务端代码,服务端实现以下功能:如果在N长时间内没有接受到客户端连接,则发送一段信息给客户端,并关闭其通道
* 我们创建服务端心跳检测的Handler,命名为HeartBeatHandler,并重写userEventTriggered方法
- /**
- * 服务端心跳检测
- */
- public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-
- if(evt instanceof IdleStateEvent){
- IdleStateEvent ise = (IdleStateEvent)evt;
-
- // 服务端读空闲
- if(ise.state().equals(IdleState.READER_IDLE)){
- ctx.writeAndFlush("client reader idle, channel will close");
- ctx.channel().close();
-
- // 服务端写空闲
- }else if (ise.state().equals(IdleState.WRITER_IDLE)){
- ctx.write("pong message");
-
- // 服务端读写空闲
- }else if (ise.state().equals(IdleState.ALL_IDLE)){
- ctx.channel().close();
- }else{
- // DO NOTHING
- }
- }
- }
- }

* 服务端代码,如下
- public class Server {
-
- public static void main(String[] args) {
- //服务类
- ServerBootstrap bootstrap = new ServerBootstrap();
-
- //boss和worker
- EventLoopGroup boss = new NioEventLoopGroup();
- EventLoopGroup worker = new NioEventLoopGroup();
-
- try {
- //设置线程池
- bootstrap.group(boss, worker);
-
- //设置socket工厂
- bootstrap.channel(NioServerSocketChannel.class);
-
- //设置管道工厂
- bootstrap.childHandler(new ChannelInitializer<Channel>() {
-
-
- @Override
- protected void initChannel(Channel ch) throws Exception {
- ch.pipeline().addLast(new StringEncoder());
- // 主要在这里,先创建一个IdleStateHandler,10秒内没有读写则判定为空闲
- ch.pipeline().addLast(new IdleStateHandler(10, 10, 10, TimeUnit.SECONDS));
- // 然后执行心跳检测
- ch.pipeline().addLast(new HeartBeatHandler());
- }
- });
-
- //设置参数,TCP参数
- bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置,链接缓冲池的大小
- bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃,清除死链接
- bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送
-
- //绑定端口
- ChannelFuture future = bootstrap.bind(8088).sync();
-
- System.out.println("server start...");
- //等待服务端关闭
- future.channel().closeFuture().sync();
-
- } catch (Exception e) {
- e.printStackTrace();
- } finally{
- //释放资源
- boss.shutdownGracefully();
- worker.shutdownGracefully();
- }
- }
- }

* 启动服务端并测试
启动服务端之后,我们来开启一个客户端,还是使用telnet的方式
连接到服务端之后,如果我们十秒钟不发送数据的话,则会被强制关闭连接,如下图所示
3.关于IdleStateHandler源码分析
1)IdleStateHandler结构分析
- public class IdleStateHandler extends ChannelDuplexHandler {
-
- // 对象初始化的时候初始化好这些数据
- private final long readerIdleTimeNanos;
- private final long writerIdleTimeNanos;
- private final long allIdleTimeNanos;
-
- // 通过构造函数可知
- public IdleStateHandler(
- long readerIdleTime, long writerIdleTime, long allIdleTime,
- TimeUnit unit) {
- if (unit == null) {
- throw new NullPointerException("unit");
- }
-
- if (readerIdleTime <= 0) {
- readerIdleTimeNanos = 0;
- } else {
- readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
- }
- if (writerIdleTime <= 0) {
- writerIdleTimeNanos = 0;
- } else {
- writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
- }
- if (allIdleTime <= 0) {
- allIdleTimeNanos = 0;
- } else {
- allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
- }
- }
-
- // 在channelRegistered/channelActive的时候调用初始化方法initialize方法
- @Override
- public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
- // Initialize early if channel is active already.
- if (ctx.channel().isActive()) {
- initialize(ctx);
- }
- super.channelRegistered(ctx);
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- // This method will be invoked only if this handler was added
- // before channelActive() event is fired. If a user adds this handler
- // after the channelActive() event, initialize() will be called by beforeAdd().
- initialize(ctx);
- super.channelActive(ctx);
- }
-
- // 监听到客户端写数据的时候,更新lastReadTime
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- lastReadTime = System.nanoTime();
- firstReaderIdleEvent = firstAllIdleEvent = true;
- ctx.fireChannelRead(msg);
- }

总结:通过以上对IdleStateHandler的分析,可知:
* 在构造IdleStateHandler的时候,便使用创建参数确定读、写、读写的空闲时间,并转换为millsecond
* 当客户端连接到服务端,并且其channel处于活动状态,则调用initialize初始化方法
* 当服务端监听到客户端发送数据时,即channelRead方法,则更新lastReadTime参数,即最近一次读时间
可知,最重要的逻辑应该再initialize方法中,下面我们就来分析一下该方法
2)initialize()
- private void initialize(ChannelHandlerContext ctx) {
- // Avoid the case where destroy() is called before scheduling timeouts.
- // See: https://github.com/netty/netty/issues/143
-
- // private volatile int state; // 0 - none, 1 - initialized, 2 - destroyed
- switch (state) {
- case 1:
- case 2:
- return;
- }
-
- state = 1;
-
- // 1.获取线程池
- EventExecutor loop = ctx.executor();
-
- // 2.初始化最后一次读时间和最后一次写时间为当前时间
- lastReadTime = lastWriteTime = System.nanoTime();
-
- // 3.根据用户设置的读空闲时间启动一个定时任务,读空闲时间为频率执行
- if (readerIdleTimeNanos > 0) {
- readerIdleTimeout = loop.schedule(
- new ReaderIdleTimeoutTask(ctx),
- readerIdleTimeNanos, TimeUnit.NANOSECONDS);
- }
-
- // 4.根据用户设置的写空闲时间启动一个定时任务,写空闲时间为频率执行
- if (writerIdleTimeNanos > 0) {
- writerIdleTimeout = loop.schedule(
- new WriterIdleTimeoutTask(ctx),
- writerIdleTimeNanos, TimeUnit.NANOSECONDS);
- }
-
- // 5.根据用户设置的读写空闲时间启动一个定时任务,读写空闲时间为频率执行
- if (allIdleTimeNanos > 0) {
- allIdleTimeout = loop.schedule(
- new AllIdleTimeoutTask(ctx),
- allIdleTimeNanos, TimeUnit.NANOSECONDS);
- }
- }

总结:由上可知,实现心跳检测的关键就在这个schedule上,根据用户设置的空闲时间来确定定时任务的频率。
那么,定时任务是如何做到检测的呢?我们继续来看
3)ReaderIdleTimeoutTask
源码如下:
- private final class ReaderIdleTimeoutTask implements Runnable {
-
- private final ChannelHandlerContext ctx;
-
- ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
- this.ctx = ctx;
- }
-
- @Override
- public void run() {
- if (!ctx.channel().isOpen()) {
- return;
- }
-
- // 1.获取当前时间和最后一次读时间
- long currentTime = System.nanoTime();
- long lastReadTime = IdleStateHandler.this.lastReadTime;
-
- // nextDelay即为比较用户设置的读空闲时间和 当前时间-最后一次时间
- long nextDelay = readerIdleTimeNanos - (currentTime - lastReadTime);
-
- // 2.如果nextDelay<=0,则说明客户端已经在超过读空闲时间内没有写入数据了
- if (nextDelay <= 0) {
- // 2.1 重新定义readerIdleTimeout schedule,与initialize方法设置的相同,继续执行定时任务
- readerIdleTimeout =
- ctx.executor().schedule(this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
- try {
- // 2.2 event = new IdleStateEvent(IdleState.READER_IDLE, true),将event设置为读空闲
- IdleStateEvent event;
- if (firstReaderIdleEvent) {
- firstReaderIdleEvent = false;
- event = IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT;
- } else {
- event = IdleStateEvent.READER_IDLE_STATE_EVENT;
- }
- // 2.3 channelIdle的主要工作就是将evt传输给下一个Handler
- channelIdle(ctx, event);
- } catch (Throwable t) {
- ctx.fireExceptionCaught(t);
- }
-
- // 3.如果nextDelay>0,则说明客户端在规定时间内已经写入数据了,
- } else {
- // 3.1 重新定义readerIdleTimeout schedule,以nextDelay为执行频率
- readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
- }
- }
- }

由以上源码分析可知,ReaderIdleTimeoutTask的主要功能就是:
* 根据当前时间和用户最后一次读时间来确实用户是否写入超时
* 如果已经写超时,则重新定义schedule任务,还是以readerIdleTimeNanos为频率来执行。并设置evt为READ_IDLE,并传递给下一个Handler
* 如果没有写超时,则继续执行schedule任务,此时以nextDelay(readerIdleTimeNanos - (currentTime - lastReadTime))为频率来执行
4)后续处理
READ_IDLE事件传递给下一个Handler,在本例中即为传递给HeartBeatHandler,其重新定义了userEventTriggered方法,此时正好接收到READ_IDLE事件,做相应处理即可
参考:Netty in Action
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。