首页 > 网络 > 云计算 >

Spark源码知识讲解之Master启动和通信机制

2017-11-09

Spark源码知识讲解之Master启动和通信机制。Master主要就是用于管理集群,负责资源的调度什么的。它继承了ThreadSafeRpcEndpoint和LeaderElectable,由于继承ThreadSafeRpcEndpoint,所以Master就可以作为一个RpcEndpoint;继承LeaderElectable之后,就可以参见选举

Master主要就是用于管理集群,负责资源的调度什么的。它继承了ThreadSafeRpcEndpoint和LeaderElectable,由于继承ThreadSafeRpcEndpoint,所以Master就可以作为一个RpcEndpoint;继承LeaderElectable之后,就可以参见选举

一 重要属性

RpcEnv rpcEnv:用于注册和维护RpcEndpoint和RpcEndpointRef

RpcAddress:address: 维护了host和port

Int webUiPort : web ui 端口

HashSet[WorkerInfo] workers : 维护全部worker的信息

HashMap[String, WorkerInfo] idToWorker: 维护workid和workerinfo之间的映射关系

HashMap[RpcAddress, WorkerInfo] addressToWorker: 维护worker节点的地址信息个wokrerinfo的映射关系

HashSet[ApplicationInfo] apps: 维护全部application的信息

HashMap[String, ApplicationInfo] idToApp: 维护application id和 ApplicationInfo映射关系

ArrayBuffer[ApplicationInfo] waitingApps: 当前处于等待的application

ArrayBuffer[ApplicationInfo] completedApps:当前已经完成的application

HashMap[RpcEndpointRef, ApplicationInfo]endpointToApp: 维护 RpcEndpointRef和ApplicationInfo之间的映射

HashMap[RpcAddress, ApplicationInfo] addressToApp:维护 RpcAddress和ApplicationInfo之间的映射

HashSet[DriverInfo] drivers:维护所有驱动的信息

ArrayBuffer[DriverInfo] waitingDrivers:当前处于等待的驱动

ArrayBuffer[DriverInfo] completedDrrivers: 当前已经完成的驱动

二 核心方法

2.1 startRpcEnvAndEndpoint

# 注册'Master' RpcEndpoint,并返回RpcEndpointRef,用于发送消息# 向Master的通信终端发送请求,获取绑定的端口号

# 返回一个(RpcEnv, web ui port, rest port)

