什么是心跳?
顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性。
为什么需要心跳?
因为网络的不可靠性, 有可能在 TCP 保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等, 会造成服务器和客户端的连接中断. 在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的. 为了解决这个问题, 我们就需要引入 心跳 机制. 心跳机制的工作原理是: 在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle
状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG
交互. 自然地, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保 TCP 连接的有效性.
如何实现心跳?
我们可以通过两种方式实现心跳机制:
- 1.使用 TCP 协议层面的 keepalive 机制.
- 2.在应用层上实现自定义的心跳机制.
虽然在 TCP 协议层面上, 提供了 keepalive 保活机制, 但是使用它有几个缺点:
- 1.它不是 TCP 的标准协议, 并且是默认关闭的.
- 2.TCP keepalive 机制依赖于操作系统的实现, 默认的 keepalive 心跳时间是 两个小时, 并且对 keepalive 的修改需要系统调用(或者修改系统配置), 灵活性不够.
- 3.TCP keepalive 与 TCP 协议绑定, 因此如果需要更换为 UDP 协议时, keepalive 机制就失效了.
虽然使用 TCP 层面的 keepalive 机制比自定义的应用层心跳机制节省流量, 但是基于上面的几点缺点, 一般的实践中, 人们大多数都是选择在应用层上实现自定义的心跳.
Netty应用层自定义心跳机制
服务端代码:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class NettyServerHeartBeat {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup boot = new NioEventLoopGroup(1);
NioEventLoopGroup work = new NioEventLoopGroup(8);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boot,work)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
System.out.println("创建连接中...");
ChannelPipeline pipeline = socketChannel.pipeline();
// 5秒没有读写操作,则触发userEventTriggered方法
pipeline.addLast(new IdleStateHandler(0,0,5, TimeUnit.SECONDS));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new SimpleChannelInboundHandler<String>() {
/**
* 读数据时触发
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("消息:"+msg);
}
/**
* 读、写、读写超时时触发
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent)evt;
IdleState state = event.state();
switch (state){
case READER_IDLE:
System.out.println("读空闲");
break;
case WRITER_IDLE:
System.out.println("写空闲");
break;
case ALL_IDLE:
System.out.println("读写空闲");
break;
}
// 5秒没有数据收发,则认为连接中断,服务器主动关闭连接
ctx.close();
Channel channel = ctx.channel();
channel.close();
}
});
}
});
ChannelFuture sync = serverBootstrap.bind(8099).sync();
sync.channel().closeFuture().sync();
}
}
客户端代码:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class NettyClientHeartBeat {
private static SocketChannel socketChannel;
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup boot =new NioEventLoopGroup(1);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(boot).
channel(NioSocketChannel.class).
handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 4秒没有写数据就发送一次心跳,告知服务器我还活着
pipeline.addLast(new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent)evt;
IdleState state = event.state();
// 写超时
if(state==IdleState.WRITER_IDLE){
// 超时 发送心跳包
ctx.writeAndFlush("我还活着,别把我断开");
}
}
});
}
});
ChannelFuture future = bootstrap.connect("localhost", 8099).sync();
socketChannel=(SocketChannel)future.channel();
future.channel().closeFuture().sync();
}
}
对于服务端的测试,可以直接在cmd
命令行窗口使用telnet
命令进行测试,例如:
telnet localhost 8099
回车
连接成功后窗口是没有内容的。
此时按 ctrl + ]
键进入telnet
的命令行窗口,可以输入send
命令向服务端发送字符数据。
注意:1.服务器设置了超过5秒不发送数据连接就断开了。2.输入telnet的时候ip和端口之间是没有冒号的。
上述代码只是简单的例子,具体使用还需要详细的设计。Protobuf可以是一种比较不错的解决方案。
IdleStateHandler实现原理
首先要知道的是,netty每次创建一个客户端通道,都会调用initChannel方法,该方法中为每个客户端设置处理器链,也就是ChannelPipeline。所以每个客户端都会有一个IdleStateHandler对象。
IdleStateHandler类的构造函数
构造函数:
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
参数
- readerIdleTime 读超时时间
- writerIdleTime 写超时时间
- allIdleTime 读写超时时间
- unit 时间单位
channelActive方法
该在通道创建时调用,该方法中调用了initialize方法。
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.initialize(ctx);
super.channelActive(ctx);
}
initialize方法
该方法在channelActive方法中被调用
如果设置的超时时间大于0,则内部会创建一个task任务。
304行是这样的,用当前时间减去最后一次channelRead
方法调用的时间,假如这个结果是6s,说明最后一次调用channelRead
已经是6s之前的事情了,你设置的是5s,那么nextDelay则为-1,说明超时了,那么316行则会触发userEventTriggered
方法。