我是刚接触火花并尝试提取包含“ Subject:”的行并将其保存在arraylist中。我没有遇到任何错误,但数组列表为空。你能指导我哪里错了吗?或做到这一点的最佳方法?

import java.util.*;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
public final class extractSubject {

public static void main(String[] args) {



    SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("JavaBookExample");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    JavaRDD<String> sample = sc.textFile("/Users/Desktop/sample.txt");
    final ArrayList<String> list = new ArrayList<>();
    sample.foreach(new VoidFunction<String>(){

                    public void call(String line) {

                       if (line.contains("Subject:")) {
                           System.out.println(line);
                           list.add(line);
                       }
                   }}
    );

    System.out.println(list);
    sc.stop();
}
 }

最佳答案

请记住,Spark应用程序可以分布式并行运行。因此,您不能在Spark执行的功能之外修改变量。

相反,您需要从这些函数返回结果。在您的情况下,您需要flatMap(而不是没有结果的foreach),该函数将作为函数结果返回的集合连接起来。

如果一行匹配,则返回包含匹配行的列表,否则返回空列表。

要在main函数中打印数据,首先必须通过调用collect()在主节点中收集可能分发的数据。

这里是一个例子:

import java.util.*;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public final class extractSubject {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("JavaBookExample");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        //JavaRDD<String> sample = sc.textFile("/Users/Desktop/sample.txt");
        JavaRDD<String> sample = sc.parallelize(Arrays.asList("Subject: first",
                                                              "nothing here",
                                                              "Subject: second",
                                                              "dummy"));

        JavaRDD<String> subjectLinesRdd = sample.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String line) {
                if (line.contains("Subject:")) {
                    return Collections.singletonList(line);  // line matches → return list with the line as its only element
                } else {
                    return Collections.emptyList();  // ignore line → return empty list
                }
            }
        });

        List<String> subjectLines = subjectLinesRdd.collect();  // collect values from Spark workers
        System.out.println(subjectLines);  // → "[Subject: first, Subject: second]"

        sc.stop();
    }
}

10-07 19:50
查看更多