赞
踩
目录
程序引导类(Bootstrap)可以理解为是一个程序的入口程序,在 Java 程序中,就是一个包含 main 方法的程序类。在 Netty 中,引导程序还包含一系列的配置项。本篇文章我们就来介绍 Netty 的引导程序。
引导程序类是一种引导程序,使 Netty 程序可以很容易地引导一个 Channel。在 Netty 中,承担引导程序的是 AbstractBootStrap
抽象类。
引导程序类都在io.netty.bootstrap
包下。AbstractBootStrap
抽象类有两个子类:Bootstrap
和ServerBootstrap
,分别用于引导客户端程序及服务的程序。
下图展示了引导程序类的关系:
从上图可以看出,AbstractBootStrap
抽象类实现了Cloneable
接口。那么为什么需要实现Cloneable
接口呢?
在 Netty 中经常需要创建多个具有类似配置或者完全相同配置的Channel
。为了支持这种模式而又不避免为每个Channel
都创建并配置一个新的引导类实例,因此AbstractBootStrap
被标记为了Cloneable
。在一个已经配置完成的引导实例上调用clone()
方法将返回另一个可以立即使用的引导类实例。
这种方式只会创建引导类实例的EventLoopGroup
的一个浅拷贝,所以,EventLoopGroup
将在所有克隆的Channel
实例之间共享。这是可以接受的,毕竟这些克隆的Channel
的生命周期都很短暂,例如,一个典型的场景是创建一个Channel
以进行一次HTTP
请求。
以下是AbstractBootStrap
的核心源码:
- public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
-
- volatile EventLoopGroup group;
- private volatile ChannelFactory<? extends C> channelFactory;
- private volatile SocketAddress localAddress;
- private final Map<ChannelOption<?>, Object> options = new LinkedHashMap();
- private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap();
- private volatile ChannelHandler handler;
-
- AbstractBootstrap() {
- //禁止从其他程序包扩展
- }
-
- AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
- this.group = bootstrap.group;
- this.channelFactory = bootstrap.channelFactory;
- this.handler = bootstrap.handler;
- this.localAddress = bootstrap.localAddress;
- synchronized(bootstrap.options) {
- this.options.putAll(bootstrap.options);
- }
-
- this.attrs.putAll(bootstrap.attrs);
- }
-
- //...
- }

从上述源码可以看出,子类型B是其父类型的一个类型参数,因此可以返回到运行时实例的引用以支持方法的链式调用。从私有变量可以看出,AbstractBootStrap
所要管理的启动配置包括EventLoopGroup
、SocketAddress
、Channel配置
、ChannelHandler
等信息。
AbstractBootStrap
是禁止被除io.netty.bootstrap
包外其他程序所扩展的,因此可以看到AbstractBootStrap
默认构造方法被设置为了包内可见。
BootStrap 类是AbstractBootStrap
抽象类的子类之一,主要是用于客户端或者使用了无连接协议的应用程序。
以下是BootStrap
类的核心源码:
- public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
- private static final InternalLogger logger = InternalLoggerFactory.getInstance(Bootstrap.class);
- private static final AddressResolverGroup<?> DEFAULT_RESOLVER;
- private final BootstrapConfig config = new BootstrapConfig(this);
- private volatile AddressResolverGroup<SocketAddress> resolver;
- private volatile SocketAddress remoteAddress;
-
- public Bootstrap() {
- this.resolver = DEFAULT_RESOLVER;
- }
-
- private Bootstrap(Bootstrap bootstrap) {
- super(bootstrap);
- this.resolver = DEFAULT_RESOLVER;
- this.resolver = bootstrap.resolver;
- this.remoteAddress = bootstrap.remoteAddress;
- }
-
- public Bootstrap resolver(AddressResolverGroup<?> resolver) {
- this.resolver = resolver == null ? DEFAULT_RESOLVER : resolver;
- return this;
- }
-
- public Bootstrap remoteAddress(SocketAddress remoteAddress) {
- this.remoteAddress = remoteAddress;
- return this;
- }
-
- public Bootstrap remoteAddress(String inetHost, int inetPort) {
- this.remoteAddress = InetSocketAddress.createUnresolved(inetHost, inetPort);
- return this;
- }
-
- public Bootstrap remoteAddress(InetAddress inetHost, int inetPort) {
- this.remoteAddress = new InetSocketAddress(inetHost, inetPort);
- return this;
- }
-
- public ChannelFuture connect() {
- this.validate();
- SocketAddress remoteAddress = this.remoteAddress;
- if (remoteAddress == null) {
- throw new IllegalStateException("remoteAddress not set");
- } else {
- return this.doResolveAndConnect(remoteAddress, this.config.localAddress());
- }
- }
-
- public ChannelFuture connect(String inetHost, int inetPort) {
- return this.connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
- }
-
- public ChannelFuture connect(InetAddress inetHost, int inetPort) {
- return this.connect(new InetSocketAddress(inetHost, inetPort));
- }
-
- public ChannelFuture connect(SocketAddress remoteAddress) {
- ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
- this.validate();
- return this.doResolveAndConnect(remoteAddress, this.config.localAddress());
- }
-
- public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
- ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
- this.validate();
- return this.doResolveAndConnect(remoteAddress, localAddress);
- }
-
- //...
-
- public Bootstrap validate() {
- super.validate();
- if (this.config.handler() == null) {
- throw new IllegalStateException("handler not set");
- } else {
- return this;
- }
- }
-
- public Bootstrap clone() {
- return new Bootstrap(this);
- }
-
- public Bootstrap clone(EventLoopGroup group) {
- Bootstrap bs = new Bootstrap(this);
- bs.group = group;
- return bs;
- }
-
- public final BootstrapConfig config() {
- return this.config;
- }
-
- final SocketAddress remoteAddress() {
- return this.remoteAddress;
- }
-
- final AddressResolverGroup<?> resolver() {
- return this.resolver;
- }
-
- ...
- }

