超出了GC开销限制

超出了GC开销限制

我们有一个返回超过500万行的Spark SQL查询。收集所有这些内容以进行处理会导致java.lang.OutOfMemoryError:(最终)超出了GC开销限制。这是代码:

final Dataset<Row> jdbcDF = sparkSession.read().format("jdbc")
            .option("url", "xxxx")
            .option("driver", "com.ibm.db2.jcc.DB2Driver")
            .option("query", sql)
            .option("user", "xxxx")
            .option("password", "xxxx")
            .load();
    final Encoder<GdxClaim> gdxClaimEncoder = Encoders.bean(GdxClaim.class);
    final Dataset<GdxClaim> gdxClaimDataset = jdbcDF.as(gdxClaimEncoder);
    System.out.println("BEFORE PARALLELIZE");
    final JavaRDD<GdxClaim> gdxClaimJavaRDD = javaSparkContext.parallelize(gdxClaimDataset.collectAsList());
    System.out.println("AFTER");
    final JavaRDD<ClaimResponse> gdxClaimResponse = gdxClaimJavaRDD.mapPartitions(mapFunc);
    mapFunc = (FlatMapFunction<Iterator<GdxClaim>, ClaimResponse>) claim -> {
        System.out.println(":D " + claim.next().getRBAT_ID());
        if (claim != null && !currentRebateId.equals((claim.next().getRBAT_ID()))) {
            if (redisCommands == null || (claim.next().getRBAT_ID() == null)) {
                    serObjList = Collections.emptyList();
                } else {

                    generateYearQuarterKeys(claim.next());

                    redisBillingKeys = redisBillingKeys.stream().collect(Collectors.toList());
                    final String[] stringArray = redisBillingKeys.toArray(new String[redisBillingKeys.size()]);
                    serObjList = redisCommands.mget(stringArray);

                    serObjList = serObjList.stream().filter(clientObj -> clientObj.hasValue()).collect(Collectors.toList());
                    deserializeClientData(serObjList);
                    currentRebateId = (claim.next().getRBAT_ID());
            }
        }
        return (Iterator) racAssignmentService.assignRac(claim.next(), listClientRegArr);

    };


您可以忽略其中的大部分,永远运行且永远无法返回的行是:

final JavaRDD<GdxClaim> gdxClaimJavaRDD = javaSparkContext.parallelize(gdxClaimDataset.collectAsList());


因为:
    gdxClaimDataset.collectAsList()

我们不确定从哪里去,完全陷入困境。有人可以帮忙吗?我们到处都在寻找帮助的例子。

最佳答案

尝试替换此行:

final JavaRDD<GdxClaim> gdxClaimJavaRDD = javaSparkContext.parallelize(gdxClaimDataset.collectAsList());


与:

final JavaRDD<GdxClaim> gdxClaimJavaRDD = gdxClaimDataset.javaRDD();

10-06 11:13