一起学netty(20)基于java nio netty websocket protobuf javascript等技术实现前后端高性能实时数据传输

weblog Md 1260 0 0

  基于java nio + netty + websocket + protobuf +javascript等技术实现前后端高性能实时数据传输的demo模型。

  github地址:https://github.com/18438301593/NettyWebsocketProtoDemo

主要过程分析:

一、.proto文件编写,生成java类,以及javacript文件。

  参考文章:

  http://www.jiajiajia.club/blog/artical/351psy9r6l0g/464

  http://www.jiajiajia.club/blog/artical/ydn9dpg64gkf/466

二、集成websocket

  websocket协议的建立连接的阶段采用了http协议的方式,所以会看到http协议相关的解码器和编码器。如下:

        pipeline.addLast(new HttpServerCodec());
        //支持写大数据流
        pipeline.addLast(new ChunkedWriteHandler());
        //http聚合器
        pipeline.addLast(new HttpObjectAggregator(1024*62));
        //websocket支持,设置路由
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

        // 协议包解码
        pipeline.addLast(new MessageToMessageDecoder<WebSocketFrame>() {
            @Override
            protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> objs) throws Exception {
                System.out.println("received client msg ------------------------");
                if (frame instanceof TextWebSocketFrame) {
                    // 文本消息
                    TextWebSocketFrame textFrame = (TextWebSocketFrame)frame;
                    System.out.println("MsgType is TextWebSocketFrame");
                } else if (frame instanceof BinaryWebSocketFrame) {
                    // 二进制消息
                    ByteBuf buf = ((BinaryWebSocketFrame) frame).content();
                    objs.add(buf);
                    // 自旋累加
                    buf.retain();
                    System.out.println("MsgType is BinaryWebSocketFrame");
                } else if (frame instanceof PongWebSocketFrame) {
                    // PING存活检测消息
                    System.out.println("MsgType is PongWebSocketFrame ");
                } else if (frame instanceof CloseWebSocketFrame) {
                    // 关闭指令消息
                    System.out.println("MsgType is CloseWebSocketFrame");
                    channel.close();
                }

            }
        });

三、 支持protobuf协议

  主要配置解码器和编码器

        //解码器,通过Google Protocol Buffers序列化框架动态的切割接收到的ByteBuf
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        //Google Protocol Buffers 长度属性编码器
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());

        // 协议包编码,编码成二进制数据,通过websocket发送
        pipeline.addLast(new MessageToMessageEncoder<MessageLiteOrBuilder>() {
            @Override
            protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
                ByteBuf result = null;
                if (msg instanceof MessageLite) {
                    result = wrappedBuffer(((MessageLite) msg).toByteArray());
                }
                if (msg instanceof MessageLite.Builder) {
                    result = wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray());
                }
                // 封装二进制数据
                WebSocketFrame frame = new BinaryWebSocketFrame(result);
                out.add(frame);
            }
        });

        // protobuf解码器
        channel.pipeline().addLast("decoder",new ProtobufDecoder(MyMessage.MyMessageInfo.getDefaultInstance()));

四、支持心跳检测,超时关闭连接

  解码器配置

        // 超时心跳检测
        channel.pipeline().addLast(new IdleStateHandler(5,5,5, TimeUnit.SECONDS));

  业务处理在NettyServerHandler类的userEventTriggered()方法。

@Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception{
        System.out.println("::::"+evt);
        if(evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            String eventType = null;
            switch (event.state()){
                case READER_IDLE:
                    eventType = "读空闲";
                    readIdleTimes ++; // 读空闲的计数加1
                    break;
                case WRITER_IDLE:
                    eventType = "写空闲";
                    // 不处理
                    break;
                case ALL_IDLE:
                    eventType ="读写空闲";
                    // 不处理
                    break;
            }
            System.out.println(ctx.channel().remoteAddress() + "超时事件:" +eventType);
            if(readIdleTimes > 3){
                System.out.println(" [server]读空闲超过3次,关闭连接");
                ctx.channel().close();
            }
        }
    }

五、前端websocket接受到数据如何解析?

  websocket是一帧一帧发送的,所以要读取所有数据,在转成字节数据,才能反序列化成js对象。

    ws.onmessage = function (event) {
        var reader = new FileReader();
        reader.readAsArrayBuffer(event.data);
        reader.onload = function (e) {
            var buf = new Uint8Array(reader.result);
            var j = proto.MyMessageInfo.deserializeBinary(buf);
            var t = document.getElementById("message");
            t.innerHTML = j.getName();
        }
    };

猜你喜欢
nginx,前端,java基础 1707   javanio+netty+websocket+protobuf+javascript的demo模型。  github地址:https
official 1099 的文章介绍了protobuf的概念参考:http://www.jiajiajia.club/blog/artical/351psy9r6l0g/464以及protobuf的编码方式参考:http
official 1011 UpdaterequestHTTP包建立连接,之的通信全部使用websocket自己的协议,就和http没啥关系了。有兴趣的同可以多了解websocket协议报文的详细信息。Nettywebsoc
official 988 。ServiceSocketChannel本身不具备力,它只监听新进来的TCP链接通道。当有新的TCP链接通道建立,它会创建个SocketChannel的对象,代表和客户的唯连接通道
official 1144 netty包下的ServerSocketChannel或SocketChannel和nio包下的ServerSocketChannel或SocketChannel的概念是样的。只不过netty中又重新对nio
official 1121 在上节《netty(6)》的文章中,简要说明了用nio原生代码写程序的些不足和问题,以及nettynio础上大致做了那些工作。其中提到点就是当活跃客户量太多,单线程处理所带
official 911 、最简单的nio程序publicstaticvoidmain(String[]args)throwsException{ ListSocketChannellist=newArrayList
official 1284 的文章中提到过,单线程的nio模型任然有定缺点。在上节《netty(7)netty的线程模型》中也提到,netty的出,封装了nio复杂的代码,并且介入多线程来处理事件,最大限度的提
目录
没有一个冬天不可逾越,没有一个春天不会来临。最慢的步伐不是跬步,而是徘徊,最快的脚步不是冲刺,而是坚持。