Netty4

阅读数:137 评论数:0

跳转到新版页面

分类

python/Java

正文

一、概述

Netty的强大的地方,是他能方便的实现自定义协议的网络传输。Netty基于Reactor模式:http://1024s.top/mbstudy/mbBlog/blogDetail?blogId=12400

1、Netty分为两个线程组

BossGroup 专门负责接收客户端连接
WorkerGroup 专门负责网络读写操作

2、selector

selector用于监听多个channel的事件,但只有抽象类SelectableChannel的子类才能被Selector复用。

使用SelectableChannel的register方法,可将Channel注册到Selector。

二、Netty组件

1、Bootstrap

用来配置客户端netty应用

(1)调用connect()方法来连接服务端。

(2)一般用一个EventLoopGroup

2、ServerBootstrap

用来配置服务端netty应用

// 服务器端设置两个EventLoop
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroupchildGroup)
// 设置一个服务器端的通道实现
public B channel(Class<? extends C> channelClass)
// 给服务器通道添加配置
public B option(ChannelOption参数, T value)
// 给接收到的通道添加配置
public ServerBootstrap childOption(ChannelOption参数, T value)
// 设置业务处理类(自定义的 handler)
public ServerBootstrap childHandler(ChannelHandler childHandler)
// 服务器端设置占用的端口号
public ChannelFuture bind(int inetPort) 

(1)通过调用bind()方法绑定一个端口监听连接,它会返回一个ChannelFuture迅步任务.

(2)会用两个EventLoopGroup,一个用于处于连接,一个用于数据传输。

(3)handler()和childHandler()的主要区别是,handler()是发生在初始化的时候,childHandler()是发生在客户端连接之后。

(4)option()是给父通道设置参数,childOption()是给子通道设置参数,父通道指提是ServerSocketChannel,子通道指SocketChannel,后者是由前者accept后建立,因此将它们的关系称为父子。

(5)sync()是同步阻塞,直到之前的调用完成为止。

EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new FirstServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 绑定端口并开始接收连接
            ChannelFuture f = b.bind(port).sync();
            // 等待server socket关闭
            f.channel().closeFuture().sync();

3、ChannelHandler接口

主要用来处理各种事件。

//通道就绪事件
public void channelActive(ChannelHandlerContext ctx)
//通道读取数据事件
public void channelRead(ChannelHandlerContext ctx, Object msg)
//数据读取完毕事件
public void channelReadComplete(ChannelHandlerContext ctx) 
//通道发生异常事件
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

ChannelInboundHandler 用于处理入站 I/O 事件
ChannelOutboundHandler 用于处理出站 I/O 事件
ChannelInboundHandlerAdapter 用于处理入站 I/O 事件
ChannelOutboundHandlerAdapter 用于处理出站 I/O 事件
Encoder和Decoder 也是ChannelHandler的实现类。
ChannelDuplexHandler 用于处理出、入站I/O事件

(1)ChannelHandlerContext

通过它可以拿到channel、pipeline等对象,就可以进行读写等操作。

一个Channel对应一个ChannelPipeline,而ChannelPipeline中包含的是ChannelHandlerContext所组成的双向链表。每个ChannelHandlerContext对应关联一个ChannelHandler,如果想要向整个链表中添加事件处理Handler,可以重写ChannelInitiaze类中的initChannel方法。

//关闭通道
ChannelFuture close()
//刷新
ChannelOutboundInvoker flush()
//将数据写到 ChannelPipeline 中,ChannelHandler 的下一个 ChannelHandler 开始处理(出站)
ChannelFuture writeAndFlush(Object msg) 

(2)ChannelHandlerAdapter

数据在各个Handler之间传递,需要调用方法中传递的ChannelHandlerContext,在netty的API中提供了两个基类ChannelOutboundHandlerAdapter和ChannelInBoundHanderAdapter,他们仅仅实现了调用ChannelHandlerContext来把消息传递给下一个Handler,因为我们只关心处理数据,因此我们可以继承这两个基类来帮助我们做这些,而我们仅需处理数据的部分即可。

当一个ChannelHandler被加入到ChannelPipeline中时,它便会获得一个ChannelHandlerContext的引用,而ChannelHandlerContext可以用来读写Netty中的数据流。因此,现在可以有两种方式来发送数据,一种是把数据直接写入Channel,一种是把数据写入ChannelHandlerContext,它们的区别是写入Channel的话,数据流会从Channel的头开始传递,而如果写入ChannelHandlerContext的话,数据流会流入管道中下一个Handler。

(3)ChannelInboundHandler

注册事件 fireChannelRegistered
连接建立事件 fireChannelActive
读事件和读完成事件

fireChannelRead

fireChannelReadComplete

异常通知事件 fireExceptionCaught
用户自定义事件 fireUserEventTriggered
Channel 可写状态变化事件 fireChannelWritabilityChanged
连接关闭事件 fireChannelInactive

(4)ChannelOutboundHandler