def startRpcEnvAndEndpoint(host: String, port: Int, webUiPort: Int,
conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
// 注册'Master' RpcEndpoint,并返回RpcEndpointRef,用于发送消息
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
// 向Master的通信终端发送请求,获取绑定的端口号
val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
// 返回一个(RpcEnv,web ui port, rest port)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}

2.2 初始化Master,首先就会调用到onStart方法

# 构建web ui 和 启动rest server

# 守护线程启动一个调度机制,定期检查Worker是否超时

# 进行Master HA相关的操作

override def onStart(): Unit = {
 logInfo("Starting Spark master at " + masterUrl)
 logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
 // 构建Master的Web UI,可以查看向Master提交的应用程序等信息
 webUi = new MasterWebUI(this, webUiPort)
 webUi.bind()
 masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
 // 如果启用反向代理
 if (reverseProxy) {
 masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
 logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
 s"Applications UIs are available at $masterWebUiUrl")
 }
 // 守护线程启动一个调度机制,定期检查Worker是否超时
 checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
 override def run(): Unit = Utils.tryLogNonFatalError {
 self.send(CheckForWorkerTimeOut)
 }
 }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
 // 如果启用了rest server,那么启动rest服务,可以通过该服务向master提交各种请求
 if (restServerEnabled) {
 val port = conf.getInt("spark.master.rest.port", 6066)
 restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
 }
 restServerBoundPort = restServer.map(_.start())

 masterMetricsSystem.registerSource(masterSource)
 masterMetricsSystem.start()
 applicationMetricsSystem.start()
 masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
 applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

 val serializer = new JavaSerializer(conf)
 val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
 // 如果恢复模式是ZOOKEEPER,那么通过zookeeper来持久化恢复状态
 case "ZOOKEEPER" =>
 logInfo("Persisting recovery state to ZooKeeper")
 val zkFactory =
 new ZooKeeperRecoveryModeFactory(conf, serializer)
 (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
 // 如果恢复模式是文件系统,那么通过文件系统来持久化恢复状态
 case "FILESYSTEM" =>
 val fsFactory =
 new FileSystemRecoveryModeFactory(conf, serializer)
 (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
 // 如果恢复模式是定制的,那么指定你定制的全路径类名,然后产生相关操作来持久化恢复状态
 case "CUSTOM" =>
 val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
 val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
 .newInstance(conf, serializer)
 .asInstanceOf[StandaloneRecoveryModeFactory]
 (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
 case _ =>
 (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
 }
 persistenceEngine = persistenceEngine_
 leaderElectionAgent = leaderElectionAgent_
}

2.3 receive

我们知道Master本身就是一个Actor或者RpcEndpoint,所以他肯定会实现receive方法

/**
* receive是一个偏函数,对于偏函数而言,是PartialFunction[A,B]类的一个实例,A是可以接收的类型,
* B是需要返回的类型。对应着这里的receive方法,所以可以接收任何类型,不需要返回
*
@return
*/
override def receive: PartialFunction[Any, Unit] = {
// 如果是ElectedLeader请求,表示需要重新选举
case ElectedLeader =>
// 根据配置的spark.deploy.recoveryMode,决定使用哪一种recovery 模式,然后决定采用什么持久化engine
// 然后根据持久化engine,读取持久化数据,得到一个已经存储的(applicationInfo,driverInfo,workerInfo)
// 的一个三元组

val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
// 根据读取的持久化数据是否都为空,判断RecoveryState状态是否是alive还是recovering
state
= if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
RecoveryState.ALIVE
} else {
RecoveryState.RECOVERING
}
logInfo("I have been elected leader! New state: " + state)
// 如果处于recovering状态
if (state == RecoveryState.RECOVERING) {
// 开始恢复数据
beginRecovery(storedApps, storedDrivers, storedWorkers)
// 后台线程调度一个线程去定期检查master完成了恢复工作
recoveryCompletionTask
= forwardMessageThread.schedule(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CompleteRecovery)
}
}, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
}
// 如果是CompleteRecovery,则调用completeRecovery
case CompleteRecovery => completeRecovery()
// 如果是RevokedLeadership请求,则是关闭Master,将会重新触发master Leadership选举
case RevokedLeadership =>
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)
// 如果是RegisterApplication请求,则判断是不是leader,从而注册应用程序
case RegisterApplication(description, driver) =>
// 其他的非leader的master是不能进行应用程序的创建和注册
if (state == RecoveryState.STANDBY) {
// ignore,don't send response
} else {
logInfo("Registering app " + description.name)
// 创建应用程序和driver
val app = createApplication(description, driver)
// 注册应用程序
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
// 持久化引擎添加该application
persistenceEngine
.addApplication(app)
// 向master发送RegisteredApplication请求,表示注册已完成
driver.send(RegisteredApplication(app.id, self))
schedule()
}
// 如果ExecutorStateChanged请求,表示Executor状态发生改变
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
// 通过application获取运行该app的executor得到指定的executor
val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
execOption match {
case Some(exec) =>
// 获取appinfo信息
val appInfo = idToApp(appId)
// 更新该executor的状态为指定的状态
val oldState = exec.state
exec.state = state
// 如果指定的状态时处于正在运行的状态,将retry重试次数置为0
if (state == ExecutorState.RUNNING) {
assert(oldState == ExecutorState.LAUNCHING,
s"executor$execId state transfer from $oldState to RUNNING is illegal")
appInfo.resetRetryCount()
}
// 给executor对应的 application发送ExecutorUpdated请求
exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
// 如果指定状态该是已经完成状态
if (ExecutorState.isFinished(state)) {
logInfo(s"Removing executor ${exec.fullId} because it is $state")
// 如果应用程序已经运行完毕,则从appInfo移除这个executor
if (!appInfo.isFinished) {
appInfo.removeExecutor(exec)
}
// 该executor所对应的worker也会移除该executor
exec.worker.removeExecutor(exec)
// 重试一定次数,不再无限制循环
val normalExit = exitStatus == Some(0)
// 只要retry次数小于10,那么executor的资源就会不断的调整
if (!normalExit
&& appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
&& MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
val execs = appInfo.executors.values
if (!execs.exists(_.state == ExecutorState.RUNNING)) {
logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
s"${appInfo.retryCount} times; removing it")
removeApplication(appInfo, ApplicationState.FAILED)
}
}
}
//重新分配资源
schedule()
case None =>
logWarning(s"Got status update for unknown executor $appId/$execId")
}
// 如果发送DriverStateChanged请求,表示Driver转态发生变化
case DriverStateChanged(driverId, state, exception) =>
// 如果Driver的state为ERROR | FINISHED | KILLED | FAILED,删除它
state match {
case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
removeDriver(driverId, state, exception)
case _ =>
throw new Exception(s"Received unexpectedstate update for driver $driverId: $state")
}
// 如果发送的是Heartbeat请求,表示心跳检测机制,由worker向master发起的
case Heartbeat(workerId, worker) =>
// 根据workerId获取worker
idToWorker
.get(workerId) match {
// 如果worker存在,则更新workinfo的lastHeartbeat属性,否则表示该worker还没有向master注册
case Some(workerInfo) =>
workerInfo.lastHeartbeat = System.currentTimeMillis()
case None =>
// 如果worker集合已经存在这个worker
if (workers.map(_.id).contains(workerId)) {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" Askingit to re-register.")
// 则worker向master发起ReconnectWorker请求
worker.send(ReconnectWorker(masterUrl))
} else {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" Thisworker was never registered, so ignoring the heartbeat.")
}
}
// 如果是MasterChangeAcknowledged请求,表示application已经被master确认,将app状态置为waiting
case MasterChangeAcknowledged(appId) =>
idToApp.get(appId) match {
case Some(app) =>
logInfo("Application has been re-registered: " + appId)
app.state = ApplicationState.WAITING
case None =>
logWarning("Master change ack from unknown app: " + appId)
}
// 判断当前是否可以进行completeRecovery操作,如果可以进行completeRecovery操作
if (canCompleteRecovery) { completeRecovery() }
// 如果是WorkerSchedulerStateResponse,表示worker调度状态响应请求
case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
// 根据workerId获取worker
idToWorker
.get(workerId) match {
case Some(worker) =>
logInfo("Worker has been re-registered: " + workerId)
// worker状态置为alive
worker.state = WorkerState.ALIVE
// 从指定的executor中过滤出哪些是有效的executor

val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
// 遍历有效的executors
for (exec <- validExecutors) {
// 获取executor所对应的app
val app = idToApp.get(exec.appId).get
// 为app设置executor,比如哪一个worker,多少核数等资源
val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
// 将该executor添加到该woker
worker.addExecutor(execInfo)
execInfo.copyState(exec)
}
// 将所有的driver设置为RUNNING然后加入到worker中
for (driverId <- driverIds) {
drivers.find(_.id == driverId).foreach { driver =>
driver.worker = Some(worker)
driver.state = DriverState.RUNNING
worker.drivers(driverId) = driver
}
}
case None =>
logWarning("Scheduler state from unknown worker: " + workerId)
}
// 判断当前是否可以进行completeRecovery操作,如果可以进行completeRecovery操作
if (canCompleteRecovery) { completeRecovery() }
// 如果是WorkerLatestState,表示woreker最近的状态
case WorkerLatestState(workerId, executors, driverIds) =>
// 根据指定的wrokerId获取worker
idToWorker
.get(workerId) match {
case Some(worker) =>
// 比那里指定的executor,判断指定的这些executor是否能够和worker里的executor进行匹配
for (exec <- executors) {
val executorMatches = worker.executors.exists {
case (_, e) => e.application.id == exec.appId && e.id == exec.execId
}
// 如果匹配不上则让worker kill掉这executor
if (!executorMatches) {
// masterdoesn&#39;t recognize this executor. So just tell worker to kill it.
worker.endpoint.send(KillExecutor(masterUrl, exec.appId, exec.execId))
}
}
// 遍历传递进来的driver
for (driverId <- driverIds) {
// 判断driver是否匹配
val driverMatches = worker.drivers.exists { case (id, _) => id == driverId }
// 如果匹配不上则让worker kill掉这driver
if (!driverMatches) {
// masterdoesn&#39;t recognize this driver. So just tell worker to kill it.
worker.endpoint.send(KillDriver(driverId))
}
}
case None =>
logWarning("Worker state from unknown worker: " + workerId)
}
// 如果是UnregisterApplication请求,表示不注册app,从当前master中移除
case UnregisterApplication(applicationId) =>
logInfo(s"Received unregister request from application $applicationId")
idToApp.get(applicationId).foreach(finishApplication)
// 如果是CheckForWorkerTimeOut,表示检测worker超时的请求
case CheckForWorkerTimeOut =>
timeOutDeadWorkers()

}

