假设我想使用某种创建成本很高的对象来映射RDD。我希望每个工作程序/线程都有一个该对象,并且必须在处理每个工作程序上的RDD分区的项目之前创建该对象。

我的解决方案是:

    final Function0<ModelEvaluator> f = () -> {

        if (ModelEvaluator.getInstance() == null) {
            ModelEvaluator m = new ModelEvaluator(script);
            ModelEvaluator.setInstance(m);
        }

        return ModelEvaluator.getInstance();
    };

    JavaPairRDD<Double, List<Service>> results = cartesian.mapToPair(
            (t) -> {
                try {
                    double val = f.call().evaluateModel(t);
                    return new Tuple2<>(val, t);
                } catch (Exception ex) {
                    return null;
                }
            }
    );



public class ModelEvaluator {

  private static ModelEvaluator instance;

  public static void setInstance(ModelEvaluator instance) {
    ModelEvaluator.instance = instance;
  }

  public static ModelEvaluator getInstance() {
      return instance;
  }
...


在这种情况下,“ ModelEvaluator”对象将解析脚本,然后使用“ Service”对象列表来配置模型参数,以便为该参数配置计算关联的响应度量。但是我不想每次处理RDD行时都解析脚本。

我还配置了集群以为每个集群创建一个进程,并且每个进程将仅产生一个工作进程,因为在同一进程中多个工作进程同时访问具有可变状态的单例实例会很麻烦。

我的问题是否有更优雅的解决方案?

最佳答案

这可以通过Broadcast变量来完成。这将允许您在驱动程序上创建一个对象,并且根据需要将每个工人发送一次。

final Broadcast<ModelEvaluator> model = jsc.broadcast(new ModelEvaluator(script));

JavaPairRDD<Double, List<Service>> results = cartesian.mapToPair(
        (t) -> {
            try {
                double val = model.value().evaluateModel(t);
                return new Tuple2<>(val, t);
            } catch (Exception ex) {
                return null;
            }
        }
);

10-07 16:10