首页 > 网络 > 云计算 >

Spark源码知识讲解之Master状态改变处理机制原理

2017-11-09

Spark源码知识讲解之Master状态改变处理机制原理。一、Master故障挥着宕机,可能触发新的Master选举,当重新选择Leader的时候,会进行集群的恢复,在恢复的过程中,就会向Worker和AppClient发送MasterChanged消息。

一Master故障挥着宕机,可能触发新的Master选举

当重新选择Leader的时候,会进行集群的恢复,在恢复的过程中,就会向Worker和AppClient发送MasterChanged消息。

private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
 storedWorkers: Seq[WorkerInfo]) {
 // 遍历每一个存储的application,注册该application,并且发送MasterChanged请求
 for (app <- storedApps) {
 logInfo("Trying to recover app: " + app.id)
 try {
 registerApplication(app)
 // 将该application状态置为UNKNOWN状态
 app.state = ApplicationState.UNKNOWN
 // 然后这个app向master发送MasterChanged请求
 app.driver.send(MasterChanged(self, masterWebUiUrl))
 } catch {
 case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
 }
 }
 // 遍历每一个存储的driver, 更新master所维护的driver集合
 for (driver <- storedDrivers) {
 drivers += driver
 }
 // 遍历每一个存储的wroker,然后向master注册worker
 for (worker <- storedWorkers) {
 logInfo("Trying to recover worker: " + worker.id)
 try {
 // 注册worker,就是更新master的woker集合,和worker相关的映射列表
 registerWorker(worker)
 // 将该worker状态置为UNKNOWN状态
 worker.state = WorkerState.UNKNOWN
 // 然后改worker向master发送MasterChanged请求
 worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
 } catch {
 case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
 }
 }
}

二Worker和AppClient会接受到来自Master的MasterChanged消息

2.1 Worker在收到MasterChanged消息

# 获取新的master的url和master,连接状态置为true,取消之前的尝试重新注册

# 向新的master发送WorkerSchedulerStateResponse消息,然后会做一些操作

case MasterChanged(masterRef, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
// 获取新的master的url和master,连接状态置为true,取消之前的尝试重新注册
changeMaster(masterRef, masterWebUiUrl)
// 创建当前节点executors的简单描述对象ExecutorDescription
val execs = executors.values.
map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
// 向新的master发送WorkerSchedulerStateResponse消息,然后会做一些操作
masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))

2.2 AppClient在收到MasterChanged消息

# 更新master

# 向新的master发送MasterChangeAcknowledged消息

case MasterChanged(masterRef, masterWebUiUrl) =>
 logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
 // 更新master
 master = Some(masterRef)
 alreadyDisconnected = false
 // 向新的master发送MasterChangeAcknowledged消息
 masterRef.send(MasterChangeAcknowledged(appId.get))

三 Master会接受来自Worker和AppClient的消息

3.1 Master在收到Worker的WorkerSchedulerStateResponse消息

由于这是新的master,所以worker需要重新注册,然后新的master再次把之前相关的应用程序在worker上进行恢复

case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
 // 根据workerId获取worker
 idToWorker.get(workerId) match {
 case Some(worker) =>
 logInfo("Worker has been re-registered: " + workerId)
 // worker状态置为alive
 worker.state = WorkerState.ALIVE
 // 从指定的executor中过滤出哪些是有效的executor
 val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
 // 遍历有效的executors
 for (exec <- validExecutors) {
 // 获取executor所对应的app
 val app = idToApp.get(exec.appId).get
 // 为app设置executor,比如哪一个worker,多少核数等资源
 val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
 // 将该executor添加到该wokerworker.addExecutor(execInfo)
 execInfo.copyState(exec)
 }
 // 将所有的driver设置为RUNNING然后加入到worker中
 for (driverId <- driverIds) {
 drivers.find(_.id == driverId).foreach { driver =>
 driver.worker = Some(worker)
 driver.state = DriverState.RUNNING
 worker.drivers(driverId) = driver
 }
 }
 case None =>
 logWarning("Scheduler state from unknown worker: " + workerId)
 }
 // 判断当前是否可以进行completeRecovery操作,如果可以进行completeRecovery操作
 if (canCompleteRecovery) { completeRecovery() }

3.2 Master收到AppClient的MasterChangeAcknowledged消息

# 更新application状态为WAITTING

# 判断当前是否可以进行completeRecovery操作,如果可以进行completeRecovery操作

case MasterChangeAcknowledged(appId) =>
 idToApp.get(appId) match {
 case Some(app) =>
 logInfo("Application has been re-registered: " + appId)
 app.state = ApplicationState.WAITING
 case None =>
 logWarning("Master change ack from unknown app: " + appId)
 }
 // 判断当前是否可以进行completeRecovery操作,如果可以进行completeRecovery操作
 if (canCompleteRecovery) { completeRecovery() }
相关文章
最新文章
热点推荐