- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 4435字
- 2019-12-12 17:30:00
6.5 Spark 1.6 RPC內幕解密:運行機制、源碼詳解、Netty與Akka等
Spark 1.6推出了以RpcEnv、RPCEndpoint、RPCEndpointRef為核心的新型架構下的RPC通信方式,就目前的實現而言,其底層依舊是Akka。Akka是基于Actor的分布式消息通信系統,而在Spark 1.6中封裝了Akka,提供更高層的Rpc實現,目的是移除對Akka的依賴,為擴展和自定義Rpc打下基礎。
Spark 2.0版本中Rpc的變化情況如下。
SPARK-6280:從Spark中刪除Akka systemName。
SPARK-7995:刪除AkkaRpcEnv,并從Core的依賴中刪除Akka。
SPARK-7997:刪除開發人員api SparkEnv.actorSystem和AkkaUtils。
RpcEnv是一個抽象類abstract class,傳入SparkConf。RPC環境中[RpcEndpoint]需要注冊自己的名字[RpcEnv]來接收消息。[RpcEnv]將處理消息發送到[RpcEndpointRef]或遠程節點,并提供給相應的[RpcEndpoint]。[RpcEnv]]未被捕獲的異常,[RpcEnv]將使用[RpcCallContext.sendFailure]發送異常給發送者,如果沒有這樣的發送者,則記錄日志NotSerializableException。
RpcEnv.scala的源碼如下。
1. private[spark] abstract class RpcEnv(conf: SparkConf) { 2. 3. private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf) 4. ......
RpcCallContext.scala處理異常的方法包括reply、sendFailure、senderAddress,其中reply是給發送者發送一個信息。如果發送者是[RpcEndpoint],它的[RpcEndpoint.receive]將被調用。
其中,RpcCallContext的地址RpcAddress是一個case class,包括hostPort、toSparkURL等成員。
RpcAddress.scala的源碼如下。
1. private[spark] case class RpcAddress(host: String, port: Int) { 2. def hostPort: String = host + ":" + port 3. /**返回一個字符串,該字符串的形式為:spark://host:port*/ 4. def toSparkURL: String = "spark://" + hostPort 5. override def toString: String = hostPort 6. }
RpcAddress伴生對象object RpcAddress屬于包org.apache.spark.rpc,fromURIString方法從String中提取出RpcAddress;fromSparkURL方法也是從String中提取出RpcAddress。說明:case class RpcAddress通過伴生對象object RpcAddress的方法調用,case class RpcAddress也有自己的方法fromURIString、fromSparkURL,而且方法fromURIString、fromSparkURL的返回值也是RpcAddress。
伴生對象RpcAddress的源碼如下。
1. private[spark] object RpcAddress { 2. /**返回[RpcAddress]為代表的uri */ 3. def fromURIString(uri: String): RpcAddress = { 4. val uriObj = new java.net.URI(uri) 5. RpcAddress(uriObj.getHost, uriObj.getPort) 6. } 7. /**返回[RpcAddress],編碼的形式:spark://host:port */ 8. def fromSparkURL(sparkUrl: String): RpcAddress = { 9. val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl) 10. RpcAddress(host, port) 11. } 12. }
RpcEnv解析:
(1)RpcEnv是RPC的環境(相當于Akka中的ActorSystem),所有的RPCEndpoint都需要注冊到RpcEnv實例對象中(注冊的時候會指定注冊的名稱,這樣客戶端就可以通過名稱查詢到RpcEndpoint的RpcEndpointRef引用,從而進行通信),在RpcEndpoint接收到消息后會調用receive方法進行處理。
(2)RpcEndpoint如果接收到需要reply的消息,就會交給自己的receiveAndReply來處理(回復時是通過RpcCallContext中的relpy方法來回復發送者的),如果不需要reply,就交給receive方法來處理。
(3)RpcEnvFactory是負責創建RpcEnv的,通過create方法創建RpcEnv實例對象,默認用Netty。
RpcEnv示意圖如圖6-4所示。

