假设我本质上想把 Stream.from(0) 用作一个 InputDStream，该怎么做？我能想到的唯一方法是使用 StreamingContext#queueStream，但那样就必须要么从另一个线程往队列里不断加入元素，要么继承 Queue 来构造一个行为类似无限流的队列——这两种做法都感觉像是 hack。

正确的方法是什么?

最佳答案

我认为 Spark 默认并没有提供这个功能，但可以借助 ReceiverInputDStream 轻松实现：

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

/**
 * An input DStream that feeds the elements of a Scala `Stream` into Spark Streaming.
 *
 * Every call to `getReceiver` builds a new [[InfiniteStreamReceiver]] over the same
 * `stream` and `storageLevel` that were passed to this DStream.
 *
 * @param ssc_         the streaming context this DStream is registered with (marked `@transient`)
 * @param stream       source of the elements to emit
 * @param storageLevel storage level handed to the receiver for the data it stores
 * @tparam T element type
 */
class InfiniteStreamInputDStream[T](
    @transient ssc_ : StreamingContext,
    stream: Stream[T],
    storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc_) {

  /** Creates the receiver that produces this DStream's data. */
  override def getReceiver(): Receiver[T] =
    new InfiniteStreamReceiver[T](stream, storageLevel)
}

/**
 * A Spark Streaming [[Receiver]] that pushes every element of a (possibly infinite) Scala
 * `Stream` into Spark via `store`.
 *
 * Elements are read on a background thread started from `onStart`, so `onStart` itself
 * returns immediately. The reading loop ends when the stream is exhausted or when the
 * receiver has been stopped.
 *
 * @param stream       source of the elements to store
 * @param storageLevel storage level passed to the [[Receiver]] superclass
 * @tparam T element type
 */
class InfiniteStreamReceiver[T](stream: Stream[T], storageLevel: StorageLevel) extends Receiver[T](storageLevel) {

  // Stateful iterator, created once per receiver instance: if onStart is invoked again on this
  // same instance, reading resumes from where the previous run left off rather than from the head.
  private val streamIterator = stream.iterator

  /** Drains the iterator into Spark until the stream ends or the receiver is stopped. */
  private class ReadAndStore extends Runnable {
    def run(): Unit = {
      // isStopped() must be checked on every iteration: with an infinite stream, hasNext is
      // always true, so without this check the thread would keep calling store after onStop.
      while (!isStopped() && streamIterator.hasNext) {
        store(streamIterator.next())
      }
    }
  }

  override def onStart(): Unit = {
    // start(), not run(): run() would execute the (potentially endless) loop on the calling
    // thread, and onStart would never return.
    new Thread(new ReadAndStore, "InfiniteStreamReceiver").start()
  }

  // Nothing to release here: the reading thread exits by itself once isStopped() returns true.
  override def onStop(): Unit = { }
}

08-17 02:21