
Lesson 10: Spark Streaming Source Code Walkthrough: A Thorough Study of the Full Lifecycle of Continuous Stream Data Reception

2016-05-24


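The goal of this post:
Trace the full lifecycle of how Spark Streaming receives data, end to end.

It is organized as follows:
a) the architectural design of data receiving
b) then a detailed source code analysis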

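Architectural design of data receiving
1. While a Spark Streaming application is running, Spark Streaming continuously receives data.
2. The Receiver and the Driver usually do not run in the same process, so after receiving data the Receiver side must keep reporting to the Driver.
3. To receive data, Spark Streaming needs a message loop: the loop keeps receiving data, stores it, and then reports the stored data to the Driver.
4. Data receiving in Spark Streaming also follows an MVC architecture: M (model) is the Receiver,
C (controller) is the ReceiverSupervisor, which works at the storage level, and V (view) is the interface.
5. The ReceiverSupervisor is the controller. Receivers are launched by the ReceiverTracker, and once a Receiver has received data, the ReceiverSupervisor stores it. The Driver then obtains the metadata, i.e. the view, and operates on the underlying data through it; the metadata acts like a pointer.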
The Spark Streaming data-receiving flow is shown below:
(Figure: Spark Streaming data-receiving flow)
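To make point 5 concrete, here is a minimal single-process Scala sketch that separates data from metadata. It is a toy under stated assumptions, not Spark code: `BlockMeta`, `ExecutorStore`, `DriverTracker` and `SupervisorSketch` are hypothetical names, and an in-memory map stands in for the BlockManager. The supervisor keeps the records on the "executor" and reports only a small metadata record to the "driver", which later uses the block id like a pointer.

```scala
import scala.collection.mutable

// Hypothetical metadata record: what the driver keeps instead of the data itself.
final case class BlockMeta(streamId: Int, blockId: String, numRecords: Long)

// Stand-in for the executor-side block store (the BlockManager's role in Spark).
final class ExecutorStore {
  private val blocks = mutable.Map.empty[String, Vector[String]]
  def put(blockId: String, data: Vector[String]): Unit = blocks(blockId) = data
  def get(blockId: String): Option[Vector[String]] = blocks.get(blockId)
}

// Stand-in for the driver-side tracker: it only ever sees metadata.
final class DriverTracker {
  val received = mutable.ArrayBuffer.empty[BlockMeta]
  def addBlock(meta: BlockMeta): Boolean = { received += meta; true }
}

// The "controller": store the block locally, then report its metadata.
final class SupervisorSketch(streamId: Int, store: ExecutorStore, tracker: DriverTracker) {
  private var nextId = 0L
  def pushAndReport(records: Vector[String]): BlockMeta = {
    // Ids imitate Spark's stream block naming, "input-<streamId>-<uniqueId>".
    val blockId = s"input-$streamId-$nextId"
    nextId += 1
    store.put(blockId, records)
    val meta = BlockMeta(streamId, blockId, records.size.toLong)
    tracker.addBlock(meta)
    meta
  }
}

val store = new ExecutorStore
val tracker = new DriverTracker
val supervisor = new SupervisorSketch(0, store, tracker)
supervisor.pushAndReport(Vector("a", "b", "c"))
// The driver resolves the data through the block id, like dereferencing a pointer.
val firstId = tracker.received.head.blockId
println(store.get(firstId)) // Some(Vector(a, b, c))
```

The design choice to mirror is that only `BlockMeta` crosses the executor/driver boundary; the records themselves stay where they were stored.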

Source code analysis
1. ReceiverTracker starts Receivers by submitting Jobs: each Job has only one Task, and each Task starts exactly one Receiver through a ReceiverSupervisor.
2. The figure below shows the Receiver startup flow. We start the walkthrough from ReceiverTracker's start method.
(Figure: Receiver startup flow)
3. The start method creates the Endpoint instance:

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
4. The launchReceivers source:
/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  // endpoint here is the ReceiverTrackerEndpoint instantiated in start()
  endpoint.send(StartAllReceivers(receivers))
}
5. As the figure shows, after send dispatches the message, it is handled by the receive method of ReceiverTrackerEndpoint:
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
  // ... (other message cases omitted)
}
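The message-driven style above can be illustrated with a small sketch: a `PartialFunction[Any, Unit]` pattern-matches on case-class messages, much like `ReceiverTrackerEndpoint.receive`. Everything here (`ToyTrackerEndpoint`, the two message classes, the synchronous `send`) is hypothetical; Spark's RPC layer delivers messages asynchronously through its own dispatcher.

```scala
import scala.collection.mutable

// Hypothetical messages modelled on StartAllReceivers / RestartReceiver above.
sealed trait TrackerMessage
final case class StartAllReceivers(receiverIds: Seq[Int]) extends TrackerMessage
final case class RestartReceiver(receiverId: Int) extends TrackerMessage

final class ToyTrackerEndpoint {
  val started = mutable.ArrayBuffer.empty[Int]

  // Like ReceiverTrackerEndpoint.receive: a PartialFunction that pattern-matches messages.
  val receive: PartialFunction[Any, Unit] = {
    case StartAllReceivers(ids) => ids.foreach(id => startReceiver(id))
    case RestartReceiver(id)    => startReceiver(id)
  }

  private def startReceiver(id: Int): Unit = started += id

  // Synchronous stand-in for endpoint.send; unknown messages are rejected.
  def send(msg: Any): Unit =
    if (receive.isDefinedAt(msg)) receive(msg)
    else throw new IllegalArgumentException(s"Unhandled message: $msg")
}

val endpoint = new ToyTrackerEndpoint
endpoint.send(StartAllReceivers(Seq(0, 1, 2)))
endpoint.send(RestartReceiver(1))
println(endpoint.started.mkString(",")) // 0,1,2,1
```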
6. The startReceiver source:
/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
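  // startReceiverFunc is the function run by the Job submitted on receiverRDD
  // Function to start the receiver on the worker node
  // The argument is an Iterator, but it holds exactly one Receiver: startReceiver
  // is called once per receiver in the for loop over receivers.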
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        // receiver was created by the ReceiverInputDStream for its input source (via getReceiver),
        // e.g. SocketInputDStream creates a SocketReceiver.
        // Here it is just a handle describing which receiver to run.
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        // supervisor.start() runs when startReceiverFunc executes on the executor
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      // Seq(receiver) contains a single Receiver
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  // this RDD exists only to launch the receiver
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}
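The "keep restarting the receiver job until ReceiverTracker is stopped" policy can be sketched as follows. This is a hypothetical model (`RestartPolicySketch` and its simulated job are invented for illustration): Spark reacts asynchronously through `future.onComplete` and `self.send(RestartReceiver(receiver))`, whereas this sketch collapses that into a synchronous loop so the decision logic is easy to follow.

```scala
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

// Hypothetical model of the restart logic in startReceiver: whenever the receiver job
// finishes, successfully or not, it is resubmitted unless the tracker is stopping.
final class RestartPolicySketch(stopAfterAttempts: Int) {
  @volatile var trackerStopping = false
  var attempts = 0
  val events = mutable.ArrayBuffer.empty[String]

  def shouldStartReceiver: Boolean = !trackerStopping

  // Simulated receiver job: odd attempts fail, even attempts complete normally.
  private def runReceiverJob(): Try[Unit] = {
    attempts += 1
    if (attempts >= stopAfterAttempts) trackerStopping = true // simulate a stop request
    if (attempts % 2 == 1) Failure(new RuntimeException(s"attempt $attempts failed"))
    else Success(())
  }

  def startReceiver(): Unit = {
    var keepGoing = shouldStartReceiver
    while (keepGoing) {
      runReceiverJob() match {
        case _ if !shouldStartReceiver => events += "onReceiverJobFinish"; keepGoing = false
        case Success(_)                => events += s"Restarting after success #$attempts"
        case Failure(e)                => events += s"Restarting after failure: ${e.getMessage}"
      }
    }
  }
}

val policy = new RestartPolicySketch(stopAfterAttempts = 3)
policy.startReceiver()
policy.events.foreach(println)
```

Note that, as in the original, a *successful* completion also triggers a restart: a receiver job is only supposed to end when the tracker is stopping.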
7. Now trace how the receiver argument is passed in. First, find where startReceiver is called:
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
  // ... (other message cases omitted)
}
8. So the receivers arrive with the StartAllReceivers message; keep tracing where StartAllReceivers is sent.
The receiver objects are obtained through getReceiver:
/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}

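submitJob submits the job, which runs on a concrete executor node; there the work is done by ReceiverSupervisorImpl.
So when ReceiverTracker's startReceiver runs, it accomplishes two things:
the initialization of ReceiverSupervisorImpl, and the call to its start method.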

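Step 1: Initialization of ReceiverSupervisorImpl
1. The ReceiverSupervisor handles the data received by the Receiver: it stores the data and then reports it to the Driver. The Receiver receives data one record at a time (for Kafka, each record is a key-value pair).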

/**
 * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
 * which provides all the necessary functionality for handling the data received by
 * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]
 * object that is used to divide the received data stream into blocks of data.
 */
private[streaming] class ReceiverSupervisorImpl(
2. The ReceiverSupervisorImpl initialization source:
/** Remote RpcEndpointRef for the ReceiverTracker */
// RPC reference used to talk to the ReceiverTracker on the Driver
private val trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv)

/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
// endpoint lives on the executor and receives the messages sent by the Driver's ReceiverTracker
private val endpoint = env.rpcEnv.setupEndpoint(
  "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
    override val rpcEnv: RpcEnv = env.rpcEnv

    override def receive: PartialFunction[Any, Unit] = {
      case StopReceiver =>
        logInfo("Received stop signal")
        ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
      // After each batch is processed, the Driver's ReceiverTracker asks this supervisor to clean up old block information.
      case CleanupOldBlocks(threshTime) =>
        logDebug("Received delete old batch signal")
        cleanupOldBlocks(threshTime)
      // Rate limiting for the receiver: lets the Driver change the
      // receiving rate dynamically.
      case UpdateRateLimit(eps) =>
        logInfo(s"Received a new rate limit: $eps.")
        registeredBlockGenerators.foreach { bg =>
          bg.updateRate(eps)
        }
    }
  })

The cleanupOldBlocks source:

  private def cleanupOldBlocks(cleanupThreshTime: Time): Unit = {
    logDebug(s"Cleaning up blocks older then $cleanupThreshTime")
    receivedBlockHandler.cleanupOldBlocks(cleanupThreshTime.milliseconds)
  }
}
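The cleanup contract ("remove blocks older than the threshold time") can be illustrated with a generic sketch. It is hypothetical (`TimedBlockStore` is invented) and only shows threshold-based removal; what cleanupOldBlocks concretely does in Spark depends on which ReceivedBlockHandler implementation is in use.

```scala
import scala.collection.mutable

// Hypothetical store that remembers when each block was generated (ms since epoch).
final class TimedBlockStore {
  private val blockTimes = mutable.LinkedHashMap.empty[String, Long]

  def put(blockId: String, generatedAtMs: Long): Unit = blockTimes(blockId) = generatedAtMs

  // Drop every block strictly older than the threshold and return the removed ids.
  def cleanupOldBlocks(threshTimeMs: Long): Seq[String] = {
    val old = blockTimes.collect { case (id, t) if t < threshTimeMs => id }.toList
    blockTimes --= old
    old
  }

  def remaining: Seq[String] = blockTimes.keys.toList
}

val timed = new TimedBlockStore
timed.put("input-0-0", 1000L)
timed.put("input-0-1", 1200L)
timed.put("input-0-2", 1400L)
println(timed.cleanupOldBlocks(1300L)) // List(input-0-0, input-0-1)
println(timed.remaining)               // List(input-0-2)
```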
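3. The new rate is applied in updateRate, which BlockGenerator inherits from RateLimiter. Rate limiting matters a great deal in production: sometimes the volume of incoming data is so large that the cluster cannot process it all, or cannot keep up, and without rate limiting the latency becomes very high.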
/**
   * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
   * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
   *
   * @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
   */
  private[receiver] def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      if (maxRateLimit > 0) {
        rateLimiter.setRate(newRate.min(maxRateLimit))
      } else {
        rateLimiter.setRate(newRate)
      }
    }
}
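The rule in updateRate fits in a few lines. The sketch below is hypothetical (`RateSettingSketch` stores the rate in a field, while the real code passes it to `rateLimiter.setRate`), but the clamping logic is the same: non-positive values are ignored, and a positive maximum (`spark.streaming.receiver.maxRate` in the doc comment) caps the new rate.

```scala
// Hypothetical holder for the current rate; the real code passes the value on to
// rateLimiter.setRate instead of storing it in a field.
final class RateSettingSketch(maxRateLimit: Long, initialRate: Double) {
  private var rate: Double = initialRate
  def currentRate: Double = rate

  // Same rule as updateRate above: ignore non-positive values and,
  // when a positive maximum is configured, never exceed it.
  def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      rate = if (maxRateLimit > 0) newRate.min(maxRateLimit).toDouble else newRate.toDouble
    }
}

val capped = new RateSettingSketch(maxRateLimit = 1000L, initialRate = 1000.0)
capped.updateRate(5000L)    // above the maximum: capped
println(capped.currentRate) // 1000.0
capped.updateRate(300L)
println(capped.currentRate) // 300.0
capped.updateRate(0L)       // ignored
println(capped.currentRate) // 300.0
```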

This completes the initialization of ReceiverSupervisorImpl. It is only outlined briefly here and will be covered in detail later.

Step 2: The start method of ReceiverSupervisorImpl is called.
ReceiverSupervisorImpl itself does not define a start method; start is implemented in its parent class, ReceiverSupervisor.

4. When the supervisor starts, ReceiverSupervisor's start method is called:
/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}
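5. onStart: this method must be called before receiver.onStart() to ensure that the BlockGenerators are instantiated and started. Data received by the Receiver is turned into Blocks by the BlockGenerator: the Receiver receives records one at a time, so they have to be merged into Blocks, and the Block is the unit in which RDDs process the data.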
/**
 * Called when supervisor is started.
 * Note that this must be called before the receiver.onStart() is called to ensure
 * things like [[BlockGenerator]]s are started before the receiver starts sending data.
 */
protected def onStart() { }
6. onStart is actually implemented in ReceiverSupervisorImpl:
override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}

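What is BlockGenerator?
It collects received data into batches and stores them at a fixed interval.
BlockGenerator starts two threads:
1. One thread periodically merges the data received by the Receiver into a Block.
2. The other thread stores the generated Blocks through the BlockManager.
BlockGenerator extends RateLimiter. It cannot control how fast the stream arrives at the source, but it can limit how fast records are accepted for storage (addData calls waitToPush), which in turn throttles the incoming stream.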

/**
 * Generates batches of objects received by a
 * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
 * named blocks at regular intervals. This class starts two threads,
 * one to periodically start a new batch and prepare the previous batch of as a block,
 * the other to push the blocks into the block manager.
 *
 * Note: Do not create BlockGenerator instances directly inside receivers. Use
 * `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
 */
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging {
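The two roles described above can be sketched without threads so that the data flow is easy to follow: records go into `currentBuffer`, a timer tick turns the filled buffer into a Block on a bounded queue, and the pusher moves queued Blocks to storage. This is a hypothetical single-threaded model (`Block`, `MiniBlockGenerator`, `onTimer` and `pushQueued` are invented names); in Spark the timer and the pusher run on separate threads.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

final case class Block(id: String, buffer: ArrayBuffer[Any])

// Hypothetical single-threaded model of BlockGenerator's data flow. In Spark the two
// roles below run on separate threads: the block-interval timer and the pushing thread.
final class MiniBlockGenerator(receiverId: Int, blockIntervalMs: Long, queueSize: Int) {
  private var currentBuffer = new ArrayBuffer[Any]
  private val blocksForPushing = new ArrayBlockingQueue[Block](queueSize)
  val pushed = ArrayBuffer.empty[Block] // stands in for the BlockManager

  // Called for every received record (cf. addData).
  def addData(data: Any): Unit = synchronized { currentBuffer += data }

  // Role 1, timer thread: turn the filled buffer into a Block (cf. updateCurrentBuffer).
  def onTimer(time: Long): Unit = {
    val maybeBlock: Option[Block] = synchronized {
      if (currentBuffer.isEmpty) None
      else {
        val filled = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        Some(Block(s"input-$receiverId-${time - blockIntervalMs}", filled))
      }
    }
    maybeBlock.foreach(b => blocksForPushing.put(b)) // put blocks while the queue is full
  }

  // Role 2, pushing thread: move queued Blocks to storage (cf. keepPushingBlocks).
  def pushQueued(): Unit = {
    var next = blocksForPushing.poll()
    while (next != null) {
      pushed += next
      next = blocksForPushing.poll()
    }
  }
}

val gen = new MiniBlockGenerator(receiverId = 0, blockIntervalMs = 200L, queueSize = 10)
Seq("r1", "r2").foreach(r => gen.addData(r))
gen.onTimer(1200L) // Block input-0-1000 holds r1, r2
gen.addData("r3")
gen.onTimer(1400L) // Block input-0-1200 holds r3
gen.onTimer(1600L) // empty buffer: no Block
gen.pushQueued()
println(gen.pushed.map(_.id).mkString(",")) // input-0-1000,input-0-1200
```

The bounded queue is what couples the two roles: if pushing falls behind, `put` blocks the timer side instead of letting memory grow without limit.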

How is a BlockGenerator created?
7. BlockGenerators are created in the createBlockGenerator method of ReceiverSupervisorImpl:

override def createBlockGenerator(
    blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
  // Cleanup BlockGenerators that have already been stopped
  registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }
  // the new BlockGenerator serves this receiver's streamId (a receiver may register more than one)
  val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
  registeredBlockGenerators += newBlockGenerator
  newBlockGenerator
}
8. Back to the onStart method of ReceiverSupervisorImpl shown above:
override protected def onStart() {
  // Start every registered BlockGenerator: its timer will keep cutting the in-memory buffer
  // into Blocks. At this point it is only getting ready to receive data from the Receiver.
  registeredBlockGenerators.foreach { _.start() }
}
9. BlockGenerator's start method starts its two threads:
/** Start block generating and pushing threads. */
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
10. blockIntervalTimer is a RecurringTimer instance:
private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
11. The start method of blockIntervalTimer:
/**
 * Start at the earliest time it can start based on the period.
 */
def start(): Long = {
  start(getStartTime())
}
12. Start the thread:
/**
 * Start at the given start time.
 */
def start(startTime: Long): Long = synchronized {
  nextTime = startTime
  thread.start()
  logInfo("Started timer for " + name + " at time " + nextTime)
  nextTime
}
13. The thread runs loop:
private val thread = new Thread("RecurringTimer - " + name) {
  setDaemon(true)
  override def run() { loop }
}
14. loop repeatedly calls triggerActionForNextInterval():
  /**
   * Repeatedly call the callback every interval.
   */
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }
}

}
15. triggerActionForNextInterval waits until nextTime and then invokes callback, which here is the updateCurrentBuffer method:

private def triggerActionForNextInterval(): Unit = {
  clock.waitTillTime(nextTime)
  callback(nextTime)
  prevTime = nextTime
  nextTime += period
  logDebug("Callback for " + name + " called at time " + prevTime)
}
16. When the RecurringTimer instance is created, the third argument, callback, is the updateCurrentBuffer method:
private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {
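The timing behaviour of steps 11 to 16 can be reproduced deterministically with a manual clock. In the sketch below, `ManualClockSketch` and `RecurringTimerSketch` are hypothetical. It assumes that the first tick lands on the next multiple of the period after "now"; after that, each tick waits until `nextTime`, calls `callback(nextTime)`, and advances `nextTime` by `period`, as in triggerActionForNextInterval.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical manual clock so the sketch is deterministic (no real sleeping).
final class ManualClockSketch(var now: Long) {
  def waitTillTime(target: Long): Long = { if (now < target) now = target; now }
}

// Sketch of the RecurringTimer schedule: the first tick is assumed to be the next
// multiple of `period` after "now"; each tick waits until nextTime, fires
// callback(nextTime), then advances nextTime by period.
final class RecurringTimerSketch(clock: ManualClockSketch, period: Long, callback: Long => Unit) {
  private var nextTime: Long = (math.floor(clock.now.toDouble / period) + 1).toLong * period

  def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    nextTime += period
  }
}

val ticks = ArrayBuffer.empty[Long]
val clock = new ManualClockSketch(now = 1050L)
val timer = new RecurringTimerSketch(clock, period = 200L, callback = t => ticks += t)
(1 to 3).foreach(_ => timer.triggerActionForNextInterval())
println(ticks.mkString(",")) // 1200,1400,1600
```

Because the callback receives the tick time, updateCurrentBuffer can name each Block after the *start* of the interval it covers (`time - blockIntervalMs`).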
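17. Received data is appended to a buffer (currentBuffer). At every block interval, updateCurrentBuffer swaps in a fresh buffer and turns the filled one into a Block: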
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }

    if (newBlock != null) {
      // put the newly generated Block into the queue
      blocksForPushing.put(newBlock)  // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}

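That completes the analysis of BlockGenerator's start; it is now ready to receive data from the Receiver.
The BlockGenerator startup process is shown below:
(Figure: BlockGenerator startup process)
This concludes the onStart method of ReceiverSupervisorImpl.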

18. Back to ReceiverSupervisor's start method:
/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}
19. Start the receiver:
/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

onReceiverStart is implemented in ReceiverSupervisorImpl:

override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    // endpoint here is the message loop (RPC endpoint) of ReceiverSupervisorImpl, the Receiver's manager
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  // trackerEndpoint refers to the Driver-side endpoint; startReceiver only proceeds if the reply is true.
  // The message goes to the ReceiverTracker on the Driver.
  trackerEndpoint.askWithRetry[Boolean](msg)
}
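`askWithRetry` is a blocking request/reply call that is retried on failure; in Spark the retry count and wait time come from configuration. The generic retry loop below is a hypothetical sketch of that idea (`askWithRetrySketch` is not Spark's implementation), with a simulated RegisterReceiver round trip whose first attempt times out.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical generic retry helper in the spirit of askWithRetry: try the request
// up to maxRetries times, sleeping between attempts, and fail with the last error.
def askWithRetrySketch[T](maxRetries: Int, retryWaitMs: Long)(ask: Int => T): T = {
  require(maxRetries >= 1, "need at least one attempt")
  var attempt = 1
  var result: Option[T] = None
  var lastError: Throwable = null
  while (result.isEmpty && attempt <= maxRetries) {
    Try(ask(attempt)) match {
      case Success(value) => result = Some(value)
      case Failure(e) =>
        lastError = e
        if (attempt < maxRetries) Thread.sleep(retryWaitMs)
    }
    attempt += 1
  }
  result.getOrElse(throw new RuntimeException(s"Failed after $maxRetries attempts", lastError))
}

// Simulated RegisterReceiver round trip: the first attempt times out, the second gets `true`.
val registered = askWithRetrySketch[Boolean](maxRetries = 3, retryWaitMs = 10L) { attempt =>
  if (attempt == 1) throw new java.util.concurrent.TimeoutException("no reply") else true
}
println(registered) // true
```

The Boolean reply is what onReceiverStart returns: `false` means the Driver refused the registration, and startReceiver then stops the supervisor.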

Driver side: ReceiverTrackerEndpoint receives the message sent by the ReceiverSupervisor.

20. The message is handled in receiveAndReply:
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Remote messages
  case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    val successful =
      registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
    context.reply(successful)
  case AddBlock(receivedBlockInfo) =>
    if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
      walBatchingThreadPool.execute(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          if (active) {
            context.reply(addBlock(receivedBlockInfo))
          } else {
            throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
          }
        }
      })
    } else {
      context.reply(addBlock(receivedBlockInfo))
    }
  case DeregisterReceiver(streamId, message, error) =>
    deregisterReceiver(streamId, message, error)
    context.reply(true)
  // Local messages
  case AllReceiverIds =>
    context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
  case StopAllReceivers =>
    assert(isTrackerStopping || isTrackerStopped)
    stopReceivers()
    context.reply(true)
}
21. The registerReceiver source:
/** Register a receiver */
private def registerReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef,
    senderAddress: RpcAddress
  ): Boolean = {
  // reject a streamId that is not one of the known receiver input streams
  if (!receiverInputStreamIds.contains(streamId)) {
    throw new SparkException("Register received for unexpected id " + streamId)
  }

  if (isTrackerStopping || isTrackerStopped) {
    return false
  }

  val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
  val acceptableExecutors = if (scheduledLocations.nonEmpty) {
      // This receiver is registering and it's scheduled by
      // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
      scheduledLocations.get
    } else {
      // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
      // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
      scheduleReceiver(streamId)
    }

  def isAcceptable: Boolean = acceptableExecutors.exists {
    case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
    case loc: TaskLocation => loc.host == host
  }

  if (!isAcceptable) {
    // Refuse it since it's scheduled to a wrong executor
    false
  } else {
    val name = s"${typ}-${streamId}"
    val receiverTrackingInfo = ReceiverTrackingInfo(
      streamId,
      ReceiverState.ACTIVE,
      scheduledLocations = None,
      runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
      name = Some(name),
      endpoint = Some(receiverEndpoint))
    receiverTrackingInfos.put(streamId, receiverTrackingInfo)
    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
    true
  }
}
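The `isAcceptable` check distinguishes two kinds of scheduled location. The sketch below uses hypothetical stand-ins (`HostLoc`, `ExecutorLoc`) for TaskLocation and ExecutorCacheTaskLocation. An executor-level location must match the registering executor's id, while a host-level location accepts any executor on that host.

```scala
// Hypothetical location types mirroring the pattern match in isAcceptable.
sealed trait Loc { def host: String }
final case class HostLoc(host: String) extends Loc
final case class ExecutorLoc(host: String, executorId: String) extends Loc

def isAcceptable(acceptable: Seq[Loc], host: String, executorId: String): Boolean =
  acceptable.exists {
    case loc: ExecutorLoc => loc.executorId == executorId // must be the exact executor
    case loc: HostLoc     => loc.host == host             // any executor on that host
  }

val scheduled = Seq(ExecutorLoc("node1", "exec-1"), HostLoc("node2"))
println(isAcceptable(scheduled, "node1", "exec-1")) // true: exact executor match
println(isAcceptable(scheduled, "node1", "exec-9")) // false: right host, wrong executor
println(isAcceptable(scheduled, "node2", "exec-5")) // true: any executor on node2
```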
22. A Receiver's onStart method must initialize all resources needed to receive data (threads, buffers, etc.), and it must be non-blocking.
/**
 * This method is called by the system when the receiver is started. This function
 * must initialize all resources (threads, buffers, etc.) necessary for receiving data.
 * This function must be non-blocking, so receiving the data must occur on a different
 * thread. Received data can be stored with Spark by calling `store(data)`.
 *
 * If there are errors in threads started here, then following options can be done
 * (i) `reportError(...)` can be called to report the error to the driver.
 * The receiving of data will continue uninterrupted.
 * (ii) `stop(...)` can be called to stop receiving data. This will call `onStop()` to
 * clear up all resources allocated (threads, buffers, etc.) during `onStart()`.
 * (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()`
 * immediately, and then `onStart()` after a delay.
 */
def onStart()
23. For example, SocketReceiver's onStart starts a thread that receives the data:
def onStart() {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}
24. While data is being received, each record is stored:
/** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }
}
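The non-blocking onStart contract can be demonstrated without a network. In this hypothetical sketch, `LineReceiverSketch` reads lines from an in-memory stream in place of a socket. onStart only starts a daemon thread, and the thread calls `store` once per record. Unlike SocketReceiver, it simply ends when the stream is exhausted instead of calling `restart(...)`.

```scala
import java.io.{BufferedReader, ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer

// Hypothetical receiver modelled on SocketReceiver: onStart() returns at once and a
// daemon thread reads records and stores each one. `open` supplies the input stream;
// an in-memory stream replaces the socket so the sketch runs without a network.
final class LineReceiverSketch(open: () => InputStream) {
  @volatile private var stopped = false
  val stored = ArrayBuffer.empty[String]

  private val worker = new Thread("Line Receiver Sketch") {
    setDaemon(true)
    override def run(): Unit = receive()
  }

  def store(record: String): Unit = stored.synchronized { stored += record }

  def onStart(): Unit = worker.start() // non-blocking: reading happens on `worker`
  def onStop(): Unit = stopped = true
  def awaitWorker(): Unit = worker.join() // demo only

  private def receive(): Unit = {
    val reader = new BufferedReader(new InputStreamReader(open(), StandardCharsets.UTF_8))
    try {
      var line = reader.readLine()
      while (!stopped && line != null) {
        store(line)
        line = reader.readLine()
      }
    } finally reader.close()
  }
}

val input = "alpha\nbeta\ngamma\n".getBytes(StandardCharsets.UTF_8)
val receiver = new LineReceiverSketch(() => new ByteArrayInputStream(input))
receiver.onStart()     // returns immediately
receiver.awaitWorker() // demo only: wait until the stream is exhausted
println(receiver.stored.mkString(",")) // alpha,beta,gamma
```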
25. Storage is delegated to the supervisor (a ReceiverSupervisorImpl at runtime):
/**
 * Store a single item of received data to Spark's memory.
 * These single items will be aggregated together into data blocks before
 * being pushed into Spark's memory.
 */
def store(dataItem: T) {
  supervisor.pushSingle(dataItem)
}
26. In the end, BlockGenerator's addData method is called to store the data:
/**
 * Push a single data item into the buffer.
 */
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        // keep appending records to currentBuffer
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
27. currentBuffer is an ArrayBuffer:
@volatile private var currentBuffer = new ArrayBuffer[Any]
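addData only accepts data while the generator is Active, and start() only works from Initialized. The hypothetical sketch below (`GuardedBuffer`, `GeneratorState`) isolates that state guard. `Initialized` and `Active` appear in the excerpts above; `StoppedAddingData` is assumed here to stand for "no longer accepting data". It also omits the `waitToPush()` rate-limit call.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical states; StoppedAddingData is an assumed name for "no longer accepting data".
object GeneratorState extends Enumeration {
  val Initialized, Active, StoppedAddingData = Value
}

final class GuardedBuffer {
  import GeneratorState._
  @volatile private var state: GeneratorState.Value = Initialized
  private val currentBuffer = new ArrayBuffer[Any]

  def start(): Unit = synchronized {
    if (state == Initialized) state = Active
    else throw new IllegalStateException(s"Cannot start in state $state")
  }

  def stopAddingData(): Unit = synchronized { state = StoppedAddingData }

  // Mirrors the guard in addData; Spark additionally calls waitToPush() first
  // to apply the rate limit, which this sketch omits.
  def addData(data: Any): Unit = synchronized {
    if (state == Active) currentBuffer += data
    else throw new IllegalStateException("Cannot add data: generator not active")
  }

  def size: Int = synchronized { currentBuffer.size }
}

def rejects(op: => Unit): Boolean =
  try { op; false } catch { case _: IllegalStateException => true }

val buf = new GuardedBuffer
val beforeStart = rejects(buf.addData("x"))
buf.start()
buf.addData("a")
buf.addData("b")
buf.stopAddingData()
val afterStop = rejects(buf.addData("c"))
println((beforeStart, buf.size, afterStop)) // (true,2,true)
```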

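We now know that Spark Streaming receives data through the Receiver, but when is that data turned into Blocks?
The conversion into Blocks is done by BlockGenerator.
1. In BlockGenerator's start method, a timer keeps generating Blocks from the data: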

/** Start block generating and pushing threads. */
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
2. blockIntervalTimer is initialized as follows:
private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
3. Update the buffer:
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        // create the Block
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }

    if (newBlock != null) {
      // if a Block was generated, put it into the queue
      blocksForPushing.put(newBlock)  // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}
4. blocksForPushing is defined as follows:
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
5. By default, Spark generates one Block every 200 ms (configurable through spark.streaming.blockInterval):
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
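A quick worked example shows why the block interval matters. Each Block of a batch becomes one partition of that batch's RDD for this receiver, so one receiver yields roughly `batchInterval / blockInterval` partitions (tasks) per batch. It is "roughly" because an interval with no data produces no Block (updateCurrentBuffer skips empty buffers). The 200 ms value is the default above; the batch intervals are assumed examples.

```scala
// Worked arithmetic: approximate number of Blocks (and hence partitions) that one
// receiver contributes to each batch, assuming data arrives in every block interval.
def blocksPerBatch(batchIntervalMs: Long, blockIntervalMs: Long): Long = {
  require(blockIntervalMs > 0, "block interval must be positive")
  batchIntervalMs / blockIntervalMs
}

println(blocksPerBatch(batchIntervalMs = 2000L, blockIntervalMs = 200L)) // 10
println(blocksPerBatch(batchIntervalMs = 1000L, blockIntervalMs = 50L))  // 20
```

Lowering the block interval therefore raises per-batch parallelism at the cost of more, smaller Blocks.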

How are the Blocks in the queue stored?

6. In BlockGenerator's start method, blockPushingThread writes the Blocks out through the BlockManager:
/** Start block generating and pushing threads. */
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
7. When blockPushingThread starts, it runs keepPushingBlocks:
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
8. Blocks are continually taken from the queue and stored through the BlockManager:
/** Keep pushing blocks to the BlockManager. */
private def keepPushingBlocks() {
  logInfo("Started block pushing thread")

  def areBlocksBeingGenerated: Boolean = synchronized {
    state != StoppedGeneratingBlocks
  }

  try {
    // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
    while (areBlocksBeingGenerated) {
      // wait up to 10 ms for a Block; poll returns null (None here) if nothing arrives in time
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block)
        case None =>
      }
    }

    // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
    logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
    while (!blocksForPushing.isEmpty) {
      val block = blocksForPushing.take()
      logDebug(s"Pushing block $block")
      pushBlock(block)
      logInfo("Blocks left to push " + blocksForPushing.size())
    }
    logInfo("Stopped block pushing thread")
  } catch {
    case ie: InterruptedException =>
      logInfo("Block pushing thread was interrupted")
    case e: Exception =>
      reportError("Error in block pushing thread", e)
  }
}
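The two phases of keepPushingBlocks are shown in the hypothetical sketch below (`PushingLoopSketch` is invented, and strings stand in for Blocks). First, it polls with a 10 ms timeout while Blocks are still being generated. Once generation stops, it drains whatever is left in the queue. The timeout matters because it lets the thread notice the state change even when the queue stays empty.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of keepPushingBlocks: while blocks are still being generated,
// poll the queue with a 10 ms timeout; once generation stops, drain what is left.
final class PushingLoopSketch(queueSize: Int) {
  val queue = new ArrayBlockingQueue[String](queueSize)
  @volatile var generating = true
  val pushed = ArrayBuffer.empty[String] // only touched by the pushing thread

  val pushingThread = new Thread("block-pushing-sketch") {
    override def run(): Unit = {
      while (generating) {
        // poll returns null if nothing arrives within the timeout
        Option(queue.poll(10, TimeUnit.MILLISECONDS)).foreach(b => pushed += b)
      }
      // Generation has stopped: push the remaining queued blocks.
      while (!queue.isEmpty) pushed += queue.take()
    }
  }
}

val loop = new PushingLoopSketch(queueSize = 10)
loop.pushingThread.start()
Seq("b1", "b2", "b3").foreach(b => loop.queue.put(b))
loop.generating = false // plays the role of state = StoppedGeneratingBlocks
loop.pushingThread.join()
println(loop.pushed.mkString(",")) // b1,b2,b3
```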
9. The pushBlock source:
  private def pushBlock(block: Block) {
    // listener is the BlockGeneratorListener passed to the constructor
    listener.onPushBlock(block.id, block.buffer)
    logInfo("Pushed block " + block.id)
  }
}
10. The listener is passed in when the BlockGenerator is constructed:
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging {
11. In ReceiverSupervisorImpl, the listener is defaultBlockGeneratorListener, and its onPushBlock is what gets called:
/** Divides received data records into data blocks for pushing in BlockManager. */
private val defaultBlockGeneratorListener = new BlockGeneratorListener {
  def onAddData(data: Any, metadata: Any): Unit = { }

  def onGenerateBlock(blockId: StreamBlockId): Unit = { }

  def onError(message: String, throwable: Throwable) {
    reportError(message, throwable)
  }

  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
    pushArrayBuffer(arrayBuffer, None, Some(blockId))
  }
}
12. The pushArrayBuffer source:
/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
def pushArrayBuffer(
    arrayBuffer: ArrayBuffer[_],
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
}
13. The pushAndReportBlock source:
/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  // report the stored block's metadata to the Driver
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
14. ReceivedBlockHandler is responsible for storing the data Blocks received by the receiver:
/** Trait that represents a class that handles the storage of blocks received by receiver */
private[streaming] trait ReceivedBlockHandler {

  /** Store a received block with the given block id and return related metadata */
  def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult

  /** Cleanup old blocks older than the given threshold time */
  def cleanupOldBlocks(threshTime: Long)
}
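15. There are two storage implementations: BlockManagerBasedBlockHandler, shown below, and WriteAheadLogBasedBlockHandler, which is used when the write-ahead log is enabled.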
/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks into a block manager with the specified storage level.
 */
private[streaming] class BlockManagerBasedBlockHandler(
    blockManager: BlockManager, storageLevel: StorageLevel)
  extends ReceivedBlockHandler with Logging {

  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

    var numRecords = None: Option[Long]

    val putResult: Seq[(BlockId, BlockStatus)] = block match {
      case ArrayBufferBlock(arrayBuffer) =>
        numRecords = Some(arrayBuffer.size.toLong)
        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
          tellMaster = true)
      case IteratorBlock(iterator) =>
        val countIterator = new CountingIterator(iterator)
        val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
          tellMaster = true)
        numRecords = countIterator.count
        putResult
      case ByteBufferBlock(byteBuffer) =>
        blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
      case o =>
        throw new SparkException(
          s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
    }
    if (!putResult.map { _._1 }.contains(blockId)) {
      throw new SparkException(
        s"Could not store $blockId to block manager with storage level $storageLevel")
    }
    BlockManagerBasedStoreResult(blockId, numRecords)
  }
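The interesting part of storeBlock is how `numRecords` is obtained for each block type. The sketch below mirrors that with hypothetical stand-ins (`ReceivedBlockSketch` and its cases, `CountingIteratorSketch`, `storeAndCount`), and an ArrayBuffer plays the BlockManager. For an ArrayBuffer the size is known up front. For an Iterator the records are counted while the store consumes it, and the count is only reported if the iterator was fully consumed. For raw bytes the record count is unknown.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hypothetical mirror of the block types matched in storeBlock.
sealed trait ReceivedBlockSketch
final case class ArrayBufferBlockSketch(buffer: ArrayBuffer[Any]) extends ReceivedBlockSketch
final case class IteratorBlockSketch(iterator: Iterator[Any]) extends ReceivedBlockSketch
final case class ByteBufferBlockSketch(bytes: ByteBuffer) extends ReceivedBlockSketch

// Counts elements as they pass through; the count is only known once fully consumed.
final class CountingIteratorSketch[T](underlying: Iterator[T]) extends Iterator[T] {
  private var seen = 0L
  def hasNext: Boolean = underlying.hasNext
  def next(): T = { seen += 1; underlying.next() }
  def count: Option[Long] = if (underlying.hasNext) None else Some(seen)
}

def storeAndCount(block: ReceivedBlockSketch, storage: ArrayBuffer[Any]): Option[Long] =
  block match {
    case ArrayBufferBlockSketch(buf) =>
      storage ++= buf
      Some(buf.size.toLong) // size is known before storing
    case IteratorBlockSketch(it) =>
      val counting = new CountingIteratorSketch(it)
      storage ++= counting // storing consumes the iterator
      counting.count
    case ByteBufferBlockSketch(bytes) =>
      storage += bytes // raw bytes: the record count is unknown
      None
  }

val storage = ArrayBuffer.empty[Any]
println(storeAndCount(ArrayBufferBlockSketch(ArrayBuffer(1, 2, 3)), storage))  // Some(3)
println(storeAndCount(IteratorBlockSketch(Iterator("x", "y")), storage))       // Some(2)
println(storeAndCount(ByteBufferBlockSketch(ByteBuffer.allocate(8)), storage)) // None
```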

The complete startup process of ReceiverSupervisor's startReceiver:
(Figure: ReceiverSupervisor.startReceiver startup process)

The overall flow of continuous stream data reception in Spark Streaming:
(Figure: overall data-receiving flow)
