12.4 通过RDD分析电影点评系统仿QQ和微信等用户群分析及广播背后机制解密
通过RDD分析大数据电影点评系统仿QQ和微信等用户群分析,在本节统计最受不同年龄段人员欢迎的电影TopN。
用户文件users.dat的格式描述如下:
1. UserID::Gender::Age::Occupation::Zip-code 2. 用户ID、性别、年龄、职业、邮编代码
电影文件movies.dat的格式描述如下:
1. MovieID::Title::Genres 2. 电影ID、电影名、电影类型
评级文件ratings.dat的格式描述如下:
1. UserID::MovieID::Rating::Timestamp 2. 用户ID、电影ID、评分数据、时间戳
大数据电影点评系统仿QQ和微信等用户群分析实现思路:首先计算TopN,但是这里的关注点有两个。
1.不同年龄阶段如何界定的问题
这个问题其实是业务问题。一般情况下,我们都是在原始数据中直接对要进行分组的年龄段提前进行数据清洗ETL,例如,进行数据清洗ETL后产生以下数据。
2.性能问题
第一点:在实现的时候可以使用RDD的filter算子,如13 < age <18,但这样做会导致运行时进行大量的计算,因为要进行扫描,所以非常耗性能,通过提前进行数据清洗ETL把计算发生在Spark的业务逻辑运行前,用空间换时间,当然,这些实现也可以使用Hive,因为Hive语法支持非常强悍且内置了最多的函数。
第二点:这里要使用mapjoin,原因是targetUsers数据只有UserID,数据量一般不会太大。
大数据电影点评系统仿QQ和微信等用户群分析,先过滤出仿QQ用户及微信用户18年龄段(从18岁到24岁)及仿淘宝用户25年龄段(从25岁到34岁)的用户数据,然后构建Broadcast数据结构类型的targetQQUsersBroadcast、targetTaobaoUsersBroadcast广播变量。具体实现方法如下。
(1)仿QQ用户及微信用户18年龄段用户targetQQUsers的创建:将usersRDD中的每行数据按“::”分隔符分割,然后map格式化为(用户ID,年龄)元组,最后使用filter算子遍历过滤出元组的第二个元素等于18的数据。
(2)仿淘宝用户25年龄段用户targetTaobaoUsers的创建:将usersRDD中的每行数据按“::”分隔符分割,然后map格式化为(用户ID,年龄)元组,然后使用filter算子遍历过滤出元组的第二个元素等于25的数据。
(3)构建仿QQ用户及微信用户数据集合targetQQUsersSet。将targetQQUsers(用户ID,年龄)元组使用map转换函数获取第一个元素用户ID,然后使用collect()算子收集所有的仿QQ用户及微信用户的用户ID。代码“HashSet() ++ targetQQUsers.map(_._1).collect()”,这里使用“++”运算符是因为:collect()算子返回的数据类型是数组类型Array[T],使用HashSet()进行“++”运算,结果将是HashSet[String]类型,在之后的广播变量计算中将使用。而如果使用“+”运算符,结果将是HashSet[Array[String]]类型,之后的广播变量计算编译器会提示类型不匹配。
(4)构建仿淘宝用户数据集合targetTaobaoUsersSet。将targetTaobaoUsers(用户ID,年龄)元组使用map转换函数获取第一个元素用户ID,然后使用collect()算子收集所有的仿淘宝用户的用户ID。
(5)构建广播变量数据结构targetQQUsersBroadcast、targetTaobaoUsersBroadcast。在Spark中如何实现mapjoin呢?显然是要借助于Broadcast,把数据广播到Executor级别,让该Executor上的所有任务共享该唯一的数据,而不是每次运行Task的时候都要发送一份数据的复制,这显著地降低了网络数据的传输和JVM内存的消耗。这里我们使用sc.broadcast分别创建了广播变量targetQQUsersBroadcast、targetTaobaoUsersBroadcast。
所有电影中仿QQ或者微信核心目标用户最喜爱电影TopN分析,具体实现如下。
(1)将moviesRDD中的每行数据按“::”分隔符分割,然后map格式化为(电影ID,电影名)元组,使用collect算子收集所有数据,通过toMap转换成map数据结构。
(2)将ratingsRDD中的每行数据按“::”分隔符分割,然后map格式化为(用户ID,电影ID)元组,最后使用filter算子遍历过滤。过滤条件为:元组的第一个元素用户ID包含在广播变量targetQQUsersBroadcast的集合HashSet(用户ID)中,即从ratingsRDD中过滤出18年龄段(从18岁到24岁)的用户的评分数据,数据格式为(用户ID,电影ID)。
(3)然后进行map转换,把数据变成Key-Value,取ratingsRDD元组的第2个元素电影ID作为Key,计数1次作为Value,格式化为Key-Value,即(电影ID,1)。
(4)通过reduceByKey操作实现聚合:对相同Key的Value值进行累加。生成Key-Value,即(电影ID,总次数)。
(5)然后进行Key和Value的交换。上一步reduceByKey算子执行完毕,然后进行map转换操作,交换Key-Value值,即将(电影ID,总次数)转换成(总次数,电影ID)。
(6)接下来排序,我们使用sortByKey(false)算子按总次数降序排列。
(7)排序完成,再次进行Key和Value的交换。我们使用map转换函数将(总次数,电影ID)进行交换,转换为(电影ID,总次数),再通过take(10)算子获取Top10的记录。数据格式为(电影ID,总次数)。
(8)为了使输出显示更加直观,我们使用movieID2Name.getOrElse获取电影ID对应的电影名,然后打印输出,格式为(电影名,总次数),即打印输出所有电影中仿QQ或者微信用户最喜爱电影的Top10。
仿QQ或者微信核心目标用户最喜爱电影TopN分析代码如下:
在IDEA中运行代码,结果如下:
所有电影中淘宝核心目标用户最喜爱电影TopN分析,具体实现思路和仿QQ或者微信核心目标用户最喜爱电影TopN分析类似,这里不再赘述,实现代码如下:
在IDEA中运行代码,结果如下: