一起学netty(19)心跳机制的概念和 IdleStateHandler 原理分析

weblog 823 0 0

什么是心跳?

顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性。

为什么需要心跳?

因为网络的不可靠性, 有可能在 TCP 保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等, 会造成服务器和客户端的连接中断. 在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的. 为了解决这个问题, 我们就需要引入 心跳 机制. 心跳机制的工作原理是: 在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle 状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG 交互. 自然地, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保 TCP 连接的有效性.

如何实现心跳?

我们可以通过两种方式实现心跳机制:

  • 1.使用 TCP 协议层面的 keepalive 机制.
  • 2.在应用层上实现自定义的心跳机制.

虽然在 TCP 协议层面上, 提供了 keepalive 保活机制, 但是使用它有几个缺点:

  • 1.它不是 TCP 的标准协议, 并且是默认关闭的.
  • 2.TCP keepalive 机制依赖于操作系统的实现, 默认的 keepalive 心跳时间是 两个小时, 并且对 keepalive 的修改需要系统调用(或者修改系统配置), 灵活性不够.
  • 3.TCP keepalive 与 TCP 协议绑定, 因此如果需要更换为 UDP 协议时, keepalive 机制就失效了.

虽然使用 TCP 层面的 keepalive 机制比自定义的应用层心跳机制节省流量, 但是基于上面的几点缺点, 一般的实践中, 人们大多数都是选择在应用层上实现自定义的心跳.

Netty应用层自定义心跳机制

服务端代码:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class NettyServerHeartBeat {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup boot = new NioEventLoopGroup(1);
        NioEventLoopGroup work = new NioEventLoopGroup(8);
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boot,work)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        System.out.println("创建连接中...");
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 5秒没有读写操作,则触发userEventTriggered方法
                        pipeline.addLast(new IdleStateHandler(0,0,5, TimeUnit.SECONDS));
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new SimpleChannelInboundHandler<String>() {
                            /**
                             * 读数据时触发
                             * @param ctx
                             * @param msg
                             * @throws Exception
                             */
                            @Override
                            public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                                System.out.println("消息:"+msg);
                            }
                            /**
                             * 读、写、读写超时时触发
                             * @param ctx
                             * @param evt
                             * @throws Exception
                             */
                            @Override
                            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                                IdleStateEvent event = (IdleStateEvent)evt;
                                IdleState state = event.state();
                                switch (state){
                                    case READER_IDLE:
                                        System.out.println("读空闲");
                                        break;
                                    case WRITER_IDLE:
                                        System.out.println("写空闲");
                                        break;
                                    case ALL_IDLE:
                                        System.out.println("读写空闲");
                                        break;
                                }
                                // 5秒没有数据收发,则认为连接中断,服务器主动关闭连接
                                ctx.close();
                                Channel channel = ctx.channel();
                                channel.close();
                            }
                        });
                    }
                });
        ChannelFuture sync = serverBootstrap.bind(8099).sync();
        sync.channel().closeFuture().sync();
    }
}

客户端代码:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class NettyClientHeartBeat {
    private static SocketChannel socketChannel;
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup boot =new NioEventLoopGroup(1);

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(boot).
                channel(NioSocketChannel.class).
                handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 4秒没有写数据就发送一次心跳,告知服务器我还活着
                        pipeline.addLast(new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                                IdleStateEvent event = (IdleStateEvent)evt;
                                IdleState state = event.state();
                                // 写超时
                                if(state==IdleState.WRITER_IDLE){
                                    // 超时 发送心跳包
                                    ctx.writeAndFlush("我还活着,别把我断开");
                                }
                            }
                        });
                    }
                });
        ChannelFuture future = bootstrap.connect("localhost", 8099).sync();
        socketChannel=(SocketChannel)future.channel();
        future.channel().closeFuture().sync();
    }
}

对于服务端的测试,可以直接在cmd命令行窗口使用telnet命令进行测试,例如:

telnet localhost 8099 回车

连接成功后窗口是没有内容的。

此时按 ctrl + ] 键进入telnet的命令行窗口,可以输入send命令向服务端发送字符数据。

注意:1.服务器设置了超过5秒不发送数据连接就断开了。2.输入telnet的时候ip和端口之间是没有冒号的。

上述代码只是简单的例子,具体使用还需要详细的设计。Protobuf可以是一种比较不错的解决方案。

IdleStateHandler实现原理

首先要知道的是,netty每次创建一个客户端通道,都会调用initChannel方法,该方法中为每个客户端设置处理器链,也就是ChannelPipeline。所以每个客户端都会有一个IdleStateHandler对象。

IdleStateHandler类的构造函数

构造函数:
    public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
        this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

参数

  • readerIdleTime 读超时时间
  • writerIdleTime 写超时时间
  • allIdleTime 读写超时时间
  • unit 时间单位

channelActive方法

该在通道创建时调用,该方法中调用了initialize方法。

public void channelActive(ChannelHandlerContext ctx) throws Exception {

       this.initialize(ctx);

       super.channelActive(ctx);

}

initialize方法

该方法在channelActive方法中被调用

如果设置的超时时间大于0,则内部会创建一个task任务。

304行是这样的,用当前时间减去最后一次channelRead方法调用的时间,假如这个结果是6s,说明最后一次调用channelRead已经是6s之前的事情了,你设置的是5s,那么nextDelay则为-1,说明超时了,那么316行则会触发userEventTriggered方法。


猜你喜欢
official 1030 ChannelHandler用来处Channel上各种事件(包括建立连接,数据收发,异常处等)ChannelHandler为出站入栈两种。所有ChannelHandler被连接成串,就
其他 3876 功能动态编译运行输入java代码测试版,项目下载:http://www.jiajiajia.club/file/info/fxwPbs/101Java动态编译-动态运行-代码检测-算法练习spri
official 1090 编码器解码器在网络应用中需要实现某种编解码器,将始字节数据与自定义消息对象进行互相转换。网络中都是以字节码数据形式来传输数据,服务器编码数据后发送到客户端,客户端需要对数据进行解码
ASM,java基础 985   关于cglib代以及常用api,请参考:初步探究cglib动态代:http://www.jiajiajia.club/blog/artical/yjw520
official 981 之前文章中提到了java中nio是同步非阻塞网络io模型,本文就主要说明下同步、异步、阻塞、非阻塞来帮助解nio。io操作IO两阶段(旦拿到数据后就变成了数据操作,不再是IO
official 900 调度基本当有堆任务要处,但由于资源有限,这些事情没法同时处。这就需要确定某种规则来决定处这些任务顺序,这就是“调度”研究问题。在多道程序系统中,进程数量往往是多于处个数
official 1057 之前文章中提到过,单线程nio模型任然有定缺点。在上节《netty(7)netty线程模型》中也提到,netty出现,封装了nio复杂代码,并且介入多线程来处事件,最大限度
java基础 1128 ;//allotherfieldsdefaulted}构造个空HashMap,初始容量为16,负载因子为0.75,负载因子作用将会在下方解释。2.publicHashMap(intinitialCapacity)publicH
目录
没有一个冬天不可逾越,没有一个春天不会来临。最慢的步伐不是跬步,而是徘徊,最快的脚步不是冲刺,而是坚持。