Spark技术内幕: Task向Executor提交的源代码解析
在上文《Spark技术内幕:Stage划分及提交源代码分析》中,我们分析了Stage的生成和提交。可是Stage的提交,仅仅是DAGScheduler完毕了对DAG的划分,生成了一个计算拓扑,即须要依照顺序计算的Stage,Stage中包括了能够以partition为单位并行计算的Task。我们并没有分析Stage中得Task是怎样生成而且终于提交到Executor中去的。
这就是本文的主题。
从org.apache.spark.scheduler.DAGScheduler#submitMissingTasks開始,分析Stage是怎样生成TaskSet的。
假设一个Stage的全部的parent stage都已经计算完毕或者存在于cache中。那么他会调用submitMissingTasks来提交该Stage所包括的Tasks。
org.apache.spark.scheduler.DAGScheduler#submitMissingTasks的计算流程例如以下:
pipeline。能够称为大数据处理的基石。仅仅有数据进行pipeline处理,才干将其放到集群中去执行。
对于一个task来说,它从数据源获得逻辑。然后依照拓扑顺序,顺序执行(实际上是调用rdd的compute)。
TaskSet是一个数据结构,存储了这一组task:private[spark] class TaskSet(val tasks: Array[Task[_]],val stageId: Int,val attempt: Int,val priority: Int,val properties: Properties) {val id: String = stageId + "." + attemptoverride def toString: String = "TaskSet " + id }管理调度这个TaskSet的时org.apache.spark.scheduler.TaskSetManager。TaskSetManager会负责task的失败重试。跟踪每一个task的执行状态。处理locality-aware的调用。具体的调用堆栈例如以下:
首先看一下org.apache.spark.executor.Executor#launchTask: def launchTask(context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {val tr = new TaskRunner(context, taskId, taskName, serializedTask)runningTasks.put(taskId, tr)threadPool.execute(tr) // 開始在executor中执行}
TaskRunner会从序列化的task中反序列化得到task。这个须要看 org.apache.spark.executor.Executor.TaskRunner#run 的实现:task.run(taskId.toInt)。而task.run的实现是: final def run(attemptId: Long): T = {context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)context.taskMetrics.hostname = Utils.localHostName()taskThread = Thread.currentThread()if (_killed) {kill(interruptThread = false)}runTask(context)}
对于原来提到的两种Task,即
override def runTask(context: TaskContext): U = {// Deserialize the RDD and the func using the broadcast variables.val ser = SparkEnv.get.closureSerializer.newInstance()val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)metrics = Some(context.taskMetrics)try {func(context, rdd.iterator(partition, context))} finally {context.markTaskCompleted()}}
而org.apache.spark.scheduler.ShuffleMapTask#runTask则是写shuffle的结果。
override def runTask(context: TaskContext): MapStatus = {// Deserialize the RDD using the broadcast variable.val ser = SparkEnv.get.closureSerializer.newInstance()val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)//此处的taskBinary即为在org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的广播变量取得的metrics = Some(context.taskMetrics)var writer: ShuffleWriter[Any, Any] = nulltry {val manager = SparkEnv.get.shuffleManagerwriter = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) // 将rdd计算的结果写入memory或者diskreturn writer.stop(success = true).get} catch {case e: Exception =>if (writer != null) {writer.stop(success = false)}throw e} finally {context.markTaskCompleted()}}
这两个task都不要依照拓扑顺序调用rdd的compute来完毕对partition的计算。不同的是ShuffleMapTask须要shuffle write。以供child stage读取shuffle的结果。
对于这两个task都用到的taskBinary,即为在org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的广播变量取得的。
通过上述几篇博文,实际上我们已经粗略的分析了从用户定义SparkContext開始。集群是假设为每一个Application分配Executor的,回想一下这个序列图:
还有就是用户触发某个action,集群是怎样生成DAG,假设将DAG划分为能够成Stage,已经Stage是怎样将这些能够pipeline执行的task提交到Executor去执行的。当然了,具体细节还是很值得推敲的。
以后的每一个周末。都会奉上某个细节的实现。
歇息了。明天又会開始忙碌的一周。转载于:https://www.cnblogs.com/llguanli/p/8601055.html
创作挑战赛新人创作奖励来咯,坚持创作打卡瓜分现金大奖总结
以上是生活随笔为你收集整理的Spark技术内幕: Task向Executor提交的源代码解析的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: bl小说里面有个机器人管家_机器人也有攀
- 下一篇: 安全可靠的透明加密软件