当前位置:   article > 正文

Netty4实现心跳检测及IdleStateHandler源码分析_idlestatehandler 心跳检测 redis是否连接正常代码怎么写

idlestatehandler 心跳检测 redis是否连接正常代码怎么写

1.什么是心跳检测?

    判断对方是否正常运行,一般采用定时发送简单的通讯包,如果在指定时间内未接收到对方响应,则判定对方已经宕掉。用于检测TCP的异常断开。

    心跳包一般就是客户端发送给服务端的简单消息,如果服务端几分钟内没有收到客户端消息,则视为客户端已经断开,这个时候就主动关闭客户端的通道。

 

2.使用Netty实现服务端心跳检测

    下面我们编写服务端代码,服务端实现以下功能:如果在N长时间内没有接受到客户端连接,则发送一段信息给客户端,并关闭其通道

 

    * 我们创建服务端心跳检测的Handler,命名为HeartBeatHandler,并重写userEventTriggered方法

  1. /**
  2. * 服务端心跳检测
  3. */
  4. public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
  5. @Override
  6. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  7. if(evt instanceof IdleStateEvent){
  8. IdleStateEvent ise = (IdleStateEvent)evt;
  9. // 服务端读空闲
  10. if(ise.state().equals(IdleState.READER_IDLE)){
  11. ctx.writeAndFlush("client reader idle, channel will close");
  12. ctx.channel().close();
  13. // 服务端写空闲
  14. }else if (ise.state().equals(IdleState.WRITER_IDLE)){
  15. ctx.write("pong message");
  16. // 服务端读写空闲
  17. }else if (ise.state().equals(IdleState.ALL_IDLE)){
  18. ctx.channel().close();
  19. }else{
  20. // DO NOTHING
  21. }
  22. }
  23. }
  24. }

    * 服务端代码,如下

  1. public class Server {
  2. public static void main(String[] args) {
  3. //服务类
  4. ServerBootstrap bootstrap = new ServerBootstrap();
  5. //boss和worker
  6. EventLoopGroup boss = new NioEventLoopGroup();
  7. EventLoopGroup worker = new NioEventLoopGroup();
  8. try {
  9. //设置线程池
  10. bootstrap.group(boss, worker);
  11. //设置socket工厂
  12. bootstrap.channel(NioServerSocketChannel.class);
  13. //设置管道工厂
  14. bootstrap.childHandler(new ChannelInitializer<Channel>() {
  15. @Override
  16. protected void initChannel(Channel ch) throws Exception {
  17. ch.pipeline().addLast(new StringEncoder());
  18. // 主要在这里,先创建一个IdleStateHandler,10秒内没有读写则判定为空闲
  19. ch.pipeline().addLast(new IdleStateHandler(10, 10, 10, TimeUnit.SECONDS));
  20. // 然后执行心跳检测
  21. ch.pipeline().addLast(new HeartBeatHandler());
  22. }
  23. });
  24. //设置参数,TCP参数
  25. bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置,链接缓冲池的大小
  26. bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃,清除死链接
  27. bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送
  28. //绑定端口
  29. ChannelFuture future = bootstrap.bind(8088).sync();
  30. System.out.println("server start...");
  31. //等待服务端关闭
  32. future.channel().closeFuture().sync();
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. } finally{
  36. //释放资源
  37. boss.shutdownGracefully();
  38. worker.shutdownGracefully();
  39. }
  40. }
  41. }

    * 启动服务端并测试

    启动服务端之后,我们来开启一个客户端,还是使用telnet的方式

    连接到服务端之后,如果我们十秒钟不发送数据的话,则会被强制关闭连接,如下图所示

 

