官术网_书友最值得收藏!

6.5 Spark 1.6 RPC內幕解密:運行機制、源碼詳解、Netty與Akka等

Spark 1.6推出了以RpcEnv、RPCEndpoint、RPCEndpointRef為核心的新型架構下的RPC通信方式,就目前的實現而言,其底層依舊是Akka。Akka是基于Actor的分布式消息通信系統,而在Spark 1.6中封裝了Akka,提供更高層的Rpc實現,目的是移除對Akka的依賴,為擴展和自定義Rpc打下基礎。

Spark 2.0版本中Rpc的變化情況如下。

 SPARK-6280:從Spark中刪除Akka systemName。

 SPARK-7995:刪除AkkaRpcEnv,并從Core的依賴中刪除Akka。

 SPARK-7997:刪除開發人員api SparkEnv.actorSystem和AkkaUtils。

RpcEnv是一個抽象類abstract class,傳入SparkConf。RPC環境中[RpcEndpoint]需要注冊自己的名字[RpcEnv]來接收消息。[RpcEnv]將處理消息發送到[RpcEndpointRef]或遠程節點,并提供給相應的[RpcEndpoint]。[RpcEnv]]未被捕獲的異常,[RpcEnv]將使用[RpcCallContext.sendFailure]發送異常給發送者,如果沒有這樣的發送者,則記錄日志NotSerializableException。

RpcEnv.scala的源碼如下。

1.    private[spark] abstract class RpcEnv(conf: SparkConf) {
2.
3.    private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)
4.  ......

RpcCallContext.scala處理異常的方法包括reply、sendFailure、senderAddress,其中reply是給發送者發送一個信息。如果發送者是[RpcEndpoint],它的[RpcEndpoint.receive]將被調用。

其中,RpcCallContext的地址RpcAddress是一個case class,包括hostPort、toSparkURL等成員。

RpcAddress.scala的源碼如下。

1.    private[spark] case class RpcAddress(host: String, port: Int) {
2.    def hostPort: String = host + ":" + port
3.    /**返回一個字符串,該字符串的形式為:spark://host:port*/
4.    def toSparkURL: String = "spark://" + hostPort
5.    override def toString: String = hostPort
6.  }

RpcAddress伴生對象object RpcAddress屬于包org.apache.spark.rpc,fromURIString方法從String中提取出RpcAddress;fromSparkURL方法也是從String中提取出RpcAddress。說明:case class RpcAddress通過伴生對象object RpcAddress的方法調用,case class RpcAddress也有自己的方法fromURIString、fromSparkURL,而且方法fromURIString、fromSparkURL的返回值也是RpcAddress。

伴生對象RpcAddress的源碼如下。

1.   private[spark] object RpcAddress {
2.    /**返回[RpcAddress]為代表的uri */
3.    def fromURIString(uri: String): RpcAddress = {
4.      val uriObj = new java.net.URI(uri)
5.      RpcAddress(uriObj.getHost, uriObj.getPort)
6.    }
7.    /**返回[RpcAddress],編碼的形式:spark://host:port */
8.    def fromSparkURL(sparkUrl: String): RpcAddress = {
9.      val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
10.     RpcAddress(host, port)
11.   }
12. }

RpcEnv解析:

(1)RpcEnv是RPC的環境(相當于Akka中的ActorSystem),所有的RPCEndpoint都需要注冊到RpcEnv實例對象中(注冊的時候會指定注冊的名稱,這樣客戶端就可以通過名稱查詢到RpcEndpoint的RpcEndpointRef引用,從而進行通信),在RpcEndpoint接收到消息后會調用receive方法進行處理。

(2)RpcEndpoint如果接收到需要reply的消息,就會交給自己的receiveAndReply來處理(回復時是通過RpcCallContext中的relpy方法來回復發送者的),如果不需要reply,就交給receive方法來處理。

(3)RpcEnvFactory是負責創建RpcEnv的,通過create方法創建RpcEnv實例對象,默認用Netty。

RpcEnv示意圖如圖6-4所示。

圖6-4 RPCEnv示意圖

