This is my Spark code and pom.xml. The problem with this program is that the JavaStreamingContext only streams the first batch of records and then stops streaming, and jssc.start() throws an error.
Can anyone give me some clues as to why this is happening? Is there something wrong with my Spark dependencies?


  17/04/11 10:32:20 ERROR StreamingContext: Error starting the context, marking it as stopped
  java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
      at scala.Predef$.require(Predef.scala:224)
      at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
      at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
      at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
      at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
      at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:554)
      at com.comcast.emm.vodip.WholeTextLocal.WholeTextLocal.main(WholeTextLocal.java:72)
  Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
      at scala.Predef$.require(Predef.scala:224)
      at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
      at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
      at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
      at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
      at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:554)
      at com.comcast.emm.vodip.WholeTextLocal.WholeTextLocal.main(WholeTextLocal.java:72)


Code: pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.abcd.emm.mpp</groupId>
    <artifactId>WholeTextLocal</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>WholeTextLocal</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>



    <build>
        <plugins>


            <!-- Maven Shade Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>

                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <excludeScope>system</excludeScope>

                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <excludeGroupIds>junit,org.mockito,org.hamcrest</excludeGroupIds>

                            <transformers>
                                <!-- add Main-Class to manifest file -->
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.abcd.emm.mpp.WholeTextLocal.WholeTextLocal</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>

                <configuration>
                    <finalName>${project.artifactId}</finalName>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>


            </plugin>

        </plugins>
    </build>


    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
            <version>1.5</version>
        </dependency>



        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.4</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId>
            <version>2.4.4</version> </dependency> -->

        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8.0_101</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>



        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.4</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- required at run time by hive -->
        <dependency>
            <groupId>org.datanucleus</groupId>
            <artifactId>datanucleus-core</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.datanucleus</groupId>
            <artifactId>datanucleus-api-jdo</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.datanucleus</groupId>
            <artifactId>datanucleus-rdbms</artifactId>
            <version>3.2.1</version>
        </dependency>

        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.sparkjava/spark-core -->
        <!-- https://mvnrepository.com/artifact/com.sparkjava/spark-core -->
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>

        <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId>
            <version>2.0.2</version> <scope>provided</scope> </dependency> -->

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->

        <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId>
            <version>2.0.2</version> </dependency> -->

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive_2.10 -->
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive_2.10 -->
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive_2.11 -->


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>



        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>0.14.0</version>
        </dependency>


        <dependency>
            <groupId>org.mariadb.jdbc</groupId>
            <artifactId>mariadb-java-client</artifactId>
            <version>1.2.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive-thriftserver_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>



        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- http://mvnrepository.com/artifact/org.apache.avro/avro-tools -->


        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.0</version>
            <!--<scope>provided</scope> -->
        </dependency>
        <!-- <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId>
            <version>1.2.17</version> </dependency> -->
        <!-- <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_2.10</artifactId>
            <version>2.2.4</version> <scope>test</scope> </dependency> -->
    </dependencies>
</project>


Java code:

package com.comcast.emm.vodip.WholeTextLocal;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class WholeTextLocal
    {

        public static void main(String[] args) throws InterruptedException
            {

                SparkConf sparkConf = new SparkConf().setAppName("My app").setMaster("local")
                        .set("spark.driver.allowMultipleContexts", "true");

                JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(15));

                JavaPairRDD<String, String> WholeTextLocalFiles = jssc.sparkContext()
                        .wholeTextFiles("C:/Users/aaa/files/abcd7/simple/*.txt");

                // Pick the content.
                JavaRDD<String> myRDD = WholeTextLocalFiles.map(new Function<Tuple2<String, String>, String>()
                    {
                        private static final long serialVersionUID = -551872585218963131L;

                        public String call(Tuple2<String, String> v1) throws Exception
                            {
                                System.out.println("v1._2=" + v1._2);
                                return v1._2;
                            }

                    });

                // Split the content in each file with /n character.
                JavaRDD<String> myRDD2 = myRDD.flatMap(new FlatMapFunction<String, String>()
                    {

                        public Iterator<String> call(String t) throws Exception
                            {

                                return Arrays.asList(t.split("\\r?\\n")).iterator();

                            }
                    });

                // Loop through and print.
                myRDD2.foreachPartition(new VoidFunction<Iterator<String>>()
                    {

                        private static final long serialVersionUID = -4895942417886562330L;

                        public void call(Iterator<String> t) throws Exception
                            {
                                while (t.hasNext())
                                    {
                                        System.out.println(t.next());
                                    }

                            }
                    });

                jssc.start();
                jssc.awaitTermination();

            }

    }


Updated code: this one does stream. However, even though the files are picked up by Spark, the sysout does not print anything.

public class WholeTextLocal
    {

        public static void main(String[] args) throws InterruptedException
            {

                SparkConf sparkConf = new SparkConf().setAppName("My app").setMaster("local")
                        .set("spark.driver.allowMultipleContexts", "true");

                JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));


                JavaPairInputDStream<Text, Text> dStream = jssc.fileStream("C:/Users/aaa/files/abcd7/simple/", Text.class, Text.class, WholeTextFileInputFormat.class);



                // Pick the content.
                JavaDStream<String> myRDD = dStream.map(new Function<Tuple2<Text, Text>, String>()
                    {
                        private static final long serialVersionUID = -551872585218963131L;

                        @Override
                        public String call(Tuple2<Text, Text> v1) throws Exception
                            {
                                System.out.println("v1._2=" + v1._1);
                                return v1._2.toString();
                            }

                    });


                // Split the content in each file with /n character.
                JavaDStream<String> myRDD2 = myRDD.flatMap(new FlatMapFunction<String, String>()
                    {
                        @Override
                        public Iterator<String> call(String t) throws Exception
                            {

                                return Arrays.asList(t.split("\\r?\\n")).iterator();

                            }
                    });

                // Loop through and print.

                myRDD2.foreachRDD(itr -> new VoidFunction<Iterator<String>>()
                    {
                        private static final long serialVersionUID = -4895942417886562330L;

                        @Override
                        public void call(Iterator<String> t) throws Exception
                            {
                                while (t.hasNext())
                                    {
                                        System.out.println(t.next());
                                    }

                            }
                    });


                myRDD2.count();

                jssc.start();
                jssc.awaitTermination();

            }

    }

Best answer

I think the error message tells you everything:


  No output operations registered, so nothing to execute


You have to add some action at the end, i.e. foreachRDD instead of foreachPartition:

myRDD2.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    private static final long serialVersionUID = -4895942417886562330L;

    public void call(JavaRDD<String> rdd) throws Exception {
        rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
            private static final long serialVersionUID = -1L;

            public void call(Iterator<String> t) throws Exception {
                while (t.hasNext()) {
                    System.out.println(t.next());
                }
            }
        });
    }
});


Note: you can use foreachPartition inside foreachRDD.

Second:

You are reading the files with JavaSparkContext.wholeTextFiles. That is not a streaming operation. You should read the input with JavaStreamingContext.textFileStream and operate on DStreams instead, for example as sketched below.
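A minimal sketch of that approach, reusing the jssc streaming context and the directory path from the question's code (both are assumptions carried over from above, not new API):

// Minimal sketch: textFileStream watches the directory and streams the lines of
// text files that appear there after the stream has started.
JavaDStream<String> lines = jssc.textFileStream("C:/Users/aaa/files/abcd7/simple/");

// print() is an output operation, so the "No output operations registered" error goes away.
lines.print();

jssc.start();
jssc.awaitTermination();

Keep in mind that the file-based streams only pick up files that are added to the directory after the stream starts; files already sitting there when the job launches are ignored, which can also make it look like nothing is printed.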

You can also use fileStream with an input format, e.g. WholeTextFileInputFormat:

JavaPairInputDStream<Text, Text> dStream = jssc.fileStream("dir", Text.class, Text.class, WholeTextFileInputFormat.class);
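As with the first point, that DStream still needs an output operation registered before jssc.start(). A hedged sketch of how the pieces could fit together, using the question's directory and the same Text / WholeTextFileInputFormat classes that appear in the updated code above:

// Sketch only: stream (fileName, fileContent) pairs for whole files newly added to the directory.
JavaPairInputDStream<Text, Text> dStream = jssc.fileStream(
        "C:/Users/aaa/files/abcd7/simple/", Text.class, Text.class, WholeTextFileInputFormat.class);

// Keep only the file content, dropping the file name key.
JavaDStream<String> contents = dStream.map(new Function<Tuple2<Text, Text>, String>() {
    public String call(Tuple2<Text, Text> pair) throws Exception {
        return pair._2.toString();
    }
});

// Register an output operation, otherwise start() fails with the same "No output operations" error.
contents.print();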
