Problem description
I am using spark-sql-2.4.1 and creating a broadcast variable as below:
Broadcast<Map<String,Dataset>> bcVariable = javaSparkContext.broadcast(//read dataset);
I pass the bcVariable to a function:
Service.calculateFunction(sparkSession, bcVariable.getValue());
public static class Service {
    public static void calculateFunction(
            SparkSession sparkSession,
            Map<String, Dataset> dataSet) {
        System.out.println("---> size : " + dataSet.size()); // prints size 1
        for (Map.Entry<String, Dataset> aEntry : dataSet.entrySet()) {
            System.out.println(aEntry.getKey()); // prints the key
            aEntry.getValue().show(); // throws NullPointerException
        }
    }
}
What is wrong here? How do I pass a Dataset/DataFrame to the function?
Attempt 2:
Broadcast<Dataset> bcVariable = javaSparkContext.broadcast(//read dataset);
I pass the bcVariable to a function:
Service.calculateFunction(sparkSession, bcVariable.getValue());
public static class Service {
    public static void calculateFunction(SparkSession sparkSession, Dataset dataSet) {
        System.out.println("---> size : " + dataSet.size()); // throws NullPointerException
    }
}
What is wrong here? How do I pass a Dataset/DataFrame to the function?
Attempt 3:
Dataset metaData = //read dataset from oracle table i.e. meta-data.
I pass the metaData to a function:
Service.calculateFunction(sparkSession, metaData);
public static class Service {
    public static void calculateFunction(
            SparkSession sparkSession,
            Dataset metaData) {
        System.out.println("---> size : " + metaData.size()); // throws NullPointerException
    }
}
What is wrong here? How do I pass a Dataset/DataFrame to the function?
Recommended answer
The value to be broadcast has to be a plain (serializable) object, not a DataFrame.
Service.calculateFunction(sparkSession, metaData) is executed on executors, and hence metaData is null (it was not serialized and sent over the wire from the driver to the executors).
Quoting the SparkContext.broadcast docs: broadcast a read-only variable to the cluster, returning an org.apache.spark.broadcast.Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.
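For context, here is a minimal self-contained sketch of exactly that pattern: broadcasting a plain serializable map and reading it inside a distributed function via value(). The class name, app name, lookup contents, and the range-based job are made up for illustration; only the broadcast/value() calls are the point.

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

public class BroadcastSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("broadcast-sketch").master("local[*]").getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // A plain serializable object -- this is what broadcast is meant for.
        Map<String, String> lookup = new HashMap<>();
        lookup.put("A", "alpha");
        Broadcast<Map<String, String>> bc = jsc.broadcast(lookup);

        // Read the broadcast value inside a distributed function via value().
        spark.range(3).toJavaRDD()
             .map(i -> i + " -> " + bc.value().get("A"))
             .collect()
             .forEach(System.out::println);

        spark.stop();
    }
}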
Think of a DataFrame as a data abstraction that represents a distributed computation described in a SQL-like language (the Dataset API or SQL). It simply does not make sense to have it anywhere but on the driver, where computations can be submitted for execution (as tasks on executors).
You simply have to "convert" the data this computation represents (in DataFrame terms) using DataFrame.collect.
Once you have collected the data, you can broadcast it and reference it with the .value method.
In code:
Dataset<Row> dataset = // read dataset
Broadcast<List<Row>> bcVariable =
    javaSparkContext.broadcast(dataset.collectAsList());
Service.calculateFunction(sparkSession, bcVariable.value());
The only change compared to your code is the collect (collectAsList in the Java API), so the broadcast value is local data rather than a Dataset.
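Adapting that to the Map<String, Dataset> shape from your first attempt, a sketch might look like the following. The datasetMap name, the run helper, and the reworked Map<String, List<Row>> signature for calculateFunction are my assumptions, not part of the original post:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BroadcastMapSketch {
    public static void run(SparkSession sparkSession,
                           JavaSparkContext javaSparkContext,
                           Map<String, Dataset<Row>> datasetMap) { // hypothetical name for your original map
        // Collect each Dataset into local rows so the map holds plain serializable objects.
        Map<String, List<Row>> localData = new HashMap<>();
        for (Map.Entry<String, Dataset<Row>> e : datasetMap.entrySet()) {
            localData.put(e.getKey(), e.getValue().collectAsList());
        }

        Broadcast<Map<String, List<Row>>> bcVariable = javaSparkContext.broadcast(localData);
        Service.calculateFunction(sparkSession, bcVariable.value());
    }

    public static class Service {
        public static void calculateFunction(SparkSession sparkSession, Map<String, List<Row>> data) {
            System.out.println("---> size : " + data.size());
            for (Map.Entry<String, List<Row>> e : data.entrySet()) {
                System.out.println(e.getKey());
                e.getValue().forEach(System.out::println); // rows are local objects, so no NPE here
            }
        }
    }
}

Note that this only works when each Dataset is small enough to collect to the driver; for large data, joining the DataFrames directly (or using a broadcast join) is the usual alternative.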