SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkTwitterHelloWorldExample");
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(60000));
System.setProperty("twitter4j.oauth.consumerKey", consumerKey);
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret);
System.setProperty("twitter4j.oauth.accessToken", accessToken);
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret);
String[] filters = new String[] {"Narendra Modi"};
JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc,filters);

// Without filter: Output text of all tweets
JavaDStream<String> statuses = twitterStream.map(
        new Function<Status, String>() {
            public String call(Status status) { return status.getText(); }
        }
);
statuses.print();
statuses.saveAsHadoopFiles("hdfs://HadoopSystem-150s:8020/Spark_Twitter_out","txt");

我能够获取Twitter推文,但在写入HDFS时遇到错误。

有人可以帮助我使用Java将推文保存到HDFS中吗

这是我得到的错误:

最佳答案

您需要使用saveAsTextFile()方法。 Hadoop输出格式仅适用于JavaPairDStream(它要求键和值)。

解决方案是:

statuses.dstream().saveAsTextFiles(prefix, suffix);

关于java - 如何使用Spark Streaming Java API将Twitter推文写入HDFS,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/32568436/

10-09 20:15