3.关于IdleStateHandler源码分析

 

    1)IdleStateHandler结构分析

  1. public class IdleStateHandler extends ChannelDuplexHandler {
  2. // 对象初始化的时候初始化好这些数据
  3. private final long readerIdleTimeNanos;
  4. private final long writerIdleTimeNanos;
  5. private final long allIdleTimeNanos;
  6. // 通过构造函数可知
  7. public IdleStateHandler(
  8. long readerIdleTime, long writerIdleTime, long allIdleTime,
  9. TimeUnit unit) {
  10. if (unit == null) {
  11. throw new NullPointerException("unit");
  12. }
  13. if (readerIdleTime <= 0) {
  14. readerIdleTimeNanos = 0;
  15. } else {
  16. readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
  17. }
  18. if (writerIdleTime <= 0) {
  19. writerIdleTimeNanos = 0;
  20. } else {
  21. writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
  22. }
  23. if (allIdleTime <= 0) {
  24. allIdleTimeNanos = 0;
  25. } else {
  26. allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
  27. }
  28. }
  29. // 在channelRegistered/channelActive的时候调用初始化方法initialize方法
  30. @Override
  31. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  32. // Initialize early if channel is active already.
  33. if (ctx.channel().isActive()) {
  34. initialize(ctx);
  35. }
  36. super.channelRegistered(ctx);
  37. }
  38. @Override
  39. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  40. // This method will be invoked only if this handler was added
  41. // before channelActive() event is fired. If a user adds this handler
  42. // after the channelActive() event, initialize() will be called by beforeAdd().
  43. initialize(ctx);
  44. super.channelActive(ctx);
  45. }
  46. // 监听到客户端写数据的时候,更新lastReadTime
  47. @Override
  48. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  49. lastReadTime = System.nanoTime();
  50. firstReaderIdleEvent = firstAllIdleEvent = true;
  51. ctx.fireChannelRead(msg);
  52. }

    总结:通过以上对IdleStateHandler的分析,可知:

    * 在构造IdleStateHandler的时候,便使用创建参数确定读、写、读写的空闲时间,并转换为millsecond

    * 当客户端连接到服务端,并且其channel处于活动状态,则调用initialize初始化方法

    * 当服务端监听到客户端发送数据时,即channelRead方法,则更新lastReadTime参数,即最近一次读时间

 

    可知,最重要的逻辑应该再initialize方法中,下面我们就来分析一下该方法

 

    2)initialize()

  1. private void initialize(ChannelHandlerContext ctx) {
  2. // Avoid the case where destroy() is called before scheduling timeouts.
  3. // See: https://github.com/netty/netty/issues/143
  4. // private volatile int state; // 0 - none, 1 - initialized, 2 - destroyed
  5. switch (state) {
  6. case 1:
  7. case 2:
  8. return;
  9. }
  10. state = 1;
  11. // 1.获取线程池
  12. EventExecutor loop = ctx.executor();
  13. // 2.初始化最后一次读时间和最后一次写时间为当前时间
  14. lastReadTime = lastWriteTime = System.nanoTime();
  15. // 3.根据用户设置的读空闲时间启动一个定时任务,读空闲时间为频率执行
  16. if (readerIdleTimeNanos > 0) {
  17. readerIdleTimeout = loop.schedule(
  18. new ReaderIdleTimeoutTask(ctx),
  19. readerIdleTimeNanos, TimeUnit.NANOSECONDS);
  20. }
  21. // 4.根据用户设置的写空闲时间启动一个定时任务,写空闲时间为频率执行
  22. if (writerIdleTimeNanos > 0) {
  23. writerIdleTimeout = loop.schedule(
  24. new WriterIdleTimeoutTask(ctx),
  25. writerIdleTimeNanos, TimeUnit.NANOSECONDS);
  26. }
  27. // 5.根据用户设置的读写空闲时间启动一个定时任务,读写空闲时间为频率执行
  28. if (allIdleTimeNanos > 0) {
  29. allIdleTimeout = loop.schedule(
  30. new AllIdleTimeoutTask(ctx),
  31. allIdleTimeNanos, TimeUnit.NANOSECONDS);
  32. }
  33. }

    总结:由上可知,实现心跳检测的关键就在这个schedule上,根据用户设置的空闲时间来确定定时任务的频率。

    那么,定时任务是如何做到检测的呢?我们继续来看

 

    3)ReaderIdleTimeoutTask

    源码如下:

  1. private final class ReaderIdleTimeoutTask implements Runnable {
  2. private final ChannelHandlerContext ctx;
  3. ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
  4. this.ctx = ctx;
  5. }
  6. @Override
  7. public void run() {
  8. if (!ctx.channel().isOpen()) {
  9. return;
  10. }
  11. // 1.获取当前时间和最后一次读时间
  12. long currentTime = System.nanoTime();
  13. long lastReadTime = IdleStateHandler.this.lastReadTime;
  14. // nextDelay即为比较用户设置的读空闲时间和 当前时间-最后一次时间
  15. long nextDelay = readerIdleTimeNanos - (currentTime - lastReadTime);
  16. // 2.如果nextDelay<=0,则说明客户端已经在超过读空闲时间内没有写入数据了
  17. if (nextDelay <= 0) {
  18. // 2.1 重新定义readerIdleTimeout schedule,与initialize方法设置的相同,继续执行定时任务
  19. readerIdleTimeout =
  20. ctx.executor().schedule(this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
  21. try {
  22. // 2.2 event = new IdleStateEvent(IdleState.READER_IDLE, true),将event设置为读空闲
  23. IdleStateEvent event;
  24. if (firstReaderIdleEvent) {
  25. firstReaderIdleEvent = false;
  26. event = IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT;
  27. } else {
  28. event = IdleStateEvent.READER_IDLE_STATE_EVENT;
  29. }
  30. // 2.3 channelIdle的主要工作就是将evt传输给下一个Handler
  31. channelIdle(ctx, event);
  32. } catch (Throwable t) {
  33. ctx.fireExceptionCaught(t);
  34. }
  35. // 3.如果nextDelay>0,则说明客户端在规定时间内已经写入数据了,
  36. } else {
  37. // 3.1 重新定义readerIdleTimeout schedule,以nextDelay为执行频率
  38. readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
  39. }
  40. }
  41. }

    由以上源码分析可知,ReaderIdleTimeoutTask的主要功能就是:

    * 根据当前时间和用户最后一次读时间来确实用户是否写入超时

    * 如果已经写超时,则重新定义schedule任务,还是以readerIdleTimeNanos为频率来执行。并设置evt为READ_IDLE,并传递给下一个Handler

    * 如果没有写超时,则继续执行schedule任务,此时以nextDelay(readerIdleTimeNanos - (currentTime - lastReadTime))为频率来执行

    

    4)后续处理

    READ_IDLE事件传递给下一个Handler,在本例中即为传递给HeartBeatHandler,其重新定义了userEventTriggered方法,此时正好接收到READ_IDLE事件,做相应处理即可

 

 

参考:Netty in Action

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/242167
推荐阅读
相关标签
  

闽ICP备14008679号