圖6-4 RPCEnv示意圖
回到RpcEnv.scala的源碼,首先調用RpcUtils.lookupRpcTimeout(conf),返回RPC遠程端點查找時默認Spark的超時時間。方法lookupRpcTimeout中構建了一個RpcTimeout,定義spark.rpc.lookupTimeout。spark.network.timeout的超時時間是120s。
RpcUtils.scala的lookupRpcTimeout方法的源碼如下。
1. def lookupRpcTimeout(conf: SparkConf): RpcTimeout = { 2. RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network .timeout"), "120s") 3. }
進入RpcTimeout,進行RpcTimeout關聯超時的原因描述,當TimeoutException發生的時候,關于超時的額外的上下文將包含在異常消息中。
RpcTimeout.scala的源碼如下。
1. private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String) 2. extends Serializable { 3. 4. /**修正TimeoutException標準的消息包括描述 */ 5. private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException = { 6. new RpcTimeoutException(te.getMessage + ". This timeout is controlled by " + timeoutProp, te) 7. }
其中的RpcTimeoutException繼承自TimeoutException。
1. private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException) 2. extends TimeoutException(message) { initCause(cause) }
其中的TimeoutException繼承自Exception。
1. public class TimeoutException extends Exception { 2. ...... 3. public TimeoutException(String message) { 4. super(message); 5. } 6. }
回到RpcTimeout.scala,其中的addMessageIfTimeout方法,如果出現超時,將加入這些信息。
RpcTimeout.scala的addMessageIfTimeout的源碼如下。
1. def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = { 2. //異常已被轉換為一個RpcTimeoutException,就拋出它 3. case rte: RpcTimeoutException => throw rte 4. //其他TimeoutException異常轉換為修改的消息RpcTimeoutException 5. case te: TimeoutException => throw createRpcTimeoutException(te) 6. }
RpcTimeout.scala中的awaitResult方法比較關鍵:awaitResult一直等結果完成并獲得結果,如果在指定的時間沒有返回結果,就拋出異常[RpcTimeoutException]。
Spark 2.1.1版本的RpcTimeout.scala的源碼如下。
1. def awaitResult[T](future: Future[T]): T = { 2. val wrapAndRethrow: PartialFunction[Throwable, T] = { 3. case NonFatal(t) => 4. throw new SparkException("Exception thrown in awaitResult", t) 5. } 6. try { 7. //scalastyle:關閉awaitresult 8. Await.result(future, duration) 9. //scalastyle:打開awaitresult 10. } catch addMessageIfTimeout.orElse(wrapAndRethrow) 11. } 12. }
Spark 2.2.0版本的RpcTimeout.scala的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第2~10行整體被替換,調整為調用ThreadUtils.awaitResult(future, duration)。
1. /** 2. * 等待完成的結果并返回結果。如果結果不在這個超時 timeout 范圍內,就拋出一個異常 * [RpcTimeoutException]表示配置控制超時 3. * 4. * @param future `Future` 將被等待 5. * @throws RpcTimeoutException如果在等待指定的時間future還沒準備好 6. */ 7. def awaitResult[T](future: Future[T]): T = { 8. try { 9. ThreadUtils.awaitResult(future, duration) 10. } catch addMessageIfTimeout 11. } 12. }
其中的future是Future[T]類型,繼承自Awaitable。
1. trait Future[+T] extends Awaitable[T]
Awaitable是一個trait,其中的ready方法是指Duration時間片內,Awaitable的狀態變成completed狀態,就是ready。在Await.result中,result本身是阻塞的。
Awaitable.scala的源碼如下。
1. trait Awaitable[+T] { 2. ...... 3. def ready(atMost: Duration)(implicit permit: CanAwait): this.type 4. ...... 5. @throws(classOf[Exception]) 6. def result(atMost: Duration)(implicit permit: CanAwait): T 7. } 8.
回到RpcEnv.scala中,其中endpointRef方法返回我們注冊的RpcEndpoint的引用,是代理的模式。我們要使用RpcEndpoint,是通過RpcEndpointRef來使用的。Address方法是RpcEnv監聽的地址;setupEndpoint方法注冊時根據RpcEndpoint名稱返回RpcEndpointRef。fileServer返回用于服務文件的文件服務器實例。如果RpcEnv不以服務器模式運行,可能是null值。
RpcEnv.scala的源碼如下。
1. private[spark] abstract class RpcEnv(conf: SparkConf) { 2. 3. private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout (conf) 4. ...... 5. private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef 6. def address: RpcAddress 7. def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef 8. ....... 9. def fileServer: RpcEnvFileServer 10. ......
RpcEnv.scala中的RpcEnvFileServer方法中的RpcEnvConfig是一個case class。RpcEnvFileServer的源碼如下。
1. private[spark] trait RpcEnvFileServer { 2. def addFile(file: File): String 3. ...... 4. private[spark] case class RpcEnvConfig( 5. conf: SparkConf, 6. name: String, 7. bindAddress: String, 8. advertiseAddress: String, 9. port: Int, 10. securityManager: SecurityManager, 11. clientMode: Boolean)
RpcEnv是一個抽象類,其具體的子類是NettyRpcEnv。Spark 1.6版本中包括AkkaRpcEnv和NettyRpcEnv兩種方式。Spark 2.0版本中只有NettyRpcEnv。
下面看一下RpcEnvFactory。RpcEnvFactory是一個工廠類,創建[RpcEnv],必須有一個空構造函數,以便可以使用反射創建。create根據具體的配置,反射出具體的實例對象。RpcEndpoint方法中定義了receiveAndReply方法和receive方法。
RpcEndpoint.scala的源碼如下。
1. private[spark] trait RpcEnvFactory { 2. 3. def create(config: RpcEnvConfig): RpcEnv 4. } 5. private[spark] trait RpcEndpoint { 6. ...... 7. val rpcEnv: RpcEnv 8. 9. ...... 10. final def self: RpcEndpointRef = { 11. require(rpcEnv != null, "rpcEnv has not been initialized") 12. rpcEnv.endpointRef(this) 13. } 14. ....... 15. 16. def receive: PartialFunction[Any, Unit] = { 17. case _ => throw new SparkException(self + " does not implement 'receive'") 18. } 19. ...... 20. def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { 21. case _ => context.sendFailure(new SparkException(self + " won't reply anything")) 22. } 23. ......
Master繼承自ThreadSafeRpcEndpoint,接收消息使用receive方法和receiveAndReply方法。
其中,ThreadSafeRpcEndpoint繼承自RpcEndpoint:ThreadSafeRpcEndpoint是一個trait,需要RpcEnv線程安全地發送消息給它。線程安全是指在處理下一個消息之前通過同樣的[ThreadSafeRpcEndpoint]處理一條消息。換句話說,改變[ThreadSafeRpcEndpoint]的內部字段在處理下一個消息是可見的,[ThreadSafeRpcEndpoint]的字段不需要volatile或equivalent,不能保證對于不同的消息在相同的[ThreadSafeRpcEndpoint]線程中來處理。
1. private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
回到RpcEndpoint.scala,重點看一下receiveAndReply方法和receive方法。receive方法處理從[RpcEndpointRef.send]或者[RpcCallContext.reply]發過來的消息,如果收到一個不匹配的消息,[SparkException]會拋出一個異常onError。receiveAndReply方法處理從[RpcEndpointRef.ask]發過來的消息,如果收到一個不匹配的消息,[SparkException]會拋出一個異常onError。receiveAndReply方法返回PartialFunction對象。
RpcEndpoint.scala的源碼如下。
1. def receive: PartialFunction[Any, Unit] = { 2. case _ => throw new SparkException(self + " does not implement 'receive'") 3. } 4. 5. ...... 6. def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { 7. case _ => context.sendFailure(new SparkException(self + " won't reply anything")) 8. }
在Master中,Receive方法中收到消息以后,不需要回復對方。
Master.scala的Receive方法的源碼如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case ElectedLeader => 3. ..... 4. recoveryCompletionTask = forwardMessageThread.schedule(new Runnable { 5. override def run(): Unit = Utils.tryLogNonFatalError { 6. self.send(CompleteRecovery) 7. } 8. }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS) 9. } 10. 11. case CompleteRecovery => completeRecovery() 12. 13. 14. case RevokedLeadership => 15. logError("Leadership has been revoked -- master shutting down.") 16. System.exit(0) 17. 18. case RegisterApplication(description, driver) => 19. ...... 20. schedule() 21.
在Master中,receiveAndReply方法中收到消息以后,都要通過context.reply回復對方。
在Master中,RpcEndpoint如果接收到需要reply的消息,就會交給自己的receiveAndReply來處理(回復時是通過RpcCallContext中的relpy方法來回復發送者的),如果不需要reply,就交給receive方法來處理。
RpcCallContext的源碼如下。
1. private[spark] trait RpcCallContext { 2. 3. /** *回復消息的發送者。如果發送者是[RpcEndpoint],其[RpcEndpoint.receive] *將被調用 5. */ 6. def reply(response: Any): Unit 7. 8. /** *向發送方報告故障 9. */ 10. def sendFailure(e: Throwable): Unit 11. 12. /** *此消息的發送者 13. */ 14. def senderAddress: RpcAddress 15. }
回到RpcEndpoint.scala,RpcEnvFactory是一個trait,負責創建RpcEnv,通過create方法創建RpcEnv實例對象,默認用Netty。
RpcEndpoint.scala的源碼如下。
1. private[spark] trait RpcEnvFactory { 2. 3. def create(config: RpcEnvConfig): RpcEnv 4. }
RpcEnvFactory的create方法沒有具體的實現。下面看一下RpcEnvFactory子類NettyRpcEnvFactory中create的具體實現,使用的方式為nettyEnv。
NettyRpcEnv.scala的create方法的源碼如下。
1. def create(config: RpcEnvConfig): RpcEnv = { 2. val sparkConf = config.conf 3. //在多個線程中使用JavaSerializerInstance 是安全的。然而,如果將來計劃支持 //KryoSerializer,必須使用ThreadLocal來存儲SerializerInstance 4. 5. val javaSerializerInstance = 6. new JavaSerializer(sparkConf).newInstance().asInstanceOf [JavaSerializerInstance] 7. val nettyEnv = 8. new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress, 9. config.securityManager) 10. if (!config.clientMode) { 11. val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort => 12. nettyEnv.startServer(config.bindAddress, actualPort) 13. (nettyEnv, nettyEnv.address.port) 14. } 15. try { 16. Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1 17. } catch { 18. case NonFatal(e) => 19. nettyEnv.shutdown() 20. throw e 21. } 22. } 23. nettyEnv 24. } 25. }
在Spark 2.0版本中回溯一下NettyRpcEnv的實例化過程。在SparkContext實例化時調用createSparkEnv方法。
SparkContext.scala的源碼如下。
1. ...... 2. _env = createSparkEnv(_conf, isLocal, listenerBus) 3. SparkEnv.set(_env) 4. ...... 5. 6. private[spark] def createSparkEnv( 7. conf: SparkConf, 8. isLocal: Boolean, 9. listenerBus: LiveListenerBus): SparkEnv = { 10. SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext. numDriverCores(master)) 11. } 12. 13. .....
SparkContext的createSparkEnv方法中調用了SparkEnv.createDriverEnv方法。下面看一下createDriverEnv方法的實現,其調用了create方法。
SparkEnv.scala的createDriverEnv的源碼如下。
1. private[spark] def createDriverEnv( 2. ....... 3. create( 4. conf, 5. SparkContext.DRIVER_IDENTIFIER, 6. bindAddress, 7. advertiseAddress, 8. port, 9. isLocal, 10. numCores, 11. ioEncryptionKey, 12. listenerBus = listenerBus, 13. mockOutputCommitCoordinator = mockOutputCommitCoordinator 14. ) 15. } 16. 17. private def create( 18. ........ 19. val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf, 20. securityManager, clientMode = !isDriver) 21. ......
在RpcEnv.scala中,creat方法直接調用new()函數創建一個NettyRpcEnvFactory,調用NettyRpcEnvFactory().create方法,NettyRpcEnvFactory繼承自RpcEnvFactory。在Spark 2.0中,RpcEnvFactory直接使用NettyRpcEnvFactory的方式。
RpcEnv.scala的源碼如下。
1. private[spark] object RpcEnv { 2. ....... 3. 4. def create( 5. name: String, 6. bindAddress: String, 7. advertiseAddress: String, 8. port: Int, 9. conf: SparkConf, 10. securityManager: SecurityManager, 11. clientMode: Boolean): RpcEnv = { 12. val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager, 13. clientMode) 14. new NettyRpcEnvFactory().create(config) 15. }
NettyRpcEnvFactory().create的方法如下。
NettyRpcEnv.scala的源碼如下。
1. private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging { 2. 3. def create(config: RpcEnvConfig): RpcEnv = { 4. ...... 5. val nettyEnv = 6. new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress, 7. config.securityManager) 8. if (!config.clientMode) { 9. val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort => 10. nettyEnv.startServer(config.bindAddress, actualPort) 11. (nettyEnv, nettyEnv.address.port) 12. } 13. try { 14. Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1 15. } catch { 16. case NonFatal(e) => 17. nettyEnv.shutdown() 18. throw e 19. } 20. } 21. nettyEnv 22. } 23. }
在NettyRpcEnvFactory().create中調用new()函數創建一個NettyRpcEnv。NettyRpcEnv傳入SparkConf參數,包括fileServer、startServer等方法。
NettyRpcEnv的源碼如下。
1. private[netty] class NettyRpcEnv( 2. val conf: SparkConf, 3. javaSerializerInstance: JavaSerializerInstance, 4. host: String, 5. securityManager: SecurityManager) extends RpcEnv(conf) with Logging { 6. 7. ...... 8. override def fileServer: RpcEnvFileServer = streamManager 9. ...... 10. def startServer(bindAddress: String, port: Int): Unit = { 11. val bootstraps: java.util.List[TransportServerBootstrap] = 12. if (securityManager.isAuthenticationEnabled()) { 13. java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager)) 14. } else { 15. java.util.Collections.emptyList() 16. } 17. server = transportContext.createServer(bindAddress, port, bootstraps) 18. dispatcher.registerRpcEndpoint( 19. RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher)) 20. }
NettyRpcEnv.scala的startServer中,通過transportContext.createServer創建Server,使用dispatcher.registerRpcEndpoint方法dispatcher注冊RpcEndpoint。在createServer方法中調用new()函數創建一個TransportServer。
TransportContext的createServer方法的源碼如下。
1. public TransportServer createServer( 2. String host, int port, List<TransportServerBootstrap> bootstraps) { 3. return new TransportServer(this, host, port, rpcHandler, bootstraps); 4. }
TransportServer.java的源碼如下。
1. public TransportServer( 2. TransportContext context, 3. String hostToBind, 4. int portToBind, 5. RpcHandler appRpcHandler, 6. List<TransportServerBootstrap> bootstraps) { 7. this.context = context; 8. this.conf = context.getConf(); 9. this.appRpcHandler = appRpcHandler; 10. this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull (bootstraps)); 11. 12. try { 13. init(hostToBind, portToBind); 14. } catch (RuntimeException e) { 15. JavaUtils.closeQuietly(this); 16. throw e; 17. } 18. }
TransportServer.java中的關鍵方法是init,這是Netty本身的實現內容。
TransportServer.java中的init的源碼如下。
1. private void init(String hostToBind, int portToBind) { 2. 3. IOMode ioMode = IOMode.valueOf(conf.ioMode()); 4. EventLoopGroup bossGroup = 5. NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server"); 6. EventLoopGroup workerGroup = bossGroup; 7. .......
接下來,我們看一下RpcEndpointRef。RpcEndpointRef是一個抽象類,是代理模式。
RpcEndpointRef.scala的源碼如下。
1. private[spark] abstract class RpcEndpointRef(conf: SparkConf) 2. extends Serializable with Logging { 3. 4. private[this] val maxRetries = RpcUtils.numRetries(conf) 5. private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf) 6. private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf) 7. ...... 8. def send(message: Any): Unit 9. def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] 10. .....
NettyRpcEndpointRef是RpcEndpointRef的具體實現子類。ask方法通過調用nettyEnv.ask傳遞消息。RequestMessage是一個case class。
Spark 2.1.1版本的NettyRpcEnv.scala的NettyRpcEndpointRef的源碼如下。
1. private[netty] class NettyRpcEndpointRef( 2. @transient private val conf: SparkConf, 3. endpointAddress: RpcEndpointAddress, 4. @transient @volatile private var nettyEnv: NettyRpcEnv) 5. extends RpcEndpointRef(conf) with Serializable with Logging { 6. ...... 7. override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = { 8. nettyEnv.ask(RequestMessage(nettyEnv.address, this, message), timeout) 9. } 10. ......
Spark 2.2.0版本的NettyRpcEnv.scala的NettyRpcEndpointRef的源碼與Spark 2.1.1版本相比具有如下特點。
上段代碼中第3行endpointAddress增加了private訪問限制。
上段代碼中第5行刪掉了Serializable及Logging的繼承。
1. private[netty] class NettyRpcEndpointRef( 2. @transient private val conf: SparkConf, 3. private val endpointAddress: RpcEndpointAddress, 4. @transient @volatile private var nettyEnv: NettyRpcEnv) extends RpcEndpointRef(conf) {
下面從實例的角度來看RPC的應用:
RpcEndpoint的生命周期:構造(constructor)->啟動(onStart)、消息接收(receive、receiveAndReply )、停止(onStop)。
Master中接收消息的方式有兩種:①receive接收消息不回復;②receiveAndReply通過context.reply的方式回復消息。例如,Worker發送Master的RegisterWorker消息,當Master注冊成功,Master就返回Worker RegisteredWorker消息。
Worker啟動時,從生命周期的角度,Worker實例化的時候提交Master進行注冊。
Worker的onStart的源碼如下。
1. override def onStart() { 2. ....... 3. registerWithMaster() 4. 5. metricsSystem.registerSource(workerSource) 6. metricsSystem.start() 7. //Attach the worker metrics servlet handler to the web ui after the //metrics system is started. 8. metricsSystem.getServletHandlers.foreach(webUi.attachHandler) 9. }
進入registerWithMaster方法:
Worker的registerWithMaster的源碼如下。
1. private def registerWithMaster() { 2. ...... 3. registerMasterFutures = tryRegisterAllMasters() 4. ....
進入tryRegisterAllMasters方法:在rpcEnv.setupEndpointRef中根據masterAddress、ENDPOINT_NAME名稱獲取RpcEndpointRef。
Spark 2.1.1版本的Worker的tryRegisterAllMasters的源碼如下。
1. private def tryRegisterAllMasters(): Array[JFuture[_]] = { 2. ...... 3. val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) 4. registerWithMaster(masterEndpoint) 5. ......
Spark 2.2.0版本的Worker的tryRegisterAllMasters的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第4行registerWithMaster方法調整為sendRegisterMessageToMaster方法。
1. private def tryRegisterAllMasters(): Array[JFuture[_]] = { 2. ...... 3. val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) 4. sendRegisterMessageToMaster(masterEndpoint) 5. ......
基于masterEndpoint,使用registerWithMaster方法注冊。registerWithMaster方法中通過ask方法發送RegisterWorker消息,并要求發送返回結果,返回的消息類型為RegisterWorkerResponse。然后進行模式匹配,如果成功,就handleRegisterResponse。如果失敗,就退出。
Spark 2.1.1版本的Worker.scala的registerWithMaster的源碼如下。
1. private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = { 2. masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker( 3. workerId, host, port, self, cores, memory, workerWebUiUrl)) 4. .onComplete { 5. //這是一個非常快的行動,所以可以用ThreadUtils.sameThread 6. case Success(msg) => 7. Utils.tryLogNonFatalError { 8. handleRegisterResponse(msg) 9. } 10. case Failure(e) => 11. logError(s"Cannot register with master: ${masterEndpoint .address}", e) 12. System.exit(1) 13. }(ThreadUtils.sameThread) 14. }
handleRegisterResponse方法中的模式匹配,收到RegisteredWorker消息進行相應的處理。
Worker.scala的handleRegisterResponse的源碼如下。
1. private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized { 2. msg match { 3. case RegisteredWorker(masterRef, masterWebUiUrl) => 4. ....... 5.
Spark 2.1.1版本中,registerWithMaster方法中的Worker發送RegisterWorker消息給Master,此時,Worker同步收到Master回復的RegisterWorkerResponse消息以后還須根據成功或失敗的情況,通過handleRegisterResponse進行后續的處理。
Spark 2.2.0版本將registerWithMaster方法調整為sendRegisterMessageToMaster方法。sendRegisterMessageToMaster方法中的Worker發送RegisterWorker消息給Master以后,就完成此次注冊。Master節點收到RegisterWorker消息另行處理,如果注冊成功,Master就發送Worker節點成功的RegisteredWorker消息;如果注冊失敗,Master就發送Worker節點失敗的RegisterWorkerFailed消息。
1. private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = { 2. masterEndpoint.send(RegisterWorker( 3. workerId, 4. host, 5. port, 6. self, 7. cores, 8. memory, 9. workerWebUiUrl, 10. masterEndpoint.address)) 11. }
- Visualforce Development Cookbook(Second Edition)
- 商戰數據挖掘:你需要了解的數據科學與分析思維
- WOW!Illustrator CS6完全自學寶典
- 網絡綜合布線技術
- CompTIA Linux+ Certification Guide
- 可編程序控制器應用實訓(三菱機型)
- 具比例時滯遞歸神經網絡的穩定性及其仿真與應用
- 計算機組網技術
- INSTANT Munin Plugin Starter
- Excel 2010函數與公式速查手冊
- 單片機原理實用教程
- IBM? SmartCloud? Essentials
- 工業機器人集成應用
- 信息系統安全保障評估
- 從實踐中學嵌入式Linux操作系統