问题描述
我想实现火花斯卡拉流应用。我想使用FILESTREAM()方法来处理Hadoop的目录中新来的文件,以及旧文件present。
I am trying to implement spark streaming application in scala. I want to use fileStream() method to process newly arrived files as well as older files present in hadoop directory.
我按照FILESTREAM()实现从下面从计算器两个线程为:
I have followed fileStream() implementation from following two threads from stackoverflow as:
- Scala Spark streaming fileStream
- spark streaming fileStream
我使用FILESTREAM()如下:
I am using fileStream() as following:
val linesRDD = ssc.fileStream[LongWritable, Text, TextInputFormat](inputDirectory, (t: org.apache.hadoop.fs.Path) => true, false).map(_._2.toString)
但是我得到错误信息如下:
But i am getting error message as following:
type arguments [org.apache.hadoop.io.LongWritable,org.apache.hadoop.io.Text,
org.apache.hadoop.mapred.TextInputFormat] conform to the bounds of none of the overloaded alternatives of value fileStream: [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence$12: scala.reflect.ClassTag[K], implicit evidence$13: scala.reflect.ClassTag[V], implicit evidence$14: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:
String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean)(implicit evidence$9: scala.reflect.ClassTag[K], implicit evidence$10: scala.reflect.ClassTag[V],
implicit evidence$11: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence$6: scala.reflect.ClassTag[K], implicit evidence$7: scala.reflect.ClassTag[V], implicit evidence$8: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)]
wrong number of type parameters for overloaded method value fileStream with alternatives:
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence$12: scala.reflect.ClassTag[K], implicit evidence$13: scala.reflect.ClassTag[V], implicit evidence$14: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean)(implicit evidence$9: scala.reflect.ClassTag[K], implicit evidence$10: scala.reflect.ClassTag[V], implicit evidence$11: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence$6: scala.reflect.ClassTag[K], implicit evidence$7: scala.reflect.ClassTag[V], implicit evidence$8: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)]
我使用的火花1.4.1 和的Hadoop 2.7.1 。张贴这个问题之前我已经看过了计算器讨论不同的实施,也引发文档,但没有什么帮助了我。任何帮助将是AP preciated。
I am using spark 1.4.1 and hadoop 2.7.1. Before posting this question i have looked different implementation discussed over stackoverflow and also spark docs but nothing helped me. Any help would be appreciated.
谢谢
罗杰尼希。
ThanksRajneesh.
推荐答案
请在下面找到示例Java code,以正确的进口,对我来说它的做工精细
Please find below the sample java code, with correct imports, its working fine for me
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
JavaStreamingContext jssc = SparkUtils.getStreamingContext("key", jsc);
// JavaDStream<String> rawInput = jssc.textFileStream(inputPath);
JavaPairInputDStream<LongWritable, Text> inputStream = jssc.fileStream(
inputPath, LongWritable.class, Text.class,
TextInputFormat.class, new Function<Path, Boolean>() {
@Override
public Boolean call(Path v1) throws Exception {
if ( v1.getName().contains("COPYING") ) {
// This eliminates staging files.
return Boolean.FALSE;
}
return Boolean.TRUE;
}
}, true);
JavaDStream<String> rawInput = inputStream.map(
new Function<Tuple2<LongWritable, Text>, String>() {
@Override
public String call(Tuple2<LongWritable, Text> v1) throws Exception {
return v1._2().toString();
}
});
log.info(tracePrefix + "Created the stream, Window Interval: " + windowInterval + ", Slide interval: " + slideInterval);
rawInput.print();
这篇关于SparkStreaming:错误FILESTREAM()的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!