Many parts of Spark involve network communication, for example message exchange between components, the uploading of user files and JAR packages, the shuffle process between nodes, and the replication and backup of block data. In Spark 0.x.x and in versions before Spark 1.6.0, message passing between components was built mainly on Akka, which makes it easy to build powerful, highly concurrent, distributed applications. Excellent distributed communication framework though it is, Akka was removed in Spark 2.0.0; the Spark documentation describes the change as: "The Akka dependency has been removed, so users can program against any version of Akka." In Spark 1.x.x the uploading of user files and JAR packages was handled by HttpFileServer, implemented on top of Jetty, but it too was deprecated in Spark 2.0.0 and replaced by NettyStreamManager, which is based on Spark's built-in RPC framework. The shuffle process between nodes and the replication and backup of block data still use Netty in Spark 2.0.0; by redesigning the interfaces and implementations, message exchange between components, the uploading of user files and JAR packages, and the rest have all been brought under Spark's unified RPC framework.
The basic architecture of Spark's built-in RPC framework:
TransportContext holds the transport context configuration, TransportConf, and the RpcHandler that processes request messages from clients. TransportConf is required both when creating a TransportClientFactory and when creating a TransportServer, whereas the RpcHandler is only needed for creating a TransportServer. TransportClientFactory is the factory class for RPC clients, and TransportServer is the RPC server implementation; TransportContext creates the former through its createClientFactory method and the latter through its createServer method, both shown below.
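To make these relationships concrete, here is a minimal Scala sketch of wiring the components together for the version of the classes quoted in this article. The echo RpcHandler, the "rpc" module name, the localhost address, the port 0 and the empty bootstrap list are illustrative placeholders, not Spark's actual wiring; the ConfigProvider simply falls back to every default.
- import java.nio.ByteBuffer
- import java.util.NoSuchElementException
-
- import org.apache.spark.network.TransportContext
- import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
- import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamManager, TransportServerBootstrap}
- import org.apache.spark.network.util.{ConfigProvider, TransportConf}
-
- object TransportContextSketch {
-   def main(args: Array[String]): Unit = {
-     // A TransportConf backed by an empty ConfigProvider, so every setting uses its default.
-     val conf = new TransportConf("rpc", new ConfigProvider {
-       override def get(name: String): String = throw new NoSuchElementException(name)
-     })
-
-     // A trivial RpcHandler that echoes every RPC message back to the caller.
-     val rpcHandler = new RpcHandler {
-       override def receive(client: TransportClient, message: ByteBuffer,
-           callback: RpcResponseCallback): Unit = callback.onSuccess(message)
-       override def getStreamManager: StreamManager = new OneForOneStreamManager()
-     }
-
-     // TransportConf + RpcHandler => TransportContext => server and client factory.
-     val context = new TransportContext(conf, rpcHandler)
-     val server = context.createServer("localhost", 0,
-       java.util.Collections.emptyList[TransportServerBootstrap]())
-     val clientFactory = context.createClientFactory()
-     val client = clientFactory.createClient("localhost", server.getPort)
-
-     client.close(); clientFactory.close(); server.close()
-   }
- }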
- /**
- * Initializes a ClientFactory which runs the given TransportClientBootstraps prior to returning
- * a new Client. Bootstraps will be executed synchronously, and must run successfully in order
- * to create a Client.
- */
- public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
- return new TransportClientFactory(this, bootstraps);
- }
- private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool;
-
- /** A simple data structure to track the pool of clients between two peer nodes. */
- private static class ClientPool {
- TransportClient[] clients;
- Object[] locks;
-
- ClientPool(int size) {
- clients = new TransportClient[size];
- locks = new Object[size];
- for (int i = 0; i < size; i++) {
- locks[i] = new Object();
- }
- }
- }
As this shows, a ClientPool is essentially an array of TransportClients, and each Object in the locks array corresponds, index for index, to a TransportClient in the clients array. By giving every TransportClient its own lock, contention between threads under concurrent access is reduced, which in turn reduces blocking and improves concurrency; the sketch below isolates this lock-striping pattern.
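A hypothetical StripedSlots class (not part of Spark) illustrating the same idea in miniature; TransportClientFactory's createClient method, shown later, applies exactly this pattern to the clients and locks arrays of ClientPool.
- // Lock striping: slot i of `values` is guarded by locks(i), so threads
- // working on different slots never contend for the same monitor.
- class StripedSlots[T <: AnyRef](size: Int) {
-   private val values = new Array[AnyRef](size)
-   private val locks = Array.fill[AnyRef](size)(new Object)
-
-   // Create the value for slot i at most once, under that slot's own lock.
-   def getOrCreate(i: Int)(create: => T): T = locks(i).synchronized {
-     if (values(i) == null) values(i) = create
-     values(i).asInstanceOf[T]
-   }
- }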
- /** Create a server which will attempt to bind to a specific host and port. */
- public TransportServer createServer(
- String host, int port, List<TransportServerBootstrap> bootstraps) {
- return new TransportServer(this, host, port, rpcHandler, bootstraps);
- } // this refers to the TransportContext instance itself
The following gives an initial introduction to the components of Spark's built-in RPC framework:
In Spark's RPC framework, configuration information is provided by TransportConf, which consists of two properties: the configuration provider conf and the module name module:
- private final ConfigProvider conf; // the configuration provider
- private final String module; // the name of the configured module
Here conf is the actual provider of configuration values; its type, ConfigProvider, is an abstract class defined as follows:
- /**
- * Provides a mechanism for constructing a {@link TransportConf} using some sort of configuration.
- */
- public abstract class ConfigProvider {
- /** Obtains the value of the given config, throws NoSuchElementException if it doesn't exist. */
- public abstract String get(String name);
-
- public String get(String name, String defaultValue) {
- try {
- return get(name);
- } catch (NoSuchElementException e) {
- return defaultValue;
- }
- }
-
- public int getInt(String name, int defaultValue) {
- return Integer.parseInt(get(name, Integer.toString(defaultValue)));
- }
-
- public long getLong(String name, long defaultValue) {
- return Long.parseLong(get(name, Long.toString(defaultValue)));
- }
-
- public double getDouble(String name, double defaultValue) {
- return Double.parseDouble(get(name, Double.toString(defaultValue)));
- }
-
- public boolean getBoolean(String name, boolean defaultValue) {
- return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
- }
- }

ConfigProvider offers get, getInt, getLong, getDouble, getBoolean and similar methods, all of which obtain a value through the abstract get method and then apply a type conversion. The abstract get method must be implemented by subclasses.
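For illustration, a ConfigProvider can be as simple as a wrapper over an in-memory Map; the SimpleMapConfigProvider below is a hypothetical subclass, not a Spark class. Spark itself builds its provider on top of SparkConf, as shown next.
- import java.util.NoSuchElementException
-
- import org.apache.spark.network.util.ConfigProvider
-
- // Only the abstract get(name) is implemented; the typed getters (getInt,
- // getLong, ...) are inherited and fall back to their defaults when get throws.
- class SimpleMapConfigProvider(settings: Map[String, String]) extends ConfigProvider {
-   override def get(name: String): String =
-     settings.getOrElse(name, throw new NoSuchElementException(name))
- }
-
- // Example:
- //   val provider = new SimpleMapConfigProvider(Map("spark.rpc.io.clientThreads" -> "4"))
- //   provider.getInt("spark.rpc.io.clientThreads", 8)  // => 4
- //   provider.getInt("spark.rpc.io.serverThreads", 8)  // => 8 (default)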
- object SparkTransportConf {
- private val MAX_DEFAULT_NETTY_THREADS = 8
- def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
- val conf = _conf.clone
- val numThreads = defaultNumThreads(numUsableCores)
- conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
- conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)
- new TransportConf(module, new ConfigProvider {
- override def get(name: String): String = conf.get(name) // in effect delegates to SparkConf's get method
- })
- }
- private def defaultNumThreads(numUsableCores: Int): Int = {
- val availableCores =
- if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
- math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
- }
- }

TransportConf is constructed with SparkTransportConf's fromSparkConf method. The three parameters are the SparkConf, the module name module, and the number of usable cores numUsableCores. If numUsableCores is less than or equal to 0, the thread count is based on the number of processors available to the system; since the machine's cores cannot all be devoted to network transport, the number of cores allocated to it is capped at 8. The resulting thread count is used to set the number of client transport threads (the spark.$module.io.clientThreads property) and server transport threads (the spark.$module.io.serverThreads property). The ConfigProvider that fromSparkConf finally passes when constructing the TransportConf is an anonymous inner class whose get implementation simply delegates to SparkConf's get method.
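A typical call is sketched below; the "shuffle" module name and the core count are illustrative values, and the package declaration is only there because SparkTransportConf is private to Spark's own packages, so code like this lives inside Spark (for example in the block transfer service).
- package org.apache.spark.sketch // must live under org.apache.spark: SparkTransportConf is private[spark]
-
- import org.apache.spark.SparkConf
- import org.apache.spark.network.netty.SparkTransportConf
-
- object TransportConfSketch {
-   def main(args: Array[String]): Unit = {
-     val sparkConf = new SparkConf(false)
-     // With 4 usable cores the thread count is min(4, MAX_DEFAULT_NETTY_THREADS = 8) = 4.
-     val transportConf = SparkTransportConf.fromSparkConf(sparkConf, module = "shuffle", numUsableCores = 4)
-     println(transportConf.clientThreads()) // spark.shuffle.io.clientThreads => 4
-     println(transportConf.serverThreads()) // spark.shuffle.io.serverThreads => 4
-   }
- }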
TransportClientFactory is the factory class for creating TransportClient instances. A TransportClientFactory is created by TransportContext's createClientFactory method:
- /**
- * Initializes a ClientFactory which runs the given TransportClientBootstraps prior to returning
- * a new Client. Bootstraps will be executed synchronously, and must run successfully in order
- * to create a Client.
- */
- public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
- return new TransportClientFactory(this, bootstraps);
- }
-
- public TransportClientFactory createClientFactory() {
- return createClientFactory(Lists.<TransportClientBootstrap>newArrayList()); // overload with an empty bootstrap list
- }
TransportContext has two overloaded createClientFactory methods; both ultimately pass two parameters to the TransportClientFactory constructor, the TransportContext itself and a list of TransportClientBootstraps:
- public TransportClientFactory(
- TransportContext context,
- List<TransportClientBootstrap> clientBootstraps) {
- this.context = Preconditions.checkNotNull(context);
- this.conf = context.getConf();
- this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));
- this.connectionPool = new ConcurrentHashMap<>();
- this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
- this.rand = new Random();
-
- IOMode ioMode = IOMode.valueOf(conf.ioMode());
- this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
- // TODO: Make thread pool name configurable.
- this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client");
- this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
- conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
- }

The member fields initialized here are context (the TransportContext), conf (the TransportConf taken from the context), clientBootstraps (the list of client bootstraps), connectionPool (the connection pool keyed by remote socket address), numConnectionsPerPeer (the number of connections kept per remote peer), rand (the random generator used to pick a connection from the pool), socketChannelClass (the client Channel class matching the configured IO mode), workerGroup (the Netty event loop group for client threads) and pooledAllocator (a pooled ByteBuf allocator).
2.1 The client bootstrap: TransportClientBootstrap
The clientBootstraps property of TransportClientFactory is a list of TransportClientBootstraps. A TransportClientBootstrap is a client-side bootstrap executed on a TransportClient; it mainly performs initialization when a connection is established (for example authentication or encryption). The work done by a TransportClientBootstrap tends to be expensive, but fortunately the established connection can be reused.
- public interface TransportClientBootstrap {
- void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
- }
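A minimal, hypothetical bootstrap that merely logs the new connection illustrates the contract; Spark's real bootstraps, such as the SASL one discussed later, exchange authentication messages over the channel at this point.
- import io.netty.channel.Channel
-
- import org.apache.spark.network.client.{TransportClient, TransportClientBootstrap}
-
- // Logs the connection and returns; a real bootstrap would authenticate or
- // encrypt before the client is handed out by the factory.
- class LoggingClientBootstrap extends TransportClientBootstrap {
-   override def doBootstrap(client: TransportClient, channel: Channel): Unit =
-     println(s"bootstrapping connection to ${channel.remoteAddress()}")
- }
-
- // Passed to the factory, e.g.:
- //   context.createClientFactory(java.util.Arrays.asList[TransportClientBootstrap](new LoggingClientBootstrap))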
2.2 Creating the RPC client TransportClient
With a TransportClientFactory in hand, Spark's modules can use it to create the RPC client TransportClient. Each TransportClient instance can only communicate with a single remote RPC service, so a Spark component that needs to talk to multiple RPC services must hold multiple TransportClient instances. The method for creating a TransportClient is shown below (it actually first tries to fetch a TransportClient from the cache):
- public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
- // create an unresolved InetSocketAddress from the remote host and port
- final InetSocketAddress unresolvedAddress =
- InetSocketAddress.createUnresolved(remoteHost, remotePort);
-
- // Create the ClientPool if we don't have it yet.
- ClientPool clientPool = connectionPool.get(unresolvedAddress);
- if (clientPool == null) {
- connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
- clientPool = connectionPool.get(unresolvedAddress);
- }
- // randomly pick one of the numConnectionsPerPeer slots
- int clientIndex = rand.nextInt(numConnectionsPerPeer);
- TransportClient cachedClient = clientPool.clients[clientIndex]; // fetch from the cache
-
- if (cachedClient != null && cachedClient.isActive()) { // return the cached client if it is active
- TransportChannelHandler handler = cachedClient.getChannel().pipeline()
- .get(TransportChannelHandler.class);
- synchronized (handler) {
- handler.getResponseHandler().updateTimeOfLastRequest();
- }
-
- if (cachedClient.isActive()) {
- logger.trace("Returning cached connection to {}: {}",
- cachedClient.getSocketAddress(), cachedClient);
- return cachedClient;
- }
- }
-
- final long preResolveHost = System.nanoTime();
- final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
- final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
- if (hostResolveTimeMs > 2000) {
- logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
- } else {
- logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
- }
- // create and return a new TransportClient object
- synchronized (clientPool.locks[clientIndex]) {
- cachedClient = clientPool.clients[clientIndex];
-
- if (cachedClient != null) {
- if (cachedClient.isActive()) {
- logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
- return cachedClient;
- } else {
- logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
- }
- }
- clientPool.clients[clientIndex] = createClient(resolvedAddress);
- return clientPool.clients[clientIndex];
- }
- }

From the code above, obtaining a TransportClient involves the following steps: (1) build an unresolved InetSocketAddress from the remote host and port; (2) look up, or create with putIfAbsent, the ClientPool for that address in connectionPool; (3) pick a random index below numConnectionsPerPeer and read the cached TransportClient at that slot; (4) if the cached client is active, refresh the handler's last-request timestamp and, if it is still active, return it; (5) otherwise resolve the address and, while holding the lock for that slot, either return a client that another thread created in the meantime or call the private createClient to build a new one and cache it.
The whole procedure above really deals with using the TransportClient cache and with making createClient thread safe; it does not itself create a TransportClient. The actual creation happens in the overloaded createClient method:
- private TransportClient createClient(InetSocketAddress address) throws IOException {
- logger.debug("Creating new connection to {}", address);
- // build the Netty client Bootstrap and configure it
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(workerGroup)
- .channel(socketChannelClass)
- .option(ChannelOption.TCP_NODELAY, true)
- .option(ChannelOption.SO_KEEPALIVE, true)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
- .option(ChannelOption.ALLOCATOR, pooledAllocator);
-
- final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
- final AtomicReference<Channel> channelRef = new AtomicReference<>();
- // set the pipeline initialization callback on the bootstrap
- bootstrap.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) {
- TransportChannelHandler clientHandler = context.initializePipeline(ch);
- clientRef.set(clientHandler.getClient());
- channelRef.set(ch);
- }
- });
-
- // Connect to the remote server
- long preConnect = System.nanoTime();
- ChannelFuture cf = bootstrap.connect(address);
- if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
- throw new IOException(
- String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
- } else if (cf.cause() != null) {
- throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
- }
-
- TransportClient client = clientRef.get();
- Channel channel = channelRef.get();
- assert client != null : "Channel future completed successfully with null client";
-
- long preBootstrap = System.nanoTime();
- logger.debug("Connection to {} successful, running bootstraps...", address);
- try {
- for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
- clientBootstrap.doBootstrap(client, channel); // run each client bootstrap on the new TransportClient
- }
- } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
- long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
- logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
- client.close();
- throw Throwables.propagate(e);
- }
- long postBootstrap = System.nanoTime();
-
- logger.info("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
- address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);
-
- return client;
- }

From the code above, a TransportClient is created as follows: (1) build a Netty client Bootstrap on the shared workerGroup and configure its channel options (TCP_NODELAY, SO_KEEPALIVE, the connection timeout and the pooled allocator); (2) install a ChannelInitializer that calls TransportContext's initializePipeline and captures the resulting TransportClient and Channel in atomic references; (3) connect to the remote address, failing with an IOException on timeout or connection error; (4) run every TransportClientBootstrap synchronously on the new client, closing the client and rethrowing if a bootstrap fails; (5) return the connected TransportClient.
TransportServer is the server side of the RPC framework and provides an efficient, low-level streaming service. TransportContext's createServer method creates a TransportServer; one of its implementations is:
- public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
- return new TransportServer(this, null, port, rpcHandler, bootstraps);
- }
TransportContext has four overloaded methods named createServer, but they all end up calling the TransportServer constructor to create the TransportServer instance.
- public TransportServer(
- TransportContext context,
- String hostToBind,
- int portToBind,
- RpcHandler appRpcHandler,
- List<TransportServerBootstrap> bootstraps) {
- this.context = context;
- this.conf = context.getConf();
- this.appRpcHandler = appRpcHandler;
- this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
-
- try {
- init(hostToBind, portToBind);
- } catch (RuntimeException e) {
- JavaUtils.closeQuietly(this);
- throw e;
- }
- }

The variables in the TransportServer constructor are context (the TransportContext), conf (the TransportConf taken from the context), appRpcHandler (the RPC handler provided by the application) and bootstraps (the list of TransportServerBootstraps).
The TransportServer constructor calls the init method, which initializes the TransportServer; its code is as follows:
- private void init(String hostToBind, int portToBind) {
- // per the Netty API docs, a Netty server needs both a bossGroup and a workerGroup (shared here)
- IOMode ioMode = IOMode.valueOf(conf.ioMode());
- EventLoopGroup bossGroup =
- NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
- EventLoopGroup workerGroup = bossGroup;
- // create a pooled ByteBuf allocator (thread-local caching is allowed on the server side)
- PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
- conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
- // create Netty's server-side ServerBootstrap and configure it
- bootstrap = new ServerBootstrap()
- .group(bossGroup, workerGroup)
- .channel(NettyUtils.getServerChannelClass(ioMode))
- .option(ChannelOption.ALLOCATOR, allocator)
- .childOption(ChannelOption.ALLOCATOR, allocator);
-
- if (conf.backLog() > 0) {
- bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
- }
- if (conf.receiveBuf() > 0) {
- bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
- }
- if (conf.sendBuf() > 0) {
- bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
- }
- // set the pipeline initialization callback on the server bootstrap
- bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- RpcHandler rpcHandler = appRpcHandler;
- for (TransportServerBootstrap bootstrap : bootstraps) {
- rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
- }
- context.initializePipeline(ch, rpcHandler);
- }
- });
- // bind the bootstrap to the listening socket address
- InetSocketAddress address = hostToBind == null ?
- new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
- channelFuture = bootstrap.bind(address);
- channelFuture.syncUninterruptibly();
- port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
- logger.debug("Shuffle server started on port: {}", port);
- }

From the code above, TransportServer initialization proceeds as follows: (1) create the EventLoopGroup (the bossGroup doubles as the workerGroup here) and a pooled ByteBuf allocator; (2) create and configure Netty's ServerBootstrap, including the optional SO_BACKLOG, SO_RCVBUF and SO_SNDBUF options; (3) install a ChannelInitializer that lets each TransportServerBootstrap wrap the RpcHandler and then calls TransportContext's initializePipeline for every new connection; (4) bind the bootstrap to the requested address (the wildcard address when hostToBind is null) and record the port actually bound.
Both when creating a TransportClient and when initializing a TransportServer, the pipeline initialization callback calls TransportContext's initializePipeline method, which uses the Netty API to initialize the channel pipeline.
- public TransportChannelHandler initializePipeline(
- SocketChannel channel,
- RpcHandler channelRpcHandler) {
- try {
- TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
- channel.pipeline()
- .addLast("encoder", encoder)
- .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
- .addLast("decoder", decoder)
- .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
- .addLast("handler", channelHandler);
- return channelHandler;
- } catch (RuntimeException e) {
- logger.error("Error while initializing Netty pipeline", e);
- throw e;
- }
- }

From the code above, initializePipeline does the following: (1) call createChannelHandler (shown below) to build a TransportChannelHandler together with its TransportClient, TransportResponseHandler and TransportRequestHandler; (2) add the MessageEncoder, TransportFrameDecoder, MessageDecoder, an IdleStateHandler and finally the TransportChannelHandler to the channel pipeline; (3) return the TransportChannelHandler.
- private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
- TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
- TransportClient client = new TransportClient(channel, responseHandler);
- TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
- rpcHandler);
- return new TransportChannelHandler(client, responseHandler, requestHandler,
- conf.connectionTimeoutMs(), closeIdleConnections);
- }
TransportChannelHandler implements Netty's ChannelInboundHandler so that it can process messages flowing through the Netty pipeline. In the figure above, all of the handlers except MessageEncoder implement the ChannelInboundHandler interface; as custom ChannelInboundHandlers they all override the channelRead callback, and the Netty framework invokes these callbacks in a chain (chain-of-responsibility style). The message-handling method of TransportChannelHandler is shown below:
- public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
- if (request instanceof RequestMessage) {
- requestHandler.handle((RequestMessage) request);
- } else {
- responseHandler.handle((ResponseMessage) request);
- }
- }
As the code shows, when the request read by TransportChannelHandler is a RequestMessage, handling of the message is delegated to TransportRequestHandler; when it is a ResponseMessage, handling is delegated to TransportResponseHandler.
5.1 The MessageHandler hierarchy
TransportRequestHandler and TransportResponseHandler both extend the abstract class MessageHandler, which defines the contract for its subclasses:
- public abstract class MessageHandler<T extends Message> {
- // handle a single received message
- public abstract void handle(T message) throws Exception;
- // called when the channel becomes active
- public abstract void channelActive();
- // called when an exception on the channel is caught
- public abstract void exceptionCaught(Throwable cause);
- // called when the channel becomes inactive
- public abstract void channelInactive();
- }
5.2 The Message hierarchy
As the MessageHandler code shows, MessageHandler is also a Java generic class, and the messages its subclasses handle all derive from the Message interface, which is defined as follows:
- public interface Message extends Encodable {
- // returns the type of the message
- Type type();
- // returns the optional body of the message
- ManagedBuffer body();
- // whether the body is included in the same frame as the message
- boolean isBodyInFrame();
- }
The Message interface extends the Encodable interface, which is defined as follows:
- public interface Encodable {
- int encodedLength();
- void encode(ByteBuf buf);
- }
A class implementing Encodable can be written into a ByteBuf, and several objects can be stored in a single pre-allocated ByteBuf; encodedLength returns the number of bytes the encoded object will occupy, and encode writes the object into the given ByteBuf.
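As an illustration of that contract, a hypothetical Encodable that writes a length-prefixed UTF-8 string might look like this (it is not one of Spark's actual message classes):
- import java.nio.charset.StandardCharsets
-
- import io.netty.buffer.ByteBuf
-
- import org.apache.spark.network.protocol.Encodable
-
- // encodedLength() must report exactly the number of bytes encode() will write,
- // so the caller can pre-allocate a single ByteBuf for several objects.
- class EncodableString(value: String) extends Encodable {
-   private val bytes = value.getBytes(StandardCharsets.UTF_8)
-   override def encodedLength(): Int = 4 + bytes.length // length prefix + payload
-   override def encode(buf: ByteBuf): Unit = {
-     buf.writeInt(bytes.length)
-     buf.writeBytes(bytes)
-   }
- }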
As can be seen in the figure above, the concrete message classes all directly or indirectly implement either the RequestMessage or the ResponseMessage interface. RequestMessage has four concrete implementations: ChunkFetchRequest, RpcRequest, OneWayMessage and StreamRequest.
Since OneWayMessage requires no response, ResponseMessage has three implementations for the success case and three for the failure case: ChunkFetchSuccess, RpcResponse and StreamResponse for success, and ChunkFetchFailure, RpcFailure and StreamFailure for failure.
5.3 The ManagedBuffer hierarchy
Looking at the definition of body in the Message interface, the type of the returned body is ManagedBuffer. A ManagedBuffer provides an immutable view over byte data (that is, a ManagedBuffer does not store the data itself and is not the actual source of the data, much like a view in a relational database). The abstract class ManagedBuffer is shown below:
- public abstract class ManagedBuffer {
- // returns the number of bytes of the data
- public abstract long size();
- // exposes the data as an NIO ByteBuffer
- public abstract ByteBuffer nioByteBuffer() throws IOException;
- // exposes the data as an InputStream
- public abstract InputStream createInputStream() throws IOException;
- // increments the reference count when a new consumer starts using this view
- public abstract ManagedBuffer retain();
- // decrements the reference count when a consumer is done with this view; the buffer is released when the count reaches 0
- public abstract ManagedBuffer release();
- // converts the buffer into a Netty object used to write the data out; the returned type is either io.netty.buffer.ByteBuf or io.netty.channel.FileRegion
- public abstract Object convertToNetty() throws IOException;
- }
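The sketch below is a hypothetical ManagedBuffer over an in-memory NIO ByteBuffer, roughly the role played by Spark's own NioManagedBuffer (used later in sendRpc); reference counting is a no-op here because nothing needs to be freed.
- import java.io.InputStream
- import java.nio.ByteBuffer
-
- import io.netty.buffer.{ByteBufInputStream, Unpooled}
-
- import org.apache.spark.network.buffer.ManagedBuffer
-
- // An immutable view over an in-memory buffer; duplicate() keeps the original
- // position/limit untouched for each caller.
- class SimpleNioBuffer(buf: ByteBuffer) extends ManagedBuffer {
-   override def size(): Long = buf.remaining().toLong
-   override def nioByteBuffer(): ByteBuffer = buf.duplicate()
-   override def createInputStream(): InputStream =
-     new ByteBufInputStream(Unpooled.wrappedBuffer(buf))
-   override def retain(): ManagedBuffer = this   // nothing to count
-   override def release(): ManagedBuffer = this  // nothing to free
-   override def convertToNetty(): AnyRef = Unpooled.wrappedBuffer(buf)
- }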
Since TransportRequestHandler actually hands request messages over to an RpcHandler, let us look at RpcHandler: an abstract class that defines the contract for RPC handlers, shown below:
- public abstract class RpcHandler {
- private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback();
- // abstract method for receiving a single RPC message; the concrete handling is left to subclasses
- public abstract void receive(
- TransportClient client,
- ByteBuffer message,
- RpcResponseCallback callback);
- // returns the StreamManager, which can fetch individual chunks from a stream and therefore also holds the state of the streams currently being fetched by TransportClients
- public abstract StreamManager getStreamManager();
- // overloaded receive method; the RpcResponseCallback defaults to ONE_WAY_CALLBACK
- public void receive(TransportClient client, ByteBuffer message) {
- receive(client, message, ONE_WAY_CALLBACK);
- }
- // called when the channel associated with the given client is active
- public void channelActive(TransportClient client) { }
- // called when the channel associated with the given client is inactive
- public void channelInactive(TransportClient client) { }
- // called when an exception occurs on the channel
- public void exceptionCaught(Throwable cause, TransportClient client) { }
- }
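To make the contract concrete, here is a hypothetical RpcHandler that answers every RPC with the upper-cased text of the request and logs channel lifecycle events; Spark's real handlers (for example NettyRpcHandler) instead dispatch messages to registered RPC endpoints.
- import java.nio.ByteBuffer
- import java.nio.charset.StandardCharsets
-
- import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
- import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamManager}
-
- class UpperCaseRpcHandler extends RpcHandler {
-   private val streamManager = new OneForOneStreamManager()
-
-   // Decode the request as UTF-8 text and reply with its upper-cased form.
-   override def receive(client: TransportClient, message: ByteBuffer,
-       callback: RpcResponseCallback): Unit = {
-     val text = StandardCharsets.UTF_8.decode(message).toString
-     callback.onSuccess(ByteBuffer.wrap(text.toUpperCase.getBytes(StandardCharsets.UTF_8)))
-   }
-
-   override def getStreamManager: StreamManager = streamManager
-
-   override def channelActive(client: TransportClient): Unit =
-     println(s"channel to ${client.getSocketAddress} active")
-   override def channelInactive(client: TransportClient): Unit =
-     println(s"channel to ${client.getSocketAddress} inactive")
- }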

Having introduced RpcHandler, let us return to how TransportRequestHandler processes requests. TransportRequestHandler handles the following four kinds of RequestMessage:
- @Override
- public void handle(RequestMessage request) {
- if (request instanceof ChunkFetchRequest) {
- processFetchRequest((ChunkFetchRequest) request);
- } else if (request instanceof RpcRequest) {
- processRpcRequest((RpcRequest) request);
- } else if (request instanceof OneWayMessage) {
- processOneWayMessage((OneWayMessage) request);
- } else if (request instanceof StreamRequest) {
- processStreamRequest((StreamRequest) request);
- } else {
- throw new IllegalArgumentException("Unknown request type: " + request);
- }
- }
Handling a chunk fetch request
The processFetchRequest method handles messages of type ChunkFetchRequest; its implementation is as follows:
- private void processFetchRequest(final ChunkFetchRequest req) {
- if (logger.isTraceEnabled()) {
- logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
- req.streamChunkId);
- }
- ManagedBuffer buf;
- try {
- streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);
- streamManager.registerChannel(channel, req.streamChunkId.streamId);
- buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
- } catch (Exception e) {
- logger.error(String.format("Error opening block %s for request from %s", req.streamChunkId,
- getRemoteAddress(channel)), e);
- respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
- return;
- }
- respond(new ChunkFetchSuccess(req.streamChunkId, buf));
- }

Here streamManager is the StreamManager obtained from RpcHandler's getStreamManager method, and the whole of processFetchRequest relies on it. The steps are: (1) check that the client is authorized to read the requested stream; (2) register the channel with the stream so that its resources can be cleaned up when the connection closes; (3) fetch the requested chunk from the StreamManager as a ManagedBuffer, responding with a ChunkFetchFailure carrying the error if any of these steps fails; (4) otherwise respond with a ChunkFetchSuccess carrying the buffer.
The handling of RPC requests, stream requests and one-way RPC requests follows much the same pattern, so it is not repeated here.
The bootstraps parameter of the TransportServer constructor is a list of TransportServerBootstraps. The TransportServerBootstrap interface defines the contract for server-side bootstraps, which are executed on the client channels held by the server once a client has established a connection. TransportServerBootstrap is defined as follows:
- public interface TransportServerBootstrap {
- RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
- }
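A hypothetical server bootstrap that leaves the RpcHandler untouched and only logs the connection shows the simplest possible implementation; SaslServerBootstrap, discussed next, instead returns a wrapping SaslRpcHandler.
- import io.netty.channel.Channel
-
- import org.apache.spark.network.server.{RpcHandler, TransportServerBootstrap}
-
- class LoggingServerBootstrap extends TransportServerBootstrap {
-   override def doBootstrap(channel: Channel, rpcHandler: RpcHandler): RpcHandler = {
-     println(s"client ${channel.remoteAddress()} connected")
-     rpcHandler // no wrapping: requests go straight to the original handler
-   }
- }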
The doBootstrap method of a TransportServerBootstrap can wrap (proxy) the server-side RpcHandler that receives client requests. Taking the implementation class SaslServerBootstrap as an example, its doBootstrap is:
- public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
- return new SaslRpcHandler(conf, channel, rpcHandler, secretKeyHolder);
- }
As the code shows, SaslServerBootstrap's doBootstrap actually creates a SaslRpcHandler, which performs SASL authentication on the channel and, when negotiated, encrypts it. SaslRpcHandler itself also extends RpcHandler; its receive method is shown below:
- @Override
- public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
- if (isComplete) {
- // pass the message to the downstream RpcHandler that SaslRpcHandler wraps, then return
- delegate.receive(client, message, callback);
- return;
- }
- ByteBuf nettyBuf = Unpooled.wrappedBuffer(message);
- SaslMessage saslMessage;
- try {
- // decode the SASL message sent by the client
- saslMessage = SaslMessage.decode(nettyBuf);
- } finally {
- nettyBuf.release();
- }
- if (saslServer == null) {
- // if saslServer has not been created yet, create a SparkSaslServer
- client.setClientId(saslMessage.appId);
- saslServer = new SparkSaslServer(saslMessage.appId, secretKeyHolder,
- conf.saslServerAlwaysEncrypt());
- }
- byte[] response;
- try {
- // let saslServer process the decoded message and produce a response
- response = saslServer.response(JavaUtils.bufferToArray(
- saslMessage.body().nioByteBuffer()));
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
- callback.onSuccess(ByteBuffer.wrap(response));
- if (saslServer.isComplete()) {
- logger.debug("SASL authentication successful for channel {}", client);
- isComplete = true; // the SASL authentication exchange is complete
- if (SparkSaslServer.QOP_AUTH_CONF.equals(saslServer.getNegotiatedProperty(Sasl.QOP))) {
- logger.debug("Enabling encryption for channel {}", client);
- // enable SASL encryption on the channel
- SaslEncryption.addToChannel(channel, saslServer, conf.maxSaslEncryptedBlockSize());
- saslServer = null;
- } else {
- saslServer.dispose();
- saslServer = null;
- }
- }
- }

Having seen how the server-side RpcHandler processes request messages, we now turn to how the client sends RPC requests. TransportContext's createChannelHandler method calls the TransportClient constructor, where the reference to the TransportResponseHandler is assigned to the handler field.
- public TransportClient(Channel channel, TransportResponseHandler handler) {
- this.channel = Preconditions.checkNotNull(channel);
- this.handler = Preconditions.checkNotNull(handler);
- this.timedOut = false;
- }
TransportClient has five methods for sending requests: fetchChunk (fetch a single chunk of a stream), stream (fetch a stream as a whole), sendRpc (send an RPC and register a callback for the reply), sendRpcSync (send an RPC and block until the reply arrives) and send (send a one-way message that expects no reply).
8.1 Sending an RPC request
- public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
- final long startTime = System.currentTimeMillis();
- if (logger.isTraceEnabled()) {
- logger.trace("Sending RPC to {}", getRemoteAddress(channel));
- }
- // generate the request id requestId from a UUID
- final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
- // record the mapping from requestId to the RpcResponseCallback
- handler.addRpcRequest(requestId, callback);
- // send the RPC request
- channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- long timeTaken = System.currentTimeMillis() - startTime;
- if (logger.isTraceEnabled()) {
- logger.trace("Sending request {} to {} took {} ms", requestId,
- getRemoteAddress(channel), timeTaken);
- }
- } else {
- String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
- getRemoteAddress(channel), future.cause());
- logger.error(errorMsg, future.cause());
- handler.removeRpcRequest(requestId);
- channel.close();
- try {
- callback.onFailure(new IOException(errorMsg, future.cause()));
- } catch (Exception e) {
- logger.error("Uncaught exception in RPC response callback handler!", e);
- }
- }
- }
- });
- return requestId;
- }

From the code above, sendRpc works as follows: (1) generate the requestId from a UUID; (2) call TransportResponseHandler's addRpcRequest (shown below) to record the mapping from the requestId to the caller's RpcResponseCallback and to update the time of the last request; (3) write an RpcRequest wrapping the message to the channel and attach a listener: on success it merely logs the time taken, while on failure it logs the error, removes the requestId from the handler, closes the channel and invokes the callback's onFailure; (4) return the requestId.
- public void addRpcRequest(long requestId, RpcResponseCallback callback) {
- updateTimeOfLastRequest();
- outstandingRpcs.put(requestId, callback);
- }
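From the caller's point of view, sending an RPC looks roughly like the sketch below; the ping helper and its payload are hypothetical, and the client is assumed to come from TransportClientFactory.createClient as shown earlier.
- import java.nio.ByteBuffer
- import java.nio.charset.StandardCharsets
-
- import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
-
- object SendRpcSketch {
-   def ping(client: TransportClient): Long = {
-     val payload = ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8))
-     // Returns the requestId that identifies this RPC in outstandingRpcs.
-     client.sendRpc(payload, new RpcResponseCallback {
-       override def onSuccess(response: ByteBuffer): Unit =
-         println("reply: " + StandardCharsets.UTF_8.decode(response))
-       override def onFailure(e: Throwable): Unit =
-         println("rpc failed: " + e.getMessage)
-     })
-   }
- }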
Once the request has been sent, the client waits for the server's response. As analyzed earlier for TransportChannelHandler, the returned message is also passed to TransportChannelHandler's channelRead method and then processed by the handle method. TransportResponseHandler's handle method deals with the six kinds of ResponseMessage mentioned above (ChunkFetchSuccess, ChunkFetchFailure, RpcResponse, RpcFailure, StreamResponse, StreamFailure). Since the server's TransportRequestHandler, after processing an RpcRequest with processRpcRequest, replies with either an RpcResponse or an RpcFailure, let us look at that part of the code:
- } else if (message instanceof RpcResponse) {
- RpcResponse resp = (RpcResponse) message;
- RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
- if (listener == null) {
- logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
- resp.requestId, getRemoteAddress(channel), resp.body().size());
- } else {
- outstandingRpcs.remove(resp.requestId);
- try {
- listener.onSuccess(resp.body().nioByteBuffer());
- } finally {
- resp.body().release();
- }
- }
- } else if (message instanceof RpcFailure) {
- RpcFailure resp = (RpcFailure) message;
- RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
- if (listener == null) {
- logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
- resp.requestId, getRemoteAddress(channel), resp.errorString);
- } else {
- outstandingRpcs.remove(resp.requestId);
- listener.onFailure(new RuntimeException(resp.errorString));
- }
- }

From the code above, an RpcResponse is handled as follows: look up the RpcResponseCallback registered for resp.requestId in outstandingRpcs; if none is found, log a warning and ignore the response; otherwise remove the entry, call the callback's onSuccess with the response body as an NIO ByteBuffer, and finally release the body.
An RpcFailure is handled similarly: look up the callback for resp.requestId; if it is missing, log a warning; otherwise remove the entry and call the callback's onFailure with a RuntimeException built from the error string.
8.2 Sending a chunk fetch request
The implementation of fetchChunk:
- public void fetchChunk(
- long streamId,
- final int chunkIndex,
- final ChunkReceivedCallback callback) {
- final long startTime = System.currentTimeMillis();
- if (logger.isDebugEnabled()) {
- logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
- }
- // create the StreamChunkId
- final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
- // record the mapping from the StreamChunkId to the ChunkReceivedCallback
- handler.addFetchRequest(streamChunkId, callback);
- // send the chunk fetch request
- channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- long timeTaken = System.currentTimeMillis() - startTime;
- if (logger.isTraceEnabled()) {
- logger.trace("Sending request {} to {} took {} ms", streamChunkId,
- getRemoteAddress(channel), timeTaken);
- }
- } else {
- String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
- getRemoteAddress(channel), future.cause());
- logger.error(errorMsg, future.cause());
- handler.removeFetchRequest(streamChunkId);
- channel.close();
- try {
- callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
- } catch (Exception e) {
- logger.error("Uncaught exception in RPC response callback handler!", e);
- }
- }
- }
- });
- }
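From the caller's point of view, fetching a chunk looks roughly like the sketch below; the helper is hypothetical, and it assumes a streamId has already been negotiated, for example through an RPC that registered the stream with the server's StreamManager.
- import org.apache.spark.network.buffer.ManagedBuffer
- import org.apache.spark.network.client.{ChunkReceivedCallback, TransportClient}
-
- object FetchChunkSketch {
-   def fetchFirstChunk(client: TransportClient, streamId: Long): Unit = {
-     client.fetchChunk(streamId, 0, new ChunkReceivedCallback {
-       override def onSuccess(chunkIndex: Int, buffer: ManagedBuffer): Unit =
-         println(s"chunk $chunkIndex: ${buffer.size()} bytes")
-       override def onFailure(chunkIndex: Int, e: Throwable): Unit =
-         println(s"chunk $chunkIndex failed: ${e.getMessage}")
-     })
-   }
- }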

Once the request has been sent, the client waits for the server's response. The returned message is again passed to TransportChannelHandler's channelRead method and, following the earlier analysis, ultimately handed to TransportResponseHandler's handle method. After the server has processed a ChunkFetchRequest with processFetchRequest, the message returned to the client is either a ChunkFetchSuccess or a ChunkFetchFailure; the handling code is:
- if (message instanceof ChunkFetchSuccess) {
- ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
- ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
- if (listener == null) {
- logger.warn("Ignoring response for block {} from {} since it is not outstanding",
- resp.streamChunkId, getRemoteAddress(channel));
- resp.body().release();
- } else {
- outstandingFetches.remove(resp.streamChunkId);
- listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
- resp.body().release();
- }
- } else if (message instanceof ChunkFetchFailure) {
- ChunkFetchFailure resp = (ChunkFetchFailure) message;
- ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
- if (listener == null) {
- logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
- resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
- } else {
- outstandingFetches.remove(resp.streamChunkId);
- listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
- "Failure while fetching " + resp.streamChunkId + ": " + resp.errorString));
- }
- }

From the code above, a ChunkFetchSuccess is handled as follows: look up the ChunkReceivedCallback registered for resp.streamChunkId in outstandingFetches; if none is found, log a warning and release the body; otherwise remove the entry, call the callback's onSuccess with the chunk index and the body, and then release the body.
A ChunkFetchFailure is handled as follows: look up the callback for resp.streamChunkId; if it is missing, log a warning; otherwise remove the entry and call the callback's onFailure with a ChunkFetchFailureException describing the failure.