上述方法主要分为以下几类:
group:设置用于处理Channel
所有事件的EventLoopGroup
。
channel:指定了 Channel
的实现类。
localAddress:指定 Channel
应该绑定到的本地地址。如果没有指定,则有操作系统创建一个随机的地址。或者,也可以通过bind()
或者connect()
方法指定localAddress
。
option:设置ChannelOption
,其将被应用到每个新创建的Channel
的ChannelConfig
。这些选项将会通过bind()
或者connect()
方法设置到Channel
,配置的顺序与调用先后没有关系。这个方法在Channel
已经被创建后再次调用就不会再起任何效果了。支持什么样的ChannelOption
取决于所使用的Channel
类型。
attr:指定新创建的Channel
的属性值。这些属性值是通过bind()
或者connect()
方法设置到Channel
。配置的顺序取决于调用的先后顺序。这个方法在Channel
已经被创建后再次调用就不会再起任何效果了。
handler:设置被添加到ChannelPipeline
以接收事件通知的ChannelHandler
。
remoteAddress:设置远程地址。也可以通过connect()
方法来指定它。
clone:创建一个当前Bootstrap
的克隆,其具有和原始的Bootstrap
相同的设置信息。
connect:用于连接到远程节点并返回一个ChannelFuture
,并将会在连接操作完成后接收到通知。
bind:绑定Channel
并返回一个ChannelFuture
,其将会在绑定操作完成之后接收到通知,在那之后必须调用Channel
。
BootStrap
类中许多方法都继承自AbstractBootstrap
类。
ServerBootstrap 类是AbstractBootStrap
抽象类的子类之一,主要是用于引导服务器程序。
以下是 ServerBootstrap 类的源码:
- public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
- private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
- private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap();
- private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap();
- private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
- private volatile EventLoopGroup childGroup;
- private volatile ChannelHandler childHandler;
-
- public ServerBootstrap() {
- }
-
- private ServerBootstrap(ServerBootstrap bootstrap) {
- super(bootstrap);
- this.childGroup = bootstrap.childGroup;
- this.childHandler = bootstrap.childHandler;
- synchronized(bootstrap.childOptions) {
- this.childOptions.putAll(bootstrap.childOptions);
- }
-
- this.childAttrs.putAll(bootstrap.childAttrs);
- }
-
- public ServerBootstrap group(EventLoopGroup group) {
- return this.group(group, group);
- }
-
- public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
- super.group(parentGroup);
- if (this.childGroup != null) {
- throw new IllegalStateException("childGroup set already");
- } else {
- this.childGroup = (EventLoopGroup)ObjectUtil.checkNotNull(childGroup, "childGroup");
- return this;
- }
- }
-
- public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
- ObjectUtil.checkNotNull(childOption, "childOption");
- synchronized(this.childOptions) {
- if (value == null) {
- this.childOptions.remove(childOption);
- } else {
- this.childOptions.put(childOption, value);
- }
-
- return this;
- }
- }
-
- public <T> ServerBootstrap childAttr(AttributeKey<T> childKey, T value) {
- ObjectUtil.checkNotNull(childKey, "childKey");
- if (value == null) {
- this.childAttrs.remove(childKey);
- } else {
- this.childAttrs.put(childKey, value);
- }
-
- return this;
- }
-
- public ServerBootstrap childHandler(ChannelHandler childHandler) {
- this.childHandler = (ChannelHandler)ObjectUtil.checkNotNull(childHandler, "childHandler");
- return this;
- }
-
- //...
-
- public ServerBootstrap clone() {
- return new ServerBootstrap(this);
- }
-
- /** @deprecated */
- @Deprecated
- public EventLoopGroup childGroup() {
- return this.childGroup;
- }
-
- final ChannelHandler childHandler() {
- return this.childHandler;
- }
-
- final Map<ChannelOption<?>, Object> childOptions() {
- synchronized(this.childOptions) {
- return copiedMap(this.childOptions);
- }
- }
-
- final Map<AttributeKey<?>, Object> childAttrs() {
- return copiedMap(this.childAttrs);
- }
-
- public final ServerBootstrapConfig config() {
- return this.config;
- }
-
- //...
- }

