How to update a broadcast variable in Spark Streaming



Question


I have, I believe, a relatively common use case for spark streaming:

I have a stream of objects that I would like to filter based on some reference data.


Initially, I thought that this would be a very simple thing to achieve using a Broadcast Variable:

public void startSparkEngine() {
    Broadcast<ReferenceData> refdataBroadcast
      = sparkContext.broadcast(getRefData());

    final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
        final ReferenceData refData = refdataBroadcast.getValue();
        return obj.getField().equals(refData.getField());
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}

However, my reference data changes periodically, albeit infrequently.


I was under the impression that I could modify and re-broadcast my variable on the driver and it would be propagated to each of the workers, however the Broadcast object is not Serializable and needs to be final.


What alternatives do I have? The three solutions I can think of are:


  1. Move the reference data lookup into a forEachPartition or forEachRdd so that it resides entirely on the workers. However, the reference data lives behind a REST API, so I would also need to somehow store a timer / counter to stop the remote API being accessed for every element in the stream.


  2. Restart the Spark Context every time the refdata changes, with a new Broadcast Variable.


  3. Convert the Reference Data to an RDD, then join the streams in such a way that I am now streaming Pair<MyObject, RefData>, though this will ship the reference data with every object.
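The timer / counter idea in option 1 can be sketched as a small per-JVM cache (plain Java, no Spark; `RefDataCache` and the 60-second TTL are illustrative assumptions, not part of the original post):

```java
import java.util.function.Supplier;

// Hypothetical per-executor cache: each worker JVM keeps one copy of the
// reference data and re-fetches it only after the TTL elapses, so the
// REST API is not hit for every element in the stream.
class RefDataCache<T> {
    private final Supplier<T> fetcher; // e.g. a call to the REST API
    private final long ttlMillis;
    private T cached;
    private long lastFetchedAt;

    RefDataCache(Supplier<T> fetcher, long ttlMillis) {
        this.fetcher = fetcher;
        this.ttlMillis = ttlMillis;
    }

    synchronized T get() {
        long now = System.currentTimeMillis();
        if (cached == null || now - lastFetchedAt > ttlMillis) {
            cached = fetcher.get(); // the remote call happens only here
            lastFetchedAt = now;
        }
        return cached;
    }
}
```

Held in a static field, one instance per executor JVM survives across batches, so a `foreachPartition` body can call `get()` freely without flooding the API.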
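The join-then-filter shape of option 3 can be illustrated without Spark (plain Java; `MyObject` and `RefData` here are stand-ins for the types in the question). Note that every pair carries its own copy of the reference data, which is the shipping cost mentioned above:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.stream.Collectors;

class JoinFilterSketch {
    // Stand-ins for the question's types.
    record RefData(String field) {}
    record MyObject(String field) {}

    // Pair every object with the reference data, then filter on the pair --
    // the same shape as joining a stream RDD against a refdata RDD.
    static List<SimpleEntry<MyObject, RefData>> joinAndFilter(
            List<MyObject> batch, RefData refData) {
        return batch.stream()
                .map(obj -> new SimpleEntry<>(obj, refData))
                .filter(p -> p.getKey().field().equals(p.getValue().field()))
                .collect(Collectors.toList());
    }
}
```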

Answer


Extending the answer by @Rohan Aletty, here is sample code for a BroadcastWrapper that refreshes the broadcast variable based on a TTL:

public class BroadcastWrapper {

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper(){}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
       JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
       return jsc;
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime()-lastUpdatedAt.getTime();
        if (broadcastVar == null || diff > 60000) { // Let's say we want to refresh every 1 min = 60000 ms
            if (broadcastVar != null)
                broadcastVar.unpersist();
            lastUpdatedAt = new Date(System.currentTimeMillis());

            // Your logic to refresh the reference data
            ReferenceData data = getRefData();

            broadcastVar = getSparkContext(sparkContext).broadcast(data);
        }
        return broadcastVar;
    }
}

Your code would look like:

public void startSparkEngine() {

    final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
        Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context());

        return stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
        // Final processing of filtered objects
        });
        return null;
    });
}


This worked for me on a multi-node cluster as well. Hope this helps.