回到RpcEnv.scala的源碼,首先調用RpcUtils.lookupRpcTimeout(conf),返回RPC遠程端點查找時默認Spark的超時時間。方法lookupRpcTimeout中構建了一個RpcTimeout,定義spark.rpc.lookupTimeout。spark.network.timeout的超時時間是120s。

RpcUtils.scala的lookupRpcTimeout方法的源碼如下。

1.  def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
2.    RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network
      .timeout"), "120s")
3.  }

進入RpcTimeout,進行RpcTimeout關聯超時的原因描述,當TimeoutException發生的時候,關于超時的額外的上下文將包含在異常消息中。

RpcTimeout.scala的源碼如下。

1.  private[spark] class RpcTimeout(val duration: FiniteDuration, val
    timeoutProp: String)
2.  extends Serializable {
3.
4.  /**修正TimeoutException標準的消息包括描述 */
5.  private def createRpcTimeoutException(te: TimeoutException):
    RpcTimeoutException = {
6.    new RpcTimeoutException(te.getMessage + ". This timeout is controlled
      by " + timeoutProp, te)
7.  }

其中的RpcTimeoutException繼承自TimeoutException。

1.    private[rpc] class RpcTimeoutException(message: String, cause:
      TimeoutException)
2.  extends TimeoutException(message) { initCause(cause) }

其中的TimeoutException繼承自Exception。

1.     public class TimeoutException extends Exception {
2.  ......
3.      public TimeoutException(String message) {
4.          super(message);
5.      }
6.  }

回到RpcTimeout.scala,其中的addMessageIfTimeout方法,如果出現超時,將加入這些信息。

RpcTimeout.scala的addMessageIfTimeout的源碼如下。

1.  def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
2.    //異常已被轉換為一個RpcTimeoutException,就拋出它
3.    case rte: RpcTimeoutException => throw rte
4.    //其他TimeoutException異常轉換為修改的消息RpcTimeoutException
5.    case te: TimeoutException => throw createRpcTimeoutException(te)
6.  }

RpcTimeout.scala中的awaitResult方法比較關鍵:awaitResult一直等結果完成并獲得結果,如果在指定的時間沒有返回結果,就拋出異常[RpcTimeoutException]。

Spark 2.1.1版本的RpcTimeout.scala的源碼如下。

1.    def awaitResult[T](future: Future[T]): T = {
2.      val wrapAndRethrow: PartialFunction[Throwable, T] = {
3.        case NonFatal(t) =>
4.          throw new SparkException("Exception thrown in awaitResult", t)
5.      }
6.      try {
7.        //scalastyle:關閉awaitresult
8.        Await.result(future, duration)
9.        //scalastyle:打開awaitresult
10.     } catch addMessageIfTimeout.orElse(wrapAndRethrow)
11.   }
12. }

Spark 2.2.0版本的RpcTimeout.scala的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第2~10行整體被替換,調整為調用ThreadUtils.awaitResult(future, duration)。

1.   /**
2.    * 等待完成的結果并返回結果。如果結果不在這個超時  timeout   范圍內,就拋出一個異常
      * [RpcTimeoutException]表示配置控制超時
3.    *
4.    * @param  future    `Future` 將被等待
5.    * @throws RpcTimeoutException如果在等待指定的時間future還沒準備好
6.    */
7.   def awaitResult[T](future: Future[T]): T = {
8.     try {
9.       ThreadUtils.awaitResult(future, duration)
10.    } catch addMessageIfTimeout
11.  }
12. }

其中的future是Future[T]類型,繼承自Awaitable。

1.  trait Future[+T] extends Awaitable[T]

Awaitable是一個trait,其中的ready方法是指Duration時間片內,Awaitable的狀態變成completed狀態,就是ready。在Await.result中,result本身是阻塞的。

Awaitable.scala的源碼如下。

1.    trait Awaitable[+T] {
2.  ......
3.  def ready(atMost: Duration)(implicit permit: CanAwait): this.type
4.  ......
5.    @throws(classOf[Exception])
6.    def result(atMost: Duration)(implicit permit: CanAwait): T
7.  }
8.