2.4 receiveAndReply

在master启动之后就要开始接受消息,但是这些请求时需要返回结果的

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
// 如果请求是RegisterWorker,表示 Master注册新的Worker
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
// 如果当前节点状态是standby,返回MasterInStandby
if (state == RecoveryState.STANDBY) {
context.reply(MasterInStandby)
} else if (idToWorker.contains(id)) {
// 判断维护的workerid->WorkerInfo映射是否包含这个worker id
// 如果包含返回
wokerid,则返回 worker id重复的RegisterWorkerFailed
context.reply(RegisterWorkerFailed("Duplicateworker ID"))
} else {// 表示当前节点为master,且要注册是worker id之前是不存在的
// 创建worker,并进行注册,注册成功并且返回RegisteredWorker请求,然后开始调度
// 否则返回RegisterWorkerFailed请求,worker注册失败

val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerWebUiUrl)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
context.reply(RegisteredWorker(self, masterWebUiUrl))
schedule()
} else {
val workerAddress = worker.endpoint.address
logWarning("Worker registration failed. Attempted to re-register worker at same" +
"address:" + workerAddress)
context.reply(RegisterWorkerFailed("Attemptedto re-register worker at same address: "
+ workerAddress))
}
}
// 如果是RequestSubmitDriver请求,表示提交driver,更新master所维护的driver信息
case RequestSubmitDriver(description) =>
// 如果master不是active,返回错误
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can onlyaccept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
logInfo("Driver submitted " + description.command.mainClass)
// 创建driver
val driver = createDriver(description)
// 持久化引擎添加drriver
persistenceEngine.addDriver(driver)
// drivers集合和waitingDrivers集合添加driver
waitingDrivers
+= driver
drivers.add(driver)
schedule()// 开始调度
// 返回成功的请求消息

