We have a Spark SQL query that returns more than 5 million rows. Collecting all of them for processing eventually results in java.lang.OutOfMemoryError: GC overhead limit exceeded. Here is the code:
final Dataset<Row> jdbcDF = sparkSession.read().format("jdbc")
.option("url", "xxxx")
.option("driver", "com.ibm.db2.jcc.DB2Driver")
.option("query", sql)
.option("user", "xxxx")
.option("password", "xxxx")
.load();
final Encoder<GdxClaim> gdxClaimEncoder = Encoders.bean(GdxClaim.class);
final Dataset<GdxClaim> gdxClaimDataset = jdbcDF.as(gdxClaimEncoder);
System.out.println("BEFORE PARALLELIZE");
final JavaRDD<GdxClaim> gdxClaimJavaRDD = javaSparkContext.parallelize(gdxClaimDataset.collectAsList());
System.out.println("AFTER");
final JavaRDD<ClaimResponse> gdxClaimResponse = gdxClaimJavaRDD.mapPartitions(mapFunc);
mapFunc = (FlatMapFunction<Iterator<GdxClaim>, ClaimResponse>) claim -> {
    System.out.println(":D " + claim.next().getRBAT_ID());
    if (claim != null && !currentRebateId.equals((claim.next().getRBAT_ID()))) {
        if (redisCommands == null || (claim.next().getRBAT_ID() == null)) {
            serObjList = Collections.emptyList();
        } else {
            generateYearQuarterKeys(claim.next());
            redisBillingKeys = redisBillingKeys.stream().collect(Collectors.toList());
            final String[] stringArray = redisBillingKeys.toArray(new String[redisBillingKeys.size()]);
            serObjList = redisCommands.mget(stringArray);
            serObjList = serObjList.stream().filter(clientObj -> clientObj.hasValue()).collect(Collectors.toList());
            deserializeClientData(serObjList);
            currentRebateId = (claim.next().getRBAT_ID());
        }
    }
    return (Iterator) racAssignmentService.assignRac(claim.next(), listClientRegArr);
};
You can ignore most of it. The line that runs forever and never returns is:
final JavaRDD<GdxClaim> gdxClaimJavaRDD = javaSparkContext.parallelize(gdxClaimDataset.collectAsList());
because of:
gdxClaimDataset.collectAsList()
We're not sure where to go from here and are completely stuck. Can anyone help? We've searched everywhere for examples.
Best answer
Try replacing this line:
final JavaRDD<GdxClaim> gdxClaimJavaRDD = javaSparkContext.parallelize(gdxClaimDataset.collectAsList());
with:
final JavaRDD<GdxClaim> gdxClaimJavaRDD = gdxClaimDataset.javaRDD();
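For context, Dataset.javaRDD() exposes the same rows as a distributed JavaRDD without pulling them onto the driver, whereas collectAsList() materializes all 5 million+ rows in driver memory, which is what triggers the GC overhead limit. A minimal sketch of the rest of the pipeline, reusing the gdxClaimDataset and mapFunc names from the question:

// Sketch, assuming the gdxClaimDataset and mapFunc declared in the question.
// javaRDD() keeps the rows distributed across the executors instead of
// collecting them into the driver JVM and re-parallelizing them.
final JavaRDD<GdxClaim> gdxClaimJavaRDD = gdxClaimDataset.javaRDD();

// mapPartitions then runs on each executor's local partition of the data.
final JavaRDD<ClaimResponse> gdxClaimResponse = gdxClaimJavaRDD.mapPartitions(mapFunc);

With this change the javaSparkContext.parallelize(...) step is no longer needed: parallelize is intended for small, driver-local collections, so combining it with collectAsList() forces the entire result set through the driver before it can be distributed again.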