赞
踩
目录
继承复合handler,既是出栈处理器,又是入栈处理器。
工作流程:
1)初始化new时定义了读写的空闲超时时间
2)在handler的读写channelRead和write方法中记录本次读写的时间
3)启动了三个定时任务,定时检测,任务中如果检测到当前时间距离上次读写时间超过了定义的空闲时间,则发布一个IdleStateEvent事件,调用handler链上的UserEventTriggered方法
所以可以自定义handler实现ChannelInboundHandler中的userEventTriggered方法,进行自定义处理,关闭channel或发送心跳请求
- public class IdleStateHandler extends ChannelDuplexHandler {
- }
共五个参数
observeOutput 写超时是在handler的write方法记录最后一次的写时间,如果这个参数为false,那么只会检测这个write方法记录的时间与当前时间差是否超过定义的空闲时间。但是如果是发送大数据包,发送的耗时时间超过了定义的空闲时间,也会触发事件。参数设置为true的话,不光检测write方法记录的时间,也会检测当前channel的发送缓冲区是否有变化,有变化表示数据正在发送中或者有新的write写入,不触发写空闲超时
每一次的write(ByteBuf) 方法会将此次的byteBuf写入到发送缓冲区,发送缓冲区将btyeBuf以一个链表存放,记录所有节点的一个总的数据大小size,调用flush()时,会循环链表节点,写入到socket,每次写完,会从总size中减去此次已写入的字节数。当然,flush()发送中要是来了新的write(buf),也会更新链表和总大小size。
本文最后会在源码上加入输入语句测试这个参数。
readerIdleTime 读空闲超时
writerIdleTime 写空闲时间
allIdleTime 读写空闲时间
unit 时间单位
- public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
- this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
- TimeUnit.SECONDS);
- }
-
- public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,
- TimeUnit unit) {
- this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
- }
-
- public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
- if (unit == null) {
- throw new NullPointerException("unit");
- }
-
- this.observeOutput = observeOutput;
-
- if (readerIdleTime <= 0) {
- readerIdleTimeNanos = 0;
- } else {
- readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
- }
- if (writerIdleTime <= 0) {
- writerIdleTimeNanos = 0;
- } else {
- writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
- }
- if (allIdleTime <= 0) {
- allIdleTimeNanos = 0;
- } else {
- allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
- }
- }

在handlerAdded、channelActive、channelRegistered,会调用初始化方法,初始化发送缓冲区大小、启动三个定时检测任务
- private void initialize(ChannelHandlerContext ctx) {
- // Avoid the case where destroy() is called before scheduling timeouts.
- // See: https://github.com/netty/netty/issues/143
-
- // 只初始化一次
- switch (state) {
- case 1:
- case 2:
- return;
- }
-
- state = 1;
- // observeOutput上边说的那事,初始化当前channel发送缓冲区的大小
- initOutputChanged(ctx);
-
- lastReadTime = lastWriteTime = ticksInNanos();
- if (readerIdleTimeNanos > 0) {
- readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
- readerIdleTimeNanos, TimeUnit.NANOSECONDS);
- }
- if (writerIdleTimeNanos > 0) {
- writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
- writerIdleTimeNanos, TimeUnit.NANOSECONDS);
- }
- if (allIdleTimeNanos > 0) {
- allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
- allIdleTimeNanos, TimeUnit.NANOSECONDS);
- }
- }
-
- private void initOutputChanged(ChannelHandlerContext ctx) {
- if (observeOutput) {
- Channel channel = ctx.channel();
- Unsafe unsafe = channel.unsafe();
- // 发送缓冲区
- ChannelOutboundBuffer buf = unsafe.outboundBuffer();
-
- if (buf != null) {
- // 当前的发送缓冲区正在发送的buf和要发送的总大小
- lastMessageHashCode = System.identityHashCode(buf.current());
- lastPendingWriteBytes = buf.totalPendingWriteBytes();
- lastFlushProgress = buf.currentProgress();
- }
- }
- }

记录lastReadTime
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
- reading = true;
- firstReaderIdleEvent = firstAllIdleEvent = true;
- }
- ctx.fireChannelRead(msg);
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
- lastReadTime = ticksInNanos();
- reading = false;
- }
- ctx.fireChannelReadComplete();
- }