端口绑定 bind
连接服务端 connect
写事件 write
刷新 flush
读事件 read
主动断开连接 disconnect
关闭 channel close

(5)SimpleChannelInboundHandler

Netty提供了一个常用的基类SimpleChannelInboundHandler<T>,其中T就是这个Handler处理的数据的类型,消息到达这个Handler时,Netty会自动调用这个Handler中的channelRead0(ChannelHandlerContext,T)方法,T是传递过来的数据对象,在这个方法中我们便可以任意写我们的逻辑。

(6)Sharable注解

当客户端连接到服务器时,Netty新建一个ChannelPipeline处理其中的事件,而一个ChannelPipeline中含有若干ChannelHandler。如果每个客户端连接都新建一个ChannelHandler实例,当有大量客户端时,服务器将只在大量的ChannelHandler实例。为此,Netty提供了Sharable注解,如果一个ChannelHandler没有使用临界区(或者理解为线程安全),那么可将其标注为Sharable,如此,服务器只需保存一个实例就能处理所有客户端的事件。

4、ChannelInitializer

ChannelInitializer 是一种特殊的 ChannelHandler,它也是一种 ChannelInboundHandler,它提供了在通道注册到 EventLoop 后初始化通道的简单方法,在某个Channel注册到EventLoop后,对这个Channel执行一些初始化操作,初始化操作完成后会将自身从Pipeline中移除。

 // 创建一个通道初始化对象
new ChannelInitializer<SocketChannel>() {  
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
          // 向pipeline中添加自定义业务处理handler
          ch.pipeline().addLast(new NettyServerHandle())
      }
});

5、ChannelPipeline

一个channel可能有多个channelHandler,初始化channel时,把channelHandler按顺序装在pipeline中,就可以实现按序执行channelHandler了。

采用的是责任链模式。

 

6、Channel

代表一个Socket链接,或者其它与IO操作相关的组件。

NioSocketChannel 异步非阻塞的客户端 TCP Socket 连接。
NioServerSocketChannel 异步非阻塞的服务器端 TCP Socket 连接
NioDatagramChannel 异步的 UDP 连接。
NioSctpChannel 异步的客户端 Sctp 连接。
NioSctpServerChannel 异步的 Sctp 服务器端连接

(1)查询channel的状态

boolean isOpen(); //如果通道打开,则返回true
boolean isRegistered();//如果通道注册到EventLoop,则返回true
boolean isActive();//如果通道处于活动状态并且已连接,则返回true
boolean isWritable();//当且仅当I/O线程将立即执行请求的写入操作时,返回true。

(2)提供channel的配置参数

ChannelConfig config = channel.config();//获取配置参数
//获取ChannelOption.SO_BACKLOG参数,
Integer soBackLogConfig = config.getOption(ChannelOption.SO_BACKLOG);

(3)提供支持io操作

例如,读、写、连接和绑定。所有 I/O 操作都是异步的,一经调用就马上返回,而不保证所请求的 I/O 操作在调用结束时已完成。相反,在调用时都将返回一个 ChannelFuture实例,用来代表未来的结果。

(4)提供ChannelPipepline

//获取ChannelPipeline对象
ChannelPipeline pipeline = ctx.channel().pipeline();
//往pipeline中添加ChannelHandler处理器,装配流水线
pipeline.addLast(new MyServerHandler());

7、NioEventLoop(EventLoop)

每个NioEventLoop都有一个selector,用于监听绑定在其上的socket网络通信。

(1)每个NioEventLoop中包含一个Selector,一个taskQueue。

(2)每个NioEventLoop的Selector上可以注册监听多个NioChannel

(3)每个NioChannel都绑定有一个自己的ChannelPipeline。

8、EventLoopGroup

EventLoopGroup就是事件循环线程EventLoop的合集,可以用它创建像BossGroup和WorkerGroup这样的事件循环线程组。

NioEventLoopGroup 异步非阻塞
DefaultEventLoopGroup  
//断开连接,关闭线程 
public Future<?> shutdownGracefully()

9、Future

Netty中所有IO操作都是异步的,调用者无法立即获得处理状态,但可以通过Future-Listener机制,获取IO结果。

(1)ChannelFuture

它是Future的子接口,保存Channel异步操作的结果。

Channel channel() 返回当前正在进行 IO 操作的通道
ChannelFuture sync() 等待异步操作执行完毕,将异步改为同步
addListener() 注册监听器,当操作或Future对象已完成(isDone返回ture),将会通知指定的监听器
isDone() 判断当前操作是否完成
isSuccess() 判断已完成的当前操作是否成功
isCancelled() 判断已完成的当前操作是否被取消
getCause() 获取已完成的当前操作失败的原因
closeFuture() 关闭通道

三、Encoders,Decoders

因为我们在网络传输时只能传输字节流,因此,在发送数据之前,我们必须把我们的message型转换为bytes,与之对应,我们在接收数据后,必须把接收的bytes再转换成message。我们把bytes to message这个过程称为decode, 把message to bytes称为encoder.

