3.9 基于DataSet的代码如何转化为RDD
基于DataSet的代码转换为RDD之前需要一个Action的操作,基于Spark中的新解析引擎Catalyst进行优化,Spark中的Catalyst不仅限于SQL的优化,Spark的五大子框架(Spark Cores、Spark SQL、Spark Streaming、Spark GraphX、Spark Mlib)将来都会基于Catalyst基础之上。
Dataset.scala的collect方法的源码如下:
withAction方法是一个高阶函数,第一个参数包括两项,字符串“collect”、QueryExecution类实例queryExecution。第二个参数是一个函数(collectFromPlan函数),collectFromPlan函数输入一个SparkPlan计划,输出是一个数组。withAction方法将Dataset的action包裹起来,这样可跟踪QueryExecution和时间成本,然后汇报给用户注册的回调函数。
Dataset.scala的withAction源代码如下:
在第9行执行action(qe.executedPlan),将QueryExecution类queryExecution.executedPlan传给collectFromPlan函数,collectFromPlan函数收集executedPlan执行计划的所有元素,返回一个数组。
QueryExecution类是Spark执行关系查询的主要workflow。查看QueryExecution类的执行计划executedPlan,executedPlan不用来初始化任何SparkPlan,仅用于执行。其中executedPlan.execute()是关键性的代码。
QueryExecution.scala的源码如下:
接下来查看collectFromPlan方法,collectFromPlan方法从SparkPlan中获取所有的数据。
Dataset.scala的源代码如下:
在第5行collectFromPlan调用plan.executeCollect()方法,plan是SparkPlan类实例,executeCollect方法执行查询,以数组形式返回结果。
SparkPlan.scala的executeCollect源代码如下:
其中,byteArrayRdd.collect()方法调用RDD.scala的collect方法。collect方法最终通过sc.runJob提交Spark集群运行。
RDD.scala的collect方法源码如下:
回到SparkPlan.scala的executeCollect方法,getByteArrayRdd方法将UnsafeRows打包到字节数组中,以便更快地序列化。
SparkPlan.scala的getByteArrayRdd源代码如下:
在第2行调用执行execute().mapPartitionsInternal方法,execute方法内部调用doExecute方法,将此查询的结果作为RDD[internalRow]返回。
SparkPlan.scala的execute源代码如下:
execute返回的查询结果类型为RDD[InternalRow]。SparkPlan.scala的doExecute()抽象方法没有具体实现,需通过SparkPlan子类重写此方法来进行具体实现。
1. protected def doExecute(): RDD[InternalRow]
InternalRow是通过语法树生成的一些数据结构。其子类包括BaseGenericInternalRow、JoinedRow、Row、UnsafeRow。
InternalRow.scala的源码如下:
DataSet的代码转化成为RDD的内部流程如下:
Parse SQL(DataSet)→Analyze Logical Plan→Optimize Logical Plan→Generate Physical Plan→Prepareed Spark Plan→Execute SQL→Generate RDD。
基于DataSet的代码一步步转化成为RDD:最终调用execute()生成RDD。