context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driversuccessfully submitted as ${driver.id}"))
}
// 如果是RequestKillDriver请求,表示kill掉该driver
case RequestKillDriver(driverId) =>
// 如果master不是active,返回错误
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
s"Canonly kill drivers in ALIVE state."
context.reply(KillDriverResponse(self, driverId, success = false, msg))
} else {
logInfo("Asked to kill driver " + driverId)
// 获取指定的driver
val driver = drivers.find(_.id == driverId)
driver match {
// 从master所维护的driver先关列表或者集合中移除这个driver
case Some(d) =>
// 处于等待的driver集合包含这个driver
if (waitingDrivers.contains(d)) {
// 移除并且发送DriverStateChanged请求
waitingDrivers
-= d
self.send(DriverStateChanged(driverId, DriverState.KILLED, None))
} else {
// 否则让worker kill掉这个driver
d.worker.foreach { w =>
w.endpoint.send(KillDriver(driverId))
}
}
// 返回KillDriverResponse请求
val msg = s"Kill request for $driverId submitted"
logInfo(msg)
context.reply(KillDriverResponse(self, driverId, success = true, msg))
case None =>
val msg = s"Driver $driverId has already finished or does not exist"
logWarning(msg)
context.reply(KillDriverResponse(self, driverId, success = false, msg))
}
}
// 如果是RequestDriverStatus,表示获取driver状态信息
case RequestDriverStatus(driverId) =>
// 如果master不是active,返回错误
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can onlyrequest driver status in ALIVE state."
context.reply(
DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg))))
} else {
// 从当前的drivers集合和已经完成的driver集合查找这个driver,并返回相关信息
(drivers ++ completedDrivers).find(_.id == driverId) match {
case Some(driver) =>
context.reply(DriverStatusResponse(found = true, Some(driver.state),
driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception))
case None =>
context.reply(DriverStatusResponse(found = false, None, None, None, None))
}
}
// 如果是RequestMasterState请求,则表示获取master状态
case RequestMasterState =>
context.reply(MasterStateResponse(
address.host, address.port, restServerBoundPort,
workers.toArray, apps.toArray, completedApps.toArray,
drivers.toArray, completedDrivers.toArray, state))
// 如果是BoundPortsRequest,则表示获取绑定的那些端口
case BoundPortsRequest =>
context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
// 如果是RequestExecutors,则表示为application设置目标数量的executor
case RequestExecutors(appId, requestedTotal) =>
context.reply(handleRequestExecutors(appId, requestedTotal))
// 如果是KillExecutors,表示杀掉application指定的executors,返回操作状态
case KillExecutors(appId, executorIds) =>
val formattedExecutorIds = formatExecutorIds(executorIds)
context.reply(handleKillExecutors(appId, formattedExecutorIds))
}