上述方法主要分为以下几类:
group:设置ServerBootstrap
要用的EventLoopGroup
。这个EventLoopGroup
将用于ServerChannel
和被接受的子Channel
的 I/O 处理。
channel:设置将要被实例化的ServerChannel
类。
localAddress:指定 ServerChannel
应该绑定到的本地地址。如果没有指定,则有操作系统创建一个随机的地址。或者,也可以通过bind()
方法指定localAddress
。
option:指定要应用到新创建的ServerChannel
的ChannelConfig
的ChannelOption
。这些选项将会通过bind()
设置到Channel
,在bind()
方法被调用之后,设置或者改变ChannelOption
将不会有任何效果。支持什么样的ChannelOption
取决于所使用的Channel
类型。
childOption:指定当子Channel
被接受时,应用到子Channel
的ChannelConfig
的ChannelOption
。所支持的ChannelOption
取决于所使用的Channel
类型。
attr:指定ServerChannel
上的属性值。这些属性值是通过bind()
或者connect()
方法设置到Channel
。在bind()
方法被调用之后它们将不会有任何效果。
childAttr:将属性设置给已经被接受的子Channel
。之后再次调用将不会有任何效果。
handler:设置被添加到ServerChannel
的ChannelPipeline
中的ChannelHandler
。
childHandler:设置将被添加到已被接受的子Channel
的ChannelPipeline
中的ChannelHandler
。
clone:克隆一个设置好原始的ServerBootstrap
相同的ServerBootstrap
。
bind:绑定ServerChannel
并返回一个ChannelFuture
,其将会在绑定操作完成之后接收到通知(带着成功或者失败的结果)。
ServerBootstrap
类中许多方法都继承自AbstractBootstrap
类。
为了能更好的理解引导程序,下面就以 Echo 协议的服务器的代码为例。核心代码如下:
- // 多线程事件循环器
- EventLoopGroup bossGroup = new NioEventLoopGroup(); // boss
- EventLoopGroup workerGroup = new NioEventLoopGroup(); // worker
-
- try {
- // 启动NIO服务的引导程序类
- ServerBootstrap b = new ServerBootstrap();
-
- b.group(bossGroup, workerGroup) // 设置EventLoopGroup
- .channel(NioServerSocketChannel.class) // 指明新的Channel的类型
- .childHandler(new EchoServerHandler()) // 指定ChannelHandler
- .option(ChannelOption.SO_BACKLOG, 128) // 设置的ServerChannel的一些选项
- .childOption(ChannelOption.SO_KEEPALIVE, true); // 设置的ServerChannel的子Channel的选项
-
- // 绑定端口,开始接收进来的连接
- ChannelFuture f = b.bind(port).sync();
-
- System.out.println("EchoServer已启动,端口:" + port);
-
- // 等待服务器 socket 关闭 。
- // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
- f.channel().closeFuture().sync();
- } finally {
-
- // 优雅的关闭
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }

引导 Netty 服务器主要分为几下几个步骤。
在上述代码中,首先是需要实例化引导程序类。由于是服务器端的程序,所以,实例化了一个ServerBootstrap
。
设置ServerBootstrap
的EventLoopGroup
。上述服务器使用了两个NioEventLoopGroup
,一个代表boss
线程组,一个代表work
线程组。
boss
线程主要是接收客户端的请求,并将请求转发给work
线程处理。
boss
线程是轻量的,不会处理耗时的任务,因此可以承受高并发的请求。而真实的 I/O 操作都是由work
线程在执行。
NioEventLoopGroup
是支持多线程的,因此可以执行线程池的大小。如果没有指定,则 Netty 会指定一个默认的线程池大小,核心代码如下:
- private static final int DEFAULT_EVENT_LOOP_THREADS;
-
- static {
- // 默认 EventLoopGroup 的线程数
- DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
- "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
-
- if (logger.isDebugEnabled()) {
- logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
- }
- }
-
- public NioEventLoopGroup() {
- // 如果不指定, nThreads = 0
- this(0);
- }
-
- /**
- * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
- */
- protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
- // 如果不指定(nThreads = 0),默认值是 DEFAULT_EVENT_LOOP_THREADS
- super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
- }

从上述源码可以看出,如果NioEventLoopGroup
实例化时,没有指定线程数,则最终默认值是DEFAULT_EVENT_LOOP_THREADS
,而默认值DEFAULT_EVENT_LOOP_THREADS
是根据当前机子的CPU 处理的个数乘以 2 得出的。
channel()
方法用于指定ServerBootstrap
的Channel
类型。在本例中,使用的是NioServerSocketChannel
类型,代表了服务器是一个基于ServerSocketChannel
的实现,使用基于 NIO 选择器的实现来接受新连接。
childHandler
用于指定ChannelHandler
,以便处理Channel
的请求。上述例子中,指定的是自定义的EchoServerHandler
。
option
和childOption
方法,分别用于设置ServerChannel
及ServerChannel
的子Channel
的选项。这些选项定义在ChannelOption
类中,包含以下常量:
- public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {
-
- public static final ChannelOption<ByteBufAllocator> ALLOCATOR = valueOf("ALLOCATOR");
- public static final ChannelOption<RecvByteBufAllocator> RCVBUF_ALLOCATOR = valueOf("RCVBUF_ALLOCATOR");
- public static final ChannelOption<MessageSizeEstimator> MESSAGE_SIZE_ESTIMATOR = valueOf("MESSAGE_SIZE_ESTIMATOR");
-
- public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS = valueOf("CONNECT_TIMEOUT_MILLIS");
- /**
- * @deprecated Use {@link MaxMessagesRecvByteBufAllocator}
- * and {@link MaxMessagesRecvByteBufAllocator#maxMessagesPerRead(int)}.
- */
- @Deprecated
- public static final ChannelOption<Integer> MAX_MESSAGES_PER_READ = valueOf("MAX_MESSAGES_PER_READ");
- public static final ChannelOption<Integer> WRITE_SPIN_COUNT = valueOf("WRITE_SPIN_COUNT");
- /**
- * @deprecated Use {@link #WRITE_BUFFER_WATER_MARK}
- */
- @Deprecated
- public static final ChannelOption<Integer> WRITE_BUFFER_HIGH_WATER_MARK = valueOf("WRITE_BUFFER_HIGH_WATER_MARK");
- /**
- * @deprecated Use {@link #WRITE_BUFFER_WATER_MARK}
- */
- @Deprecated
- public static final ChannelOption<Integer> WRITE_BUFFER_LOW_WATER_MARK = valueOf("WRITE_BUFFER_LOW_WATER_MARK");
- public static final ChannelOption<WriteBufferWaterMark> WRITE_BUFFER_WATER_MARK =
- valueOf("WRITE_BUFFER_WATER_MARK");
-
- public static final ChannelOption<Boolean> ALLOW_HALF_CLOSURE = valueOf("ALLOW_HALF_CLOSURE");
- public static final ChannelOption<Boolean> AUTO_READ = valueOf("AUTO_READ");
-
- /**
- * If {@code true} then the {@link Channel} is closed automatically and immediately on write failure.
- * The default value is {@code true}.
- */
- public static final ChannelOption<Boolean> AUTO_CLOSE = valueOf("AUTO_CLOSE");
-
- public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");
- public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
- public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
- public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
- public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
- public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
- public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
- public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");
-
- public static final ChannelOption<Integer> IP_TOS = valueOf("IP_TOS");
- public static final ChannelOption<InetAddress> IP_MULTICAST_ADDR = valueOf("IP_MULTICAST_ADDR");
- public static final ChannelOption<NetworkInterface> IP_MULTICAST_IF = valueOf("IP_MULTICAST_IF");
- public static final ChannelOption<Integer> IP_MULTICAST_TTL = valueOf("IP_MULTICAST_TTL");
- public static final ChannelOption<Boolean> IP_MULTICAST_LOOP_DISABLED = valueOf("IP_MULTICAST_LOOP_DISABLED");
-
- public static final ChannelOption<Boolean> TCP_NODELAY = valueOf("TCP_NODELAY");
-
- @Deprecated
- public static final ChannelOption<Boolean> DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION =
- valueOf("DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION");
-
- public static final ChannelOption<Boolean> SINGLE_EVENTEXECUTOR_PER_GROUP =
- valueOf("SINGLE_EVENTEXECUTOR_PER_GROUP");
-
- }

