首页 > 网络 > 云计算 >

Spark源码知识讲解之SparkContext

2017-11-09

Spark源码知识讲解之SparkContext介绍。SparkContext是用户和Spark集群交互的唯一入口,它的主要是有两个: 可以用来创建RDD,累加器和广播变量; 初始化应用程序所需要的核心组件,为应用程序准备运行环境

SparkContext是用户和Spark集群交互的唯一入口,它的主要是有两个:

# 可以用来创建RDD,累加器和广播变量

# 初始化应用程序所需要的核心组件,为应用程序准备运行环境

只可以有一个SparkContext实例运行在一个JVM中,所以在创建SparkContext的时候之前,确保之前的SparkContext已经关闭了,即调用stop方法停止当前JVM中唯一运行的SparkContext

一SparkContext核心属性

SparkConf conf : Spark配置参数对象

SparkEnv env:Spark环境对象

JobProgressListener jobProgressListener: Job进度监听器

SparkStatusTracker statusTracker:Spark状态跟踪器

Configuration hadoopConfiguration: Hadoop配置文件对象

Int executorMemory:executor内存

DAGScheduler dagScheduler: 一个高层级的面向Stage的调度器,接收job,将job按照RDD的划分成若干个TaskSet,也称之为Stage,然后以一个个TaskSet的形式提交给底层的调度器TaskScheduler.并且它需要寻求Task的最优调度等

TaskScheduler taskScheduler:TaskScheduler主要是提交TaskSet到集群运算并汇报结果

SchedulerBackend schedulerBackend:控制整个集群资源的控制和调度,向Master注册和Executor反注册,并且向Executor发送Task

RpcEndpointRef heartbeatReceiver:心跳接收器

String applicationId:应用程序id

Option[String] applicationAttemptId: 应用程序尝试id

Option[ExecutorAllocationManager]executorAllocationManager: Executor分配管理器

HashMap[String, String] executorEnvs: 维护的executor和其环境变量的映射关系

Option[ContextCleaner] cleaner:上下文清理器

Option[String] checkpointDir:checkpoint目录

二SparkContext重要方法

2.1SparkContext 初始化各个组件,准备应用程序运行环境

// 初始化事件日志目录
_eventLogDir
=
if (isEventLogEnabled) {
val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
.stripSuffix("/")
Some(Utils.resolveURI(unresolvedDir))
} else {
None
}
// 初始日志事件的压缩类型
_eventLogCodec
= {
val compress = _conf.getBoolean("spark.eventLog.compress", false)
if (compress && isEventLogEnabled) {
Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
} else {
None
}
}

// 初始化Job进度监听器
_jobProgressListener = new JobProgressListener(_conf)
listenerBus.addListener(jobProgressListener)
// 创建Saprk的执行环境
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
// 初始化状态跟踪器
_statusTracker = new SparkStatusTracker(this)
// 获取executor所需要的内存,如果没有配置,默认1024
_executorMemory = _conf.getOption("spark.executor.memory")
 .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
 .orElse(Option(System.getenv("SPARK_MEM"))
 .map(warnSparkMem))
 .map(Utils.memoryStringToMb)
 .getOrElse(1024)
// 创建TaskScheduler之前需要注册HeartbeatReceiver心跳接收器,因为executor需要取HeartbeatReceiver
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
 HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
// 创建和开始TaskScheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
// 初始化SchedulerBackend
_schedulerBackend = sched
// 构造TaskScheduler
_taskScheduler = ts
// 初始化DAGScheduler
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// 启动TaskScheduler
_taskScheduler.start()
// 初始化BlockManager
_env.blockManager.initialize(_applicationId)
_executorAllocationManager =
 if (dynamicAllocationEnabled) {
 schedulerBackend match {
 case b: ExecutorAllocationClient =>
 Some(new ExecutorAllocationManager(
 schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
 case _ =>
 None
 }
 } else {
 None
 }
// 启动Executor分配管理器
_executorAllocationManager.foreach(_.start())
// 启动上下文清理器
_cleaner =
 if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
 Some(new ContextCleaner(this))
 } else {
 None
 }
_cleaner.foreach(_.start())

2.2broadcast

将一个只读广播变量发送给集群中每一个节点,该变量只给集群发送一次

def broadcast[T: ClassTag](value: T): Broadcast[T] = {
 assertNotStopped()
 require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
 "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
 val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
 val callSite = getCallSite
 logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
 cleaner.foreach(_.registerBroadcastForCleanup(bc))
 bc
}

2.3makeRDD 将一个序列或者集合转化为RDD

def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
 assertNotStopped()
 val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
 new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)
}

2.4 textFile

读取来自于HDFS,本地文件系统或者任意一个Hadoop支持的文件系统URI的文件,返回由一个字字符串组成的RDD

def textFile(
 path: String,
 minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
 assertNotStopped()
 hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
 minPartitions).map(pair => pair._2.toString).setName(path)
}

2.5runJob

在给定的RDD分区集合上运行函数,并且把结果传给指定的结果处理函数

def runJob[T, U: ClassTag](
 rdd: RDD[T],
 func: (TaskContext, Iterator[T]) => U,
 partitions: Seq[Int],
 resultHandler: (Int, U) => Unit): Unit = {
 if (stopped.get()) {
 throw new IllegalStateException("SparkContext has been shutdown")
 }
 val callSite = getCallSite
 val cleanedFunc = clean(func)
 logInfo("Starting job: " + callSite.shortForm)
 if (conf.getBoolean("spark.logLineage", false)) {
 logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
 }
 dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
 progressBar.foreach(_.finishAll())
 rdd.doCheckpoint()
}

2.6submitJob

提交一个要执行的job,然后将结果放在一个FutureJob对象中

// 提交一个要执行的job,并且将返回结果放在一个FutureJob中
def submitJob[T, U, R](
 rdd: RDD[T],
 processPartition: Iterator[T] => U,
 partitions: Seq[Int],
 resultHandler: (Int, U) => Unit,
 resultFunc: => R): SimpleFutureAction[R] =
{
 assertNotStopped()
 val cleanF = clean(processPartition)
 val callSite = getCallSite
 val waiter = dagScheduler.submitJob(
 rdd,
 (context: TaskContext, iter: Iterator[T]) => cleanF(iter),
 partitions,
 callSite,
 resultHandler,
 localProperties.get)
 new SimpleFutureAction(waiter, resultFunc)
}
相关文章
最新文章
热点推荐