本文介绍了使用Spark Streaming读取binaryFile的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有人知道如何设置`

实际使用二进制文件.

  • 在哪里可以找到所有inputformatClass?该文件没有链接.我认为ValueClass与inputformatClass不知何故.

  • 在使用方法binaryfiles的非流版本中,我可以得到每个文件的ByteArrays.有没有办法我可以得到相同的sparkStreaming?如果没有,我在哪里可以找到那些细节.意思是支持inputformat及其产生的值类.终于可以一个选择任何KeyClass,不是所有这些元素都连接了吗?

如果有人澄清该方法的使用.

EDIT1

我尝试了以下操作:

但是编译器会这样抱怨:

[error] /xxxxxxxxx/src/main/scala/EstimatorStreamingApp.scala:14: type arguments [org.apache.hadoop.io.BytesWritable,org.apache.hadoop.io.BytesWritable,org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat] conform to the bounds of none of the overloaded alternatives of
[error]  value fileStream: [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path => Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence$10: scala.reflect.ClassTag[K], implicit evidence$11: scala.reflect.ClassTag[V], implicit evidence$12: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path => Boolean, newFilesOnly: Boolean)(implicit evidence$7: scala.reflect.ClassTag[K], implicit evidence$8: scala.reflect.ClassTag[V], implicit evidence$9: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence$4: scala.reflect.ClassTag[K], implicit evidence$5: scala.reflect.ClassTag[V], implicit evidence$6: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)]
[error]   val bfiles = ssc.fileStream[BytesWritable, BytesWritable, SequenceFileAsBinaryInputFormat]("/xxxxxxxxx/Casalini_streamed")

我在做什么错了?

解决方案

点击链接以了解有关所有 hadoop输入格式

我发现此处关于序列文件格式的详细记录的答案.

由于导入不匹配,您面临编译问题. Hadoop Mapred与mapreduce

例如

Java

JavaPairInputDStream<Text,BytesWritable> dstream=
        sc.fileStream("/somepath",org.apache.hadoop.io.Text.class,
        org.apache.hadoop.io.BytesWritable.class,
    org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat.class);

我没有在Scala中尝试过,但应该是类似的东西;

val dstream = sc.fileStream("/somepath",
        classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.BytesWritable],
        classOf[org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat] ) ;

Does any one know how to setup the `

to actually consume binary files.

  • Where can I find all the inputformatClass ? The documentation give nolinks for that. I imagine that the ValueClass is related to theinputformatClass somehow.

  • In the non-streaming version using the method binaryfiles, I can getByteArrays for each files. Is there a way i can get the same withsparkStreaming ? If not where can i find those details. Meaning theinputformat supportted and the value class it produces. Finally Canone pick any KeyClass, aren't all those element connected ?

If someone clarify the use of the method.

EDIT1

I have tried the following:

However the compiler complain as such:

[error] /xxxxxxxxx/src/main/scala/EstimatorStreamingApp.scala:14: type arguments [org.apache.hadoop.io.BytesWritable,org.apache.hadoop.io.BytesWritable,org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat] conform to the bounds of none of the overloaded alternatives of
[error]  value fileStream: [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path => Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence$10: scala.reflect.ClassTag[K], implicit evidence$11: scala.reflect.ClassTag[V], implicit evidence$12: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path => Boolean, newFilesOnly: Boolean)(implicit evidence$7: scala.reflect.ClassTag[K], implicit evidence$8: scala.reflect.ClassTag[V], implicit evidence$9: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence$4: scala.reflect.ClassTag[K], implicit evidence$5: scala.reflect.ClassTag[V], implicit evidence$6: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)]
[error]   val bfiles = ssc.fileStream[BytesWritable, BytesWritable, SequenceFileAsBinaryInputFormat]("/xxxxxxxxx/Casalini_streamed")

What am i doing wrong ?

解决方案

Follow link to read about about all hadoop input formats

I found here well documented answer about sequence file format.

You are facing the compilation issue because of import missmatch.Hadoop Mapred vs mapreduce

E.g.

Java

JavaPairInputDStream<Text,BytesWritable> dstream=
        sc.fileStream("/somepath",org.apache.hadoop.io.Text.class,
        org.apache.hadoop.io.BytesWritable.class,
    org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat.class);

I didn't try in scala but it should be something similar;

val dstream = sc.fileStream("/somepath",
        classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.BytesWritable],
        classOf[org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat] ) ;

这篇关于使用Spark Streaming读取binaryFile的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-20 13:25
查看更多