2.5 onDisconnected 断开连接

override def onDisconnected(address: RpcAddress): Unit = {
// Thedisconnected client could&#39;ve been either aworker or an app; remove whichever it was
logInfo(s"$address got disassociated, removing it.")
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}

2.6 beginRecovery 开始恢复

private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
storedWorkers: Seq[WorkerInfo]) {
// 遍历每一个存储的application,注册该application,并且发送MasterChanged请求
for (app <- storedApps) {
logInfo("Trying to recover app: " + app.id)
try {
registerApplication(app)
// 将该application状态置为UNKNOWN状态
app.state = ApplicationState.UNKNOWN
// 然后这个app向master发送MasterChanged请求

app.driver.send(MasterChanged(self, masterWebUiUrl))
} catch {
case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
}
}
// 遍历每一个存储的driver, 更新master所维护的driver集合
for (driver <- storedDrivers) {
drivers += driver
}
// 遍历每一个存储的wroker,然后向master注册worker
for (worker <- storedWorkers) {
logInfo("Trying to recover worker: " + worker.id)
try {
// 注册worker,就是更新master的woker集合,和worker相关的映射列表
registerWorker(worker)
// 将该worker状态置为UNKNOWN状态
worker.state = WorkerState.UNKNOWN
// 然后改worker向master发送MasterChanged请求

worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
} catch {
case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
}
}
}

2.7 completeRecovery 完成恢复

// 恢复完毕,重新创建Driver,完成资源的重新分配
private def completeRecovery() {
// 如果状态不是recovering则返回
if (state != RecoveryState.RECOVERING) { return }
// 然后状态置为completing_recovery(正处于恢复中)
state
= RecoveryState.COMPLETING_RECOVERY

// 杀掉那些不响应但是状态不是UNKNOWN的worker和application
workers
.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

// 重新调度未被任何worker声称的driver,即还没有worker来运行
drivers
.filter(_.worker.isEmpty).foreach { d =>
logWarning(s"Driver ${d.id} was not found after master recovery")
// 如果是driver是监管者,则重新发起driver,否则删除driver
if (d.desc.supervise) {
logWarning(s"Re-launching ${d.id}")
relaunchDriver(d)
} else {
removeDriver(d.id, DriverState.ERROR, None)
logWarning(s"Did not re-launch ${d.id} because it was not supervised")
}
}
// 然后状态置为alive
state
= RecoveryState.ALIVE
// 重新分配资源

schedule()
logInfo("Recovery complete - resuming operations!")
}

2.8 removeWorker 删除worker

private def removeWorker(worker: WorkerInfo) {
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
// 更新该worker状态为DEAD
worker.setState(WorkerState.DEAD)
// 从worker相关的映射中移除这个worker
idToWorker
-= worker.id
addressToWorker-= worker.endpoint.address
if (reverseProxy) {
webUi.removeProxyTargets(worker.id)
}
// 遍历worker的executors
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
// 处于该worker上executor里的application发送ExecutorUpdated请求
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
// 该executor状态置为LOST
exec.state = ExecutorState.LOST
// 该executor的application移除executor

exec.application.removeExecutor(exec)
}
// 遍历worker上的driver
for (driver <- worker.drivers.values) {
// 判断driver是否是supervise,如果是重新发起driver,否则移除该driver
if (driver.desc.supervise) {
logInfo(s"Re-launching ${driver.id}")
relaunchDriver(driver)
} else {
logInfo(s"Not re-launching ${driver.id} because it was not supervised")
removeDriver(driver.id, DriverState.ERROR, None)
}
}
// 持久化引擎移除该worker
persistenceEngine
.removeWorker(worker)
}