bind()
方法用于绑定端口,会创建一个Channel
而后启动服务。
绑定成功后,返回一个ChannelFuture
,以代表是一个异步的操作。在上述的例子里,使用的是sync()
方法,以同步的方式来获取服务启动的结果。
为了能更好的理解引导程序,下面就以 Echo 协议的客户端的代码为例。核心代码如下:
- // 配置客户端
- EventLoopGroup group = new NioEventLoopGroup();
- try {
- Bootstrap b = new Bootstrap();
- b.group(group)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY, true)
- .handler(new EchoClientHandler());
-
- // 连接到服务器
- ChannelFuture f = b.connect(hostName, portNumber).sync();
-
- Channel channel = f.channel();
- ByteBuffer writeBuffer = ByteBuffer.allocate(32);
- try (BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))) {
- String userInput;
- while ((userInput = stdIn.readLine()) != null) {
- writeBuffer.put(userInput.getBytes());
- writeBuffer.flip();
- writeBuffer.rewind();
-
- // 转为ByteBuf
- ByteBuf buf = Unpooled.copiedBuffer(writeBuffer);
-
- // 写消息到管道
- channel.writeAndFlush(buf);
-
- // 清理缓冲区
- writeBuffer.clear();
- }
- } catch (UnknownHostException e) {
- System.err.println("不明主机,主机名为:" + hostName);
- System.exit(1);
- } catch (IOException e) {
- System.err.println("不能从主机中获取I/O,主机名为:" + hostName);
- System.exit(1);
- }
- } finally {
-
- // 优雅的关闭
- group.shutdownGracefully();
- }

引导 Netty 客户端主要分为几下几个步骤。
在上述代码中,首先是需要实例化引导程序类。由于是客户端的程序,所以,实例化了一个Bootstrap
。
设置Bootstrap
的EventLoopGroup
。不同于服务器,客户端只需要使用了一个NioEventLoopGroup
。
channel()
方法用于指定Bootstrap
的Channel
类型。在本例中,由于是客户端使用,使用的是NioSocketChannel
类型,代表了客户端是一个基于SocketChannel
的实现,使用基于 NIO 选择器的实现来发起连接请求。
option
用于设置Channel
的选项。这些选项定义在ChannelOption
类中。
Handler
用于设置处理服务端请求的ChannelHandler
。上述例子中,指定的是自定义的EchoClientHandler
。
connect()
方法用于连接到指定的服务器的Channel
。
连接成功后,返回一个ChannelFuture
,以代表是一个异步的操作。在上述的例子里,使用的是sync()
方法,以同步的方式来获取服务启动的结果。
通过上述对引导程序类的介绍,相信大家对于服务端和客户端的引导程序以及一些配置项的都有了一定的了解。通过看源码,我们就能更加清楚的知道 Netty 服务端和客户端的启动的过程。下节我们来分析Netty的线程模型。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。