这是这个问题的延续:Porting a multi-threaded compute intensive job to spark
我按照建议使用forEachPartition
在10000个id的列表上循环,然后执行repartition(20)
操作,因为每个分区都创建db连接,如果创建100个分区,作业就会因为到postgres和mongo的100个开放连接而终止。我使用postgres连接不仅存储数据,还从另一个表中查找一些数据。
我可以从我的任务中去掉直接将数据存储到postgres的方法,并将其作为序列文件的后处理来完成。
但理想情况下,我需要大规模并行化我的spark作业,以便在给定的时间内完成任务,目前它在20小时内处理了大约200个id,但我需要在20小时内处理10000个id。所以repartition(20)
显然没有帮助。我在这里被数据库上的IO绑定。
那么,我可以在所有任务中高效地共享这些数据的选项是什么?我希望Mongo和Postgres中的数据被视为内存中的查找表-总大小约为500GB。
我的选择是:
RDD(我认为RDD不适合我的用例)
数据帧
广播变量(不确定这是否有效,因为它的创建需要在spark驱动程序中提供500gb)
将数据从mongo移到s3,并将任务查找从s3移到s3。
最佳答案
对于此类问题,我们遵循的技术是:
将查找存储在MongoDB的不同集合中。
使用Hadoop MongoDB连接器从MongoDB获取数据并将其存储在RDD中
广播变量,使其对所有节点/工作机都可用
现在,如果数据在hdfs中,则为其创建一个rdd,或者如果数据在mongodb中,则使用hadoop mongodb连接器。
现在执行查找匹配部分
将文件保存为一个序列文件,或者您也可以将其保存在s3上,在我们将其存储回mongodb时需要检查它
关于java - 如何使用Mongo和PostgreSQL中的数据作为内存查找表?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/32512392/