我是刚接触火花并尝试提取包含“ Subject:”的行并将其保存在arraylist中。我没有遇到任何错误,但数组列表为空。你能指导我哪里错了吗?或做到这一点的最佳方法?
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
public final class extractSubject {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("JavaBookExample");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> sample = sc.textFile("/Users/Desktop/sample.txt");
final ArrayList<String> list = new ArrayList<>();
sample.foreach(new VoidFunction<String>(){
public void call(String line) {
if (line.contains("Subject:")) {
System.out.println(line);
list.add(line);
}
}}
);
System.out.println(list);
sc.stop();
}
}
最佳答案
请记住,Spark应用程序可以分布式并行运行。因此,您不能在Spark执行的功能之外修改变量。
相反,您需要从这些函数返回结果。在您的情况下,您需要flatMap
(而不是没有结果的foreach
),该函数将作为函数结果返回的集合连接起来。
如果一行匹配,则返回包含匹配行的列表,否则返回空列表。
要在main
函数中打印数据,首先必须通过调用collect()
在主节点中收集可能分发的数据。
这里是一个例子:
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
public final class extractSubject {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("JavaBookExample");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
//JavaRDD<String> sample = sc.textFile("/Users/Desktop/sample.txt");
JavaRDD<String> sample = sc.parallelize(Arrays.asList("Subject: first",
"nothing here",
"Subject: second",
"dummy"));
JavaRDD<String> subjectLinesRdd = sample.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String line) {
if (line.contains("Subject:")) {
return Collections.singletonList(line); // line matches → return list with the line as its only element
} else {
return Collections.emptyList(); // ignore line → return empty list
}
}
});
List<String> subjectLines = subjectLinesRdd.collect(); // collect values from Spark workers
System.out.println(subjectLines); // → "[Subject: first, Subject: second]"
sc.stop();
}
}