1、解码器 Decoder

解码器的基类主要分为两类

解码从字节到消息 ByteToMessageDecoder 和 ReplayingDecoder
解码从消息到消息 MessageToMessageDecoder

(1)ByteToMessageDecoder

ByteToMessageDecoder 继承自ChannelInboundHandlerAdapter

decode() 这是必须要实现的唯一抽象方法。decode()方法被调用时将会传入一个包含了传入数据的ByteBuf,以及一个用来添加解码消息的List。对这个方法的调用将会重复执行,直到确定没有新的元素被添加到该List,或者该ByteBuf中没有更多可读的字节时为止。然后,如果List不为空,那么它的内容将会被传递给ChannelPipeline中的下一个ChannelInboundHandler
decodeLast() Netty 提供的这个默认实现只是简单地调用了decode()方法。当Channel的状态变为非活泼时,这个方法会被调用一次。可以重写该方法以提供特殊的处理。
public class ToIntegerDecoder extends ByteToMessageDecoder {
    @Override
   public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    if (in.readableBytes() >= 4) {
        out.add(in.readInt());
    } }
}

(2)ReplayingDecoder

ReplayingDecoder抽象类是ByteToMessageDecoder的一个子类,ByteToMessageDecoder解码读取缓冲区的数据之前需要检查缓冲区是否有足够的字节,使用ReplayingDecoder就无需自己检查;若ByteBuf中有足够的字节,则会正常读取;若没有足够的字节则会停止解码。

不是所有的标准ByteBuf操作都被支持,如果调用一个不支持的操作会抛出UnReplayableOperationException
性能上,使用ReplayingDecode要略慢与ByteToMessageDecoder
/**
 * Integer解码器,ReplayingDecoder实现
 */
public class ToIntegerReplayingDecoder extends ReplayingDecoder<Void> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        out.add(in.readInt());
    }
}

(3)MessageToMessageDecoder

用于从一种消息解码为另外一种消息。

public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
    @Override
    public void decode(ChannelHandlerContext ctx, Integer msg List<Object> out) throws Exception {
        out.add(String.valueOf(msg));
    }
}

2、编码器 Encoder

(1)MessageToByteEncoder

编码从消息到字节

public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
        out.writeShort(msg);//将Short转成二进制字节流写入ByteBuf中
    }
}

(2)MessageToMessageEncoder

编码从消息到消息

public class IntegerToStringEncoder extends MessageToMessageEncoder <Integer> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        out.add(String.ValueOf(msg));
    }
}

3、编解码器

同时具有编码器和解码器的功能,这些类实现了ChannelOutboundHandlerChannelInboundHandler接口。

(1)ByteToMessageCodec

实现从字节到消息的编解码,其子类绝对不使用@Sharable进行注解。

(2)MessageToMessageCodec

实现从消息到消息的编解码

public class NumberCodec extends MessageToMessageCodec<Integer,Long> {

    @Override
    protected void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
            out.add(msg.longValue());
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Long msg, List<Object> out) throws Exception {
            out.add(msg.intValue())
    }
}

 




相关推荐

Java NIO根据操作系统不同,比如macosx是KQueueSelectorProvider、windows有WindowsSelectorProvider、Linux有EPollSelec

Netty提供了丰富的解码器抽象基类:主要分为两类: (1)解码字节到消息(ByteToMessageDecoder和ReplayingDecoder) (2)解码消

netty提供了6种不同的websocket类型: (1)BinaryWebsocketFrame,二进制数据类型 (2)TextWebSocketFrame,文本数

一、概述 在计算机中,我们以字节为单位,一个字节为8bit。 而数据存储中的字节顺序多取决于硬件设计,即所谓的大端存储和小端存储。Intel处理器使用小端存储,PowerPc的处理器采用大端存储。 当

一、Channel的四大类型 1、FileChannel FileChannel是操作文件的Channel,可以通过FileChannel从一个文件中读取数据,也

ByteBuf是对字节的封装,有基于堆内存和直接内存。若是堆内存,应用程序无需考虑什么时候释放,因为GC是帮助做,如果是直接内存,那么需要主动释放。 每个ByteBuf对象都有一个引用计数,当这个数值

Netty中的HashedWheelTimer可以用于提交延迟任务,Java里的Time组件也具备相同的功能,不过Time是基于执行是基于优先队列实现的,相当于需要对所有的任务基于执行的时间排个

一、netty客户端流控 1、这几种情况下,如果客户端没有流控保护,很容易发生内存泄露。 (1)网络瓶颈,当客户端发送速度超过网络链路处理能力,会导致客户端发送队列积压。 (2)当对端读取速度小于已方

Selector Selector是一个多路复用器,它负责管理被注册到其上的SelectableChannel。 1、目前多路复用IO实现主要: (

LineBasedFrameDecoder以换行符\n或\r\n作为依据,遇到\r\n都认为是一条完整的消息。 而DelimiterBasedFrameDecoder允许我们指定任