3.4 解析Spark中的DAG逻辑视图
本节讲解DAG生成的机制,通过DAG,Spark可以对计算的流程进行优化;通过WordCounts的示例对DAG逻辑视图进行解析。
3.4.1 DAG生成的机制
在图论中,如果一个有向图无法从任意顶点出发经过若干条边回到该点,则这个图是一个有向无环图(DAG图)。而在Spark中,由于计算过程很多时候会有先后顺序,受制于某些任务必须比另一些任务较早执行的限制,我们必须对任务进行排队,形成一个队列的任务集合,这个队列的任务集合就是DAG图,每一个定点就是一个任务,每一条边代表一种限制约束(Spark中的依赖关系)。
通过DAG,Spark可以对计算的流程进行优化,对于数据处理,可以将在单一节点上进行的计算操作进行合并,并且计算中间数据通过内存进行高效读写,对于数据处理,需要涉及Shuffle操作的步骤划分Stage,从而使计算资源的利用更加高效和合理,减少计算资源的等待过程,减少计算中间数据读写产生的时间浪费(基于内存的高效读写)。
Spark中DAG生成过程的重点是对Stage的划分,其划分的依据是RDD的依赖关系,对于不同的依赖关系,高层调度器会进行不同的处理。对于窄依赖,RDD之间的数据不需要进行Shuffle,多个数据处理可以在同一台机器的内存中完成,所以窄依赖在Spark中被划分为同一个Stage;对于宽依赖,由于Shuffle的存在,必须等到父RDD的Shuffle处理完成后,才能开始接下来的计算,所以会在此处进行Stage的切分。
在Spark中,DAG生成的流程关键在于回溯,在程序提交后,高层调度器将所有的RDD看成是一个Stage,然后对此Stage进行从后往前的回溯,遇到Shuffle就断开,遇到窄依赖,则归并到同一个Stage。等到所有的步骤回溯完成,便生成一个DAG图。
DAG生成的相关源码位于Spark的DAGScheduler.scala。getOrCreateParentStages获取或创建一个给定RDD的父Stages列表,getOrCreateParentStages调用了getShuffleDependencies(rdd),getShuffleDependencies返回给定RDD的父节点中直接的Shuffle依赖。
Spark 2.2.1版本的DAGScheduler.scala的getOrCreateParentStages的源码如下:
Spark 2.4.3版本的DAGScheduler.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第12行,构建Stack实例修改为构建ArrayStack实例。
3.4.2 DAG逻辑视图解析
本节通过一个简单计数案例讲解DAG具体的生成流程和关系。示例代码如下:
在程序正式运行前,Spark的DAG调度器会将整个流程设定为一个Stage,此Stage包含3个操作,5个RDD,分别为MapPartitionRDD(读取文件数据时)、MapPartitionRDD(flatMap操作)、MapPartitionRDD(map操作)、MapPartitionRDD(reduceByKey的local段的操作)、ShuffleRDD(reduceByKeyshuffle操作)。
(1)回溯整个流程,在shuffleRDD与MapPartitionRDD(reduceByKey的local段的操作)中存在shuffle操作,整个RDD先在此切开,形成两个Stage。
(2)继续向前回溯,MapPartitionRDD(reduceByKey的local段的操作)与MapPartitionRDD(map操作)中间不存在Shuffle(即两个RDD的依赖关系为窄依赖),归为同一个Stage。
(3)继续回溯,发现往前的所有的RDD之间都不存在Shuffle,应归为同一个Stage。
(4)回溯完成,形成DAG,由两个Stage构成。
第一个Stage由MapPartitionRDD(读取文件数据时)、MapPartitionRDD(flatMap操作)、MapPartitionRDD(map操作)、MapPartitionRDD(reduceByKey的local段的操作)构成,如图3-4所示。
图3-4 Stage 0的构成
第二个Stage由ShuffleRDD(reduceByKey Shuffle操作)构成,如图3-5所示。
图3-5 Stage 1的构成