I am trying to do the following:

env
  .readHadoopFile(new TeraInputFormat(), classOf[Text], classOf[Text], inputPath)
  .map(tp => tp)

But then my editor reports a type mismatch error:
Expected: MapFunction[Tuple2[Text, Text], NotInferedR], actual: (Nothing) => Nothing

How can I fix this?

Here is the full code:
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job

class OptimizedFlinkTeraPartitioner(underlying:TotalOrderPartitioner) extends Partitioner[OptimizedText] {
  def partition(key:OptimizedText, numPartitions:Int):Int = {
    underlying.getPartition(key.getText())
  }
}


object FlinkTeraSort {

  implicit val textOrdering = new Ordering[Text] {
    override def compare(a:Text, b:Text) = a.compareTo(b)
  }

  def main(args: Array[String]){
    if(args.size != 4){
      println("Usage: FlinkTeraSort hdfs inputPath outputPath #partitions ")
      return
    }

    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.enableObjectReuse()

    val hdfs = args(0)
    val inputPath = hdfs + args(1)
    val outputPath = hdfs+args(2)
    val partitions = args(3).toInt

    val mapredConf = new JobConf()
    mapredConf.set("fs.defaultFS", hdfs)
    mapredConf.set("mapreduce.input.fileinputformat.inputdir", inputPath)
    mapredConf.set("mapreduce.output.fileoutputformat.outputdir", outputPath)
    mapredConf.setInt("mapreduce.job.reduces", partitions)

    val partitionFile = new Path(outputPath, TeraInputFormat.PARTITION_FILENAME)
    val jobContext = Job.getInstance(mapredConf)
    TeraInputFormat.writePartitionFile(jobContext, partitionFile)
    val partitioner = new OptimizedFlinkTeraPartitioner(new TotalOrderPartitioner(mapredConf, partitionFile))

    val data = env.readHadoopFile(new TeraInputFormat(), classOf[Text], classOf[Text], inputPath)

    data.map(tp => tp)

    data.output(new HadoopOutputFormat[Text, Text](new TeraOutputFormat(), jobContext))

    env.execute("TeraSort")
  }
}

(build.sbt):
name := "terasort"

version := "0.0.1"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"

libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.0.3"

fork in run := true

Accepted answer

data.map{case (t1: Text, t2: Text) => t1}

solved the problem.
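
Why the original call fails: with the Java-API ExecutionEnvironment, readHadoopFile returns a DataSet of Flink Tuple2[Text, Text] elements, and an untyped closure such as tp => tp gives the compiler nothing to pin the MapFunction's type parameters to, so both sides collapse to Nothing. Spelling the element types out resolves the inference. Below is a minimal sketch of an alternative that passes the whole record through; it assumes data is the DataSet produced by readHadoopFile in the question, and the explicit MapFunction variant is my addition rather than part of the accepted answer:

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.hadoop.io.Text

// Explicit MapFunction: both type parameters are written out, so the Java-API
// DataSet.map(MapFunction) overload can type-check it without any inference.
val passedThrough = data.map(new MapFunction[Tuple2[Text, Text], Tuple2[Text, Text]] {
  override def map(tp: Tuple2[Text, Text]): Tuple2[Text, Text] = tp
})

Note that the accepted one-liner keeps only the key (a DataSet[Text]); if the mapped result still has to feed the HadoopOutputFormat[Text, Text] sink, map to a (key, value) pair instead of dropping the value.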

Regarding "scala - Scala: type mismatch MapFunction[Tuple2[Text, Text], NotInferedR]", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/38240903/
