版本信息spark version 2.3.3jdk 1.8idea 2019MacBook Pro
最近领导让做一次关于 Spark 的分享,于是专门把 spark 的流程看了一边,做一下记录,也是为了练练 MarkDown,仅此而已。
版本信息 spark version 2.3.3 jdk 1.8 idea 2019 MacBook Pro
从 RDD 开始在 spark 中,一个 action 算子触发真正的计算,我们看下 RDD 上的 count
/** * Return the number of elements in the RDD. */ def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
这就是一个一般的函数调用,有点内容的东西,就是这个方法
/** * Counts the number of elements of an iterator using a while loop rather than calling * [[scala.collection.Iterator#size]] because it uses a for loop, which is slightly slower * in the current version of Scala. */ def getIteratorSize[T](iterator: Iterator[T]): Long = { var count = 0L while (iterator.hasNext) { count += 1L iterator.next() } count }
这个方法也很简单,就是一个计数。所以 RDD 上的 action 算子没有什么难点,很容易明白。
RDD 上的 action 算子实际触发的上 SparkContext 的 runJob 方法,下面就进入了。
SparkContext/** * Run a job on all partitions in an RDD and return the results in an array. * * @param rdd target RDD to run tasks on * @param func a function to run on each partition of the RDD * @return in-memory collection with a result of the job (each collection element will contain * a result from one partition) */ def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = { runJob(rdd, func, 0 until rdd.partitions.length) }
这个函数也没什么难点,就是函数重载调用,多了一个计算分区的参数
/** * Run a function on a given set of partitions in an RDD and return the results as an array. * * @param rdd target RDD to run tasks on * @param func a function to run on each partition of the RDD * @param partitions set of partitions to run on; some jobs may not want to compute on all * partitions of the target RDD, e.g. for operations like `first()` * @return in-memory collection with a result of the job (each collection element will contain * a result from one partition) */ def runJob[T, U: ClassTag]( rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]): Array[U] = { val cleanedFunc = clean(func) runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions) }
需要注意下这个 clean 函数的功能
val cleanedFunc = clean(func)
我们传递的匿名函数,可能有外部变量,这里专门做了处理,使之可以序列化。
具体的技术细节,不明白,仔细看了一遍注释说明,这个 clean 的目的是明白了。
有兴趣的同学可以点进去看看源码上的注释。
继续调用 runJob 的重载方法,这里的函数参数。
可能比较难理解一点的是这个参数
(ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it)
这里是我难理解的一个地方,作为对比,我们放一起看两个 runJob 调用
runJob(rdd, func, 0 until rdd.partitions.length)runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
上面的 runJob 中的参数都是实际参数而在下面的 runjob 中,怎么看都觉得第二个参数
(ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it)
是一个 $\color{red}{形式参数}$ ,而不是实际参数,这是一个疑惑点。
昨天想了一晚上,搞明白了,这里传递的就是一个函数, 其实在 count 函数中 Utils.getIteratorSize 也是一个函数, 我们看下参数原型
/** * Run a function on a given set of partitions in an RDD and return the results as an array. * The function that is run against each partition additionally takes `TaskContext` argument. * * @param rdd target RDD to run tasks on * @param func a function to run on each partition of the RDD * @param partitions set of partitions to run on; some jobs may not want to compute on all * partitions of the target RDD, e.g. for operations like `first()` * @return in-memory collection with a result of the job (each collection element will contain * a result from one partition) */ def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int]): Array[U] = { val results = new Array[U](partitions.size) runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res) results }
参数原型是
func: (TaskContext, Iterator[T]) => U
我们实现了这个参数,只不过套用了 cleanedFunc 来实现,啥也没干而已
(ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it)
$\color{red}{那么这个函数会在 什么时候,什么地点被用到呢?}$
答案是 将来在 ResultStage 中会被序列化到 ResultTask 中,从 driver 端发送到 executor 端,开始计算任务
计算结果拉回到 driver 上,填充数组
/** * Run a function on a given set of partitions in an RDD and pass the results to the given * handler function. This is the main entry point for all actions in Spark. * * @param rdd target RDD to run tasks on * @param func a function to run on each partition of the RDD * @param partitions set of partitions to run on; some jobs may not want to compute on all * partitions of the target RDD, e.g. for operations like `first()` * @param resultHandler callback to pass each result to */ def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = { if (stopped.get()) { throw new IllegalStateException("SparkContext has been shutdown") } val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) if (conf.getBoolean("spark.logLineage", false)) { logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) } dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) rdd.doCheckpoint() }
又一次 clean 闭包,这里的最后一句代码
rdd.doCheckpoint()
递归保存设置了检查点的 RDD,可见 checkpoint 操作在 job 完成后调用。开始转入高层调度器
DAGScheduler/** * Run an action job on the given RDD and pass all the results to the resultHandler function as * they arrive. * * @param rdd target RDD to run tasks on * @param func a function to run on each partition of the RDD * @param partitions set of partitions to run on; some jobs may not want to compute on all * partitions of the target RDD, e.g. for operations like first() * @param callSite where in the user program this job was called * @param resultHandler callback to pass each result to * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name * * @note Throws `Exception` when the job fails */ def runJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): Unit = { val start = System.nanoTime val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf) waiter.completionFuture.value.get match { case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) case scala.util.Failure(exception) => logInfo("Job %d failed: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. val callerStackTrace = Thread.currentThread().getStackTrace.tail exception.setStackTrace(exception.getStackTrace ++ callerStackTrace) throw exception } }
DAGScheduler 的 runJob 方法没有难理解的地方,内部调用方法 submitJob 开始提交 job
/** * Submit an action job to the scheduler. * * @param rdd target RDD to run tasks on * @param func a function to run on each partition of the RDD * @param partitions set of partitions to run on; some jobs may not want to compute on all * partitions of the target RDD, e.g. for operations like first() * @param callSite where in the user program this job was called * @param resultHandler callback to pass each result to * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name * * @return a JobWaiter object that can be used to block until the job finishes executing * or can be used to cancel the job. * * @throws IllegalArgumentException when partitions ids are illegal */ def submitJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): JobWaiter[U] = { // Check to make sure we are not launching a task on a partition that does not exist. val maxPartitions = rdd.partitions.length partitions.find(p => p >= maxPartitions || p < 0).foreach { p => throw new IllegalArgumentException( "Attempting to access a non-existent partition: " + p + ". " + "Total number of partitions: " + maxPartitions) } val jobId = nextJobId.getAndIncrement() if (partitions.size == 0) { // Return immediately if the job is running 0 tasks return new JobWaiter[U](this, jobId, 0, resultHandler) } assert(partitions.size > 0) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) waiter }
可以看到 submitJob 中向 eventProcessLoop 投递了一个 JobSubmitted 事件,DAGScheduler 内部的消息循环体DAGSchedulerEventProcessLoop#doOnReceive 处理事件
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match { case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) case MapStageSubmitted(jobId, dependency, callSite, listener, properties) => dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties) case StageCancelled(stageId, reason) => dagScheduler.handleStageCancellation(stageId, reason) case JobCancelled(jobId, reason) => dagScheduler.handleJobCancellation(jobId, reason) case JobGroupCancelled(groupId) => dagScheduler.handleJobGroupCancelled(groupId) case AllJobsCancelled => dagScheduler.doCancelAllJobs() case ExecutorAdded(execId, host) => dagScheduler.handleExecutorAdded(execId, host) case ExecutorLost(execId, reason) => val workerLost = reason match { case SlaveLost(_, true) => true case _ => false } dagScheduler.handleExecutorLost(execId, workerLost) case WorkerRemoved(workerId, host, message) => dagScheduler.handleWorkerRemoved(workerId, host, message) case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) case SpeculativeTaskSubmitted(task) => dagScheduler.handleSpeculativeTaskSubmitted(task) case GettingResultEvent(taskInfo) => dagScheduler.handleGetTaskResult(taskInfo) case completion: CompletionEvent => dagScheduler.handleTaskCompletion(completion) case TaskSetFailed(taskSet, reason, exception) => dagScheduler.handleTaskSetFailed(taskSet, reason, exception) case ResubmitFailedStages => dagScheduler.resubmitFailedStages() }
反过来再次调用 DAGScheduler 的内部方法 handleJobSubmitted 真正的 job 提交
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) { var finalStage: ResultStage = null try { // New stage creation may throw an exception if, for example, jobs are run on a // HadoopRDD whose underlying HDFS files have been deleted. finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return } val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() logInfo("Got job %s (%s) with %d output partitions".format( job.jobId, callSite.shortForm, partitions.length)) logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) submitStage(finalStage) }
handleJobSubmitted 方法分成两个阶段
划分 stage
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
提交 stage
submitStage(finalStage)
我们先整体描述一下思路划分 stage 的阶段,用语言描述如下
代码先后顺序是:1. 首先创建最后一个 ResultStage 时,这时需要倒数第二个 ShuffleMapStage 作为父 Stage2. 创建倒数第二个 ShuffleMapStage 时,需要倒数第三个 ShuffleMapStage 作为父 Stage3. 除了最后 2 个 stage 是单独创建以外,其他的 stage 批量存在栈中然后创建所以,真实的创建过程如下1. 用后进先出的 stack 结构,从后向前回溯 shuffleDependency 入栈,然后出栈创建 ShuffleMapStage2. 创建倒数第二个 ShuffleMapStage3. 创建最后一个 ResultStage
Stage 的提交是递归提交
1. 如果有父 Stage 未提交,提交父 Stage2. 如果父 Stage 都已经提交,提交该 Stage
下面看下具体代码实现,划分 stage
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
/** * Create a ResultStage associated with the provided jobId. */ private def createResultStage( rdd: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { val parents = getOrCreateParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stage }
这个代码逻辑并不复杂,很好理解,创建父 Stage,然后创建 ResultStage,因为代码调用前套关系,这个方法中的 ResultStage 自然就是 job 中最后一个 Stage倒数第二个 Stage(如果 ResultStage 是 join 产生的那就是倒数第二个和倒数第三个)
/** * Get or create the list of parent stages for a given RDD. The new Stages will be created with * the provided firstJobId. */ private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { getShuffleDependencies(rdd).map { shuffleDep => getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList }
代码很短,理解起来也很容易
/** * Returns shuffle dependencies that are immediate parents of the given RDD. * * This function will not return more distant ancestors. For example, if C has a shuffle * dependency on B which has a shuffle dependency on A: * * A // Create stages for all missing ancestor shuffle dependencies. getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies // that were not already in shuffleIdToMapStage, it's possible that by the time we // get to a particular dependency in the foreach loop, it's been added to // shuffleIdToMapStage by the stage creation process for an earlier dependency. See // SPARK-13902 for more information. if (!shuffleIdToMapStage.contains(dep.shuffleId)) { createShuffleMapStage(dep, firstJobId) } } // Finally, create a stage for the given shuffle dependency. createShuffleMapStage(shuffleDep, firstJobId) } }
这里和 createResultStage 如出一辙
/** * Create a ResultStage associated with the provided jobId. */ private def createResultStage( rdd: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { val parents = getOrCreateParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stage }
方法 createResultStage 中,先创建 parents,后创建 ResultStage方法 getOrCreateShuffleMapStage 中,先创建 ancestor shuffle dependencies,后创建倒数第二个 Stage,也就是倒数第一个 ShuffleMapStage,因为 ResultStage 是最后一个 Stage
/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */ private def getMissingAncestorShuffleDependencies( rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = { val ancestors = new ArrayStack[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting val waitingForVisit = new ArrayStack[RDD[_]] waitingForVisit.push(rdd) while (waitingForVisit.nonEmpty) { val toVisit = waitingForVisit.pop() if (!visited(toVisit)) { visited += toVisit getShuffleDependencies(toVisit).foreach { shuffleDep => if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) { ancestors.push(shuffleDep) waitingForVisit.push(shuffleDep.rdd) } // Otherwise, the dependency and its ancestors have already been registered. } } } ancestors }
这里用一个 stack 结构,从后向前回溯,一直回溯到第一个 rdd,然后把获得的所有的 ShuffleDependency 装入栈中
这里的 ShuffleDependency 是从倒数第二个 ShuffleDependency 开始的 倒数第一个 ShuffleDependency 不在其中
/** * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a * previously run stage generated the same shuffle data, this function will copy the output * locations that are still available from the previous shuffle to avoid unnecessarily * regenerating data. */ def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = { val rdd = shuffleDep.rdd val numTasks = rdd.partitions.length val parents = getOrCreateParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() val stage = new ShuffleMapStage( id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker) stageIdToStage(id) = stage shuffleIdToMapStage(shuffleDep.shuffleId) = stage updateJobIdStageIdMaps(jobId, stage) if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) { // Kind of ugly: need to register RDDs with the cache and map output tracker here // since we can't do it in the RDD constructor because # of partitions is unknown logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")") mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length) } stage }
这里又调用了 getOrCreateParentStages
getOrCreateParentStages -->getOrCreateShuffleMapStage -->createShuffleMapStage --> getOrCreateParentStages
形成了一个循环从 createShuffleMapStage 中,我们可以看到
一个 ShuffleMapStage 是由 ShuffleDependency 确定的 比如下图中 stage1 由 groupBy 确定 stage2 由 join 确定 stage3 是 resultStage
Stage 都创建好以后,可以提交,提交一个 Stage,对应了提交一组 Task
/** Submits stage, but first recursively submits any missing parents. */ private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing.isEmpty) { logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") submitMissingTasks(stage, jobId.get) } else { for (parent outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1) case s: ResultStage => outputCommitCoordinator.stageStart( stage = s.id, maxPartitionId = s.rdd.partitions.length - 1) } val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try { stage match { case s: ShuffleMapStage => partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap case s: ResultStage => partitionsToCompute.map { id => val p = s.partitions(id) (id, getPreferredLocs(stage.rdd, p)) }.toMap } } catch { case NonFatal(e) => stage.makeNewStageAttempt(partitionsToCompute.size) listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e)) runningStages -= stage return } stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq) // If there are tasks to execute, record the submission time of the stage. Otherwise, // post the even without the submission time, which indicates that this stage was // skipped. if (partitionsToCompute.nonEmpty) { stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times. // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast // the serialized copy of the RDD and for each task we will deserialize it, which means each // task gets a different copy of the RDD. This provides stronger isolation between tasks that // might modify state of objects referenced in their closures. This is necessary in Hadoop // where the JobConf/Configuration object is not thread-safe. var taskBinary: Broadcast[Array[Byte]] = null var partitions: Array[Partition] = null try { // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). // For ResultTask, serialize and broadcast (rdd, func). var taskBinaryBytes: Array[Byte] = null // taskBinaryBytes and partitions are both effected by the checkpoint status. We need // this synchronization in case another concurrent job is checkpointing this RDD, so we get a // consistent view of both variables. RDDCheckpointData.synchronized { taskBinaryBytes = stage match { case stage: ShuffleMapStage => JavaUtils.bufferToArray( closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) case stage: ResultStage => JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) } partitions = stage.rdd.partitions } taskBinary = sc.broadcast(taskBinaryBytes) } catch { // In the case of a failure during serialization, abort the stage. case e: NotSerializableException => abortStage(stage, "Task not serializable: " + e.toString, Some(e)) runningStages -= stage // Abort execution return case e: Throwable => abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e)) runningStages -= stage // Abort execution return } val tasks: Seq[Task[_]] = try { val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array() stage match { case stage: ShuffleMapStage => stage.pendingPartitions.clear() partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = partitions(id) stage.pendingPartitions += id new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } case stage: ResultStage => partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } } } catch { case NonFatal(e) => abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e)) runningStages -= stage return } if (tasks.size > 0) { logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " + s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})") taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties)) } else { // Because we posted SparkListenerStageSubmitted earlier, we should mark // the stage as completed here in case there are no tasks to run markStageAsFinished(stage, None) stage match { case stage: ShuffleMapStage => logDebug(s"Stage ${stage} is actually done; " + s"(available: ${stage.isAvailable}," + s"available outputs: ${stage.numAvailableOutputs}," + s"partitions: ${stage.numPartitions})") markMapStageJobsAsFinished(stage) case stage : ResultStage => logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})") } submitWaitingChildStages(stage) } }
在 submitMissingTasks 中
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try { stage match { case s: ShuffleMapStage => partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap case s: ResultStage => partitionsToCompute.map { id => val p = s.partitions(id) (id, getPreferredLocs(stage.rdd, p)) }.toMap
这一段获取 rdd 的 partition 数据在集群中的物理位置,是 绝对 的
后面的
@DeveloperApiobject TaskLocality extends Enumeration { // Process local is expected to be used ONLY within TaskSetManager for now. val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value type TaskLocality = Value def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = { condition JavaUtils.bufferToArray( closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) case stage: ResultStage => JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) } partitions = stage.rdd.partitions }
从这里我们看到,ShuffleMapStage 和 ResultStage 的 task 是不同的我们的 count 算子的函数是在 ResultStage 中序列化的
case stage: ShuffleMapStage => JavaUtils.bufferToArray( closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) case stage: ResultStage => JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func):
并且 stage.shuffleDep 和 stage.func 地位是对等的,在 Task 类中可以看到
ShuffleMapTask writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ stage.pendingPartitions.clear() partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = partitions(id) stage.pendingPartitions += id new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } case stage: ResultStage => partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } } }
我们看到一个 stage 生成一组 task,每个 task 对应一个 partition
这里有一点思考 后面将看到发送到 executor 端的不是 Task,而是 TaskDescription 所以在目前,Task 上看不到 executor 信息,也就是说, 这些 task 无论发送到哪个 executor 上都可以完成计算,差别大的是计算时间
task 完成封装后,这一组 task 打包成 TaskSet 交给底层调度器 TaskScheduler,
taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
在 submitTasks 方法中,taskSet 又封装成 TaskSetManager
TaskSchedulerImpl override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { val manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) stageTaskSets(taskSet.stageAttemptId) = manager val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => ts.taskSet != taskSet && !ts.isZombie } if (conflictingTaskSet) { throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") } schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { if (!hasLaunchedTask) { logWarning("Initial job has not accepted any resources; " + "check your cluster UI to ensure that workers are registered " + "and have sufficient resources") } else { this.cancel() } } }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask = true } backend.reviveOffers() }
在这个 submitTasks 方法中,createTaskSetManager 中创建 TaskSetManager
val manager = createTaskSetManager(taskSet, maxTaskFailures)
// Label as private[scheduler] to allow tests to swap in different task set managers if necessary private[scheduler] def createTaskSetManager( taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt) }
在 TaskSetManager 的构造函数中首先根据 task 对应 partition 的数据物理位置,分门别类存入 Map 结构中
// Add all our tasks to the pending lists. We do this in reverse order // of task index so that tasks with low indices get launched first. for (i val exe = sched.getExecutorsAliveOnHost(loc.host) exe match { case Some(set) => for (e logDebug(s"Pending task $index has a cached location at ${e.host} " + ", but there are no executors alive there.") } case _ => } pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index for (rack makeOffers()
调用 DriverEndpoint 类中的 makeOffers 方法
// Make fake resource offers on all executors private def makeOffers() { // Make sure no executor is killed while some task is launching on it val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized { // Filter out executors under killing val activeExecutors = executorDataMap.filterKeys(executorIsAlive) val workOffers = activeExecutors.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toIndexedSeq scheduler.resourceOffers(workOffers) } if (!taskDescs.isEmpty) { launchTasks(taskDescs) } }
决策哪个 task 发送到哪个 executor,在下面代码中
scheduler.resourceOffers(workOffers)
代码中的 workOffers 是 driver 拿到的全部计算资源中剩余 executor 里的可用资源逻辑抽象表示,scheduler 是 TaskSchedulerImpl
TaskSchedulerImpl.submitTasks -->
CoarseGrainedSchedulerBackend#reviveOffers -->
CoarseGrainedSchedulerBackend.DriverEndpoint#makeOffers-->
CoarseGrainedSchedulerBackend.DriverEndpoint#launchTasks
整个调用逻辑 TaskSchedulerImpl 到 driver,并且 driver 上调用了 TaskSchedulerImpl 的方法因为资源并不在 TaskSchedulerImpl 手中,而是在 driver 手中,TaskSchedulerImpl 只管调度
/** * Called by cluster manager to offer resources on slaves. We respond by asking our active task * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so * that tasks are balanced across the cluster. */ def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname // Also track if new executor is added var newExecAvail = false for (o !blacklistTracker.isNodeBlacklisted(offer.host) && !blacklistTracker.isExecutorBlacklisted(offer.executorId) } }.getOrElse(offers) val shuffledOffers = shuffleOffers(filteredOffers) // Build a list of tasks to assign to each worker. val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK)) val availableCpus = shuffledOffers.map(o => o.cores).toArray val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet log.debug("Could not stop writer", e) } throw e } }
几行关键代码
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]val manager = SparkEnv.get.shuffleManagerwriter = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ U)]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime } else 0L func(context, rdd.iterator(partition, context)) }
这个里边关键代码的理解就比 ShuffleMapTask 简单多了
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]func(context, rdd.iterator(partition, context))
那么到此为止,整个 job 的流程就大致结束了,内部关于 Shuffle 的细节还需要一次思考。
本文首发于 GitChat,未经授权不得转载,转载需与 GitChat 联系。
阅读全文: http://gitbook.cn/gitchat/activity/5d63f3ddeafb4d1efa9e7ef5
您还可以下载 CSDN 旗下精品原创内容社区 GitChat App ,阅读更多 GitChat 专享技术内容哦。