2.9 removeApplication 删除application

def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
// 如果master所维护的application集合包含这个application,则移除它,并且相关的application映射也移除这个app
if (apps.contains(app)) {
logInfo("Removing app " + app.id)
apps -= app
idToApp -= app.id
endpointToApp -= app.driver
addressToApp -= app.driver.address
if (reverseProxy) {
webUi.removeProxyTargets(app.id)
}
// 如果已经完成application的大小大于或者等于spark.deploy.retainedApplications
if (completedApps.size >= RETAINED_APPLICATIONS) {
// 计算要删除的数量
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)

completedApps.take(toRemove).foreach { a =>
applicationMetricsSystem.removeSource(a.appSource)
}
// 从已经完成application数组移除计算的需要删除的数量的apps
completedApps
.trimStart(toRemove)
}
// 然后将该app加入到完成列表
completedApps
+= app // Remember it in our history
// 等待列表移除这个app
waitingApps
-= app
// kill掉运行该app的所有的executor
for (exec <- app.executors.values) {
killExecutor(exec)
}
// 重新标记application状态
app.markFinished(state)
// 如果app不出于完成状态(FINISHED),则发送ApplicationRemoved请求
if (state != ApplicationState.FINISHED) {
app.driver.send(ApplicationRemoved(state.toString))
}
// 持久化引擎移除这个application
persistenceEngine
.removeApplication(app)
// 重新调度
schedule()

// 向所有worker发送该app已经完成的请求ApplicationFinished
workers
.foreach { w =>
w.endpoint.send(ApplicationFinished(app.id))
}
}
}

2.10 registerApplication 注册应用程序

private def registerApplication(app: ApplicationInfo): Unit = {
// 获取app的RpcAddress
val appAddress = app.driver.address
// 如果已经注册过,则直接返回
if (addressToApp.contains(appAddress)) {
logInfo("Attempted to re-register application at same address: " + appAddress)
return
}

applicationMetricsSystem.registerSource(app.appSource)
apps += app // 添加这个app到master所维护的application集合
// 并且把app相关数据存放到对应application映射列表
idToApp
(app.id) = app
endpointToApp(app.driver) = app
addressToApp(appAddress) = app
waitingApps += app
if (reverseProxy) {
webUi.addProxyTargets(app.id, app.desc.appUiUrl)
}
}

2.11 registerWorker 注册worker

private def registerWorker(worker: WorkerInfo): Boolean = {
// 从master维护的worker集合移除状态为dead的worker
workers
.filter { w =>
(w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
}.foreach { w =>
workers -= w
}
// 获取指定要注册worker的RpcAddress
val workerAddress = worker.endpoint.address
// 如果RpcAddress->WorkInfo的映射包含workerAddress,则获取这个worker
if (addressToWorker.contains(workerAddress)) {
val oldWorker = addressToWorker(workerAddress)
// 如果状态是UNKNOWN
if (oldWorker.state == WorkerState.UNKNOWN) {
// 意味着这个worker是重新恢复的worker,所以之前的老的worker需要移除掉
removeWorker(oldWorker)
} else {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return false
}
}
// 从master维护的worker集合添加这个worker
workers
+= worker
// 更新master中相关worker的映射或者集合列表
idToWorker
(worker.id) = worker
addressToWorker(workerAddress) = worker
if (reverseProxy) {
webUi.addProxyTargets(worker.id, worker.webUiAddress)
}
true
}

2.12 timeOutDeadWorkers 移除那些超时的worker

/** 移除那些超时的worker */
private def timeOutDeadWorkers() {
// 获取那些lastHeartbeat超过指定的超时时间的worker
val currentTime = System.currentTimeMillis()
val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
// 遍历这些worker,如果状态不是dead,然后删除
for (worker <- toRemove) {
if (worker.state != WorkerState.DEAD) {
logWarning("Removing %s because we got no heartbeat in %d seconds".format(
worker.id, WORKER_TIMEOUT_MS / 1000))
removeWorker(worker)
} else {
// 如果是dead,满足条件,则从master维护的workers集合移除这个worker
if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS+ 1) * WORKER_TIMEOUT_MS)) {
workers -= worker // we&#39;ve seen this DEAD worker in the UI, etc.for long enough; cull it
}
}
}
}

相关文章
最新文章
热点推荐