我用Rust徒手重写了一个Spark,并把它开源了
本文作者Raja Sekar已经有三年多Spark的使用经验,他认为Spark的DataFrame非常优秀,可以解决大多数分析工作负载问题,但仍然有一些地方使用RDD会更方便。于是,他萌生出了一个使用原生语言重新实现Spark的想法,想看看重写后在性能和资源管理效率方面可以达到怎样的效果。最后他选择了最近很火的Rust,重写后的FastSpark不仅在运行速度上比Spark更快,而且能够节省相当多的内存,作者接下来的目标也很简单:将其作为Apache Spark的替代方案。
这一切都始于我对各种分布式调度器和分布式计算框架的研究,而Spark就是其中的一个。因为有三年多使用Spark的经验,所以我对Spark的内部原理已经有了一定的了解。Spark之所以取得巨大成功,不仅是因为速度和效率,还因为它提供了非常直观的API。pandas之所以这么流行,也是因为这个。否则的话,如果出于对性能的考虑,人们可以选择其他更好的替代方案,比如Flink、Naiad或者像OpenMP这样的HPC框架。
Spark是一个通用的分布式框架,RDD非常适合用来处理非结构化数据或复杂的任务。而现在的Spark都是关于DataFrame和SQL,它们比RDD更受欢迎。DataFrame的性能比RDD更好。RDD是Spark生态系统的基础,那为什么DataFrame会获得更好的性能呢?是因为做了查询优化吗?以最简单的查询为例,你可以使用RDD定义出最好的数据和计算流,但DataFrame仍然可能胜过你定义的查询。秘密在于Tungsten引擎。Spark的性能主要依赖内存。Spark在处理典型任务时,JVM很快就会把资源消耗光。因此,Spark使用“sun.misc.Unsafe”来直接管理原始内存。这样造成的结果是DataFrame API不如RDD灵活。它们只能处理固定的预定义数据类型,所以你不能在数据流中自由地使用数据结构或对象。在实际当中,DataFrame可以解决大多数分析工作负载,但仍然有一些地方使用RDD会更方便。
于是,我有了一个使用原生语言重新实现Spark的想法,看看它在性能和资源管理效率方面可以达到怎样的效果。Spark已经经过多年的优化,所以我不指望在性能上能有巨大的差别,如果有,很可能是在内存使用方面。另外,我希望它可以像Spark一样通用。我决定使用Rust来实现,因为我也没有太多其他的选择。虽然C++也非常适合做这个,但我更喜欢Rust的简洁,而且Rust在很多方面与Scala相似。如果你看过代码库里的示例代码,就会知道它与典型的Scala Spark代码有多么相似。我实现了一个非常基本的版本,支持部分RDD操作和转换。不过我还没有实现文件的读取和写入,所以暂时需要手动编写代码来读取文件。
FastSpark项目开源链接:https://github.com/rajasekarv/native_spark
性能基准测试使用了一个1.2TB的CSV文件。
· 使用了5个GCloud节点(1主4副);
· 机器类型:n1-standard-8(8个vCPU, 30GB内存)。
这是一个简单的reduceBy操作,参考代码:FastSpark和Spark。
· FastSpark使用的时间为:19分钟35秒
· Apache Spark使用的时间为:1小时2分钟
这个结果十分令人惊讶。我并不指望在运行速度方面会有什么不同,我比较期待的是内存的使用情况。因为任务运行在分布式进程上,无法测量CPU使用时间,所以我测量了执行节点在程序运行期间的CPU使用情况。
FastSpark
Apache Spark
可以看到,FastSpark进行了大量的I/O操作,CPU使用率约为28%,而Spark CPU使用率始终为100%。iotop显示,FastSpark在执行期间I/O完全饱和,而Spark只使用了一半多。
在执行节点上,FastSpark峰值的内存利用率不超过150MB,而Spark达到了5-6GB,并在这个范围内波动。这种巨大的差异可能是由于JVM对象分配造成的,对象分配是非常昂贵的操作。我最初的实现版本比当前版本慢了两倍多。最初的实现版本使用了很多克隆和装箱,移除掉一部分之后就带来了巨大的性能提升。
同样的逻辑使用Rust实现比FastSpark RDD要快两倍多。性能分析显示,上述的FastSpark程序在分配和系统调用方面占用了75%的运行时间,主要是因为Rust实现的版本为动态分派装箱了大量数据。
下面是在本地运行4种不同实现版本的结果(处理10GB数据)。文件是CSV格式,并且保存在硬盘上(是的,我的硬盘很慢),这里主要关注user时间。
Rust基本版本:
real 6m05.203s user 1m02.049s sys 0m8.572s
FastSpark版本:
real 6m32.884s user 1m48.904s sys 0m4.489s
Apache Spark RDD版本:
real 10m14.646s user 14m45.452s sys 0m9.021s
Apache Spark DataFrame版本:
real 8m23.422s user 10m34.975s sys 0m8.882s
CPU密集型任务
Spark的分析任务几乎总是需要消耗大量CPU,因为我们通常会使用压缩文件,如Parquet或Avro。下面是读取Parquet文件(从10GB CSV文件生成,800MB左右)并执行相同操作的结果。
FastSpark版本:
real 1m31.327s user 4m50.342s sys 0m1.922s
现在变成需要消耗大量CPU,它把所有的CPU时间都花在解压缩和哈希操作上。
Spark DataFrame版本:
real 0m55.227s user 2m03.311s sys 0m2.343s
这就是Dataframe API提供的优化结果。代码与之前是一样的,只是用Parquet代替了CSV格式。但需要注意的是,Spark SQL生成的代码只选择需要的列,而FastSpark使用get_row_iter遍历所有行。
我写了一段读取文件的代码,只读取需要的列,让我们看看结果。代码请参考这里。
FastSpark(只选择需要的列):
real 0m13.582s user 0m34.753s sys 0m0.556s
这样的速度相当快了。它仍然是IO密集的。此外,它只使用了大约400MB内存,而Spark DataFrame使用了2-3GB。这是我更喜欢RDD API的原因之一,我们可以对数据流进行更灵活的控制。虽然抽象对于大多数应用程序来说是没问题的,但有时候我们需要完成一些任务,而具有良好的性能的底层API更适用。
实际上,这可以让FastSpark DataFrame变得比Apache Spark DataFrame更强大、更通用。与Spark不一样的是,FastSpark DataFrame可以支持任意的数据类型,还可以通过为数据类型实现自定义散列来连接具有不同数据类型的列。不过FastSpark DataFrame还没有开源出来,因为现在还处在试验阶段。我倾向于选择类似pandas那样的设计,不仅灵活,还具有很高的性能。如果有可能,它还可以与Python对象进行互操作,但又不同于PySpark。
这个工作流非常简单,也在非常大型的数据集上运行过,所以我选择了它。当然,这也可能是我的个人偏好。如果可能的话,请读者自己运行代码,并向我反馈结果。Spark经过了很多优化,特别是shuffle,在这方面我的实现(非常简单)比Spark要慢得多。另外,使用FastSpark执行CPU密集型任务通常会更快。
主要目标:
· 将其作为Apache Spark的替代方案。这意味着用户端API应该要保持一致。
· 在与Python集成方面比PySpark做得更好。
已完成:
· 项目处于非常初级的POC阶段,只支持少数的RDD操作和转换。
· 分布式调度器已实现,但离投入生产还差得很远,而且非常脆弱。容错和缓存还没有完成,但应该很快就能完成。
未来规划:
· 通用的文件读取器很快就可以完成。文件接口与Spark的不一样。支持HDFS还需要做大量的工作,但支持其他文件系统应该很简单。这个项目的主要目标之一是让它成为Apache Spark的完全替代品(至少支持Python和R语言等非JVM语言),所以我将尽量保持用户端API的兼容性。
· 因为代码是试验性的,所以其中有很多硬编码的东西。支持配置和部署将是下一个优先事项。
· shuffle操作实现得还很简单,性能也不好,这个需要改进。