记录lastWriteTime ,ctx.write设置监听,回调中设置读时间。在每次write完,flush()方法中往socket中写完数据调用ChannelPromise的setSuccess设置成功状态时,会调用其上注册的监听器方法。
- private final ChannelFutureListener writeListener = new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- lastWriteTime = ticksInNanos();
- firstWriterIdleEvent = firstAllIdleEvent = true;
- }
- };
-
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- // Allow writing with void promise if handler is only configured for read timeout events.
- if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
- ctx.write(msg, promise.unvoid()).addListener(writeListener);
- } else {
- ctx.write(msg, promise);
- }
- }

ReaderIdleTimeoutTask 读空闲超时检测任务
- private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
-
- ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
- super(ctx);
- }
-
- @Override
- protected void run(ChannelHandlerContext ctx) {
- long nextDelay = readerIdleTimeNanos;
- // reading变量会在channelRead设置为true,channelReadComplate设置为false
- // 如果为true,当前有一波还没读完,nextDelay大于0,直接再次放入定时任务下次再说
- if (!reading) {
- // 读完了,算当前时间距离上次读多长时间和读空闲超时时间
- nextDelay -= ticksInNanos() - lastReadTime;
- }
-
- // 小于0表示到位了,要发布事件
- if (nextDelay <= 0) {
- // Reader is idle - set a new timeout and notify the callback.
- // 再定时一个
- readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
-
- boolean first = firstReaderIdleEvent;
- firstReaderIdleEvent = false;
-
- try {
- IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
- // 发布READER_IDLE事件,调用链上的UserEventTriggered方法
- channelIdle(ctx, event);
- } catch (Throwable t) {
- ctx.fireExceptionCaught(t);
- }
- } else {
- // Read occurred before the timeout - set a new timeout with shorter delay.
- readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
- }
- }
- }

WriterIdleTimeoutTask 写空闲超时检测任务
- private final class WriterIdleTimeoutTask extends AbstractIdleTask {
-
- WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
- super(ctx);
- }
-
- @Override
- protected void run(ChannelHandlerContext ctx) {
-
- // 这里和读一样,算时间,如果大于定义的空闲时间则nextDelay 《= 0
- long lastWriteTime = IdleStateHandler.this.lastWriteTime;
- long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
- if (nextDelay <= 0) {
- // 重新设置一个定时
- writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
-
- // 回调函数执行后firstWriterIdleEvent = true,首次超时不检测缓冲区变化
- boolean first = firstWriterIdleEvent;
- firstWriterIdleEvent = false;
-
- try {
- // 这里就是构造函数中第一个参数observeOutput为true,检测缓冲区是否发生变化,如果变化了,表示正在发送中,就不触发事件直接return
- if (hasOutputChanged(ctx, first)) {
- return;
- }
-
- IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
- channelIdle(ctx, event);
- } catch (Throwable t) {
- ctx.fireExceptionCaught(t);
- }
- } else {
- // Write occurred before the timeout - set a new timeout with shorter delay.
- writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
- }
- }
- }
-
-
- private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
- if (observeOutput) {
- if (lastChangeCheckTimeStamp != lastWriteTime) {
- lastChangeCheckTimeStamp = lastWriteTime;
-
- // 首次不触发超时
- if (!first) {
- return true;
- }
- }
-
- // 获取发送缓冲区
- Channel channel = ctx.channel();
- Unsafe unsafe = channel.unsafe();
- ChannelOutboundBuffer buf = unsafe.outboundBuffer();
-
- if (buf != null) {
- // 取当前的缓冲区要flush()发送的数据大小
- int messageHashCode = System.identityHashCode(buf.current());
- // 当前flush()共要发送多少数据,每次发送一些会更新buf的pendingWriteBytes
- long pendingWriteBytes = buf.totalPendingWriteBytes();
-
- // 和上次记的不一样,那说明缓冲区变化,,return true不触发事件
- if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
- lastMessageHashCode = messageHashCode;
- lastPendingWriteBytes = pendingWriteBytes;
-
- if (!first) {
- return true;
- }
- }
-
- // 相同,Progress也是个发送过程中记录的变量,当前btyeBuf已写入socket的字节数
- long flushProgress = buf.currentProgress();
- if (flushProgress != lastFlushProgress) {
- lastFlushProgress = flushProgress;
-
- if (!first) {
- return true;
- }
- }
- }
- }
-
- return false;
- }

我是直接改netty源码测试的,改了IdleStateHandler类中的记录最后一次写时间的回调方法和写空闲检测的定时任务,在触发事件和检测到缓冲区变化的那行加了输出
- private final ChannelFutureListener writeListener = new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- Date dd=new Date();
- //格式化
- SimpleDateFormat sim=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String time=sim.format(dd);
- System.out.println("检测到写入完成,更新了最后写的时间--" + time);
- lastWriteTime = ticksInNanos();
- firstWriterIdleEvent = firstAllIdleEvent = true;
- }
- };
-
- private final class WriterIdleTimeoutTask extends AbstractIdleTask {
-
- WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
- super(ctx);
- }
-
- @Override
- protected void run(ChannelHandlerContext ctx) {
-
- long lastWriteTime = IdleStateHandler.this.lastWriteTime;
- long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
- if (nextDelay <= 0) {
- // Writer is idle - set a new timeout and notify the callback.
- writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
-
- boolean first = firstWriterIdleEvent;
- firstWriterIdleEvent = false;
- Date dd=new Date();
- //格式化
- SimpleDateFormat sim=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String time=sim.format(dd);
- try {
- System.out.println("已经超时了,开始判断缓冲区是否变化,是否有新的write或写入完毕的buf---" + time);
- if (hasOutputChanged(ctx, first)) {
- System.out.println("hasOutputChanged为true缓冲区变化了,不触发写空闲超时---" + time);
- return;
- }
- System.out.println("触发了写空闲超时---" + time);
- IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
- channelIdle(ctx, event);
- } catch (Throwable t) {
- ctx.fireExceptionCaught(t);
- }
- } else {
- // Write occurred before the timeout - set a new timeout with shorter delay.
- writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
- }
- }
- }
-
- private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
- if (observeOutput) {
- if (lastChangeCheckTimeStamp != lastWriteTime) {
- lastChangeCheckTimeStamp = lastWriteTime;
-
- // But this applies only if it's the non-first call.
- if (!first) {
- return true;
- }
- }
-
- Channel channel = ctx.channel();
- Unsafe unsafe = channel.unsafe();
- ChannelOutboundBuffer buf = unsafe.outboundBuffer();
-
- if (buf != null) {
- int messageHashCode = System.identityHashCode(buf.current());
- long pendingWriteBytes = buf.totalPendingWriteBytes();
-
- if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
- // 这是缓冲区总大小 变化
- System.out.println("更新handler中的缓冲区大小数据pendingWriteBytes");
- lastMessageHashCode = messageHashCode;
- lastPendingWriteBytes = pendingWriteBytes;
-
- if (!first) {
- return true;
- }
- }
-
- long flushProgress = buf.currentProgress();
- if (flushProgress != lastFlushProgress) {
- // 这是单个消息分块发送的情况,已经发送的数据量
- System.out.println("更新handler中的缓冲区大小数据flushProgress");
- lastFlushProgress = flushProgress;
-
- if (!first) {
- return true;
- }
- }
- }
- }
-
- return false;
- }

测试客户端类
因为跟服务端没太大关系,所以其他代码就不贴了,服务端客户端没有做粘包拆包的handler
5000万长度的字符串,write()1次,然后flush(), 设置的写超时时间为5秒
- public class Client3 {
- public static void main(String[] args) throws InterruptedException {
- NioEventLoopGroup group = new NioEventLoopGroup();
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(group)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY, true)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast(new IdleStateHandler(false,0, 3000, 0, TimeUnit.MILLISECONDS));
- pipeline.addLast(new SomeClientHandler());
- }
- });
- ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
- tttt(future.channel());
- future.channel().closeFuture().sync();
- } finally {
- group.shutdownGracefully();
- }
- }
-
-
- public static final void tttt(Channel channel) throws InterruptedException {
-
- StringBuilder sb = new StringBuilder();
- for (int m = 0; m < 50000000; m++) {
- sb.append("t");
- }
- for (int i = 0; i < 1; i++) {
- byte[] bytes = sb.toString().getBytes();
- ByteBuf buffer = null;
- // 申请缓存空间
- buffer = Unpooled.buffer(bytes.length);
- // 将数据写入到缓存
- buffer.writeBytes(bytes);
- // 将缓存中的数据写入到Channel
- Date dd=new Date();
- //格式化
- SimpleDateFormat sim=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String time=sim.format(dd);
- System.out.println("开始了write--" + time);
- channel.writeAndFlush(buffer);
-
- // TimeUnit.SECONDS.sleep(3);
- }
- }
- }

先来看observeOutput为false的执行情况,可以看到,调用flush之后,超时了2次才写完数据触发回调
再来看observeOutput为true的执行情况,首次不检测缓冲区变化,第二次发现flushProgress数据变化了,正在写入状态,所以不触发超时
再将循环改为2次5000万,中间注释去掉加个睡眠,结果因为第二次write改变了缓冲区总大小,所以pendingWriteBytes变化,不触发超时
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。