回到RpcEnv.scala中,其中endpointRef方法返回我們注冊的RpcEndpoint的引用,是代理的模式。我們要使用RpcEndpoint,是通過RpcEndpointRef來使用的。Address方法是RpcEnv監聽的地址;setupEndpoint方法注冊時根據RpcEndpoint名稱返回RpcEndpointRef。fileServer返回用于服務文件的文件服務器實例。如果RpcEnv不以服務器模式運行,可能是null值。

RpcEnv.scala的源碼如下。

1.   private[spark] abstract class RpcEnv(conf: SparkConf) {
2.
3.    private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout
      (conf)
4.  ......
5.    private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
6.  def address: RpcAddress
7.  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
8.  .......
9.    def fileServer: RpcEnvFileServer
10. ......

RpcEnv.scala中的RpcEnvFileServer方法中的RpcEnvConfig是一個case class。RpcEnvFileServer的源碼如下。

1.    private[spark] trait RpcEnvFileServer {
2.    def addFile(file: File): String
3.  ......
4.  private[spark] case class RpcEnvConfig(
5.      conf: SparkConf,
6.      name: String,
7.      bindAddress: String,
8.      advertiseAddress: String,
9.      port: Int,
10.     securityManager: SecurityManager,
11.     clientMode: Boolean)

RpcEnv是一個抽象類,其具體的子類是NettyRpcEnv。Spark 1.6版本中包括AkkaRpcEnv和NettyRpcEnv兩種方式。Spark 2.0版本中只有NettyRpcEnv。

下面看一下RpcEnvFactory。RpcEnvFactory是一個工廠類,創建[RpcEnv],必須有一個空構造函數,以便可以使用反射創建。create根據具體的配置,反射出具體的實例對象。RpcEndpoint方法中定義了receiveAndReply方法和receive方法。

RpcEndpoint.scala的源碼如下。

1.   private[spark] trait RpcEnvFactory {
2.
3.    def create(config: RpcEnvConfig): RpcEnv
4.  }
5.  private[spark] trait RpcEndpoint {
6.  ......
7.    val rpcEnv: RpcEnv
8.
9.    ......
10.   final def self: RpcEndpointRef = {
11.     require(rpcEnv != null, "rpcEnv has not been initialized")
12.     rpcEnv.endpointRef(this)
13.   }
14. .......
15.
16.   def receive: PartialFunction[Any, Unit] = {
17.     case _ => throw new SparkException(self + " does not implement
        'receive'")
18.   }
19. ......
20.   def receiveAndReply(context: RpcCallContext): PartialFunction[Any,
      Unit] = {
21.     case _ => context.sendFailure(new SparkException(self + " won't reply
        anything"))
22.   }
23. ......

Master繼承自ThreadSafeRpcEndpoint,接收消息使用receive方法和receiveAndReply方法。

其中,ThreadSafeRpcEndpoint繼承自RpcEndpoint:ThreadSafeRpcEndpoint是一個trait,需要RpcEnv線程安全地發送消息給它。線程安全是指在處理下一個消息之前通過同樣的[ThreadSafeRpcEndpoint]處理一條消息。換句話說,改變[ThreadSafeRpcEndpoint]的內部字段在處理下一個消息是可見的,[ThreadSafeRpcEndpoint]的字段不需要volatile或equivalent,不能保證對于不同的消息在相同的[ThreadSafeRpcEndpoint]線程中來處理。

1.  private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint

回到RpcEndpoint.scala,重點看一下receiveAndReply方法和receive方法。receive方法處理從[RpcEndpointRef.send]或者[RpcCallContext.reply]發過來的消息,如果收到一個不匹配的消息,[SparkException]會拋出一個異常onError。receiveAndReply方法處理從[RpcEndpointRef.ask]發過來的消息,如果收到一個不匹配的消息,[SparkException]會拋出一個異常onError。receiveAndReply方法返回PartialFunction對象。

RpcEndpoint.scala的源碼如下。

1.     def receive: PartialFunction[Any, Unit] = {
2.    case _ => throw new SparkException(self + " does not implement
      'receive'")
3.  }
4.
5.  ......
6.    def receiveAndReply(context: RpcCallContext): PartialFunction[Any,
      Unit] = {
7.      case _ => context.sendFailure(new SparkException(self + " won't reply
        anything"))
8.    }

在Master中,Receive方法中收到消息以后,不需要回復對方。

Master.scala的Receive方法的源碼如下。

1.    override def receive: PartialFunction[Any, Unit] = {
2.     case ElectedLeader =>
3.     .....
4.   recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
5.           override def run(): Unit = Utils.tryLogNonFatalError {
6.             self.send(CompleteRecovery)
7.           }
8.         }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
9.       }
10.
11.    case CompleteRecovery => completeRecovery()
12.
13.
14.    case RevokedLeadership =>
15.      logError("Leadership has been revoked -- master shutting down.")
16.      System.exit(0)
17.
18.    case RegisterApplication(description, driver) =>
19.     ......
20.        schedule()
21.

在Master中,receiveAndReply方法中收到消息以后,都要通過context.reply回復對方。

在Master中,RpcEndpoint如果接收到需要reply的消息,就會交給自己的receiveAndReply來處理(回復時是通過RpcCallContext中的relpy方法來回復發送者的),如果不需要reply,就交給receive方法來處理。

RpcCallContext的源碼如下。

1.  private[spark] trait RpcCallContext {
2.
3.   /**
       *回復消息的發送者。如果發送者是[RpcEndpoint],其[RpcEndpoint.receive]
       *將被調用
5.     */
6.    def reply(response: Any): Unit
7.
8.   /**
       *向發送方報告故障
9.     */
10.   def sendFailure(e: Throwable): Unit
11.
12.  /**
       *此消息的發送者
13.    */
14.   def senderAddress: RpcAddress
15. }

回到RpcEndpoint.scala,RpcEnvFactory是一個trait,負責創建RpcEnv,通過create方法創建RpcEnv實例對象,默認用Netty。

RpcEndpoint.scala的源碼如下。

1.   private[spark] trait RpcEnvFactory {
2.
3.    def create(config: RpcEnvConfig): RpcEnv
4.  }

RpcEnvFactory的create方法沒有具體的實現。下面看一下RpcEnvFactory子類NettyRpcEnvFactory中create的具體實現,使用的方式為nettyEnv。

NettyRpcEnv.scala的create方法的源碼如下。

1.    def create(config: RpcEnvConfig): RpcEnv = {
2.      val sparkConf = config.conf
3.      //在多個線程中使用JavaSerializerInstance 是安全的。然而,如果將來計劃支持
        //KryoSerializer,必須使用ThreadLocal來存儲SerializerInstance
4.
5.      val javaSerializerInstance =
6.        new JavaSerializer(sparkConf).newInstance().asInstanceOf
          [JavaSerializerInstance]
7.      val nettyEnv =
8.        new NettyRpcEnv(sparkConf, javaSerializerInstance,
          config.advertiseAddress,
9.          config.securityManager)
10.     if (!config.clientMode) {
11.       val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
12.         nettyEnv.startServer(config.bindAddress, actualPort)
13.         (nettyEnv, nettyEnv.address.port)
14.       }
15.       try {
16.         Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf,
            config.name)._1
17.       } catch {
18.         case NonFatal(e) =>
19.           nettyEnv.shutdown()
20.           throw e
21.       }
22.     }
23.     nettyEnv
24.   }
25. }

在Spark 2.0版本中回溯一下NettyRpcEnv的實例化過程。在SparkContext實例化時調用createSparkEnv方法。

SparkContext.scala的源碼如下。

1.  ......
2.  _env = createSparkEnv(_conf, isLocal, listenerBus)
3.      SparkEnv.set(_env)
4.  ......
5.
6.    private[spark] def createSparkEnv(
7.        conf: SparkConf,
8.        isLocal: Boolean,
9.        listenerBus: LiveListenerBus): SparkEnv = {
10.     SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.
        numDriverCores(master))
11.   }
12.
13. .....

SparkContext的createSparkEnv方法中調用了SparkEnv.createDriverEnv方法。下面看一下createDriverEnv方法的實現,其調用了create方法。

SparkEnv.scala的createDriverEnv的源碼如下。

1.    private[spark] def createDriverEnv(
2.      .......
3.      create(
4.        conf,
5.        SparkContext.DRIVER_IDENTIFIER,
6.        bindAddress,
7.        advertiseAddress,
8.        port,
9.        isLocal,
10.       numCores,
11.       ioEncryptionKey,
12.       listenerBus = listenerBus,
13.       mockOutputCommitCoordinator = mockOutputCommitCoordinator
14.     )
15.   }
16.
17.  private def create(
18.     ........
19.     val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress,
        port, conf,
20.       securityManager, clientMode = !isDriver)
21. ......

在RpcEnv.scala中,creat方法直接調用new()函數創建一個NettyRpcEnvFactory,調用NettyRpcEnvFactory().create方法,NettyRpcEnvFactory繼承自RpcEnvFactory。在Spark 2.0中,RpcEnvFactory直接使用NettyRpcEnvFactory的方式。

RpcEnv.scala的源碼如下。

1.   private[spark] object RpcEnv {
2.   .......
3.
4.    def create(
5.        name: String,
6.        bindAddress: String,
7.        advertiseAddress: String,
8.        port: Int,
9.        conf: SparkConf,
10.       securityManager: SecurityManager,
11.       clientMode: Boolean): RpcEnv = {
12.     val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress,
        port, securityManager,
13.       clientMode)
14.     new NettyRpcEnvFactory().create(config)
15.   }

NettyRpcEnvFactory().create的方法如下。

NettyRpcEnv.scala的源碼如下。

1.    private[rpc]     class   NettyRpcEnvFactory       extends   RpcEnvFactory  with
      Logging {
2.
3.    def create(config: RpcEnvConfig): RpcEnv = {
4.    ......
5.      val nettyEnv =
6.        new NettyRpcEnv(sparkConf, javaSerializerInstance,
          config.advertiseAddress,
7.          config.securityManager)
8.      if (!config.clientMode) {
9.        val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
10.         nettyEnv.startServer(config.bindAddress, actualPort)
11.         (nettyEnv, nettyEnv.address.port)
12.       }
13.       try {
14.         Utils.startServiceOnPort(config.port, startNettyRpcEnv,
            sparkConf, config.name)._1
15.       } catch {
16.         case NonFatal(e) =>
17.           nettyEnv.shutdown()
18.           throw e
19.       }
20.     }
21.     nettyEnv
22.   }
23. }

在NettyRpcEnvFactory().create中調用new()函數創建一個NettyRpcEnv。NettyRpcEnv傳入SparkConf參數,包括fileServer、startServer等方法。

NettyRpcEnv的源碼如下。

1.   private[netty] class NettyRpcEnv(
2.      val conf: SparkConf,
3.      javaSerializerInstance: JavaSerializerInstance,
4.      host: String,
5.      securityManager: SecurityManager) extends RpcEnv(conf) with Logging {
6.
7.  ......
8.    override def fileServer: RpcEnvFileServer = streamManager
9.  ......
10.   def startServer(bindAddress: String, port: Int): Unit = {
11.     val bootstraps: java.util.List[TransportServerBootstrap] =
12.       if (securityManager.isAuthenticationEnabled()) {
13.         java.util.Arrays.asList(new         SaslServerBootstrap(transportConf,
            securityManager))
14.       } else {
15.         java.util.Collections.emptyList()
16.       }
17.     server = transportContext.createServer(bindAddress, port, bootstraps)
18.     dispatcher.registerRpcEndpoint(
19.       RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
20.   }

NettyRpcEnv.scala的startServer中,通過transportContext.createServer創建Server,使用dispatcher.registerRpcEndpoint方法dispatcher注冊RpcEndpoint。在createServer方法中調用new()函數創建一個TransportServer。

TransportContext的createServer方法的源碼如下。

1.  public TransportServer createServer(
2.      String host, int port, List<TransportServerBootstrap> bootstraps) {
3.    return new TransportServer(this, host, port, rpcHandler, bootstraps);
4.  }

TransportServer.java的源碼如下。

1.  public TransportServer(
2.        TransportContext context,
3.        String hostToBind,
4.        int portToBind,
5.        RpcHandler appRpcHandler,
6.        List<TransportServerBootstrap> bootstraps) {
7.      this.context = context;
8.      this.conf = context.getConf();
9.      this.appRpcHandler = appRpcHandler;
10.     this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull
        (bootstraps));
11.
12.     try {
13.       init(hostToBind, portToBind);
14.     } catch (RuntimeException e) {
15.       JavaUtils.closeQuietly(this);
16.       throw e;
17.     }
18.   }

TransportServer.java中的關鍵方法是init,這是Netty本身的實現內容。

TransportServer.java中的init的源碼如下。

1.         private void init(String hostToBind, int portToBind) {
2.
3.      IOMode ioMode = IOMode.valueOf(conf.ioMode());
4.      EventLoopGroup bossGroup =
5.        NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
          conf.getModuleName() + "-server");
6.      EventLoopGroup workerGroup = bossGroup;
7.  .......

接下來,我們看一下RpcEndpointRef。RpcEndpointRef是一個抽象類,是代理模式。

RpcEndpointRef.scala的源碼如下。

1.   private[spark] abstract class RpcEndpointRef(conf: SparkConf)
2.    extends Serializable with Logging {
3.
4.    private[this] val maxRetries = RpcUtils.numRetries(conf)
5.    private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
6.    private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
7.  ......
8.  def send(message: Any): Unit
9.  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
10. .....

NettyRpcEndpointRef是RpcEndpointRef的具體實現子類。ask方法通過調用nettyEnv.ask傳遞消息。RequestMessage是一個case class。

Spark 2.1.1版本的NettyRpcEnv.scala的NettyRpcEndpointRef的源碼如下。

1.   private[netty] class NettyRpcEndpointRef(
2.      @transient private val conf: SparkConf,
3.      endpointAddress: RpcEndpointAddress,
4.      @transient @volatile private var nettyEnv: NettyRpcEnv)
5.    extends RpcEndpointRef(conf) with Serializable with Logging {
6.  ......
7.   override def ask[T: ClassTag](message: Any, timeout: RpcTimeout):
     Future[T] = {
8.      nettyEnv.ask(RequestMessage(nettyEnv.address, this, message), timeout)
9.    }
10. ......

Spark 2.2.0版本的NettyRpcEnv.scala的NettyRpcEndpointRef的源碼與Spark 2.1.1版本相比具有如下特點。

 上段代碼中第3行endpointAddress增加了private訪問限制。

 上段代碼中第5行刪掉了Serializable及Logging的繼承。

1.  private[netty] class NettyRpcEndpointRef(
2.     @transient private val conf: SparkConf,
3.     private val endpointAddress: RpcEndpointAddress,
4.     @transient @volatile private var nettyEnv: NettyRpcEnv) extends
       RpcEndpointRef(conf) {

下面從實例的角度來看RPC的應用:

RpcEndpoint的生命周期:構造(constructor)->啟動(onStart)、消息接收(receive、receiveAndReply )、停止(onStop)。

Master中接收消息的方式有兩種:①receive接收消息不回復;②receiveAndReply通過context.reply的方式回復消息。例如,Worker發送Master的RegisterWorker消息,當Master注冊成功,Master就返回Worker RegisteredWorker消息。

Worker啟動時,從生命周期的角度,Worker實例化的時候提交Master進行注冊。

Worker的onStart的源碼如下。

1.    override def onStart() {
2.  .......
3.    registerWithMaster()
4.
5.    metricsSystem.registerSource(workerSource)
6.    metricsSystem.start()
7.    //Attach the worker metrics servlet handler to the web ui after the
      //metrics system is started.
8.    metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
9.  }

進入registerWithMaster方法:

Worker的registerWithMaster的源碼如下。

1.  private def registerWithMaster() {
2.     ......
3.         registerMasterFutures = tryRegisterAllMasters()
4.      ....

進入tryRegisterAllMasters方法:在rpcEnv.setupEndpointRef中根據masterAddress、ENDPOINT_NAME名稱獲取RpcEndpointRef。

Spark 2.1.1版本的Worker的tryRegisterAllMasters的源碼如下。

1.  private def tryRegisterAllMasters(): Array[JFuture[_]] = {
2.   ......
3.            val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress,
              Master.ENDPOINT_NAME)
4.            registerWithMaster(masterEndpoint)
5.       ......

Spark 2.2.0版本的Worker的tryRegisterAllMasters的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第4行registerWithMaster方法調整為sendRegisterMessageToMaster方法。

1.    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
2.  ......
3.     val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress,
       Master.ENDPOINT_NAME)
4.              sendRegisterMessageToMaster(masterEndpoint)
5.  ......

基于masterEndpoint,使用registerWithMaster方法注冊。registerWithMaster方法中通過ask方法發送RegisterWorker消息,并要求發送返回結果,返回的消息類型為RegisterWorkerResponse。然后進行模式匹配,如果成功,就handleRegisterResponse。如果失敗,就退出。

Spark 2.1.1版本的Worker.scala的registerWithMaster的源碼如下。

1.  private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
2.      masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
3.        workerId, host, port, self, cores, memory, workerWebUiUrl))
4.        .onComplete {
5.          //這是一個非常快的行動,所以可以用ThreadUtils.sameThread
6.          case Success(msg) =>
7.            Utils.tryLogNonFatalError {
8.              handleRegisterResponse(msg)
9.            }
10.         case Failure(e) =>
11.           logError(s"Cannot register with master: ${masterEndpoint
              .address}", e)
12.           System.exit(1)
13.       }(ThreadUtils.sameThread)
14.   }

handleRegisterResponse方法中的模式匹配,收到RegisteredWorker消息進行相應的處理。

Worker.scala的handleRegisterResponse的源碼如下。

1.  private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit =
    synchronized {
2.      msg match {
3.        case RegisteredWorker(masterRef, masterWebUiUrl) =>
4.       .......
5.

Spark 2.1.1版本中,registerWithMaster方法中的Worker發送RegisterWorker消息給Master,此時,Worker同步收到Master回復的RegisterWorkerResponse消息以后還須根據成功或失敗的情況,通過handleRegisterResponse進行后續的處理。

Spark 2.2.0版本將registerWithMaster方法調整為sendRegisterMessageToMaster方法。sendRegisterMessageToMaster方法中的Worker發送RegisterWorker消息給Master以后,就完成此次注冊。Master節點收到RegisterWorker消息另行處理,如果注冊成功,Master就發送Worker節點成功的RegisteredWorker消息;如果注冊失敗,Master就發送Worker節點失敗的RegisterWorkerFailed消息。

1.    private def sendRegisterMessageToMaster(masterEndpoint:
      RpcEndpointRef): Unit = {
2.     masterEndpoint.send(RegisterWorker(
3.       workerId,
4.       host,
5.       port,
6.       self,
7.       cores,
8.       memory,
9.       workerWebUiUrl,
10.      masterEndpoint.address))
11.  }
主站蜘蛛池模板: 南投县| 朔州市| 邹城市| 铅山县| 读书| 高州市| 福安市| 泰兴市| 法库县| 衡阳县| 仲巴县| 赤水市| 淮滨县| 青铜峡市| 松桃| 武川县| 沁水县| 远安县| 开阳县| 肥西县| 固镇县| 瑞丽市| 沙田区| 敦化市| 抚远县| 高密市| 城口县| 扶绥县| 河池市| 大埔县| 茌平县| 昌平区| 杨浦区| 武清区| 双桥区| 揭东县| 巴林左旗| 丹巴县| 泰州市| 光山县| 黑水县|