Spark大数据商业实战三部曲:内核解密、商业案例、性能调优(第2版)
上QQ阅读APP看书,第一时间看更新

1.2 通过DataFrame和DataSet实战电影点评系统

DataFrameAPI是从Spark 1.3开始就有的,它是一种以RDD为基础的分布式无类型数据集,它的出现大幅度降低了普通Spark用户的学习门槛。

DataFrame类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以解析到具体数据的结构信息,从而对DataFrame中的数据源以及对DataFrame的操作进行了非常有效的优化,从而大幅提升了运行效率。

DataSetAPI是从1.6版本提出的,在Spark 2.2的时候,DataSet和DataFrame趋于稳定,可以投入生产环境使用。与DataFrame不同的是,DataSet是强类型的,而DataFrame实际上就是DataSet[Row](也就是Java中的DataSet<Row>)。

DataSet是Lazy级别的,Transformation级别的算子作用于DataSet会得到一个新的DataSet。当Action算子被调用时,Spark的查询优化器会优化Transformation算子形成的逻辑计划,并生成一个物理计划,该物理计划可以通过并行和分布式的方式来执行。

反观RDD,由于无从得知其中数据元素的具体内部结构,故很难被Spark本身自行优化,对于新手用户很不友好,但是,DataSet底层是基于RDD的,所以最终的优化尽头还是对RDD的优化,这就意味着优化引擎不能自动优化的地方,用户在RDD上可能有机会进行手动优化。

1.2.1 通过DataFrame实战电影点评系统案例

现在我们通过实现几个功能来了解DataFrame的具体用法。先来看第一个功能:通过DataFrame实现某部电影观看者中男性和女性不同年龄分别有多少人。

最终打印结果如图1-6所示,类似一张普通的数据库表。

图1-6 电影观看者中男性和女性人数

上面案例中的代码无论是从思路上,还是从结构上都和SQL语句十分类似,下面通过写SQL语句的方式来实现上面的案例。

这样我们就可以得到与上面案例相同的结果,这对写SQL比较多的用户是十分友好的。但是有一个问题需要注意,这里调用createTempView创建的临时表是会话级别的,会话结束时这个表也会消失。那么,怎么创建一个Application级别的临时表呢?可以使用createGlobalTempView来创建临时表,但是这样就要在写SQL语句时在表名前面加上global_temp,例如:

第一个DataFrame案例实现了简单的类似SQL语句的功能,但这是远远不够的,我们要引入一个隐式转换来实现复杂的功能。

从图1-7的结果可以看到,求平均值的那一列列名和在SQL语句里使用函数时的列名一样变成了avg(Rating),程序中的orderBy里传入的列名要和这个列名一致,否则会报错,提示找不到列。

图1-7 电影系统SQL运行结果

有时我们也可能会在使用DataFrame的时候在中间某一步转换到RDD里操作,以便实现更加复杂的逻辑。下面来看一下DataFrame和RDD的混合编程。

1.2.2 通过DataSet实战电影点评系统案例

前面提到的DataFrame其实就是DataSet[Row],所以只要学会了DataFrame的使用,就可以快速接入DataSet,只不过在创建DataSet的时候要注意与创建DataFrame的方式略有不同。DataSet可以由DataFrame转换而来,只需要用yourDataFrame.as[yourClass]即可得到封装了yourClass类型的DataSet,之后就可以像操作DataFrame一样操作DataSet了。接下来我们讲一下如何直接创建DataSet,因为DataSet是强类型的,封装的是具体的类(DataFrame其实封装了Row类型),而类本身可以视作带有Schema的,所以只需要把数据封装进具体的类,然后直接创建DataSet即可。

首先引入一个隐式转换,并创建几个caseClass用来封装数据。

电影系统运行结果如图1-8所示,列名为User类的属性名。下面使用同样的方法创建ratingsDataSet并实现一个案例:找出观看某部电影的不同性别不同年龄的人数。

观看电影性别、年龄统计结果如图1-9所示。

图1-8 电影系统运行结果

图1-9 观看电影性别、年龄统计结果

当然,也可以把DataFrame和DataSet混着用(这样做会导致代码混乱,故不建议这样做),得到的结果完全一样。

最后根据源码,有几点需要补充:

RDD的cache方法等于MEMORY_ONLY级别的persist,而DataSet的cache方法等于MEMORY_AND_DISK级别的persist,因为重新计算的代价非常昂贵。如果想使用其他级别的缓存,可以使用persist并传入相应的级别。

RDD.scala源码如下:

Dataset.scala源码如下:

基于DataSet的计算会像SQL一样被Catalyst引擎解析生成执行查询计划,然后执行。我们可以使用explain方法来查看执行计划。