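I am trying out Spark Streaming and listening on a socket. I am using the rawSocketStream method to create the receiver and the DStream, but when I print the DStream I get the following exception.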

Code that creates the DStream:

JavaSparkContext jsc = new JavaSparkContext("Master", "app");
JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(3));
JavaReceiverInputDStream<Object> rawStream = jssc.rawSocketStream("localhost", 9999);
log.info(tracePrefix + "Created the stream ...");
rawStream.print();
jssc.start();
jssc.awaitTermination();


Code that sends the protobuf object over a TCP connection:

FileInputStream input = new FileInputStream("address_book");
AddressBook book = AddressBookProtos.AddressBook.parseFrom(input);
log.info(tracePrefix + "Size of contacts: " + book.getPersonList().size());

ServerSocket serverSocket = new ServerSocket(9999);
log.info(tracePrefix + "Waiting for connections ...");
Socket s1 = serverSocket.accept();
log.info(tracePrefix + "Accepted a connection ...");
while(true) {
    Thread.sleep(3000);
    ObjectOutputStream out = new ObjectOutputStream(s1.getOutputStream());
    out.writeByte(book.getSerializedSize());
    out.write(book.toByteArray());
    out.flush();
    log.info(tracePrefix + "Written to new socket");
}


The stack trace is as follows:

java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:88)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


2016-04-02 07:45:47,607 ERROR [Executor task launch worker-0] org.apache.spark.streaming.receiver.ReceiverSupervisorImpl
Stopped receiver with error: java.lang.IllegalArgumentException

2016-04-02 07:45:47,613 ERROR [Executor task launch worker-0] org.apache.spark.executor.Executor
Exception in task 0.0 in stage 0.0 (TID 0)

java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:88)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2016-04-02 07:45:47,646 ERROR [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager
Task 0 in stage 0.0 failed 1 times; aborting job

2016-04-02 07:45:47,656 ERROR [submit-job-thread-pool-0] org.apache.spark.streaming.scheduler.ReceiverTracker
Receiver has been stopped. Try to restart it.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:88)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:88)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


Working code

Code that sends the protobuf object over TCP:

        ServerSocket serverSocket = new ServerSocket(9999);
        log.info(tracePrefix + "Waiting for connections ...");
        Socket s1 = serverSocket.accept();
        log.info(tracePrefix + "Accepted a connection ...");
        while(true) {
            Thread.sleep(3000);
            DataOutputStream out = new DataOutputStream(s1.getOutputStream());
            byte[] bytes = book.toByteArray();
            log.info(tracePrefix + "Serialized size: " + book.getSerializedSize());
            out.writeInt(book.getSerializedSize());
            log.info(tracePrefix + "Sending bytes: " + Arrays.toString(bytes));
            out.write(bytes);
//          out.write("hello world !".getBytes());
            out.flush();
            log.info(tracePrefix + "Written to new socket");
        }


Code that creates the receiver and the DStream:

JavaReceiverInputDStream<GeneratedMessage> rawStream = jssc.receiverStream(new JavaSocketReceiver("localhost", 9999));
log.info(tracePrefix + "Created the stream ...");
rawStream.print();

private static class JavaSocketReceiver extends Receiver<GeneratedMessage> {

        /**
         *
         */
        private static final long serialVersionUID = -958378677169958045L;
        String host = null;
        int port = -1;

        JavaSocketReceiver(String host_, int port_) {
            super(StorageLevel.MEMORY_AND_DISK());
            host = host_;
            port = port_;
        }

        @Override
        public void onStart() {
            new Thread() {
                @Override
                public void run() {
                    receive();
                }
            }.start();
        }

        @Override
        public void onStop() {
        }

        private void receive() {
            try {
                Socket socket = null;
                ObjectInputStream in = null;
                try {
                    // Open a socket to the target address and keep reading from
                    // it
                    log.info(tracePrefix + "Connecting to " + host + ":" + port);
                    SocketChannel channel = SocketChannel.open();
                    channel.configureBlocking(true);
                    channel.connect(new InetSocketAddress(host, port));
                    log.info(tracePrefix + "Connected to " + host + ":" + port);

                    ArrayBlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<>(2);

                    Thread blockPushingThread = new Thread(new Runnable() {

                        @Override
                        public void run() {
                            int nextBlockNumber = 0;
                            while (true) {
                                try {
                                    ByteBuffer buffer = queue.take();
                                    nextBlockNumber += 1;
                                    AddressBook book = AddressBook.parseFrom(buffer.array());
//                                  log.info(tracePrefix + "Got back the object: " + book);
                                    store(book);
                                } catch (InterruptedException ie) {
                                    log.error(tracePrefix + "Failed processing data", ie);
                                } catch (Throwable t) {
                                    log.error(tracePrefix + "Failed processing data", t);
                                }
                            }
                        }
                    });
                    blockPushingThread.setDaemon(true);

                    blockPushingThread.start();

                    ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
                    while (true) {
                        lengthBuffer.clear();
                        readFully(channel, lengthBuffer);
                        lengthBuffer.flip();
                        int length = lengthBuffer.getInt();
//                      log.info(tracePrefix + "The length read: " + length);
                        ByteBuffer dataBuffer = ByteBuffer.allocate(length);
                        readFully(channel, dataBuffer);
                        dataBuffer.flip();
//                      log.info(tracePrefix + "Read a block with " + length + " bytes");
                        queue.put(dataBuffer);
                    }


                } finally {
                    Closeables.close(in, /* swallowIOException = */ true);
                    Closeables.close(socket, /* swallowIOException = */ true);
                }
            } catch (ConnectException ce) {
                ce.printStackTrace();
                restart("Could not connect", ce);
            } catch (Throwable t) {
                t.printStackTrace();
                restart("Error receiving data", t);
            }
        }

        // Reads until dest is full. Throws EOFException when the sender closes the
        // connection, so receive() can catch it and restart the receiver instead of
        // spinning forever on a closed channel.
        private void readFully(ReadableByteChannel channel, ByteBuffer dest) throws IOException {
            while (dest.position() < dest.limit()) {
                if (channel.read(dest) == -1) {
                    throw new EOFException("End of channel");
                }
            }
        }
    }


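The JavaSocketReceiver above is adapted from rawSocketStream in the Spark Streaming module. In the client code that sends the bytes, if I change DataOutputStream to ObjectOutputStream, I get a corrupted header exception. In the streaming code, if I use the built-in rawSocketStream to listen for incoming packets, I get the IllegalArgumentException from ByteBuffer.allocate (ByteBuffer.java:334).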

Best answer

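If we look at the ByteBuffer documentation, ByteBuffer.allocate throws IllegalArgumentException only when it is asked to allocate a buffer with a negative capacity.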
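To see this in isolation, here is a small sketch (the class and method names are made up for illustration) that probes ByteBuffer.allocate with a negative and with non-negative capacities:

```java
import java.nio.ByteBuffer;

public class AllocateNegativeDemo {

    // Returns true if ByteBuffer.allocate rejects the capacity with
    // IllegalArgumentException, which the JDK documents for negative values.
    static boolean allocateThrows(int capacity) {
        try {
            ByteBuffer.allocate(capacity);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(allocateThrows(-1)); // true
        System.out.println(allocateThrows(0));  // false: an empty buffer is legal
        System.out.println(allocateThrows(4));  // false
    }
}
```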

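The RawInputDStream protocol expects an integer size field followed by the corresponding payload. That field is a 4-byte integer, which the receiver reads with lengthBuffer.getInt() (as in the ported receiver above), i.e. big-endian.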
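A minimal sketch of that framing, assuming a big-endian length (the default byte order of both DataOutputStream.writeInt and ByteBuffer.getInt). LengthPrefixedFrame is a hypothetical helper; it only models the size-plus-payload framing and says nothing about how Spark interprets the payload bytes afterwards:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LengthPrefixedFrame {

    // Encodes one frame: a 4-byte big-endian length followed by the payload.
    static byte[] encode(byte[] payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(payload.length); // always exactly 4 bytes on the wire
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decodes one frame the way the receiver does: read the 4-byte length,
    // allocate a buffer of that size, then fill it with the payload.
    static byte[] decode(byte[] frame) {
        ByteBuffer in = ByteBuffer.wrap(frame); // big-endian by default
        int length = in.getInt();
        ByteBuffer payload = ByteBuffer.allocate(length); // IllegalArgumentException if length < 0
        in.get(payload.array()); // BufferUnderflowException if the frame is truncated
        return payload.array();
    }

    public static void main(String[] args) {
        byte[] frame = encode("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(frame.length); // 9: 4 length bytes + 5 payload bytes
        System.out.println(new String(decode(frame), StandardCharsets.UTF_8)); // hello
    }
}
```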

The sender program shown in the question:

out.writeByte(book.getSerializedSize());


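is writing the integer size as a single byte. So when the reading side decodes the payload size, it reads 4 bytes and gets a corrupted field that mixes that one byte with whatever bytes follow it in the stream, and the result can decode to a negative integer. With the question's ObjectOutputStream sender this is guaranteed: ObjectOutputStream first writes its own stream header (the bytes 0xAC 0xED 0x00 0x05), and because 0xAC has its high bit set, those four bytes read as an int are negative.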
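The sketch below simulates what the receiver sees, under the assumption that it interprets the first 4 bytes on the wire as a big-endian int. MisframedLengthDemo and its methods are hypothetical. With ObjectOutputStream the stream header lands in those 4 bytes and decodes to a negative value; with a DataOutputStream that still uses writeByte, the size byte is merged with the first payload bytes, which gives a wrong length that is not necessarily negative:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

public class MisframedLengthDemo {

    // Mimics the question's sender: ObjectOutputStream + writeByte(size) + payload.
    static byte[] objectStreamSender(byte[] payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            ObjectOutputStream out = new ObjectOutputStream(bytes); // writes its header here
            out.writeByte(payload.length);
            out.write(payload);
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Same one-byte length field, but without ObjectOutputStream's header.
    static byte[] dataStreamWriteByteSender(byte[] payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(payload.length);
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // What the receiver does with the first 4 bytes: decode a big-endian int.
    static int lengthAsReceiverSeesIt(byte[] wire) {
        return ByteBuffer.wrap(wire, 0, 4).getInt();
    }

    public static void main(String[] args) {
        byte[] payload = {1, 2, 3};
        // Negative, so ByteBuffer.allocate(length) would throw IllegalArgumentException.
        System.out.println(lengthAsReceiverSeesIt(objectStreamSender(payload)));
        // Positive but wrong: the size byte 3 merged with payload bytes 1, 2, 3.
        System.out.println(lengthAsReceiverSeesIt(dataStreamWriteByteSender(payload)));
    }
}
```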

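The solution is to write a 4-byte (32-bit) integer instead, through a DataOutputStream (which adds no header of its own), as the working code in the question does: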

out.writeInt(book.getSerializedSize());
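A quick way to compare the two length encodings is to count the bytes each call puts on the wire. This sketch (LengthFieldWidthDemo is a hypothetical name) also shows that writeByte keeps only the low 8 bits, so any size above 255 would be truncated even if the receiver read a single byte:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class LengthFieldWidthDemo {

    // Bytes produced by DataOutputStream.writeByte(size): 1 byte, low 8 bits only.
    static byte[] asByte(int size) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(size);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Bytes produced by DataOutputStream.writeInt(size): 4 bytes, big-endian.
    static byte[] asInt(int size) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(size);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(asByte(300))); // [44]: 300 does not fit in a byte
        System.out.println(Arrays.toString(asInt(300)));  // [0, 0, 1, 44]
    }
}
```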

10-08 00:34